#include <Select_Reactor_Base.h>
Inheritance diagram for ACE_Select_Reactor_Notify:


Public Methods | |
| ACE_Select_Reactor_Notify (void) | |
| Constructor. More... | |
| ~ACE_Select_Reactor_Notify (void) | |
| Destructor. More... | |
| virtual int | open (ACE_Reactor_Impl *, ACE_Timer_Queue *=0, int disable_notify_pipe=0) |
| Initialize. More... | |
| virtual int | close (void) |
| Destroy. More... | |
| virtual int | notify (ACE_Event_Handler *=0, ACE_Reactor_Mask=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *=0) |
| virtual int | dispatch_notifications (int &number_of_active_handles, ACE_Handle_Set &rd_mask) |
| Handles pending threads (if any) that are waiting to unblock the <ACE_Select_Reactor>. More... | |
| virtual ACE_HANDLE | notify_handle (void) |
| Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor. More... | |
| virtual int | dispatch_notify (ACE_Notification_Buffer &buffer) |
| Handle one of the notify call on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>. More... | |
| virtual int | read_notify_pipe (ACE_HANDLE handle, ACE_Notification_Buffer &buffer) |
| Read one of the notify call on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>. More... | |
| virtual int | is_dispatchable (ACE_Notification_Buffer &buffer) |
| Verify whether the buffer has dispatchable info or not. More... | |
| virtual int | handle_input (ACE_HANDLE handle) |
| Called back by the <ACE_Select_Reactor> when a thread wants to unblock us. More... | |
| virtual void | max_notify_iterations (int) |
| virtual int | max_notify_iterations (void) |
| virtual int | purge_pending_notifications (ACE_Event_Handler *, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK) |
| virtual void | dump (void) const |
| Dump the state of an object. More... | |
Public Attributes | |
| ACE_ALLOC_HOOK_DECLARE | |
| Declare the dynamic allocation hooks. More... | |
Protected Attributes | |
| ACE_Select_Reactor_Impl * | select_reactor_ |
| ACE_Pipe | notification_pipe_ |
| int | max_notify_iterations_ |
This implementation is necessary for cases where the <ACE_Select_Reactor> is run in a multi-threaded program. In this case, we need to be able to unblock <select> or <poll> when updates occur other than in the main <ACE_Select_Reactor> thread. To do this, we signal an auto-reset event the <ACE_Select_Reactor> is listening on. If an <ACE_Event_Handler> and <ACE_Select_Reactor_Mask> is passed to <notify>, the appropriate <handle_*> method is dispatched in the context of the <ACE_Select_Reactor> thread.
Definition at line 117 of file Select_Reactor_Base.h.
|
|
Constructor.
Definition at line 482 of file Select_Reactor_Base.cpp.
00483 : max_notify_iterations_ (-1) 00484 { 00485 } |
|
|
Destructor.
Definition at line 12 of file Select_Reactor_Base.i.
00013 {
00014 }
|
|
|
Destroy.
Implements ACE_Reactor_Notify. Definition at line 673 of file Select_Reactor_Base.cpp. References ACE_TRACE, ACE_Unbounded_Queue_Iterator::advance, ACE_Pipe::close, ACE_Unbounded_Queue_Iterator::next, and notification_pipe_.
00674 {
00675 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00676
00677 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00678 // Free up the dynamically allocated resources.
00679 ACE_Notification_Buffer **b;
00680
00681 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00682 alloc_iter.next (b) != 0;
00683 alloc_iter.advance ())
00684 {
00685 delete [] *b;
00686 *b = 0;
00687 }
00688
00689 this->alloc_queue_.reset ();
00690 this->notify_queue_.reset ();
00691 this->free_queue_.reset ();
00692 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00693
00694 return this->notification_pipe_.close ();
00695 }
|
|
||||||||||||
|
Handles pending threads (if any) that are waiting to unblock the <ACE_Select_Reactor>.
Implements ACE_Reactor_Notify. Definition at line 774 of file Select_Reactor_Base.cpp. References ACE_TRACE, ACE_Handle_Set::clr_bit, handle_input, ACE_Handle_Set::is_set, notification_pipe_, and ACE_Pipe::read_handle.
00776 {
00777 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00778
00779 ACE_HANDLE read_handle =
00780 this->notification_pipe_.read_handle ();
00781
00782 if (read_handle != ACE_INVALID_HANDLE
00783 && rd_mask.is_set (read_handle))
00784 {
00785 number_of_active_handles--;
00786 rd_mask.clr_bit (read_handle);
00787 return this->handle_input (read_handle);
00788 }
00789 else
00790 return 0;
00791 }
|
|
|
Handle one of the notify call on the <handle>. This could be because of a thread trying to unblock the <Reactor_Impl>.
Implements ACE_Reactor_Notify. Definition at line 865 of file Select_Reactor_Base.cpp. References ACE_Event_Handler::ACCEPT_MASK, ACE_ERROR, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_Notification_Buffer::eh_, ACE_Event_Handler::EXCEPT_MASK, ACE_Event_Handler::GROUP_QOS_MASK, ACE_Event_Handler::handle_close, ACE_Event_Handler::handle_exception, ACE_Event_Handler::handle_group_qos, ACE_Event_Handler::handle_input, ACE_Event_Handler::handle_output, ACE_Event_Handler::handle_qos, LM_ERROR, ACE_Notification_Buffer::mask_, ACE_Event_Handler::QOS_MASK, ACE_Event_Handler::READ_MASK, and ACE_Event_Handler::WRITE_MASK. Referenced by handle_input.
00866 {
00867 int result = 0;
00868 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00869 // Dispatch all messages that are in the <notify_queue_>.
00870 {
00871 // We acquire the lock in a block to make sure we're not
00872 // holding the lock while delivering callbacks...
00873 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00874
00875 ACE_Notification_Buffer *temp;
00876
00877 if (notify_queue_.is_empty ())
00878 return 0;
00879 else if (notify_queue_.dequeue_head (temp) == -1)
00880 ACE_ERROR_RETURN ((LM_ERROR,
00881 ACE_LIB_TEXT ("%p\n"),
00882 ACE_LIB_TEXT ("dequeue_head")),
00883 -1);
00884 buffer = *temp;
00885 if (free_queue_.enqueue_head (temp) == -1)
00886 ACE_ERROR_RETURN ((LM_ERROR,
00887 ACE_LIB_TEXT ("%p\n"),
00888 ACE_LIB_TEXT ("enqueue_head")),
00889 -1);
00890 }
00891
00892 // If eh == 0 then another thread is unblocking the
00893 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00894 // internal structures. Otherwise, we need to dispatch the
00895 // appropriate handle_* method on the <ACE_Event_Handler>
00896 // pointer we've been passed.
00897 if (buffer.eh_ != 0)
00898 {
00899
00900 switch (buffer.mask_)
00901 {
00902 case ACE_Event_Handler::READ_MASK:
00903 case ACE_Event_Handler::ACCEPT_MASK:
00904 result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00905 break;
00906 case ACE_Event_Handler::WRITE_MASK:
00907 result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00908 break;
00909 case ACE_Event_Handler::EXCEPT_MASK:
00910 result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00911 break;
00912 default:
00913 // Should we bail out if we get an invalid mask?
00914 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
00915 }
00916 if (result == -1)
00917 buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00918 ACE_Event_Handler::EXCEPT_MASK);
00919 }
00920 #else
00921 // If eh == 0 then another thread is unblocking the
00922 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00923 // internal structures. Otherwise, we need to dispatch the
00924 // appropriate handle_* method on the <ACE_Event_Handler>
00925 // pointer we've been passed.
00926 if (buffer.eh_ != 0)
00927 {
00928 switch (buffer.mask_)
00929 {
00930 case ACE_Event_Handler::READ_MASK:
00931 case ACE_Event_Handler::ACCEPT_MASK:
00932 result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00933 break;
00934 case ACE_Event_Handler::WRITE_MASK:
00935 result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00936 break;
00937 case ACE_Event_Handler::EXCEPT_MASK:
00938 result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00939 break;
00940 case ACE_Event_Handler::QOS_MASK:
00941 result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00942 break;
00943 case ACE_Event_Handler::GROUP_QOS_MASK:
00944 result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00945 break;
00946 default:
00947 // Should we bail out if we get an invalid mask?
00948 ACE_ERROR ((LM_ERROR,
00949 ACE_LIB_TEXT ("invalid mask = %d\n"),
00950 buffer.mask_));
00951 }
00952 if (result == -1)
00953 buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00954 ACE_Event_Handler::EXCEPT_MASK);
00955 }
00956
00957 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00958
00959 return 1;
00960 }
|
|
|
Dump the state of an object.
Implements ACE_Reactor_Notify. Definition at line 600 of file Select_Reactor_Base.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Pipe::dump, LM_DEBUG, and notification_pipe_.
00601 {
00602 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00603
00604 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00605 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00606 this->notification_pipe_.dump ();
00607 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00608 }
|
|
|
Called back by the <ACE_Select_Reactor> when a thread wants to unblock us.
Reimplemented from ACE_Event_Handler. Definition at line 1000 of file Select_Reactor_Base.cpp. References ACE_TRACE, dispatch_notify, max_notify_iterations_, read_notify_pipe, ACE_Select_Reactor_Impl::renew, and select_reactor_. Referenced by dispatch_notifications.
01001 {
01002 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01003 // Precondition: this->select_reactor_.token_.current_owner () ==
01004 // ACE_Thread::self ();
01005
01006 int number_dispatched = 0;
01007 int result = 0;
01008 ACE_Notification_Buffer buffer;
01009
01010 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01011 {
01012 // Dispatch the buffer
01013 // NOTE: We count only if we made any dispatches ie. upcalls.
01014 if (this->dispatch_notify (buffer) > 0)
01015 number_dispatched++;
01016
01017 // Bail out if we've reached the <notify_threshold_>. Note that
01018 // by default <notify_threshold_> is -1, so we'll loop until all
01019 // the notifications in the pipe have been dispatched.
01020 if (number_dispatched == this->max_notify_iterations_)
01021 break;
01022 }
01023
01024 // Reassign number_dispatched to -1 if things have gone seriously
01025 // wrong.
01026 if (result < 0)
01027 number_dispatched = -1;
01028
01029 // Enqueue ourselves into the list of waiting threads. When we
01030 // reacquire the token we'll be off and running again with ownership
01031 // of the token. The postcondition of this call is that
01032 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
01033 this->select_reactor_->renew ();
01034 return number_dispatched;
01035 }
|
|
|
Verify whether the buffer has dispatchable info or not.
Implements ACE_Reactor_Notify. Definition at line 808 of file Select_Reactor_Base.cpp. References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_SYNCH_MUTEX, ACE_Notification_Buffer::eh_, and LM_ERROR.
00809 {
00810 // There is tonnes of code that can be abstracted...
00811 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00812 {
00813 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00814
00815 ACE_Notification_Buffer *temp;
00816
00817 ACE_UNUSED_ARG (buffer);
00818
00819 // If the queue is empty just return 0
00820 if (notify_queue_.is_empty ())
00821 return 0;
00822
00823 if (this->notify_queue_.dequeue_head (temp) == -1)
00824 ACE_ERROR_RETURN ((LM_ERROR,
00825 ACE_LIB_TEXT ("%p\n"),
00826 ACE_LIB_TEXT ("dequeue_head")),
00827 -1);
00828 if (temp->eh_ != 0)
00829 {
00830 // If the queue had a buffer that has an event handler, put
00831 // the element back in the queue and return a 1
00832 if (this->notify_queue_.enqueue_head (temp) == -1)
00833 {
00834 ACE_ERROR_RETURN ((LM_ERROR,
00835 ACE_LIB_TEXT ("%p\n"),
00836 ACE_LIB_TEXT ("enque_head")),
00837 -1);
00838 }
00839
00840 return 1;
00841 }
00842 // Else put the element in the free queue
00843 if (free_queue_.enqueue_head (temp) == -1)
00844 ACE_ERROR_RETURN ((LM_ERROR,
00845 ACE_LIB_TEXT ("%p\n"),
00846 ACE_LIB_TEXT ("enqueue_head")),
00847 -1);
00848 }
00849 #else
00850 // If eh == 0 then another thread is unblocking the
00851 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00852 // internal structures. Otherwise, we need to dispatch the
00853 // appropriate handle_* method on the <ACE_Event_Handler>
00854 // pointer we've been passed.
00855 if (buffer.eh_ != 0)
00856 return 1;
00857
00858 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00859
00860 // has no dispatchable buffer
00861 return 0;
00862 }
|
|
|
Get the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop. Implements ACE_Reactor_Notify. Definition at line 498 of file Select_Reactor_Base.cpp. References max_notify_iterations_.
00499 {
00500 return this->max_notify_iterations_;
00501 }
|
|
|
Set the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop. By default, this is set to -1, which means "iterate until the pipe is empty." Setting this to a value like "1 or 2" will increase "fairness" (and thus prevent starvation) at the expense of slightly higher dispatching overhead. Implements ACE_Reactor_Notify. Definition at line 488 of file Select_Reactor_Base.cpp. References max_notify_iterations_.
00489 {
00490 // Must always be > 0 or < 0 to optimize the loop exit condition.
00491 if (iterations == 0)
00492 iterations = 1;
00493
00494 this->max_notify_iterations_ = iterations;
00495 }
|
|
||||||||||||||||
|
Called by a thread when it wants to unblock the <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if currently blocked in <select>/<poll>. Pass over both the <Event_Handler> *and* the <mask> to allow the caller to dictate which <Event_Handler> method the <ACE_Select_Reactor> will invoke. The <ACE_Time_Value> indicates how long to blocking trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, the caller will block until action is possible, else will wait until the relative time specified in *<timeout> elapses). Implements ACE_Reactor_Notify. Definition at line 698 of file Select_Reactor_Base.cpp. References ACE_ASSERT, ACE_GUARD_RETURN, ACE_NEW_RETURN, ACE_Reactor_Mask, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_SYNCH_MUTEX, ACE_TRACE, select_reactor_, ACE::send, and ssize_t.
00701 {
00702 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00703
00704 // Just consider this method a "no-op" if there's no
00705 // <ACE_Select_Reactor> configured.
00706 if (this->select_reactor_ == 0)
00707 return 0;
00708
00709 ACE_Notification_Buffer buffer (eh, mask);
00710
00711 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00712 // Artificial scope to limit the duration of the mutex.
00713 {
00714 // int notification_required = 0;
00715
00716 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00717
00718 // No pending notifications.
00719
00720 // We will send notify for every message..
00721 // if (this->notify_queue_.is_empty ())
00722 // notification_required = 1;
00723
00724 ACE_Notification_Buffer *temp = 0;
00725
00726 if (free_queue_.dequeue_head (temp) == -1)
00727 {
00728 // Grow the queue of available buffers.
00729 ACE_Notification_Buffer *temp1;
00730
00731 ACE_NEW_RETURN (temp1,
00732 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00733 -1);
00734
00735 if (this->alloc_queue_.enqueue_head (temp1) == -1)
00736 {
00737 delete [] temp1;
00738 return -1;
00739 }
00740
00741 // Start at 1 and enqueue only
00742 // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00743 // the first one will be used right now.
00744 for (size_t i = 1;
00745 i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00746 i++)
00747 this->free_queue_.enqueue_head (temp1 + i);
00748
00749 temp = temp1;
00750 }
00751
00752 ACE_ASSERT (temp != 0);
00753 *temp = buffer;
00754
00755 if (notify_queue_.enqueue_tail (temp) == -1)
00756 return -1;
00757 }
00758 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00759
00760 ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00761 (char *) &buffer,
00762 sizeof buffer,
00763 timeout);
00764 if (n == -1)
00765 return -1;
00766
00767 return 0;
00768 }
|
|
|
Returns the ACE_HANDLE of the notify pipe on which the reactor is listening for notifications so that other threads can unblock the Select_Reactor.
Implements ACE_Reactor_Notify. Definition at line 795 of file Select_Reactor_Base.cpp. References ACE_TRACE, notification_pipe_, and ACE_Pipe::read_handle.
00796 {
00797 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00798
00799 return this->notification_pipe_.read_handle ();
00800 }
|
|
||||||||||||||||
|
Initialize.
Implements ACE_Reactor_Notify. Definition at line 611 of file Select_Reactor_Base.cpp. References ACE_NEW_RETURN, ACE_NONBLOCK, ACE_REACTOR_NOTIFICATION_ARRAY_SIZE, ACE_TRACE, ACE_OS::fcntl, notification_pipe_, ACE_Pipe::open, ACE_Pipe::read_handle, ACE_Event_Handler::READ_MASK, ACE_Reactor_Impl::register_handler, select_reactor_, and ACE_Flag_Manip::set_flags.
00614 {
00615 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00616
00617 if (disable_notify_pipe == 0)
00618 {
00619 this->select_reactor_ =
00620 ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
00621
00622 if (select_reactor_ == 0)
00623 {
00624 errno = EINVAL;
00625 return -1;
00626 }
00627
00628 if (this->notification_pipe_.open () == -1)
00629 return -1;
00630 #if defined (F_SETFD)
00631 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00632 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00633 #endif /* F_SETFD */
00634
00635 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00636 ACE_Notification_Buffer *temp;
00637
00638 ACE_NEW_RETURN (temp,
00639 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00640 -1);
00641
00642 if (this->alloc_queue_.enqueue_head (temp) == -1)
00643 {
00644 delete [] temp;
00645 return -1;
00646 }
00647
00648 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
00649 if (free_queue_.enqueue_head (temp + i) == -1)
00650 return -1;
00651
00652 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00653
00654 // There seems to be a Win32 bug with this... Set this into
00655 // non-blocking mode.
00656 if (ACE::set_flags (this->notification_pipe_.read_handle (),
00657 ACE_NONBLOCK) == -1)
00658 return -1;
00659 else
00660 return this->select_reactor_->register_handler
00661 (this->notification_pipe_.read_handle (),
00662 this,
00663 ACE_Event_Handler::READ_MASK);
00664 }
00665 else
00666 {
00667 this->select_reactor_ = 0;
00668 return 0;
00669 }
00670 }
|
|
||||||||||||
|
Purge any notifications pending in this reactor for the specified <ACE_Event_Handler> object. If <eh> == 0, all notifications for all handlers are removed (but not any notifications posted just to wake up the reactor itself). Returns the number of notifications purged. Returns -1 on error. Implements ACE_Reactor_Notify. Definition at line 510 of file Select_Reactor_Base.cpp. References ACE_ASSERT, ACE_BIT_DISABLED, ACE_CLR_BITS, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_LIB_TEXT, ACE_Reactor_Mask, ACE_SYNCH_MUTEX, ACE_TRACE, ACE_Unbounded_Queue::dequeue_head, ACE_Notification_Buffer::eh_, ACE_Unbounded_Queue::enqueue_head, LM_ERROR, ACE_Notification_Buffer::mask_, and ACE_Unbounded_Queue::size.
00512 {
00513 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00514
00515 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00516
00517 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00518
00519 if (this->notify_queue_.is_empty ())
00520 return 0;
00521
00522 ACE_Notification_Buffer *temp;
00523 ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00524
00525 size_t queue_size = this->notify_queue_.size ();
00526 int number_purged = 0;
00527 size_t i;
00528 for (i = 0; i < queue_size; ++i)
00529 {
00530 if (-1 == this->notify_queue_.dequeue_head (temp))
00531 ACE_ERROR_RETURN ((LM_ERROR,
00532 ACE_LIB_TEXT ("%p\n"),
00533 ACE_LIB_TEXT ("dequeue_head")),
00534 -1);
00535
00536 // If this is not a Reactor notify (it is for a particular handler),
00537 // and it matches the specified handler (or purging all),
00538 // and applying the mask would totally eliminate the notification, then
00539 // release it and count the number purged.
00540 if ((0 != temp->eh_) &&
00541 (0 == eh || eh == temp->eh_) &&
00542 ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
00543 // is left with nothing when
00544 // applying the mask
00545 {
00546 if (-1 == this->free_queue_.enqueue_head (temp))
00547 ACE_ERROR_RETURN ((LM_ERROR,
00548 ACE_LIB_TEXT ("%p\n"),
00549 ACE_LIB_TEXT ("enqueue_head")),
00550 -1);
00551 ++number_purged;
00552 }
00553 else
00554 {
00555 // To preserve it, move it to the local_queue.
00556 // But first, if this is not a Reactor notify (it is for a particularhandler),
00557 // and it matches the specified handler (or purging all), then
00558 // apply the mask
00559 if ((0 != temp->eh_) &&
00560 (0 == eh || eh == temp->eh_))
00561 ACE_CLR_BITS(temp->mask_, mask);
00562 if (-1 == local_queue.enqueue_head (temp))
00563 return -1;
00564 }
00565 }
00566
00567 if (this->notify_queue_.size ())
00568 { // should be empty!
00569 ACE_ASSERT (0);
00570 return -1;
00571 }
00572
00573 // now put it back in the notify queue
00574 queue_size = local_queue.size ();
00575 for (i = 0; i < queue_size; ++i)
00576 {
00577 if (-1 == local_queue.dequeue_head (temp))
00578 ACE_ERROR_RETURN ((LM_ERROR,
00579 ACE_LIB_TEXT ("%p\n"),
00580 ACE_LIB_TEXT ("dequeue_head")),
00581 -1);
00582
00583 if (-1 == this->notify_queue_.enqueue_head (temp))
00584 ACE_ERROR_RETURN ((LM_ERROR,
00585 ACE_LIB_TEXT ("%p\n"),
00586 ACE_LIB_TEXT ("enqueue_head")),
00587 -1);
00588 }
00589
00590 return number_purged;
00591
00592 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00593 ACE_UNUSED_ARG (eh);
00594 ACE_UNUSED_ARG (mask);
00595 ACE_NOTSUP_RETURN (-1);
00596 #endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00597 }
|
|
||||||||||||
|
Read one of the notify call on the <handle> into the <buffer>. This could be because of a thread trying to unblock the <Reactor_Impl>.
Implements ACE_Reactor_Notify. Definition at line 963 of file Select_Reactor_Base.cpp. References ACE_TRACE, EWOULDBLOCK, ACE::recv, and ssize_t. Referenced by handle_input.
00965 {
00966 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00967
00968 ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00969
00970 if (n > 0)
00971 {
00972 // Check to see if we've got a short read.
00973 if (n != sizeof buffer)
00974 {
00975 ssize_t remainder = sizeof buffer - n;
00976
00977 // If so, try to recover by reading the remainder. If this
00978 // doesn't work we're in big trouble since the input stream
00979 // won't be aligned correctly. I'm not sure quite what to
00980 // do at this point. It's probably best just to return -1.
00981 if (ACE::recv (handle,
00982 ((char *) &buffer) + n,
00983 remainder) != remainder)
00984 return -1;
00985 }
00986
00987
00988 return 1;
00989 }
00990
00991 // Return -1 if things have gone seriously wrong.
00992 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00993 return -1;
00994
00995 return 0;
00996 }
|
|
|
Declare the dynamic allocation hooks.
Definition at line 211 of file Select_Reactor_Base.h. |
|
|
Keeps track of the maximum number of times that the <ACE_Select_Reactor_Notify::handle_input> method will iterate and dispatch the <ACE_Event_Handlers> that are passed in via the notify pipe before breaking out of its <recv> loop. By default, this is set to -1, which means "iterate until the pipe is empty." Definition at line 235 of file Select_Reactor_Base.h. Referenced by handle_input, and max_notify_iterations. |
|
|
Contains the <ACE_HANDLE> the <ACE_Select_Reactor> is listening on, as well as the <ACE_HANDLE> that threads wanting the attention of the <ACE_Select_Reactor> will write to. Definition at line 226 of file Select_Reactor_Base.h. Referenced by close, dispatch_notifications, dump, notify_handle, and open. |
|
|
Keep a back pointer to the <ACE_Select_Reactor>. If this value if NULL then the <ACE_Select_Reactor> has been initialized with <disable_notify_pipe>. Definition at line 219 of file Select_Reactor_Base.h. Referenced by handle_input, notify, and open. |
1.2.14 written by Dimitri van Heesch,
© 1997-2002