Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ACE_Select_Reactor_Notify Class Reference

Unblock the <ACE_Select_Reactor> from its event loop. More...

#include <Select_Reactor_Base.h>

Inheritance diagram for ACE_Select_Reactor_Notify:

Inheritance graph
[legend]
Collaboration diagram for ACE_Select_Reactor_Notify:

Collaboration graph
[legend]
List of all members.

Public Methods

 ACE_Select_Reactor_Notify (void)
 Constructor. More...

 ~ACE_Select_Reactor_Notify (void)
 Destructor. More...

virtual int open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=0)
 Initialize. More...

virtual int close (void)
 Destroy. More...

virtual int notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *=0)
virtual int dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask)
 Handles pending threads (if any) that are waiting to unblock the <ACE_Select_Reactor>. More...

virtual ACE_HANDLE notify_handle (void)
 Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor. More...

virtual int dispatch_notify (ACE_Notification_Buffer &buffer)
 Handle one of the notify call on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>. More...

virtual int read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer)
 Read one of the notify call on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>. More...

virtual int is_dispatchable (ACE_Notification_Buffer &buffer)
 Verify whether the buffer has dispatchable info or not. More...

virtual int handle_input (ACE_HANDLE handle)
 Called back by the <ACE_Select_Reactor> when a thread wants to unblock us. More...

virtual void max_notify_iterations (int)
virtual int max_notify_iterations (void)
virtual int purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
virtual void dump (void) const
 Dump the state of an object. More...


Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks. More...


Protected Attributes

ACE_Select_Reactor_Implselect_reactor_
ACE_Pipe notification_pipe_
int max_notify_iterations_

Detailed Description

Unblock the <ACE_Select_Reactor> from its event loop.

This implementation is necessary for cases where the <ACE_Select_Reactor> is run in a multi-threaded program. In this case, we need to be able to unblock <select> or <poll> when updates occur other than in the main <ACE_Select_Reactor> thread. To do this, we signal an auto-reset event the <ACE_Select_Reactor> is listening on. If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is passed to <notify>, the appropriate <handle_*> method is dispatched in the context of the <ACE_Select_Reactor> thread.

Definition at line 117 of file Select_Reactor_Base.h.


Constructor & Destructor Documentation

ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify void   
 

Constructor.

Definition at line 482 of file Select_Reactor_Base.cpp.

00483   : max_notify_iterations_ (-1)
00484 {
00485 }

ACE_INLINE ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify void   
 

Destructor.

Definition at line 12 of file Select_Reactor_Base.i.

00013 {
00014 }


Member Function Documentation

int ACE_Select_Reactor_Notify::close void    [virtual]
 

Destroy.

Implements ACE_Reactor_Notify.

Definition at line 673 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Unbounded_Queue_Iterator::advance, ACE_Pipe::close, ACE_Unbounded_Queue_Iterator::next, and notification_pipe_.

00674 {
00675   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00676 
00677 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00678   // Free up the dynamically allocated resources.
00679   ACE_Notification_Buffer **b;
00680 
00681   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00682        alloc_iter.next (b) != 0;
00683        alloc_iter.advance ())
00684     {
00685       delete [] *b;
00686       *b = 0;
00687     }
00688 
00689   this->alloc_queue_.reset ();
00690   this->notify_queue_.reset ();
00691   this->free_queue_.reset ();
00692 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00693 
00694   return this->notification_pipe_.close ();
00695 }

int ACE_Select_Reactor_Notify::dispatch_notifications int &    number_of_active_handles,
ACE_Handle_Set   rd_mask
[virtual]
 

Handles pending threads (if any) that are waiting to unblock the <ACE_Select_Reactor>.

Implements ACE_Reactor_Notify.

Definition at line 774 of file Select_Reactor_Base.cpp.

References ACE_TRACE, ACE_Handle_Set::clr_bit, handle_input, ACE_Handle_Set::is_set, notification_pipe_, and ACE_Pipe::read_handle.

00776 {
00777   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00778 
00779   ACE_HANDLE read_handle =
00780     this->notification_pipe_.read_handle ();
00781 
00782   if (read_handle != ACE_INVALID_HANDLE
00783       && rd_mask.is_set (read_handle))
00784     {
00785       number_of_active_handles--;
00786       rd_mask.clr_bit (read_handle);
00787       return this->handle_input (read_handle);
00788     }
00789   else
00790     return 0;
00791 }

int ACE_Select_Reactor_Notify::dispatch_notify ACE_Notification_Buffer   buffer [virtual]
 

Handle one of the notify call on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>.

Implements ACE_Reactor_Notify.

Definition at line 865 of file Select_Reactor_Base.cpp.

References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_Notification_Buffer::eh_, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event_Handler::handle_close, ACE_Event_Handler::handle_exception, ACE_Event_Handler::handle_group_qos, ACE_Event_Handler::handle_input, ACE_Event_Handler::handle_output, ACE_Event_Handler::handle_qos, LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, and ACE_Event_Handler::WRITE_MASK.

Referenced by handle_input.

00866 {
00867   int result = 0;
00868 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00869   // Dispatch all messages that are in the <notify_queue_>.
00870   {
00871     // We acquire the lock in a block to make sure we're not
00872     // holding the lock while delivering callbacks...
00873     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00874 
00875     ACE_Notification_Buffer *temp;
00876 
00877     if (notify_queue_.is_empty ())
00878       return 0;
00879     else if (notify_queue_.dequeue_head (temp) == -1)
00880       ACE_ERROR_RETURN ((LM_ERROR,
00881                          ACE_LIB_TEXT ("%p\n"),
00882                          ACE_LIB_TEXT ("dequeue_head")),
00883                         -1);
00884     buffer = *temp;
00885     if (free_queue_.enqueue_head (temp) == -1)
00886       ACE_ERROR_RETURN ((LM_ERROR,
00887                          ACE_LIB_TEXT ("%p\n"),
00888                          ACE_LIB_TEXT ("enqueue_head")),
00889                         -1);
00890   }
00891 
00892   // If eh == 0 then another thread is unblocking the
00893   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00894   // internal structures.  Otherwise, we need to dispatch the
00895   // appropriate handle_* method on the <ACE_Event_Handler>
00896   // pointer we've been passed.
00897   if (buffer.eh_ != 0)
00898     {
00899 
00900       switch (buffer.mask_)
00901         {
00902         case ACE_Event_Handler::READ_MASK:
00903         case ACE_Event_Handler::ACCEPT_MASK:
00904           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00905           break;
00906         case ACE_Event_Handler::WRITE_MASK:
00907           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00908           break;
00909         case ACE_Event_Handler::EXCEPT_MASK:
00910           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00911           break;
00912         default:
00913           // Should we bail out if we get an invalid mask?
00914           ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
00915         }
00916       if (result == -1)
00917         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00918                                   ACE_Event_Handler::EXCEPT_MASK);
00919     }
00920 #else
00921   // If eh == 0 then another thread is unblocking the
00922   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00923   // internal structures.  Otherwise, we need to dispatch the
00924   // appropriate handle_* method on the <ACE_Event_Handler>
00925   // pointer we've been passed.
00926   if (buffer.eh_ != 0)
00927     {
00928       switch (buffer.mask_)
00929         {
00930         case ACE_Event_Handler::READ_MASK:
00931         case ACE_Event_Handler::ACCEPT_MASK:
00932           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00933           break;
00934         case ACE_Event_Handler::WRITE_MASK:
00935           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00936           break;
00937         case ACE_Event_Handler::EXCEPT_MASK:
00938           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00939           break;
00940         case ACE_Event_Handler::QOS_MASK:
00941           result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00942           break;
00943         case ACE_Event_Handler::GROUP_QOS_MASK:
00944           result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00945           break;
00946         default:
00947           // Should we bail out if we get an invalid mask?
00948           ACE_ERROR ((LM_ERROR,
00949                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00950                       buffer.mask_));
00951         }
00952       if (result == -1)
00953         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00954                                   ACE_Event_Handler::EXCEPT_MASK);
00955     }
00956 
00957 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00958 
00959   return 1;
00960 }

void ACE_Select_Reactor_Notify::dump void    const [virtual]
 

Dump the state of an object.

Implements ACE_Reactor_Notify.

Definition at line 600 of file Select_Reactor_Base.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump, LM_DEBUG, and notification_pipe_.

00601 {
00602   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00603 
00604   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00605   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00606   this->notification_pipe_.dump ();
00607   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00608 }

int ACE_Select_Reactor_Notify::handle_input ACE_HANDLE    handle [virtual]
 

Called back by the <ACE_Select_Reactor> when a thread wants to unblock us.

Reimplemented from ACE_Event_Handler.

Definition at line 1000 of file Select_Reactor_Base.cpp.

References ACE_TRACE, dispatch_notify, max_notify_iterations_, read_notify_pipe, ACE_Select_Reactor_Impl::renew, and select_reactor_.

Referenced by dispatch_notifications.

01001 {
01002   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01003   // Precondition: this->select_reactor_.token_.current_owner () ==
01004   // ACE_Thread::self ();
01005 
01006   int number_dispatched = 0;
01007   int result = 0;
01008   ACE_Notification_Buffer buffer;
01009 
01010   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01011     {
01012       // Dispatch the buffer
01013       // NOTE: We count only if we made any dispatches ie. upcalls.
01014       if (this->dispatch_notify (buffer) > 0)
01015         number_dispatched++;
01016 
01017       // Bail out if we've reached the <notify_threshold_>.  Note that
01018       // by default <notify_threshold_> is -1, so we'll loop until all
01019       // the notifications in the pipe have been dispatched.
01020       if (number_dispatched == this->max_notify_iterations_)
01021         break;
01022     }
01023 
01024   // Reassign number_dispatched to -1 if things have gone seriously
01025   // wrong.
01026   if (result < 0)
01027     number_dispatched = -1;
01028 
01029   // Enqueue ourselves into the list of waiting threads.  When we
01030   // reacquire the token we'll be off and running again with ownership
01031   // of the token.  The postcondition of this call is that
01032   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
01033   this->select_reactor_->renew ();
01034   return number_dispatched;
01035 }

int ACE_Select_Reactor_Notify::is_dispatchable ACE_Notification_Buffer   buffer [virtual]
 

Verify whether the buffer has dispatchable info or not.

Implements ACE_Reactor_Notify.

Definition at line 808 of file Select_Reactor_Base.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_Notification_Buffer::eh_, and LM_ERROR.

00809 {
00810    // There is tonnes of code that can be abstracted...
00811 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00812   {
00813     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00814 
00815     ACE_Notification_Buffer *temp;
00816 
00817     ACE_UNUSED_ARG (buffer);
00818 
00819     // If the queue is empty just return 0
00820     if (notify_queue_.is_empty ())
00821       return 0;
00822 
00823     if (this->notify_queue_.dequeue_head (temp) == -1)
00824       ACE_ERROR_RETURN ((LM_ERROR,
00825                          ACE_LIB_TEXT ("%p\n"),
00826                          ACE_LIB_TEXT ("dequeue_head")),
00827                         -1);
00828     if (temp->eh_ != 0)
00829       {
00830         // If the queue had a buffer that has an event handler, put
00831         // the element  back in the queue and return a 1
00832         if (this->notify_queue_.enqueue_head (temp) == -1)
00833           {
00834             ACE_ERROR_RETURN ((LM_ERROR,
00835                                ACE_LIB_TEXT ("%p\n"),
00836                                ACE_LIB_TEXT ("enque_head")),
00837                               -1);
00838           }
00839 
00840         return 1;
00841       }
00842     // Else put the element in the free queue
00843     if (free_queue_.enqueue_head (temp) == -1)
00844       ACE_ERROR_RETURN ((LM_ERROR,
00845                          ACE_LIB_TEXT ("%p\n"),
00846                          ACE_LIB_TEXT ("enqueue_head")),
00847                         -1);
00848   }
00849 #else
00850   // If eh == 0 then another thread is unblocking the
00851   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00852   // internal structures.  Otherwise, we need to dispatch the
00853   // appropriate handle_* method on the <ACE_Event_Handler>
00854   // pointer we've been passed.
00855   if (buffer.eh_ != 0)
00856     return 1;
00857 
00858 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00859 
00860   // has no dispatchable buffer
00861   return 0;
00862 }

int ACE_Select_Reactor_Notify::max_notify_iterations void    [virtual]
 

Get the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop.

Implements ACE_Reactor_Notify.

Definition at line 498 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00499 {
00500   return this->max_notify_iterations_;
00501 }

void ACE_Select_Reactor_Notify::max_notify_iterations int    [virtual]
 

Set the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead.

Implements ACE_Reactor_Notify.

Definition at line 488 of file Select_Reactor_Base.cpp.

References max_notify_iterations_.

00489 {
00490   // Must always be > 0 or < 0 to optimize the loop exit condition.
00491   if (iterations == 0)
00492     iterations = 1;
00493 
00494   this->max_notify_iterations_ = iterations;
00495 }

int ACE_Select_Reactor_Notify::notify ACE_Event_Handler   = 0,
ACE_Reactor_Mask    = ACE_Event_Handler::EXCEPT_MASK,
ACE_Time_Value   = 0
[virtual]
 

Called by a thread when it wants to unblock the <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if currently blocked in <select>/<poll>. Pass over both the <Event_Handler> *and* the <mask> to allow the caller to dictate which <Event_Handler> method the <ACE_Select_Reactor> will invoke. The <ACE_Time_Value> indicates how long to blocking trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, the caller will block until action is possible, else will wait until the relative time specified in *<timeout> elapses).

Implements ACE_Reactor_Notify.

Definition at line 698 of file Select_Reactor_Base.cpp.

References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, select_reactor_, ACE::send, and ssize_t.

00701 {
00702   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00703 
00704   // Just consider this method a "no-op" if there's no
00705   // <ACE_Select_Reactor> configured.
00706   if (this->select_reactor_ == 0)
00707     return 0;
00708 
00709   ACE_Notification_Buffer buffer (eh, mask);
00710 
00711 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00712   // Artificial scope to limit the duration of the mutex.
00713   {
00714     // int notification_required = 0;
00715 
00716     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00717 
00718     // No pending notifications.
00719 
00720     // We will send notify for every message..
00721     // if (this->notify_queue_.is_empty ())
00722     //   notification_required = 1;
00723 
00724     ACE_Notification_Buffer *temp = 0;
00725 
00726     if (free_queue_.dequeue_head (temp) == -1)
00727       {
00728         // Grow the queue of available buffers.
00729         ACE_Notification_Buffer *temp1;
00730 
00731         ACE_NEW_RETURN (temp1,
00732                         ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00733                         -1);
00734 
00735         if (this->alloc_queue_.enqueue_head (temp1) == -1)
00736           {
00737             delete [] temp1;
00738             return -1;
00739           }
00740 
00741         // Start at 1 and enqueue only
00742         // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00743         // the first one will be used right now.
00744         for (size_t i = 1;
00745              i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00746              i++)
00747           this->free_queue_.enqueue_head (temp1 + i);
00748 
00749         temp = temp1;
00750       }
00751 
00752     ACE_ASSERT (temp != 0);
00753     *temp = buffer;
00754 
00755     if (notify_queue_.enqueue_tail (temp) == -1)
00756       return -1;
00757   }
00758 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00759 
00760   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00761                          (char *) &buffer,
00762                          sizeof buffer,
00763                          timeout);
00764   if (n == -1)
00765     return -1;
00766 
00767   return 0;
00768 }

ACE_HANDLE ACE_Select_Reactor_Notify::notify_handle void    [virtual]
 

Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor.

Implements ACE_Reactor_Notify.

Definition at line 795 of file Select_Reactor_Base.cpp.

References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle.

00796 {
00797   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00798 
00799   return this->notification_pipe_.read_handle ();
00800 }

int ACE_Select_Reactor_Notify::open ACE_Reactor_Impl  ,
ACE_Timer_Queue   = 0,
int    disable_notify_pipe = 0
[virtual]
 

Initialize.

Implements ACE_Reactor_Notify.

Definition at line 611 of file Select_Reactor_Base.cpp.

References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, ACE_OS::fcntl, notification_pipe_, ACE_Pipe::open, ACE_Pipe::read_handle, ACE_Event_Handler::READ_MASK, ACE_Reactor_Impl::register_handler, select_reactor_, and ACE_Flag_Manip::set_flags.

00614 {
00615   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00616 
00617   if (disable_notify_pipe == 0)
00618     {
00619       this->select_reactor_ =
00620         ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
00621 
00622       if (select_reactor_ == 0)
00623         {
00624           errno = EINVAL;
00625           return -1;
00626         }
00627 
00628       if (this->notification_pipe_.open () == -1)
00629         return -1;
00630 #if defined (F_SETFD)
00631       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00632       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00633 #endif /* F_SETFD */
00634 
00635 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00636       ACE_Notification_Buffer *temp;
00637 
00638       ACE_NEW_RETURN (temp,
00639                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00640                       -1);
00641 
00642       if (this->alloc_queue_.enqueue_head (temp) == -1)
00643         {
00644           delete [] temp;
00645           return -1;
00646         }
00647 
00648       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
00649         if (free_queue_.enqueue_head (temp + i) == -1)
00650           return -1;
00651 
00652 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00653 
00654       // There seems to be a Win32 bug with this...  Set this into
00655       // non-blocking mode.
00656       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00657                           ACE_NONBLOCK) == -1)
00658         return -1;
00659       else
00660         return this->select_reactor_->register_handler
00661           (this->notification_pipe_.read_handle (),
00662            this,
00663            ACE_Event_Handler::READ_MASK);
00664     }
00665   else
00666     {
00667       this->select_reactor_ = 0;
00668       return 0;
00669     }
00670 }

int ACE_Select_Reactor_Notify::purge_pending_notifications ACE_Event_Handler  ,
ACE_Reactor_Mask    = ACE_Event_Handler::ALL_EVENTS_MASK
[virtual]
 

Purge any notifications pending in this reactor for the specified <ACE_Event_Handler> object. If <eh> == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error.

Implements ACE_Reactor_Notify.

Definition at line 510 of file Select_Reactor_Base.cpp.

References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue::dequeue_head, ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue::enqueue_head, LM_ERROR, ACE_Notification_Buffer::mask_, and ACE_Unbounded_Queue::size.

00512 {
00513   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00514 
00515 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00516 
00517   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00518 
00519   if (this->notify_queue_.is_empty ())
00520     return 0;
00521 
00522   ACE_Notification_Buffer *temp;
00523   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00524 
00525   size_t queue_size = this->notify_queue_.size ();
00526   int number_purged = 0;
00527   size_t i;
00528   for (i = 0; i < queue_size; ++i)
00529     {
00530       if (-1 == this->notify_queue_.dequeue_head (temp))
00531         ACE_ERROR_RETURN ((LM_ERROR,
00532                            ACE_LIB_TEXT ("%p\n"),
00533                            ACE_LIB_TEXT ("dequeue_head")),
00534                           -1);
00535 
00536       // If this is not a Reactor notify (it is for a particular handler),
00537       // and it matches the specified handler (or purging all),
00538       // and applying the mask would totally eliminate the notification, then
00539       // release it and count the number purged.
00540       if ((0 != temp->eh_) &&
00541           (0 == eh || eh == temp->eh_) &&
00542           ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
00543                                                  // is left with nothing when
00544                                                  // applying the mask
00545       {
00546         if (-1 == this->free_queue_.enqueue_head (temp))
00547           ACE_ERROR_RETURN ((LM_ERROR,
00548                              ACE_LIB_TEXT ("%p\n"),
00549                              ACE_LIB_TEXT ("enqueue_head")),
00550                             -1);
00551         ++number_purged;
00552       }
00553       else
00554       {
00555         // To preserve it, move it to the local_queue.
00556         // But first, if this is not a Reactor notify (it is for a particularhandler),
00557         // and it matches the specified handler (or purging all), then
00558         // apply the mask
00559         if ((0 != temp->eh_) &&
00560             (0 == eh || eh == temp->eh_))
00561           ACE_CLR_BITS(temp->mask_, mask);
00562         if (-1 == local_queue.enqueue_head (temp))
00563           return -1;
00564       }
00565     }
00566 
00567   if (this->notify_queue_.size ())
00568     { // should be empty!
00569       ACE_ASSERT (0);
00570       return -1;
00571     }
00572 
00573   // now put it back in the notify queue
00574   queue_size = local_queue.size ();
00575   for (i = 0; i < queue_size; ++i)
00576     {
00577       if (-1 == local_queue.dequeue_head (temp))
00578         ACE_ERROR_RETURN ((LM_ERROR,
00579                            ACE_LIB_TEXT ("%p\n"),
00580                            ACE_LIB_TEXT ("dequeue_head")),
00581                           -1);
00582 
00583       if (-1 == this->notify_queue_.enqueue_head (temp))
00584         ACE_ERROR_RETURN ((LM_ERROR,
00585                            ACE_LIB_TEXT ("%p\n"),
00586                            ACE_LIB_TEXT ("enqueue_head")),
00587                           -1);
00588     }
00589 
00590   return number_purged;
00591 
00592 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00593   ACE_UNUSED_ARG (eh);
00594   ACE_UNUSED_ARG (mask);
00595   ACE_NOTSUP_RETURN (-1);
00596 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00597 }

int ACE_Select_Reactor_Notify::read_notify_pipe ACE_HANDLE    handle,
ACE_Notification_Buffer   buffer
[virtual]
 

Read one of the notify call on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>.

Implements ACE_Reactor_Notify.

Definition at line 963 of file Select_Reactor_Base.cpp.

References ACE_TRACE, EWOULDBLOCK, ACE::recv, and ssize_t.

Referenced by handle_input.

00965 {
00966   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00967 
00968   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00969 
00970   if (n > 0)
00971     {
00972       // Check to see if we've got a short read.
00973       if (n != sizeof buffer)
00974         {
00975           ssize_t remainder = sizeof buffer - n;
00976 
00977           // If so, try to recover by reading the remainder.  If this
00978           // doesn't work we're in big trouble since the input stream
00979           // won't be aligned correctly.  I'm not sure quite what to
00980           // do at this point.  It's probably best just to return -1.
00981           if (ACE::recv (handle,
00982                          ((char *) &buffer) + n,
00983                          remainder) != remainder)
00984             return -1;
00985         }
00986 
00987 
00988       return 1;
00989     }
00990 
00991   // Return -1 if things have gone seriously  wrong.
00992   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00993     return -1;
00994 
00995   return 0;
00996 }


Member Data Documentation

ACE_Select_Reactor_Notify::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Definition at line 211 of file Select_Reactor_Base.h.

int ACE_Select_Reactor_Notify::max_notify_iterations_ [protected]
 

Keeps track of the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop. By default, this is set to -1, which means "iterate until the pipe is empty."

Definition at line 235 of file Select_Reactor_Base.h.

Referenced by handle_input, and max_notify_iterations.

ACE_Pipe ACE_Select_Reactor_Notify::notification_pipe_ [protected]
 

Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening on, as well as the <ACE_HANDLE> that threads wanting the attention of the <ACE_Select_Reactor> will write to.

Definition at line 226 of file Select_Reactor_Base.h.

Referenced by close, dispatch_notifications, dump, notify_handle, and open.

ACE_Select_Reactor_Impl* ACE_Select_Reactor_Notify::select_reactor_ [protected]
 

Keep a back pointer to the <ACE_Select_Reactor>. If this value if NULL then the <ACE_Select_Reactor> has been initialized with <disable_notify_pipe>.

Definition at line 219 of file Select_Reactor_Base.h.

Referenced by handle_input, notify, and open.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 12:55:43 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002