Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Dev_Poll_Reactor.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // -*- C++ -*-
00003 
00004 #include "ace/Dev_Poll_Reactor.h"
00005 
00006 ACE_RCSID (ace,
00007            Dev_Poll_Reactor,
00008            "$Id: Dev_Poll_Reactor.cpp,v 1.1.1.1.2.1 2003/03/13 19:44:21 chad Exp $")
00009 
00010 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
00011 
00012 #  if defined (ACE_HAS_EVENT_POLL) && defined (linux)
00013 
00014 // #undef POLLIN
00015 // #undef POLLPRI
00016 // #undef POLLOUT
00017 // #undef POLLERR
00018 // #undef POLLHUP
00019 // #undef POLLNVAL
00020 
00021 #    include /**/ <asm/page.h>
00022   //#  include <asm/poll.h>
00023   // @@ UGLY HACK ... REMOVE ME
00024   //    <asm/poll.h> and <sys/poll.h> conflict.
00025 #    define POLLREMOVE      0x1000
00026 #    include /**/ <linux/eventpoll.h>
00027 
00028 #  elif defined (ACE_HAS_DEV_POLL)
00029 
00030 #    if defined (sun)
00031 #      include /**/ <sys/devpoll.h>
00032 #    elif defined (linux)
00033 #      include /**/ <linux/devpoll.h>
00034 #    endif  /* sun */
00035 
00036 #  endif  /* ACE_HAS_DEV_POLL */
00037 
00038 
00039 #if !defined (__ACE_INLINE__)
00040 # include "ace/Dev_Poll_Reactor.inl"
00041 #endif /* __ACE_INLINE__ */
00042 
00043 
00044 #include "ace/Handle_Set.h"
00045 #include "ace/Reactor.h"
00046 #include "ace/Timer_Heap.h"
00047 #include "ace/ACE.h"
00048 
00049 
00050 ACE_Dev_Poll_Reactor_Notify::ACE_Dev_Poll_Reactor_Notify (void)
00051   : dp_reactor_ (0)
00052   , notification_pipe_ ()
00053   , max_notify_iterations_ (-1)
00054 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00055   , alloc_queue_ ()
00056   , notify_queue_ ()
00057   , free_queue_ ()
00058   , notify_queue_lock_ ()
00059 #endif  /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00060 {
00061 }
00062 
00063 int
00064 ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
00065                                    ACE_Timer_Queue * /* timer_queue */,
00066                                    int disable_notify_pipe)
00067 {
00068   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::open");
00069 
00070   if (disable_notify_pipe == 0)
00071     {
00072       this->dp_reactor_ = ACE_dynamic_cast (ACE_Dev_Poll_Reactor *, r);
00073 
00074       if (this->dp_reactor_ == 0)
00075         {
00076           errno = EINVAL;
00077           return -1;
00078         }
00079 
00080       if (this->notification_pipe_.open () == -1)
00081         return -1;
00082 
00083 #if defined (F_SETFD)
00084       // close-on-exec
00085       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00086       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00087 #endif /* F_SETFD */
00088 
00089 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00090       ACE_Notification_Buffer *temp;
00091 
00092       ACE_NEW_RETURN (temp,
00093                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00094                       -1);
00095 
00096       if (this->alloc_queue_.enqueue_head (temp) == -1)
00097         return -1;
00098 
00099       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
00100         if (free_queue_.enqueue_head (temp + i) == -1)
00101           return -1;
00102 
00103 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00104 
00105       // Set the read handle into non-blocking mode since we need to
00106       // perform a "speculative" read when determining if their are
00107       // notifications to dispatch.
00108       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00109                           ACE_NONBLOCK) == -1)
00110         return -1;
00111     }
00112 
00113   return 0;
00114 }
00115 
00116 int
00117 ACE_Dev_Poll_Reactor_Notify::close (void)
00118 {
00119   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::close");
00120 
00121 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00122   // Free up the dynamically allocated resources.
00123   ACE_Notification_Buffer **b;
00124 
00125   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00126        alloc_iter.next (b) != 0;
00127        alloc_iter.advance ())
00128     {
00129       delete [] *b;
00130       *b = 0;
00131     }
00132 
00133   this->alloc_queue_.reset ();
00134   this->notify_queue_.reset ();
00135   this->free_queue_.reset ();
00136 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00137 
00138   return this->notification_pipe_.close ();
00139 }
00140 
00141 ssize_t
00142 ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
00143                                      ACE_Reactor_Mask mask,
00144                                      ACE_Time_Value *timeout)
00145 {
00146   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify");
00147 
00148   // Just consider this method a "no-op" if there's no
00149   // ACE_Dev_Poll_Reactor configured.
00150   if (this->dp_reactor_ == 0)
00151     return 0;
00152 
00153 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00154   ACE_Notification_Buffer buffer (eh, mask);
00155   // int notification_required = 0;
00156 
00157   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00158 
00159   // No pending notifications.
00160 
00161   // We will send notify for every message..
00162   // if (this->notify_queue_.is_empty ())
00163   //   notification_required = 1;
00164 
00165   ACE_Notification_Buffer *temp = 0;
00166 
00167   if (free_queue_.dequeue_head (temp) == -1)
00168     {
00169       // Grow the queue of available buffers.
00170       ACE_Notification_Buffer *temp1;
00171 
00172       ACE_NEW_RETURN (temp1,
00173                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00174                       -1);
00175 
00176       if (this->alloc_queue_.enqueue_head (temp1) == -1)
00177         return -1;
00178 
00179       // Start at 1 and enqueue only
00180       // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00181       // the first one will be used right now.
00182       for (size_t i = 1;
00183            i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00184            ++i)
00185         this->free_queue_.enqueue_head (temp1 + i);
00186 
00187       temp = temp1;
00188     }
00189 
00190   ACE_ASSERT (temp != 0);
00191   *temp = buffer;
00192 
00193   if (notify_queue_.enqueue_tail (temp) == -1)
00194     return -1;
00195 
00196   // Let us send a notify for every message
00197   // if (notification_required)
00198   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00199                          (char *) &buffer,
00200                          sizeof buffer,
00201                          timeout);
00202   if (n == -1)
00203     return -1;
00204 
00205   return 0;
00206 #else
00207   ACE_Notification_Buffer buffer (eh, mask);
00208 
00209   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00210                          (char *) &buffer,
00211                          sizeof buffer,
00212                          timeout);
00213   if (n == -1)
00214     return -1;
00215 
00216   return 0;
00217 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00218 }
00219 
00220 int
00221 ACE_Dev_Poll_Reactor_Notify::dispatch_notifications (
00222   int & /* number_of_active_handles */,
00223   ACE_Handle_Set & /* rd_mask */)
00224 {
00225   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notifications");
00226 
00227   // This method is unimplemented in the ACE_Dev_Poll_Reactor.
00228   // Instead, the notification handler is invoked as part of the IO
00229   // event set.  Doing so alters the some documented semantics that
00230   // state that the notifications are handled before IO events.
00231   // Enforcing such semantics does not appear to be beneficial, and
00232   // also serves to slow down event dispatching particularly with this
00233   // ACE_Dev_Poll_Reactor.
00234 
00235 #if 0
00236   ACE_HANDLE read_handle =
00237     this->notification_pipe_.read_handle ();
00238 
00239   // Note that we do not check if the handle has received any events.
00240   // Instead a non-blocking "speculative" read is performed.  If the
00241   // read returns with errno == EWOULDBLOCK then no notifications are
00242   // dispatched.  See ACE_Dev_Poll_Reactor_Notify::read_notify_pipe()
00243   // for details.
00244   if (read_handle != ACE_INVALID_HANDLE)
00245     {
00246       number_of_active_handles--;
00247 
00248       return this->handle_input (read_handle);
00249     }
00250   else
00251     return 0;
00252 #else
00253   ACE_NOTSUP_RETURN (-1);
00254 #endif  /* 0 */
00255 }
00256 
00257 int
00258 ACE_Dev_Poll_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00259                                                ACE_Notification_Buffer &buffer)
00260 {
00261   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::read_notify_pipe");
00262 
00263   // This is a (non-blocking) "speculative" read, i.e., we attempt to
00264   // read even if no event was polled on the read handle.  A
00265   // speculative read is necessary since notifications must be
00266   // dispatched before IO events.  We can avoid the speculative read
00267   // by "walking" the array of pollfd structures returned from
00268   // `/dev/poll' or `/dev/epoll' but that is potentially much more
00269   // expensive than simply checking for an EWOULDBLOCK.
00270 
00271   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00272 
00273   if (n > 0)
00274     {
00275       // Check to see if we've got a short read.
00276       if (n != sizeof buffer)
00277         {
00278           ssize_t remainder = sizeof buffer - n;
00279 
00280           // If so, try to recover by reading the remainder.  If this
00281           // doesn't work we're in big trouble since the input stream
00282           // won't be aligned correctly.  I'm not sure quite what to
00283           // do at this point.  It's probably best just to return -1.
00284           if (ACE::recv (handle,
00285                          ((char *) &buffer) + n,
00286                          remainder) != remainder)
00287             return -1;
00288         }
00289 
00290 
00291       return 1;
00292     }
00293 
00294   // Return -1 if things have gone seriously wrong.
00295   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00296     return -1;
00297 
00298   return n;
00299 }
00300 
00301 
00302 int
00303 ACE_Dev_Poll_Reactor_Notify::handle_input (ACE_HANDLE handle)
00304 {
00305   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::handle_input");
00306 
00307   // @@ We may end up dispatching this event handler twice:  once when
00308   //    performing the speculative read on the notification pipe
00309   //    handle, and once more when dispatching the IO events.
00310 
00311   // Precondition: this->select_reactor_.token_.current_owner () ==
00312   // ACE_Thread::self ();
00313 
00314   int number_dispatched = 0;
00315   int result = 0;
00316   ACE_Notification_Buffer buffer;
00317 
00318   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
00319     {
00320       // Dispatch the buffer
00321       // NOTE: We count only if we made any dispatches ie. upcalls.
00322       if (this->dispatch_notify (buffer) > 0)
00323         number_dispatched++;
00324 
00325       // Bail out if we've reached the <notify_threshold_>.  Note that
00326       // by default <notify_threshold_> is -1, so we'll loop until all
00327       // the notifications in the pipe have been dispatched.
00328       if (number_dispatched == this->max_notify_iterations_)
00329         break;
00330     }
00331 
00332   if ((result == -1 && (errno != EWOULDBLOCK || errno != EAGAIN))
00333       || result == 0)
00334     {
00335       // Reassign number_dispatched to -1 if things have gone
00336       // seriously wrong.
00337       number_dispatched = -1;
00338     }
00339 
00340   // Enqueue ourselves into the list of waiting threads.  When we
00341   // reacquire the token we'll be off and running again with ownership
00342   // of the token.  The postcondition of this call is that
00343   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
00344   //this->select_reactor_->renew ();
00345 
00346   return number_dispatched;
00347 }
00348 
00349 ACE_HANDLE
00350 ACE_Dev_Poll_Reactor_Notify::notify_handle (void)
00351 {
00352   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::notify_handle");
00353 
00354   return this->notification_pipe_.read_handle ();
00355 }
00356 
00357 int
00358 ACE_Dev_Poll_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &)
00359 {
00360   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::is_dispatchable");
00361 
00362   ACE_NOTSUP_RETURN (-1);
00363 }
00364 
00365 int
00366 ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00367 {
00368   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dispatch_notify");
00369 
00370   int result = 0;
00371 
00372 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00373   // Dispatch all messages that are in the <notify_queue_>.
00374   {
00375     // We acquire the lock in a block to make sure we're not
00376     // holding the lock while delivering callbacks...
00377     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00378 
00379     ACE_Notification_Buffer *temp;
00380 
00381     if (notify_queue_.is_empty ())
00382       return 0;
00383     else if (notify_queue_.dequeue_head (temp) == -1)
00384       ACE_ERROR_RETURN ((LM_ERROR,
00385                          ACE_LIB_TEXT ("%p\n"),
00386                          ACE_LIB_TEXT ("dequeue_head")),
00387                         -1);
00388     buffer = *temp;
00389     if (free_queue_.enqueue_head (temp) == -1)
00390       ACE_ERROR_RETURN ((LM_ERROR,
00391                          ACE_LIB_TEXT ("%p\n"),
00392                          ACE_LIB_TEXT ("enqueue_head")),
00393                         -1);
00394   }
00395 
00396   // If eh == 0 then another thread is unblocking the
00397   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00398   // internal structures.  Otherwise, we need to dispatch the
00399   // appropriate handle_* method on the ACE_Event_Handler
00400   // pointer we've been passed.
00401   if (buffer.eh_ != 0)
00402     {
00403 
00404       switch (buffer.mask_)
00405         {
00406         case ACE_Event_Handler::READ_MASK:
00407         case ACE_Event_Handler::ACCEPT_MASK:
00408           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00409           break;
00410         case ACE_Event_Handler::WRITE_MASK:
00411           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00412           break;
00413         case ACE_Event_Handler::EXCEPT_MASK:
00414           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00415           break;
00416         default:
00417           // Should we bail out if we get an invalid mask?
00418           ACE_ERROR ((LM_ERROR,
00419                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00420                       buffer.mask_));
00421         }
00422       if (result == -1)
00423         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00424                                   ACE_Event_Handler::EXCEPT_MASK);
00425     }
00426 #else
00427   // If eh == 0 then another thread is unblocking the
00428   // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
00429   // internal structures.  Otherwise, we need to dispatch the
00430   // appropriate handle_* method on the ACE_Event_Handler
00431   // pointer we've been passed.
00432   if (buffer.eh_ != 0)
00433     {
00434       switch (buffer.mask_)
00435         {
00436         case ACE_Event_Handler::READ_MASK:
00437         case ACE_Event_Handler::ACCEPT_MASK:
00438           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00439           break;
00440         case ACE_Event_Handler::WRITE_MASK:
00441           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00442           break;
00443         case ACE_Event_Handler::EXCEPT_MASK:
00444           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00445           break;
00446         case ACE_Event_Handler::QOS_MASK:
00447           result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00448           break;
00449         case ACE_Event_Handler::GROUP_QOS_MASK:
00450           result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00451           break;
00452         default:
00453           // Should we bail out if we get an invalid mask?
00454           ACE_ERROR ((LM_ERROR,
00455                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00456                       buffer.mask_));
00457         }
00458       if (result == -1)
00459         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00460                                   ACE_Event_Handler::EXCEPT_MASK);
00461     }
00462 
00463 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00464 
00465   return 1;
00466 }
00467 
00468 void
00469 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (int iterations)
00470 {
00471   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00472 
00473   // Must always be > 0 or < 0 to optimize the loop exit condition.
00474   if (iterations == 0)
00475     iterations = 1;
00476 
00477   this->max_notify_iterations_ = iterations;
00478 }
00479 
00480 int
00481 ACE_Dev_Poll_Reactor_Notify::max_notify_iterations (void)
00482 {
00483   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::max_notify_iterations");
00484 
00485   return this->max_notify_iterations_;
00486 }
00487 
00488 int
00489 ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications (
00490   ACE_Event_Handler *eh,
00491   ACE_Reactor_Mask mask)
00492 {
00493   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::purge_pending_notifications");
00494 
00495 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00496 
00497   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00498 
00499   if (this->notify_queue_.is_empty ())
00500     return 0;
00501 
00502   ACE_Notification_Buffer *temp;
00503   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00504 
00505   size_t queue_size = this->notify_queue_.size ();
00506   int number_purged = 0;
00507   size_t i;
00508   for (i = 0; i < queue_size; ++i)
00509     {
00510       if (-1 == this->notify_queue_.dequeue_head (temp))
00511         ACE_ERROR_RETURN ((LM_ERROR,
00512                            ACE_LIB_TEXT ("%p\n"),
00513                            ACE_LIB_TEXT ("dequeue_head")),
00514                           -1);
00515 
00516       // If this is not a Reactor notify (it is for a particular
00517       // handler), and it matches the specified handler (or purging
00518       // all), and applying the mask would totally eliminate the
00519       // notification, then release it and count the number purged.
00520       if ((0 != temp->eh_) &&
00521           (0 == eh || eh == temp->eh_) &&
00522           ACE_BIT_DISABLED (temp->mask_, ~mask)) // The existing
00523                                                  // notification mask
00524                                                  // is left with
00525                                                  // nothing when
00526                                                  // applying the mask.
00527         {
00528           if (this->free_queue_.enqueue_head (temp) == -1)
00529             ACE_ERROR_RETURN ((LM_ERROR,
00530                                ACE_LIB_TEXT ("%p\n"),
00531                                ACE_LIB_TEXT ("enqueue_head")),
00532                               -1);
00533           ++number_purged;
00534         }
00535       else
00536         {
00537           // To preserve it, move it to the local_queue.
00538           // But first, if this is not a Reactor notify (it is for a
00539           // particular handler), and it matches the specified handler
00540           // (or purging all), then apply the mask.
00541           if ((0 != temp->eh_) &&
00542               (0 == eh || eh == temp->eh_))
00543             ACE_CLR_BITS(temp->mask_, mask);
00544           if (-1 == local_queue.enqueue_head (temp))
00545             return -1;
00546         }
00547     }
00548 
00549   if (this->notify_queue_.size ())
00550     {
00551       // Should be empty!
00552       ACE_ASSERT (0);
00553       return -1;
00554     }
00555 
00556   // Now put it back in the notify queue.
00557   queue_size = local_queue.size ();
00558   for (i = 0; i < queue_size; ++i)
00559     {
00560       if (-1 == local_queue.dequeue_head (temp))
00561         ACE_ERROR_RETURN ((LM_ERROR,
00562                            ACE_LIB_TEXT ("%p\n"),
00563                            ACE_LIB_TEXT ("dequeue_head")),
00564                           -1);
00565 
00566       if (-1 == this->notify_queue_.enqueue_head (temp))
00567         ACE_ERROR_RETURN ((LM_ERROR,
00568                            ACE_LIB_TEXT ("%p\n"),
00569                            ACE_LIB_TEXT ("enqueue_head")),
00570                           -1);
00571     }
00572 
00573   return number_purged;
00574 
00575 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00576   ACE_UNUSED_ARG (eh);
00577   ACE_UNUSED_ARG (mask);
00578   ACE_NOTSUP_RETURN (-1);
00579 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00580 }
00581 
00582 void
00583 ACE_Dev_Poll_Reactor_Notify::dump (void) const
00584 {
00585   ACE_TRACE ("ACE_Dev_Poll_Reactor_Notify::dump");
00586 
00587   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00588   ACE_DEBUG ((LM_DEBUG,
00589               ACE_LIB_TEXT ("dp_reactor_ = 0x%x"),
00590               this->dp_reactor_));
00591   this->notification_pipe_.dump ();
00592   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00593 }
00594 
00595 // -----------------------------------------------------------------
00596 
00597 
00598 ACE_Dev_Poll_Reactor_Handler_Repository::ACE_Dev_Poll_Reactor_Handler_Repository (void)
00599   : max_size_ (0),
00600     handlers_ (0)
00601 {
00602   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::ACE_Dev_Poll_Reactor_Handler_Repository");
00603 }
00604 
00605 int
00606 ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle (
00607   ACE_HANDLE handle) const
00608 {
00609   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::invalid_handle");
00610 
00611   if (handle < 0 || handle >= this->max_size_)
00612     {
00613       errno = EINVAL;
00614       return 1;
00615     }
00616   else
00617     return 0;
00618 }
00619 
00620 int
00621 ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range (
00622   ACE_HANDLE handle) const
00623 {
00624   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::handle_in_range");
00625 
00626   if (handle >= 0 && handle < this->max_size_)
00627     return 1;
00628   else
00629     {
00630       errno = EINVAL;
00631       return 0;
00632     }
00633 }
00634 
00635 int
00636 ACE_Dev_Poll_Reactor_Handler_Repository::open (size_t size)
00637 {
00638   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::open");
00639 
00640   this->max_size_ = size;
00641 
00642   // Try to allocate the memory.
00643   ACE_NEW_RETURN (this->handlers_,
00644                   ACE_Dev_Poll_Event_Tuple[size],
00645                   -1);
00646 
00647   // Try to increase the number of handles if <size> is greater than
00648   // the current limit.
00649   return ACE::set_handle_limit (size);
00650 }
00651 
00652 int
00653 ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all (void)
00654 {
00655   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind_all");
00656 
00657   // Unbind all of the event handlers.
00658   for (int handle = 0;
00659        handle < this->max_size_;
00660        ++handle)
00661     this->unbind (handle);
00662 
00663   return 0;
00664 }
00665 
00666 int
00667 ACE_Dev_Poll_Reactor_Handler_Repository::close (void)
00668 {
00669   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::close");
00670 
00671   if (this->handlers_ != 0)
00672     {
00673       this->unbind_all ();
00674 
00675       delete [] this->handlers_;
00676       this->handlers_ = 0;
00677     }
00678 
00679   return 0;
00680 }
00681 
00682 ACE_Event_Handler *
00683 ACE_Dev_Poll_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00684                                                size_t *index_p)
00685 {
00686   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::find");
00687 
00688   ACE_Event_Handler *eh = 0;
00689 
00690   // Only bother to search for the <handle> if it's in range.
00691   if (this->handle_in_range (handle))
00692     {
00693       eh = this->handlers_[handle].event_handler;
00694 
00695       if (eh != 0 && index_p != 0)
00696         *index_p = handle;
00697       else
00698         errno = ENOENT;
00699     }
00700 
00701   return eh;
00702 }
00703 
00704 int
00705 ACE_Dev_Poll_Reactor_Handler_Repository::bind (
00706   ACE_HANDLE handle,
00707   ACE_Event_Handler *event_handler,
00708   ACE_Reactor_Mask mask)
00709 {
00710   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::bind");
00711 
00712   if (handle == ACE_INVALID_HANDLE)
00713     handle = event_handler->get_handle ();
00714 
00715   if (this->invalid_handle (handle))
00716     return -1;
00717 
00718   this->handlers_[handle].event_handler = event_handler;
00719   this->handlers_[handle].mask = mask;
00720 
00721   return 0;
00722 }
00723 
00724 int
00725 ACE_Dev_Poll_Reactor_Handler_Repository::unbind (ACE_HANDLE handle)
00726 {
00727   ACE_TRACE ("ACE_Dev_Poll_Reactor_Handler_Repository::unbind");
00728 
00729   if (this->find (handle) == 0)
00730     return -1;
00731 
00732   this->handlers_[handle].event_handler = 0;
00733   this->handlers_[handle].mask = ACE_Event_Handler::NULL_MASK;
00734   this->handlers_[handle].suspended = 0;
00735 
00736   return 0;
00737 }
00738 
00739 // -----------------------------------------------------------------
00740 
00741 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (ACE_Sig_Handler *sh,
00742                                             ACE_Timer_Queue *tq,
00743                                             int disable_notify_pipe,
00744                                             ACE_Reactor_Notify *notify,
00745                                             int mask_signals)
00746   : initialized_ (0)
00747   , poll_fd_ (ACE_INVALID_HANDLE)
00748   , size_ (0)
00749   // , ready_set_ ()
00750 #if defined (ACE_HAS_EVENT_POLL)
00751   , mmap_ (0)
00752 #else
00753   , dp_fds_ (0)
00754 #endif  /* ACE_HAS_EVENT_POLL */
00755   , start_pfds_ (0)
00756   , end_pfds_ (0)
00757   , deactivated_ (0)
00758   , lock_ ()
00759   , lock_adapter_ (lock_)
00760   , timer_queue_ (0)
00761   , delete_timer_queue_ (0)
00762   , signal_handler_ (0)
00763   , delete_signal_handler_ (0)
00764   , notify_handler_ (0)
00765   , delete_notify_handler_ (0)
00766   , mask_signals_ (mask_signals)
00767   , restart_ (0)
00768 {
00769   ACE_TRACE ("ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor");
00770 
00771   if (this->open (ACE::max_handles (),
00772                   0,
00773                   sh,
00774                   tq,
00775                   disable_notify_pipe,
00776                   notify) == -1)
00777     ACE_ERROR ((LM_ERROR,
00778                 ACE_LIB_TEXT ("%p\n"),
00779                 ACE_LIB_TEXT ("ACE_Dev_Poll_Reactor::open ")
00780                 ACE_LIB_TEXT ("failed inside ")
00781                 ACE_LIB_TEXT ("ACE_Dev_Poll_Reactor::CTOR")));
00782 }
00783 
00784 ACE_Dev_Poll_Reactor::ACE_Dev_Poll_Reactor (size_t size,
00785                                             int rs,
00786                                             ACE_Sig_Handler *sh,
00787                                             ACE_Timer_Queue *tq,
00788                                             int disable_notify_pipe,
00789                                             ACE_Reactor_Notify *notify,
00790                                             int mask_signals)
00791   : initialized_ (0)
00792   , poll_fd_ (ACE_INVALID_HANDLE)
00793   , size_ (0)
00794   // , ready_set_ ()
00795 #if defined (ACE_HAS_EVENT_POLL)
00796   , mmap_ (0)
00797 #else
00798   , dp_fds_ (0)
00799 #endif  /* ACE_HAS_EVENT_POLL */
00800   , start_pfds_ (0)
00801   , end_pfds_ (0)
00802   , deactivated_ (0)
00803   , lock_ ()
00804   , lock_adapter_ (lock_)
00805   , timer_queue_ (0)
00806   , delete_timer_queue_ (0)
00807   , signal_handler_ (0)
00808   , delete_signal_handler_ (0)
00809   , notify_handler_ (0)
00810   , delete_notify_handler_ (0)
00811   , mask_signals_ (mask_signals)
00812   , restart_ (0)
00813 {
00814   if (this->open (size,
00815                   rs,
00816                   sh,
00817                   tq,
00818                   disable_notify_pipe,
00819                   notify) == -1)
00820     ACE_ERROR ((LM_ERROR,
00821                 ACE_LIB_TEXT ("%p\n"),
00822                 ACE_LIB_TEXT ("ACE_Dev_Poll_Reactor::open ")
00823                 ACE_LIB_TEXT ("failed inside ACE_Dev_Poll_Reactor::CTOR")));
00824 }
00825 
00826 ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor (void)
00827 {
00828   ACE_TRACE ("ACE_Dev_Poll_Reactor::~ACE_Dev_Poll_Reactor");
00829 
00830   (void) this->close ();
00831 }
00832 
00833 int
00834 ACE_Dev_Poll_Reactor::open (size_t size,
00835                             int restart,
00836                             ACE_Sig_Handler *sh,
00837                             ACE_Timer_Queue *tq,
00838                             int disable_notify_pipe,
00839                             ACE_Reactor_Notify *notify)
00840 {
00841   ACE_TRACE ("ACE_Dev_Poll_Reactor::open");
00842 
00843   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00844 
00845   // Can't initialize ourselves more than once.
00846   if (this->initialized_ > 0)
00847     return -1;
00848 
00849   this->restart_ = restart;
00850   this->signal_handler_ = sh;
00851   this->timer_queue_ = tq;
00852   this->notify_handler_ = notify;
00853 
00854   int result = 0;
00855 
00856   // Allows the signal handler to be overridden.
00857   if (this->signal_handler_ == 0)
00858     {
00859       ACE_NEW_RETURN (this->signal_handler_,
00860                       ACE_Sig_Handler,
00861                       -1);
00862 
00863       if (this->signal_handler_ == 0)
00864         result = -1;
00865       else
00866         this->delete_signal_handler_ = 1;
00867     }
00868 
00869   // Allows the timer queue to be overridden.
00870   if (result != -1 && this->timer_queue_ == 0)
00871     {
00872       ACE_NEW_RETURN (this->timer_queue_,
00873                       ACE_Timer_Heap,
00874                       -1);
00875 
00876       if (this->timer_queue_ == 0)
00877         result = -1;
00878       else
00879         this->delete_timer_queue_ = 1;
00880     }
00881 
00882   // Allows the Notify_Handler to be overridden.
00883   if (result != -1 && this->notify_handler_ == 0)
00884     {
00885       ACE_NEW_RETURN (this->notify_handler_,
00886                       ACE_Dev_Poll_Reactor_Notify,
00887                       -1);
00888 
00889       if (this->notify_handler_ == 0)
00890         result = -1;
00891       else
00892         this->delete_notify_handler_ = 1;
00893     }
00894 
00895 #if defined (ACE_HAS_EVENT_POLL)
00896 
00897   // Open the `/dev/epoll' character device.
00898   this->poll_fd_ = ACE_OS::open ("/dev/epoll", O_RDWR);
00899   if (this->poll_fd_ == ACE_INVALID_HANDLE)
00900     result = -1;
00901 
00902   // Set the maximum number of file descriptors to expect.
00903   if (result != -1
00904       && ACE_OS::ioctl (this->poll_fd_, EP_ALLOC, (void *) size) == 0)
00905     {
00906       // Map an area of memory to which results will be fed.
00907       void *mm = ACE_OS::mmap (0,
00908                                EP_MAP_SIZE (size),
00909                                PROT_READ | PROT_WRITE,
00910                                MAP_PRIVATE,
00911                                this->poll_fd_,
00912                                0);
00913 
00914       if (mm == (void *) MAP_FAILED)
00915         result = -1;
00916       else
00917         this->mmap_ = ACE_static_cast (char *, mm);
00918     }
00919 #else
00920 
00921   // Allocate the array before opening the device to avoid a potential
00922   // resource leak if allocation fails.
00923   ACE_NEW_RETURN (this->dp_fds_,
00924                   pollfd[size],
00925                   -1);
00926 
00927   // Open the `/dev/poll' character device.
00928   this->poll_fd_ = ACE_OS::open ("/dev/poll", O_RDWR);
00929   if (this->poll_fd_ == ACE_INVALID_HANDLE)
00930     result = -1;
00931 
00932 #endif  /* ACE_HAS_EVENT_POLL */
00933 
00934   if (result != -1 && this->handler_rep_.open (size) == -1)
00935     result = -1;
00936 
00937   // Registration of the notification handler must be done after the
00938   // /dev/poll device has been fully initialized.
00939   else if (this->notify_handler_->open (this,
00940                                         0,
00941                                         disable_notify_pipe) == -1
00942            || this->register_handler_i (
00943                        this->notify_handler_->notify_handle (),
00944                        this->notify_handler_,
00945                        ACE_Event_Handler::READ_MASK) == -1)
00946     result = -1;
00947 
00948   this->size_ = size;
00949 
00950   if (result != -1)
00951     // We're all set to go.
00952     this->initialized_ = 1;
00953   else
00954     // This will close down all the allocated resources properly.
00955     (void) this->close ();
00956 
00957   return result;
00958 }
00959 
00960 int
00961 ACE_Dev_Poll_Reactor::current_info (ACE_HANDLE, size_t & /* size */)
00962 {
00963   ACE_NOTSUP_RETURN (-1);
00964 }
00965 
00966 
00967 int
00968 ACE_Dev_Poll_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
00969 {
00970   if (this->delete_signal_handler_ && this->signal_handler_)
00971     delete this->signal_handler_;
00972 
00973   this->signal_handler_ = signal_handler;
00974   this->delete_signal_handler_ = 0;
00975 
00976   return 0;
00977 }
00978 
00979 int
00980 ACE_Dev_Poll_Reactor::set_timer_queue (ACE_Timer_Queue *tq)
00981 {
00982   // @note This method is deprecated.
00983 
00984   return this->timer_queue (tq);
00985 }
00986 
00987 int
00988 ACE_Dev_Poll_Reactor::timer_queue (ACE_Timer_Queue *tq)
00989 {
00990   if (this->delete_timer_queue_ && this->timer_queue_)
00991     delete this->timer_queue_;
00992 
00993   this->timer_queue_ = tq;
00994   this->delete_timer_queue_ = 0;
00995 
00996   return 0;
00997 
00998 }
00999 
01000 ACE_Timer_Queue *
01001 ACE_Dev_Poll_Reactor::timer_queue (void) const
01002 {
01003   return this->timer_queue_;
01004 }
01005 
01006 int
01007 ACE_Dev_Poll_Reactor::close (void)
01008 {
01009   ACE_TRACE ("ACE_Dev_Poll_Reactor::close");
01010 
01011   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01012 
01013   int result = 0;
01014 
01015 #if defined (ACE_HAS_EVENT_POLL)
01016   if (this->mmap_ != 0)
01017     {
01018       (void) ACE_OS::munmap (this->mmap_, EP_MAP_SIZE (this->size_));
01019       this->mmap_ = 0;
01020     }
01021 #else
01022   delete [] this->dp_fds_;
01023   this->dp_fds_ = 0;
01024 #endif  /* ACE_HAS_EVENT_POLL */
01025 
01026   if (this->poll_fd_ != ACE_INVALID_HANDLE)
01027     {
01028 #if defined (ACE_HAS_EVENT_POLL)
01029       (void) ACE_OS::ioctl (this->poll_fd_, EP_FREE, 0);
01030 #endif  /* ACE_HAS_EVENT_POLL */
01031       result = ACE_OS::close (this->poll_fd_);
01032     }
01033 
01034   if (this->delete_signal_handler_)
01035     {
01036       delete this->signal_handler_;
01037       this->signal_handler_ = 0;
01038       this->delete_signal_handler_ = 0;
01039     }
01040 
01041   (void) this->handler_rep_.close ();
01042 
01043   if (this->delete_timer_queue_)
01044     {
01045       delete this->timer_queue_;
01046       this->timer_queue_ = 0;
01047       this->delete_timer_queue_ = 0;
01048     }
01049 
01050   if (this->notify_handler_ != 0)
01051     this->notify_handler_->close ();
01052 
01053   if (this->delete_notify_handler_)
01054     {
01055       delete this->notify_handler_;
01056       this->notify_handler_ = 0;
01057       this->delete_notify_handler_ = 0;
01058     }
01059 
01060   this->poll_fd_ = ACE_INVALID_HANDLE;
01061   this->start_pfds_ = 0;
01062   this->end_pfds_ = 0;
01063   this->initialized_ = 0;
01064 
01065   return result;
01066 }
01067 
01068 int
01069 ACE_Dev_Poll_Reactor::work_pending (const ACE_Time_Value & max_wait_time)
01070 {
01071   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending");
01072 
01073   // Stash the current time
01074   //
01075   // The destructor of this object will automatically compute how much
01076   // time elapsed since this method was called.
01077   ACE_Time_Value mwt (max_wait_time);
01078   ACE_MT (ACE_Countdown_Time countdown (&mwt));
01079 
01080   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01081 
01082   // Update the countdown to reflect time waiting for the mutex.
01083   ACE_MT (countdown.update ());
01084 
01085   return this->work_pending_i (&mwt);
01086 }
01087 
01088 int
01089 ACE_Dev_Poll_Reactor::work_pending_i (ACE_Time_Value * max_wait_time)
01090 {
01091   ACE_TRACE ("ACE_Dev_Poll_Reactor::work_pending_i");
01092 
01093   if (this->deactivated_)
01094     return 0;
01095 
01096   if (this->start_pfds_ != this->end_pfds_)
01097     return 1;  // We still have work_pending() do not poll for
01098                // additional events.
01099 
01100   ACE_Time_Value timer_buf (0);
01101   ACE_Time_Value *this_timeout = 0;
01102 
01103   this_timeout = this->timer_queue_->calculate_timeout (max_wait_time,
01104                                                         &timer_buf);
01105 
01106   // Check if we have timers to fire.
01107   int timers_pending =
01108     ((this_timeout != 0 && max_wait_time == 0)
01109      || (this_timeout != 0 && max_wait_time != 0
01110          && *this_timeout != *max_wait_time) ? 1 : 0);
01111 
01112   long timeout =
01113     (this_timeout == 0 ? -1 /* Infinity */ : this_timeout->msec ());
01114 
01115 #if defined (ACE_HAS_EVENT_POLL)
01116 
01117   struct evpoll evp;
01118 
01119   evp.ep_timeout = timeout;  // Milliseconds
01120   evp.ep_resoff = 0;
01121 
01122   // Poll for events
01123   int nfds = ACE_OS::ioctl (this->poll_fd_, EP_POLL, &evp);
01124 
01125   // Retrieve the results from the memory map.
01126   this->start_pfds_ =
01127     ACE_reinterpret_cast (struct pollfd *,
01128                           this->mmap_ + evp.ep_resoff);
01129 
01130 #else
01131 
01132   struct dvpoll dvp;
01133 
01134   dvp.dp_fds = this->dp_fds_;
01135   dvp.dp_nfds = this->size_;
01136   dvp.dp_timeout = timeout;  // Milliseconds
01137 
01138   // Poll for events
01139   int nfds = ACE_OS::ioctl (this->poll_fd_, DP_POLL, &dvp);
01140 
01141   // Retrieve the results from the pollfd array.
01142   this->start_pfds_ = dvp.dp_fds;
01143 
01144 #endif  /* ACE_HAS_EVENT_POLL */
01145 
01146   // If nfds == 0 then end_pfds_ == start_pfds_ meaning that there is
01147   // no work pending.  If nfds > 0 then there is work pending.
01148   // Otherwise an error occurred.
01149   if (nfds > -1)
01150     this->end_pfds_ = this->start_pfds_ + nfds;
01151 
01152   // If timers are pending, override any timeout from the poll.
01153   return (nfds == 0 && timers_pending != 0 ? 1 : nfds);
01154 }
01155 
01156 
01157 int
01158 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value *max_wait_time)
01159 {
01160   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01161 
01162   // Stash the current time
01163   //
01164   // The destructor of this object will automatically compute how much
01165   // time elapsed since this method was called.
01166   ACE_MT (ACE_Countdown_Time countdown (max_wait_time));
01167 
01168   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01169 
01170   if (this->deactivated_)
01171     return -1;
01172 
01173   // Update the countdown to reflect time waiting for the mutex.
01174   ACE_MT (countdown.update ());
01175 
01176   return this->handle_events_i (max_wait_time);
01177 }
01178 
01179 int
01180 ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time)
01181 {
01182   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events_i");
01183 
01184   int result = 0;
01185   // int active_handle_count = 0;
01186 
01187   // Poll for events
01188   //
01189   // If the underlying ioctl() call was interrupted via the interrupt
01190   // signal (i.e. returned -1 with errno == EINTR) then the loop will
01191   // be restarted if so desired.
01192   do
01193     {
01194       result = this->work_pending_i (max_wait_time);
01195     }
01196   while (result == -1 && this->restart_ != 0 && errno == EINTR);
01197 
01198   if (result == 0 || (result == -1 && errno == ETIME))
01199     return 0;
01200   else if (result == -1)
01201     return -1;
01202 
01203   // Dispatch the events, if any.
01204   return this->dispatch ();
01205 }
01206 
01207 int
01208 ACE_Dev_Poll_Reactor::dispatch (void)
01209 {
01210   ACE_TRACE ("ACE_Dev_Poll_Reactor::dispatch");
01211 
01212   int io_handlers_dispatched = 0;
01213   int other_handlers_dispatched = 0;
01214   int signal_occurred = 0;
01215 
01216   // The following do/while loop keeps dispatching as long as there
01217   // are still active handles.  Note that the only way we should ever
01218   // iterate more than once through this loop is if signals occur
01219   // while we're dispatching other handlers.
01220 
01221   do
01222     {
01223       // Note that we keep track of changes to our state.  If any of
01224       // the dispatch_*() methods below return -1 it means that the
01225       // "interest set" state has changed as the result of an
01226       // ACE_Event_Handler being dispatched.  This means that we
01227       // need to bail out and rerun the poll loop since our existing
01228       // notion of handles in dispatch set may no longer be correct.
01229       //
01230       // In the beginning, our state starts out unchanged.  After
01231       // every iteration (i.e., due to signals), our state starts out
01232       // unchanged again.
01233 
01234       // this->state_changed_ = 0;
01235 
01236       // Perform the Template Method for dispatching all the handlers.
01237 
01238       // First check for interrupts.
01239       if (0 /* active_handle_count == -1 */)
01240         {
01241           // Bail out -- we got here since the poll (i.e. ioctl()) was
01242           // interrupted.
01243           if (ACE_Sig_Handler::sig_pending () != 0)
01244             {
01245               ACE_Sig_Handler::sig_pending (0);
01246 
01247 #if 0
01248               // If any HANDLES in the <ready_set_> are activated as a
01249               // result of signals they should be dispatched since
01250               // they may be time critical...
01251               pfds = this->ready_set_.pfds;
01252               active_handle_count = this->ready_set_.nfds;
01253 #endif  /* 0 */
01254 
01255               // Record the fact that the Reactor has dispatched a
01256               // handle_signal() method.  We need this to return the
01257               // appropriate count below.
01258               signal_occurred = 1;
01259             }
01260           else
01261             return -1;
01262         }
01263 
01264       // Handle timers early since they may have higher latency
01265       // constraints than I/O handlers.  Ideally, the order of
01266       // dispatching should be a strategy...
01267       else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
01268         // State has changed or timer queue has failed, exit loop.
01269         break;
01270 
01271       // Check to see if there are no more I/O handles left to
01272       // dispatch AFTER we've handled the timers.
01273       else if (0 /* active_handle_count == 0 */)
01274         return io_handlers_dispatched
01275           + other_handlers_dispatched
01276           + signal_occurred;
01277 
01278 #if 0
01279       // Next dispatch the notification handlers (if there are any to
01280       // dispatch).  These are required to handle multiple threads
01281       // that are trying to update the Reactor.
01282       else if (this->dispatch_notification_handlers (
01283                  dispatch_set,
01284                  active_handle_count,
01285                  other_handlers_dispatched) == -1)
01286         // State has changed or a serious failure has occured, so exit
01287         // loop.
01288         break;
01289 #endif  /* 0 */
01290 
01291       // Finally, dispatch the I/O handlers.
01292       else if (this->dispatch_io_events (io_handlers_dispatched) == -1)
01293         // State has changed, so exit loop.
01294         break;
01295     }
01296   while (0 /* active_handle_count > 0 */);
01297 
01298   return io_handlers_dispatched + other_handlers_dispatched + signal_occurred;
01299 }
01300 
01301 int
01302 ACE_Dev_Poll_Reactor::dispatch_timer_handlers (
01303   int &number_of_timers_cancelled)
01304 {
01305   // Release the lock during the upcall.
01306   ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_);
01307   ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>,
01308                     reverse_guard,
01309                     reverse_lock,
01310                     -1);
01311 
01312   number_of_timers_cancelled += this->timer_queue_->expire ();
01313 
01314   return 0;
01315 }
01316 
#if 0
// NOTE: this function is compiled out.
//
// Let the notification handler dispatch whatever notifications are
// pending on the notify hook of this reactor.
//
// The count reported by dispatch_notifications () is added to
// number_of_handlers_dispatched.  Returns 0, or -1 if
// dispatch_notifications () fails.
int
ACE_Dev_Poll_Reactor::dispatch_notification_handlers (
  ACE_Select_Reactor_Handle_Set &dispatch_set,
  int &number_of_active_handles,
  int &number_of_handlers_dispatched)
{
  const int dispatched =
    this->notify_handler_->dispatch_notifications (number_of_active_handles,
                                                   dispatch_set.rd_mask_);

  if (dispatched == -1)
    return -1;

  number_of_handlers_dispatched += dispatched;

  return /* this->state_changed_ ? -1 : */ 0;
}
#endif  /* 0 */
01343 
01344 int
01345 ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
01346 {
01347   // Since the underlying event demultiplexing mechansim (`/dev/poll'
01348   // or '/dev/epoll') is stateful, and since only one result buffer is
01349   // used, all pending events (i.e. those retrieved from a previous
01350   // poll) must be dispatched before any additional event can be
01351   // polled.  As such, the Dev_Poll_Reactor keeps track of the
01352   // progress of events that have been dispatched.
01353 
01354   // Dispatch the events.
01355   //
01356   // The semantics of this loop in the presence of multiple threads is
01357   // non-trivial.  this->start_pfds_ will be incremented each time an
01358   // event handler is dispatched, which may be done across multiple
01359   // threads.  Multiple threads may change the loop variables.  Care
01360   // must be taken to only change those variables with the reactor
01361   // lock held.
01362   //
01363   // Notice that pfds only contains file descriptors that have
01364   // received events.
01365   for (struct pollfd *& pfds = this->start_pfds_;
01366        pfds < this->end_pfds_;
01367        /* Nothing to do before next loop iteration */)
01368     {
01369       const ACE_HANDLE handle = pfds->fd;
01370       const short revents     = pfds->revents;
01371 
01372       // Increment the pointer to the next pollfd element before we
01373       // release the lock.  Otherwise event handlers end up being
01374       // dispatched multiple times for the same poll.
01375       ++pfds;
01376 
01377       ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01378 
01379       {
01380         // Modify the reference count in an exception-safe way.
01381         ACE_Dev_Poll_Handler_Guard (this->handler_rep_, handle);
01382 
01383         // Release the lock during the upcall.
01384         ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_);
01385         ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>,
01386                           reverse_guard,
01387                           reverse_lock,
01388                           -1);
01389 
01390         // Dispatch all output events.
01391         if (ACE_BIT_ENABLED (revents, POLLOUT))
01392           {
01393             int status =
01394               this->upcall (eh, &ACE_Event_Handler::handle_output, handle);
01395 
01396             if (status < 0)
01397               {
01398                 // Note that the lock is reacquired in
01399                 // remove_handler().
01400                 return this->remove_handler (handle,
01401                                              ACE_Event_Handler::WRITE_MASK);
01402               }
01403 
01404             io_handlers_dispatched++;
01405           }
01406 
01407         // Dispatch all "high priority" (e.g. out-of-band data) events.
01408         if (ACE_BIT_ENABLED (revents, POLLPRI))
01409           {
01410             int status =
01411               this->upcall (eh, &ACE_Event_Handler::handle_exception, handle);
01412 
01413             if (status < 0)
01414               {
01415                 // Note that the lock is reacquired in
01416                 // remove_handler().
01417                 return this->remove_handler (handle,
01418                                              ACE_Event_Handler::EXCEPT_MASK);
01419               }
01420 
01421             io_handlers_dispatched++;
01422           }
01423 
01424         // Dispatch all input events.
01425         if (ACE_BIT_ENABLED (revents, POLLIN))
01426           {
01427             int status =
01428               this->upcall (eh, &ACE_Event_Handler::handle_input, handle);
01429 
01430             if (status < 0)
01431               {
01432                 // Note that the lock is reacquired in
01433                 // remove_handler().
01434                 return this->remove_handler (handle,
01435                                              ACE_Event_Handler::READ_MASK);
01436               }
01437 
01438             io_handlers_dispatched++;
01439           }
01440       } // The reactor lock is reacquired upon leaving this scope.
01441     }
01442 
01443   return 0;
01444 }
01445 
01446 int
01447 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value *max_wait_time)
01448 {
01449   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01450 
01451   return this->handle_events (max_wait_time);
01452 }
01453 
01454 int
01455 ACE_Dev_Poll_Reactor::handle_events (ACE_Time_Value &max_wait_time)
01456 {
01457   ACE_TRACE ("ACE_Dev_Poll_Reactor::handle_events");
01458 
01459   return this->handle_events (&max_wait_time);
01460 }
01461 
01462 int
01463 ACE_Dev_Poll_Reactor::alertable_handle_events (ACE_Time_Value &max_wait_time)
01464 {
01465   ACE_TRACE ("ACE_Dev_Poll_Reactor::alertable_handle_events");
01466 
01467   return this->handle_events (max_wait_time);
01468 }
01469 
01470 int
01471 ACE_Dev_Poll_Reactor::deactivated (void)
01472 {
01473   return this->deactivated_;
01474 }
01475 
01476 void
01477 ACE_Dev_Poll_Reactor::deactivate (int do_stop)
01478 {
01479   this->deactivated_ = do_stop;
01480   this->wakeup_all_threads ();
01481 }
01482 
01483 int
01484 ACE_Dev_Poll_Reactor::register_handler (ACE_Event_Handler *handler,
01485                                         ACE_Reactor_Mask mask)
01486 {
01487   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01488 
01489   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01490 
01491   return this->register_handler_i (handler->get_handle (),
01492                                    handler,
01493                                    mask);
01494 }
01495 
01496 int
01497 ACE_Dev_Poll_Reactor::register_handler (ACE_HANDLE handle,
01498                                         ACE_Event_Handler *event_handler,
01499                                         ACE_Reactor_Mask mask)
01500 {
01501   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01502 
01503   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01504 
01505   return this->register_handler_i (handle,
01506                                    event_handler,
01507                                    mask);
01508 };
01509 
01510 int
01511 ACE_Dev_Poll_Reactor::register_handler_i (ACE_HANDLE handle,
01512                                           ACE_Event_Handler *event_handler,
01513                                           ACE_Reactor_Mask mask)
01514 {
01515   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler_i");
01516 
01517   if (handle == ACE_INVALID_HANDLE
01518       || mask == ACE_Event_Handler::NULL_MASK
01519       || this->handler_rep_.find (handle) != 0)
01520     {
01521       errno = EINVAL;
01522       return -1;
01523     }
01524 
01525   // Add the event handler to the repository.
01526   if (this->handler_rep_.bind (handle, event_handler, mask) != 0)
01527     return -1;
01528 
01529   struct pollfd pfd;
01530 
01531   pfd.fd      = handle;
01532   pfd.events  = this->reactor_mask_to_poll_event (mask);
01533   pfd.revents = 0;
01534 
01535   // Add file descriptor to the "interest set."
01536   if (ACE_OS::write (this->poll_fd_, &pfd, sizeof (pfd)) != sizeof (pfd))
01537     {
01538       (void) this->handler_rep_.unbind (handle);
01539 
01540       return -1;
01541     }
01542 
01543   // Note the fact that we've changed the state of the wait_set_,
01544   // which is used by the dispatching loop to determine whether it can
01545   // keep going or if it needs to reconsult select().
01546   // this->state_changed_ = 1;
01547 
01548   return 0;
01549 }
01550 
01551 int
01552 ACE_Dev_Poll_Reactor::register_handler (
01553   ACE_HANDLE /* event_handle */,
01554   ACE_HANDLE /* io_handle */,
01555   ACE_Event_Handler * /* event_handler */,
01556   ACE_Reactor_Mask /* mask */)
01557 {
01558   ACE_NOTSUP_RETURN (-1);
01559 }
01560 
01561 int
01562 ACE_Dev_Poll_Reactor::register_handler (const ACE_Handle_Set &handle_set,
01563                                         ACE_Event_Handler *event_handler,
01564                                         ACE_Reactor_Mask mask)
01565 {
01566   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01567 
01568   ACE_Handle_Set_Iterator handle_iter (handle_set);
01569 
01570   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01571 
01572   // @@ It might be more efficient to construct a pollfd array and
01573   //    pass it to the write() call in register_handler_i() only once,
01574   //    instead of calling write() (a system call) once for each file
01575   //    descriptor.
01576 
01577   for (ACE_HANDLE h = handle_iter ();
01578        h != ACE_INVALID_HANDLE;
01579        h = handle_iter ())
01580     if (this->register_handler_i (h, event_handler, mask) == -1)
01581       return -1;
01582 
01583   return 0;
01584 }
01585 
01586 int
01587 ACE_Dev_Poll_Reactor::register_handler (int signum,
01588                                         ACE_Event_Handler *new_sh,
01589                                         ACE_Sig_Action *new_disp,
01590                                         ACE_Event_Handler **old_sh,
01591                                         ACE_Sig_Action *old_disp)
01592 {
01593   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01594 
01595   return this->signal_handler_->register_handler (signum,
01596                                                   new_sh,
01597                                                   new_disp,
01598                                                   old_sh,
01599                                                   old_disp);
01600 }
01601 
01602 int
01603 ACE_Dev_Poll_Reactor::register_handler (const ACE_Sig_Set &sigset,
01604                                         ACE_Event_Handler *new_sh,
01605                                         ACE_Sig_Action *new_disp)
01606 {
01607   ACE_TRACE ("ACE_Dev_Poll_Reactor::register_handler");
01608 
01609   int result = 0;
01610 
01611 #if (ACE_NSIG > 0)  &&  !defined (CHORUS)
01612 
01613   for (int s = 1; s < ACE_NSIG; ++s)
01614     if (sigset.is_member (s)
01615         && this->signal_handler_->register_handler (s,
01616                                                     new_sh,
01617                                                     new_disp) == -1)
01618       result = -1;
01619 
01620 #else  /* ACE_NSIG <= 0  ||  CHORUS */
01621 
01622   ACE_UNUSED_ARG (sigset);
01623   ACE_UNUSED_ARG (new_sh);
01624   ACE_UNUSED_ARG (new_disp);
01625 
01626 #endif /* ACE_NSIG <= 0  ||  CHORUS */
01627 
01628   return result;
01629 }
01630 
01631 int
01632 ACE_Dev_Poll_Reactor::remove_handler (ACE_Event_Handler *handler,
01633                                       ACE_Reactor_Mask mask)
01634 {
01635   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01636 
01637   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01638 
01639   return this->remove_handler_i (handler->get_handle (), mask);
01640 }
01641 
01642 int
01643 ACE_Dev_Poll_Reactor::remove_handler (ACE_HANDLE handle,
01644                                       ACE_Reactor_Mask mask)
01645 {
01646   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01647 
01648   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01649 
01650   return this->remove_handler_i (handle, mask);
01651 }
01652 
01653 int
01654 ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle,
01655                                         ACE_Reactor_Mask mask)
01656 {
01657   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler_i");
01658 
01659 
01660   ACE_Event_Handler *eh = this->handler_rep_.find (handle);
01661 
01662   if (eh == 0
01663       || this->mask_ops_i (handle, mask, ACE_Reactor::CLR_MASK) != 0)
01664     return -1;
01665 
01666   // If there are no longer any outstanding events on the given handle
01667   // then remove it from the handler repository.
01668   if (this->handler_rep_.mask (handle) == ACE_Event_Handler::NULL_MASK
01669       && this->handler_rep_.unbind (handle) != 0)
01670     return -1;
01671 
01672   if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL))
01673     (void) eh->handle_close (handle, mask);
01674 
01675   // Note the fact that we've changed the state of the wait_set,
01676   // i.e. the "interest set," which is used by the dispatching loop to
01677   // determine whether it can keep going or if it needs to reconsult
01678   // /dev/poll or /dev/epoll.
01679   // this->state_changed_ = 1;
01680 
01681   return 0;
01682 }
01683 
01684 int
01685 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Handle_Set &handle_set,
01686                                       ACE_Reactor_Mask mask)
01687 {
01688   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01689 
01690   ACE_Handle_Set_Iterator handle_iter (handle_set);
01691 
01692   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01693 
01694   // @@ It might be more efficient to construct a pollfd array and
01695   //    pass it to the write() call in register_handler_i() only once,
01696   //    instead of calling write() (a system call) once for each file
01697   //    descriptor.
01698 
01699   for (ACE_HANDLE h = handle_iter ();
01700        h != ACE_INVALID_HANDLE;
01701        h = handle_iter ())
01702     if (this->remove_handler_i (h, mask) == -1)
01703       return -1;
01704 
01705   return 0;
01706 }
01707 
01708 int
01709 ACE_Dev_Poll_Reactor::remove_handler (int signum,
01710                                       ACE_Sig_Action *new_disp,
01711                                       ACE_Sig_Action *old_disp,
01712                                       int sigkey)
01713 {
01714   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01715 
01716   return this->signal_handler_->remove_handler (signum,
01717                                                 new_disp,
01718                                                 old_disp,
01719                                                 sigkey);
01720 }
01721 
01722 int
01723 ACE_Dev_Poll_Reactor::remove_handler (const ACE_Sig_Set &sigset)
01724 {
01725   ACE_TRACE ("ACE_Dev_Poll_Reactor::remove_handler");
01726 
01727   int result = 0;
01728 
01729 #if (ACE_NSIG > 0)  &&  !defined (CHORUS)
01730 
01731   for (int s = 1; s < ACE_NSIG; ++s)
01732     if (sigset.is_member (s)
01733         && this->signal_handler_->remove_handler (s) == -1)
01734       result = -1;
01735 
01736 #else  /* ACE_NSIG <= 0  ||  CHORUS */
01737 
01738   ACE_UNUSED_ARG (sigset);
01739 
01740 #endif /* ACE_NSIG <= 0  ||  CHORUS */
01741 
01742   return result;
01743 }
01744 
01745 int
01746 ACE_Dev_Poll_Reactor::suspend_handler (ACE_Event_Handler *event_handler)
01747 {
01748   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01749 
01750   if (event_handler == 0)
01751     {
01752       errno = EINVAL;
01753       return -1;
01754     }
01755 
01756   ACE_HANDLE handle = event_handler->get_handle ();
01757 
01758   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01759 
01760   return this->suspend_handler_i (handle);
01761 }
01762 
01763 int
01764 ACE_Dev_Poll_Reactor::suspend_handler (ACE_HANDLE handle)
01765 {
01766   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01767 
01768   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01769 
01770   return this->suspend_handler_i (handle);
01771 }
01772 
01773 int
01774 ACE_Dev_Poll_Reactor::suspend_handler (const ACE_Handle_Set &handles)
01775 {
01776   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler");
01777 
01778   ACE_Handle_Set_Iterator handle_iter (handles);
01779   ACE_HANDLE h;
01780 
01781   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01782 
01783   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01784     if (this->suspend_handler_i (h) == -1)
01785       return -1;
01786 
01787   return 0;
01788 }
01789 
01790 int
01791 ACE_Dev_Poll_Reactor::suspend_handlers (void)
01792 {
01793   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handlers");
01794 
01795   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01796 
01797   size_t len = this->handler_rep_.size ();
01798 
01799   for (size_t i = 0; i < len; ++i)
01800     if (this->handler_rep_.suspended (i) == 0
01801         && this->suspend_handler_i (i) != 0)
01802       return -1;
01803 
01804   return 0;
01805 }
01806 
01807 int
01808 ACE_Dev_Poll_Reactor::suspend_handler_i (ACE_HANDLE handle)
01809 {
01810   ACE_TRACE ("ACE_Dev_Poll_Reactor::suspend_handler_i");
01811 
01812   if (this->handler_rep_.find (handle) == 0)
01813     return -1;
01814 
01815   if (this->handler_rep_.suspended (handle))
01816     return 0;  // Already suspended.  @@ Should this be an error?
01817 
01818   struct pollfd pfd[1];
01819 
01820   pfd[0].fd      = handle;
01821   pfd[0].events  = POLLREMOVE;
01822   pfd[0].revents = 0;
01823 
01824   // Remove the handle from the "interest set."
01825   //
01826   // Note that the associated event handler is still in the handler
01827   // repository, but no events will be polled on the given handle thus
01828   // no event will be dispatched to the event handler.
01829   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01830     return -1;
01831 
01832   this->handler_rep_.suspend (handle);
01833 
01834   return 0;
01835 }
01836 
01837 int
01838 ACE_Dev_Poll_Reactor::resume_handler (ACE_Event_Handler *event_handler)
01839 {
01840   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01841 
01842   if (event_handler == 0)
01843     {
01844       errno = EINVAL;
01845       return -1;
01846     }
01847 
01848   ACE_HANDLE handle = event_handler->get_handle ();
01849 
01850   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01851 
01852   return this->resume_handler_i (handle);
01853 }
01854 
01855 int
01856 ACE_Dev_Poll_Reactor::resume_handler (ACE_HANDLE handle)
01857 {
01858   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01859 
01860   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01861 
01862   return this->resume_handler_i (handle);
01863 }
01864 
01865 int
01866 ACE_Dev_Poll_Reactor::resume_handler (const ACE_Handle_Set &handles)
01867 {
01868   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler");
01869 
01870   ACE_Handle_Set_Iterator handle_iter (handles);
01871   ACE_HANDLE h;
01872 
01873   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01874 
01875   while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
01876     if (this->resume_handler_i (h) == -1)
01877       return -1;
01878 
01879   return 0;
01880 }
01881 
01882 int
01883 ACE_Dev_Poll_Reactor::resume_handlers (void)
01884 {
01885   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handlers");
01886 
01887   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01888 
01889   size_t len = this->handler_rep_.size ();
01890 
01891   for (size_t i = 0; i < len; ++i)
01892     if (this->handler_rep_.suspended (i)
01893         && this->resume_handler_i (i) != 0)
01894       return -1;
01895 
01896   return 0;
01897 }
01898 
01899 int
01900 ACE_Dev_Poll_Reactor::resume_handler_i (ACE_HANDLE handle)
01901 {
01902   ACE_TRACE ("ACE_Dev_Poll_Reactor::resume_handler_i");
01903 
01904   if (this->handler_rep_.find (handle) == 0
01905       && this->handler_rep_.suspended (handle) == 0)
01906     return -1;
01907 
01908   ACE_Reactor_Mask mask = this->handler_rep_.mask (handle);
01909 
01910   if (mask == ACE_Event_Handler::NULL_MASK)
01911     return -1;
01912 
01913   struct pollfd pfd[1];
01914 
01915   pfd[0].fd      = handle;
01916   pfd[0].events  = this->reactor_mask_to_poll_event (mask);
01917   pfd[0].revents = 0;
01918 
01919   // Place the handle back in to the "interest set."
01920   //
01921   // Events for the given handle will once again be polled.
01922   if (ACE_OS::write (this->poll_fd_, pfd, sizeof (pfd)) != sizeof (pfd))
01923     return -1;
01924 
01925   this->handler_rep_.resume (handle);
01926 
01927   return 0;
01928 }
01929 
01930 int
01931 ACE_Dev_Poll_Reactor::resumable_handler (void)
01932 {
01933   // @@ Is this correct?
01934 
01935   return 0;
01936 }
01937 
01938 int
01939 ACE_Dev_Poll_Reactor::uses_event_associations (void)
01940 {
01941   // Since the Dev_Poll_Reactor does not do any event associations,
01942   // this method always return zero.
01943   return 0;
01944 }
01945 
01946 long
01947 ACE_Dev_Poll_Reactor::schedule_timer (ACE_Event_Handler *event_handler,
01948                                       const void *arg,
01949                                       const ACE_Time_Value &delay,
01950                                       const ACE_Time_Value &interval)
01951 {
01952   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_timer");
01953 
01954   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01955 
01956   return this->timer_queue_->schedule (
01957            event_handler,
01958            arg,
01959            this->timer_queue_->gettimeofday () + delay,
01960            interval);
01961 }
01962 
01963 int
01964 ACE_Dev_Poll_Reactor::reset_timer_interval (long timer_id,
01965                                             const ACE_Time_Value &interval)
01966 {
01967   ACE_TRACE ("ACE_Dev_Poll_Reactor::reset_timer_interval");
01968 
01969   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01970 
01971   return this->timer_queue_->reset_interval (timer_id, interval);
01972 }
01973 
01974 int
01975 ACE_Dev_Poll_Reactor::cancel_timer (ACE_Event_Handler *event_handler,
01976                                     int dont_call_handle_close)
01977 {
01978   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01979 
01980   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01981 
01982   return (this->timer_queue_ == 0
01983           ? 0
01984           : this->timer_queue_->cancel (event_handler,
01985                                         dont_call_handle_close));
01986 }
01987 
01988 int
01989 ACE_Dev_Poll_Reactor::cancel_timer (long timer_id,
01990                                     const void **arg,
01991                                     int dont_call_handle_close)
01992 {
01993   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_timer");
01994 
01995   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01996 
01997   return (this->timer_queue_ == 0
01998           ? 0
01999           : this->timer_queue_->cancel (timer_id,
02000                                         arg,
02001                                         dont_call_handle_close));
02002 }
02003 
02004 int
02005 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_Event_Handler *eh,
02006                                        ACE_Reactor_Mask mask)
02007 {
02008   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
02009 
02010   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::ADD_MASK);
02011 }
02012 
02013 int
02014 ACE_Dev_Poll_Reactor::schedule_wakeup (ACE_HANDLE handle,
02015                                        ACE_Reactor_Mask mask)
02016 {
02017   ACE_TRACE ("ACE_Dev_Poll_Reactor::schedule_wakeup");
02018 
02019   return this->mask_ops (handle, mask, ACE_Reactor::ADD_MASK);
02020 }
02021 
02022 int
02023 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_Event_Handler *eh,
02024                                      ACE_Reactor_Mask mask)
02025 {
02026   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
02027 
02028   return this->mask_ops (eh->get_handle (), mask, ACE_Reactor::CLR_MASK);
02029 }
02030 
02031 int
02032 ACE_Dev_Poll_Reactor::cancel_wakeup (ACE_HANDLE handle,
02033                                      ACE_Reactor_Mask mask)
02034 {
02035   ACE_TRACE ("ACE_Dev_Poll_Reactor::cancel_wakeup");
02036 
02037   return this->mask_ops (handle, mask, ACE_Reactor::CLR_MASK);
02038 }
02039 
02040 int
02041 ACE_Dev_Poll_Reactor::notify (ACE_Event_Handler *eh,
02042                               ACE_Reactor_Mask mask,
02043                               ACE_Time_Value *timeout)
02044 {
02045   ACE_TRACE ("ACE_Dev_Poll_Reactor::notify");
02046 
02047   ssize_t n = 0;
02048 
02049   // Pass over both the Event_Handler *and* the mask to allow the
02050   // caller to dictate which Event_Handler method the receiver
02051   // invokes.  Note that this call can timeout.
02052 
02053   n = this->notify_handler_->notify (eh, mask, timeout);
02054 
02055   return n == -1 ? -1 : 0;
02056 }
02057 
02058 void
02059 ACE_Dev_Poll_Reactor::max_notify_iterations (int iterations)
02060 {
02061   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02062 
02063   ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_));
02064 
02065   this->notify_handler_->max_notify_iterations (iterations);
02066 }
02067 
02068 int
02069 ACE_Dev_Poll_Reactor::max_notify_iterations (void)
02070 {
02071   ACE_TRACE ("ACE_Dev_Poll_Reactor::max_notify_iterations");
02072 
02073   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02074 
02075   return this->notify_handler_->max_notify_iterations ();
02076 }
02077 
02078 int
02079 ACE_Dev_Poll_Reactor::purge_pending_notifications (ACE_Event_Handler * eh,
02080                                                    ACE_Reactor_Mask mask)
02081 {
02082   if (this->notify_handler_ == 0)
02083     return 0;
02084 
02085   return this->notify_handler_->purge_pending_notifications (eh, mask);
02086 }
02087 
02088 int
02089 ACE_Dev_Poll_Reactor::handler (ACE_HANDLE handle,
02090                                ACE_Reactor_Mask mask,
02091                                ACE_Event_Handler **event_handler)
02092 {
02093   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02094 
02095   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02096 
02097   ACE_Event_Handler *h = this->handler_rep_.find (handle);
02098 
02099   if (h != 0
02100       && ACE_BIT_CMP_MASK (this->handler_rep_.mask (handle),
02101                            mask,  // Compare all bits in the mask
02102                            mask))
02103     {
02104       if (event_handler != 0)
02105         *event_handler = h;
02106 
02107       return 0;
02108     }
02109 
02110   return -1;
02111 }
02112 
02113 int
02114 ACE_Dev_Poll_Reactor::handler (int signum,
02115                                ACE_Event_Handler **eh)
02116 {
02117   ACE_TRACE ("ACE_Dev_Poll_Reactor::handler");
02118 
02119   ACE_Event_Handler *handler = this->signal_handler_->handler (signum);
02120 
02121   if (handler == 0)
02122     return -1;
02123   else if (eh != 0)
02124     *eh = handler;
02125 
02126   return 0;
02127 }
02128 
02129 int
02130 ACE_Dev_Poll_Reactor::initialized (void)
02131 {
02132   ACE_TRACE ("ACE_Dev_Poll_Reactor::initialized");
02133 
02134   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02135 
02136   return this->initialized_;
02137 }
02138 
02139 size_t
02140 ACE_Dev_Poll_Reactor::size (void) const
02141 {
02142   return this->size_;
02143 }
02144 
02145 ACE_Lock &
02146 ACE_Dev_Poll_Reactor::lock (void)
02147 {
02148   ACE_TRACE ("ACE_Dev_Poll_Reactor::lock");
02149 
02150   return this->lock_adapter_;
02151 }
02152 
02153 void
02154 ACE_Dev_Poll_Reactor::wakeup_all_threads (void)
02155 {
02156   ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads");
02157 
02158 #if 0
02159   // Send a notification, but don't block if there's no one to receive
02160   // it.
02161   this->notify (0,
02162                 ACE_Event_Handler::NULL_MASK,
02163                 (ACE_Time_Value *) &ACE_Time_Value::zero);
02164 #endif  /* 0 */
02165 }
02166 
02167 int
02168 ACE_Dev_Poll_Reactor::owner (ACE_thread_t /* new_owner */,
02169                              ACE_thread_t * /* old_owner */)
02170 {
02171   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02172 
02173   // There is no need to set the owner of the event loop.  Multiple
02174   // threads may invoke the event loop simulataneously.
02175 
02176   return 0;
02177 }
02178 
02179 int
02180 ACE_Dev_Poll_Reactor::owner (ACE_thread_t * /* owner */)
02181 {
02182   ACE_TRACE ("ACE_Dev_Poll_Reactor::owner");
02183 
02184   // There is no need to set the owner of the event loop.  Multiple
02185   // threads may invoke the event loop simulataneously.
02186 
02187   return 0;
02188 }
02189 
02190 int
02191 ACE_Dev_Poll_Reactor::restart (void)
02192 {
02193   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02194 
02195   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02196 
02197   return this->restart_;
02198 }
02199 
02200 int
02201 ACE_Dev_Poll_Reactor::restart (int r)
02202 {
02203   ACE_TRACE ("ACE_Dev_Poll_Reactor::restart");
02204 
02205   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02206 
02207   int current_value = this->restart_;
02208   this->restart_ = r;
02209   return current_value;
02210 }
02211 
02212 void
02213 ACE_Dev_Poll_Reactor::requeue_position (int)
02214 {
02215   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02216 }
02217 
02218 int
02219 ACE_Dev_Poll_Reactor::requeue_position (void)
02220 {
02221   ACE_TRACE ("ACE_Dev_Poll_Reactor::requeue_position");
02222 
02223   ACE_NOTSUP_RETURN (-1);
02224 }
02225 
02226 int
02227 ACE_Dev_Poll_Reactor::mask_ops (ACE_Event_Handler *event_handler,
02228                                 ACE_Reactor_Mask mask,
02229                                 int ops)
02230 {
02231   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02232 
02233   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02234 
02235   return this->mask_ops_i (event_handler->get_handle (), mask, ops);
02236 }
02237 
02238 int
02239 ACE_Dev_Poll_Reactor::mask_ops (ACE_HANDLE handle,
02240                                 ACE_Reactor_Mask mask,
02241                                 int ops)
02242 {
02243   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops");
02244 
02245   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
02246 
02247   return this->mask_ops_i (handle, mask, ops);
02248 }
02249 
02250 int
02251 ACE_Dev_Poll_Reactor::mask_ops_i (ACE_HANDLE handle,
02252                                   ACE_Reactor_Mask mask,
02253                                   int ops)
02254 {
02255   ACE_TRACE ("ACE_Dev_Poll_Reactor::mask_ops_i");
02256 
02257   if (this->handler_rep_.handle_in_range (handle) == 0)
02258     return -1;
02259 
02260   // Block out all signals until method returns.
02261   ACE_Sig_Guard sb;
02262 
02263   ACE_Reactor_Mask old_mask = this->handler_rep_.mask (handle);
02264   ACE_Reactor_Mask new_mask = old_mask;
02265 
02266   // Perform GET, CLR, SET, and ADD operations on the interest/wait
02267   // set and the suspend set (if necessary).
02268   //
02269   // GET = 1, Retrieve current value
02270   // SET = 2, Set value of bits to new mask (changes the entire mask)
02271   // ADD = 3, Bitwise "or" the value into the mask (only changes
02272   //          enabled bits)
02273   // CLR = 4  Bitwise "and" the negation of the value out of the mask
02274   //          (only changes enabled bits)
02275   //
02276   // Returns the original mask.
02277 
02278   switch (ops)
02279     {
02280     case ACE_Reactor::GET_MASK:
02281       // The work for this operation is done in all cases at the
02282       // begining of the function.
02283       return old_mask;
02284 
02285     case ACE_Reactor::CLR_MASK:
02286       ACE_CLR_BITS (new_mask, mask);
02287       break;
02288 
02289     case ACE_Reactor::SET_MASK:
02290       new_mask = mask;
02291       break;
02292 
02293     case ACE_Reactor::ADD_MASK:
02294       ACE_SET_BITS (new_mask, mask);
02295       break;
02296 
02297     default:
02298       return -1;
02299     }
02300 
02301   /// Reset the mask for the given handle.
02302   this->handler_rep_.mask (handle, new_mask);
02303 
02304   if (this->handler_rep_.suspended (handle) == 0)
02305     {
02306       // Only attempt to alter events for the handle from the
02307       // "interest set" if it hasn't been suspended.
02308 
02309       short events = this->reactor_mask_to_poll_event (new_mask);
02310 
02311 #if defined (sun)
02312       // Apparently events cannot be updated on-the-fly on Solaris so
02313       // remove the existing events, and then add the new ones.
02314       struct pollfd pfd[2];
02315 
02316       pfd[0].fd      = handle;
02317       pfd[0].events  = POLLREMOVE;
02318       pfd[0].revents = 0;
02319       pfd[1].fd      = (events == POLLREMOVE ? ACE_INVALID_HANDLE : handle);
02320       pfd[1].events  = events;
02321       pfd[1].revents = 0;
02322 #else
02323       pollfd pfd[1];
02324 
02325       pfd[0].fd      = handle;
02326       pfd[0].events  = events;
02327       pfd[0].revents = 0;
02328 #endif  /* sun */
02329 
02330       // Change the events associated with the given file descriptor.
02331       if (ACE_OS::write (this->poll_fd_,
02332                          pfd,
02333                          sizeof (pfd)) != sizeof (pfd))
02334         return -1;
02335     }
02336 
02337   return old_mask;
02338 }
02339 
02340 int
02341 ACE_Dev_Poll_Reactor::ready_ops (ACE_Event_Handler * /* event_handler */,
02342                                  ACE_Reactor_Mask /* mask */,
02343                                  int /* ops */)
02344 {
02345   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02346 
02347   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02348   // ready_set cannot be directly manipulated outside of the event
02349   // loop.
02350   ACE_NOTSUP_RETURN (-1);
02351 }
02352 
02353 int
02354 ACE_Dev_Poll_Reactor::ready_ops (ACE_HANDLE /* handle */,
02355                                  ACE_Reactor_Mask /* mask */,
02356                                  int /* ops */)
02357 {
02358   ACE_TRACE ("ACE_Dev_Poll_Reactor::ready_ops");
02359 
02360   // Since the Dev_Poll_Reactor uses the poll result buffer, the
02361   // ready_set cannot be directly manipulated outside of the event
02362   // loop.
02363   ACE_NOTSUP_RETURN (-1);
02364 }
02365 
02366 void
02367 ACE_Dev_Poll_Reactor::dump (void) const
02368 {
02369   ACE_TRACE ("ACE_Dev_Poll_Reactor::dump");
02370 
02371   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
02372   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("restart_ = %d\n"), this->restart_));
02373   ACE_DEBUG ((LM_DEBUG,
02374               ACE_LIB_TEXT ("initialized_ = %d"),
02375               this->initialized_));
02376   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("poll_fd_ = %d"), this->poll_fd_));
02377   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("size_ = %u"), this->size_));
02378   ACE_DEBUG ((LM_DEBUG,
02379               ACE_LIB_TEXT ("deactivated_ = %d"),
02380               this->deactivated_));
02381   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
02382 }
02383 
02384 short
02385 ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event (ACE_Reactor_Mask mask)
02386 {
02387   ACE_TRACE ("ACE_Dev_Poll_Reactor::reactor_mask_to_poll_event");
02388 
02389   if (mask == ACE_Event_Handler::NULL_MASK)
02390     return POLLREMOVE;  // No event.  Remove from interest set.
02391 
02392   short events = 0;
02393 
02394   // READ, ACCEPT, and CONNECT flag will place the handle in the
02395   // read set.
02396   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
02397       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
02398       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02399     {
02400       ACE_SET_BITS (events, POLLIN);
02401     }
02402 
02403   // WRITE and CONNECT flag will place the handle in the write set.
02404   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)
02405       || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
02406     {
02407       ACE_SET_BITS (events, POLLOUT);
02408     }
02409 
02410   // EXCEPT flag will place the handle in the except set.
02411   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
02412     {
02413       ACE_SET_BITS (events, POLLPRI);
02414     }
02415 
02416   return events;
02417 }
02418 
// Explicit instantiations of the lock templates for ACE_SYNCH_MUTEX,
// for compilers that need them spelled out (either as explicit
// template instantiations or as instantiation pragmas).
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;
template class ACE_Reverse_Lock<ACE_SYNCH_MUTEX>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Pragmas are not statements, so neither takes a trailing semicolon.
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>
#pragma instantiate ACE_Reverse_Lock<ACE_SYNCH_MUTEX>

#endif  /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
02430 
02431 #endif  /* ACE_HAS_EVENT_POLL || ACE_HAS_DEV_POLL */

Generated on Mon Jun 16 11:19:35 2003 for ACE by doxygen 1.2.14, written by Dimitri van Heesch, © 1997-2002