Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Select_Reactor_Base.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // $Id: Select_Reactor_Base.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $
00003 
00004 #include "ace/Select_Reactor_Base.h"
00005 #include "ace/Reactor.h"
00006 #include "ace/Thread.h"
00007 #include "ace/Synch_T.h"
00008 #include "ace/SOCK_Acceptor.h"
00009 #include "ace/SOCK_Connector.h"
00010 #include "ace/Timer_Heap.h"
00011 #include "ace/Log_Msg.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.i"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 ACE_RCSID(ace, Select_Reactor_Base, "$Id: Select_Reactor_Base.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00018 
00019 #if defined (ACE_WIN32)
00020 #define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
00021 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
00022 #else
00023 #define ACE_SELECT_REACTOR_HANDLE(H) (H)
00024 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
00025 #endif /* ACE_WIN32 */
00026 
00027 // Performs sanity checking on the ACE_HANDLE.
00028 
00029 int
00030 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00031 {
00032   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00033 #if defined (ACE_WIN32)
00034   // It's too expensive to perform more exhaustive validity checks on
00035   // Win32 due to the way that they implement SOCKET HANDLEs.
00036   if (handle == ACE_INVALID_HANDLE)
00037 #else /* !ACE_WIN32 */
00038     if (handle < 0 || handle >= this->max_size_)
00039 #endif /* ACE_WIN32 */
00040       {
00041         errno = EINVAL;
00042         return 1;
00043       }
00044     else
00045       return 0;
00046 }
00047 
00048 // Performs sanity checking on the ACE_HANDLE.
00049 
00050 int
00051 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00052 {
00053   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00054 #if defined (ACE_WIN32)
00055   // It's too expensive to perform more exhaustive validity checks on
00056   // Win32 due to the way that they implement SOCKET HANDLEs.
00057   if (handle != ACE_INVALID_HANDLE)
00058 #else /* !ACE_WIN32 */
00059     if (handle >= 0 && handle < this->max_handlep1_)
00060 #endif /* ACE_WIN32 */
00061       return 1;
00062     else
00063       {
00064         errno = EINVAL;
00065         return 0;
00066       }
00067 }
00068 
00069 size_t
00070 ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
00071 {
00072   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
00073 
00074   return this->max_handlep1_;
00075 }
00076 
00077 int
00078 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00079 {
00080   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00081   this->max_size_ = size;
00082   this->max_handlep1_ = 0;
00083 
00084 #if defined (ACE_WIN32)
00085   // Try to allocate the memory.
00086   ACE_NEW_RETURN (this->event_handlers_,
00087                   ACE_Event_Tuple[size],
00088                   -1);
00089 
00090   // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }.
00091   for (size_t h = 0; h < size; h++)
00092     {
00093       ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
00094       ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00095     }
00096 #else
00097   // Try to allocate the memory.
00098   ACE_NEW_RETURN (this->event_handlers_,
00099                   ACE_Event_Handler *[size],
00100                   -1);
00101 
00102   // Initialize the ACE_Event_Handler * to NULL.
00103   for (size_t h = 0; h < size; h++)
00104     ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00105 #endif /* ACE_WIN32 */
00106 
00107   // Try to increase the number of handles if <size> is greater than
00108   // the current limit.
00109   return ACE::set_handle_limit (ACE_static_cast (int, size));
00110 }
00111 
00112 // Initialize a repository of the appropriate <size>.
00113 
00114 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00115   : select_reactor_ (select_reactor),
00116     max_size_ (0),
00117     max_handlep1_ (0),
00118     event_handlers_ (0)
00119 {
00120   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00121 }
00122 
00123 int
00124 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00125 {
00126   // Unbind all of the <handle, ACE_Event_Handler>s.
00127   for (int handle = 0;
00128        handle < this->max_handlep1_;
00129        handle++)
00130     this->unbind (ACE_SELECT_REACTOR_HANDLE (handle),
00131                   ACE_Event_Handler::ALL_EVENTS_MASK);
00132 
00133   return 0;
00134 }
00135 
00136 int
00137 ACE_Select_Reactor_Handler_Repository::close (void)
00138 {
00139   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00140 
00141   if (this->event_handlers_ != 0)
00142     {
00143       this->unbind_all ();
00144 
00145       delete [] this->event_handlers_;
00146       this->event_handlers_ = 0;
00147     }
00148   return 0;
00149 }
00150 
00151 // Return the <ACE_Event_Handler *> associated with the <handle>.
00152 
00153 ACE_Event_Handler *
00154 ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00155                                              size_t *index_p)
00156 {
00157   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
00158 
00159   ACE_Event_Handler *eh = 0;
00160   ssize_t i;
00161 
00162   // Only bother to search for the <handle> if it's in range.
00163   if (this->handle_in_range (handle))
00164     {
00165 #if defined (ACE_WIN32)
00166       i = 0;
00167 
00168       for (; i < this->max_handlep1_; i++)
00169         if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00170           {
00171             eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
00172             break;
00173           }
00174 #else
00175       i = handle;
00176 
00177       eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00178 #endif /* ACE_WIN32 */
00179     }
00180   else
00181     // g++ can't figure out that <i> won't be used below if the handle
00182     // is out of range, so keep it happy by defining <i> here . . .
00183     i = 0;
00184 
00185   if (eh != 0)
00186     {
00187       if (index_p != 0)
00188         *index_p = i;
00189     }
00190   else
00191     errno = ENOENT;
00192 
00193   return eh;
00194 }
00195 
00196 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
00197 
00198 int
00199 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00200                                              ACE_Event_Handler *event_handler,
00201                                              ACE_Reactor_Mask mask)
00202 {
00203   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00204 
00205   if (handle == ACE_INVALID_HANDLE)
00206     handle = event_handler->get_handle ();
00207 
00208   if (this->invalid_handle (handle))
00209     return -1;
00210 
00211 #if defined (ACE_WIN32)
00212   int assigned_slot = -1;
00213 
00214   for (ssize_t i = 0; i < this->max_handlep1_; i++)
00215     {
00216       // Found it, so let's just reuse this location.
00217       if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00218         {
00219           assigned_slot = i;
00220           break;
00221         }
00222       // Here's the first free slot, so let's take it.
00223       else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE
00224                && assigned_slot == -1)
00225         assigned_slot = i;
00226     }
00227 
00228   if (assigned_slot > -1)
00229     // We found a free spot, let's reuse it.
00230     {
00231       ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
00232       ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
00233     }
00234   else if (this->max_handlep1_ < this->max_size_)
00235     {
00236       // Insert at the end of the active portion.
00237       ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
00238       ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
00239       this->max_handlep1_++;
00240     }
00241   else
00242     {
00243       // No more room at the inn!
00244       errno = ENOMEM;
00245       return -1;
00246     }
00247 #else
00248   ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
00249 
00250   if (this->max_handlep1_ < handle + 1)
00251     this->max_handlep1_ = handle + 1;
00252 #endif /* ACE_WIN32 */
00253 
00254   if (this->select_reactor_.is_suspended_i (handle))
00255     {
00256       this->select_reactor_.bit_ops (handle,
00257                                      mask,
00258                                      this->select_reactor_.suspend_set_,
00259                                      ACE_Reactor::ADD_MASK);
00260     }
00261   else
00262     {
00263       this->select_reactor_.bit_ops (handle,
00264                                      mask,
00265                                      this->select_reactor_.wait_set_,
00266                                      ACE_Reactor::ADD_MASK);
00267 
00268       // Note the fact that we've changed the state of the <wait_set_>,
00269       // which is used by the dispatching loop to determine whether it can
00270       // keep going or if it needs to reconsult select().
00271       this->select_reactor_.state_changed_ = 1;
00272     }
00273 
00274   /*
00275   // @@NOTE: We used to do this in earlier versions of ACE+TAO. But
00276   // this is totally wrong..
00277   // Clear any suspend masks for it too.
00278   this->select_reactor_.bit_ops (handle,
00279                                  mask,
00280                                  this->select_reactor_.suspend_set_,
00281                                  ACE_Reactor::CLR_MASK);
00282   */
00283 
00284   return 0;
00285 }
00286 
00287 // Remove the binding of <ACE_HANDLE>.
00288 
00289 int
00290 ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00291                                                ACE_Reactor_Mask mask)
00292 {
00293   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00294 
00295   size_t slot;
00296   ACE_Event_Handler *eh = this->find (handle, &slot);
00297 
00298   if (eh == 0)
00299     return -1;
00300 
00301   // Clear out the <mask> bits in the Select_Reactor's wait_set.
00302   this->select_reactor_.bit_ops (handle,
00303                                  mask,
00304                                  this->select_reactor_.wait_set_,
00305                                  ACE_Reactor::CLR_MASK);
00306 
00307   // And suspend_set.
00308   this->select_reactor_.bit_ops (handle,
00309                                  mask,
00310                                  this->select_reactor_.suspend_set_,
00311                                  ACE_Reactor::CLR_MASK);
00312 
00313   // Note the fact that we've changed the state of the <wait_set_>,
00314   // which is used by the dispatching loop to determine whether it can
00315   // keep going or if it needs to reconsult select().
00316   this->select_reactor_.state_changed_ = 1;
00317 
00318   // Close down the <Event_Handler> unless we've been instructed not
00319   // to.
00320   if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00321     eh->handle_close (handle, mask);
00322 
00323   // If there are no longer any outstanding events on this <handle>
00324   // then we can totally shut down the Event_Handler.
00325 
00326   int has_any_wait_mask =
00327     (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328      || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329      || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330   int has_any_suspend_mask =
00331     (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332      || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333      || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334 
00335   if (!has_any_wait_mask && !has_any_suspend_mask)
00336 #if defined (ACE_WIN32)
00337     {
00338       ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
00339       ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
00340 
00341       if (this->max_handlep1_ == (int) slot + 1)
00342         {
00343           // We've deleted the last entry (i.e., i + 1 == the current
00344           // size of the array), so we need to figure out the last
00345           // valid place in the array that we should consider in
00346           // subsequent searches.
00347 
00348           int i;
00349 
00350           for (i = this->max_handlep1_ - 1;
00351                i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
00352                i--)
00353             continue;
00354 
00355           this->max_handlep1_ = i + 1;
00356         }
00357     }
00358 #else
00359   {
00360     ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0;
00361 
00362     if (this->max_handlep1_ == handle + 1)
00363       {
00364         // We've deleted the last entry, so we need to figure out
00365         // the last valid place in the array that is worth looking
00366         // at.
00367         ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
00368         ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
00369         ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
00370 
00371         ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00372         ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00373         ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00374 
00375         // Compute the maximum of six values.
00376         this->max_handlep1_ = wait_rd_max;
00377         if (this->max_handlep1_ < wait_wr_max)
00378           this->max_handlep1_ = wait_wr_max;
00379         if (this->max_handlep1_ < wait_ex_max)
00380           this->max_handlep1_ = wait_ex_max;
00381 
00382         if (this->max_handlep1_ < suspend_rd_max)
00383           this->max_handlep1_ = suspend_rd_max;
00384         if (this->max_handlep1_ < suspend_wr_max)
00385           this->max_handlep1_ = suspend_wr_max;
00386         if (this->max_handlep1_ < suspend_ex_max)
00387           this->max_handlep1_ = suspend_ex_max;
00388 
00389         this->max_handlep1_++;
00390       }
00391   }
00392 #endif /* ACE_WIN32 */
00393 
00394   return 0;
00395 }
00396 
00397 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00398   (const ACE_Select_Reactor_Handler_Repository *s)
00399     : rep_ (s),
00400       current_ (-1)
00401 {
00402   this->advance ();
00403 }
00404 
00405 // Pass back the <next_item> that hasn't been seen in the Set.
00406 // Returns 0 when all items have been seen, else 1.
00407 
00408 int
00409 ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
00410 {
00411   int result = 1;
00412 
00413   if (this->current_ >= this->rep_->max_handlep1_)
00414     result = 0;
00415   else
00416     next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
00417                                                   this->current_);
00418   return result;
00419 }
00420 
00421 int
00422 ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
00423 {
00424   return this->current_ >= this->rep_->max_handlep1_;
00425 }
00426 
00427 // Move forward by one element in the set.
00428 
00429 int
00430 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00431 {
00432   if (this->current_ < this->rep_->max_handlep1_)
00433     this->current_++;
00434 
00435   while (this->current_ < this->rep_->max_handlep1_)
00436     if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
00437       return 1;
00438     else
00439       this->current_++;
00440 
00441   return this->current_ < this->rep_->max_handlep1_;
00442 }
00443 
00444 // Dump the state of an object.
00445 
00446 void
00447 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00448 {
00449   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00450 
00451   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00452   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
00453   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
00454   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00455 }
00456 
00457 void
00458 ACE_Select_Reactor_Handler_Repository::dump (void) const
00459 {
00460   ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00461 
00462   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00463   ACE_DEBUG ((LM_DEBUG,
00464               ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"),
00465               this->max_handlep1_, this->max_size_));
00466   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("[")));
00467 
00468   ACE_Event_Handler *eh = 0;
00469 
00470   for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00471        iter.next (eh) != 0;
00472        iter.advance ())
00473     ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"),
00474                 eh, eh->get_handle ()));
00475 
00476   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]")));
00477   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00478 }
00479 
00480 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00481 
00482 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00483   : max_notify_iterations_ (-1)
00484 {
00485 }
00486 
00487 void
00488 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00489 {
00490   // Must always be > 0 or < 0 to optimize the loop exit condition.
00491   if (iterations == 0)
00492     iterations = 1;
00493 
00494   this->max_notify_iterations_ = iterations;
00495 }
00496 
00497 int
00498 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00499 {
00500   return this->max_notify_iterations_;
00501 }
00502 
00503 // purge_pending_notifications
00504 // Removes all entries from the notify_queue_ and each one that
00505 // matches <eh> is put on the free_queue_. The rest are saved on a
00506 // local queue and copied back to the notify_queue_ at the end.
00507 // Returns the number of entries removed. Returns -1 on error.
00508 // ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined.
00509 int
00510 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00511                                                         ACE_Reactor_Mask  mask )
00512 {
00513   ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00514 
00515 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00516 
00517   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00518 
00519   if (this->notify_queue_.is_empty ())
00520     return 0;
00521 
00522   ACE_Notification_Buffer *temp;
00523   ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00524 
00525   size_t queue_size = this->notify_queue_.size ();
00526   int number_purged = 0;
00527   size_t i;
00528   for (i = 0; i < queue_size; ++i)
00529     {
00530       if (-1 == this->notify_queue_.dequeue_head (temp))
00531         ACE_ERROR_RETURN ((LM_ERROR,
00532                            ACE_LIB_TEXT ("%p\n"),
00533                            ACE_LIB_TEXT ("dequeue_head")),
00534                           -1);
00535 
00536       // If this is not a Reactor notify (it is for a particular handler),
00537       // and it matches the specified handler (or purging all),
00538       // and applying the mask would totally eliminate the notification, then
00539       // release it and count the number purged.
00540       if ((0 != temp->eh_) &&
00541           (0 == eh || eh == temp->eh_) &&
00542           ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
00543                                                  // is left with nothing when
00544                                                  // applying the mask
00545       {
00546         if (-1 == this->free_queue_.enqueue_head (temp))
00547           ACE_ERROR_RETURN ((LM_ERROR,
00548                              ACE_LIB_TEXT ("%p\n"),
00549                              ACE_LIB_TEXT ("enqueue_head")),
00550                             -1);
00551         ++number_purged;
00552       }
00553       else
00554       {
00555         // To preserve it, move it to the local_queue.
00556         // But first, if this is not a Reactor notify (it is for a particularhandler),
00557         // and it matches the specified handler (or purging all), then
00558         // apply the mask
00559         if ((0 != temp->eh_) &&
00560             (0 == eh || eh == temp->eh_))
00561           ACE_CLR_BITS(temp->mask_, mask);
00562         if (-1 == local_queue.enqueue_head (temp))
00563           return -1;
00564       }
00565     }
00566 
00567   if (this->notify_queue_.size ())
00568     { // should be empty!
00569       ACE_ASSERT (0);
00570       return -1;
00571     }
00572 
00573   // now put it back in the notify queue
00574   queue_size = local_queue.size ();
00575   for (i = 0; i < queue_size; ++i)
00576     {
00577       if (-1 == local_queue.dequeue_head (temp))
00578         ACE_ERROR_RETURN ((LM_ERROR,
00579                            ACE_LIB_TEXT ("%p\n"),
00580                            ACE_LIB_TEXT ("dequeue_head")),
00581                           -1);
00582 
00583       if (-1 == this->notify_queue_.enqueue_head (temp))
00584         ACE_ERROR_RETURN ((LM_ERROR,
00585                            ACE_LIB_TEXT ("%p\n"),
00586                            ACE_LIB_TEXT ("enqueue_head")),
00587                           -1);
00588     }
00589 
00590   return number_purged;
00591 
00592 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00593   ACE_UNUSED_ARG (eh);
00594   ACE_UNUSED_ARG (mask);
00595   ACE_NOTSUP_RETURN (-1);
00596 #endif  /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
00597 }
00598 
00599 void
00600 ACE_Select_Reactor_Notify::dump (void) const
00601 {
00602   ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00603 
00604   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00605   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00606   this->notification_pipe_.dump ();
00607   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00608 }
00609 
00610 int
00611 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00612                                  ACE_Timer_Queue *,
00613                                  int disable_notify_pipe)
00614 {
00615   ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00616 
00617   if (disable_notify_pipe == 0)
00618     {
00619       this->select_reactor_ =
00620         ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
00621 
00622       if (select_reactor_ == 0)
00623         {
00624           errno = EINVAL;
00625           return -1;
00626         }
00627 
00628       if (this->notification_pipe_.open () == -1)
00629         return -1;
00630 #if defined (F_SETFD)
00631       ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00632       ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00633 #endif /* F_SETFD */
00634 
00635 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00636       ACE_Notification_Buffer *temp;
00637 
00638       ACE_NEW_RETURN (temp,
00639                       ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00640                       -1);
00641 
00642       if (this->alloc_queue_.enqueue_head (temp) == -1)
00643         {
00644           delete [] temp;
00645           return -1;
00646         }
00647 
00648       for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
00649         if (free_queue_.enqueue_head (temp + i) == -1)
00650           return -1;
00651 
00652 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00653 
00654       // There seems to be a Win32 bug with this...  Set this into
00655       // non-blocking mode.
00656       if (ACE::set_flags (this->notification_pipe_.read_handle (),
00657                           ACE_NONBLOCK) == -1)
00658         return -1;
00659       else
00660         return this->select_reactor_->register_handler
00661           (this->notification_pipe_.read_handle (),
00662            this,
00663            ACE_Event_Handler::READ_MASK);
00664     }
00665   else
00666     {
00667       this->select_reactor_ = 0;
00668       return 0;
00669     }
00670 }
00671 
00672 int
00673 ACE_Select_Reactor_Notify::close (void)
00674 {
00675   ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00676 
00677 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00678   // Free up the dynamically allocated resources.
00679   ACE_Notification_Buffer **b;
00680 
00681   for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00682        alloc_iter.next (b) != 0;
00683        alloc_iter.advance ())
00684     {
00685       delete [] *b;
00686       *b = 0;
00687     }
00688 
00689   this->alloc_queue_.reset ();
00690   this->notify_queue_.reset ();
00691   this->free_queue_.reset ();
00692 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00693 
00694   return this->notification_pipe_.close ();
00695 }
00696 
00697 int
00698 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
00699                                    ACE_Reactor_Mask mask,
00700                                    ACE_Time_Value *timeout)
00701 {
00702   ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00703 
00704   // Just consider this method a "no-op" if there's no
00705   // <ACE_Select_Reactor> configured.
00706   if (this->select_reactor_ == 0)
00707     return 0;
00708 
00709   ACE_Notification_Buffer buffer (eh, mask);
00710 
00711 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00712   // Artificial scope to limit the duration of the mutex.
00713   {
00714     // int notification_required = 0;
00715 
00716     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00717 
00718     // No pending notifications.
00719 
00720     // We will send notify for every message..
00721     // if (this->notify_queue_.is_empty ())
00722     //   notification_required = 1;
00723 
00724     ACE_Notification_Buffer *temp = 0;
00725 
00726     if (free_queue_.dequeue_head (temp) == -1)
00727       {
00728         // Grow the queue of available buffers.
00729         ACE_Notification_Buffer *temp1;
00730 
00731         ACE_NEW_RETURN (temp1,
00732                         ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00733                         -1);
00734 
00735         if (this->alloc_queue_.enqueue_head (temp1) == -1)
00736           {
00737             delete [] temp1;
00738             return -1;
00739           }
00740 
00741         // Start at 1 and enqueue only
00742         // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
00743         // the first one will be used right now.
00744         for (size_t i = 1;
00745              i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00746              i++)
00747           this->free_queue_.enqueue_head (temp1 + i);
00748 
00749         temp = temp1;
00750       }
00751 
00752     ACE_ASSERT (temp != 0);
00753     *temp = buffer;
00754 
00755     if (notify_queue_.enqueue_tail (temp) == -1)
00756       return -1;
00757   }
00758 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00759 
00760   ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00761                          (char *) &buffer,
00762                          sizeof buffer,
00763                          timeout);
00764   if (n == -1)
00765     return -1;
00766 
00767   return 0;
00768 }
00769 
00770 // Handles pending threads (if any) that are waiting to unblock the
00771 // Select_Reactor.
00772 
00773 int
00774 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00775                                                    ACE_Handle_Set &rd_mask)
00776 {
00777   ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00778 
00779   ACE_HANDLE read_handle =
00780     this->notification_pipe_.read_handle ();
00781 
00782   if (read_handle != ACE_INVALID_HANDLE
00783       && rd_mask.is_set (read_handle))
00784     {
00785       number_of_active_handles--;
00786       rd_mask.clr_bit (read_handle);
00787       return this->handle_input (read_handle);
00788     }
00789   else
00790     return 0;
00791 }
00792 
00793 
00794 ACE_HANDLE
00795 ACE_Select_Reactor_Notify::notify_handle (void)
00796 {
00797   ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00798 
00799   return this->notification_pipe_.read_handle ();
00800 }
00801 
00802 
00803 // Special trick to unblock <select> when updates occur in somewhere
00804 // other than the main <ACE_Select_Reactor> thread.  All we do is
00805 // write data to a pipe that the <ACE_Select_Reactor> is listening on.
00806 // Thanks to Paul Stephenson for suggesting this approach.
00807 int
00808 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00809 {
00810    // There is tonnes of code that can be abstracted...
00811 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00812   {
00813     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00814 
00815     ACE_Notification_Buffer *temp;
00816 
00817     ACE_UNUSED_ARG (buffer);
00818 
00819     // If the queue is empty just return 0
00820     if (notify_queue_.is_empty ())
00821       return 0;
00822 
00823     if (this->notify_queue_.dequeue_head (temp) == -1)
00824       ACE_ERROR_RETURN ((LM_ERROR,
00825                          ACE_LIB_TEXT ("%p\n"),
00826                          ACE_LIB_TEXT ("dequeue_head")),
00827                         -1);
00828     if (temp->eh_ != 0)
00829       {
00830         // If the queue had a buffer that has an event handler, put
00831         // the element  back in the queue and return a 1
00832         if (this->notify_queue_.enqueue_head (temp) == -1)
00833           {
00834             ACE_ERROR_RETURN ((LM_ERROR,
00835                                ACE_LIB_TEXT ("%p\n"),
00836                                ACE_LIB_TEXT ("enque_head")),
00837                               -1);
00838           }
00839 
00840         return 1;
00841       }
00842     // Else put the element in the free queue
00843     if (free_queue_.enqueue_head (temp) == -1)
00844       ACE_ERROR_RETURN ((LM_ERROR,
00845                          ACE_LIB_TEXT ("%p\n"),
00846                          ACE_LIB_TEXT ("enqueue_head")),
00847                         -1);
00848   }
00849 #else
00850   // If eh == 0 then another thread is unblocking the
00851   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00852   // internal structures.  Otherwise, we need to dispatch the
00853   // appropriate handle_* method on the <ACE_Event_Handler>
00854   // pointer we've been passed.
00855   if (buffer.eh_ != 0)
00856     return 1;
00857 
00858 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00859 
00860   // has no dispatchable buffer
00861   return 0;
00862 }
00863 
00864 int
00865 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00866 {
00867   int result = 0;
00868 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00869   // Dispatch all messages that are in the <notify_queue_>.
00870   {
00871     // We acquire the lock in a block to make sure we're not
00872     // holding the lock while delivering callbacks...
00873     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00874 
00875     ACE_Notification_Buffer *temp;
00876 
00877     if (notify_queue_.is_empty ())
00878       return 0;
00879     else if (notify_queue_.dequeue_head (temp) == -1)
00880       ACE_ERROR_RETURN ((LM_ERROR,
00881                          ACE_LIB_TEXT ("%p\n"),
00882                          ACE_LIB_TEXT ("dequeue_head")),
00883                         -1);
00884     buffer = *temp;
00885     if (free_queue_.enqueue_head (temp) == -1)
00886       ACE_ERROR_RETURN ((LM_ERROR,
00887                          ACE_LIB_TEXT ("%p\n"),
00888                          ACE_LIB_TEXT ("enqueue_head")),
00889                         -1);
00890   }
00891 
00892   // If eh == 0 then another thread is unblocking the
00893   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00894   // internal structures.  Otherwise, we need to dispatch the
00895   // appropriate handle_* method on the <ACE_Event_Handler>
00896   // pointer we've been passed.
00897   if (buffer.eh_ != 0)
00898     {
00899 
00900       switch (buffer.mask_)
00901         {
00902         case ACE_Event_Handler::READ_MASK:
00903         case ACE_Event_Handler::ACCEPT_MASK:
00904           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00905           break;
00906         case ACE_Event_Handler::WRITE_MASK:
00907           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00908           break;
00909         case ACE_Event_Handler::EXCEPT_MASK:
00910           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00911           break;
00912         default:
00913           // Should we bail out if we get an invalid mask?
00914           ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
00915         }
00916       if (result == -1)
00917         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00918                                   ACE_Event_Handler::EXCEPT_MASK);
00919     }
00920 #else
00921   // If eh == 0 then another thread is unblocking the
00922   // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
00923   // internal structures.  Otherwise, we need to dispatch the
00924   // appropriate handle_* method on the <ACE_Event_Handler>
00925   // pointer we've been passed.
00926   if (buffer.eh_ != 0)
00927     {
00928       switch (buffer.mask_)
00929         {
00930         case ACE_Event_Handler::READ_MASK:
00931         case ACE_Event_Handler::ACCEPT_MASK:
00932           result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00933           break;
00934         case ACE_Event_Handler::WRITE_MASK:
00935           result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00936           break;
00937         case ACE_Event_Handler::EXCEPT_MASK:
00938           result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00939           break;
00940         case ACE_Event_Handler::QOS_MASK:
00941           result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00942           break;
00943         case ACE_Event_Handler::GROUP_QOS_MASK:
00944           result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00945           break;
00946         default:
00947           // Should we bail out if we get an invalid mask?
00948           ACE_ERROR ((LM_ERROR,
00949                       ACE_LIB_TEXT ("invalid mask = %d\n"),
00950                       buffer.mask_));
00951         }
00952       if (result == -1)
00953         buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00954                                   ACE_Event_Handler::EXCEPT_MASK);
00955     }
00956 
00957 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
00958 
00959   return 1;
00960 }
00961 
00962 int
00963 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00964                                              ACE_Notification_Buffer &buffer)
00965 {
00966   ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00967 
00968   ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00969 
00970   if (n > 0)
00971     {
00972       // Check to see if we've got a short read.
00973       if (n != sizeof buffer)
00974         {
00975           ssize_t remainder = sizeof buffer - n;
00976 
00977           // If so, try to recover by reading the remainder.  If this
00978           // doesn't work we're in big trouble since the input stream
00979           // won't be aligned correctly.  I'm not sure quite what to
00980           // do at this point.  It's probably best just to return -1.
00981           if (ACE::recv (handle,
00982                          ((char *) &buffer) + n,
00983                          remainder) != remainder)
00984             return -1;
00985         }
00986 
00987 
00988       return 1;
00989     }
00990 
00991   // Return -1 if things have gone seriously  wrong.
00992   if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00993     return -1;
00994 
00995   return 0;
00996 }
00997 
00998 
00999 int
01000 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
01001 {
01002   ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01003   // Precondition: this->select_reactor_.token_.current_owner () ==
01004   // ACE_Thread::self ();
01005 
01006   int number_dispatched = 0;
01007   int result = 0;
01008   ACE_Notification_Buffer buffer;
01009 
01010   while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01011     {
01012       // Dispatch the buffer
01013       // NOTE: We count only if we made any dispatches ie. upcalls.
01014       if (this->dispatch_notify (buffer) > 0)
01015         number_dispatched++;
01016 
01017       // Bail out if we've reached the <notify_threshold_>.  Note that
01018       // by default <notify_threshold_> is -1, so we'll loop until all
01019       // the notifications in the pipe have been dispatched.
01020       if (number_dispatched == this->max_notify_iterations_)
01021         break;
01022     }
01023 
01024   // Reassign number_dispatched to -1 if things have gone seriously
01025   // wrong.
01026   if (result < 0)
01027     number_dispatched = -1;
01028 
01029   // Enqueue ourselves into the list of waiting threads.  When we
01030   // reacquire the token we'll be off and running again with ownership
01031   // of the token.  The postcondition of this call is that
01032   // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
01033   this->select_reactor_->renew ();
01034   return number_dispatched;
01035 }
01036 
01037 // Perform GET, CLR, SET, and ADD operations on the Handle_Sets.
01038 //
01039 // GET = 1, Retrieve current value
01040 // SET = 2, Set value of bits to new mask (changes the entire mask)
01041 // ADD = 3, Bitwise "or" the value into the mask (only changes
01042 //          enabled bits)
01043 // CLR = 4  Bitwise "and" the negation of the value out of the mask
01044 //          (only changes enabled bits)
01045 //
01046 // Returns the original mask.  Must be called with locks held.
01047 
01048 int
01049 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
01050                                   ACE_Reactor_Mask mask,
01051                                   ACE_Select_Reactor_Handle_Set &handle_set,
01052                                   int ops)
01053 {
01054   ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
01055   if (this->handler_rep_.handle_in_range (handle) == 0)
01056     return -1;
01057 
01058 #if !defined (ACE_WIN32)
01059   ACE_Sig_Guard sb; // Block out all signals until method returns.
01060 #endif /* ACE_WIN32 */
01061 
01062   ACE_FDS_PTMF ptmf  = &ACE_Handle_Set::set_bit;
01063   u_long omask = ACE_Event_Handler::NULL_MASK;
01064 
01065   // Find the old reactor masks.  This automatically does the work of
01066   // the GET_MASK operation.
01067   if (handle_set.rd_mask_.is_set (handle))
01068     ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01069   if (handle_set.wr_mask_.is_set (handle))
01070     ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01071   if (handle_set.ex_mask_.is_set (handle))
01072     ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01073 
01074   switch (ops)
01075     {
01076     case ACE_Reactor::GET_MASK:
01077       // The work for this operation is done in all cases at the
01078       // begining of the function.
01079       break;
01080     case ACE_Reactor::CLR_MASK:
01081       ptmf = &ACE_Handle_Set::clr_bit;
01082       /* FALLTHRU */
01083     case ACE_Reactor::SET_MASK:
01084       /* FALLTHRU */
01085     case ACE_Reactor::ADD_MASK:
01086 
01087       // The following code is rather subtle...  Note that if we are
01088       // doing a ACE_Reactor::SET_MASK then if the bit is not enabled
01089       // in the mask we need to clear the bit from the ACE_Handle_Set.
01090       // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
01091       // a ACE_Reactor::ADD_MASK we just carry out the operations
01092       // specified by the mask.
01093 
01094       // READ, ACCEPT, and CONNECT flag will place the handle in the
01095       // read set.
01096       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01097           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01098           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01099         {
01100           (handle_set.rd_mask_.*ptmf) (handle);
01101         }
01102       else if (ops == ACE_Reactor::SET_MASK)
01103         handle_set.rd_mask_.clr_bit (handle);
01104 
01105       // WRITE and CONNECT flag will place the handle in the write set
01106       if (ACE_BIT_ENABLED (mask,
01107                            ACE_Event_Handler::WRITE_MASK)
01108           || ACE_BIT_ENABLED (mask,
01109                               ACE_Event_Handler::CONNECT_MASK))
01110         {
01111           (handle_set.wr_mask_.*ptmf) (handle);
01112         }
01113       else if (ops == ACE_Reactor::SET_MASK)
01114         handle_set.wr_mask_.clr_bit (handle);
01115 
01116       // EXCEPT (and CONNECT on Win32) flag will place the handle in
01117       // the except set.
01118       if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01119 #if defined (ACE_WIN32)
01120           || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01121 #endif /* ACE_WIN32 */
01122           )
01123         {
01124           (handle_set.ex_mask_.*ptmf) (handle);
01125         }
01126       else if (ops == ACE_Reactor::SET_MASK)
01127         handle_set.ex_mask_.clr_bit (handle);
01128       break;
01129     default:
01130       return -1;
01131     }
01132   return omask;
01133 }
01134 
01135 int
01136 ACE_Select_Reactor_Impl::resumable_handler (void)
01137 {
01138   // The select reactor has no handlers that can be resumed by the
01139   // application. So return 0;
01140 
01141   return 0;
01142 }
01143 
01144 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
01145 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
01146 template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
01147 template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
01148 template class ACE_Node <ACE_Notification_Buffer *>;
01149 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
01150 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
01151 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
01152 #pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
01153 #pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
01154 #pragma instantiate ACE_Node <ACE_Notification_Buffer *>
01155 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
01156 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Mon Jun 16 11:21:07 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002