00001 #include "ace_pch.h"
00002
00003
00004
00005 #include "ace/TP_Reactor.h"
00006 #include "ace/Reactor.h"
00007 #include "ace/Thread.h"
00008
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/TP_Reactor.i"
00011 #endif
00012
00013 ACE_RCSID(ace, TP_Reactor, "$Id: TP_Reactor.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00014
00015
00016 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00017
00018 int
00019 ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
00020 {
00021 ACE_TRACE ("ACE_TP_Token_Guard::grab_token");
00022
00023
00024
00025
00026
00027 int result = 0;
00028
00029 if (max_wait_time)
00030 {
00031 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00032 tv += *max_wait_time;
00033
00034 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00035 0,
00036 &tv));
00037 }
00038 else
00039 {
00040 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00041 }
00042
00043
00044
00045 if (result == -1)
00046 {
00047 if (errno == ETIME)
00048 return 0;
00049 else
00050 return -1;
00051 }
00052
00053
00054 this->owner_ = 1;
00055
00056 return result;
00057 }
00058
00059
00060 int
00061 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00062 {
00063 ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00064
00065
00066
00067 int result = 0;
00068
00069 if (max_wait_time)
00070 {
00071 ACE_Time_Value tv = ACE_OS::gettimeofday ();
00072 tv += *max_wait_time;
00073
00074 ACE_MT (result = this->token_.acquire (0,
00075 0,
00076 &tv));
00077 }
00078 else
00079 {
00080 ACE_MT (result = this->token_.acquire ());
00081 }
00082
00083
00084
00085 if (result == -1)
00086 {
00087 if (errno == ETIME)
00088 return 0;
00089 else
00090 return -1;
00091 }
00092
00093
00094 this->owner_ = 1;
00095
00096 return result;
00097 }
00098
00099
00100
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102 ACE_Timer_Queue *tq,
00103 int mask_signals,
00104 int s_queue)
00105 : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue)
00106 {
00107 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108 this->supress_notify_renew (1);
00109 }
00110
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t size,
00112 int rs,
00113 ACE_Sig_Handler *sh,
00114 ACE_Timer_Queue *tq,
00115 int mask_signals,
00116 int s_queue)
00117 : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals, s_queue)
00118 {
00119 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120 this->supress_notify_renew (1);
00121 }
00122
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126 ACE_TRACE ("ACE_TP_Reactor::owner");
00127 if (o_id)
00128 *o_id = ACE_Thread::self ();
00129
00130 return 0;
00131 }
00132
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136 ACE_TRACE ("ACE_TP_Reactor::owner");
00137 *t_id = ACE_Thread::self ();
00138
00139 return 0;
00140 }
00141
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145 ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146
00147
00148
00149
00150 ACE_Countdown_Time countdown (max_wait_time);
00151
00152
00153
00154
00155
00156
00157 ACE_TP_Token_Guard guard (this->token_);
00158
00159
00160 int result = guard.grab_token (max_wait_time);
00161
00162
00163 if (!guard.is_owner ())
00164 return result;
00165
00166
00167 if (this->deactivated_)
00168 return -1;
00169
00170
00171 countdown.update ();
00172
00173
00174 return this->dispatch_i (max_wait_time,
00175 guard);
00176 }
00177
00178
00179 int
00180 ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh,
00181 ACE_Reactor_Mask mask)
00182 {
00183 int result = 0;
00184
00185 {
00186 ACE_TP_Token_Guard guard (this->token_);
00187
00188
00189 result = guard.acquire_token ();
00190
00191 if (!guard.is_owner ())
00192 return result;
00193
00194
00195
00196 result = this->remove_handler_i (eh->get_handle (),
00197 mask | ACE_Event_Handler::DONT_CALL);
00198
00199 if (result == -1)
00200 return -1;
00201 }
00202
00203
00204
00205 if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00206 eh->handle_close (ACE_INVALID_HANDLE, mask);
00207
00208 return 0;
00209 }
00210
00211 int
00212 ACE_TP_Reactor::remove_handler (ACE_HANDLE handle,
00213 ACE_Reactor_Mask mask)
00214 {
00215
00216 ACE_Event_Handler *eh = 0;
00217 int result = 0;
00218
00219 {
00220 ACE_TP_Token_Guard guard (this->token_);
00221
00222
00223 result = guard.acquire_token ();
00224
00225 if (!guard.is_owner ())
00226 return result;
00227
00228 size_t slot = 0;
00229 eh = this->handler_rep_.find (handle, &slot);
00230
00231 if (eh == 0)
00232 return -1;
00233
00234
00235
00236 result = this->remove_handler_i (handle,
00237 mask | ACE_Event_Handler::DONT_CALL);
00238
00239 if (result == -1)
00240 return -1;
00241 }
00242
00243
00244
00245
00246
00247 if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00248 eh->handle_close (handle, mask);
00249
00250 return 0;
00251 }
00252
00253
00254 int
00255 ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles,
00256 ACE_Reactor_Mask m)
00257 {
00258
00259 ACE_Event_Handler **aeh = 0;
00260
00261
00262 ACE_NEW_RETURN (aeh,
00263 ACE_Event_Handler *[handles.num_set ()],
00264 -1);
00265
00266 size_t index = 0;
00267
00268
00269 {
00270 ACE_TP_Token_Guard guard (this->token_);
00271
00272
00273 int result = guard.acquire_token ();
00274
00275 if (!guard.is_owner ())
00276 return result;
00277
00278 ACE_HANDLE h;
00279
00280 ACE_Handle_Set_Iterator handle_iter (handles);
00281
00282 while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
00283 {
00284 size_t slot = 0;
00285 ACE_Event_Handler *eh =
00286 this->handler_rep_.find (h, &slot);
00287
00288 if (this->remove_handler_i (h,
00289 m | ACE_Event_Handler::DONT_CALL) == -1)
00290 {
00291 delete [] aeh;
00292 return -1;
00293 }
00294
00295 aeh [index] = eh;
00296 index ++;
00297 }
00298 }
00299
00300
00301
00302 if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0)
00303 {
00304 for (size_t i = 0; i < index; i++)
00305 aeh[i]->handle_close (ACE_INVALID_HANDLE, m);
00306 }
00307
00308 delete [] aeh;
00309 return 0;
00310 }
00311
00312 int
00313 ACE_TP_Reactor::remove_handler (int ,
00314 ACE_Sig_Action * ,
00315 ACE_Sig_Action * ,
00316 int )
00317 {
00318 ACE_NOTSUP_RETURN (-1);
00319 }
00320
00321 int
00322 ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & )
00323 {
00324 ACE_NOTSUP_RETURN (-1);
00325 }
00326
00327
00328 int
00329 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00330 ACE_TP_Token_Guard &guard)
00331 {
00332 int event_count =
00333 this->get_event_for_dispatching (max_wait_time);
00334
00335 int result = 0;
00336
00337
00338
00339
00340
00341
00342 if (event_count == -1)
00343 {
00344
00345
00346
00347
00348
00349 return this->handle_signals (event_count,
00350 guard);
00351 }
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364 result = this->handle_timer_events (event_count,
00365 guard);
00366
00367 if (result > 0)
00368 return result;
00369
00370
00371
00372
00373 if (event_count > 0)
00374 {
00375
00376
00377
00378 result = this->handle_notify_events (event_count,
00379 guard);
00380
00381 if (result > 0)
00382 return result;
00383
00384
00385 }
00386
00387 if (event_count > 0)
00388 {
00389
00390 return this->handle_socket_events (event_count,
00391 guard);
00392 }
00393
00394 return 0;
00395
00396 }
00397
00398
00399
00400
00401 int
00402 ACE_TP_Reactor::handle_signals (int & ,
00403 ACE_TP_Token_Guard & )
00404 {
00405 ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415 if (ACE_Sig_Handler::sig_pending () != 0)
00416 {
00417 ACE_Sig_Handler::sig_pending (0);
00418
00419
00420
00421
00422
00423 #if 0
00424
00425
00426
00427
00428
00429
00430 active_handle_count = this->any_ready (dispatch_set);
00431 #else
00432
00433 #endif
00434
00435
00436
00437
00438 return 1;
00439 }
00440
00441 return -1;
00442 }
00443
00444
00445 int
00446 ACE_TP_Reactor::handle_timer_events (int & ,
00447 ACE_TP_Token_Guard &guard)
00448 {
00449
00450 ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00451 this->timer_queue_->timer_skew ());
00452
00453
00454
00455 ACE_Timer_Node_Dispatch_Info info;
00456
00457 if (this->timer_queue_->dispatch_info (cur_time,
00458 info))
00459 {
00460
00461 guard.release_token ();
00462
00463
00464 this->timer_queue_->upcall (info.type_,
00465 info.act_,
00466 cur_time);
00467
00468
00469 return 1;
00470 }
00471
00472 return 0;
00473 }
00474
00475
00476
00477 int
00478 ACE_TP_Reactor::handle_notify_events (int & ,
00479 ACE_TP_Token_Guard &guard)
00480 {
00481
00482 ACE_HANDLE notify_handle =
00483 this->get_notify_handle ();
00484
00485 int result = 0;
00486
00487
00488
00489 if (notify_handle == ACE_INVALID_HANDLE)
00490 return result;
00491
00492
00493 ACE_Notification_Buffer buffer;
00494
00495
00496 this->ready_set_.rd_mask_.clr_bit (notify_handle);
00497
00498
00499
00500 while (this->notify_handler_->read_notify_pipe (notify_handle,
00501 buffer) > 0)
00502 {
00503
00504
00505
00506
00507
00508 if (this->notify_handler_->is_dispatchable (buffer) > 0)
00509 {
00510
00511 guard.release_token ();
00512
00513
00514 this->notify_handler_->dispatch_notify (buffer);
00515
00516
00517 result = 1;
00518
00519
00520 break;
00521 }
00522 }
00523
00524
00525
00526
00527 return result;
00528 }
00529
00530 int
00531 ACE_TP_Reactor::handle_socket_events (int &event_count,
00532 ACE_TP_Token_Guard &guard)
00533 {
00534
00535
00536 ACE_EH_Dispatch_Info dispatch_info;
00537
00538 this->get_socket_event_info (dispatch_info);
00539
00540
00541
00542 if (!dispatch_info.dispatch ())
00543 {
00544 return 0;
00545 }
00546
00547
00548
00549
00550
00551 if (dispatch_info.event_handler_ != this->notify_handler_)
00552 this->suspend_i (dispatch_info.handle_);
00553
00554
00555 guard.release_token ();
00556
00557 int result = 0;
00558
00559
00560
00561 --event_count;
00562
00563 if (this->dispatch_socket_event (dispatch_info) == 0)
00564 ++result;
00565
00566
00567
00568
00569 int flag =
00570 ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED;
00571
00572
00573
00574
00575 guard.acquire_token ();
00576
00577
00578 ACE_Event_Handler *eh =
00579 this->handler_rep_.find (dispatch_info.handle_);
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598 if (dispatch_info.event_handler_ != 0 &&
00599 eh == dispatch_info.event_handler_)
00600 {
00601 flag =
00602 dispatch_info.event_handler_->resume_handler ();
00603 }
00604
00605
00606 if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
00607 dispatch_info.event_handler_ != this->notify_handler_ &&
00608 flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00609 this->resume_i (dispatch_info.handle_);
00610
00611
00612
00613 guard.release_token ();
00614
00615 return result;
00616 }
00617
00618 int
00619 ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
00620 ACE_Reactor_Mask mask,
00621 int ops)
00622 {
00623 ACE_TRACE ("ACE_TP_Reactor::mask_ops");
00624 ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
00625 ace_mon, this->token_, -1));
00626
00627 int result = 0;
00628
00629
00630
00631
00632 if (this->suspend_set_.rd_mask_.is_set (handle) == 0
00633 && this->suspend_set_.wr_mask_.is_set (handle) == 0
00634 && this->suspend_set_.ex_mask_.is_set (handle) == 0)
00635
00636 result = this->bit_ops (handle, mask,
00637 this->wait_set_,
00638 ops);
00639 else
00640
00641 result = this->bit_ops (handle, mask,
00642 this->suspend_set_,
00643 ops);
00644
00645 return result;
00646 }
00647
00648
00649
00650 int
00651 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00652 {
00653
00654
00655
00656 if (this->state_changed_)
00657 {
00658 this->ready_set_.rd_mask_.reset ();
00659 this->ready_set_.wr_mask_.reset ();
00660 this->ready_set_.ex_mask_.reset ();
00661 this->state_changed_ = 0;
00662 }
00663 else
00664 {
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00678 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00679 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00680
00681
00682 }
00683
00684 return this->wait_for_multiple_events (this->ready_set_,
00685 max_wait_time);
00686 }
00687
00688 int
00689 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00690 {
00691 event.reset ();
00692
00693
00694
00695
00696
00697
00698 int found_io = 0;
00699 ACE_HANDLE handle;
00700
00701
00702
00703 {
00704 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00705
00706 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00707 {
00708 if (this->is_suspended_i (handle))
00709 continue;
00710
00711
00712 event.set (handle,
00713 this->handler_rep_.find (handle),
00714 ACE_Event_Handler::WRITE_MASK,
00715 &ACE_Event_Handler::handle_output);
00716
00717 this->clear_handle_read_set (handle);
00718 found_io = 1;
00719 }
00720 }
00721
00722 if (!found_io)
00723 {
00724 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00725
00726 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00727 {
00728 if (this->is_suspended_i (handle))
00729 continue;
00730
00731
00732 event.set (handle,
00733 this->handler_rep_.find (handle),
00734 ACE_Event_Handler::EXCEPT_MASK,
00735 &ACE_Event_Handler::handle_exception);
00736
00737 this->clear_handle_read_set (handle);
00738
00739 found_io = 1;
00740 }
00741 }
00742
00743 if (!found_io)
00744 {
00745 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00746
00747 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00748 {
00749 if (this->is_suspended_i (handle))
00750 continue;
00751
00752
00753 event.set (handle,
00754 this->handler_rep_.find (handle),
00755 ACE_Event_Handler::READ_MASK,
00756 &ACE_Event_Handler::handle_input);
00757
00758 this->clear_handle_read_set (handle);
00759 found_io = 1;
00760 }
00761 }
00762
00763 return found_io;
00764 }
00765
00766
00767
00768
00769 int
00770 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00771 {
00772 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00773
00774 ACE_HANDLE handle = dispatch_info.handle_;
00775 ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
00776 ACE_Reactor_Mask mask = dispatch_info.mask_;
00777 ACE_EH_PTMF callback = dispatch_info.callback_;
00778
00779
00780 if (event_handler == 0)
00781 return -1;
00782
00783
00784
00785
00786
00787
00788 int status = 1;
00789 while (status > 0)
00790 status = (event_handler->*callback) (handle);
00791
00792
00793 if (status < 0)
00794 {
00795 int retval =
00796 this->remove_handler (handle, mask);
00797
00798
00799 dispatch_info.event_handler_ = 0;
00800 dispatch_info.handle_ = ACE_INVALID_HANDLE;
00801
00802 return retval;
00803 }
00804
00805
00806 return 0;
00807 }
00808
00809 int
00810 ACE_TP_Reactor::resumable_handler (void)
00811 {
00812 return 1;
00813 }
00814
00815 int
00816 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00817 {
00818 return this->handle_events (&max_wait_time);
00819 }
00820
00821 int
00822 ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
00823 ACE_Reactor_Mask mask,
00824 int ops)
00825 {
00826 return this->mask_ops (eh->get_handle (), mask, ops);
00827 }
00828
00829 void
00830 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00831 ACE_Reactor_Mask,
00832 ACE_Handle_Set &,
00833 ACE_Event_Handler *eh,
00834 ACE_EH_PTMF)
00835 {
00836 ACE_ERROR ((LM_ERROR,
00837 ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: ")
00838 ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
00839
00840 ACE_ASSERT (eh == 0);
00841 ACE_UNUSED_ARG (eh);
00842 }
00843
00844 ACE_HANDLE
00845 ACE_TP_Reactor::get_notify_handle (void)
00846 {
00847
00848
00849 ACE_HANDLE read_handle =
00850 this->notify_handler_->notify_handle ();
00851
00852
00853
00854 if (read_handle != ACE_INVALID_HANDLE &&
00855 this->ready_set_.rd_mask_.is_set (read_handle))
00856 {
00857 return read_handle;
00858 }
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872 return ACE_INVALID_HANDLE;
00873 }