00001
00002
00003 #ifndef ACE_MESSAGE_QUEUE_T_C
00004 #define ACE_MESSAGE_QUEUE_T_C
00005
00006
00007
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 # pragma once
00013 #endif
00014
00015 #if !defined (__ACE_INLINE__)
00016 #include "ace/Message_Queue_T.i"
00017 #endif
00018
00019 #include "ace/Notification_Strategy.h"
00020
00021 ACE_RCSID(ace, Message_Queue_T, "$Id: Message_Queue_T.cpp,v 1.1.1.4.2.1 2003/04/16 16:41:14 taoadmin Exp $")
00022
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00024 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00025 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00026
00027 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00028 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00029 {
00030 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00031
00032 this->queue_.dump ();
00033 }
00034
00035 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00036 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00037 {
00038 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00039
00040 this->queue_.message_bytes (new_value);
00041 }
00042
00043 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00044 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00045 {
00046 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00047
00048 this->queue_.message_length (new_value);
00049 }
00050
00051 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00052 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00053 size_t lwm,
00054 ACE_Notification_Strategy *ns)
00055 {
00056 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00057
00058 if (this->queue_.open (hwm, lwm, ns) == -1)
00059 ACE_ERROR ((LM_ERROR,
00060 ACE_LIB_TEXT ("ACE_Message_Queue_Ex")));
00061 }
00062
00063 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00064 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00065 {
00066 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00067 }
00068
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00071 size_t lwm,
00072 ACE_Notification_Strategy *ns)
00073 {
00074 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00075
00076 return this->queue_.open (hwm, lwm, ns);
00077 }
00078
00079
00080
00081 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00082 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00083 {
00084 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00085
00086 return this->queue_.close ();
00087 }
00088
00089 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00090 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00091 {
00092 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00093
00094 return this->queue_.flush ();
00095 }
00096
00097 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00098 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00099 {
00100 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00101
00102 return this->queue_.flush_i ();
00103 }
00104
00105
00106
00107 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00108 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00109 ACE_Time_Value *timeout)
00110 {
00111 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00112
00113 ACE_Message_Block *mb;
00114
00115 int cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00116
00117 if (cur_count != -1)
00118 first_item = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00119
00120 return cur_count;
00121 }
00122
00123 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00124 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00125 ACE_Time_Value *timeout)
00126 {
00127 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00128
00129 ACE_Message_Block *mb;
00130
00131 ACE_NEW_RETURN (mb,
00132 ACE_Message_Block ((char *) new_item,
00133 sizeof (*new_item),
00134 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00135 -1);
00136
00137 int result = this->queue_.enqueue_head (mb, timeout);
00138 if (result == -1)
00139
00140 mb->release ();
00141 return result;
00142 }
00143
00144
00145
00146
00147
00148 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00149 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00150 ACE_Time_Value *timeout)
00151 {
00152 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00153
00154 return this->enqueue_prio (new_item, timeout);
00155 }
00156
00157 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00158 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00159 ACE_Time_Value *timeout)
00160 {
00161 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00162
00163 ACE_Message_Block *mb;
00164
00165 ACE_NEW_RETURN (mb,
00166 ACE_Message_Block ((char *) new_item,
00167 sizeof (*new_item),
00168 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00169 -1);
00170
00171 int result = this->queue_.enqueue_prio (mb, timeout);
00172 if (result == -1)
00173
00174 mb->release ();
00175
00176 return result;
00177 }
00178
00179 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00180 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00181 ACE_Time_Value *timeout)
00182 {
00183 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00184
00185 ACE_Message_Block *mb;
00186
00187 ACE_NEW_RETURN (mb,
00188 ACE_Message_Block ((char *) new_item,
00189 sizeof (*new_item),
00190 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00191 -1);
00192
00193 int result = this->queue_.enqueue_deadline (mb, timeout);
00194 if (result == -1)
00195
00196 mb->release ();
00197
00198 return result;
00199 }
00200
00201
00202
00203
00204 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00205 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00206 ACE_Time_Value *timeout)
00207 {
00208 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00209
00210 ACE_Message_Block *mb;
00211
00212 ACE_NEW_RETURN (mb,
00213 ACE_Message_Block ((char *) new_item,
00214 sizeof (*new_item),
00215 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00216 -1);
00217
00218 int result = this->queue_.enqueue_tail (mb, timeout);
00219 if (result == -1)
00220
00221 mb->release ();
00222 return result;
00223 }
00224
00225
00226
00227
00228
00229 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00230 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00231 ACE_Time_Value *timeout)
00232 {
00233 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00234
00235 ACE_Message_Block *mb;
00236
00237 int cur_count = this->queue_.dequeue_head (mb, timeout);
00238
00239
00240 if (cur_count != -1)
00241 {
00242 first_item = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00243
00244 mb->release ();
00245 return cur_count;
00246 }
00247 else
00248 return -1;
00249 }
00250
00251
00252
00253
00254
00255 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00256 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00257 ACE_Time_Value *timeout)
00258 {
00259 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00260
00261 ACE_Message_Block *mb;
00262
00263 int cur_count = this->queue_.dequeue_prio (mb, timeout);
00264
00265
00266 if (cur_count != -1)
00267 {
00268 dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00269
00270 mb->release ();
00271 return cur_count;
00272 }
00273 else
00274 return -1;
00275 }
00276
00277
00278
00279
00280
00281 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00282 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00283 ACE_Time_Value *timeout)
00284 {
00285 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00286
00287 ACE_Message_Block *mb;
00288
00289 int cur_count = this->queue_.dequeue_tail (mb, timeout);
00290
00291
00292 if (cur_count != -1)
00293 {
00294 dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00295
00296 mb->release ();
00297 return cur_count;
00298 }
00299 else
00300 return -1;
00301 }
00302
00303
00304
00305
00306
00307 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00308 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00309 ACE_Time_Value *timeout)
00310 {
00311 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00312
00313 ACE_Message_Block *mb;
00314
00315 int cur_count = this->queue_.dequeue_deadline (mb, timeout);
00316
00317
00318 if (cur_count != -1)
00319 {
00320 dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00321
00322 mb->release ();
00323 return cur_count;
00324 }
00325 else
00326 return -1;
00327 }
00328
00329 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00330 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00331 {
00332 ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00333
00334 return this->queue_.notify ();
00335 }
00336
00337 template <ACE_SYNCH_DECL>
00338 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00339 : queue_ (q),
00340 curr_ (q.head_)
00341 {
00342 }
00343
00344 template <ACE_SYNCH_DECL> int
00345 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00346 {
00347 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00348
00349 if (this->curr_ != 0)
00350 {
00351 entry = this->curr_;
00352 return 1;
00353 }
00354
00355 return 0;
00356 }
00357
00358 template <ACE_SYNCH_DECL> int
00359 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00360 {
00361 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00362
00363 return this->curr_ == 0;
00364 }
00365
00366 template <ACE_SYNCH_DECL> int
00367 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00368 {
00369 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00370
00371 if (this->curr_)
00372 this->curr_ = this->curr_->next ();
00373 return this->curr_ != 0;
00374 }
00375
00376 template <ACE_SYNCH_DECL> void
00377 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00378 {
00379 }
00380
00381 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00382
00383 template <ACE_SYNCH_DECL>
00384 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00385 : queue_ (q),
00386 curr_ (queue_.tail_)
00387 {
00388 }
00389
00390 template <ACE_SYNCH_DECL> int
00391 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00392 {
00393 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00394
00395 if (this->curr_ != 0)
00396 {
00397 entry = this->curr_;
00398 return 1;
00399 }
00400
00401 return 0;
00402 }
00403
00404 template <ACE_SYNCH_DECL> int
00405 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00406 {
00407 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00408
00409 return this->curr_ == 0;
00410 }
00411
00412 template <ACE_SYNCH_DECL> int
00413 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00414 {
00415 ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00416
00417 if (this->curr_)
00418 this->curr_ = this->curr_->prev ();
00419 return this->curr_ != 0;
00420 }
00421
00422 template <ACE_SYNCH_DECL> void
00423 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00424 {
00425 }
00426
00427 template <ACE_SYNCH_DECL> void
00428 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00429 {
00430 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00431 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00432 switch (this->state_)
00433 {
00434 case ACE_Message_Queue_Base::ACTIVATED:
00435 ACE_DEBUG ((LM_DEBUG,
00436 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00437 break;
00438 case ACE_Message_Queue_Base::DEACTIVATED:
00439 ACE_DEBUG ((LM_DEBUG,
00440 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00441 break;
00442 case ACE_Message_Queue_Base::PULSED:
00443 ACE_DEBUG ((LM_DEBUG,
00444 ACE_LIB_TEXT ("state = PULSED\n")));
00445 break;
00446 }
00447 ACE_DEBUG ((LM_DEBUG,
00448 ACE_LIB_TEXT ("low_water_mark = %d\n")
00449 ACE_LIB_TEXT ("high_water_mark = %d\n")
00450 ACE_LIB_TEXT ("cur_bytes = %d\n")
00451 ACE_LIB_TEXT ("cur_length = %d\n")
00452 ACE_LIB_TEXT ("cur_count = %d\n")
00453 ACE_LIB_TEXT ("head_ = %u\n")
00454 ACE_LIB_TEXT ("tail_ = %u\n"),
00455 this->low_water_mark_,
00456 this->high_water_mark_,
00457 this->cur_bytes_,
00458 this->cur_length_,
00459 this->cur_count_,
00460 this->head_,
00461 this->tail_));
00462 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_full_cond: \n")));
00463 not_full_cond_.dump ();
00464 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_empty_cond: \n")));
00465 not_empty_cond_.dump ();
00466 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00467 }
00468
00469 template <ACE_SYNCH_DECL> void
00470 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00471 {
00472 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00473 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00474
00475 this->cur_bytes_ = new_value;
00476 }
00477
00478 template <ACE_SYNCH_DECL> void
00479 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00480 {
00481 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00482 ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00483
00484 this->cur_length_ = new_value;
00485 }
00486
00487 template <ACE_SYNCH_DECL>
00488 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00489 size_t lwm,
00490 ACE_Notification_Strategy *ns)
00491 : not_empty_cond_ (this->lock_),
00492 not_full_cond_ (this->lock_)
00493 {
00494 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00495
00496 if (this->open (hwm, lwm, ns) == -1)
00497 ACE_ERROR ((LM_ERROR,
00498 ACE_LIB_TEXT ("open")));
00499 }
00500
00501 template <ACE_SYNCH_DECL>
00502 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00503 {
00504 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00505 if (this->head_ != 0 && this->close () == -1)
00506 ACE_ERROR ((LM_ERROR,
00507 ACE_LIB_TEXT ("close")));
00508 }
00509
00510 template <ACE_SYNCH_DECL> int
00511 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00512 {
00513 int number_flushed = 0;
00514
00515
00516
00517 for (this->tail_ = 0; this->head_ != 0; )
00518 {
00519 number_flushed++;
00520
00521 size_t mb_bytes = 0;
00522 size_t mb_length = 0;
00523 this->head_->total_size_and_length (mb_bytes,
00524 mb_length);
00525
00526 this->cur_bytes_ -= mb_bytes;
00527 this->cur_length_ -= mb_length;
00528 this->cur_count_--;
00529
00530 ACE_Message_Block *temp = this->head_;
00531 this->head_ = this->head_->next ();
00532
00533
00534
00535 temp->release ();
00536 }
00537
00538 return number_flushed;
00539 }
00540
00541
00542
00543
00544
00545 template <ACE_SYNCH_DECL> int
00546 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
00547 size_t lwm,
00548 ACE_Notification_Strategy *ns)
00549 {
00550 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
00551 this->high_water_mark_ = hwm;
00552 this->low_water_mark_ = lwm;
00553 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00554 this->cur_bytes_ = 0;
00555 this->cur_length_ = 0;
00556 this->cur_count_ = 0;
00557 this->tail_ = 0;
00558 this->head_ = 0;
00559 this->notification_strategy_ = ns;
00560 return 0;
00561 }
00562
00563
00564
00565
00566 template <ACE_SYNCH_DECL> int
00567 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
00568 {
00569 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
00570 int previous_state = this->state_;
00571
00572 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00573 {
00574
00575 this->not_empty_cond_.broadcast ();
00576 this->not_full_cond_.broadcast ();
00577
00578 if (pulse)
00579 this->state_ = ACE_Message_Queue_Base::PULSED;
00580 else
00581 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00582 }
00583 return previous_state;
00584 }
00585
00586 template <ACE_SYNCH_DECL> int
00587 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
00588 {
00589 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
00590 int previous_state = this->state_;
00591 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00592 return previous_state;
00593 }
00594
00595 template <ACE_SYNCH_DECL> int
00596 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
00597 {
00598 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
00599 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00600
00601
00602 return this->flush_i ();
00603 }
00604
00605
00606
00607 template <ACE_SYNCH_DECL> int
00608 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
00609 {
00610 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
00611 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00612
00613 int result = this->deactivate_i ();
00614
00615
00616 this->flush_i ();
00617
00618 return result;
00619 }
00620
00621 template <ACE_SYNCH_DECL> int
00622 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
00623 {
00624 if (this->not_full_cond_.signal () != 0)
00625 return -1;
00626 return 0;
00627 }
00628
00629 template <ACE_SYNCH_DECL> int
00630 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
00631 {
00632
00633 if (this->not_empty_cond_.signal () != 0)
00634 return -1;
00635 return 0;
00636 }
00637
00638
00639
00640
00641 template <ACE_SYNCH_DECL> int
00642 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
00643 {
00644 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
00645
00646 if (new_item == 0)
00647 return -1;
00648
00649
00650 if (this->tail_ == 0)
00651 {
00652 this->head_ = new_item;
00653 this->tail_ = new_item;
00654 new_item->next (0);
00655 new_item->prev (0);
00656 }
00657
00658 else
00659 {
00660 new_item->next (0);
00661 this->tail_->next (new_item);
00662 new_item->prev (this->tail_);
00663 this->tail_ = new_item;
00664 }
00665
00666
00667 new_item->total_size_and_length (this->cur_bytes_,
00668 this->cur_length_);
00669 this->cur_count_++;
00670
00671 if (this->signal_dequeue_waiters () == -1)
00672 return -1;
00673 else
00674 return this->cur_count_;
00675 }
00676
00677
00678
00679 template <ACE_SYNCH_DECL> int
00680 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
00681 {
00682 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
00683
00684 if (new_item == 0)
00685 return -1;
00686
00687 new_item->prev (0);
00688 new_item->next (this->head_);
00689
00690 if (this->head_ != 0)
00691 this->head_->prev (new_item);
00692 else
00693 this->tail_ = new_item;
00694
00695 this->head_ = new_item;
00696
00697
00698 new_item->total_size_and_length (this->cur_bytes_,
00699 this->cur_length_);
00700 this->cur_count_++;
00701
00702 if (this->signal_dequeue_waiters () == -1)
00703 return -1;
00704 else
00705 return this->cur_count_;
00706 }
00707
00708
00709
00710
00711 template <ACE_SYNCH_DECL> int
00712 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
00713 {
00714 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
00715
00716 if (new_item == 0)
00717 return -1;
00718
00719 if (this->head_ == 0)
00720
00721
00722 return this->enqueue_head_i (new_item);
00723 else
00724 {
00725 ACE_Message_Block *temp;
00726
00727
00728
00729
00730
00731 for (temp = this->tail_;
00732 temp != 0;
00733 temp = temp->prev ())
00734 if (temp->msg_priority () >= new_item->msg_priority ())
00735
00736
00737 break;
00738
00739 if (temp == 0)
00740
00741
00742
00743 return this->enqueue_head_i (new_item);
00744 else if (temp->next () == 0)
00745
00746
00747
00748 return this->enqueue_tail_i (new_item);
00749 else
00750 {
00751
00752
00753
00754
00755 new_item->prev (temp);
00756 new_item->next (temp->next ());
00757 temp->next ()->prev (new_item);
00758 temp->next (new_item);
00759 }
00760 }
00761
00762
00763 new_item->total_size_and_length (this->cur_bytes_,
00764 this->cur_length_);
00765 this->cur_count_++;
00766
00767 if (this->signal_dequeue_waiters () == -1)
00768 return -1;
00769 else
00770 return this->cur_count_;
00771 }
00772
00773
00774
00775
00776 template <ACE_SYNCH_DECL> int
00777 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
00778 {
00779 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
00780 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
00781
00782 if (new_item == 0)
00783 return -1;
00784
00785 if (this->head_ == 0)
00786
00787
00788 return this->enqueue_head_i (new_item);
00789 else
00790 {
00791 ACE_Message_Block *temp;
00792
00793
00794
00795
00796
00797 for (temp = this->head_;
00798 temp != 0;
00799 temp = temp->next ())
00800 if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
00801
00802
00803 break;
00804
00805 if (temp == 0 || temp->next () == 0)
00806
00807
00808
00809 return this->enqueue_tail_i (new_item);
00810 else
00811 {
00812
00813
00814
00815
00816 new_item->prev (temp);
00817 new_item->next (temp->next ());
00818 temp->next ()->prev (new_item);
00819 temp->next (new_item);
00820 }
00821 }
00822
00823
00824 new_item->total_size_and_length (this->cur_bytes_,
00825 this->cur_length_);
00826 this->cur_count_++;
00827
00828 if (this->signal_dequeue_waiters () == -1)
00829 return -1;
00830 else
00831 return this->cur_count_;
00832 #else
00833 return this->enqueue_tail_i (new_item);
00834 #endif
00835 }
00836
00837
00838
00839
00840
00841 template <ACE_SYNCH_DECL> int
00842 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
00843 {
00844 if (this->head_ ==0)
00845 ACE_ERROR_RETURN ((LM_ERROR,
00846 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00847 -1);
00848 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
00849 first_item = this->head_;
00850 this->head_ = this->head_->next ();
00851
00852 if (this->head_ == 0)
00853 this->tail_ = 0;
00854 else
00855
00856
00857 this->head_->prev (0);
00858
00859 size_t mb_bytes = 0;
00860 size_t mb_length = 0;
00861 first_item->total_size_and_length (mb_bytes,
00862 mb_length);
00863
00864 this->cur_bytes_ -= mb_bytes;
00865 this->cur_length_ -= mb_length;
00866 this->cur_count_--;
00867
00868 if (this->cur_count_ == 0 && this->head_ == this->tail_)
00869 this->head_ = this->tail_ = 0;
00870
00871
00872
00873 if (this->cur_bytes_ <= this->low_water_mark_
00874 && this->signal_enqueue_waiters () == -1)
00875 return -1;
00876 else
00877 return this->cur_count_;
00878 }
00879
00880
00881
00882
00883
00884 template <ACE_SYNCH_DECL> int
00885 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
00886 {
00887 if (this->head_ == 0)
00888 ACE_ERROR_RETURN ((LM_ERROR,
00889 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00890 -1);
00891 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
00892
00893
00894 ACE_Message_Block* chosen = 0;
00895 u_long priority = ULONG_MAX;
00896 for (ACE_Message_Block *temp = this->tail_; temp != 0; temp = temp->prev ())
00897 {
00898 if (temp->msg_priority () < priority)
00899 {
00900 priority = temp->msg_priority ();
00901 chosen = temp;
00902 }
00903 }
00904
00905
00906 if (chosen == 0)
00907 {
00908 chosen = this->head_;
00909 }
00910
00911
00912
00913 if (chosen->prev () == 0)
00914 {
00915 this->head_ = chosen->next ();
00916 }
00917 else
00918 {
00919 chosen->prev ()->next (chosen->next ());
00920 }
00921
00922 if (chosen->next () == 0)
00923 {
00924 this->tail_ = chosen->prev ();
00925 }
00926 else
00927 {
00928 chosen->next ()->prev (chosen->prev ());
00929 }
00930
00931
00932 dequeued = chosen;
00933
00934 size_t mb_bytes = 0;
00935 size_t mb_length = 0;
00936 dequeued->total_size_and_length (mb_bytes,
00937 mb_length);
00938
00939 this->cur_bytes_ -= mb_bytes;
00940 this->cur_length_ -= mb_length;
00941 this->cur_count_--;
00942
00943 if (this->cur_count_ == 0 && this->head_ == this->tail_)
00944 this->head_ = this->tail_ = 0;
00945
00946
00947
00948 if (this->cur_bytes_ <= this->low_water_mark_
00949 && this->signal_enqueue_waiters () == -1)
00950 return -1;
00951 else
00952 return this->cur_count_;
00953 }
00954
00955
00956
00957
00958
00959 template <ACE_SYNCH_DECL> int
00960 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
00961 {
00962 if (this->head_ == 0)
00963 ACE_ERROR_RETURN ((LM_ERROR,
00964 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00965 -1);
00966 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
00967 dequeued = this->tail_;
00968 if (this->tail_->prev () == 0)
00969 {
00970 this->head_ = 0;
00971 this->tail_ = 0;
00972 }
00973 else
00974 {
00975 this->tail_->prev ()->next (0);
00976 this->tail_ = this->tail_->prev ();
00977 }
00978
00979 size_t mb_bytes = 0;
00980 size_t mb_length = 0;
00981 dequeued->total_size_and_length (mb_bytes,
00982 mb_length);
00983
00984 this->cur_bytes_ -= mb_bytes;
00985 this->cur_length_ -= mb_length;
00986 this->cur_count_--;
00987
00988 if (this->cur_count_ == 0 && this->head_ == this->tail_)
00989 this->head_ = this->tail_ = 0;
00990
00991
00992
00993 if (this->cur_bytes_ <= this->low_water_mark_
00994 && this->signal_enqueue_waiters () == -1)
00995 return -1;
00996 else
00997 return this->cur_count_;
00998 }
00999
01000
01001
01002
01003
01004 template <ACE_SYNCH_DECL> int
01005 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01006 {
01007 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01008 if (this->head_ == 0)
01009 ACE_ERROR_RETURN ((LM_ERROR,
01010 ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01011 -1);
01012 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01013
01014
01015 ACE_Message_Block* chosen = 0;
01016 ACE_Time_Value deadline = ACE_Time_Value::max_time;
01017 for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01018 {
01019 if (temp->msg_deadline_time () < deadline)
01020 {
01021 deadline = temp->msg_deadline_time ();
01022 chosen = temp;
01023 }
01024 }
01025
01026
01027
01028 if (chosen == 0)
01029 {
01030 chosen = this->head_;
01031 }
01032
01033
01034
01035 if (chosen->prev () == 0)
01036 {
01037 this->head_ = chosen->next ();
01038 }
01039 else
01040 {
01041 chosen->prev ()->next (chosen->next ());
01042 }
01043
01044 if (chosen->next () == 0)
01045 {
01046 this->tail_ = chosen->prev ();
01047 }
01048 else
01049 {
01050 chosen->next ()->prev (chosen->prev ());
01051 }
01052
01053
01054 dequeued = chosen;
01055
01056 size_t mb_bytes = 0;
01057 size_t mb_length = 0;
01058 dequeued->total_size_and_length (mb_bytes,
01059 mb_length);
01060
01061 this->cur_bytes_ -= mb_bytes;
01062 this->cur_length_ -= mb_length;
01063 this->cur_count_--;
01064
01065 if (this->cur_count_ == 0 && this->head_ == this->tail_)
01066 this->head_ = this->tail_ = 0;
01067
01068
01069
01070 if (this->cur_bytes_ <= this->low_water_mark_
01071 && this->signal_enqueue_waiters () == -1)
01072 return -1;
01073 else
01074 return this->cur_count_;
01075 #else
01076 return this->dequeue_head_i (dequeued);
01077 #endif
01078 }
01079
01080
01081
01082 template <ACE_SYNCH_DECL> int
01083 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01084 ACE_Time_Value *timeout)
01085 {
01086 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01087 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01088
01089 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01090 {
01091 errno = ESHUTDOWN;
01092 return -1;
01093 }
01094
01095
01096
01097 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01098 return -1;
01099
01100 first_item = this->head_;
01101 return this->cur_count_;
01102 }
01103
01104 template <ACE_SYNCH_DECL> int
01105 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond
01106 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01107 {
01108 int result = 0;
01109
01110
01111
01112 while (this->is_full_i ())
01113 {
01114 if (this->not_full_cond_.wait (timeout) == -1)
01115 {
01116 if (errno == ETIME)
01117 errno = EWOULDBLOCK;
01118 result = -1;
01119 break;
01120 }
01121 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01122 {
01123 errno = ESHUTDOWN;
01124 result = -1;
01125 break;
01126 }
01127 }
01128 return result;
01129 }
01130
01131 template <ACE_SYNCH_DECL> int
01132 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01133 (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01134 {
01135 int result = 0;
01136
01137
01138
01139 while (this->is_empty_i ())
01140 {
01141 if (this->not_empty_cond_.wait (timeout) == -1)
01142 {
01143 if (errno == ETIME)
01144 errno = EWOULDBLOCK;
01145 result = -1;
01146 break;
01147 }
01148 if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01149 {
01150 errno = ESHUTDOWN;
01151 result = -1;
01152 break;
01153 }
01154 }
01155 return result;
01156 }
01157
01158
01159
01160
01161 template <ACE_SYNCH_DECL> int
01162 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01163 ACE_Time_Value *timeout)
01164 {
01165 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01166 int queue_count = 0;
01167 {
01168 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01169
01170 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01171 {
01172 errno = ESHUTDOWN;
01173 return -1;
01174 }
01175
01176 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01177 return -1;
01178
01179 queue_count = this->enqueue_head_i (new_item);
01180
01181 if (queue_count == -1)
01182 return -1;
01183
01184 this->notify ();
01185 }
01186 return queue_count;
01187 }
01188
01189
01190
01191
01192
01193 template <ACE_SYNCH_DECL> int
01194 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01195 ACE_Time_Value *timeout)
01196 {
01197 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01198 int queue_count = 0;
01199 {
01200 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01201
01202 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01203 {
01204 errno = ESHUTDOWN;
01205 return -1;
01206 }
01207
01208 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01209 return -1;
01210
01211 queue_count = this->enqueue_i (new_item);
01212
01213 if (queue_count == -1)
01214 return -1;
01215
01216 this->notify ();
01217 }
01218 return queue_count;
01219 }
01220
01221
01222
01223
01224
01225 template <ACE_SYNCH_DECL> int
01226 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01227 ACE_Time_Value *timeout)
01228 {
01229 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01230 int queue_count = 0;
01231 {
01232 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01233
01234 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01235 {
01236 errno = ESHUTDOWN;
01237 return -1;
01238 }
01239
01240 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01241 return -1;
01242
01243 queue_count = this->enqueue_deadline_i (new_item);
01244
01245 if (queue_count == -1)
01246 return -1;
01247
01248 this->notify ();
01249 }
01250 return queue_count;
01251 }
01252
01253 template <ACE_SYNCH_DECL> int
01254 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01255 ACE_Time_Value *timeout)
01256 {
01257 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01258 return this->enqueue_prio (new_item, timeout);
01259 }
01260
01261
01262
01263
01264 template <ACE_SYNCH_DECL> int
01265 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01266 ACE_Time_Value *timeout)
01267 {
01268 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01269 int queue_count = 0;
01270 {
01271 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01272
01273 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01274 {
01275 errno = ESHUTDOWN;
01276 return -1;
01277 }
01278
01279 if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01280 return -1;
01281
01282 queue_count = this->enqueue_tail_i (new_item);
01283
01284 if (queue_count == -1)
01285 return -1;
01286
01287 this->notify ();
01288 }
01289 return queue_count;
01290 }
01291
01292
01293
01294
01295
01296 template <ACE_SYNCH_DECL> int
01297 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01298 ACE_Time_Value *timeout)
01299 {
01300 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01301 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01302
01303 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01304 {
01305 errno = ESHUTDOWN;
01306 return -1;
01307 }
01308
01309 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01310 return -1;
01311
01312 return this->dequeue_head_i (first_item);
01313 }
01314
01315
01316
01317
01318
01319 template <ACE_SYNCH_DECL> int
01320 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01321 ACE_Time_Value *timeout)
01322 {
01323 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01324 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01325
01326 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01327 {
01328 errno = ESHUTDOWN;
01329 return -1;
01330 }
01331
01332 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01333 return -1;
01334
01335 return this->dequeue_prio_i (dequeued);
01336 }
01337
01338
01339
01340
01341
01342 template <ACE_SYNCH_DECL> int
01343 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01344 ACE_Time_Value *timeout)
01345 {
01346 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01347 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01348
01349 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01350 {
01351 errno = ESHUTDOWN;
01352 return -1;
01353 }
01354
01355 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01356 return -1;
01357
01358 return this->dequeue_tail_i (dequeued);
01359 }
01360
01361
01362
01363
01364
01365 template <ACE_SYNCH_DECL> int
01366 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01367 ACE_Time_Value *timeout)
01368 {
01369 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01370 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01371
01372 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01373 {
01374 errno = ESHUTDOWN;
01375 return -1;
01376 }
01377
01378 if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01379 return -1;
01380
01381 return this->dequeue_deadline_i (dequeued);
01382 }
01383
01384 template <ACE_SYNCH_DECL> int
01385 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01386 {
01387 ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01388
01389
01390 if (this->notification_strategy_ == 0)
01391 return 0;
01392 else
01393 return this->notification_strategy_->notify ();
01394 }
01395
01396
01397
01398 template <ACE_SYNCH_DECL>
01399 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01400 size_t hwm,
01401 size_t lwm,
01402 ACE_Notification_Strategy *ns)
01403 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01404 pending_head_ (0),
01405 pending_tail_ (0),
01406 late_head_ (0),
01407 late_tail_ (0),
01408 beyond_late_head_ (0),
01409 beyond_late_tail_ (0),
01410 message_strategy_ (message_strategy)
01411 {
01412
01413
01414
01415 }
01416
01417
01418
01419 template <ACE_SYNCH_DECL>
01420 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01421 {
01422 delete &this->message_strategy_;
01423 }
01424
01425 template <ACE_SYNCH_DECL> int
01426 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01427 ACE_Message_Block *&list_tail,
01428 u_int status_flags)
01429 {
01430
01431 list_head = 0;
01432 list_tail = 0;
01433
01434
01435 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01436
01437
01438 int result = this->refresh_queue (current_time);
01439 if (result < 0)
01440 return result;
01441
01442 if (ACE_BIT_ENABLED (status_flags,
01443 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01444 && this->pending_head_
01445 && this->pending_tail_)
01446 {
01447
01448 if (this->pending_head_->prev ())
01449 {
01450 this->tail_ = this->pending_head_->prev ();
01451 this->pending_head_->prev ()->next (0);
01452 }
01453 else
01454 {
01455
01456 this->head_ = 0;
01457 this->tail_ = 0;
01458 }
01459
01460
01461 list_head = this->pending_head_;
01462 list_tail = this->pending_tail_;
01463
01464
01465 this->pending_head_->prev (0);
01466 this->pending_head_ = 0;
01467 this->pending_tail_ = 0;
01468 }
01469
01470 if (ACE_BIT_ENABLED (status_flags,
01471 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01472 && this->late_head_
01473 && this->late_tail_)
01474 {
01475
01476
01477 if (this->late_tail_->next ())
01478 this->late_tail_->next ()->prev (this->late_head_->prev ());
01479 else
01480 this->tail_ = this->late_head_->prev ();
01481
01482 if (this->late_head_->prev ())
01483 this->late_head_->prev ()->next (this->late_tail_->next ());
01484 else
01485 this->head_ = this->late_tail_->next ();
01486
01487
01488 this->late_head_->prev (list_tail);
01489 if (list_tail)
01490 list_tail->next (this->late_head_);
01491 else
01492 list_head = this->late_head_;
01493
01494 list_tail = this->late_tail_;
01495
01496 this->late_tail_->next (0);
01497 this->late_head_ = 0;
01498 this->late_tail_ = 0;
01499 }
01500
01501 if (ACE_BIT_ENABLED (status_flags,
01502 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01503 && this->beyond_late_head_
01504 && this->beyond_late_tail_)
01505 {
01506
01507 if (this->beyond_late_tail_->next ())
01508 {
01509 this->head_ = this->beyond_late_tail_->next ();
01510 this->beyond_late_tail_->next ()->prev (0);
01511 }
01512 else
01513 {
01514
01515 this->head_ = 0;
01516 this->tail_ = 0;
01517 }
01518
01519
01520
01521 if (list_tail)
01522 {
01523 this->beyond_late_head_->prev (list_tail);
01524 list_tail->next (this->beyond_late_head_);
01525 }
01526 else
01527 list_head = this->beyond_late_head_;
01528
01529 list_tail = this->beyond_late_tail_;
01530
01531 this->beyond_late_tail_->next (0);
01532 this->beyond_late_head_ = 0;
01533 this->beyond_late_tail_ = 0;
01534 }
01535
01536
01537 ACE_Message_Block *temp1;
01538
01539 for (temp1 = list_head;
01540 temp1 != 0;
01541 temp1 = temp1->next ())
01542 {
01543 this->cur_count_--;
01544
01545 size_t mb_bytes = 0;
01546 size_t mb_length = 0;
01547 temp1->total_size_and_length (mb_bytes,
01548 mb_length);
01549
01550 this->cur_bytes_ -= mb_bytes;
01551 this->cur_length_ -= mb_length;
01552 }
01553
01554 return result;
01555 }
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565 template <ACE_SYNCH_DECL> int
01566 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01567 ACE_Time_Value *timeout)
01568 {
01569 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01570
01571 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01572
01573 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01574 {
01575 errno = ESHUTDOWN;
01576 return -1;
01577 }
01578
01579 int result;
01580
01581
01582 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01583
01584
01585 result = this->refresh_queue (current_time);
01586 if (result < 0)
01587 return result;
01588
01589
01590 result = this->wait_not_empty_cond (ace_mon, timeout);
01591 if (result == -1)
01592 return result;
01593
01594
01595
01596
01597 result = this->dequeue_head_i (first_item);
01598
01599 return result;
01600 }
01601
01602
01603
01604
01605 template <ACE_SYNCH_DECL> void
01606 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
01607 {
01608 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01609 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01610
01611 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01612 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01613
01614 ACE_DEBUG ((LM_DEBUG,
01615 ACE_LIB_TEXT ("pending_head_ = %u\n")
01616 ACE_LIB_TEXT ("pending_tail_ = %u\n")
01617 ACE_LIB_TEXT ("late_head_ = %u\n")
01618 ACE_LIB_TEXT ("late_tail_ = %u\n")
01619 ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01620 ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01621 this->pending_head_,
01622 this->pending_tail_,
01623 this->late_head_,
01624 this->late_tail_,
01625 this->beyond_late_head_,
01626 this->beyond_late_tail_));
01627
01628 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01629 message_strategy_.dump ();
01630
01631 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01632 }
01633
01634
01635 template <ACE_SYNCH_DECL> int
01636 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01637 {
01638 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01639
01640 if (new_item == 0)
01641 return -1;
01642
01643 int result = 0;
01644
01645
01646 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01647
01648
01649
01650 result = this->refresh_queue (current_time);
01651 if (result < 0)
01652 return result;
01653
01654
01655 switch (message_strategy_.priority_status (*new_item,
01656 current_time))
01657 {
01658 case ACE_Dynamic_Message_Strategy::PENDING:
01659 if (this->pending_tail_ == 0)
01660 {
01661
01662
01663
01664 pending_head_ = new_item;
01665 pending_tail_ = pending_head_;
01666 return this->enqueue_tail_i (new_item);
01667 }
01668 else
01669 {
01670
01671
01672 result = sublist_enqueue_i (new_item,
01673 current_time,
01674 this->pending_head_,
01675 this->pending_tail_,
01676 ACE_Dynamic_Message_Strategy::PENDING);
01677 }
01678 break;
01679
01680 case ACE_Dynamic_Message_Strategy::LATE:
01681 if (this->late_tail_ == 0)
01682 {
01683 late_head_ = new_item;
01684 late_tail_ = late_head_;
01685
01686 if (this->pending_head_ == 0)
01687
01688
01689
01690 return this->enqueue_tail_i (new_item);
01691 else if (this->beyond_late_tail_ == 0)
01692
01693
01694 return this->enqueue_head_i (new_item);
01695 else
01696 {
01697
01698
01699
01700 this->beyond_late_tail_->next (new_item);
01701 new_item->prev (this->beyond_late_tail_);
01702 this->pending_head_->prev (new_item);
01703 new_item->next (this->pending_head_);
01704 }
01705 }
01706 else
01707 {
01708
01709
01710 result = sublist_enqueue_i (new_item,
01711 current_time,
01712 this->late_head_,
01713 this->late_tail_,
01714 ACE_Dynamic_Message_Strategy::LATE);
01715 }
01716 break;
01717
01718 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01719 if (this->beyond_late_tail_ == 0)
01720 {
01721
01722
01723
01724 beyond_late_head_ = new_item;
01725 beyond_late_tail_ = beyond_late_head_;
01726 return this->enqueue_head_i (new_item);
01727 }
01728 else
01729 {
01730
01731
01732
01733 if (this->beyond_late_tail_->next ())
01734 this->beyond_late_tail_->next ()->prev (new_item);
01735 else
01736 this->tail_ = new_item;
01737
01738 new_item->next (this->beyond_late_tail_->next ());
01739 this->beyond_late_tail_->next (new_item);
01740 new_item->prev (this->beyond_late_tail_);
01741 this->beyond_late_tail_ = new_item;
01742 }
01743
01744 break;
01745
01746
01747 default:
01748 result = -1;
01749 break;
01750 }
01751
01752 if (result < 0)
01753 return result;
01754
01755 size_t mb_bytes = 0;
01756 size_t mb_length = 0;
01757 new_item->total_size_and_length (mb_bytes,
01758 mb_length);
01759
01760 this->cur_bytes_ -= mb_bytes;
01761 this->cur_length_ -= mb_length;
01762 this->cur_count_++;
01763
01764 if (this->signal_dequeue_waiters () == -1)
01765 return -1;
01766 else
01767 return this->cur_count_;
01768 }
01769
01770
01771
01772
01773
01774
01775
01776 template <ACE_SYNCH_DECL> int
01777 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
01778 const ACE_Time_Value ¤t_time,
01779 ACE_Message_Block *&sublist_head,
01780 ACE_Message_Block *&sublist_tail,
01781 ACE_Dynamic_Message_Strategy::Priority_Status status)
01782 {
01783 int result = 0;
01784 ACE_Message_Block *current_item = 0;
01785
01786
01787
01788 for (current_item = sublist_tail;
01789 current_item;
01790 current_item = current_item->prev ())
01791 {
01792 if (message_strategy_.priority_status (*current_item, current_time) == status)
01793 {
01794 if (current_item->msg_priority () >= new_item->msg_priority ())
01795 break;
01796 }
01797 else
01798 {
01799 sublist_head = new_item;
01800 break;
01801 }
01802 }
01803
01804 if (current_item == 0)
01805 {
01806
01807
01808 new_item->prev (0);
01809 new_item->next (this->head_);
01810 if (this->head_ != 0)
01811 this->head_->prev (new_item);
01812 else
01813 {
01814 this->tail_ = new_item;
01815 sublist_tail = new_item;
01816 }
01817 this->head_ = new_item;
01818 sublist_head = new_item;
01819 }
01820 else
01821 {
01822
01823 new_item->next (current_item->next ());
01824 new_item->prev (current_item);
01825
01826 if (current_item->next ())
01827 current_item->next ()->prev (new_item);
01828 else
01829 this->tail_ = new_item;
01830
01831 current_item->next (new_item);
01832
01833
01834
01835 if (current_item == sublist_tail)
01836 sublist_tail = new_item;
01837 }
01838
01839 return result;
01840 }
01841
01842
01843
01844
01845 template <ACE_SYNCH_DECL> int
01846 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01847 {
01848 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01849
01850 int result = 0;
01851 int last_in_subqueue = 0;
01852
01853
01854 if (this->pending_head_)
01855 {
01856 first_item = this->pending_head_;
01857
01858 if (0 == this->pending_head_->prev ())
01859 this->head_ = this->pending_head_->next ();
01860 else
01861 this->pending_head_->prev ()->next (this->pending_head_->next ());
01862
01863 if (0 == this->pending_head_->next ())
01864 {
01865 this->tail_ = this->pending_head_->prev ();
01866 this->pending_head_ = 0;
01867 this->pending_tail_ = 0;
01868 }
01869 else
01870 {
01871 this->pending_head_->next ()->prev (this->pending_head_->prev ());
01872 this->pending_head_ = this->pending_head_->next ();
01873 }
01874
01875 first_item->prev (0);
01876 first_item->next (0);
01877 }
01878
01879
01880 else if (this->late_head_)
01881 {
01882 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
01883
01884 first_item = this->late_head_;
01885
01886 if (0 == this->late_head_->prev ())
01887 this->head_ = this->late_head_->next ();
01888 else
01889 this->late_head_->prev ()->next (this->late_head_->next ());
01890
01891 if (0 == this->late_head_->next ())
01892 this->tail_ = this->late_head_->prev ();
01893 else
01894 {
01895 this->late_head_->next ()->prev (this->late_head_->prev ());
01896 this->late_head_ = this->late_head_->next ();
01897 }
01898
01899 if (last_in_subqueue)
01900 {
01901 this->late_head_ = 0;
01902 this->late_tail_ = 0;
01903 }
01904
01905 first_item->prev (0);
01906 first_item->next (0);
01907 }
01908
01909 else if (this->beyond_late_head_)
01910 {
01911 last_in_subqueue =
01912 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
01913
01914 first_item = this->beyond_late_head_;
01915 this->head_ = this->beyond_late_head_->next ();
01916
01917 if (0 == this->beyond_late_head_->next ())
01918 this->tail_ = this->beyond_late_head_->prev ();
01919 else
01920 {
01921 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
01922 this->beyond_late_head_ = this->beyond_late_head_->next ();
01923 }
01924
01925 if (last_in_subqueue)
01926 {
01927 this->beyond_late_head_ = 0;
01928 this->beyond_late_tail_ = 0;
01929 }
01930
01931 first_item->prev (0);
01932 first_item->next (0);
01933 }
01934 else
01935 {
01936
01937 first_item = 0;
01938 result = -1;
01939 }
01940
01941 if (result < 0)
01942 return result;
01943
01944 size_t mb_bytes = 0;
01945 size_t mb_length = 0;
01946 first_item->total_size_and_length (mb_bytes,
01947 mb_length);
01948
01949 this->cur_bytes_ -= mb_bytes;
01950 this->cur_length_ -= mb_length;
01951 this->cur_count_--;
01952
01953
01954
01955 if (this->cur_bytes_ <= this->low_water_mark_
01956 && this->signal_enqueue_waiters () == -1)
01957 return -1;
01958 else
01959 return this->cur_count_;
01960 }
01961
01962
01963
01964
01965
01966
01967
01968 template <ACE_SYNCH_DECL> int
01969 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time)
01970 {
01971 int result;
01972
01973 result = refresh_pending_queue (current_time);
01974
01975 if (result != -1)
01976 result = refresh_late_queue (current_time);
01977
01978 return result;
01979 }
01980
01981
01982
01983
01984 template <ACE_SYNCH_DECL> int
01985 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time)
01986 {
01987 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
01988
01989
01990 if (this->pending_head_)
01991 {
01992 current_status = message_strategy_.priority_status (*this->pending_head_,
01993 current_time);
01994 switch (current_status)
01995 {
01996 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01997
01998
01999 this->beyond_late_head_ = this->head_;
02000
02001
02002
02003 this->late_head_ = 0;
02004 this->late_tail_ = 0;
02005
02006
02007 do
02008 {
02009 this->pending_head_ = this->pending_head_->next ();
02010
02011 if (this->pending_head_)
02012 current_status = message_strategy_.priority_status (*this->pending_head_,
02013 current_time);
02014 else
02015 break;
02016
02017 }
02018 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02019
02020 if (this->pending_head_)
02021 {
02022
02023 this->beyond_late_tail_ = this->pending_head_->prev ();
02024
02025 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02026
02027 break;
02028 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02029 {
02030
02031 ACE_ERROR_RETURN ((LM_ERROR,
02032 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02033 (int) current_status),
02034 -1);
02035 }
02036
02037 }
02038 else
02039 {
02040
02041
02042 this->beyond_late_tail_ = this->tail_;
02043 this->pending_head_ = 0;
02044 this->pending_tail_ = 0;
02045 break;
02046 }
02047
02048 case ACE_Dynamic_Message_Strategy::LATE:
02049
02050
02051
02052 if (this->late_head_ == 0)
02053 this->late_head_ = this->pending_head_;
02054
02055
02056 do
02057 {
02058 this->pending_head_ = this->pending_head_->next ();
02059
02060 if (this->pending_head_)
02061 current_status = message_strategy_.priority_status (*this->pending_head_,
02062 current_time);
02063 else
02064 break;
02065
02066 }
02067 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02068
02069 if (this->pending_head_)
02070 {
02071 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02072
02073 ACE_ERROR_RETURN((LM_ERROR,
02074 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02075 (int) current_status),
02076 -1);
02077
02078
02079 this->late_tail_ = this->pending_head_->prev ();
02080 }
02081 else
02082 {
02083
02084 this->late_tail_ = this->tail_;
02085 this->pending_head_ = 0;
02086 this->pending_tail_ = 0;
02087 }
02088
02089 break;
02090 case ACE_Dynamic_Message_Strategy::PENDING:
02091
02092 break;
02093 default:
02094
02095 ACE_ERROR_RETURN((LM_ERROR,
02096 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02097 (int) current_status),
02098 -1);
02099 }
02100 }
02101 return 0;
02102 }
02103
02104
02105
02106
02107 template <ACE_SYNCH_DECL> int
02108 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time)
02109 {
02110 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02111
02112 if (this->late_head_)
02113 {
02114 current_status = message_strategy_.priority_status (*this->late_head_,
02115 current_time);
02116 switch (current_status)
02117 {
02118 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02119
02120
02121
02122 this->beyond_late_head_ = this->head_;
02123
02124
02125 do
02126 {
02127 this->late_head_ = this->late_head_->next ();
02128
02129 if (this->late_head_)
02130 current_status = message_strategy_.priority_status (*this->late_head_,
02131 current_time);
02132 else
02133 break;
02134
02135 }
02136 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02137
02138 if (this->late_head_)
02139 {
02140
02141 this->beyond_late_tail_ = this->late_head_->prev ();
02142
02143 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02144 {
02145
02146 this->late_head_ = 0;
02147 this->late_tail_ = 0;
02148 }
02149 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02150
02151 ACE_ERROR_RETURN ((LM_ERROR,
02152 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02153 (int) current_status),
02154 -1);
02155 }
02156 else
02157 {
02158
02159 this->beyond_late_tail_ = this->tail_;
02160 this->late_head_ = 0;
02161 this->late_tail_ = 0;
02162 }
02163
02164 break;
02165
02166 case ACE_Dynamic_Message_Strategy::LATE:
02167
02168 break;
02169
02170 case ACE_Dynamic_Message_Strategy::PENDING:
02171
02172 ACE_ERROR_RETURN ((LM_ERROR,
02173 ACE_LIB_TEXT ("Unexpected message priority status ")
02174 ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02175 (int) current_status),
02176 -1);
02177 default:
02178
02179 ACE_ERROR_RETURN ((LM_ERROR,
02180 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02181 (int) current_status),
02182 -1);
02183 }
02184 }
02185
02186 return 0;
02187 }
02188
02189
02190
02191
02192 template <ACE_SYNCH_DECL> int
02193 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02194 ACE_Time_Value *timeout)
02195 {
02196 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02197 timeout);
02198 }
02199
02200
02201
02202
02203 template <ACE_SYNCH_DECL> int
02204 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02205 ACE_Time_Value *timeout)
02206 {
02207 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02208 return this->enqueue_prio (new_item, timeout);
02209 }
02210
02211
02212
02213
02214
02215
02216 template <ACE_SYNCH_DECL> int
02217 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02218 ACE_Time_Value *timeout)
02219 {
02220 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02221 return this->enqueue_prio (new_item, timeout);
02222 }
02223
02224
02225
02226
02227
02228
02229 template <ACE_SYNCH_DECL>
02230 ACE_Message_Queue<ACE_SYNCH_USE> *
02231 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02232 size_t lwm,
02233 ACE_Notification_Strategy *ns)
02234 {
02235 ACE_Message_Queue<ACE_SYNCH_USE> *tmp;
02236
02237 ACE_NEW_RETURN (tmp,
02238 ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02239 0);
02240 return tmp;
02241 }
02242
02243
02244
02245 template <ACE_SYNCH_DECL>
02246 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02247 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02248 size_t lwm,
02249 ACE_Notification_Strategy *ns,
02250 u_long static_bit_field_mask,
02251 u_long static_bit_field_shift,
02252 u_long dynamic_priority_max,
02253 u_long dynamic_priority_offset)
02254 {
02255 ACE_Deadline_Message_Strategy *adms;
02256
02257 ACE_NEW_RETURN (adms,
02258 ACE_Deadline_Message_Strategy (static_bit_field_mask,
02259 static_bit_field_shift,
02260 dynamic_priority_max,
02261 dynamic_priority_offset),
02262 0);
02263
02264 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp;
02265 ACE_NEW_RETURN (tmp,
02266 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02267 0);
02268 return tmp;
02269 }
02270
02271
02272
02273
02274 template <ACE_SYNCH_DECL>
02275 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02276 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02277 size_t lwm,
02278 ACE_Notification_Strategy *ns,
02279 u_long static_bit_field_mask,
02280 u_long static_bit_field_shift,
02281 u_long dynamic_priority_max,
02282 u_long dynamic_priority_offset)
02283 {
02284 ACE_Laxity_Message_Strategy *alms;
02285
02286 ACE_NEW_RETURN (alms,
02287 ACE_Laxity_Message_Strategy (static_bit_field_mask,
02288 static_bit_field_shift,
02289 dynamic_priority_max,
02290 dynamic_priority_offset),
02291 0);
02292
02293 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp;
02294 ACE_NEW_RETURN (tmp,
02295 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02296 0);
02297 return tmp;
02298 }
02299
02300
02301
02302
#if defined (VXWORKS)

// Factory for an ACE_Message_Queue_Vx built with <max_messages>,
// <max_message_length> and <ns>.  Returns the new queue, or 0 if
// ACE_NEW_RETURN could not allocate it.
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Vx *
ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
                                                                   size_t max_message_length,
                                                                   ACE_Notification_Strategy *ns)
{
  ACE_Message_Queue_Vx *tmp;

  ACE_NEW_RETURN (tmp,
                  ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
                  0);
  return tmp;
}

#endif /* defined (VXWORKS) */

// The NT factory is guarded on its own: it must not be nested inside
// the VXWORKS section, where it could never be compiled.
#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)

// Factory for an ACE_Message_Queue_NT built with <max_threads>.
// Returns the new queue, or 0 if ACE_NEW_RETURN could not allocate it.
template <ACE_SYNCH_DECL>
ACE_Message_Queue_NT *
ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
{
  ACE_Message_Queue_NT *tmp;

  ACE_NEW_RETURN (tmp,
                  ACE_Message_Queue_NT (max_threads),
                  0);
  return tmp;
}

#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
02336 #endif