00001 #include "ace_pch.h"
00002
00003
00004 #include "ace/Thread.h"
00005 #include "ace/Token.h"
00006 #include "ace/Log_Msg.h"
00007
00008 #if defined (DEBUGGING)
00009 #include "ace/streams.h"
00010 #endif
00011
00012 ACE_RCSID(ace, Token, "$Id: Token.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:23 chad Exp $")
00013
00014 #if defined (ACE_HAS_THREADS)
00015
00016 #if !defined (__ACE_INLINE__)
00017 #include "ace/Synch_T.h"
00018 #include "ace/Token.i"
00019 #endif
00020
00021 ACE_ALLOC_HOOK_DEFINE(ACE_Token)
00022
00023
00024 void
00025 ACE_Token::dump (void) const
00026 {
00027 ACE_TRACE ("ACE_Token::dump");
00028
00029 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00030
00031 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nthread = %d"), ACE_Thread::self ()));
00032
00033
00034 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nowner_ addr = %x"), &this->owner_));
00035 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nwaiters_ = %d"), this->waiters_));
00036 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nin_use_ = %d"), this->in_use_));
00037 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("\nnesting level = %d"), this->nesting_level_));
00038 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00039 }
00040
00041 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00042 ACE_thread_t t_id)
00043 : next_ (0),
00044 thread_id_ (t_id),
00045 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00046 cv_ (0),
00047 #else
00048 cv_ (m),
00049 #endif
00050 runable_ (0)
00051 {
00052 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00053 ACE_UNUSED_ARG (m);
00054 #endif
00055
00056 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00057 }
00058
00059 ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
00060 ACE_thread_t t_id,
00061 ACE_Condition_Attributes &attributes)
00062 : next_ (0),
00063 thread_id_ (t_id),
00064 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00065 cv_ (0),
00066 #else
00067 cv_ (m, attributes),
00068 #endif
00069 runable_ (0)
00070 {
00071 #if defined (ACE_TOKEN_USES_SEMAPHORE)
00072 ACE_UNUSED_ARG (m);
00073 ACE_UNUSED_ARG (attributes);
00074 #endif
00075
00076 ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
00077 }
00078
00079 ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
00080 : head_ (0),
00081 tail_ (0)
00082 {
00083 ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
00084 }
00085
00086
00087
00088
00089 void
00090 ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
00091 {
00092 ACE_TRACE ("ACE_Token::ACE_Token_Queue::remove_entry");
00093 ACE_Token_Queue_Entry *curr = 0;
00094 ACE_Token_Queue_Entry *prev = 0;
00095
00096 if (this->head_ == 0)
00097 return;
00098
00099 for (curr = this->head_;
00100 curr != 0 && curr != entry;
00101 curr = curr->next_)
00102 prev = curr;
00103
00104 if (curr == 0)
00105
00106 return;
00107 else if (prev == 0)
00108
00109 this->head_ = this->head_->next_;
00110 else
00111
00112 prev->next_ = curr->next_;
00113
00114
00115
00116 if (curr->next_ == 0)
00117 this->tail_ = prev;
00118 }
00119
00120
00121
00122
00123 void
00124 ACE_Token::ACE_Token_Queue::insert_entry (ACE_Token::ACE_Token_Queue_Entry &entry,
00125 int requeue_position)
00126 {
00127 if (this->head_ == 0)
00128 {
00129
00130 this->head_ = &entry;
00131 this->tail_ = &entry;
00132 }
00133 else if (requeue_position == -1)
00134 {
00135
00136 this->tail_->next_ = &entry;
00137 this->tail_ = &entry;
00138 }
00139 else if (requeue_position == 0)
00140 {
00141
00142 entry.next_ = this->head_;
00143 this->head_ = &entry;
00144 }
00145 else
00146
00147 {
00148
00149
00150 ACE_Token::ACE_Token_Queue_Entry *insert_after = this->head_;
00151 while (requeue_position-- && insert_after->next_ != 0)
00152 insert_after = insert_after->next_;
00153
00154 entry.next_ = insert_after->next_;
00155
00156 if (entry.next_ == 0)
00157 this->tail_ = &entry;
00158
00159 insert_after->next_ = &entry;
00160 }
00161 }
00162
00163 ACE_Token::ACE_Token (const ACE_TCHAR *name, void *any)
00164 : lock_ (name, (ACE_mutexattr_t *) any),
00165 owner_ (ACE_OS::NULL_thread),
00166 in_use_ (0),
00167 waiters_ (0),
00168 nesting_level_ (0),
00169 attributes_ (USYNC_THREAD),
00170 queueing_strategy_ (FIFO)
00171 {
00172
00173 }
00174
00175 ACE_Token::~ACE_Token (void)
00176 {
00177 ACE_TRACE ("ACE_Token::~ACE_Token");
00178 }
00179
00180 int
00181 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
00182 void *arg,
00183 ACE_Time_Value *timeout,
00184 ACE_Token_Op_Type op_type)
00185 {
00186 ACE_TRACE ("ACE_Token::shared_acquire");
00187 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00188
00189 #if defined (DEBUGGING)
00190 this->dump ();
00191 #endif
00192
00193 ACE_thread_t thr_id = ACE_Thread::self ();
00194
00195
00196 if (!this->in_use_)
00197 {
00198
00199 this->in_use_ = op_type;
00200 this->owner_ = thr_id;
00201 return 0;
00202 }
00203
00204
00205
00206
00207
00208
00209 if (ACE_OS::thr_equal (thr_id, this->owner_))
00210 {
00211 this->nesting_level_++;
00212 return 0;
00213 }
00214
00215
00216 if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00217 {
00218 errno = ETIME;
00219 return -1;
00220 }
00221
00222
00223
00224
00225
00226
00227 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00228 ? &this->readers_
00229 : &this->writers_);
00230
00231
00232
00233 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00234 thr_id,
00235 this->attributes_);
00236 queue->insert_entry (my_entry, this->queueing_strategy_);
00237 this->waiters_++;
00238
00239
00240
00241
00242 int ret = 0;
00243 if (sleep_hook_func)
00244 {
00245 (*sleep_hook_func) (arg);
00246 ret++;
00247 }
00248 else
00249 {
00250
00251 this->sleep_hook ();
00252 ret++;
00253 }
00254
00255 int timed_out = 0;
00256 int error = 0;
00257
00258
00259 do
00260 {
00261 int result = my_entry.wait (timeout,
00262 this->lock_);
00263
00264 if (result == -1)
00265 {
00266
00267
00268 if (errno == EINTR)
00269 continue;
00270
00271 #if defined (DEBUGGING)
00272 cerr << '(' << ACE_Thread::self () << ')'
00273 << " acquire: "
00274 << (errno == ETIME ? "timed out" : "error occurred")
00275 << endl;
00276 #endif
00277
00278
00279
00280 if (errno == ETIME)
00281 timed_out = 1;
00282 else
00283 error = 1;
00284
00285
00286 break;
00287 }
00288 }
00289 while (!ACE_OS::thr_equal (thr_id, this->owner_));
00290
00291
00292 this->waiters_--;
00293 queue->remove_entry (&my_entry);
00294
00295 #if defined (DEBUGGING)
00296 cerr << '(' << ACE_Thread::self () << ')'
00297 << " acquire (UNBLOCKED)" << endl;
00298 #endif
00299
00300
00301 if (timed_out)
00302 {
00303
00304 if (my_entry.runable_)
00305 {
00306
00307 this->wakeup_next_waiter ();
00308 }
00309
00310
00311 return -1;
00312 }
00313 else if (error)
00314 {
00315
00316 return -1;
00317 }
00318
00319
00320 ACE_ASSERT (my_entry.runable_);
00321
00322 return ret;
00323 }
00324
00325
00326
00327
00328 void
00329 ACE_Token::sleep_hook (void)
00330 {
00331 ACE_TRACE ("ACE_Token::sleep_hook");
00332 }
00333
00334 int
00335 ACE_Token::acquire (ACE_Time_Value *timeout)
00336 {
00337 ACE_TRACE ("ACE_Token::acquire");
00338 return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
00339 }
00340
00341
00342
00343
00344 int
00345 ACE_Token::acquire (void (*sleep_hook_func)(void *),
00346 void *arg,
00347 ACE_Time_Value *timeout)
00348 {
00349 ACE_TRACE ("ACE_Token::acquire");
00350 return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
00351 }
00352
00353
00354
00355 int
00356 ACE_Token::renew (int requeue_position,
00357 ACE_Time_Value *timeout)
00358 {
00359 ACE_TRACE ("ACE_Token::renew");
00360 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00361
00362 #if defined (DEBUGGING)
00363 this->dump ();
00364 #endif
00365
00366
00367
00368
00369
00370
00371 if (this->writers_.head_ == 0 &&
00372 (this->in_use_ == ACE_Token::WRITE_TOKEN ||
00373 this->readers_.head_ == 0))
00374
00375 return 0;
00376
00377
00378
00379
00380 ACE_Token::ACE_Token_Queue *this_threads_queue =
00381 this->in_use_ == ACE_Token::READ_TOKEN ?
00382 &this->readers_ : &this->writers_;
00383
00384 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,
00385 this->owner_);
00386
00387 this_threads_queue->insert_entry (my_entry,
00388
00389
00390
00391 requeue_position == 0 ? 0 : this->queueing_strategy_);
00392 this->waiters_++;
00393
00394
00395 int save_nesting_level_ = this->nesting_level_;
00396
00397
00398 this->nesting_level_ = 0;
00399
00400
00401 this->wakeup_next_waiter ();
00402
00403 int timed_out = 0;
00404 int error = 0;
00405
00406
00407 do
00408 {
00409 int result = my_entry.wait (timeout,
00410 this->lock_);
00411
00412 if (result == -1)
00413 {
00414
00415
00416 if (errno == EINTR)
00417 continue;
00418
00419 #if defined (DEBUGGING)
00420 cerr << '(' << ACE_Thread::self () << ')'
00421 << " renew: "
00422 << (errno == ETIME ? "timed out" : "error occurred")
00423 << endl;
00424 #endif
00425
00426
00427
00428 if (errno == ETIME)
00429 timed_out = 1;
00430 else
00431 error = 1;
00432
00433
00434 break;
00435 }
00436 }
00437 while (!ACE_OS::thr_equal (my_entry.thread_id_, this->owner_));
00438
00439
00440 this->waiters_--;
00441 this_threads_queue->remove_entry (&my_entry);
00442
00443 #if defined (DEBUGGING)
00444 cerr << '(' << ACE_Thread::self () << ')'
00445 << " acquire (UNBLOCKED)" << endl;
00446 #endif
00447
00448
00449 if (timed_out)
00450 {
00451
00452 if (my_entry.runable_)
00453 {
00454
00455 this->wakeup_next_waiter ();
00456 }
00457
00458
00459 return -1;
00460 }
00461 else if (error)
00462 {
00463
00464 return -1;
00465 }
00466
00467
00468 ACE_ASSERT (my_entry.runable_);
00469
00470
00471 this->nesting_level_ = save_nesting_level_;
00472
00473 return 0;
00474 }
00475
00476
00477
00478
00479 int
00480 ACE_Token::release (void)
00481 {
00482 ACE_TRACE ("ACE_Token::release");
00483 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00484
00485
00486
00487 #if defined (DEBUGGING)
00488 this->dump ();
00489 #endif
00490
00491
00492 if (this->nesting_level_ > 0)
00493 --this->nesting_level_;
00494 else
00495 {
00496
00497
00498
00499
00500
00501 this->wakeup_next_waiter ();
00502 }
00503
00504 return 0;
00505 }
00506
00507 void
00508 ACE_Token::wakeup_next_waiter (void)
00509 {
00510 ACE_TRACE ("ACE_Token::wakeup_next_waiter");
00511
00512
00513 this->owner_ = ACE_OS::NULL_thread;
00514 this->in_use_ = 0;
00515
00516
00517 if (this->writers_.head_ == 0 &&
00518 this->readers_.head_ == 0)
00519 {
00520
00521 return;
00522 }
00523
00524
00525 ACE_Token_Queue *queue;
00526
00527
00528 if (this->writers_.head_ != 0)
00529 {
00530 this->in_use_ = ACE_Token::WRITE_TOKEN;
00531 queue = &this->writers_;
00532 }
00533 else
00534 {
00535 this->in_use_ = ACE_Token::READ_TOKEN;
00536 queue = &this->readers_;
00537 }
00538
00539
00540 queue->head_->runable_ = 1;
00541 queue->head_->signal ();
00542
00543 this->owner_ = queue->head_->thread_id_;
00544 }
00545
00546 #endif
00547
00548 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00549 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
00550 #endif