Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ACE_Dev_Poll_Reactor_Notify Class Reference

Event handler used for unblocking the ACE_Dev_Poll_Reactor from its event loop. More...

#include <Dev_Poll_Reactor.h>

Inheritance diagram for ACE_Dev_Poll_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dev_Poll_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Public Methods

 ACE_Dev_Poll_Reactor_Notify (void)
 Constructor. More...

virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *timer_queue=0, int disable_notify=0)
virtual int close (void)
virtual ssize_t notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
 Unimplemented method required by pure virtual method in abstract base class. More...

virtual ACE_HANDLE notify_handle (void)
 Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Reactor_Impl. More...

virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not. More...

virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
 Handle one of the notify call on the handle. This could be because of a thread trying to unblock the Reactor_Impl. More...

virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
 Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl. More...

virtual int handle_input (ACE_HANDLE handle)
 Called back by the ACE_Dev_Poll_Reactor when a thread wants to unblock us. More...

virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object. More...


Protected Attributes

ACE_Dev_Poll_Reactordp_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Event handler used for unblocking the ACE_Dev_Poll_Reactor from its event loop.

This event handler is used internally by the ACE_Dev_Poll_Reactor as a means to allow a thread other then the one running the event loop to unblock the event loop.

Definition at line 153 of file Dev_Poll_Reactor.h.


Constructor & Destructor Documentation

ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify void   
 

Constructor.

Definition at line 50 of file Dev_Poll_Reactor.cpp.

00051   : dp_reactor_ (0)
00052   , notification_pipe_ ()
00053   , max_notify_iterations_ (-1)
00054 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00055   , alloc_queue_ ()
00056   , notify_queue_ ()
00057   , free_queue_ ()
00058   , notify_queue_lock_ ()
00059 #endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00060 {
00061 }


Member Function Documentation

int ACE_Dev_Poll_Reactor_Notify::close void    [virtual]
 

Implements ACE_Reactor_Notify.

Definition at line 117 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator::advance, ACE_Pipe::close, ACE_Unbounded_Queue_Iterator::next, and notification_pipe_.

00118 {
00119   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
00120 
00121 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00122   // Free up the dynamically allocated resources.
00123   ACE_Notification_Buffer **b;
00124 
00125   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00126        alloc_iter.next (b) != 0;
00127        alloc_iter.advance ())
00128     {
00129       delete [] *b;
00130       *b = 0;
00131     }
00132 
00133   this->alloc_queue_.reset ();
00134   this->notify_queue_.reset ();
00135   this->free_queue_.reset ();
00136 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00137 
00138   return this->notification_pipe_.close ();
00139 }

int ACE_Dev_Poll_Reactor_Notify::dispatch_notifications int &    number_of_active_handles,
ACE_Handle_Set   rd_mask
[virtual]
 

Unimplemented method required by pure virtual method in abstract base class.

This method's interface is not very compatibile with this Reactor's design. It's not clear why this method is pure virtual either.

Implements ACE_Reactor_Notify.

Definition at line 221 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, handle_input, notification_pipe_, and ACE_Pipe::read_handle.

00224 {
00225   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications");
00226 
00227   // This method is unimplemented in the ACE_Dev_Poll_Reactor.
00228   // Instead, the notification handler is invoked as part of the IO
00229   // event set.  Doing so alters the some documented semantics that
00230   // state that the notifications are handled before IO events.
00231   // Enforcing such semantics does not appear to be beneficial, and
00232   // also serves to slow down event dispatching particularly with this
00233   // ACE_Dev_Poll_Reactor.
00234 
00235 #if 0
00236   ACE_HANDLE read_handle =
00237     this->notification_pipe_.read_handle ();
00238 
00239   // Note that we do not check if the handle has received any events.
00240   // Instead a non-blocking "speculative" read is performed.  If the
00241   // read returns with errno == EWOULDBLOCK then no notifications are
00242   // dispatched.  See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
00243   // for details.
00244   if (read_handle != ACE_INVALID_HANDLE)
00245     {
00246       number_of_active_handles--;
00247 
00248       return this->handle_input (read_handle);
00249     }
00250   else
00251     return 0;
00252 #else
00253   ACE_NOTSUP_RETURN (-1);
00254 #endif  /* 0 */
00255 }

int ACE_Dev_Poll_Reactor_Notify::dispatch_notify ACE_Notification_Buffer   buffer [virtual]
 

Handle one of the notify call on the handle. This could be because of a thread trying to unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 366 of file Dev_Poll_Reactor.cpp.

References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Notification_Buffer::eh_, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event_Handler::handle_close, ACE_Event_Handler::handle_exception, ACE_Event_Handler::handle_group_qos, ACE_Event_Handler::handle_input, ACE_Event_Handler::handle_output, ACE_Event_Handler::handle_qos, LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, and ACE_Event_Handler::WRITE_MASK.

Referenced by handle_input.

00367 {
00368   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify");
00369 
00370   int result = 0;
00371 
00372 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00373   // Dispatch all messages that are in the <notify_queue_>.
00374   {
00375     // We acquire the lock in a block to make sure we're not
00376     // holding the lock while delivering callbacks...
00377     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00378 
00379     ACE_Notification_Buffer *temp;
00380 
00381     if (notify_queue_.is_empty ())
00382       return 0;
00383     else if (notify_queue_.dequeue_head (temp) == -1)
00384       ACE_ERROR_RETURN ((LM_ERROR,
00385                          ACE_LIB_TEXT ("%p\n"),
00386                          ACE_LIB_TEXT ("dequeue_head")),
00387                         -1);
00388     buffer = *temp;
00389     if (free_queue_.enqueue_head (temp) == -1)
00390       ACE_ERROR_RETURN ((LM_ERROR,
00391                          ACE_LIB_TEXT ("%p\n"),
00392                          ACE_LIB_TEXT ("enqueue_head")),
00393                         -1);
00394   }
00395 
00396   // If eh == 0 then another thread is unblocking the
00397   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00398   // internal structures.  Otherwise, we need to dispatch the
00399   // appropriate handle_* method on the ACE_Event_Handler
00400   // pointer we've been passed.
00401   if (buffer.eh_ != 0)
00402     {
00403 
00404       switch (buffer.mask_)
00405         {
00406         case ACE_Event_Handler::READ_MASK:
00407         case ACE_Event_Handler::ACCEPT_MASK:
00408           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00409           break;
00410         case ACE_Event_Handler::WRITE_MASK:
00411           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00412           break;
00413         case ACE_Event_Handler::EXCEPT_MASK:
00414           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00415           break;
00416         default:
00417           // Should we bail out if we get an invalid mask?
00418           ACE_ERROR ((LM_ERROR,
00419                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00420                       buffer.mask_));
00421         }
00422       if (result == -1)
00423         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00424                                   ACE_Event_Handler::EXCEPT_MASK);
00425     }
00426 #else
00427   // If eh == 0 then another thread is unblocking the
00428   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00429   // internal structures.  Otherwise, we need to dispatch the
00430   // appropriate handle_* method on the ACE_Event_Handler
00431   // pointer we've been passed.
00432   if (buffer.eh_ != 0)
00433     {
00434       switch (buffer.mask_)
00435         {
00436         case ACE_Event_Handler::READ_MASK:
00437         case ACE_Event_Handler::ACCEPT_MASK:
00438           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00439           break;
00440         case ACE_Event_Handler::WRITE_MASK:
00441           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00442           break;
00443         case ACE_Event_Handler::EXCEPT_MASK:
00444           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00445           break;
00446         case ACE_Event_Handler::QOS_MASK:
00447           result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00448           break;
00449         case ACE_Event_Handler::GROUP_QOS_MASK:
00450           result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00451           break;
00452         default:
00453           // Should we bail out if we get an invalid mask?
00454           ACE_ERROR ((LM_ERROR,
00455                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00456                       buffer.mask_));
00457         }
00458       if (result == -1)
00459         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00460                                   ACE_Event_Handler::EXCEPT_MASK);
00461     }
00462 
00463 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00464 
00465   return 1;
00466 }

void ACE_Dev_Poll_Reactor_Notify::dump void    const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 583 of file Dev_Poll_Reactor.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump, LM_DEBUG, and notification_pipe_.

00584 {
00585   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump");
00586 
00587   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00588   ACE_DEBUG ((LM_DEBUG,
00589               ACE_LIB_TEXT ("dp_reactor_ = 0x%x"),
00590               this->dp_reactor_));
00591   this->notification_pipe_.dump ();
00592   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00593 }

int ACE_Dev_Poll_Reactor_Notify::handle_input ACE_HANDLE    handle [virtual]
 

Called back by the ACE_Dev_Poll_Reactor when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 303 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, dispatch_notify, EWOULDBLOCK, max_notify_iterations_, and read_notify_pipe.

Referenced by dispatch_notifications.

00304 {
00305   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input");
00306 
00307   // @@ We may end up dispatching this event handler twice:  once when
00308   //    performing the speculative read on the notification pipe
00309   //    handle, and once more when dispatching the IO events.
00310 
00311   // Precondition: this->select_reactor_.token_.current_owner () ==
00312   // ACE_Thread::self ();
00313 
00314   int number_dispatched = 0;
00315   int result = 0;
00316   ACE_Notification_Buffer buffer;
00317 
00318   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00319     {
00320       // Dispatch the buffer
00321       // NOTE: We count only if we made any dispatches ie. upcalls.
00322       if (this->dispatch_notify (buffer) > 0)
00323         number_dispatched++;
00324 
00325       // Bail out if we've reached the <notify_threshold_>.  Note that
00326       // by default <notify_threshold_> is -1, so we'll loop until all
00327       // the notifications in the pipe have been dispatched.
00328       if (number_dispatched == this->max_notify_iterations_)
00329         break;
00330     }
00331 
00332   if ((result == -1 && (errno != EWOULDBLOCK || errno != EAGAIN))
00333       || result == 0)
00334     {
00335       // Reassign number_dispatched to -1 if things have gone
00336       // seriously wrong.
00337       number_dispatched = -1;
00338     }
00339 
00340   // Enqueue ourselves into the list of waiting threads.  When we
00341   // reacquire the token we'll be off and running again with ownership
00342   // of the token.  The postcondition of this call is that
00343   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00344   //this->select_reactor_->renew ();
00345 
00346   return number_dispatched;
00347 }

int ACE_Dev_Poll_Reactor_Notify::is_dispatchable ACE_Notification_Buffer   buffer [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 358 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE.

00359 {
00360   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable");
00361 
00362   ACE_NOTSUP_RETURN (-1);
00363 }

int ACE_Dev_Poll_Reactor_Notify::max_notify_iterations void    [virtual]
 

Get the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of its event loop.

Implements ACE_Reactor_Notify.

Definition at line 481 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

00482 {
00483   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00484 
00485   return this->max_notify_iterations_;
00486 }

void ACE_Dev_Poll_Reactor_Notify::max_notify_iterations int    [virtual]
 

Set the maximum number of times that the handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify queue before breaking out of the event loop. By default, this is set to -1, which means "iterate until the queue is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 469 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, and max_notify_iterations_.

00470 {
00471   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00472 
00473   // Must always be > 0 or < 0 to optimize the loop exit condition.
00474   if (iterations == 0)
00475     iterations = 1;
00476 
00477   this->max_notify_iterations_ = iterations;
00478 }

ssize_t ACE_Dev_Poll_Reactor_Notify::notify ACE_Event_Handler   = 0,
ACE_Reactor_Mask    = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value   = 0
[virtual]
 

Called by a thread when it wants to unblock the Reactor_Impl. This wakeups the Reactor_Impl if currently blocked. Pass over both the Event_Handler *and* the mask to allow the caller to dictate which Event_Handler method the Reactor_Impl will invoke. The ACE_Time_Value indicates how long to blocking trying to notify the Reactor_Impl. If timeout == 0, the caller will block until action is possible, else will wait until the relative time specified in *timeout elapses).

Implements ACE_Reactor_Notify.

Definition at line 142 of file Dev_Poll_Reactor.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, dp_reactor_, ACE::send, and ssize_t.

00145 {
00146   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify");
00147 
00148   // Just consider this method a "no-op" if there's no
00149   // ACE_Dev_Poll_Reactor configured.
00150   if (this->dp_reactor_ == 0)
00151     return 0;
00152 
00153 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00154   ACE_Notification_Buffer buffer (eh, mask);
00155   // int notification_required = 0;
00156 
00157   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00158 
00159   // No pending notifications.
00160 
00161   // We will send notify for every message..
00162   // if (this->notify_queue_.is_empty ())
00163   //   notification_required = 1;
00164 
00165   ACE_Notification_Buffer *temp = 0;
00166 
00167   if (free_queue_.dequeue_head (temp) == -1)
00168     {
00169       // Grow the queue of available buffers.
00170       ACE_Notification_Buffer *temp1;
00171 
00172       ACE_NEW_RETURN (temp1,
00173                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00174                       -1);
00175 
00176       if (this->alloc_queue_.enqueue_head (temp1) == -1)
00177         return -1;
00178 
00179       // Start at 1 and enqueue only
00180       // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00181       // the first one will be used right now.
00182       for (size_t i = 1;
00183            i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00184            ++i)
00185         this->free_queue_.enqueue_head (temp1 + i);
00186 
00187       temp = temp1;
00188     }
00189 
00190   ACE_ASSERT (temp != 0);
00191   *temp = buffer;
00192 
00193   if (notify_queue_.enqueue_tail (temp) == -1)
00194     return -1;
00195 
00196   // Let us send a notify for every message
00197   // if (notification_required)
00198   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00199                          (char *) &buffer,
00200                          sizeof buffer,
00201                          timeout);
00202   if (n == -1)
00203     return -1;
00204 
00205   return 0;
00206 #else
00207   ACE_Notification_Buffer buffer (eh, mask);
00208 
00209   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00210                          (char *) &buffer,
00211                          sizeof buffer,
00212                          timeout);
00213   if (n == -1)
00214     return -1;
00215 
00216   return 0;
00217 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00218 }

ACE_HANDLE ACE_Dev_Poll_Reactor_Notify::notify_handle void    [virtual]
 

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 350 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle.

00351 {
00352   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle");
00353 
00354   return this->notification_pipe_.read_handle ();
00355 }

int ACE_Dev_Poll_Reactor_Notify::open ACE_Reactor_Impl  ,
ACE_Timer_Queue   timer_queue = 0,
int    disable_notify = 0
[virtual]
 

Implements ACE_Reactor_Notify.

Definition at line 64 of file Dev_Poll_Reactor.cpp.

References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, dp_reactor_, ACE_OS::fcntl, notification_pipe_, ACE_Pipe::open, and ACE_Flag_Manip::set_flags.

00067 {
00068   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open");
00069 
00070   if (disable_notify_pipe == 0)
00071     {
00072       this->dp_reactor_ = ACE_dynamic_cast (ACE_Dev_Poll_Reactor *, r);
00073 
00074       if (this->dp_reactor_ == 0)
00075         {
00076           errno = EINVAL;
00077           return -1;
00078         }
00079 
00080       if (this->notification_pipe_.open () == -1)
00081         return -1;
00082 
00083 #if defined (F_SETFD)
00084       // close-on-exec
00085       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00086       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00087 #endif /* F_SETFD */
00088 
00089 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00090       ACE_Notification_Buffer *temp;
00091 
00092       ACE_NEW_RETURN (temp,
00093                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00094                       -1);
00095 
00096       if (this->alloc_queue_.enqueue_head (temp) == -1)
00097         return -1;
00098 
00099       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00100         if (free_queue_.enqueue_head (temp + i) == -1)
00101           return -1;
00102 
00103 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00104 
00105       // Set the read handle into non-blocking mode since we need to
00106       // perform a "speculative" read when determining if their are
00107       // notifications to dispatch.
00108       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00109                           ACE_NONBLOCK) == -1)
00110         return -1;
00111     }
00112 
00113   return 0;
00114 }

int ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications ACE_Event_Handler   = 0,
ACE_Reactor_Mask    = ACE_Event_Handler::ALL_EVENTS_MASK
[virtual]
 

Purge any notifications pending in this reactor for the specified ACE_Event_Handler object. Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 489 of file Dev_Poll_Reactor.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue::dequeue_head, ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue::enqueue_head, LM_ERROR, ACE_Notification_Buffer::mask_, and ACE_Unbounded_Queue::size.

00492 {
00493   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications");
00494 
00495 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00496 
00497   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00498 
00499   if (this->notify_queue_.is_empty ())
00500     return 0;
00501 
00502   ACE_Notification_Buffer *temp;
00503   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00504 
00505   size_t queue_size = this->notify_queue_.size ();
00506   int number_purged = 0;
00507   size_t i;
00508   for (i = 0; i < queue_size; ++i)
00509     {
00510       if (-1 == this->notify_queue_.dequeue_head (temp))
00511         ACE_ERROR_RETURN ((LM_ERROR,
00512                            ACE_LIB_TEXT ("%p\n"),
00513                            ACE_LIB_TEXT ("dequeue_head")),
00514                           -1);
00515 
00516       // If this is not a Reactor notify (it is for a particular
00517       // handler), and it matches the specified handler (or purging
00518       // all), and applying the mask would totally eliminate the
00519       // notification, then release it and count the number purged.
00520       if ((0 != temp->eh_) &&
00521           (0 == eh || eh == temp->eh_) &&
00522           ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
00523                                                  // notification mask
00524                                                  // is left with
00525                                                  // nothing when
00526                                                  // applying the mask.
00527         {
00528           if (this->free_queue_.enqueue_head (temp) == -1)
00529             ACE_ERROR_RETURN ((LM_ERROR,
00530                                ACE_LIB_TEXT ("%p\n"),
00531                                ACE_LIB_TEXT ("enqueue_head")),
00532                               -1);
00533           ++number_purged;
00534         }
00535       else
00536         {
00537           // To preserve it, move it to the local_queue.
00538           // But first, if this is not a Reactor notify (it is for a
00539           // particular handler), and it matches the specified handler
00540           // (or purging all), then apply the mask.
00541           if ((0 != temp->eh_) &&
00542               (0 == eh || eh == temp->eh_))
00543             ACE_CLR_BITS(temp->mask_, mask);
00544           if (-1 == local_queue.enqueue_head (temp))
00545             return -1;
00546         }
00547     }
00548 
00549   if (this->notify_queue_.size ())
00550     {
00551       // Should be empty!
00552       ACE_ASSERT (0);
00553       return -1;
00554     }
00555 
00556   // Now put it back in the notify queue.
00557   queue_size = local_queue.size ();
00558   for (i = 0; i < queue_size; ++i)
00559     {
00560       if (-1 == local_queue.dequeue_head (temp))
00561         ACE_ERROR_RETURN ((LM_ERROR,
00562                            ACE_LIB_TEXT ("%p\n"),
00563                            ACE_LIB_TEXT ("dequeue_head")),
00564                           -1);
00565 
00566       if (-1 == this->notify_queue_.enqueue_head (temp))
00567         ACE_ERROR_RETURN ((LM_ERROR,
00568                            ACE_LIB_TEXT ("%p\n"),
00569                            ACE_LIB_TEXT ("enqueue_head")),
00570                           -1);
00571     }
00572 
00573   return number_purged;
00574 
00575 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00576   ACE_UNUSED_ARG (eh);
00577   ACE_UNUSED_ARG (mask);
00578   ACE_NOTSUP_RETURN (-1);
00579 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00580 }

int ACE_Dev_Poll_Reactor_Notify::read_notify_pipe ACE_HANDLE    handle,
ACE_Notification_Buffer   buffer
[virtual]
 

Read one of the notify call on the handle into the buffer. This could be because of a thread trying to unblock the Reactor_Impl.

Implements ACE_Reactor_Notify.

Definition at line 258 of file Dev_Poll_Reactor.cpp.

References ACE_TRACE, EWOULDBLOCK, ACE::recv, and ssize_t.

Referenced by handle_input.

00260 {
00261   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe");
00262 
00263   // This is a (non-blocking) "speculative" read, i.e., we attempt to
00264   // read even if no event was polled on the read handle.  A
00265   // speculative read is necessary since notifications must be
00266   // dispatched before IO events.  We can avoid the speculative read
00267   // by "walking" the array of pollfd structures returned from
00268   // `/dev/poll' or `/dev/epoll' but that is potentially much more
00269   // expensive than simply checking for an EWOULDBLOCK.
00270 
00271   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00272 
00273   if (n > 0)
00274     {
00275       // Check to see if we've got a short read.
00276       if (n != sizeof buffer)
00277         {
00278           ssize_t remainder = sizeof buffer - n;
00279 
00280           // If so, try to recover by reading the remainder.  If this
00281           // doesn't work we're in big trouble since the input stream
00282           // won't be aligned correctly.  I'm not sure quite what to
00283           // do at this point.  It's probably best just to return -1.
00284           if (ACE::recv (handle,
00285                          ((char *) &buffer) + n,
00286                          remainder) != remainder)
00287             return -1;
00288         }
00289 
00290 
00291       return 1;
00292     }
00293 
00294   // Return -1 if things have gone seriously wrong.
00295   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00296     return -1;
00297 
00298   return n;
00299 }


Member Data Documentation

ACE_Dev_Poll_Reactor* ACE_Dev_Poll_Reactor_Notify::dp_reactor_ [protected]
 

Keep a back pointer to the ACE_Dev_Poll_Reactor. If this value if NULL then the ACE_Dev_Poll_Reactor has been initialized with disable_notify_pipe.

Definition at line 257 of file Dev_Poll_Reactor.h.

Referenced by notify, and open.

int ACE_Dev_Poll_Reactor_Notify::max_notify_iterations_ [protected]
 

Keeps track of the maximum number of times that the ACE_Dev_Poll_Reactor_Notify::handle_input method will iterate and dispatch the ACE_Event_Handlers that are passed in via the notify pipe before breaking out of its recv loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 273 of file Dev_Poll_Reactor.h.

Referenced by handle_input, and max_notify_iterations.

ACE_Pipe ACE_Dev_Poll_Reactor_Notify::notification_pipe_ [protected]
 

Contains the ACE_HANDLE the ACE_Dev_Poll_Reactor is listening on, as well as the ACE_HANDLE that threads wanting the attention of the ACE_Dev_Poll_Reactor will write to.

Definition at line 264 of file Dev_Poll_Reactor.h.

Referenced by close, dispatch_notifications, dump, notify_handle, and open.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 12:47:12 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002