
ACE_TP_Reactor Class Reference

Specialization of Select Reactor to support thread-pool based event dispatching.

#include <TP_Reactor.h>


Public Methods

 ACE_TP_Reactor (ACE_Sig_Handler *=0, ACE_Timer_Queue *=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
 Initialize <ACE_TP_Reactor> with the default size.

 ACE_TP_Reactor (size_t max_number_of_handles, int restart=0, ACE_Sig_Handler *=0, ACE_Timer_Queue *=0, int mask_signals=1, int s_queue=ACE_Select_Reactor_Token::FIFO)
virtual int handle_events (ACE_Time_Value *max_wait_time=0)
virtual int handle_events (ACE_Time_Value &max_wait_time)
virtual int remove_handler (ACE_Event_Handler *eh, ACE_Reactor_Mask mask)
 The following two overloaded methods are necessary as we don't want the TP_Reactor to call handle_close () with the token held.

virtual int remove_handler (ACE_HANDLE handle, ACE_Reactor_Mask)
virtual int remove_handler (const ACE_Handle_Set &handle_set, ACE_Reactor_Mask)
virtual int remove_handler (int signum, ACE_Sig_Action *new_disp, ACE_Sig_Action *old_disp=0, int sigkey=-1)
virtual int remove_handler (const ACE_Sig_Set &sigset)
 Calls <remove_handler> for every signal in <sigset>.

virtual int resumable_handler (void)
 Does the reactor allow the application to resume the handle on its own, i.e. can it pass control of handle resumption to the application? The TP reactor can allow applications to resume handles, so this returns a positive value.

virtual int mask_ops (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, int ops)
 GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and <mask>.

virtual int mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops)
 GET/SET/ADD/CLR the dispatch mask "bit" bound with the <handle> and <mask>.

virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id=0)
 Set the new owner of the thread and return the old owner.

virtual int owner (ACE_thread_t *)
 Return the current owner of the thread.


Static Public Methods

void no_op_sleep_hook (void *)
 Called from handle events.


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks.


Protected Methods

int dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard)
 Dispatch just 1 signal, timer, or notification handler.

int get_event_for_dispatching (ACE_Time_Value *max_wait_time)
 Get the event that needs dispatching. It could be either a signal, timer, or notification handler, or possibly 1 I/O handler for dispatching. In the most common use case, this would return 1 I/O handler for dispatching.

int handle_signals (int &event_count, ACE_TP_Token_Guard &g)
 Method to handle signals. NOTE: It is just busted at this point in time.

int handle_timer_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle timer events.

int handle_notify_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle notify events.

int handle_socket_events (int &event_count, ACE_TP_Token_Guard &g)
 Handle socket events.

virtual void notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, ACE_Event_Handler *eh, ACE_EH_PTMF callback)
 This method shouldn't get called.


Private Methods

ACE_HANDLE get_notify_handle (void)
 Get the handle of the notify pipe from the ready set if there is an event in the notify pipe.

int get_socket_event_info (ACE_EH_Dispatch_Info &info)
 Get socket event dispatch information.

int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
 Notify the appropriate <callback> in the context of the <eh> associated with <handle> that a particular event has occurred.

void clear_handle_read_set (ACE_HANDLE handle)
 Clear the <handle> from the read_set.

 ACE_TP_Reactor (const ACE_TP_Reactor &)
 Deny access since member-wise won't work...

ACE_TP_Reactor & operator= (const ACE_TP_Reactor &)

Detailed Description

Specialization of Select Reactor to support thread-pool based event dispatching.

One of the shortcomings of the Select_Reactor in ACE is that it did not support a thread-pool based event dispatching model similar to the one in WFMO_Reactor. In Select_Reactor, only one thread can be blocked in <handle_events> at any given time. TP_Reactor is a specialization of Select Reactor that removes this shortcoming by supporting thread-pool based event dispatching.

This Reactor takes advantage of the fact that events reported by <select> are persistent if not acted upon immediately. It works by remembering the event handler that just got activated, releasing the internal lock (so that some other thread can start waiting in the event loop), and then dispatching the event handler outside the context of the Reactor lock. This Reactor is best suited for situations where the callbacks to event handlers can take arbitrarily long and/or a number of threads are available to run the event loop.

Note that callback code in Event Handlers (e.g. Event_Handler::handle_input) does not have to be modified or made thread-safe for this Reactor. This is because an activated Event Handler is suspended in the Reactor before the upcall is made and resumed after the upcall completes. Therefore, one Event Handler cannot be called by multiple threads simultaneously.
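The dispatching scheme described above can be modelled without ACE. The following is a minimal sketch using only the C++ standard library; `ToyReactor`, `handle_one` and `run_pool` are hypothetical names invented for illustration and are not part of the ACE API. Each pool thread takes the token, picks one ready handle that is not suspended, suspends it, releases the token, performs the upcall, and finally resumes the handle.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical stand-in for a reactor; none of these names are ACE APIs.
struct ToyReactor {
  std::mutex token;          // plays the role of the reactor token
  std::deque<int> ready;     // handles with a pending event
  std::set<int> suspended;   // handles currently being dispatched

  std::mutex stats;          // protects the bookkeeping below
  std::set<int> in_upcall;   // handles whose upcall is running right now
  int dispatched = 0;        // total number of upcalls made
  bool overlap = false;      // set if one handle was ever upcalled twice at once

  // Stand-in for handle_input() and friends.
  void upcall(int handle) {
    {
      std::lock_guard<std::mutex> g(stats);
      if (!in_upcall.insert(handle).second) overlap = true;
    }
    std::this_thread::yield();  // pretend to do some work
    {
      std::lock_guard<std::mutex> g(stats);
      in_upcall.erase(handle);
      ++dispatched;
    }
  }

  // One pass of the event loop. Returns false when this thread finds
  // nothing it is allowed to dispatch.
  bool handle_one() {
    int handle = -1;
    {
      std::lock_guard<std::mutex> hold(token);
      for (auto it = ready.begin(); it != ready.end(); ++it) {
        if (suspended.count(*it) == 0) {  // skip handles owned by other threads
          handle = *it;
          ready.erase(it);
          break;
        }
      }
      if (handle < 0) return false;
      suspended.insert(handle);           // suspend before releasing the token
    }                                     // token released here
    upcall(handle);                       // dispatched outside the token
    std::lock_guard<std::mutex> hold(token);
    suspended.erase(handle);              // resume after the upcall completes
    return true;
  }
};

// Runs `threads` workers until no dispatchable events remain.
inline int run_pool(ToyReactor &r, int threads) {
  std::vector<std::thread> pool;
  for (int i = 0; i < threads; ++i)
    pool.emplace_back([&r] { while (r.handle_one()) {} });
  for (auto &t : pool) t.join();
  return r.dispatched;
}
```

Because a handle stays in `suspended` for the whole upcall, two threads never run the callback for the same handle at once, which is the property the description relies on.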

Definition at line 165 of file TP_Reactor.h.


Constructor & Destructor Documentation

ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler * = 0,
                                ACE_Timer_Queue * = 0,
                                int mask_signals = 1,
                                int s_queue = ACE_Select_Reactor_Token::FIFO)

Initialize <ACE_TP_Reactor> with the default size.

Definition at line 101 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew.

00105   : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }

ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
                                int restart = 0,
                                ACE_Sig_Handler * = 0,
                                ACE_Timer_Queue * = 0,
                                int mask_signals = 1,
                                int s_queue = ACE_Select_Reactor_Token::FIFO)

Initialize the <ACE_TP_Reactor> to manage <max_number_of_handles>. If <restart> is non-0 then the <ACE_Reactor>'s <handle_events> method will be restarted automatically when <EINTR> occurs. If <signal_handler> or <timer_queue> are non-0 they are used as the signal handler and timer queue, respectively.

Definition at line 111 of file TP_Reactor.cpp.

References ACE_TRACE, and ACE_Select_Reactor_Impl::supress_notify_renew.

00117   : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }

ACE_TP_Reactor::ACE_TP_Reactor (const ACE_TP_Reactor &) [private]

Deny access since member-wise won't work...


Member Function Documentation

ACE_INLINE void ACE_TP_Reactor::clear_handle_read_set (ACE_HANDLE handle) [private]

Clear the <handle> from the read_set.

Definition at line 96 of file TP_Reactor.i.

References ACE_Handle_Set::clr_bit, ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by get_socket_event_info.

00097 {
00098   this->ready_set_.wr_mask_.clr_bit (handle);
00099   this->ready_set_.ex_mask_.clr_bit (handle);
00100   this->ready_set_.rd_mask_.clr_bit (handle);
00101 }

int ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard) [protected]

Dispatch just 1 signal, timer, or notification handler.

Definition at line 329 of file TP_Reactor.cpp.

References get_event_for_dispatching, handle_notify_events, handle_signals, handle_socket_events, and handle_timer_events.

Referenced by handle_events.

00331 {
00332   int event_count =
00333     this->get_event_for_dispatching (max_wait_time);
00334 
00335   int result = 0;
00336 
00337   // Note: We are passing the <event_count> around to keep a record of
00338   // how many events still need processing. Maybe this could be
00339   // useful in the future.
00340 
00341   // Dispatch signals
00342   if (event_count == -1)
00343     {
00344       // Looks like we don't do any upcalls in dispatch signals. If at
00345       // a later point of time, we decide to handle signals we have to
00346       // release the lock before we make any upcalls.. What is here
00347       // now is not the right thing...
00348       // @@ We need to do better..
00349       return this->handle_signals (event_count,
00350                                    guard);
00351     }
00352 
00353   // If there are no signals and if we had received a proper
00354   // event_count then first look at dispatching timeouts. We need to
00355   // handle timers early since they may have higher latency
00356   // constraints than I/O handlers.  Ideally, the order of
00357   // dispatching should be a strategy...
00358 
00359   // NOTE: The event count does not have the number of timers that
00360   // need dispatching. But we are still passing this along. We don't
00361   // need to do that. In the future we *may* have the timers also
00362   // returned through the <event_count>. Just passing that along for
00363   // that day.
00364   result = this->handle_timer_events (event_count,
00365                                       guard);
00366 
00367   if (result > 0)
00368     return result;
00369 
00370 
00371   // Else just fall through for further handling
00372 
00373   if (event_count > 0)
00374     {
00375       // Next dispatch the notification handlers (if there are any to
00376       // dispatch).  These are required to handle multiple-threads that
00377       // are trying to update the <Reactor>.
00378       result = this->handle_notify_events (event_count,
00379                                            guard);
00380 
00381       if (result > 0)
00382         return result;
00383 
00384       // Else just fall through for further handling
00385     }
00386 
00387   if (event_count > 0)
00388     {
00389       // Handle socket events
00390       return this->handle_socket_events (event_count,
00391                                          guard);
00392     }
00393 
00394   return 0;
00395 
00396 }
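The dispatch order in the listing above can be summarised in a small standalone sketch. It is only a model: `DispatchHooks` and `dispatch_one` are hypothetical names, and the four callbacks stand in for `handle_signals`, `handle_timer_events`, `handle_notify_events` and `handle_socket_events`. As in the listing, timers are tried even when the event count is 0, while notifications and socket events are tried only when the count is positive.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical hooks standing in for the four handle_* methods.
struct DispatchHooks {
  std::function<int()> signals;
  std::function<int()> timers;
  std::function<int()> notifies;
  std::function<int()> sockets;
};

// Mirrors the control flow of dispatch_i: at most one category of event
// is dispatched per call.
inline int dispatch_one(int event_count, const DispatchHooks &hooks) {
  if (event_count == -1)          // the wait was interrupted
    return hooks.signals();

  int result = hooks.timers();    // timers first: they may be latency sensitive
  if (result > 0)
    return result;

  if (event_count > 0) {
    result = hooks.notifies();    // then notifications
    if (result > 0)
      return result;
    return hooks.sockets();       // finally one I/O handler
  }
  return 0;
}
```

Making the order a set of callbacks also hints at the listing's remark that the order of dispatching should ideally be a strategy.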

int ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info) [private]

Notify the appropriate <callback> in the context of the <eh> associated with <handle> that a particular event has occurred.

Definition at line 770 of file TP_Reactor.cpp.

References ACE_EH_PTMF, ACE_Reactor_Mask, ACE_TRACE, ACE_EH_Dispatch_Info::callback_, ACE_EH_Dispatch_Info::event_handler_, ACE_EH_Dispatch_Info::handle_, ACE_EH_Dispatch_Info::mask_, and remove_handler.

Referenced by handle_socket_events.

00771 {
00772   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00773 
00774   ACE_HANDLE handle = dispatch_info.handle_;
00775   ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
00776   ACE_Reactor_Mask mask = dispatch_info.mask_;
00777   ACE_EH_PTMF callback = dispatch_info.callback_;
00778 
00779   // Check for removed handlers.
00780   if (event_handler == 0)
00781     return -1;
00782 
00783   // Upcall. If the handler returns positive value (requesting a
00784   // reactor callback) don't set the ready-bit because it will be
00785   // ignored if the reactor state has changed. Just call back
00786   // as many times as the handler requests it. Other threads are off
00787   // handling other things.
00788   int status = 1;
00789   while (status > 0)
00790     status = (event_handler->*callback) (handle);
00791 
00792   // If negative, remove from Reactor
00793   if (status < 0)
00794     {
00795       int retval =
00796         this->remove_handler (handle, mask);
00797 
00798       // As the handler is no longer valid, invalidate the handle
00799       dispatch_info.event_handler_ = 0;
00800       dispatch_info.handle_ = ACE_INVALID_HANDLE;
00801 
00802       return retval;
00803     }
00804 
00805   // assert (status >= 0);
00806   return 0;
00807 }
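The return-value convention used by the upcall loop above can be illustrated in isolation. This is a sketch under stated assumptions: `dispatch_until_done` is a hypothetical helper, `callback` stands in for the bound handler method, and `remove` stands in for the call to `remove_handler`.

```cpp
#include <cassert>
#include <functional>

// Keeps calling `callback` while it returns a positive value, removes the
// handler if it returns a negative value, and otherwise leaves it registered.
inline int dispatch_until_done(const std::function<int()> &callback,
                               const std::function<int()> &remove) {
  int status = 1;
  while (status > 0)
    status = callback();   // positive: the handler asked to be called again

  if (status < 0)
    return remove();       // negative: unregister and report that result

  return 0;                // zero: handler stays registered
}
```

For example, a handler that returns 1 twice and then 0 is called three times and stays registered, while a handler that returns -1 is removed after a single call.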

int ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) [protected]

Get the event that needs dispatching. It could be either a signal, timer, or notification handler, or possibly 1 I/O handler for dispatching. In the most common use case, this would return 1 I/O handler for dispatching.

Definition at line 651 of file TP_Reactor.cpp.

References ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::ready_set_, ACE_Handle_Set::reset, ACE_Select_Reactor_Impl::state_changed_, ACE_Handle_Set::sync, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::wait_for_multiple_events, and ACE_Select_Reactor_Handle_Set::wr_mask_.

Referenced by dispatch_i.

00652 {
00653 
00654   // If the reactor handler state has changed, clear any remembered
00655   // ready bits and re-scan from the master wait_set.
00656   if (this->state_changed_)
00657     {
00658       this->ready_set_.rd_mask_.reset ();
00659       this->ready_set_.wr_mask_.reset ();
00660       this->ready_set_.ex_mask_.reset ();
00661       this->state_changed_ = 0;
00662     }
00663   else
00664     {
00665       // This is a hack... somewhere, under certain conditions (which
00666       // I don't understand...) the mask will have all of its bits clear,
00667       // yet have a size_ > 0. This is an attempt to remedy the effect,
00668       // without knowing why it happens.
00669 
00670       //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500))
00671       // SunCC seems to be having problems with this piece of code
00672       // here. I am  not sure why though. This works fine with other
00673       // compilers. As we don't seem to understand when this piece of
00674       // code is needed and as it creates problems for SunCC we will
00675       // not compile this. Most of the tests in TAO seem to be happy
00676       // without this in SunCC.
00677       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00678       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00679       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00680       //# endif /* ! __SUNPRO_CC */
00681 
00682     }
00683 
00684   return this->wait_for_multiple_events (this->ready_set_,
00685                                          max_wait_time);
00686 }

ACE_HANDLE ACE_TP_Reactor::get_notify_handle (void) [private]

Get the handle of the notify pipe from the ready set if there is an event in the notify pipe.

Definition at line 845 of file TP_Reactor.cpp.

References ACE_Handle_Set::is_set, ACE_Reactor_Notify::notify_handle, ACE_Select_Reactor_Impl::notify_handler_, ACE_Select_Reactor_Handle_Set::rd_mask_, and ACE_Select_Reactor_Impl::ready_set_.

Referenced by handle_notify_events.

00846 {
00847   // Call the notify handler to get a handle on which we would have a
00848   // notify waiting
00849   ACE_HANDLE read_handle =
00850     this->notify_handler_->notify_handle ();
00851 
00852   // Check whether the rd_mask has been set on that handle. If so
00853   // return the handle.
00854   if (read_handle != ACE_INVALID_HANDLE &&
00855       this->ready_set_.rd_mask_.is_set (read_handle))
00856     {
00857       return read_handle;
00858     }
00859     /*if (read_handle != ACE_INVALID_HANDLE)
00860     {
00861       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00862       ACE_HANDLE handle = ACE_INVALID_HANDLE;
00863 
00864       while ((handle = handle_iter ()) == read_handle)
00865         {
00866           return read_handle;
00867         }
00868       ACE_UNUSED_ARG (handle);
00869       }*/
00870 
00871   // None found..
00872   return ACE_INVALID_HANDLE;
00873 }

int ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &info) [private]

Get socket event dispatch information.

Definition at line 689 of file TP_Reactor.cpp.

References clear_handle_read_set, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::handle_exception, ACE_Event_Handler::handle_input, ACE_Event_Handler::handle_output, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::is_suspended_i, ACE_Event_Handler::READ_MASK, ACE_EH_Dispatch_Info::reset, ACE_EH_Dispatch_Info::set, and ACE_Event_Handler::WRITE_MASK.

Referenced by handle_socket_events.

00690 {
00691   event.reset ();           // Nothing to dispatch yet
00692 
00693   // Check for dispatch in write, except, read. Only catch one, but if
00694   // one is caught, be sure to clear the handle from each mask in case
00695   // there is more than one mask set for it. This would cause problems
00696   // if the handler is suspended for dispatching, but its set bit in
00697   // another part of ready_set_ kept it from being dispatched.
00698   int found_io = 0;
00699   ACE_HANDLE handle;
00700 
00701   // @@todo: We can do quite a bit of code reduction here. Let me get
00702   // it to work before I do this.
00703   {
00704     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00705 
00706     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00707       {
00708         if (this->is_suspended_i (handle))
00709           continue;
00710 
00711         // Remember this info
00712         event.set (handle,
00713                    this->handler_rep_.find (handle),
00714                    ACE_Event_Handler::WRITE_MASK,
00715                    &ACE_Event_Handler::handle_output);
00716 
00717         this->clear_handle_read_set (handle);
00718         found_io = 1;
00719       }
00720   }
00721 
00722   if (!found_io)
00723     {
00724       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00725 
00726       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00727         {
00728           if (this->is_suspended_i (handle))
00729             continue;
00730 
00731           // Remember this info
00732           event.set (handle,
00733                      this->handler_rep_.find (handle),
00734                      ACE_Event_Handler::EXCEPT_MASK,
00735                      &ACE_Event_Handler::handle_exception);
00736 
00737           this->clear_handle_read_set (handle);
00738 
00739           found_io = 1;
00740         }
00741     }
00742 
00743   if (!found_io)
00744     {
00745       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00746 
00747       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00748         {
00749           if (this->is_suspended_i (handle))
00750             continue;
00751 
00752           // Remember this info
00753           event.set (handle,
00754                      this->handler_rep_.find (handle),
00755                      ACE_Event_Handler::READ_MASK,
00756                      &ACE_Event_Handler::handle_input);
00757 
00758           this->clear_handle_read_set (handle);
00759           found_io = 1;
00760         }
00761     }
00762 
00763   return found_io;
00764 }
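The selection rule in the listing above (scan write, then except, then read; skip suspended handles; clear the chosen handle from all three masks) can be sketched with standard containers. `ReadySets`, `Pick` and `pick_one` are hypothetical names, and `std::set<int>` is only a stand-in for `ACE_Handle_Set`.

```cpp
#include <cassert>
#include <set>

enum class Mask { None, Write, Except, Read };

// Stand-in for the reactor's ready set: one set of handles per mask.
struct ReadySets {
  std::set<int> wr, ex, rd;
};

struct Pick {
  int handle = -1;          // -1 means nothing is dispatchable
  Mask mask = Mask::None;
};

// Picks at most one ready, non-suspended handle, preferring write, then
// except, then read, and clears that handle from every mask.
inline Pick pick_one(ReadySets &ready, const std::set<int> &suspended) {
  struct Source { std::set<int> *set; Mask mask; };
  const Source order[] = {{&ready.wr, Mask::Write},
                          {&ready.ex, Mask::Except},
                          {&ready.rd, Mask::Read}};
  for (const Source &src : order) {
    for (int h : *src.set) {
      if (suspended.count(h) != 0)
        continue;                       // another thread is dispatching it
      Pick p;
      p.handle = h;
      p.mask = src.mask;
      // Clear the handle everywhere so a second pending mask cannot cause
      // a dispatch while the handler is suspended. We return immediately,
      // so the invalidated loop iterator is never used again.
      ready.wr.erase(h);
      ready.ex.erase(h);
      ready.rd.erase(h);
      return p;
    }
  }
  return Pick{};
}
```

Folding the three scans into one loop over `order` is one way to do the code reduction that the `@@todo` comment in the listing mentions.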

int ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) [virtual]

This method is just like the <handle_events> overload that takes a pointer, except the <max_wait_time> value is a reference and can therefore never be NULL.

The only difference between <alertable_handle_events> and <handle_events> is that in the alertable case, the eventloop will return when the system queues an I/O completion routine or an Asynchronous Procedure Call.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 816 of file TP_Reactor.cpp.

References handle_events.

00817 {
00818   return this->handle_events (&max_wait_time);
00819 }

int ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time = 0) [virtual]

This event loop driver blocks for up to <max_wait_time> before returning. It will return earlier if timer events, I/O events, or signal events occur. Note that <max_wait_time> can be 0, in which case this method blocks indefinitely until events occur.

<max_wait_time> is decremented to reflect how much time this call took. For instance, if a time value of 3 seconds is passed to handle_events and an event occurs after 2 seconds, <max_wait_time> will equal 1 second. This can be used if an application wishes to handle events for some fixed amount of time.

Returns the total number of <ACE_Event_Handler>s that were dispatched, 0 if the <max_wait_time> elapsed without dispatching any handlers, or -1 if something goes wrong.
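The way the wait budget is decremented can be modelled with a small RAII helper. This is only a sketch built on `std::chrono`; `Countdown` and `remaining_after_wait` are hypothetical names and not the implementation of `ACE_Countdown_Time`. The helper records the start time and, when updated or destroyed, subtracts the elapsed time from the caller's budget without going below zero.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

using Ms = std::chrono::milliseconds;

// Subtracts elapsed wall-clock time from *budget on update() and on
// destruction. A null budget means "wait forever" and is left untouched.
class Countdown {
public:
  explicit Countdown(Ms *budget)
      : budget_(budget), start_(std::chrono::steady_clock::now()) {}
  ~Countdown() { update(); }
  Countdown(const Countdown &) = delete;
  Countdown &operator=(const Countdown &) = delete;

  void update() {
    if (budget_ == nullptr)
      return;
    const auto now = std::chrono::steady_clock::now();
    const Ms spent = std::chrono::duration_cast<Ms>(now - start_);
    *budget_ = (spent >= *budget_) ? Ms(0) : (*budget_ - spent);
    start_ = now;  // later updates only count time since this one
  }

private:
  Ms *budget_;
  std::chrono::steady_clock::time_point start_;
};

// Simulates an event that arrives after `delay` and returns what is left
// of the caller's budget afterwards.
inline Ms remaining_after_wait(Ms budget, Ms delay) {
  {
    Countdown countdown(&budget);
    std::this_thread::sleep_for(delay);  // stands in for waiting on events
  }
  return budget;
}
```

An application that wants to process events for a fixed period can call the event loop repeatedly with the same time value until the remaining budget reaches zero.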

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 143 of file TP_Reactor.cpp.

References ACE_TRACE, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::deactivated_, dispatch_i, ACE_TP_Token_Guard::grab_token, ACE_TP_Token_Guard::is_owner, and ACE_Countdown_Time::update.

Referenced by handle_events.

00144 {
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   // Stash the current time -- the destructor of this object will
00148   // automatically compute how much time elapsed since this method was
00149   // called.
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   // The order of these events is very subtle, modify with care.
00153 
00154 
00155   // Instantiate the token guard which will try grabbing the token for
00156   // this thread.
00157   ACE_TP_Token_Guard guard (this->token_);
00158 
00159 
00160   int result = guard.grab_token (max_wait_time);
00161 
00162   // If the guard is NOT the owner just return the retval
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   // After getting the lock, just check for deactivation.
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   // Update the countdown to reflect time waiting for the token.
00171   countdown.update ();
00172 
00173 
00174   return this->dispatch_i (max_wait_time,
00175                            guard);
00176 }

int ACE_TP_Reactor::handle_notify_events (int &event_count, ACE_TP_Token_Guard &g) [protected]

Handle notify events.

Definition at line 478 of file TP_Reactor.cpp.

References ACE_Handle_Set::clr_bit, ACE_Reactor_Notify::dispatch_notify, get_notify_handle, ACE_Reactor_Notify::is_dispatchable, notify_handle, ACE_Select_Reactor_Impl::notify_handler_, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Reactor_Notify::read_notify_pipe, ACE_Select_Reactor_Impl::ready_set_, and ACE_TP_Token_Guard::release_token.

Referenced by dispatch_i.

00480 {
00481   // Get the handle on which notify calls could have occurred
00482   ACE_HANDLE notify_handle =
00483     this->get_notify_handle ();
00484 
00485   int result = 0;
00486 
00487   // The notify was not in the list returned by
00488   // wait_for_multiple_events ().
00489   if (notify_handle == ACE_INVALID_HANDLE)
00490     return result;
00491 
00492   // Now just do a read on the pipe..
00493   ACE_Notification_Buffer buffer;
00494 
00495   // Clear the handle of the read_mask of our <ready_set_>
00496   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00497 
00498   // Keep reading notifies till we empty it or till we have a
00499   // dispatchable buffer
00500   while (this->notify_handler_->read_notify_pipe (notify_handle,
00501                                                   buffer) > 0)
00502     {
00503       // Just figure out whether we can read  any buffer that has
00504       // dispatchable info. If not we have just been unblocked by
00505       // another thread trying to update the reactor. If we get any
00506       // buffer that needs dispatching we will dispatch that after
00507       // releasing the lock
00508       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00509         {
00510           // Release the token before dispatching notifies...
00511           guard.release_token ();
00512 
00513           // Dispatch the upcall for the notify
00514           this->notify_handler_->dispatch_notify (buffer);
00515 
00516           // We had a successful dispatch.
00517           result = 1;
00518 
00519           // break out of the while loop
00520           break;
00521         }
00522     }
00523 
00524   // If we did some work, then we just return 1 which will allow us
00525   // to get out of here. If we return 0, then we will be asked to do
00526   // some work, i.e. dispatch socket events
00527   return result;
00528 }

int ACE_TP_Reactor::handle_signals (int &event_count, ACE_TP_Token_Guard &g) [protected]

Method to handle signals. NOTE: It is just busted at this point in time.

Definition at line 402 of file TP_Reactor.cpp.

References ACE_TRACE, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::any_ready, and ACE_Sig_Handler::sig_pending.

Referenced by dispatch_i.

00404 {
00405   ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00406 
00407   /*
00408    *
00409    *             THIS METHOD SEEMS BROKEN
00410    *
00411    *
00412    */
00413   // First check for interrupts.
00414   // Bail out -- we got here since <select> was interrupted.
00415   if (ACE_Sig_Handler::sig_pending () != 0)
00416     {
00417       ACE_Sig_Handler::sig_pending (0);
00418 
00419       // This piece of code comes from the old TP_Reactor. We did not
00420       // handle signals at all then. If we happen to handle signals
00421       // in the TP_Reactor, we should then start worrying about this
00422       // - Bala 21-Aug- 01
00423 #if 0
00424       // Not sure if this should be done in the TP_Reactor
00425       // case... leave it out for now.   -Steve Huston 22-Aug-00
00426 
00427       // If any HANDLES in the <ready_set_> are activated as a
00428       // result of signals they should be dispatched since
00429       // they may be time critical...
00430       active_handle_count = this->any_ready (dispatch_set);
00431 #else
00432       // active_handle_count = 0;
00433 #endif
00434 
00435       // Record the fact that the Reactor has dispatched a
00436       // handle_signal() method.  We need this to return the
00437       // appropriate count.
00438       return 1;
00439     }
00440 
00441   return -1;
00442 }

int ACE_TP_Reactor::handle_socket_events (int &event_count, ACE_TP_Token_Guard &g) [protected]

Handle socket events.

Definition at line 531 of file TP_Reactor.cpp.

References ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED, ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER, ACE_TP_Token_Guard::acquire_token, ACE_EH_Dispatch_Info::dispatch, dispatch_socket_event, ACE_EH_Dispatch_Info::event_handler_, ACE_Select_Reactor_Handler_Repository::find, get_socket_event_info, ACE_EH_Dispatch_Info::handle_, ACE_Select_Reactor_Impl::handler_rep_, ACE_Select_Reactor_Impl::notify_handler_, ACE_TP_Token_Guard::release_token, ACE_Event_Handler::resume_handler, ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::resume_i, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::suspend_i.

Referenced by dispatch_i.

00533 {
00534 
00535   // We got the lock, let's handle some I/O events.
00536   ACE_EH_Dispatch_Info dispatch_info;
00537 
00538   this->get_socket_event_info (dispatch_info);
00539 
00540   // If there is any event handler that is ready to be dispatched, the
00541   // dispatch information is recorded in dispatch_info.
00542   if (!dispatch_info.dispatch ())
00543     {
00544       return 0;
00545     }
00546 
00547   // Suspend the handler so that other threads don't start
00548   // dispatching it.
00549   // NOTE: This check was performed in older versions of the
00550   // TP_Reactor. Looks like it is a waste..
00551   if (dispatch_info.event_handler_ != this->notify_handler_)
00552     this->suspend_i (dispatch_info.handle_);
00553 
00554   // Release the lock.  Other threads can start waiting.
00555   guard.release_token ();
00556 
00557   int result = 0;
00558 
00559   // If there was an event handler ready, dispatch it.
00560   // Decrement the event left
00561   --event_count;
00562 
00563   if (this->dispatch_socket_event (dispatch_info) == 0)
00564     ++result;                // Dispatched an event
00565 
00566   // This is to get around a problem which is well described in
00567   // 1361. This is just a workaround that keeps applications
00568   // from resuming handles at the most inopportune moment.
00569   int flag =
00570     ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED;
00571 
00572   // Acquire the token since we want to access the handler
00573   // repository. The call to find () does not hold a lock and hence
00574   // this is required.
00575   guard.acquire_token ();
00576 
00577   // Get the handler for the handle that we have dispatched to.
00578   ACE_Event_Handler *eh =
00579     this->handler_rep_.find (dispatch_info.handle_);
00580 
00581   // This check is required for the following reasons
00582   // 1. If dispatch operation returned a -1, then there is a
00583   // possibility that the event handler could be deleted. In such
00584   // cases the pointer to the event_handler that <dispatch_info>
00585   // holds is set to 0.
00586   //
00587   // 2. If the application did its own memory management, a return
00588   // value of 0 cannot be believed since the event handler could
00589   // be deleted by the application based on some conditions. This
00590   // is *bad*. But we don't have much of a choice with the existing
00591   // reactor setup. To get around this, we can make a check for
00592   // the handler registered with the repository for the handle
00593   // that we have and compare with the handler that we
00594   // possess. Yeah, I know it is like touching your nose by taking
00595   // your hand around your head. But that is the way it is. This
00596   // is a fix for [BUGID 1231]
00597 
00598   if (dispatch_info.event_handler_ != 0 &&
00599       eh == dispatch_info.event_handler_)
00600     {
00601       flag =
00602            dispatch_info.event_handler_->resume_handler ();
00603     }
00604 
00605   // Use resume_i () since we hold the token already.
00606   if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
00607       dispatch_info.event_handler_ != this->notify_handler_ &&
00608       flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00609     this->resume_i (dispatch_info.handle_);
00610 
00611   // Let me release the token here. This is not required since the
00612   // destruction of the object on the stack will take care of this.
00613   guard.release_token ();
00614 
00615   return result;
00616 }
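The re-validation step explained in the long comment of the listing above can be shown as a standalone sketch. `Handler`, `Repository` and `resume_if_still_registered` are hypothetical names, and a `std::map` stands in for the handler repository. After the upcall, the handler is asked to resume only if the repository still maps the handle to the same handler object that was dispatched. The sketch assumes the caller already holds the token while the repository is read.

```cpp
#include <cassert>
#include <map>

// Minimal stand-in for an event handler that counts resume requests.
struct Handler {
  int resume_calls = 0;
};

// Stand-in for the handler repository: handle -> registered handler.
using Repository = std::map<int, Handler *>;

// Returns true if `dispatched` is still the handler registered for
// `handle` and was therefore asked to resume. The dispatched pointer is
// only dereferenced after the repository confirms it is still registered.
inline bool resume_if_still_registered(const Repository &repo, int handle,
                                       Handler *dispatched) {
  if (dispatched == nullptr)
    return false;                          // handler was removed during dispatch
  const auto it = repo.find(handle);
  if (it == repo.end() || it->second != dispatched)
    return false;                          // unregistered or replaced meanwhile
  ++dispatched->resume_calls;              // stands in for resume_handler()
  return true;
}
```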

int ACE_TP_Reactor::handle_timer_events (int &event_count, ACE_TP_Token_Guard &g) [protected]

Handle timer events.

Definition at line 446 of file TP_Reactor.cpp.

References ACE_Timer_Node_Dispatch_Info_T::act_, ACE_Timer_Queue_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall< ACE_SYNCH_RECURSIVE_MUTEX >, ACE_SYNCH_RECURSIVE_MUTEX >::dispatch_info, ACE_TP_Token_Guard::release_token, ACE_Select_Reactor_Impl::timer_queue_, ACE_Timer_Queue_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall< ACE_SYNCH_RECURSIVE_MUTEX >, ACE_SYNCH_RECURSIVE_MUTEX >::timer_skew, ACE_Timer_Node_Dispatch_Info_T::type_, and ACE_Timer_Queue_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall< ACE_SYNCH_RECURSIVE_MUTEX >, ACE_SYNCH_RECURSIVE_MUTEX >::upcall.

Referenced by dispatch_i.

00448 {
00449   // Get the current time
00450   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00451                            this->timer_queue_->timer_skew ());
00452 
00453   // Look for a node in the timer queue whose timer <= the present
00454   // time.
00455   ACE_Timer_Node_Dispatch_Info info;
00456 
00457   if (this->timer_queue_->dispatch_info (cur_time,
00458                                          info))
00459     {
00460       // Release the token before dispatching notifies...
00461       guard.release_token ();
00462 
00463       // call the functor
00464       this->timer_queue_->upcall (info.type_,
00465                                   info.act_,
00466                                   cur_time);
00467 
00468       // We have dispatched a timer
00469       return 1;
00470     }
00471 
00472   return 0;
00473 }
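
The listing above looks up one expired timer while the token is held and releases the token before making the upcall. A minimal sketch of that ordering follows, using std::mutex in place of the reactor token. MiniTimerQueue, dispatch_one and demo_timer_dispatch are hypothetical names chosen for illustration and are not part of ACE.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical miniature timer queue; expiry times are arbitrary ticks.
class MiniTimerQueue {
public:
  using Callback = std::function<void(long)>;

  void schedule(long expiry, Callback cb) {
    std::lock_guard<std::mutex> guard(lock_);
    timers_.emplace(expiry, std::move(cb));
  }

  // Dispatch at most one timer whose expiry is <= now.
  // Returns 1 if a timer ran and 0 otherwise.
  int dispatch_one(long now) {
    std::unique_lock<std::mutex> guard(lock_);
    auto it = timers_.begin();
    if (it == timers_.end() || it->first > now)
      return 0;

    Callback cb = std::move(it->second);
    timers_.erase(it);

    guard.unlock();  // release the lock before the upcall
    cb(now);
    return 1;
  }

private:
  std::mutex lock_;
  std::multimap<long, Callback> timers_;
};

// Usage: the first callback schedules another timer, which needs the lock.
inline int demo_timer_dispatch() {
  MiniTimerQueue q;
  int fired = 0;
  q.schedule(10, [&](long) { ++fired; q.schedule(50, [&](long) { ++fired; }); });
  int early = q.dispatch_one(5);    // nothing has expired yet
  int first = q.dispatch_one(10);   // runs the first callback
  int second = q.dispatch_one(60);  // runs the timer scheduled by the callback
  assert(early == 0 && first == 1 && second == 1);
  return fired;
}
```

Because the lock is dropped before the callback runs, the callback in the sketch can schedule a new timer without blocking on a non-recursive mutex.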

int ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, int ops) [virtual]
 

GET/SET/ADD/CLR the dispatch mask "bit" bound with the <handle> and <mask>.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 619 of file TP_Reactor.cpp.

References ACE_GUARD_RETURN, ACE_MT, ACE_Reactor_Mask, ACE_TRACE, ACE_Select_Reactor_Impl::bit_ops, ACE_Select_Reactor_Handle_Set::ex_mask_, ACE_Handle_Set::is_set, ACE_Select_Reactor_Handle_Set::rd_mask_, ACE_Select_Reactor_Impl::suspend_set_, and ACE_Select_Reactor_Handle_Set::wr_mask_.

00622 {
00623   ACE_TRACE ("ACE_TP_Reactor::mask_ops");
00624   ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
00625           ace_mon, this->token_, -1));
00626 
00627   int result = 0;
00628 
00629   // If it looks like the handle isn't suspended, then
00630   // set the ops on the wait_set_, otherwise set the suspend_set_.
00631 
00632   if (this->suspend_set_.rd_mask_.is_set (handle) == 0
00633       && this->suspend_set_.wr_mask_.is_set (handle) == 0
00634       && this->suspend_set_.ex_mask_.is_set (handle) == 0)
00635 
00636     result = this->bit_ops (handle, mask,
00637                             this->wait_set_,
00638                             ops);
00639   else
00640 
00641     result = this->bit_ops (handle, mask,
00642                             this->suspend_set_,
00643                             ops);
00644 
00645   return result;
00646 }
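
The listing above applies the mask change to wait_set_ when none of the handle's bits are present in suspend_set_, and to suspend_set_ otherwise. The sketch below illustrates that selection under a simplifying assumption: each set is modelled as one bitmask per handle rather than three separate handle sets. MaskReactor, any_bit_set and the enum names are hypothetical and do not come from ACE.

```cpp
#include <cassert>
#include <map>

// Hypothetical simplification: one bitmask per handle instead of three fd sets.
enum MaskBit : unsigned { READ_BIT = 1u, WRITE_BIT = 2u, EXCEPT_BIT = 4u };
enum Op { ADD_BITS, CLR_BITS };
using HandleSet = std::map<int, unsigned>;

struct MaskReactor {
  HandleSet wait_set;     // bits the event loop is waiting on
  HandleSet suspend_set;  // bits parked while a handle is suspended

  static bool any_bit_set(const HandleSet& s, int handle) {
    auto it = s.find(handle);
    return it != s.end() && it->second != 0u;
  }

  static void bit_ops(HandleSet& s, int handle, unsigned mask, Op op) {
    if (op == ADD_BITS)
      s[handle] |= mask;
    else
      s[handle] &= ~mask;
  }

  // Move every bit of the handle from the wait set to the suspend set.
  void suspend(int handle) {
    suspend_set[handle] |= wait_set[handle];
    wait_set[handle] = 0u;
  }

  // Apply the change to whichever set currently holds the handle's bits.
  void mask_ops(int handle, unsigned mask, Op op) {
    HandleSet& target =
        any_bit_set(suspend_set, handle) ? suspend_set : wait_set;
    bit_ops(target, handle, mask, op);
  }
};

inline unsigned demo_mask_ops() {
  MaskReactor r;
  r.mask_ops(3, READ_BIT, ADD_BITS);   // not suspended: lands in wait_set
  r.suspend(3);
  r.mask_ops(3, WRITE_BIT, ADD_BITS);  // suspended: lands in suspend_set
  assert(r.wait_set[3] == 0u);
  return r.suspend_set[3];
}
```

As in the listing, a handle is only treated as suspended if at least one of its bits is in the suspend set. A handle with no bits anywhere therefore falls through to the wait set.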

int ACE_TP_Reactor::mask_ops (ACE_Event_Handler * eh, ACE_Reactor_Mask mask, int ops) [virtual]
 

GET/SET/ADD/CLR the dispatch mask "bit" bound with the <eh> and <mask>.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 822 of file TP_Reactor.cpp.

References ACE_Reactor_Mask, and ACE_Event_Handler::get_handle.

00825 {
00826   return this->mask_ops (eh->get_handle (), mask, ops);
00827 }

ACE_INLINE void ACE_TP_Reactor::no_op_sleep_hook (void *) [static]
 

Called from handle_events ().

Definition at line 91 of file TP_Reactor.i.

Referenced by ACE_TP_Token_Guard::grab_token.

00092 {
00093 }

void ACE_TP_Reactor::notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, ACE_Event_Handler * eh, ACE_EH_PTMF callback) [protected, virtual]
 

This method shouldn't get called.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 830 of file TP_Reactor.cpp.

References ACE_ASSERT, ACE_EH_PTMF, ACE_ERROR, ACE_LIB_TEXT, ACE_Reactor_Mask, and LM_ERROR.

Referenced by handle_notify_events.

00835 {
00836   ACE_ERROR ((LM_ERROR,
00837               ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: ")
00838               ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
00839 
00840   ACE_ASSERT (eh == 0);
00841   ACE_UNUSED_ARG (eh);
00842 }

ACE_TP_Reactor& ACE_TP_Reactor::operator= (const ACE_TP_Reactor &) [private]
 

int ACE_TP_Reactor::owner (ACE_thread_t *) [virtual]
 

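Return the current owner of the reactor. In this implementation the pointed-to thread ID is always set to the calling thread (ACE_Thread::self ()) and 0 is returned.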

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 134 of file TP_Reactor.cpp.

References ACE_thread_t, ACE_TRACE, and ACE_Thread::self.

00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }

int ACE_TP_Reactor::owner (ACE_thread_t n_id, ACE_thread_t * o_id = 0) [virtual]
 

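Set the new owner of the reactor and return the old owner. In this implementation <n_id> is ignored; if <o_id> is non-null it receives the calling thread's ID (ACE_Thread::self ()), and 0 is returned.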

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 124 of file TP_Reactor.cpp.

References ACE_thread_t, ACE_TRACE, and ACE_Thread::self.

00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }

int ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & sigset) [virtual]
 

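Calls <remove_handler> for every signal in <sigset>. Note that the ACE_TP_Reactor definition listed here consists solely of ACE_NOTSUP_RETURN (-1), so this overload reports the operation as unsupported.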

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 322 of file TP_Reactor.cpp.

00323 {
00324   ACE_NOTSUP_RETURN (-1);
00325 }

int ACE_TP_Reactor::remove_handler (int signum, ACE_Sig_Action * new_disp, ACE_Sig_Action * old_disp = 0, int sigkey = -1) [virtual]
 

Remove the ACE_Event_Handler currently associated with <signum>. <sigkey> is ignored in this implementation since there is only one instance of a signal handler. Install the new disposition (if given) and return the previous disposition (if desired by the caller). Returns 0 on success and -1 if <signum> is invalid. Note that the ACE_TP_Reactor definition listed here consists solely of ACE_NOTSUP_RETURN (-1), so this overload reports the operation as unsupported.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 313 of file TP_Reactor.cpp.

00317 {
00318   ACE_NOTSUP_RETURN (-1);
00319 }

int ACE_TP_Reactor::remove_handler (const ACE_Handle_Set & handle_set, ACE_Reactor_Mask) [virtual]
 

Removes the <mask> bindings of the <Event_Handler>s registered for every handle in <handle_set>. Any handler left with no bindings is removed from the Select_Reactor.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 255 of file TP_Reactor.cpp.

References ACE_BIT_ENABLED, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_TP_Token_Guard::acquire_token, ACE_Event_Handler::DONT_CALL, ACE_Select_Reactor_Handler_Repository::find, ACE_Event_Handler::handle_close, ACE_Select_Reactor_Impl::handler_rep_, ACE_TP_Token_Guard::is_owner, ACE_Handle_Set::num_set, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::remove_handler_i.

00257 {
00258   // Array of <Event_Handlers> corresponding to <handles>
00259   ACE_Event_Handler **aeh = 0;
00260 
00261   // Allocate memory for the size of the handle set
00262   ACE_NEW_RETURN (aeh,
00263                   ACE_Event_Handler *[handles.num_set ()],
00264                   -1);
00265 
00266   size_t index = 0;
00267 
00268   // Artificial scoping for grabbing and releasing the token
00269   {
00270     ACE_TP_Token_Guard guard (this->token_);
00271 
00272     // Acquire the token
00273     int result = guard.acquire_token ();
00274 
00275     if (!guard.is_owner ())
00276       return result;
00277 
00278     ACE_HANDLE h;
00279 
00280     ACE_Handle_Set_Iterator handle_iter (handles);
00281 
00282     while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
00283       {
00284         size_t slot = 0;
00285         ACE_Event_Handler *eh =
00286           this->handler_rep_.find (h, &slot);
00287 
00288         if (this->remove_handler_i (h,
00289                                     m | ACE_Event_Handler::DONT_CALL) == -1)
00290           {
00291             delete [] aeh;
00292             return -1;
00293           }
00294 
00295         aeh [index] = eh;
00296         index ++;
00297       }
00298   }
00299 
00300   // Close down the <Event_Handler> unless we've been instructed not
00301   // to.
00302   if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0)
00303     {
00304       for (size_t i = 0; i < index; i++)
00305         aeh[i]->handle_close (ACE_INVALID_HANDLE, m);
00306     }
00307 
00308   delete [] aeh;
00309   return 0;
00310 }

int ACE_TP_Reactor::remove_handler (ACE_HANDLE handle, ACE_Reactor_Mask) [virtual]
 

Removes the <mask> binding of the <Event_Handler> whose handle is <handle> from the Select_Reactor. If no bindings remain for that handler, it is removed from the Select_Reactor.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 212 of file TP_Reactor.cpp.

References ACE_BIT_ENABLED, ACE_Reactor_Mask, ACE_TP_Token_Guard::acquire_token, ACE_Event_Handler::DONT_CALL, ACE_Select_Reactor_Handler_Repository::find, ACE_Event_Handler::handle_close, ACE_Select_Reactor_Impl::handler_rep_, ACE_TP_Token_Guard::is_owner, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::remove_handler_i.

00214 {
00215 
00216   ACE_Event_Handler *eh = 0;
00217   int result = 0;
00218   // Artificial scoping for grabbing and releasing the token
00219   {
00220     ACE_TP_Token_Guard guard (this->token_);
00221 
00222     // Acquire the token
00223     result = guard.acquire_token ();
00224 
00225     if (!guard.is_owner ())
00226       return result;
00227 
00228     size_t slot = 0;
00229     eh =  this->handler_rep_.find (handle, &slot);
00230 
00231     if (eh == 0)
00232       return -1;
00233 
00234     // Call the remove_handler_i () with a DONT_CALL mask. We dont
00235     // want to call the handle_close with the token held.
00236     result = this->remove_handler_i (handle,
00237                                      mask | ACE_Event_Handler::DONT_CALL);
00238 
00239     if (result == -1)
00240       return -1;
00241   }
00242 
00243   // Close down the <Event_Handler> unless we've been instructed not
00244   // to.
00245   // @@ Note: The check for result ==0 may be redundant, but shouldnt
00246   // be a problem.
00247   if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00248     eh->handle_close (handle, mask);
00249 
00250   return 0;
00251 }

int ACE_TP_Reactor::remove_handler (ACE_Event_Handler * eh, ACE_Reactor_Mask mask) [virtual]
 

The handler-removal overloads are reimplemented here because we don't want the TP_Reactor to call handle_close () with the token held.

Removes the <mask> binding of <eh> from the Select_Reactor. If there are no more bindings for this <eh> then it is removed from the Select_Reactor. Note that the Select_Reactor will call <ACE_Event_Handler::get_handle> to extract the underlying I/O handle.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 180 of file TP_Reactor.cpp.

References ACE_BIT_ENABLED, ACE_Reactor_Mask, ACE_TP_Token_Guard::acquire_token, ACE_Event_Handler::DONT_CALL, ACE_Event_Handler::get_handle, ACE_Event_Handler::handle_close, ACE_TP_Token_Guard::is_owner, and ACE_Select_Reactor_T< ACE_Select_Reactor_Token >::remove_handler_i.

Referenced by dispatch_socket_event.

00182 {
00183   int result = 0;
00184   // Artificial scoping for grabbing and releasing the token
00185   {
00186     ACE_TP_Token_Guard guard (this->token_);
00187 
00188     // Acquire the token
00189     result = guard.acquire_token ();
00190 
00191     if (!guard.is_owner ())
00192       return result;
00193 
00194     // Call the remove_handler_i () with a DONT_CALL mask. We dont
00195     // want to call the handle_close with the token held.
00196     result = this->remove_handler_i (eh->get_handle (),
00197                                      mask | ACE_Event_Handler::DONT_CALL);
00198 
00199     if (result == -1)
00200       return -1;
00201   }
00202 
00203   // Close down the <Event_Handler> unless we've been instructed not
00204   // to.
00205   if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00206     eh->handle_close (ACE_INVALID_HANDLE, mask);
00207 
00208   return 0;
00209 }
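
The remove_handler listings above share one pattern: the binding is removed inside an artificial scope while the token is held (with DONT_CALL added to the mask), and handle_close () is invoked only after that scope ends. A minimal sketch of the pattern with std::mutex follows. MiniRegistry, MiniHandler and on_close are hypothetical stand-ins, not ACE types, and the sketch does not reproduce the token's actual semantics.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>

// Hypothetical stand-ins for illustration; these are not ACE types.
struct MiniHandler {
  std::function<void()> on_close;  // plays the role of handle_close ()
};

class MiniRegistry {
public:
  void add(int handle, MiniHandler* h) {
    std::lock_guard<std::mutex> guard(lock_);
    table_[handle] = h;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(lock_);
    return table_.size();
  }

  // Returns 0 on success and -1 if the handle is not registered.
  int remove(int handle, bool dont_call) {
    MiniHandler* h = nullptr;
    {
      // Artificial scope: hold the lock only while the table is modified.
      std::lock_guard<std::mutex> guard(lock_);
      auto it = table_.find(handle);
      if (it == table_.end())
        return -1;
      h = it->second;
      table_.erase(it);
    }
    // The lock has been released, so the callback may call back in.
    if (!dont_call && h->on_close)
      h->on_close();
    return 0;
  }

private:
  std::mutex lock_;
  std::map<int, MiniHandler*> table_;
};

inline int demo_remove() {
  MiniRegistry reg;
  int seen = -1;
  MiniHandler h;
  h.on_close = [&] { seen = static_cast<int>(reg.size()); };
  reg.add(7, &h);
  int rc = reg.remove(7, false);  // on_close runs after the lock is dropped
  assert(rc == 0);
  return seen;
}
```

In the sketch the close callback queries the registry again, which would block on a non-recursive mutex if the lock were still held. Keeping the callback outside the locked scope also means other threads are not held up while it runs.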

int ACE_TP_Reactor::resumable_handler (void) [virtual]
 

Does the reactor allow the application to resume the handle on its own, i.e. can it pass control of handle resumption to the application? The TP reactor allows applications to resume handles, so it returns a positive value.

Reimplemented from ACE_Select_Reactor_Impl.

Definition at line 810 of file TP_Reactor.cpp.

00811 {
00812   return 1;
00813 }
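
To illustrate what application-controlled resumption means, the sketch below mirrors the check in the dispatch code earlier on this page, where the reactor resumes a handle itself only when the handler's resume_handler () result is ACE_REACTOR_RESUMES_HANDLER. ResumeReactor, PolicyHandler, ResumePolicy and after_upcall are hypothetical names. The sketch models the idea only and is not the ACE implementation.

```cpp
#include <cassert>
#include <set>

// Hypothetical names; they mirror the idea, not the ACE declarations.
enum class ResumePolicy { ReactorResumes, ApplicationResumes };

struct PolicyHandler {
  ResumePolicy policy;
  ResumePolicy resume_handler() const { return policy; }
};

struct ResumeReactor {
  std::set<int> suspended;

  int resumable_handler() const { return 1; }  // applications may resume
  void suspend(int handle) { suspended.insert(handle); }
  void resume(int handle) { suspended.erase(handle); }

  // Called once the upcall for <handle> has returned.
  void after_upcall(int handle, const PolicyHandler& eh) {
    if (eh.resume_handler() == ResumePolicy::ReactorResumes)
      resume(handle);
    // Otherwise the handle stays suspended until the application resumes it.
  }
};

inline bool demo_resume() {
  ResumeReactor r;
  r.suspend(4);
  r.suspend(5);
  r.after_upcall(4, PolicyHandler{ResumePolicy::ReactorResumes});
  r.after_upcall(5, PolicyHandler{ResumePolicy::ApplicationResumes});
  assert(r.suspended.count(4) == 0 && r.suspended.count(5) == 1);
  r.resume(5);  // the application resumes its own handle
  return r.suspended.empty();
}
```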


Member Data Documentation

ACE_TP_Reactor::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Reimplemented from ACE_Select_Reactor_T< ACE_Select_Reactor_Token >.

Definition at line 297 of file TP_Reactor.h.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 12:58:36 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002