
ACE_WFMO_Reactor_Notify Class Reference

Unblock the <ACE_WFMO_Reactor> from its event loop, passing it an optional <ACE_Event_Handler> to dispatch.

#include <WFMO_Reactor.h>


Public Methods

 ACE_WFMO_Reactor_Notify (size_t max_notifies=1024)
 Constructor.

virtual int open (ACE_Reactor_Impl *wfmo_reactor, ACE_Timer_Queue *timer_queue, int disable_notify=0)
 Initialization. <timer_queue> is stored to call <gettimeofday>.

virtual int close (void)
 No-op.

virtual int notify (ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
 No-op.

virtual ACE_HANDLE get_handle (void) const
 Returns a handle to the <ACE_Auto_Event>.

virtual ACE_HANDLE notify_handle (void)
 Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the <Reactor_Impl>.

virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
 Handle one of the notify calls on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>.

virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not.

virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
 Read one of the notify calls on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>.

void max_notify_iterations (int)
int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object.


Private Methods

virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)

Private Attributes

ACE_Timer_Queue * timer_queue_
 Pointer to the wfmo_reactor's timer queue.

ACE_Auto_Event wakeup_one_thread_
 An auto event is used so that we can <signal> it to wake up one thread (e.g., when the <notify> method is called).

ACE_Message_Queue< ACE_MT_SYNCH > message_queue_
 Message queue that keeps track of pending <ACE_Event_Handlers>. This queue must be thread-safe because it can be called by multiple threads of control.

int max_notify_iterations_

Detailed Description

Unblock the <ACE_WFMO_Reactor> from its event loop, passing it an optional <ACE_Event_Handler> to dispatch.

This implementation is necessary for cases where the <ACE_WFMO_Reactor> is run in a multi-threaded program. In this case, we need to be able to unblock <WaitForMultipleObjects> when updates occur in a thread other than the main <ACE_WFMO_Reactor> thread. To do this, we signal an auto-reset event that the <ACE_WFMO_Reactor> is listening on. If an <ACE_Event_Handler> and an <ACE_Reactor_Mask> are passed to <notify>, the appropriate <handle_*> method is dispatched.

Definition at line 475 of file WFMO_Reactor.h.
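
The wake-up mechanism described above can be sketched portably as follows. This is only an illustration under stated assumptions, not the ACE implementation: NotifySketch, notify, and wait_and_dispatch are hypothetical names, a std::condition_variable guarded by a flag stands in for the auto-reset event, and a std::deque of callbacks stands in for the <ACE_Message_Queue>.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the notify mechanism: a locked queue of callbacks
// plus a flag and condition variable that behave like an auto-reset event.
class NotifySketch {
public:
  // Callable from any thread: queue an optional callback, then signal.
  void notify(std::function<void()> handler = nullptr) {
    std::lock_guard<std::mutex> guard(lock_);
    if (handler)
      pending_.push_back(std::move(handler));
    signaled_ = true;
    wakeup_.notify_one();
  }

  // Called by the event-loop thread: block until signaled, reset the
  // "event", then run everything queued so far. Returns the number run.
  int wait_and_dispatch() {
    std::deque<std::function<void()>> batch;
    {
      std::unique_lock<std::mutex> guard(lock_);
      wakeup_.wait(guard, [this] { return signaled_; });
      signaled_ = false;  // auto-reset: one wake-up per signal
      batch.swap(pending_);
    }
    for (auto &handler : batch) {
      assert(handler);  // empty callbacks are never queued
      handler();        // run outside the lock
    }
    return static_cast<int>(batch.size());
  }

private:
  std::mutex lock_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> pending_;
  bool signaled_ = false;
};
```

In this sketch a worker thread would call notify() with a callback while the loop thread is blocked in wait_and_dispatch(); calling notify() with no argument only wakes the loop, which mirrors a notification posted without an <ACE_Event_Handler>.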


Constructor & Destructor Documentation

ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies = 1024)
 

Constructor.

Definition at line 2213 of file WFMO_Reactor.cpp.

ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
  : timer_queue_ (0),
    message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
                    max_notifies * sizeof (ACE_Notification_Buffer)),
    max_notify_iterations_ (-1)
{
}


Member Function Documentation

int ACE_WFMO_Reactor_Notify::close (void) [virtual]

No-op. This implementation always returns -1.

Implements ACE_Reactor_Notify.

Definition at line 2208 of file WFMO_Reactor.cpp.

int
ACE_WFMO_Reactor_Notify::close (void)
{
  return -1;
}

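int ACE_WFMO_Reactor_Notify::dispatch_notifications (int & number_of_active_handles, ACE_Handle_Set & rd_mask) [virtual]

No-op. This implementation always returns -1.

Implements ACE_Reactor_Notify.

Definition at line 2176 of file WFMO_Reactor.cpp.

int
ACE_WFMO_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
                                                 ACE_Handle_Set &rd_mask)
{
  return -1;
}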

int ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer & buffer) [virtual]

Handle one of the notify calls on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>. This implementation does nothing and returns 0.

Implements ACE_Reactor_Notify.

Definition at line 2202 of file WFMO_Reactor.cpp.

int
ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
{
  return 0;
}

void ACE_WFMO_Reactor_Notify::dump (void) const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 2484 of file WFMO_Reactor.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Timer_Queue_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall< ACE_SYNCH_RECURSIVE_MUTEX >, ACE_SYNCH_RECURSIVE_MUTEX >::dump, LM_DEBUG, and timer_queue_.

void
ACE_WFMO_Reactor_Notify::dump (void) const
{
  ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
  ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
  this->timer_queue_->dump ();
  ACE_DEBUG ((LM_DEBUG,
              ACE_LIB_TEXT ("Max. iteration: %d\n"),
              this->max_notify_iterations_));
  ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}

ACE_HANDLE ACE_WFMO_Reactor_Notify::get_handle (void) const [virtual]
 

Returns a handle to the <ACE_Auto_Event>.

Reimplemented from ACE_Event_Handler.

Definition at line 2232 of file WFMO_Reactor.cpp.

References ACE_Event::handle, and wakeup_one_thread_.

ACE_HANDLE
ACE_WFMO_Reactor_Notify::get_handle (void) const
{
  return this->wakeup_one_thread_.handle ();
}

int ACE_WFMO_Reactor_Notify::handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0) [private, virtual]
 

Called when the notification event waited on by <ACE_WFMO_Reactor> is signaled. This dequeues all pending <ACE_Event_Handlers> and dispatches them.

Reimplemented from ACE_Event_Handler.

Definition at line 2240 of file WFMO_Reactor.cpp.

References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_LIB_TEXT, ACE_Message_Block::base, ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head, ACE_Notification_Buffer::eh_, EWOULDBLOCK, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event::handle, ACE_Event_Handler::handle_close, ACE_Event_Handler::handle_exception, ACE_Event_Handler::handle_group_qos, ACE_Event_Handler::handle_input, ACE_Event_Handler::handle_output, ACE_Event_Handler::handle_qos, ACE_Message_Queue< ACE_MT_SYNCH >::is_empty, LM_ERROR, ACE_Notification_Buffer::mask_, max_notify_iterations_, message_queue_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, ACE_Message_Block::release, siginfo_t::si_handle_, ACE_Event::signal, ucontext_t, wakeup_one_thread_, ACE_Event_Handler::WRITE_MASK, and ACE_Time_Value::zero.

int
ACE_WFMO_Reactor_Notify::handle_signal (int signum,
                                        siginfo_t *siginfo,
                                        ucontext_t *)
{
  ACE_UNUSED_ARG (signum);

  // Just check for sanity...
  if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
    return -1;

  // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
  // is signaled.
  //  ACE_DEBUG ((LM_DEBUG,
  //             ACE_LIB_TEXT ("(%t) waking up to handle internal notifications\n")));

  for (int i = 1; ; i++)
    {
      ACE_Message_Block *mb = 0;
      // Copy ACE_Time_Value::zero since dequeue_head will modify it.
      ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
      if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
        {
          if (errno == EWOULDBLOCK)
            // We've reached the end of the processing, return
            // normally.
            return 0;
          else
            return -1; // Something weird happened...
        }
      else
        {
          ACE_Notification_Buffer *buffer =
            (ACE_Notification_Buffer *) mb->base ();

          // If eh == 0 then we've got major problems!  Otherwise, we
          // need to dispatch the appropriate handle_* method on the
          // ACE_Event_Handler pointer we've been passed.

          if (buffer->eh_ != 0)
            {
              int result = 0;

              switch (buffer->mask_)
                {
                case ACE_Event_Handler::READ_MASK:
                case ACE_Event_Handler::ACCEPT_MASK:
                  result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
                  break;
                case ACE_Event_Handler::WRITE_MASK:
                  result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
                  break;
                case ACE_Event_Handler::EXCEPT_MASK:
                  result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
                  break;
                case ACE_Event_Handler::QOS_MASK:
                  result = buffer->eh_->handle_qos (ACE_INVALID_HANDLE);
                  break;
                case ACE_Event_Handler::GROUP_QOS_MASK:
                  result = buffer->eh_->handle_group_qos (ACE_INVALID_HANDLE);
                  break;
                default:
                  ACE_ERROR ((LM_ERROR,
                              ACE_LIB_TEXT ("invalid mask = %d\n"),
                              buffer->mask_));
                  break;
                }
              if (result == -1)
                buffer->eh_->handle_close (ACE_INVALID_HANDLE,
                                           ACE_Event_Handler::EXCEPT_MASK);
            }

          // Make sure to delete the memory regardless of success or
          // failure!
          mb->release ();

          // Bail out if we've reached the <max_notify_iterations_>.
          // Note that by default <max_notify_iterations_> is -1, so
          // we'll loop until we're done.
          if (i == this->max_notify_iterations_)
            {
              // If there are still notifications in the queue, we need
              // to wake up again.
              if (!this->message_queue_.is_empty ())
                this->wakeup_one_thread_.signal ();

              // Break the loop as we have reached max_notify_iterations_.
              return 0;
            }
        }
    }
}
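
The mask-to-hook dispatch in the listing above can be sketched in isolation as follows. This is a simplified illustration with assumed names: Handler, FailingReader, dispatch_one, and the single-bit mask values are hypothetical stand-ins, not the real <ACE_Event_Handler> interface or its constants, and the QoS cases are left out.

```cpp
#include <cassert>

// Hypothetical single-bit masks for illustration; not the real ACE values.
enum : unsigned { READ_MASK = 1u, WRITE_MASK = 2u, EXCEPT_MASK = 4u };

// Hypothetical stand-in for ACE_Event_Handler with only the hooks used here.
struct Handler {
  virtual ~Handler() = default;
  virtual int handle_input() { return 0; }
  virtual int handle_output() { return 0; }
  virtual int handle_exception() { return 0; }
  virtual int handle_close() { return 0; }
};

// Example handler that rejects input and records that it was closed.
struct FailingReader : Handler {
  bool closed = false;
  int handle_input() override { return -1; }
  int handle_close() override {
    assert(!closed);  // this sketch closes a handler at most once
    closed = true;
    return 0;
  }
};

// Call the hook selected by mask; a hook result of -1 triggers handle_close.
int dispatch_one(Handler *eh, unsigned mask) {
  if (eh == nullptr)
    return 0;  // pure wake-up: nothing to call
  int result = 0;
  switch (mask) {
    case READ_MASK:   result = eh->handle_input();     break;
    case WRITE_MASK:  result = eh->handle_output();    break;
    case EXCEPT_MASK: result = eh->handle_exception(); break;
    default:          break;  // the listing logs "invalid mask" here
  }
  if (result == -1)
    eh->handle_close();
  return result;
}
```

Because the selection is a switch on the whole mask value, a combined mask such as READ_MASK | WRITE_MASK matches no case in this sketch and falls through to the default branch.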

int ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & buffer) [virtual]

Verify whether the buffer has dispatchable info or not. This implementation always returns 0.

Implements ACE_Reactor_Notify.

Definition at line 2183 of file WFMO_Reactor.cpp.

int
ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
{
  return 0;
}

int ACE_WFMO_Reactor_Notify::max_notify_iterations (void) [virtual]

Get the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue_head> loop.

Implements ACE_Reactor_Notify.

Definition at line 2382 of file WFMO_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

int
ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
{
  ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
  return this->max_notify_iterations_;
}

void ACE_WFMO_Reactor_Notify::max_notify_iterations (int) [virtual]

Set the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue_head> loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a small value such as 1 or 2 will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead. A value of 0 is treated as 1.

Implements ACE_Reactor_Notify.

Definition at line 2371 of file WFMO_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

void
ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
{
  ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
  // Must always be > 0 or < 0 to optimize the loop exit condition.
  if (iterations == 0)
    iterations = 1;

  this->max_notify_iterations_ = iterations;
}
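
How the iteration limit interacts with the dispatch loop can be modeled with a small stand-alone sketch. The names normalize_iterations and drain are hypothetical, and a std::deque<int> stands in for the notification queue; the loop shape (counter starting at 1, exit when the counter equals the limit, re-signal if work remains) follows the handle_signal listing in this page.

```cpp
#include <cassert>
#include <deque>

// Mirrors the setter's rule: 0 is coerced to 1 so that the loop's exit test
// (i == limit) can never be true before the first item is processed.
int normalize_iterations(int iterations) {
  return iterations == 0 ? 1 : iterations;
}

// Hypothetical model of the dispatch loop. Returns how many items were
// processed; resignal is set when the limit was hit with items still queued,
// which is when the real code signals the wake-up event again.
int drain(std::deque<int> &queue, int max_iterations, bool &resignal) {
  assert(max_iterations != 0);  // callers pass a normalized value
  resignal = false;
  for (int i = 1; ; ++i) {
    if (queue.empty())
      return i - 1;        // nothing left: normal exit
    queue.pop_front();     // stands in for dequeue + dispatch
    if (i == max_iterations) {
      resignal = !queue.empty();
      return i;
    }
  }
}
```

With a limit of -1 the equality test never succeeds for a positive counter, so the loop only ends when the queue is empty; with a small positive limit the loop yields early and relies on the re-signal to be called again.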

int ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler * event_handler = 0, ACE_Reactor_Mask mask = ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value * timeout = 0) [virtual]

Special trick to unblock <WaitForMultipleObjects> when updates occur. All we do is enqueue <event_handler> and <mask> onto the <ACE_Message_Queue> and wake up the <WFMO_Reactor> by signaling its <ACE_Event> handle. The <ACE_Time_Value> indicates how long to block trying to notify the <WFMO_Reactor>. If <timeout> == 0, the caller will block until action is possible; otherwise it will wait until the relative time specified in <timeout> elapses.

Implements ACE_Reactor_Notify.

Definition at line 2337 of file WFMO_Reactor.cpp.

References ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_Message_Block::base, ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_tail, ACE_Timer_Queue_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall< ACE_SYNCH_RECURSIVE_MUTEX >, ACE_SYNCH_RECURSIVE_MUTEX >::gettimeofday, ACE_Notification_Buffer::mask_, message_queue_, ACE_Message_Block::release, ACE_Event::signal, timer_queue_, and wakeup_one_thread_.

int
ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *eh,
                                 ACE_Reactor_Mask mask,
                                 ACE_Time_Value *timeout)
{
  if (eh != 0)
    {
      ACE_Message_Block *mb = 0;
      ACE_NEW_RETURN (mb,
                      ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
                      -1);

      ACE_Notification_Buffer *buffer =
        (ACE_Notification_Buffer *) mb->base ();
      buffer->eh_ = eh;
      buffer->mask_ = mask;

      // Convert from relative time to absolute time by adding the
      // current time of day.  This is what <ACE_Message_Queue>
      // expects.
      if (timeout != 0)
        *timeout += timer_queue_->gettimeofday ();

      if (this->message_queue_.enqueue_tail
          (mb, timeout) == -1)
        {
          mb->release ();
          return -1;
        }
    }

  return this->wakeup_one_thread_.signal ();
}
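
The listing above converts the caller's relative timeout to an absolute time in place, so the object the caller passed in is modified. The following sketch illustrates that effect with std::chrono::seconds standing in for <ACE_Time_Value>; make_absolute and absolute_deadline are hypothetical helper names, and the copy-based variant is an alternative shown for comparison, not something the ACE source does.

```cpp
#include <cassert>
#include <chrono>

using Seconds = std::chrono::seconds;

// Mirrors `*timeout += gettimeofday ()`: relative in, absolute out, in place.
// A null pointer means "no timeout" and is left untouched.
void make_absolute(Seconds *timeout, Seconds now) {
  if (timeout != nullptr)
    *timeout += now;
}

// Alternative for comparison: compute the deadline from a copy so the
// caller's relative value can be reused for a later call.
Seconds absolute_deadline(Seconds relative, Seconds now) {
  assert(relative >= Seconds::zero());
  return now + relative;
}
```

In this model, passing the same timeout object to make_absolute twice adds the current time twice, so a caller that follows the in-place pattern would need a fresh relative value for each call.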

ACE_HANDLE ACE_WFMO_Reactor_Notify::notify_handle (void) [virtual]

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the <Reactor_Impl>. This implementation always returns ACE_INVALID_HANDLE.

Implements ACE_Reactor_Notify.

Definition at line 2189 of file WFMO_Reactor.cpp.

ACE_HANDLE
ACE_WFMO_Reactor_Notify::notify_handle (void)
{
  return ACE_INVALID_HANDLE;
}

int ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl * wfmo_reactor, ACE_Timer_Queue * timer_queue, int disable_notify = 0) [virtual]
 

Initialization. <timer_queue> is stored to call <gettimeofday>.

Implements ACE_Reactor_Notify.

Definition at line 2222 of file WFMO_Reactor.cpp.

References ACE_Reactor_Impl::register_handler, and timer_queue_.

int
ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
                               ACE_Timer_Queue *timer_queue,
                               int ignore_notify)
{
  ACE_UNUSED_ARG (ignore_notify);
  timer_queue_ = timer_queue;
  return wfmo_reactor->register_handler (this);
}

int ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK) [virtual]
 

Purge any notifications pending in this reactor for the specified <ACE_Event_Handler> object. If <eh> == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 2389 of file WFMO_Reactor.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_GUARD_RETURN, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Message_Block::base, ACE_Message_Queue::dequeue_head, ACE_Message_Queue< ACE_MT_SYNCH >::dequeue_head, ACE_Notification_Buffer::eh_, ACE_Message_Queue< ACE_MT_SYNCH >::enqueue_head, ACE_Message_Queue::enqueue_head, ACE_Message_Queue< ACE_MT_SYNCH >::is_empty, ACE_Notification_Buffer::mask_, ACE_Message_Queue::message_count, ACE_Message_Queue< ACE_MT_SYNCH >::message_count, message_queue_, and ACE_Message_Block::release.

int
ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
                                                      ACE_Reactor_Mask mask)
{
  ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");

  // Go over message queue and take out all the matching event
  // handlers.  If eh == 0, purge all. Note that reactor notifies (no
  // handler specified) are never purged, as this may lose a needed
  // notify the reactor queued for itself.

  if (this->message_queue_.is_empty ())
    return 0;

  // Guard against new and/or delivered notifications while purging.
  // WARNING!!! The use of the notification queue's lock object for
  // this guard makes use of the knowledge that on Win32, the mutex
  // protecting the queue is really a CriticalSection, which is
  // recursive. This is how we can get away with locking it down here
  // and still calling member functions on the queue object.
  ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);

  // First, copy all to our own local queue. Since we've locked everyone out
  // of here, there's no need to use any synchronization on this queue.
  ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;

  size_t queue_size  = this->message_queue_.message_count ();
  int number_purged = 0;

  size_t index;

  for (index = 0; index < queue_size; ++index)
    {
      ACE_Message_Block  *mb;
      if (-1 == this->message_queue_.dequeue_head (mb))
        return -1;        // This shouldn't happen...

      ACE_Notification_Buffer *buffer =
        ACE_reinterpret_cast (ACE_Notification_Buffer *, mb->base ());

      // If this is not a Reactor notify (it is for a particular handler),
      // and it matches the specified handler (or purging all),
      // and applying the mask would totally eliminate the notification, then
      // release it and count the number purged.
      if ((0 != buffer->eh_) &&
          (0 == eh || eh == buffer->eh_) &&
          ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
                                                   // is left with nothing when
                                                   // applying the mask
      {
        mb->release ();
        ++number_purged;
      }
      else
      {
        // To preserve it, move it to the local_queue.  But first, if
        // this is not a Reactor notify (it is for a
        // particular handler), and it matches the specified handler
        // (or purging all), then apply the mask.
        if ((0 != buffer->eh_) &&
            (0 == eh || eh == buffer->eh_))
          ACE_CLR_BITS(buffer->mask_, mask);
        if (-1 == local_queue.enqueue_head (mb))
          return -1;
      }
    }

  if (this->message_queue_.message_count ())
    { // Should be empty!
      ACE_ASSERT (0);
      return -1;
    }

  // Now copy back from the local queue to the class queue, taking
  // care to preserve the original order...
  queue_size  = local_queue.message_count ();
  for (index = 0; index < queue_size; ++index)
    {
      ACE_Message_Block  *mb;
      if (-1 == local_queue.dequeue_head (mb))
        {
          ACE_ASSERT (0);
          return -1;
        }

      if (-1 == this->message_queue_.enqueue_head (mb))
        {
          ACE_ASSERT (0);
          return -1;
        }
    }

  return number_purged;
}
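
The purge rule in the listing above (drop an entry only when clearing the requested bits leaves its mask empty, otherwise keep it with those bits cleared, and never drop handler-less wake-ups) can be sketched on a plain container. Entry and purge are hypothetical names, a std::deque stands in for the message queue, and plain unsigned values stand in for <ACE_Reactor_Mask>; locking is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for ACE_Notification_Buffer.
struct Entry {
  const void *eh;  // null means a pure reactor wake-up
  unsigned mask;
};

// Removes notifications for eh (or for every handler when eh is null) whose
// mask is fully covered by mask. Returns the number of entries removed.
int purge(std::deque<Entry> &queue, const void *eh, unsigned mask) {
  std::deque<Entry> kept;
  int purged = 0;
  for (const Entry &e : queue) {
    const bool matches = e.eh != nullptr && (eh == nullptr || eh == e.eh);
    if (matches && (e.mask & ~mask) == 0) {
      ++purged;            // nothing would remain after clearing: drop it
      continue;
    }
    Entry copy = e;
    if (matches)
      copy.mask &= ~mask;  // partial match: clear only the requested bits
    kept.push_back(copy);  // order of surviving entries is preserved
  }
  assert(kept.size() + static_cast<std::size_t>(purged) == queue.size());
  queue.swap(kept);
  return purged;
}
```

The sketch builds the surviving entries in a second container and swaps it in, which plays the role of the local queue in the listing while keeping the original ordering.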

int ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer & buffer) [virtual]

Read one of the notify calls on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>. This implementation does nothing and returns 0.

Implements ACE_Reactor_Notify.

Definition at line 2195 of file WFMO_Reactor.cpp.

int
ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
                                           ACE_Notification_Buffer &buffer)
{
  return 0;
}


Member Data Documentation

int ACE_WFMO_Reactor_Notify::max_notify_iterations_ [private]
 

Keeps track of the maximum number of times that the <ACE_WFMO_Reactor_Notify::handle_signal> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify queue before breaking out of its <ACE_Message_Queue::dequeue_head> loop. By default, this is set to -1, which means "iterate until the queue is empty."

Definition at line 590 of file WFMO_Reactor.h.

Referenced by handle_signal, and max_notify_iterations.

ACE_Message_Queue<ACE_MT_SYNCH> ACE_WFMO_Reactor_Notify::message_queue_ [private]
 

Message queue that keeps track of pending <ACE_Event_Handlers>. This queue must be thread-safe because it can be called by multiple threads of control.

Definition at line 580 of file WFMO_Reactor.h.

Referenced by handle_signal, notify, and purge_pending_notifications.

ACE_Timer_Queue* ACE_WFMO_Reactor_Notify::timer_queue_ [private]
 

Pointer to the wfmo_reactor's timer queue.

Definition at line 564 of file WFMO_Reactor.h.

Referenced by dump, notify, and open.

ACE_Auto_Event ACE_WFMO_Reactor_Notify::wakeup_one_thread_ [private]
 

An auto event is used so that we can <signal> it to wake up one thread (e.g., when the <notify> method is called).

Definition at line 575 of file WFMO_Reactor.h.

Referenced by get_handle, handle_signal, and notify.


The documentation for this class was generated from the following files:
WFMO_Reactor.h
WFMO_Reactor.cpp
Generated on Mon Jun 16 12:59:20 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002