Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

TP_Reactor.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // $Id: TP_Reactor.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $
00003 
00004 
00005 #include "ace/TP_Reactor.h"
00006 #include "ace/Reactor.h"
00007 #include "ace/Thread.h"
00008 
00009 #if !defined (__ACE_INLINE__)
00010 #include "ace/TP_Reactor.i"
00011 #endif /* __ACE_INLINE__ */
00012 
00013 ACE_RCSID(ace, TP_Reactor, "$Id: TP_Reactor.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00014 
00015 
00016 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
00017 
00018 int
00019 ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
00020 {
00021   ACE_TRACE ("ACE_TP_Token_Guard::grab_token");
00022 
00023   // The order of these events is very subtle, modify with care.
00024 
00025   // Try to grab the lock.  If someone if already there, don't wake
00026   // them up, just queue up in the thread pool.
00027   int result = 0;
00028 
00029   if (max_wait_time)
00030     {
00031       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00032       tv += *max_wait_time;
00033 
00034       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
00035                                                   0,
00036                                                   &tv));
00037     }
00038   else
00039     {
00040       ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
00041     }
00042 
00043   // Now that this thread owns the token let us make
00044   // Check for timeouts and errors.
00045   if (result == -1)
00046     {
00047       if (errno == ETIME)
00048         return 0;
00049       else
00050         return -1;
00051     }
00052 
00053   // We got the token and so let us mark ourselves as owner
00054   this->owner_ = 1;
00055 
00056   return result;
00057 }
00058 
00059 
00060 int
00061 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
00062 {
00063   ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
00064 
00065   // Try to grab the lock.  If someone if already there, don't wake
00066   // them up, just queue up in the thread pool.
00067   int result = 0;
00068 
00069   if (max_wait_time)
00070     {
00071       ACE_Time_Value tv = ACE_OS::gettimeofday ();
00072       tv += *max_wait_time;
00073 
00074       ACE_MT (result = this->token_.acquire (0,
00075                                              0,
00076                                              &tv));
00077     }
00078   else
00079     {
00080       ACE_MT (result = this->token_.acquire ());
00081     }
00082 
00083   // Now that this thread owns the token let us make
00084   // Check for timeouts and errors.
00085   if (result == -1)
00086     {
00087       if (errno == ETIME)
00088         return 0;
00089       else
00090         return -1;
00091     }
00092 
00093   // We got the token and so let us mark ourseleves as owner
00094   this->owner_ = 1;
00095 
00096   return result;
00097 }
00098 
00099 
00100 
00101 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
00102                                 ACE_Timer_Queue *tq,
00103                                 int mask_signals,
00104                                 int s_queue)
00105   : ACE_Select_Reactor (sh, tq, 0, 0, mask_signals, s_queue)
00106 {
00107   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00108   this->supress_notify_renew (1);
00109 }
00110 
00111 ACE_TP_Reactor::ACE_TP_Reactor (size_t size,
00112                                 int rs,
00113                                 ACE_Sig_Handler *sh,
00114                                 ACE_Timer_Queue *tq,
00115                                 int mask_signals,
00116                                 int s_queue)
00117   : ACE_Select_Reactor (size, rs, sh, tq, 0, 0, mask_signals, s_queue)
00118 {
00119   ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
00120   this->supress_notify_renew (1);
00121 }
00122 
00123 int
00124 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
00125 {
00126   ACE_TRACE ("ACE_TP_Reactor::owner");
00127   if (o_id)
00128     *o_id = ACE_Thread::self ();
00129 
00130   return 0;
00131 }
00132 
00133 int
00134 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
00135 {
00136   ACE_TRACE ("ACE_TP_Reactor::owner");
00137   *t_id = ACE_Thread::self ();
00138 
00139   return 0;
00140 }
00141 
00142 int
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
        // Run one pass of the event loop for the calling thread: grab the
        // reactor token, check for deactivation, then hand over to
        // dispatch_i ().
        //
        // Returns the value of grab_token () when this thread did not
        // become the token owner, -1 when the reactor is deactivated, and
        // otherwise the value of dispatch_i ().
00145   ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146 
00147   // Stash the current time -- the destructor of this object will
00148   // automatically compute how much time elapsed since this method was
00149   // called.
00150   ACE_Countdown_Time countdown (max_wait_time);
00151 
00152   // The order of these events is very subtle, modify with care.
00153 
00154 
00155   // Instantiate the token guard which will try grabbing the token for
00156   // this thread.
00157   ACE_TP_Token_Guard guard (this->token_);
00158 
00159 
00160   int result = guard.grab_token (max_wait_time);
00161 
00162   // If the guard is NOT the owner (timeout or error while waiting for
00163   // the token) just return the value grab_token () gave us.
00163   if (!guard.is_owner ())
00164     return result;
00165 
00166   // After getting the token, check whether the reactor was deactivated.
00167   if (this->deactivated_)
00168     return -1;
00169 
00170   // Update the countdown to reflect time waiting for the token.
00171   countdown.update ();
00172 
00173 
        // The guard is passed along so the dispatch code can release the
        // token before it makes an upcall.
00174   return this->dispatch_i (max_wait_time,
00175                            guard);
00176 }
00177 
00178 
00179 int
00180 ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh,
00181                                 ACE_Reactor_Mask mask)
00182 {
00183   int result = 0;
00184   // Artificial scoping for grabbing and releasing the token
00185   {
00186     ACE_TP_Token_Guard guard (this->token_);
00187 
00188     // Acquire the token
00189     result = guard.acquire_token ();
00190 
00191     if (!guard.is_owner ())
00192       return result;
00193 
00194     // Call the remove_handler_i () with a DONT_CALL mask. We dont
00195     // want to call the handle_close with the token held.
00196     result = this->remove_handler_i (eh->get_handle (),
00197                                      mask | ACE_Event_Handler::DONT_CALL);
00198 
00199     if (result == -1)
00200       return -1;
00201   }
00202 
00203   // Close down the <Event_Handler> unless we've been instructed not
00204   // to.
00205   if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00206     eh->handle_close (ACE_INVALID_HANDLE, mask);
00207 
00208   return 0;
00209 }
00210 
00211 int
00212 ACE_TP_Reactor::remove_handler (ACE_HANDLE handle,
00213                                 ACE_Reactor_Mask mask)
00214 {
00215 
00216   ACE_Event_Handler *eh = 0;
00217   int result = 0;
00218   // Artificial scoping for grabbing and releasing the token
00219   {
00220     ACE_TP_Token_Guard guard (this->token_);
00221 
00222     // Acquire the token
00223     result = guard.acquire_token ();
00224 
00225     if (!guard.is_owner ())
00226       return result;
00227 
00228     size_t slot = 0;
00229     eh =  this->handler_rep_.find (handle, &slot);
00230 
00231     if (eh == 0)
00232       return -1;
00233 
00234     // Call the remove_handler_i () with a DONT_CALL mask. We dont
00235     // want to call the handle_close with the token held.
00236     result = this->remove_handler_i (handle,
00237                                      mask | ACE_Event_Handler::DONT_CALL);
00238 
00239     if (result == -1)
00240       return -1;
00241   }
00242 
00243   // Close down the <Event_Handler> unless we've been instructed not
00244   // to.
00245   // @@ Note: The check for result ==0 may be redundant, but shouldnt
00246   // be a problem.
00247   if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0))
00248     eh->handle_close (handle, mask);
00249 
00250   return 0;
00251 }
00252 
00253 
00254 int
00255 ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles,
00256                                 ACE_Reactor_Mask m)
00257 {
00258   // Array of <Event_Handlers> corresponding to <handles>
00259   ACE_Event_Handler **aeh = 0;
00260 
00261   // Allocate memory for the size of the handle set
00262   ACE_NEW_RETURN (aeh,
00263                   ACE_Event_Handler *[handles.num_set ()],
00264                   -1);
00265 
00266   size_t index = 0;
00267 
00268   // Artificial scoping for grabbing and releasing the token
00269   {
00270     ACE_TP_Token_Guard guard (this->token_);
00271 
00272     // Acquire the token
00273     int result = guard.acquire_token ();
00274 
00275     if (!guard.is_owner ())
00276       return result;
00277 
00278     ACE_HANDLE h;
00279 
00280     ACE_Handle_Set_Iterator handle_iter (handles);
00281 
00282     while ((h = handle_iter ()) != ACE_INVALID_HANDLE)
00283       {
00284         size_t slot = 0;
00285         ACE_Event_Handler *eh =
00286           this->handler_rep_.find (h, &slot);
00287 
00288         if (this->remove_handler_i (h,
00289                                     m | ACE_Event_Handler::DONT_CALL) == -1)
00290           {
00291             delete [] aeh;
00292             return -1;
00293           }
00294 
00295         aeh [index] = eh;
00296         index ++;
00297       }
00298   }
00299 
00300   // Close down the <Event_Handler> unless we've been instructed not
00301   // to.
00302   if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0)
00303     {
00304       for (size_t i = 0; i < index; i++)
00305         aeh[i]->handle_close (ACE_INVALID_HANDLE, m);
00306     }
00307 
00308   delete [] aeh;
00309   return 0;
00310 }
00311 
00312 int
00313 ACE_TP_Reactor::remove_handler (int /*signum*/,
00314                                 ACE_Sig_Action * /*new_disp*/,
00315                                 ACE_Sig_Action * /*old_disp*/,
00316                                 int /*sigkey*/)
00317 {
00318   ACE_NOTSUP_RETURN (-1);
00319 }
00320 
00321 int
00322 ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & /*sigset*/)
00323 {
00324   ACE_NOTSUP_RETURN (-1);
00325 }
00326 
00327 
00328 int
00329 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
00330                             ACE_TP_Token_Guard &guard)
00331 {
        // Wait for events and dispatch at most one category of them, tried
        // in this order: signals (only when the wait returned -1), timers,
        // notifications, socket events.
        //
        // Returns the value of the first handle_*_events () call that
        // reports work done (> 0), the value of handle_signals () or
        // handle_socket_events () when one of those is reached, and 0 when
        // nothing was dispatched.
00332   int event_count =
00333     this->get_event_for_dispatching (max_wait_time);
00334 
00335   int result = 0;
00336 
00337   // Note: We are passing the <event_count> around, to have record of
00338   // how many events still need processing. May be this could be
00339   // useful in future.
00340 
00341   // Dispatch signals
00342   if (event_count == -1)
00343     {
00344       // Looks like we dont do any upcalls in dispatch signals. If at
00345       // a later point of time, we decide to handle signals we have to
00346       // release the lock before we make any upcalls.. What is here
00347       // now is not the right thing...
00348       // @@ We need to do better..
00349       return this->handle_signals (event_count,
00350                                    guard);
00351     }
00352 
00353   // If there are no signals and if we had received a proper
00354   // event_count then first look at dispatching timeouts. We need to
00355   // handle timers early since they may have higher latency
00356   // constraints than I/O handlers.  Ideally, the order of
00357   // dispatching should be a strategy...
00358 
00359   // NOTE: The event count does not have the number of timers that
00360   // needs dispatching. But we are still passing this along. We dont
00361   // need to do that. In the future we *may* have the timers also
00362   // returned through the <event_count>. Just passing that along for
00363   // that day.
00364   result = this->handle_timer_events (event_count,
00365                                       guard);
00366 
00367   if (result > 0)
00368     return result;
00369 
00370 
00371   // Else just go ahead and fall through for further handling
00372 
00373   if (event_count > 0)
00374     {
00375       // Next dispatch the notification handlers (if there are any to
00376       // dispatch).  These are required to handle multiple-threads that
00377       // are trying to update the <Reactor>.
00378       result = this->handle_notify_events (event_count,
00379                                            guard);
00380 
00381       if (result > 0)
00382         return result;
00383 
00384       // Else just fall through for further handling
00385     }
00386 
        // <event_count> is tested again because it was handed to
        // handle_notify_events () by non-const reference.
00387   if (event_count > 0)
00388     {
00389       // Handle socket events
00390       return this->handle_socket_events (event_count,
00391                                          guard);
00392     }
00393 
00394   return 0;
00395 
00396 }
00397 
00398 
00399 
00400 
00401 int
00402 ACE_TP_Reactor::handle_signals (int & /*event_count*/,
00403                                 ACE_TP_Token_Guard & /*guard*/)
00404 {
        // Called by dispatch_i () when waiting for events returned -1.
        // Both parameters are unused.
        //
        // Returns 1 when the ACE_Sig_Handler "signal pending" flag was set
        // (the flag is cleared here), and -1 otherwise.
00405   ACE_TRACE ("ACE_TP_Reactor::handle_signals");
00406 
00407   /*
00408    *
00409    *             THIS METHOD SEEMS BROKEN
00410    *
00411    *
00412    */
00413   // First check for interrupts.
00414   // Bail out -- we got here since <select> was interrupted.
00415   if (ACE_Sig_Handler::sig_pending () != 0)
00416     {
            // Clear the pending flag.
00417       ACE_Sig_Handler::sig_pending (0);
00418 
00419       // This piece of code comes from the old TP_Reactor. We did not
00420       // handle signals at all then. If we happen to handle signals
00421       // in the TP_Reactor, we should then start worrying about this
00422       // - Bala 21-Aug- 01
00423 #if 0
00424       // Not sure if this should be done in the TP_Reactor
00425       // case... leave it out for now.   -Steve Huston 22-Aug-00
00426 
00427       // If any HANDLES in the <ready_set_> are activated as a
00428       // result of signals they should be dispatched since
00429       // they may be time critical...
00430       active_handle_count = this->any_ready (dispatch_set);
00431  #else
00432       // active_handle_count = 0;
00433 #endif
00434 
00435       // Record the fact that the Reactor has dispatched a
00436       // handle_signal() method.  We need this to return the
00437       // appropriate count.
00438       return 1;
00439     }
00440 
        // No signal was pending, so the -1 from the wait is reported as an
        // error.
00441   return -1;
00442 }
00443 
00444 
00445 int
00446 ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
00447                                      ACE_TP_Token_Guard &guard)
00448 {
00449   // Get the current time
00450   ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
00451                            this->timer_queue_->timer_skew ());
00452 
00453   // Look for a node in the timer queue whose timer <= the present
00454   // time.
00455   ACE_Timer_Node_Dispatch_Info info;
00456 
00457   if (this->timer_queue_->dispatch_info (cur_time,
00458                                          info))
00459     {
00460       // Release the token before dispatching notifies...
00461       guard.release_token ();
00462 
00463       // call the functor
00464       this->timer_queue_->upcall (info.type_,
00465                                   info.act_,
00466                                   cur_time);
00467 
00468       // We have dispatched a timer
00469       return 1;
00470     }
00471 
00472   return 0;
00473 }
00474 
00475 
00476 
00477 int
00478 ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
00479                                       ACE_TP_Token_Guard &guard)
00480 {
00481   // Get the handle on which notify calls could have occured
00482   ACE_HANDLE notify_handle =
00483     this->get_notify_handle ();
00484 
00485   int result = 0;
00486 
00487   // The notify was not in the list returned by
00488   // wait_for_multiple_events ().
00489   if (notify_handle == ACE_INVALID_HANDLE)
00490     return result;
00491 
00492   // Now just do a read on the pipe..
00493   ACE_Notification_Buffer buffer;
00494 
00495   // Clear the handle of the read_mask of our <ready_set_>
00496   this->ready_set_.rd_mask_.clr_bit (notify_handle);
00497 
00498   // Keep reading notifies till we empty it or till we have a
00499   // dispatchable buffer
00500   while (this->notify_handler_->read_notify_pipe (notify_handle,
00501                                                   buffer) > 0)
00502     {
00503       // Just figure out whether we can read  any buffer that has
00504       // dispatchable info. If not we have just been unblocked by
00505       // another thread trying to update the reactor. If we get any
00506       // buffer that needs dispatching we will dispatch that after
00507       // releasing the lock
00508       if (this->notify_handler_->is_dispatchable (buffer) > 0)
00509         {
00510           // Release the token before dispatching notifies...
00511           guard.release_token ();
00512 
00513           // Dispatch the upcall for the notify
00514           this->notify_handler_->dispatch_notify (buffer);
00515 
00516           // We had a successful dispatch.
00517           result = 1;
00518 
00519           // break out of the while loop
00520           break;
00521         }
00522     }
00523 
00524   // If we did ssome work, then we just return 1 which will allow us
00525   // to get out of here. If we return 0, then we will be asked to do
00526   // some work ie. dispacth socket events
00527   return result;
00528 }
00529 
00530 int
00531 ACE_TP_Reactor::handle_socket_events (int &event_count,
00532                                       ACE_TP_Token_Guard &guard)
00533 {
        // Dispatch at most one ready I/O handler.
        //
        // Sequence: pick a ready handler, suspend it, release the token,
        // make the upcall, re-acquire the token, resume the handler if
        // appropriate, release the token again.
        //
        // <event_count> is decremented once when a handler is dispatched.
        // Returns 1 if dispatch_socket_event () returned 0, otherwise 0.
00534 
00535   // We got the lock, lets handle some I/O events.
00536   ACE_EH_Dispatch_Info dispatch_info;
00537 
00538   this->get_socket_event_info (dispatch_info);
00539 
00540   // If there is any event handler that is ready to be dispatched, the
00541   // dispatch information is recorded in dispatch_info.
00542   if (!dispatch_info.dispatch ())
00543     {
00544       return 0;
00545     }
00546 
00547   // Suspend the handler so that other threads don't start
00548   // dispatching it.
00549   // NOTE: This check was performed in older versions of the
00550   // TP_Reactor. Looks like it is a waste..
00551   if (dispatch_info.event_handler_ != this->notify_handler_)
00552     this->suspend_i (dispatch_info.handle_);
00553 
00554   // Release the lock.  Other threads can start waiting.
00555   guard.release_token ();
00556 
00557   int result = 0;
00558 
00559   // If there was an event handler ready, dispatch it.
00560   // Decrement the number of events left.
00561   --event_count;
00562 
00563   if (this->dispatch_socket_event (dispatch_info) == 0)
00564     ++result;                // Dispatched an event
00565 
00566   // This is to get around a problem which is well described in
00567   // 1361. This is just a work around that would help applications
00568   // from resuming handles at the most inopportune moment.
00569   int flag =
00570     ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED;
00571 
00572   // Acquire the token since we want to access the handler
00573   // repository. The call to find () does not hold a lock and hence
00574   // this is required.
        // NOTE(review): the return value of acquire_token () is ignored
        // and is_owner () is not checked, so if the token could not be
        // re-acquired the code below would run without it -- confirm
        // whether that can happen.
00575   guard.acquire_token ();
00576 
00577   // Get the handler for the handle that we have dispatched to.
00578   ACE_Event_Handler *eh =
00579     this->handler_rep_.find (dispatch_info.handle_);
00580 
00581   // This check is required for the following reasons
00582   // 1. If dispatch operation returned a -1, then there is a
00583   // possibility that the event handler could be deleted. In such
00584   // cases the pointer to the event_handler that <dispatch_info>
00585   // holds is set to 0.
00586   //
00587   // 2. If the application did its own memory management, a return
00588   // value of 0 cannot be believed since the event handler could
00589   // be deleted by the application based on some conditions. This
00590   // is *bad*. But we dont have much of a choice with the existing
00591   // reactor setup. To get around this, we can make a check for
00592   // the handler registered with the repository for the handle
00593   // that we have and compare with the handler that we
00594   // possess. Yeah, I know it is like touching your nose by taking
00595   // your hand around your head. But that is the way it is. This
00596   // is a fix for [BUGID 1231]
00597 
00598   if (dispatch_info.event_handler_ != 0 &&
00599       eh == dispatch_info.event_handler_)
00600     {
            // Ask the handler who is responsible for resuming it.
00601       flag =
00602            dispatch_info.event_handler_->resume_handler ();
00603     }
00604 
00605   // Use resume_i () since we hold the token already.
        // Resume only when the handle is still valid, the handler is not
        // the notify handler (which was never suspended above), and the
        // handler left resumption to the reactor.
00606   if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
00607       dispatch_info.event_handler_ != this->notify_handler_ &&
00608       flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
00609     this->resume_i (dispatch_info.handle_);
00610 
00611   // Let me release the token here. This is not required since the
00612   // destruction of the object on the stack will take care of this.
00613   guard.release_token ();
00614 
00615   return result;
00616 }
00617 
00618 int
00619 ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
00620                           ACE_Reactor_Mask mask,
00621                           int ops)
00622 {
00623   ACE_TRACE ("ACE_TP_Reactor::mask_ops");
00624   ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
00625           ace_mon, this->token_, -1));
00626 
00627   int result = 0;
00628 
00629   // If it looks like the handle isn't suspended, then
00630   // set the ops on the wait_set_, otherwise set the suspend_set_.
00631 
00632   if (this->suspend_set_.rd_mask_.is_set (handle) == 0
00633       && this->suspend_set_.wr_mask_.is_set (handle) == 0
00634       && this->suspend_set_.ex_mask_.is_set (handle) == 0)
00635 
00636     result = this->bit_ops (handle, mask,
00637                             this->wait_set_,
00638                             ops);
00639   else
00640 
00641     result = this->bit_ops (handle, mask,
00642                             this->suspend_set_,
00643                             ops);
00644 
00645   return result;
00646 }
00647 
00648 
00649 
00650 int
00651 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
00652 {
00653 
00654   // If the reactor handler state has changed, clear any remembered
00655   // ready bits and re-scan from the master wait_set.
00656   if (this->state_changed_)
00657     {
00658       this->ready_set_.rd_mask_.reset ();
00659       this->ready_set_.wr_mask_.reset ();
00660       this->ready_set_.ex_mask_.reset ();
00661       this->state_changed_ = 0;
00662     }
00663   else
00664     {
00665       // This is a hack... somewhere, under certain conditions (which
00666       // I don't understand...) the mask will have all of its bits clear,
00667       // yet have a size_ > 0. This is an attempt to remedy the affect,
00668       // without knowing why it happens.
00669 
00670       //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500))
00671       // SunCC seems to be having problems with this piece of code
00672       // here. I am  not sure why though. This works fine with other
00673       // compilers. As we dont seem to understand when this piece of
00674       // code is needed and as it creates problems for SunCC we will
00675       // not compile this. Most of the tests in TAO seem to be happy
00676       // without this in SunCC.
00677       this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
00678       this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
00679       this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
00680       //# endif /* ! __SUNPRO_CC */
00681 
00682     }
00683 
00684   return this->wait_for_multiple_events (this->ready_set_,
00685                                          max_wait_time);
00686 }
00687 
00688 int
00689 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
00690 {
00691   event.reset ();           // Nothing to dispatch yet
00692 
00693   // Check for dispatch in write, except, read. Only catch one, but if
00694   // one is caught, be sure to clear the handle from each mask in case
00695   // there is more than one mask set for it. This would cause problems
00696   // if the handler is suspended for dispatching, but its set bit in
00697   // another part of ready_set_ kept it from being dispatched.
00698   int found_io = 0;
00699   ACE_HANDLE handle;
00700 
00701   // @@todo: We can do quite a bit of code reduction here. Let me get
00702   // it to work before I do this.
00703   {
00704     ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
00705 
00706     while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00707       {
00708         if (this->is_suspended_i (handle))
00709           continue;
00710 
00711         // Remember this info
00712         event.set (handle,
00713                    this->handler_rep_.find (handle),
00714                    ACE_Event_Handler::WRITE_MASK,
00715                    &ACE_Event_Handler::handle_output);
00716 
00717         this->clear_handle_read_set (handle);
00718         found_io = 1;
00719       }
00720   }
00721 
00722   if (!found_io)
00723     {
00724       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
00725 
00726       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00727         {
00728           if (this->is_suspended_i (handle))
00729             continue;
00730 
00731           // Remember this info
00732           event.set (handle,
00733                      this->handler_rep_.find (handle),
00734                      ACE_Event_Handler::EXCEPT_MASK,
00735                      &ACE_Event_Handler::handle_exception);
00736 
00737           this->clear_handle_read_set (handle);
00738 
00739           found_io = 1;
00740         }
00741     }
00742 
00743   if (!found_io)
00744     {
00745       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00746 
00747       while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
00748         {
00749           if (this->is_suspended_i (handle))
00750             continue;
00751 
00752           // Remember this info
00753           event.set (handle,
00754                      this->handler_rep_.find (handle),
00755                      ACE_Event_Handler::READ_MASK,
00756                      &ACE_Event_Handler::handle_input);
00757 
00758           this->clear_handle_read_set (handle);
00759           found_io = 1;
00760         }
00761     }
00762 
00763   return found_io;
00764 }
00765 
00766 
00767 
00768 // Dispatches a single event handler
00769 int
00770 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
00771 {
00772   ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
00773 
00774   ACE_HANDLE handle = dispatch_info.handle_;
00775   ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
00776   ACE_Reactor_Mask mask = dispatch_info.mask_;
00777   ACE_EH_PTMF callback = dispatch_info.callback_;
00778 
00779   // Check for removed handlers.
00780   if (event_handler == 0)
00781     return -1;
00782 
00783   // Upcall. If the handler returns positive value (requesting a
00784   // reactor callback) don't set the ready-bit because it will be
00785   // ignored if the reactor state has changed. Just call back
00786   // as many times as the handler requests it. Other threads are off
00787   // handling other things.
00788   int status = 1;
00789   while (status > 0)
00790     status = (event_handler->*callback) (handle);
00791 
00792   // If negative, remove from Reactor
00793   if (status < 0)
00794     {
00795       int retval =
00796         this->remove_handler (handle, mask);
00797 
00798       // As the handler is no longer valid, invalidate the handle
00799       dispatch_info.event_handler_ = 0;
00800       dispatch_info.handle_ = ACE_INVALID_HANDLE;
00801 
00802       return retval;
00803     }
00804 
00805   // assert (status >= 0);
00806   return 0;
00807 }
00808 
00809 int
00810 ACE_TP_Reactor::resumable_handler (void)
00811 {
00812   return 1;
00813 }
00814 
00815 int
00816 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
00817 {
00818   return this->handle_events (&max_wait_time);
00819 }
00820 
00821 int
00822 ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
00823                           ACE_Reactor_Mask mask,
00824                           int ops)
00825 {
00826   return this->mask_ops (eh->get_handle (), mask, ops);
00827 }
00828 
00829 void
00830 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
00831                                ACE_Reactor_Mask,
00832                                ACE_Handle_Set &,
00833                                ACE_Event_Handler *eh,
00834                                ACE_EH_PTMF)
00835 {
00836   ACE_ERROR ((LM_ERROR,
00837               ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: ")
00838               ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
00839 
00840   ACE_ASSERT (eh == 0);
00841   ACE_UNUSED_ARG (eh);
00842 }
00843 
00844 ACE_HANDLE
00845 ACE_TP_Reactor::get_notify_handle (void)
00846 {
00847   // Call the notify handler to get a handle on which we would have a
00848   // notify waiting
00849   ACE_HANDLE read_handle =
00850     this->notify_handler_->notify_handle ();
00851 
00852   // Check whether the rd_mask has been set on that handle. If so
00853   // return the handle.
00854   if (read_handle != ACE_INVALID_HANDLE &&
00855       this->ready_set_.rd_mask_.is_set (read_handle))
00856     {
00857       return read_handle;
00858     }
00859     /*if (read_handle != ACE_INVALID_HANDLE)
00860     {
00861       ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
00862       ACE_HANDLE handle = ACE_INVALID_HANDLE;
00863 
00864       while ((handle = handle_iter ()) == read_handle)
00865         {
00866           return read_handle;
00867         }
00868       ACE_UNUSED_ARG (handle);
00869       }*/
00870 
00871   // None found..
00872   return ACE_INVALID_HANDLE;
00873 }

Generated on Mon Jun 16 11:21:58 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002