00001 #include "ace_pch.h"
00002
00003
00004 #include "ace/WFMO_Reactor.h"
00005
00006 #if defined (ACE_WIN32)
00007
00008 #include "ace/Handle_Set.h"
00009 #include "ace/Timer_Heap.h"
00010 #include "ace/Thread.h"
00011
00012 #if !defined (__ACE_INLINE__)
00013 #include "ace/WFMO_Reactor.i"
00014 #endif
00015
00016 ACE_RCSID(ace, WFMO_Reactor, "$Id: WFMO_Reactor.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:24 chad Exp $")
00017
00018 #include "ace/Auto_Ptr.h"
00019
00020 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
00021 : wfmo_reactor_ (wfmo_reactor)
00022 {
00023 }
00024
00025 int
00026 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
00027 {
00028 if (size > MAXIMUM_WAIT_OBJECTS)
00029 ACE_ERROR_RETURN ((LM_ERROR,
00030 ACE_LIB_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
00031 size,
00032 MAXIMUM_WAIT_OBJECTS),
00033 -1);
00034
00035
00036 ACE_NEW_RETURN (this->current_handles_,
00037 ACE_HANDLE[size],
00038 -1);
00039 ACE_NEW_RETURN (this->current_info_,
00040 Current_Info[size],
00041 -1);
00042 ACE_NEW_RETURN (this->current_suspended_info_,
00043 Suspended_Info[size],
00044 -1);
00045 ACE_NEW_RETURN (this->to_be_added_info_,
00046 To_Be_Added_Info[size],
00047 -1);
00048
00049
00050 this->max_size_ = size;
00051 this->max_handlep1_ = 0;
00052 this->suspended_handles_ = 0;
00053 this->handles_to_be_added_ = 0;
00054 this->handles_to_be_deleted_ = 0;
00055 this->handles_to_be_suspended_ = 0;
00056 this->handles_to_be_resumed_ = 0;
00057
00058 for (size_t i = 0; i < size; i++)
00059 this->current_handles_[i] = ACE_INVALID_HANDLE;
00060
00061 return 0;
00062 }
00063
00064 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
00065 {
00066
00067 delete [] this->current_handles_;
00068 delete [] this->current_info_;
00069 delete [] this->current_suspended_info_;
00070 delete [] this->to_be_added_info_;
00071 }
00072
00073 ACE_Reactor_Mask
00074 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
00075 ACE_Reactor_Mask change_masks,
00076 int operation)
00077 {
00078
00079
00080
00081 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
00082
00083 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
00084 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
00085 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
00086
00087 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
00088 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
00089
00090 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
00091 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
00092
00093 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
00094 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
00095
00096 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
00097 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
00098
00099 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
00100 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
00101
00102 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
00103 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
00104
00105 switch (operation)
00106 {
00107 case ACE_Reactor::CLR_MASK:
00108
00109
00110 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00111 {
00112 ACE_CLR_BITS (existing_masks, FD_READ);
00113 ACE_CLR_BITS (existing_masks, FD_CLOSE);
00114 }
00115
00116 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00117 ACE_CLR_BITS (existing_masks, FD_WRITE);
00118
00119 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00120 ACE_CLR_BITS (existing_masks, FD_OOB);
00121
00122 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00123 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
00124
00125 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00126 ACE_CLR_BITS (existing_masks, FD_CONNECT);
00127
00128 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00129 ACE_CLR_BITS (existing_masks, FD_QOS);
00130
00131 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00132 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
00133
00134 break;
00135
00136 case ACE_Reactor::SET_MASK:
00137
00138
00139 existing_masks = 0;
00140
00141
00142 case ACE_Reactor::ADD_MASK:
00143
00144
00145
00146 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
00147 {
00148 ACE_SET_BITS (existing_masks, FD_READ);
00149 ACE_SET_BITS (existing_masks, FD_CLOSE);
00150 }
00151
00152 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
00153 ACE_SET_BITS (existing_masks, FD_WRITE);
00154
00155 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
00156 ACE_SET_BITS (existing_masks, FD_OOB);
00157
00158 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
00159 ACE_SET_BITS (existing_masks, FD_ACCEPT);
00160
00161 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
00162 ACE_SET_BITS (existing_masks, FD_CONNECT);
00163
00164 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
00165 ACE_SET_BITS (existing_masks, FD_QOS);
00166
00167 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
00168 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
00169
00170 break;
00171
00172 case ACE_Reactor::GET_MASK:
00173
00174
00175
00176
00177 ACE_UNUSED_ARG (change_masks);
00178
00179 break;
00180 }
00181
00182 return old_masks;
00183 }
00184
00185 int
00186 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
00187 ACE_Reactor_Mask mask,
00188 int &changes_required)
00189 {
00190 int error = 0;
00191
00192
00193
00194 size_t original_handle_count = this->handles_to_be_deleted_;
00195 int result = 0;
00196 size_t i;
00197
00198
00199
00200
00201
00202
00203 for (i = 0; i < this->max_handlep1_ && error == 0; i++)
00204
00205
00206 if ((this->current_handles_[i] == handle
00207 || this->current_info_[i].io_handle_ == handle)
00208 &&
00209 !this->current_info_[i].delete_entry_)
00210 {
00211 result = this->remove_handler_i (i,
00212 mask);
00213 if (result == -1)
00214 error = 1;
00215 }
00216
00217
00218 for (i = 0; i < this->suspended_handles_ && error == 0; i++)
00219
00220
00221 if ((this->current_suspended_info_[i].io_handle_ == handle
00222 || this->current_suspended_info_[i].event_handle_ == handle)
00223 &&
00224
00225 !this->current_suspended_info_[i].delete_entry_)
00226 {
00227 result = this->remove_suspended_handler_i (i,
00228 mask);
00229 if (result == -1)
00230 error = 1;
00231 }
00232
00233
00234 for (i = 0; i < this->handles_to_be_added_ && error == 0; i++)
00235
00236
00237 if ((this->to_be_added_info_[i].io_handle_ == handle
00238 || this->to_be_added_info_[i].event_handle_ == handle)
00239 &&
00240
00241 !this->to_be_added_info_[i].delete_entry_)
00242 {
00243 result = this->remove_to_be_added_handler_i (i,
00244 mask);
00245 if (result == -1)
00246 error = 1;
00247 }
00248
00249
00250
00251 if (original_handle_count < this->handles_to_be_deleted_)
00252 changes_required = 1;
00253
00254 return error ? -1 : 0;
00255 }
00256
00257 int
00258 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
00259 ACE_Reactor_Mask to_be_removed_masks)
00260 {
00261
00262 if (this->current_info_[slot].io_entry_)
00263 {
00264
00265
00266 this->bit_ops (this->current_info_[slot].network_events_,
00267 to_be_removed_masks,
00268 ACE_Reactor::CLR_MASK);
00269
00270
00271
00272
00273
00274
00275 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
00276 this->current_handles_[slot],
00277 this->current_info_[slot].network_events_);
00278 }
00279
00280 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00281
00282 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00283 else
00284
00285 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00286
00287
00288
00289 if (this->current_info_[slot].suspend_entry_)
00290 {
00291
00292 this->current_info_[slot].suspend_entry_ = 0;
00293
00294 this->handles_to_be_suspended_--;
00295 }
00296
00297
00298
00299
00300 if (this->current_info_[slot].network_events_ == 0)
00301 {
00302
00303 this->current_info_[slot].delete_entry_ = 1;
00304
00305 this->current_info_[slot].close_masks_ = to_be_removed_masks;
00306
00307 this->handles_to_be_deleted_++;
00308 }
00309
00310
00311
00312
00313
00314
00315 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00316 {
00317 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
00318 this->current_info_[slot].event_handler_->handle_close (handle,
00319 to_be_removed_masks);
00320 }
00321
00322 return 0;
00323 }
00324
00325 int
00326 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
00327 ACE_Reactor_Mask to_be_removed_masks)
00328 {
00329
00330 if (this->current_suspended_info_[slot].io_entry_)
00331 {
00332
00333
00334 this->bit_ops (this->current_suspended_info_[slot].network_events_,
00335 to_be_removed_masks,
00336 ACE_Reactor::CLR_MASK);
00337
00338
00339
00340
00341
00342
00343 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
00344 this->current_suspended_info_[slot].event_handle_,
00345 this->current_suspended_info_[slot].network_events_);
00346 }
00347
00348 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00349
00350 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00351 else
00352
00353 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00354
00355
00356
00357 if (this->current_suspended_info_[slot].resume_entry_)
00358 {
00359
00360 this->current_suspended_info_[slot].resume_entry_ = 0;
00361
00362 this->handles_to_be_resumed_--;
00363 }
00364
00365
00366
00367
00368 if (this->current_suspended_info_[slot].network_events_ == 0)
00369 {
00370
00371 this->current_suspended_info_[slot].delete_entry_ = 1;
00372
00373 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
00374
00375 this->handles_to_be_deleted_++;
00376 }
00377
00378
00379
00380
00381
00382 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00383 {
00384 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
00385 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
00386 to_be_removed_masks);
00387 }
00388
00389 return 0;
00390 }
00391
00392 int
00393 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
00394 ACE_Reactor_Mask to_be_removed_masks)
00395 {
00396
00397 if (this->to_be_added_info_[slot].io_entry_)
00398 {
00399
00400
00401 this->bit_ops (this->to_be_added_info_[slot].network_events_,
00402 to_be_removed_masks,
00403 ACE_Reactor::CLR_MASK);
00404
00405
00406
00407
00408
00409
00410 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
00411 this->to_be_added_info_[slot].event_handle_,
00412 this->to_be_added_info_[slot].network_events_);
00413 }
00414
00415 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
00416
00417 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
00418 else
00419
00420 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
00421
00422
00423
00424 if (this->to_be_added_info_[slot].suspend_entry_)
00425 {
00426
00427 this->to_be_added_info_[slot].suspend_entry_ = 0;
00428
00429 this->handles_to_be_suspended_--;
00430 }
00431
00432
00433
00434
00435 if (this->to_be_added_info_[slot].network_events_ == 0)
00436 {
00437
00438 this->to_be_added_info_[slot].delete_entry_ = 1;
00439
00440 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
00441
00442 this->handles_to_be_deleted_++;
00443 }
00444
00445
00446
00447
00448
00449 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
00450 {
00451 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
00452 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
00453 to_be_removed_masks);
00454 }
00455
00456 return 0;
00457 }
00458
00459 int
00460 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
00461 int &changes_required)
00462 {
00463 size_t i = 0;
00464
00465
00466
00467
00468
00469
00470 for (i = 0; i < this->max_handlep1_; i++)
00471
00472
00473 if ((this->current_handles_[i] == handle ||
00474 this->current_info_[i].io_handle_ == handle) &&
00475
00476 !this->current_info_[i].suspend_entry_)
00477 {
00478
00479 this->current_info_[i].suspend_entry_ = 1;
00480
00481 this->handles_to_be_suspended_++;
00482
00483 changes_required = 1;
00484 }
00485
00486
00487 for (i = 0; i < this->suspended_handles_; i++)
00488
00489
00490 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00491 this->current_suspended_info_[i].io_handle_ == handle) &&
00492
00493 this->current_suspended_info_[i].resume_entry_)
00494 {
00495
00496 this->current_suspended_info_[i].resume_entry_ = 0;
00497
00498 this->handles_to_be_resumed_--;
00499
00500 changes_required = 1;
00501 }
00502
00503
00504 for (i = 0; i < this->handles_to_be_added_; i++)
00505
00506
00507 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00508 this->to_be_added_info_[i].event_handle_ == handle) &&
00509
00510 !this->to_be_added_info_[i].suspend_entry_)
00511 {
00512
00513 this->to_be_added_info_[i].suspend_entry_ = 1;
00514
00515 this->handles_to_be_suspended_++;
00516
00517 changes_required = 1;
00518 }
00519
00520 return 0;
00521 }
00522
00523 int
00524 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
00525 int &changes_required)
00526 {
00527 size_t i = 0;
00528
00529
00530
00531
00532
00533
00534 for (i = 0; i < this->max_handlep1_; i++)
00535
00536
00537 if ((this->current_handles_[i] == handle ||
00538 this->current_info_[i].io_handle_ == handle) &&
00539
00540 this->current_info_[i].suspend_entry_)
00541 {
00542
00543 this->current_info_[i].suspend_entry_ = 0;
00544
00545 this->handles_to_be_suspended_--;
00546
00547 changes_required = 1;
00548 }
00549
00550
00551 for (i = 0; i < this->suspended_handles_; i++)
00552
00553
00554 if ((this->current_suspended_info_[i].event_handle_ == handle ||
00555 this->current_suspended_info_[i].io_handle_ == handle) &&
00556
00557 !this->current_suspended_info_[i].resume_entry_)
00558 {
00559
00560 this->current_suspended_info_[i].resume_entry_ = 1;
00561
00562 this->handles_to_be_resumed_++;
00563
00564 changes_required = 1;
00565 }
00566
00567
00568 for (i = 0; i < this->handles_to_be_added_; i++)
00569
00570
00571 if ((this->to_be_added_info_[i].io_handle_ == handle ||
00572 this->to_be_added_info_[i].event_handle_ == handle) &&
00573
00574 this->to_be_added_info_[i].suspend_entry_)
00575 {
00576
00577 this->to_be_added_info_[i].suspend_entry_ = 0;
00578
00579 this->handles_to_be_suspended_--;
00580
00581 changes_required = 1;
00582 }
00583
00584 return 0;
00585 }
00586
00587 void
00588 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
00589 {
00590 {
00591 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
00592
00593 int dummy;
00594 size_t i;
00595
00596
00597 for (i = 0; i < this->max_handlep1_; i++)
00598 this->unbind_i (this->current_handles_[i],
00599 ACE_Event_Handler::ALL_EVENTS_MASK,
00600 dummy);
00601
00602
00603 for (i = 0; i < this->suspended_handles_; i++)
00604 this->unbind_i (this->current_suspended_info_[i].event_handle_,
00605 ACE_Event_Handler::ALL_EVENTS_MASK,
00606 dummy);
00607
00608
00609 for (i = 0; i < this->handles_to_be_added_; i++)
00610 this->unbind_i (this->to_be_added_info_[i].event_handle_,
00611 ACE_Event_Handler::ALL_EVENTS_MASK,
00612 dummy);
00613
00614 }
00615
00616
00617
00618
00619
00620 this->wfmo_reactor_.wakeup_all_threads ();
00621 }
00622
00623 int
00624 ACE_WFMO_Reactor_Handler_Repository::bind_i (int io_entry,
00625 ACE_Event_Handler *event_handler,
00626 long network_events,
00627 ACE_HANDLE io_handle,
00628 ACE_HANDLE event_handle,
00629 int delete_event)
00630 {
00631
00632 if (event_handle == ACE_INVALID_HANDLE)
00633 event_handle = event_handler->get_handle ();
00634 if (this->invalid_handle (event_handle))
00635 return -1;
00636
00637 size_t current_size = this->max_handlep1_ +
00638 this->handles_to_be_added_ -
00639 this->handles_to_be_deleted_ +
00640 this->suspended_handles_;
00641
00642
00643 if (current_size < this->max_size_)
00644 {
00645
00646
00647 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
00648 io_entry,
00649 event_handler,
00650 io_handle,
00651 network_events,
00652 delete_event);
00653
00654 this->handles_to_be_added_++;
00655
00656
00657
00658 this->wfmo_reactor_.wakeup_all_threads ();
00659 }
00660 else
00661 {
00662 errno = EMFILE;
00663 return -1;
00664 }
00665
00666 return 0;
00667 }
00668
00669 int
00670 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
00671 {
00672
00673
00674 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
00675 {
00676 for (size_t i = 0; i < this->max_handlep1_; i++)
00677 {
00678
00679
00680
00681
00682 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00683 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00684 ACE_Event_Handler *event_handler = 0;
00685
00686
00687 if (this->current_info_[i].delete_entry_)
00688 {
00689
00690
00691
00692
00693
00694
00695
00696 masks = this->current_info_[i].close_masks_;
00697 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00698 {
00699
00700 if (this->current_info_[i].io_entry_)
00701 handle = this->current_info_[i].io_handle_;
00702 else
00703 handle = this->current_handles_[i];
00704
00705
00706 event_handler = this->current_info_[i].event_handler_;
00707 }
00708
00709
00710 if (this->current_info_[i].delete_event_)
00711 ACE_OS::event_destroy (&this->current_handles_[i]);
00712
00713
00714 this->handles_to_be_deleted_--;
00715 }
00716
00717
00718 else if (this->current_info_[i].suspend_entry_)
00719 {
00720 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
00721 this->current_info_[i]);
00722
00723 this->suspended_handles_++;
00724
00725
00726 this->handles_to_be_suspended_--;
00727 }
00728
00729
00730
00731 if (this->current_info_[i].delete_entry_ ||
00732 this->current_info_[i].suspend_entry_ )
00733 {
00734 size_t last_valid_slot = this->max_handlep1_ - 1;
00735
00736
00737 if (i < last_valid_slot)
00738
00739 {
00740
00741 this->current_info_[i] =
00742 this->current_info_[last_valid_slot];
00743 this->current_handles_[i] =
00744 this->current_handles_[last_valid_slot];
00745 }
00746
00747 this->current_info_[last_valid_slot].reset ();
00748 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
00749 this->max_handlep1_--;
00750 }
00751
00752
00753
00754 if (event_handler != 0)
00755 event_handler->handle_close (handle, masks);
00756 }
00757 }
00758
00759 return 0;
00760 }
00761
00762 int
00763 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
00764 {
00765
00766 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
00767 {
00768 for (size_t i = 0; i < this->suspended_handles_; i++)
00769 {
00770
00771
00772
00773
00774 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00775 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00776 ACE_Event_Handler *event_handler = 0;
00777
00778
00779 if (this->current_suspended_info_[i].delete_entry_)
00780 {
00781
00782
00783
00784
00785
00786
00787
00788 masks = this->current_suspended_info_[i].close_masks_;
00789 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00790 {
00791
00792 if (this->current_suspended_info_[i].io_entry_)
00793 handle = this->current_suspended_info_[i].io_handle_;
00794 else
00795 handle = this->current_suspended_info_[i].event_handle_;
00796
00797
00798 event_handler = this->current_suspended_info_[i].event_handler_;
00799 }
00800
00801
00802 if (this->current_suspended_info_[i].delete_event_)
00803 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
00804
00805
00806 this->handles_to_be_deleted_--;
00807 }
00808
00809 else if (this->current_suspended_info_[i].resume_entry_)
00810 {
00811
00812 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
00813
00814 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
00815 this->max_handlep1_++;
00816
00817
00818 this->handles_to_be_resumed_--;
00819 }
00820
00821
00822
00823
00824 if (this->current_suspended_info_[i].resume_entry_ ||
00825 this->current_suspended_info_[i].delete_entry_)
00826 {
00827 size_t last_valid_slot = this->suspended_handles_ - 1;
00828
00829
00830
00831
00832
00833 if (i < last_valid_slot)
00834
00835 this->current_suspended_info_[i] =
00836 this->current_suspended_info_[last_valid_slot];
00837 this->current_suspended_info_[last_valid_slot].reset ();
00838 this->suspended_handles_--;
00839 }
00840
00841
00842
00843 if (event_handler != 0)
00844 event_handler->handle_close (handle, masks);
00845 }
00846 }
00847
00848 return 0;
00849 }
00850
00851 int
00852 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
00853 {
00854
00855 for (size_t i = 0; i < this->handles_to_be_added_; i++)
00856 {
00857
00858
00859
00860
00861 ACE_HANDLE handle = ACE_INVALID_HANDLE;
00862 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
00863 ACE_Event_Handler *event_handler = 0;
00864
00865
00866 if (this->to_be_added_info_[i].delete_entry_)
00867 {
00868
00869
00870
00871
00872
00873
00874
00875 masks = this->to_be_added_info_[i].close_masks_;
00876 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
00877 {
00878
00879 if (this->to_be_added_info_[i].io_entry_)
00880 handle = this->to_be_added_info_[i].io_handle_;
00881 else
00882 handle = this->to_be_added_info_[i].event_handle_;
00883
00884
00885 event_handler = this->to_be_added_info_[i].event_handler_;
00886 }
00887
00888
00889 if (this->to_be_added_info_[i].delete_event_)
00890 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
00891
00892
00893 this->handles_to_be_deleted_--;
00894 }
00895
00896
00897 else if (this->to_be_added_info_[i].suspend_entry_)
00898 {
00899 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
00900 this->to_be_added_info_[i]);
00901
00902 this->suspended_handles_++;
00903
00904
00905 this->handles_to_be_suspended_--;
00906 }
00907
00908
00909 else
00910 {
00911
00912 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
00913
00914 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
00915 this->max_handlep1_++;
00916 }
00917
00918
00919 this->to_be_added_info_[i].reset ();
00920
00921
00922
00923 if (event_handler != 0)
00924 event_handler->handle_close (handle, masks);
00925 }
00926
00927
00928
00929 this->handles_to_be_added_ = 0;
00930
00931 return 0;
00932 }
00933
00934 void
00935 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
00936 {
00937 size_t i = 0;
00938
00939 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
00940
00941 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00942
00943 ACE_DEBUG ((LM_DEBUG,
00944 ACE_LIB_TEXT ("Max size = %d\n"),
00945 this->max_size_));
00946
00947 ACE_DEBUG ((LM_DEBUG,
00948 ACE_LIB_TEXT ("Current info table\n\n")));
00949 ACE_DEBUG ((LM_DEBUG,
00950 ACE_LIB_TEXT ("\tSize = %d\n"),
00951 this->max_handlep1_));
00952 ACE_DEBUG ((LM_DEBUG,
00953 ACE_LIB_TEXT ("\tHandles to be suspended = %d\n"),
00954 this->handles_to_be_suspended_));
00955
00956 for (i = 0; i < this->max_handlep1_; i++)
00957 this->current_info_[i].dump (this->current_handles_[i]);
00958
00959 ACE_DEBUG ((LM_DEBUG,
00960 ACE_LIB_TEXT ("\n")));
00961
00962 ACE_DEBUG ((LM_DEBUG,
00963 ACE_LIB_TEXT ("To-be-added info table\n\n")));
00964 ACE_DEBUG ((LM_DEBUG,
00965 ACE_LIB_TEXT ("\tSize = %d\n"),
00966 this->handles_to_be_added_));
00967
00968 for (i = 0; i < this->handles_to_be_added_; i++)
00969 this->to_be_added_info_[i].dump ();
00970
00971 ACE_DEBUG ((LM_DEBUG,
00972 ACE_LIB_TEXT ("\n")));
00973
00974 ACE_DEBUG ((LM_DEBUG,
00975 ACE_LIB_TEXT ("Suspended info table\n\n")));
00976 ACE_DEBUG ((LM_DEBUG,
00977 ACE_LIB_TEXT ("\tSize = %d\n"),
00978 this->suspended_handles_));
00979 ACE_DEBUG ((LM_DEBUG,
00980 ACE_LIB_TEXT ("\tHandles to be resumed = %d\n"),
00981 this->handles_to_be_resumed_));
00982
00983 for (i = 0; i < this->suspended_handles_; i++)
00984 this->current_suspended_info_[i].dump ();
00985
00986 ACE_DEBUG ((LM_DEBUG,
00987 ACE_LIB_TEXT ("\n")));
00988
00989 ACE_DEBUG ((LM_DEBUG,
00990 ACE_LIB_TEXT ("Total handles to be deleted = %d\n"),
00991 this->handles_to_be_deleted_));
00992
00993 ACE_DEBUG ((LM_DEBUG,
00994 ACE_END_DUMP));
00995 }
00996
00997
00998
00999 int
01000 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
01001 {
01002 ACE_NOTSUP_RETURN (-1);
01003 }
01004
01005 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
01006 ACE_Timer_Queue *tq,
01007 ACE_Reactor_Notify *notify)
01008 : signal_handler_ (0),
01009 delete_signal_handler_ (0),
01010 timer_queue_ (0),
01011 delete_timer_queue_ (0),
01012 delete_handler_rep_ (0),
01013 delete_notify_handler_ (0),
01014 lock_adapter_ (lock_),
01015 handler_rep_ (*this),
01016
01017 ok_to_wait_ (1),
01018
01019 wakeup_all_threads_ (0),
01020
01021 waiting_to_change_state_ (0),
01022 active_threads_ (0),
01023 owner_ (ACE_Thread::self ()),
01024 new_owner_ (0),
01025 change_state_thread_ (0),
01026 open_for_business_ (0),
01027 deactivated_ (0)
01028 {
01029 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
01030 ACE_ERROR ((LM_ERROR,
01031 ACE_LIB_TEXT ("%p\n"),
01032 ACE_LIB_TEXT ("WFMO_Reactor")));
01033 }
01034
01035 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
01036 int unused,
01037 ACE_Sig_Handler *sh,
01038 ACE_Timer_Queue *tq,
01039 ACE_Reactor_Notify *notify)
01040 : signal_handler_ (0),
01041 delete_signal_handler_ (0),
01042 timer_queue_ (0),
01043 delete_timer_queue_ (0),
01044 delete_handler_rep_ (0),
01045 delete_notify_handler_ (0),
01046 lock_adapter_ (lock_),
01047 handler_rep_ (*this),
01048
01049 ok_to_wait_ (1),
01050
01051 wakeup_all_threads_ (0),
01052
01053 waiting_to_change_state_ (0),
01054 active_threads_ (0),
01055 owner_ (ACE_Thread::self ()),
01056 new_owner_ (0),
01057 change_state_thread_ (0),
01058 open_for_business_ (0),
01059 deactivated_ (0)
01060 {
01061 ACE_UNUSED_ARG (unused);
01062
01063 if (this->open (size, 0, sh, tq, 0, notify) == -1)
01064 ACE_ERROR ((LM_ERROR,
01065 ACE_LIB_TEXT ("%p\n"),
01066 ACE_LIB_TEXT ("WFMO_Reactor")));
01067 }
01068
01069 int
01070 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
01071 {
01072 return -1;
01073 }
01074
01075 int
01076 ACE_WFMO_Reactor::open (size_t size,
01077 int unused,
01078 ACE_Sig_Handler *sh,
01079 ACE_Timer_Queue *tq,
01080 int disable_notify_pipe,
01081 ACE_Reactor_Notify *notify)
01082 {
01083 ACE_UNUSED_ARG (unused);
01084 ACE_UNUSED_ARG (disable_notify_pipe);
01085
01086
01087 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01088
01089
01090 if (this->open_for_business_)
01091 return -1;
01092
01093
01094 if (this->delete_timer_queue_)
01095 delete this->timer_queue_;
01096
01097 if (tq == 0)
01098 {
01099 ACE_NEW_RETURN (this->timer_queue_,
01100 ACE_Timer_Heap,
01101 -1);
01102 this->delete_timer_queue_ = 1;
01103 }
01104 else
01105 {
01106 this->timer_queue_ = tq;
01107 this->delete_timer_queue_ = 0;
01108 }
01109
01110
01111 if (this->delete_signal_handler_)
01112 delete this->signal_handler_;
01113
01114 if (sh == 0)
01115 {
01116 ACE_NEW_RETURN (this->signal_handler_,
01117 ACE_Sig_Handler,
01118 -1);
01119 this->delete_signal_handler_ = 1;
01120 }
01121 else
01122 {
01123 this->signal_handler_ = sh;
01124 this->delete_signal_handler_ = 0;
01125 }
01126
01127
01128 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
01129 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
01130
01131
01132 if (this->delete_handler_rep_)
01133 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
01134
01135
01136
01137 if (this->handler_rep_.open (size + 2) == -1)
01138 ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("%p\n"),
01139 ACE_LIB_TEXT ("opening handler repository")),
01140 -1);
01141 else
01142 this->delete_handler_rep_ = 1;
01143
01144 this->notify_handler_ = notify;
01145
01146 if (this->notify_handler_ == 0)
01147 {
01148 ACE_NEW_RETURN (this->notify_handler_,
01149 ACE_WFMO_Reactor_Notify,
01150 -1);
01151
01152 if (this->notify_handler_ == 0)
01153 return -1;
01154 else
01155 this->delete_notify_handler_ = 1;
01156 }
01157
01158
01159
01160
01161
01162 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
01163 ACE_ERROR_RETURN ((LM_ERROR,
01164 ACE_LIB_TEXT ("%p\n"),
01165 ACE_LIB_TEXT ("opening notify handler ")),
01166 -1);
01167
01168
01169 if (this->register_handler (&this->wakeup_all_threads_handler_,
01170 this->wakeup_all_threads_.handle ()) == -1)
01171 ACE_ERROR_RETURN ((LM_ERROR,
01172 ACE_LIB_TEXT ("%p\n"),
01173 ACE_LIB_TEXT ("registering thread wakeup handler")),
01174 -1);
01175
01176
01177
01178 if (this->handler_rep_.changes_required ())
01179 {
01180
01181 this->handler_rep_.make_changes ();
01182
01183
01184 this->wakeup_all_threads_.reset ();
01185 }
01186
01187
01188 this->open_for_business_ = 1;
01189
01190 return 0;
01191 }
01192
01193 int
01194 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
01195 {
01196 if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0)
01197 delete this->signal_handler_;
01198 this->signal_handler_ = signal_handler;
01199 this->delete_signal_handler_ = 0;
01200 return 0;
01201 }
01202
01203 ACE_Timer_Queue *
01204 ACE_WFMO_Reactor::timer_queue (void) const
01205 {
01206 return this->timer_queue_;
01207 }
01208
01209 int
01210 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
01211 {
01212 if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0)
01213 delete this->timer_queue_;
01214 this->timer_queue_ = tq;
01215 this->delete_timer_queue_ = 0;
01216 return 0;
01217 }
01218
01219 int
01220 ACE_WFMO_Reactor::set_timer_queue (ACE_Timer_Queue *tq)
01221 {
01222 return this->timer_queue (tq);
01223 }
01224
01225 int
01226 ACE_WFMO_Reactor::close (void)
01227 {
01228
01229 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
01230
01231
01232 if (!this->open_for_business_)
01233 return -1;
01234
01235
01236 this->open_for_business_ = 0;
01237
01238 this->handler_rep_.close ();
01239
01240 return 0;
01241 }
01242
01243 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
01244 {
01245
01246
01247
01248
01249 this->close ();
01250
01251
01252
01253 this->handler_rep_.make_changes ();
01254
01255 if (this->delete_timer_queue_)
01256 {
01257 delete this->timer_queue_;
01258 this->timer_queue_ = 0;
01259 this->delete_timer_queue_ = 0;
01260 }
01261
01262 if (this->delete_signal_handler_)
01263 {
01264 delete this->signal_handler_;
01265 this->signal_handler_ = 0;
01266 this->delete_signal_handler_ = 0;
01267 }
01268
01269 if (this->delete_notify_handler_)
01270 {
01271 delete this->notify_handler_;
01272 this->notify_handler_ = 0;
01273 this->delete_notify_handler_ = 0;
01274 }
01275 }
01276
01277 int
01278 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
01279 ACE_HANDLE io_handle,
01280 ACE_Event_Handler *event_handler,
01281 ACE_Reactor_Mask new_masks)
01282 {
01283
01284
01285
01286 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
01287 ACE_UNUSED_ARG (event_handle);
01288 ACE_UNUSED_ARG (io_handle);
01289 ACE_UNUSED_ARG (event_handler);
01290 ACE_UNUSED_ARG (new_masks);
01291 ACE_NOTSUP_RETURN (-1);
01292 #else
01293
01294
01295 if (io_handle == ACE_INVALID_HANDLE)
01296 io_handle = event_handler->get_handle ();
01297
01298 if (this->handler_rep_.invalid_handle (io_handle))
01299 {
01300 errno = ERROR_INVALID_HANDLE;
01301 return -1;
01302 }
01303
01304 long new_network_events = 0;
01305 int delete_event = 0;
01306 auto_ptr <ACE_Auto_Event> event;
01307
01308
01309
01310 ACE_Reactor_Mask old_masks;
01311 int found = this->handler_rep_.modify_network_events_i (io_handle,
01312 new_masks,
01313 old_masks,
01314 new_network_events,
01315 event_handle,
01316 delete_event,
01317 ACE_Reactor::ADD_MASK);
01318
01319
01320
01321 if (event_handle == ACE_INVALID_HANDLE)
01322 {
01323
01324
01325 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
01326 event = tmp;
01327 event_handle = event->handle ();
01328 delete_event = 1;
01329 }
01330
01331 int result = ::WSAEventSelect ((SOCKET) io_handle,
01332 event_handle,
01333 new_network_events);
01334
01335 if (found)
01336 return result;
01337 else if (result != SOCKET_ERROR &&
01338 this->handler_rep_.bind_i (1,
01339 event_handler,
01340 new_network_events,
01341 io_handle,
01342 event_handle,
01343 delete_event) != -1)
01344 {
01345
01346
01347 if (delete_event)
01348 {
01349
01350
01351
01352
01353
01354
01355
01356
01357 ACE_Errno_Guard guard (errno);
01358 event->handle (ACE_INVALID_HANDLE);
01359 event->remove ();
01360 }
01361 return 0;
01362 }
01363 else
01364 return -1;
01365 #endif
01366 }
01367
01368 int
01369 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
01370 ACE_Reactor_Mask new_masks,
01371 int operation)
01372 {
01373
01374 if (this->handler_rep_.invalid_handle (io_handle))
01375 return -1;
01376
01377 long new_network_events = 0;
01378 int delete_event = 0;
01379 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
01380
01381
01382
01383 ACE_Reactor_Mask old_masks;
01384 int found = this->handler_rep_.modify_network_events_i (io_handle,
01385 new_masks,
01386 old_masks,
01387 new_network_events,
01388 event_handle,
01389 delete_event,
01390 operation);
01391 if (found)
01392 {
01393 int result = ::WSAEventSelect ((SOCKET) io_handle,
01394 event_handle,
01395 new_network_events);
01396 if (result == 0)
01397 return old_masks;
01398 else
01399 return result;
01400 }
01401 else
01402 return -1;
01403 }
01404
01405
01406
01407 int
01408 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
01409 ACE_Reactor_Mask new_masks,
01410 ACE_Reactor_Mask &old_masks,
01411 long &new_network_events,
01412 ACE_HANDLE &event_handle,
01413 int &delete_event,
01414 int operation)
01415 {
01416 long *modified_network_events = &new_network_events;
01417 int found = 0;
01418 size_t i;
01419
01420
01421
01422
01423
01424 for (i = 0; i < this->max_handlep1_ && !found; i++)
01425 if (io_handle == this->current_info_[i].io_handle_ &&
01426 !this->current_info_[i].delete_entry_)
01427 {
01428 found = 1;
01429 modified_network_events = &this->current_info_[i].network_events_;
01430 delete_event = this->current_info_[i].delete_event_;
01431 event_handle = this->current_handles_[i];
01432 }
01433
01434
01435
01436
01437
01438 for (i = 0; i < this->suspended_handles_ && !found; i++)
01439 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
01440 !this->current_suspended_info_[i].delete_entry_)
01441 {
01442 found = 1;
01443 modified_network_events = &this->current_suspended_info_[i].network_events_;
01444 delete_event = this->current_suspended_info_[i].delete_event_;
01445 event_handle = this->current_suspended_info_[i].event_handle_;
01446 }
01447
01448
01449
01450
01451
01452 for (i = 0; i < this->handles_to_be_added_ && !found; i++)
01453 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
01454 !this->to_be_added_info_[i].delete_entry_)
01455 {
01456 found = 1;
01457 modified_network_events = &this->to_be_added_info_[i].network_events_;
01458 delete_event = this->to_be_added_info_[i].delete_event_;
01459 event_handle = this->to_be_added_info_[i].event_handle_;
01460 }
01461
01462 old_masks = this->bit_ops (*modified_network_events,
01463 new_masks,
01464 operation);
01465
01466 new_network_events = *modified_network_events;
01467
01468 return found;
01469 }
01470
01471 int
01472 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
01473 ACE_Reactor_Mask user_masks,
01474 ACE_Event_Handler **user_event_handler)
01475 {
01476 int found = 0;
01477 size_t i = 0;
01478 ACE_Event_Handler *event_handler = 0;
01479 long existing_masks = 0;
01480
01481
01482
01483
01484
01485
01486
01487 for (i = 0; i < this->max_handlep1_ && !found; i++)
01488 if ((handle == this->current_info_[i].io_handle_ ||
01489 handle == this->current_handles_[i]) &&
01490 !this->current_info_[i].delete_entry_)
01491 {
01492 found = 1;
01493 event_handler = this->current_info_[i].event_handler_;
01494 existing_masks = this->current_info_[i].network_events_;
01495 }
01496
01497
01498
01499
01500
01501 for (i = 0; i < this->suspended_handles_ && !found; i++)
01502 if ((handle == this->current_suspended_info_[i].io_handle_ ||
01503 handle == this->current_suspended_info_[i].event_handle_) &&
01504 !this->current_suspended_info_[i].delete_entry_)
01505 {
01506 found = 1;
01507 event_handler = this->current_suspended_info_[i].event_handler_;
01508 existing_masks = this->current_suspended_info_[i].network_events_;
01509 }
01510
01511
01512
01513
01514
01515 for (i = 0; i < this->handles_to_be_added_ && !found; i++)
01516 if ((handle == this->to_be_added_info_[i].io_handle_ ||
01517 handle == this->to_be_added_info_[i].event_handle_) &&
01518 !this->to_be_added_info_[i].delete_entry_)
01519 {
01520 found = 1;
01521 event_handler = this->to_be_added_info_[i].event_handler_;
01522 existing_masks = this->to_be_added_info_[i].network_events_;
01523 }
01524
01525
01526 if (!found)
01527 return -1;
01528
01529
01530
01531 if (found &&
01532 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
01533 if (!ACE_BIT_ENABLED (existing_masks, FD_READ)
01534 && !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
01535 found = 0;
01536
01537 if (found &&
01538 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
01539 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
01540 found = 0;
01541
01542 if (found &&
01543 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
01544 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
01545 found = 0;
01546
01547 if (found &&
01548 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
01549 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
01550 found = 0;
01551
01552 if (found &&
01553 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
01554 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
01555 found = 0;
01556
01557 if (found &&
01558 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
01559 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
01560 found = 0;
01561
01562 if (found &&
01563 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
01564 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
01565 found = 0;
01566
01567 if (found &&
01568 user_event_handler)
01569 *user_event_handler = event_handler;
01570
01571 if (found)
01572 return 0;
01573 else
01574 return -1;
01575 }
01576
01577
01578
01579 int
01580 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
01581 int alertable)
01582 {
01583 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
01584
01585
01586 if (!this->open_for_business_ || this->deactivated_)
01587 return -1;
01588
01589
01590
01591
01592 ACE_Countdown_Time countdown (max_wait_time);
01593
01594 int result;
01595 do
01596 {
01597
01598
01599
01600 result = this->ok_to_wait (max_wait_time, alertable);
01601 if (result != 1)
01602 return result;
01603
01604
01605 this->active_threads_++;
01606
01607
01608 this->lock_.release ();
01609
01610
01611
01612 countdown.update ();
01613
01614
01615 int timeout = this->calculate_timeout (max_wait_time);
01616
01617
01618 DWORD wait_status = this->wait_for_multiple_events (timeout,
01619 alertable);
01620
01621
01622 result = this->safe_dispatch (wait_status);
01623 if (0 == result)
01624 {
01625
01626
01627
01628
01629
01630
01631
01632
01633 countdown.update ();
01634 if (0 == max_wait_time || max_wait_time->usec () == 0)
01635 break;
01636 }
01637 }
01638 while (result == 0);
01639
01640 return result;
01641 }
01642
01643 int
01644 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
01645 int alertable)
01646 {
01647
01648
01649
01650
01651
01652 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
01653
01654
01655 DWORD result = 0;
01656 while (1)
01657 {
01658 #if defined (ACE_HAS_PHARLAP)
01659
01660
01661 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01662 this->atomic_wait_array_,
01663 TRUE,
01664 timeout);
01665
01666 if (result != WAIT_IO_COMPLETION)
01667 break;
01668
01669 #elif defined (ACE_HAS_WINCE)
01670 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01671 this->atomic_wait_array_,
01672 TRUE,
01673 timeout);
01674 break;
01675 #else
01676 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
01677 this->atomic_wait_array_,
01678 TRUE,
01679 timeout,
01680 alertable);
01681
01682 if (result != WAIT_IO_COMPLETION)
01683 break;
01684
01685 #endif
01686 }
01687
01688 switch (result)
01689 {
01690 case WAIT_TIMEOUT:
01691 errno = ETIME;
01692 return 0;
01693 case WAIT_FAILED:
01694 case WAIT_ABANDONED_0:
01695 ACE_OS::set_errno_to_last_error ();
01696 return -1;
01697 default:
01698 break;
01699 }
01700
01701
01702 return 1;
01703 }
01704
01705 DWORD
01706 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
01707 int alertable)
01708 {
01709
01710
01711
01712
01713 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
01714
01715
01716 ACE_UNUSED_ARG (alertable);
01717 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
01718 this->handler_rep_.handles (),
01719 FALSE,
01720 timeout);
01721 #else
01722 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
01723 this->handler_rep_.handles (),
01724 FALSE,
01725 timeout,
01726 alertable);
01727 #endif
01728 }
01729
01730 DWORD
01731 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
01732 {
01733 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
01734 this->handler_rep_.handles () + slot,
01735 FALSE,
01736 0);
01737 }
01738
01739 int
01740 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
01741 {
01742 ACE_Time_Value *time = 0;
01743 if (this->owner_ == ACE_Thread::self ())
01744 time = this->timer_queue_->calculate_timeout (max_wait_time);
01745 else
01746 time = max_wait_time;
01747
01748 if (time == 0)
01749 return INFINITE;
01750 else
01751 return time->msec ();
01752 }
01753
01754
01755 int
01756 ACE_WFMO_Reactor::expire_timers (void)
01757 {
01758
01759 if (ACE_Thread::self () == this->owner_)
01760
01761 return this->timer_queue_->expire ();
01762
01763 else
01764
01765 return 0;
01766 }
01767
01768 int
01769 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
01770 {
01771 int handlers_dispatched = 0;
01772
01773
01774 handlers_dispatched += this->expire_timers ();
01775
01776 switch (wait_status)
01777 {
01778 case WAIT_FAILED:
01779 ACE_OS::set_errno_to_last_error ();
01780 return -1;
01781
01782 case WAIT_TIMEOUT:
01783 errno = ETIME;
01784 return handlers_dispatched;
01785
01786 #ifndef ACE_HAS_WINCE
01787 case WAIT_IO_COMPLETION:
01788 return handlers_dispatched;
01789 #endif // ACE_HAS_WINCE
01790
01791 default:
01792
01793 handlers_dispatched += this->dispatch_handles (wait_status);
01794 return handlers_dispatched;
01795 }
01796 }
01797
01798
01799
01800
01801 int
01802 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
01803 {
01804
01805
01806 DWORD dispatch_slot = 0;
01807
01808
01809 DWORD max_handlep1 = this->handler_rep_.max_handlep1 ();
01810
01811
01812
01813 DWORD nCount = max_handlep1;
01814
01815 for (int number_of_handlers_dispatched = 1;
01816 ;
01817 number_of_handlers_dispatched++)
01818 {
01819 int ok = (
01820 #if ! (defined(__BORLANDC__) && (__BORLANDC__ >= 0x0530)) \
01821 && !defined (ghs) \
01822 && !defined (__MINGW32__)
01823
01824
01825
01826 wait_status >= WAIT_OBJECT_0 &&
01827 #endif
01828 wait_status <= (WAIT_OBJECT_0 + nCount));
01829 if (ok)
01830 dispatch_slot += wait_status - WAIT_OBJECT_0;
01831 else
01832
01833 dispatch_slot += wait_status - WAIT_ABANDONED_0;
01834
01835
01836 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
01837 return -1;
01838
01839
01840 dispatch_slot++;
01841
01842
01843 if (dispatch_slot >= max_handlep1)
01844 return number_of_handlers_dispatched;
01845
01846
01847 nCount = max_handlep1 - dispatch_slot;
01848
01849
01850 wait_status = this->poll_remaining_handles (dispatch_slot);
01851 switch (wait_status)
01852 {
01853 case WAIT_FAILED:
01854 ACE_OS::set_errno_to_last_error ();
01855
01856 case WAIT_TIMEOUT:
01857
01858 return number_of_handlers_dispatched;
01859 }
01860 }
01861 }
01862
01863 int
01864 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
01865 DWORD max_handlep1)
01866 {
01867
01868 if (slot == max_handlep1)
01869 return this->dispatch_window_messages ();
01870
01871
01872
01873
01874
01875
01876
01877 else if (!this->handler_rep_.scheduled_for_deletion (slot))
01878 {
01879 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
01880
01881 if (this->handler_rep_.current_info ()[slot].io_entry_)
01882 return this->complex_dispatch_handler (slot,
01883 event_handle);
01884 else
01885 return this->simple_dispatch_handler (slot,
01886 event_handle);
01887 }
01888 else
01889
01890 return 0;
01891 }
01892
01893 int
01894 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
01895 ACE_HANDLE event_handle)
01896 {
01897
01898
01899
01900
01901 siginfo_t sig (event_handle);
01902
01903 ACE_Event_Handler *eh =
01904 this->handler_rep_.current_info ()[slot].event_handler_;
01905
01906
01907 if (eh->handle_signal (0, &sig) == -1)
01908 this->handler_rep_.unbind (event_handle,
01909 ACE_Event_Handler::NULL_MASK);
01910
01911 return 0;
01912 }
01913
01914 int
01915 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
01916 ACE_HANDLE event_handle)
01917 {
01918
01919
01920 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
01921 this->handler_rep_.current_info ()[slot];
01922
01923 WSANETWORKEVENTS events;
01924 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
01925 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
01926 event_handle,
01927 &events) == SOCKET_ERROR)
01928 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
01929 else
01930 {
01931
01932
01933
01934
01935
01936
01937
01938
01939
01940
01941
01942 events.lNetworkEvents &= current_info.network_events_;
01943 while (events.lNetworkEvents != 0)
01944 {
01945
01946 problems |= this->upcall (current_info.event_handler_,
01947 current_info.io_handle_,
01948 events);
01949 if (this->handler_rep_.scheduled_for_deletion (slot))
01950 break;
01951 }
01952 }
01953
01954 if (problems != ACE_Event_Handler::NULL_MASK
01955 && !this->handler_rep_.scheduled_for_deletion (slot) )
01956 this->handler_rep_.unbind (event_handle, problems);
01957
01958 return 0;
01959 }
01960
01961 ACE_Reactor_Mask
01962 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
01963 ACE_HANDLE io_handle,
01964 WSANETWORKEVENTS &events)
01965 {
01966
01967
01968 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
01969
01970
01971
01972
01973
01974 long actual_events = events.lNetworkEvents;
01975 int action;
01976
01977 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
01978 {
01979 action = event_handler->handle_output (io_handle);
01980 if (action <= 0)
01981 {
01982 ACE_CLR_BITS (actual_events, FD_WRITE);
01983 if (action == -1)
01984 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
01985 }
01986 }
01987
01988 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
01989 {
01990 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
01991 {
01992
01993 action = event_handler->handle_output (io_handle);
01994 if (action <= 0)
01995 {
01996 ACE_CLR_BITS (actual_events, FD_CONNECT);
01997 if (action == -1)
01998 ACE_SET_BITS (problems,
01999 ACE_Event_Handler::CONNECT_MASK);
02000 }
02001 }
02002
02003 else
02004 {
02005 action = event_handler->handle_input (io_handle);
02006 if (action <= 0)
02007 {
02008 ACE_CLR_BITS (actual_events, FD_CONNECT);
02009 if (action == -1)
02010 ACE_SET_BITS (problems,
02011 ACE_Event_Handler::CONNECT_MASK);
02012 }
02013 }
02014 }
02015
02016 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
02017 {
02018 action = event_handler->handle_exception (io_handle);
02019 if (action <= 0)
02020 {
02021 ACE_CLR_BITS (actual_events, FD_OOB);
02022 if (action == -1)
02023 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
02024 }
02025 }
02026
02027 if (ACE_BIT_ENABLED (actual_events, FD_READ))
02028 {
02029 action = event_handler->handle_input (io_handle);
02030 if (action <= 0)
02031 {
02032 ACE_CLR_BITS (actual_events, FD_READ);
02033 if (action == -1)
02034 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02035 }
02036 }
02037
02038 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
02039 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
02040 {
02041 action = event_handler->handle_input (io_handle);
02042 if (action <= 0)
02043 {
02044 ACE_CLR_BITS (actual_events, FD_CLOSE);
02045 if (action == -1)
02046 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
02047 }
02048 }
02049
02050 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
02051 {
02052 action = event_handler->handle_input (io_handle);
02053 if (action <= 0)
02054 {
02055 ACE_CLR_BITS (actual_events, FD_ACCEPT);
02056 if (action == -1)
02057 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
02058 }
02059 }
02060
02061 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
02062 {
02063 action = event_handler->handle_qos (io_handle);
02064 if (action <= 0)
02065 {
02066 ACE_CLR_BITS (actual_events, FD_QOS);
02067 if (action == -1)
02068 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
02069 }
02070 }
02071
02072 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
02073 {
02074 action = event_handler->handle_group_qos (io_handle);
02075 if (action <= 0)
02076 {
02077 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
02078 if (action == -1)
02079 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
02080 }
02081 }
02082
02083 events.lNetworkEvents = actual_events;
02084 return problems;
02085 }
02086
02087
02088 int
02089 ACE_WFMO_Reactor::update_state (void)
02090 {
02091
02092 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02093
02094
02095 this->active_threads_--;
02096
02097
02098
02099 if (this->handler_rep_.changes_required () || this->new_owner ())
02100 {
02101 if (this->change_state_thread_ == 0)
02102
02103
02104 {
02105 this->change_state_thread_ = ACE_Thread::self ();
02106
02107 this->ok_to_wait_.reset ();
02108
02109 if (this->active_threads_ > 0)
02110
02111 {
02112
02113 this->wakeup_all_threads_.signal ();
02114
02115 monitor.release ();
02116
02117 this->waiting_to_change_state_.wait ();
02118
02119 monitor.acquire ();
02120 }
02121
02122
02123
02124
02125 while (this->handler_rep_.changes_required ())
02126
02127 this->handler_rep_.make_changes ();
02128 if (this->new_owner ())
02129
02130 this->change_owner ();
02131
02132 this->wakeup_all_threads_.reset ();
02133
02134 this->ok_to_wait_.signal ();
02135
02136 this->change_state_thread_ = 0;
02137 }
02138 else if (this->active_threads_ == 0)
02139
02140
02141
02142 this->waiting_to_change_state_.signal ();
02143 }
02144
02145
02146 else if (this->active_threads_ == 0)
02147
02148 this->wakeup_all_threads_.reset ();
02149
02150 return 0;
02151 }
02152
02153 void
02154 ACE_WFMO_Reactor::dump (void) const
02155 {
02156 ACE_TRACE ("ACE_WFMO_Reactor::dump");
02157
02158 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02159
02160 ACE_DEBUG ((LM_DEBUG,
02161 ACE_LIB_TEXT ("Count of currently active threads = %d\n"),
02162 this->active_threads_));
02163
02164 ACE_DEBUG ((LM_DEBUG,
02165 ACE_LIB_TEXT ("ID of owner thread = %d\n"),
02166 this->owner_));
02167
02168 this->handler_rep_.dump ();
02169 this->signal_handler_->dump ();
02170 this->timer_queue_->dump ();
02171
02172 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02173 }
02174
02175 int
02176 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & ,
02177 ACE_Handle_Set & )
02178 {
02179 return -1;
02180 }
02181
02182 int
02183 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & )
02184 {
02185 return 0;
02186 }
02187
02188 ACE_HANDLE
02189 ACE_WFMO_Reactor_Notify::notify_handle (void)
02190 {
02191 return ACE_INVALID_HANDLE;
02192 }
02193
02194 int
02195 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
02196 ACE_Notification_Buffer &)
02197 {
02198 return 0;
02199 }
02200
02201 int
02202 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
02203 {
02204 return 0;
02205 }
02206
02207 int
02208 ACE_WFMO_Reactor_Notify::close (void)
02209 {
02210 return -1;
02211 }
02212
02213 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
02214 : timer_queue_ (0),
02215 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
02216 max_notifies * sizeof (ACE_Notification_Buffer)),
02217 max_notify_iterations_ (-1)
02218 {
02219 }
02220
02221 int
02222 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
02223 ACE_Timer_Queue *timer_queue,
02224 int ignore_notify)
02225 {
02226 ACE_UNUSED_ARG (ignore_notify);
02227 timer_queue_ = timer_queue;
02228 return wfmo_reactor->register_handler (this);
02229 }
02230
02231 ACE_HANDLE
02232 ACE_WFMO_Reactor_Notify::get_handle (void) const
02233 {
02234 return this->wakeup_one_thread_.handle ();
02235 }
02236
02237
02238
02239 int
02240 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
02241 siginfo_t *siginfo,
02242 ucontext_t *)
02243 {
02244 ACE_UNUSED_ARG (signum);
02245
02246
02247 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
02248 return -1;
02249
02250
02251
02252
02253
02254
02255 for (int i = 1; ; i++)
02256 {
02257 ACE_Message_Block *mb = 0;
02258
02259 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
02260 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
02261 {
02262 if (errno == EWOULDBLOCK)
02263
02264
02265 return 0;
02266 else
02267 return -1;
02268 }
02269 else
02270 {
02271 ACE_Notification_Buffer *buffer =
02272 (ACE_Notification_Buffer *) mb->base ();
02273
02274
02275
02276
02277
02278 if (buffer->eh_ != 0)
02279 {
02280 int result = 0;
02281
02282 switch (buffer->mask_)
02283 {
02284 case ACE_Event_Handler::READ_MASK:
02285 case ACE_Event_Handler::ACCEPT_MASK:
02286 result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
02287 break;
02288 case ACE_Event_Handler::WRITE_MASK:
02289 result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
02290 break;
02291 case ACE_Event_Handler::EXCEPT_MASK:
02292 result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
02293 break;
02294 case ACE_Event_Handler::QOS_MASK:
02295 result = buffer->eh_->handle_qos (ACE_INVALID_HANDLE);
02296 break;
02297 case ACE_Event_Handler::GROUP_QOS_MASK:
02298 result = buffer->eh_->handle_group_qos (ACE_INVALID_HANDLE);
02299 break;
02300 default:
02301 ACE_ERROR ((LM_ERROR,
02302 ACE_LIB_TEXT ("invalid mask = %d\n"),
02303 buffer->mask_));
02304 break;
02305 }
02306 if (result == -1)
02307 buffer->eh_->handle_close (ACE_INVALID_HANDLE,
02308 ACE_Event_Handler::EXCEPT_MASK);
02309 }
02310
02311
02312
02313 mb->release ();
02314
02315
02316
02317
02318 if (i == this->max_notify_iterations_)
02319 {
02320
02321
02322 if (!this->message_queue_.is_empty ())
02323 this->wakeup_one_thread_.signal ();
02324
02325
02326 return 0;
02327 }
02328 }
02329 }
02330 }
02331
02332
02333
02334
02335
02336 int
02337 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh,
02338 ACE_Reactor_Mask mask,
02339 ACE_Time_Value *timeout)
02340 {
02341 if (eh != 0)
02342 {
02343 ACE_Message_Block *mb = 0;
02344 ACE_NEW_RETURN (mb,
02345 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
02346 -1);
02347
02348 ACE_Notification_Buffer *buffer =
02349 (ACE_Notification_Buffer *) mb->base ();
02350 buffer->eh_ = eh;
02351 buffer->mask_ = mask;
02352
02353
02354
02355
02356 if (timeout != 0)
02357 *timeout += timer_queue_->gettimeofday ();
02358
02359 if (this->message_queue_.enqueue_tail
02360 (mb, timeout) == -1)
02361 {
02362 mb->release ();
02363 return -1;
02364 }
02365 }
02366
02367 return this->wakeup_one_thread_.signal ();
02368 }
02369
02370 void
02371 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
02372 {
02373 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02374
02375 if (iterations == 0)
02376 iterations = 1;
02377
02378 this->max_notify_iterations_ = iterations;
02379 }
02380
02381 int
02382 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
02383 {
02384 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
02385 return this->max_notify_iterations_;
02386 }
02387
02388 int
02389 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
02390 ACE_Reactor_Mask mask)
02391 {
02392 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
02393
02394
02395
02396
02397
02398
02399 if (this->message_queue_.is_empty ())
02400 return 0;
02401
02402
02403
02404
02405
02406
02407
02408 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
02409
02410
02411
02412 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
02413
02414 size_t queue_size = this->message_queue_.message_count ();
02415 int number_purged = 0;
02416
02417 size_t index;
02418
02419 for (index = 0; index < queue_size; ++index)
02420 {
02421 ACE_Message_Block *mb;
02422 if (-1 == this->message_queue_.dequeue_head (mb))
02423 return -1;
02424
02425 ACE_Notification_Buffer *buffer =
02426 ACE_reinterpret_cast (ACE_Notification_Buffer *, mb->base ());
02427
02428
02429
02430
02431
02432 if ((0 != buffer->eh_) &&
02433 (0 == eh || eh == buffer->eh_) &&
02434 ACE_BIT_DISABLED (buffer->mask_, ~mask))
02435
02436
02437 {
02438 mb->release ();
02439 ++number_purged;
02440 }
02441 else
02442 {
02443
02444
02445
02446
02447 if ((0 != buffer->eh_) &&
02448 (0 == eh || eh == buffer->eh_))
02449 ACE_CLR_BITS(buffer->mask_, mask);
02450 if (-1 == local_queue.enqueue_head (mb))
02451 return -1;
02452 }
02453 }
02454
02455 if (this->message_queue_.message_count ())
02456 {
02457 ACE_ASSERT (0);
02458 return -1;
02459 }
02460
02461
02462
02463 queue_size = local_queue.message_count ();
02464 for (index = 0; index < queue_size; ++index)
02465 {
02466 ACE_Message_Block *mb;
02467 if (-1 == local_queue.dequeue_head (mb))
02468 {
02469 ACE_ASSERT (0);
02470 return -1;
02471 }
02472
02473 if (-1 == this->message_queue_.enqueue_head (mb))
02474 {
02475 ACE_ASSERT (0);
02476 return -1;
02477 }
02478 }
02479
02480 return number_purged;
02481 }
02482
02483 void
02484 ACE_WFMO_Reactor_Notify::dump (void) const
02485 {
02486 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
02487 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02488 this->timer_queue_->dump ();
02489 ACE_DEBUG ((LM_DEBUG,
02490 ACE_LIB_TEXT ("Max. iteration: %d\n"),
02491 this->max_notify_iterations_));
02492 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02493 }
02494
02495 void
02496 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
02497 {
02498 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02499 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
02500
02501
02502 this->notify_handler_->max_notify_iterations (iterations);
02503 }
02504
02505 int
02506 ACE_WFMO_Reactor::max_notify_iterations (void)
02507 {
02508 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
02509 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
02510
02511 return this->notify_handler_->max_notify_iterations ();
02512 }
02513
02514 int
02515 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
02516 ACE_Reactor_Mask mask)
02517 {
02518 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
02519 if (this->notify_handler_ == 0)
02520 return 0;
02521 else
02522 return this->notify_handler_->purge_pending_notifications (eh, mask);
02523 }
02524
02525 int
02526 ACE_WFMO_Reactor::resumable_handler (void)
02527 {
02528 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
02529 return 0;
02530 }
02531
02532
02533
02534 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
02535 int
02536 WSAEventSelect (SOCKET s,
02537 WSAEVENT hEventObject,
02538 long lNetworkEvents)
02539 {
02540 ACE_UNUSED_ARG (s);
02541 ACE_UNUSED_ARG (hEventObject);
02542 ACE_UNUSED_ARG (lNetworkEvents);
02543
02544 return -1;
02545 }
02546
02547 int
02548 WSAEnumNetworkEvents (SOCKET s,
02549 WSAEVENT hEventObject,
02550 LPWSANETWORKEVENTS lpNetworkEvents)
02551 {
02552 ACE_UNUSED_ARG (s);
02553 ACE_UNUSED_ARG (hEventObject);
02554 ACE_UNUSED_ARG (lpNetworkEvents);
02555
02556 return -1;
02557 }
02558 #endif
02559
02560 #endif