Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Token.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // $Id: Token.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:23 chad Exp $
00003 
00004 #include "ace/Thread.h"
00005 #include "ace/Token.h"
00006 #include "ace/Log_Msg.h"
00007 
00008 #if defined (DEBUGGING)
00009 #include "ace/streams.h"
00010 #endif /* DEBUGGING */
00011 
00012 ACE_RCSID(ace, Token, "$Id: Token.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:23 chad Exp $")
00013 
00014 #if defined (ACE_HAS_THREADS)
00015 
00016 #if !defined (__ACE_INLINE__)
00017 #include "ace/Synch_T.h"
00018 #include "ace/Token.i"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00022 
00023 
00024 void
00025 ACE_Token::dump (void) const
00026 {
00027   ACE_TRACE ("ACE_Token::dump");
00028 
00029   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00030 
00031   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00032   // @@ Is there a portable way to do this?
00033   // ACE_DEBUG ((LM_DEBUG, "\nowner_ = %l", (long) this->owner_));
00034   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nowner_ addr = %x"), &this->owner_));
00035   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nwaiters_ = %d"), this->waiters_));
00036   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nin_use_ = %d"), this->in_use_));
00037   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nnesting level = %d"), this->nesting_level_));
00038   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00039 }
00040 
00041 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00042                                                          ACE_thread_t t_id)
00043   : next_ (0),
00044     thread_id_ (t_id),
00045 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00046     cv_ (0),
00047 #else
00048     cv_ (m),
00049 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00050     runable_ (0)
00051 {
00052 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00053   ACE_UNUSED_ARG (m);
00054 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00055 
00056   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00057 }
00058 
00059 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00060                                                          ACE_thread_t t_id,
00061                                                          ACE_Condition_Attributes &attributes)
00062   : next_ (0),
00063     thread_id_ (t_id),
00064 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00065     cv_ (0),
00066 #else
00067     cv_ (m, attributes),
00068 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00069     runable_ (0)
00070 {
00071 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00072   ACE_UNUSED_ARG (m);
00073   ACE_UNUSED_ARG (attributes);
00074 #endif /* ACE_TOKEN_USES_SEMAPHORE */
00075 
00076   ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00077 }
00078 
00079 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00080   : head_ (0),
00081     tail_ (0)
00082 {
00083   ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00084 }
00085 
00086 //
00087 // Remove an entry from the list.  Must be called with locks held.
00088 //
00089 void
00090 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00091 {
00092   ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00093   ACE_Token_Queue_Entry *curr = 0;
00094   ACE_Token_Queue_Entry *prev = 0;
00095 
00096   if (this->head_ == 0)
00097     return;
00098 
00099   for (curr = this->head_;
00100        curr != 0 && curr != entry;
00101        curr = curr->next_)
00102     prev = curr;
00103 
00104   if (curr == 0)
00105     // Didn't find the entry...
00106     return;
00107   else if (prev == 0)
00108     // Delete at the head.
00109     this->head_ = this->head_->next_;
00110   else
00111     // Delete in the middle.
00112     prev->next_ = curr->next_;
00113 
00114   // We need to update the tail of the list if we've deleted the last
00115   // entry.
00116   if (curr->next_ == 0)
00117     this->tail_ = prev;
00118 }
00119 
00120 //
00121 // Add an entry into the list.  Must be called with locks held.
00122 //
00123 void
00124 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00125                                           int requeue_position)
00126 {
00127   if (this->head_ == 0)
00128     {
00129       // No other threads - just add me
00130       this->head_ = &entry;
00131       this->tail_ = &entry;
00132     }
00133   else if (requeue_position == -1)
00134     {
00135       // Insert at the end of the queue.
00136       this->tail_->next_ = &entry;
00137       this->tail_ = &entry;
00138     }
00139   else if (requeue_position == 0)
00140     {
00141       // Insert at head of queue.
00142       entry.next_ = this->head_;
00143       this->head_ = &entry;
00144     }
00145   else
00146     // Insert in the middle of the queue somewhere.
00147     {
00148       // Determine where our thread should go in the queue of waiters.
00149 
00150       ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00151       while (requeue_position-- && insert_after->next_ != 0)
00152         insert_after = insert_after->next_;
00153 
00154       entry.next_ = insert_after->next_;
00155 
00156       if (entry.next_ == 0)
00157         this->tail_ = &entry;
00158 
00159       insert_after->next_ = &entry;
00160     }
00161 }
00162 
00163 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00164   : lock_ (name, (ACE_mutexattr_t *) any),
00165     owner_ (ACE_OS::NULL_thread),
00166     in_use_ (0),
00167     waiters_ (0),
00168     nesting_level_ (0),
00169     attributes_ (USYNC_THREAD),
00170     queueing_strategy_ (FIFO)
00171 {
00172 //  ACE_TRACE ("ACE_Token::ACE_Token");
00173 }
00174 
00175 ACE_Token::~ACE_Token (void)
00176 {
00177   ACE_TRACE ("ACE_Token::~ACE_Token");
00178 }
00179 
00180 int
00181 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00182                            void *arg,
00183                            ACE_Time_Value *timeout,
00184                            ACE_Token_Op_Type op_type)
00185 {
00186   ACE_TRACE ("ACE_Token::shared_acquire");
00187   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00188 
00189 #if defined (DEBUGGING)
00190   this->dump ();
00191 #endif /* DEBUGGING */
00192 
00193   ACE_thread_t thr_id = ACE_Thread::self ();
00194 
00195   // Nobody holds the token.
00196   if (!this->in_use_)
00197     {
00198       // Its mine!
00199       this->in_use_ = op_type;
00200       this->owner_ = thr_id;
00201       return 0;
00202     }
00203 
00204   //
00205   // Someone already holds the token.
00206   //
00207 
00208   // Check if it is us.
00209   if (ACE_OS::thr_equal (thr_id, this->owner_))
00210     {
00211       this->nesting_level_++;
00212       return 0;
00213     }
00214 
00215   // Do a quick check for "polling" behavior.
00216   if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00217     {
00218       errno = ETIME;
00219       return -1;
00220     }
00221 
00222   //
00223   // We've got to sleep until we get the token.
00224   //
00225 
00226   // Which queue we should end up in...
00227   ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00228                             ? &this->readers_
00229                             : &this->writers_);
00230 
00231   // Allocate queue entry on stack.  This works since we don't exit
00232   // this method's activation record until we've got the token.
00233   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00234                                              thr_id,
00235                                              this->attributes_);
00236   queue->insert_entry (my_entry, this->queueing_strategy_);
00237   this->waiters_++;
00238 
00239   // Execute appropriate <sleep_hook> callback.  (@@ should these
00240   // methods return a success/failure status, and if so, what should
00241   // we do with it?)
00242   int ret = 0;
00243   if (sleep_hook_func)
00244     {
00245       (*sleep_hook_func) (arg);
00246       ret++;
00247     }
00248   else
00249     {
00250       // Execute virtual method.
00251       this->sleep_hook ();
00252       ret++;
00253     }
00254 
00255   int timed_out = 0;
00256   int error = 0;
00257 
00258   // Sleep until we've got the token (ignore signals).
00259   do
00260     {
00261       int result = my_entry.wait (timeout,
00262                                   this->lock_);
00263 
00264       if (result == -1)
00265         {
00266           // Note, this should obey whatever thread-specific interrupt
00267           // policy is currently in place...
00268           if (errno == EINTR)
00269             continue;
00270 
00271 #if defined (DEBUGGING)
00272           cerr << '(' << ACE_Thread::self () << ')'
00273                << " acquire: "
00274                << (errno == ETIME ? "timed out" : "error occurred")
00275                << endl;
00276 #endif /* DEBUGGING */
00277 
00278           // We come here if a timeout occurs or some serious
00279           // ACE_Condition object error.
00280           if (errno == ETIME)
00281             timed_out = 1;
00282           else
00283             error = 1;
00284 
00285           // Stop the loop.
00286           break;
00287         }
00288     }
00289   while (!ACE_OS::thr_equal (thr_id, this->owner_));
00290 
00291   // Do this always and irrespective of the result of wait().
00292   this->waiters_--;
00293   queue->remove_entry (&my_entry);
00294 
00295 #if defined (DEBUGGING)
00296   cerr << '(' << ACE_Thread::self () << ')'
00297        << " acquire (UNBLOCKED)" << endl;
00298 #endif /* DEBUGGING */
00299 
00300   // If timeout occured
00301   if (timed_out)
00302     {
00303       // This thread was still selected to own the token.
00304       if (my_entry.runable_)
00305         {
00306           // Wakeup next waiter since this thread timed out.
00307           this->wakeup_next_waiter ();
00308         }
00309 
00310       // Return error.
00311      return -1;
00312     }
00313   else if (error)
00314     {
00315       // Return error.
00316       return -1;
00317     }
00318 
00319   // If this is a normal wakeup, this thread should be runnable.
00320   ACE_ASSERT (my_entry.runable_);
00321 
00322   return ret;
00323 }
00324 
00325 // By default this is a no-op.
00326 
00327 /* virtual */
00328 void
00329 ACE_Token::sleep_hook (void)
00330 {
00331   ACE_TRACE ("ACE_Token::sleep_hook");
00332 }
00333 
00334 int
00335 ACE_Token::acquire (ACE_Time_Value *timeout)
00336 {
00337   ACE_TRACE ("ACE_Token::acquire");
00338   return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00339 }
00340 
00341 // Acquire the token, sleeping until it is obtained or until <timeout>
00342 // expires.
00343 
00344 int
00345 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00346                     void *arg,
00347                     ACE_Time_Value *timeout)
00348 {
00349   ACE_TRACE ("ACE_Token::acquire");
00350   return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00351 }
00352 
00353 // Try to renew the token.
00354 
00355 int
00356 ACE_Token::renew (int requeue_position,
00357                   ACE_Time_Value *timeout)
00358 {
00359   ACE_TRACE ("ACE_Token::renew");
00360   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00361 
00362 #if defined (DEBUGGING)
00363   this->dump ();
00364 #endif /* DEBUGGING */
00365   // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
00366 
00367   // Check to see if there are any waiters worth giving up the lock
00368   // for.
00369 
00370   // If no writers and either we are a writer or there are no readers.
00371   if (this->writers_.head_ == 0 &&
00372       (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00373        this->readers_.head_ == 0))
00374     // Immediate return.
00375     return 0;
00376 
00377   // We've got to sleep until we get the token again.
00378 
00379   // Determine which queue should this thread go to.
00380   ACE_Token::ACE_Token_Queue *this_threads_queue =
00381     this->in_use_ == ACE_Token::READ_TOKEN ?
00382     &this->readers_ : &this->writers_;
00383 
00384   ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00385                                              this->owner_);
00386 
00387   this_threads_queue->insert_entry (my_entry,
00388                                     // if requeue_position == 0 then we want to go next,
00389                                     // otherwise use the queueing strategy, which might also
00390                                     // happen to be 0.
00391                                     requeue_position == 0 ? 0 : this->queueing_strategy_);
00392   this->waiters_++;
00393 
00394   // Remember nesting level...
00395   int save_nesting_level_ = this->nesting_level_;
00396 
00397   // Reset state for new owner.
00398   this->nesting_level_ = 0;
00399 
00400   // Wakeup waiter.
00401   this->wakeup_next_waiter ();
00402 
00403   int timed_out = 0;
00404   int error = 0;
00405 
00406   // Sleep until we've got the token (ignore signals).
00407   do
00408     {
00409       int result = my_entry.wait (timeout,
00410                                   this->lock_);
00411 
00412       if (result == -1)
00413         {
00414           // Note, this should obey whatever thread-specific interrupt
00415           // policy is currently in place...
00416           if (errno == EINTR)
00417             continue;
00418 
00419 #if defined (DEBUGGING)
00420           cerr << '(' << ACE_Thread::self () << ')'
00421                << " renew: "
00422                << (errno == ETIME ? "timed out" : "error occurred")
00423                << endl;
00424 #endif /* DEBUGGING */
00425 
00426           // We come here if a timeout occurs or some serious
00427           // ACE_Condition object error.
00428           if (errno == ETIME)
00429             timed_out = 1;
00430           else
00431             error = 1;
00432 
00433           // Stop the loop.
00434           break;
00435         }
00436     }
00437   while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00438 
00439   // Do this always and irrespective of the result of wait().
00440   this->waiters_--;
00441   this_threads_queue->remove_entry (&my_entry);
00442 
00443 #if defined (DEBUGGING)
00444   cerr << '(' << ACE_Thread::self () << ')'
00445        << " acquire (UNBLOCKED)" << endl;
00446 #endif /* DEBUGGING */
00447 
00448   // If timeout occured
00449   if (timed_out)
00450     {
00451       // This thread was still selected to own the token.
00452       if (my_entry.runable_)
00453         {
00454           // Wakeup next waiter since this thread timed out.
00455           this->wakeup_next_waiter ();
00456         }
00457 
00458       // Return error.
00459      return -1;
00460     }
00461   else if (error)
00462     {
00463       // Return error.
00464       return -1;
00465     }
00466 
00467   // If this is a normal wakeup, this thread should be runnable.
00468   ACE_ASSERT (my_entry.runable_);
00469 
00470   // Reinstate nesting level.
00471   this->nesting_level_ = save_nesting_level_;
00472 
00473   return 0;
00474 }
00475 
00476 // Release the current holder of the token (which had
00477 // better be the caller's thread!).
00478 
00479 int
00480 ACE_Token::release (void)
00481 {
00482   ACE_TRACE ("ACE_Token::release");
00483   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00484 
00485   // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
00486 
00487 #if defined (DEBUGGING)
00488   this->dump ();
00489 #endif /* DEBUGGING */
00490 
00491   // Nested release...
00492   if (this->nesting_level_ > 0)
00493     --this->nesting_level_;
00494   else
00495     {
00496       //
00497       // Regular release...
00498       //
00499 
00500       // Wakeup waiter.
00501       this->wakeup_next_waiter ();
00502     }
00503 
00504   return 0;
00505 }
00506 
00507 void
00508 ACE_Token::wakeup_next_waiter (void)
00509 {
00510   ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00511 
00512   // Reset state for new owner.
00513   this->owner_ = ACE_OS::NULL_thread;
00514   this->in_use_ = 0;
00515 
00516   // Any waiters...
00517   if (this->writers_.head_ == 0 &&
00518       this->readers_.head_ == 0)
00519     {
00520       // No more waiters...
00521       return;
00522     }
00523 
00524   // Wakeup next waiter.
00525   ACE_Token_Queue *queue;
00526 
00527   // Writer threads get priority to run first.
00528   if (this->writers_.head_ != 0)
00529     {
00530       this->in_use_ = ACE_Token::WRITE_TOKEN;
00531       queue = &this->writers_;
00532     }
00533   else
00534     {
00535       this->in_use_ = ACE_Token::READ_TOKEN;
00536       queue = &this->readers_;
00537     }
00538 
00539   // Wake up waiter and make it runable.
00540   queue->head_->runable_ = 1;
00541   queue->head_->signal ();
00542 
00543   this->owner_ = queue->head_->thread_id_;
00544 }
00545 
00546 #endif /* ACE_HAS_THREADS */
00547 
00548 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00549 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
00550 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Mon Jun 16 11:21:56 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002