Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Message_Queue_T.cpp

Go to the documentation of this file.
00001 // $Id: Message_Queue_T.cpp,v 1.1.1.4.2.1 2003/04/16 16:41:14 taoadmin Exp $
00002 
00003 #ifndef ACE_MESSAGE_QUEUE_T_C
00004 #define ACE_MESSAGE_QUEUE_T_C
00005 
00006 // #include Message_Queue.h instead of Message_Queue_T.h to avoid
00007 // circular include problems.
00008 #include "ace/Message_Queue.h"
00009 #include "ace/Log_Msg.h"
00010 
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 # pragma once
00013 #endif /* ACE_LACKS_PRAGMA_ONCE */
00014 
00015 #if !defined (__ACE_INLINE__)
00016 #include "ace/Message_Queue_T.i"
00017 #endif /* __ACE_INLINE__ */
00018 
00019 #include "ace/Notification_Strategy.h"
00020 
00021 ACE_RCSID(ace, Message_Queue_T, "$Id: Message_Queue_T.cpp,v 1.1.1.4.2.1 2003/04/16 16:41:14 taoadmin Exp $")
00022 
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
00024 ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
00025 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex)
00026 
00027 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00028 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const
00029 {
00030   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump");
00031 
00032   this->queue_.dump ();
00033 }
00034 
00035 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00036 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value)
00037 {
00038   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes");
00039 
00040   this->queue_.message_bytes (new_value);
00041 }
00042 
00043 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void
00044 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value)
00045 {
00046   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length");
00047 
00048   this->queue_.message_length (new_value);
00049 }
00050 
00051 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00052 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm,
00053                                                                              size_t lwm,
00054                                                                              ACE_Notification_Strategy *ns)
00055 {
00056   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex");
00057 
00058   if (this->queue_.open (hwm, lwm, ns) == -1)
00059     ACE_ERROR ((LM_ERROR,
00060                 ACE_LIB_TEXT ("ACE_Message_Queue_Ex")));
00061 }
00062 
00063 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
00064 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void)
00065 {
00066   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex");
00067 }
00068 
00069 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00070 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm,
00071                                                              size_t lwm,
00072                                                              ACE_Notification_Strategy *ns)
00073 {
00074   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open");
00075 
00076   return this->queue_.open (hwm, lwm, ns);
00077 }
00078 
00079 // Clean up the queue if we have not already done so!
00080 
00081 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00082 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void)
00083 {
00084   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close");
00085 
00086   return this->queue_.close ();
00087 }
00088 
00089 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00090 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void)
00091 {
00092   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush");
00093 
00094   return this->queue_.flush ();
00095 }
00096 
00097 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00098 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void)
00099 {
00100   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i");
00101 
00102   return this->queue_.flush_i ();
00103 }
00104 
00105 // Take a look at the first item without removing it.
00106 
00107 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00108 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00109                                                                           ACE_Time_Value *timeout)
00110 {
00111   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head");
00112 
00113   ACE_Message_Block *mb;
00114 
00115   int cur_count = this->queue_.peek_dequeue_head (mb, timeout);
00116 
00117   if (cur_count != -1)
00118     first_item  = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00119 
00120   return cur_count;
00121 }
00122 
00123 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00124 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item,
00125                                                                      ACE_Time_Value *timeout)
00126 {
00127   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head");
00128 
00129   ACE_Message_Block *mb;
00130 
00131   ACE_NEW_RETURN (mb,
00132                   ACE_Message_Block ((char *) new_item,
00133                                      sizeof (*new_item),
00134                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00135                   -1);
00136 
00137   int result = this->queue_.enqueue_head (mb, timeout);
00138   if (result == -1)
00139     // Zap the message.
00140     mb->release ();
00141   return result;
00142 }
00143 
00144 // Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in
00145 // accordance with its <msg_priority> (0 is lowest priority).  Returns
00146 // -1 on failure, else the number of items still on the queue.
00147 
00148 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00149 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item,
00150                                                                 ACE_Time_Value *timeout)
00151 {
00152   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00153 
00154   return this->enqueue_prio (new_item, timeout);
00155 }
00156 
00157 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00158 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item,
00159                                                                      ACE_Time_Value *timeout)
00160 {
00161   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio");
00162 
00163   ACE_Message_Block *mb;
00164 
00165   ACE_NEW_RETURN (mb,
00166                   ACE_Message_Block ((char *) new_item,
00167                                      sizeof (*new_item),
00168                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00169                   -1);
00170 
00171   int result = this->queue_.enqueue_prio (mb, timeout);
00172   if (result == -1)
00173     // Zap the message.
00174     mb->release ();
00175 
00176   return result;
00177 }
00178 
00179 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00180 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item,
00181                                                                          ACE_Time_Value *timeout)
00182 {
00183   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline");
00184 
00185   ACE_Message_Block *mb;
00186 
00187   ACE_NEW_RETURN (mb,
00188                   ACE_Message_Block ((char *) new_item,
00189                                      sizeof (*new_item),
00190                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ),
00191                   -1);
00192 
00193   int result = this->queue_.enqueue_deadline (mb, timeout);
00194   if (result == -1)
00195     // Zap the message.
00196     mb->release ();
00197 
00198   return result;
00199 }
00200 
00201 // Block indefinitely waiting for an item to arrive,
00202 // does not ignore alerts (e.g., signals).
00203 
00204 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00205 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item,
00206                                                                      ACE_Time_Value *timeout)
00207 {
00208   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail");
00209 
00210   ACE_Message_Block *mb;
00211 
00212   ACE_NEW_RETURN (mb,
00213                   ACE_Message_Block ((char *) new_item,
00214                                      sizeof (*new_item),
00215                                      ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY),
00216                   -1);
00217 
00218   int result = this->queue_.enqueue_tail (mb, timeout);
00219   if (result == -1)
00220     // Zap the message.
00221     mb->release ();
00222   return result;
00223 }
00224 
00225 // Remove an item from the front of the queue.  If timeout == 0 block
00226 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00227 // the amount of time specified by timeout.
00228 
00229 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00230 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item,
00231                                                                      ACE_Time_Value *timeout)
00232 {
00233   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head");
00234 
00235   ACE_Message_Block *mb;
00236 
00237   int cur_count = this->queue_.dequeue_head (mb, timeout);
00238 
00239   // Dequeue the message.
00240   if (cur_count != -1)
00241     {
00242       first_item = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00243       // Delete the message block.
00244       mb->release ();
00245       return cur_count;
00246     }
00247   else
00248     return -1;
00249 }
00250 
00251 // Remove the item with the lowest priority from the queue.  If timeout == 0
00252 // block indefinitely (or until an alert occurs).  Otherwise, block for upto
00253 // the amount of time specified by timeout.
00254 
00255 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00256 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued,
00257                                                                      ACE_Time_Value *timeout)
00258 {
00259   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio");
00260 
00261   ACE_Message_Block *mb;
00262 
00263   int cur_count = this->queue_.dequeue_prio (mb, timeout);
00264 
00265   // Dequeue the message.
00266   if (cur_count != -1)
00267     {
00268       dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00269       // Delete the message block.
00270       mb->release ();
00271       return cur_count;
00272     }
00273   else
00274     return -1;
00275 }
00276 
00277 // Remove an item from the end of the queue.  If timeout == 0 block
00278 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00279 // the amount of time specified by timeout.
00280 
00281 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00282 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued,
00283                                                                      ACE_Time_Value *timeout)
00284 {
00285   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail");
00286 
00287   ACE_Message_Block *mb;
00288 
00289   int cur_count = this->queue_.dequeue_tail (mb, timeout);
00290 
00291   // Dequeue the message.
00292   if (cur_count != -1)
00293     {
00294       dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00295       // Delete the message block.
00296       mb->release ();
00297       return cur_count;
00298     }
00299   else
00300     return -1;
00301 }
00302 
00303 // Remove an item with the lowest deadline time.  If timeout == 0 block
00304 // indefinitely (or until an alert occurs).  Otherwise, block for upto
00305 // the amount of time specified by timeout.
00306 
00307 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00308 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued,
00309                                                                          ACE_Time_Value *timeout)
00310 {
00311   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline");
00312 
00313   ACE_Message_Block *mb;
00314 
00315   int cur_count = this->queue_.dequeue_deadline (mb, timeout);
00316 
00317   // Dequeue the message.
00318   if (cur_count != -1)
00319     {
00320       dequeued = ACE_reinterpret_cast (ACE_MESSAGE_TYPE *, mb->base ());
00321       // Delete the message block.
00322       mb->release ();
00323       return cur_count;
00324     }
00325   else
00326     return -1;
00327 }
00328 
00329 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int
00330 ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void)
00331 {
00332   ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify");
00333 
00334   return this->queue_.notify ();
00335 }
00336 
00337 template <ACE_SYNCH_DECL>
00338 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00339   : queue_ (q),
00340     curr_ (q.head_)
00341 {
00342 }
00343 
00344 template <ACE_SYNCH_DECL> int
00345 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00346 {
00347   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00348 
00349   if (this->curr_ != 0)
00350     {
00351       entry = this->curr_;
00352       return 1;
00353     }
00354 
00355   return 0;
00356 }
00357 
00358 template <ACE_SYNCH_DECL> int
00359 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
00360 {
00361   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00362 
00363   return this->curr_ == 0;
00364 }
00365 
00366 template <ACE_SYNCH_DECL> int
00367 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
00368 {
00369   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00370 
00371   if (this->curr_)
00372     this->curr_ = this->curr_->next ();
00373   return this->curr_ != 0;
00374 }
00375 
00376 template <ACE_SYNCH_DECL> void
00377 ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
00378 {
00379 }
00380 
00381 ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
00382 
00383 template <ACE_SYNCH_DECL>
00384 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
00385   : queue_ (q),
00386     curr_ (queue_.tail_)
00387 {
00388 }
00389 
00390 template <ACE_SYNCH_DECL> int
00391 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
00392 {
00393   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00394 
00395   if (this->curr_ != 0)
00396     {
00397       entry = this->curr_;
00398       return 1;
00399     }
00400 
00401   return 0;
00402 }
00403 
00404 template <ACE_SYNCH_DECL> int
00405 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
00406 {
00407   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00408 
00409   return this->curr_ == 0;
00410 }
00411 
00412 template <ACE_SYNCH_DECL> int
00413 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
00414 {
00415   ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
00416 
00417   if (this->curr_)
00418     this->curr_ = this->curr_->prev ();
00419   return this->curr_ != 0;
00420 }
00421 
00422 template <ACE_SYNCH_DECL> void
00423 ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
00424 {
00425 }
00426 
00427 template <ACE_SYNCH_DECL> void
00428 ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
00429 {
00430   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
00431   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00432   switch (this->state_)
00433     {
00434     case ACE_Message_Queue_Base::ACTIVATED:
00435       ACE_DEBUG ((LM_DEBUG,
00436                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00437       break;
00438     case ACE_Message_Queue_Base::DEACTIVATED:
00439       ACE_DEBUG ((LM_DEBUG,
00440                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00441       break;
00442     case ACE_Message_Queue_Base::PULSED:
00443       ACE_DEBUG ((LM_DEBUG,
00444                   ACE_LIB_TEXT ("state = PULSED\n")));
00445       break;
00446     }
00447   ACE_DEBUG ((LM_DEBUG,
00448               ACE_LIB_TEXT ("low_water_mark = %d\n")
00449               ACE_LIB_TEXT ("high_water_mark = %d\n")
00450               ACE_LIB_TEXT ("cur_bytes = %d\n")
00451               ACE_LIB_TEXT ("cur_length = %d\n")
00452               ACE_LIB_TEXT ("cur_count = %d\n")
00453               ACE_LIB_TEXT ("head_ = %u\n")
00454               ACE_LIB_TEXT ("tail_ = %u\n"),
00455               this->low_water_mark_,
00456               this->high_water_mark_,
00457               this->cur_bytes_,
00458               this->cur_length_,
00459               this->cur_count_,
00460               this->head_,
00461               this->tail_));
00462   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_full_cond: \n")));
00463   not_full_cond_.dump ();
00464   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_empty_cond: \n")));
00465   not_empty_cond_.dump ();
00466   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00467 }
00468 
00469 template <ACE_SYNCH_DECL> void
00470 ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value)
00471 {
00472   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
00473   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00474 
00475   this->cur_bytes_ = new_value;
00476 }
00477 
00478 template <ACE_SYNCH_DECL> void
00479 ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value)
00480 {
00481   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length");
00482   ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
00483 
00484   this->cur_length_ = new_value;
00485 }
00486 
00487 template <ACE_SYNCH_DECL>
00488 ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
00489                                                      size_t lwm,
00490                                                      ACE_Notification_Strategy *ns)
00491   : not_empty_cond_ (this->lock_),
00492     not_full_cond_ (this->lock_)
00493 {
00494   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
00495 
00496   if (this->open (hwm, lwm, ns) == -1)
00497     ACE_ERROR ((LM_ERROR,
00498                 ACE_LIB_TEXT ("open")));
00499 }
00500 
00501 template <ACE_SYNCH_DECL>
00502 ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
00503 {
00504   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
00505   if (this->head_ != 0 && this->close () == -1)
00506     ACE_ERROR ((LM_ERROR,
00507                 ACE_LIB_TEXT ("close")));
00508 }
00509 
00510 template <ACE_SYNCH_DECL> int
00511 ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void)
00512 {
00513   int number_flushed = 0;
00514 
00515   // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue>
00516   // and <release> their memory.
00517   for (this->tail_ = 0; this->head_ != 0; )
00518     {
00519       number_flushed++;
00520 
00521       size_t mb_bytes = 0;
00522       size_t mb_length = 0;
00523       this->head_->total_size_and_length (mb_bytes,
00524                                           mb_length);
00525       // Subtract off all of the bytes associated with this message.
00526       this->cur_bytes_ -= mb_bytes;
00527       this->cur_length_ -= mb_length;
00528       this->cur_count_--;
00529 
00530       ACE_Message_Block *temp = this->head_;
00531       this->head_ = this->head_->next ();
00532 
00533       // Make sure to use <release> rather than <delete> since this is
00534       // reference counted.
00535       temp->release ();
00536     }
00537 
00538   return number_flushed;
00539 }
00540 
00541 // Don't bother locking since if someone calls this function more than
00542 // once for the same queue, we're in bigger trouble than just
00543 // concurrency control!
00544 
00545 template <ACE_SYNCH_DECL> int
00546 ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
00547                                         size_t lwm,
00548                                         ACE_Notification_Strategy *ns)
00549 {
00550   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
00551   this->high_water_mark_ = hwm;
00552   this->low_water_mark_  = lwm;
00553   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00554   this->cur_bytes_ = 0;
00555   this->cur_length_ = 0;
00556   this->cur_count_ = 0;
00557   this->tail_ = 0;
00558   this->head_ = 0;
00559   this->notification_strategy_ = ns;
00560   return 0;
00561 }
00562 
00563 // Implementation of the public deactivate() method
00564 // (assumes locks are held).
00565 
00566 template <ACE_SYNCH_DECL> int
00567 ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
00568 {
00569   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
00570   int previous_state = this->state_;
00571 
00572   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00573     {
00574       // Wakeup all waiters.
00575       this->not_empty_cond_.broadcast ();
00576       this->not_full_cond_.broadcast ();
00577 
00578       if (pulse)
00579         this->state_ = ACE_Message_Queue_Base::PULSED;
00580       else
00581         this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00582     }
00583   return previous_state;
00584 }
00585 
00586 template <ACE_SYNCH_DECL> int
00587 ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
00588 {
00589   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
00590   int previous_state = this->state_;
00591   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00592   return previous_state;
00593 }
00594 
00595 template <ACE_SYNCH_DECL> int
00596 ACE_Message_Queue<ACE_SYNCH_USE>::flush (void)
00597 {
00598   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush");
00599   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00600 
00601   // Free up the remaining messages on the queue.
00602   return this->flush_i ();
00603 }
00604 
00605 // Clean up the queue if we have not already done so!
00606 
00607 template <ACE_SYNCH_DECL> int
00608 ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
00609 {
00610   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
00611   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00612 
00613   int result = this->deactivate_i ();
00614 
00615   // Free up the remaining messages on the queue.
00616   this->flush_i ();
00617 
00618   return result;
00619 }
00620 
00621 template <ACE_SYNCH_DECL> int
00622 ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
00623 {
00624   if (this->not_full_cond_.signal () != 0)
00625     return -1;
00626   return 0;
00627 }
00628 
00629 template <ACE_SYNCH_DECL> int
00630 ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
00631 {
00632   // Tell any blocked threads that the queue has a new item!
00633   if (this->not_empty_cond_.signal () != 0)
00634     return -1;
00635   return 0;
00636 }
00637 
00638 // Actually put the node at the end (no locking so must be called with
00639 // locks held).
00640 
00641 template <ACE_SYNCH_DECL> int
00642 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
00643 {
00644   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
00645 
00646   if (new_item == 0)
00647     return -1;
00648 
00649   // List was empty, so build a new one.
00650   if (this->tail_ == 0)
00651     {
00652       this->head_ = new_item;
00653       this->tail_ = new_item;
00654       new_item->next (0);
00655       new_item->prev (0);
00656     }
00657   // Link at the end.
00658   else
00659     {
00660       new_item->next (0);
00661       this->tail_->next (new_item);
00662       new_item->prev (this->tail_);
00663       this->tail_ = new_item;
00664     }
00665 
00666   // Make sure to count all the bytes in a composite message!!!
00667   new_item->total_size_and_length (this->cur_bytes_,
00668                                    this->cur_length_);
00669   this->cur_count_++;
00670 
00671   if (this->signal_dequeue_waiters () == -1)
00672     return -1;
00673   else
00674     return this->cur_count_;
00675 }
00676 
00677 // Actually put the node at the head (no locking)
00678 
00679 template <ACE_SYNCH_DECL> int
00680 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
00681 {
00682   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
00683 
00684   if (new_item == 0)
00685     return -1;
00686 
00687   new_item->prev (0);
00688   new_item->next (this->head_);
00689 
00690   if (this->head_ != 0)
00691     this->head_->prev (new_item);
00692   else
00693     this->tail_ = new_item;
00694 
00695   this->head_ = new_item;
00696 
00697   // Make sure to count all the bytes in a composite message!!!
00698   new_item->total_size_and_length (this->cur_bytes_,
00699                                    this->cur_length_);
00700   this->cur_count_++;
00701 
00702   if (this->signal_dequeue_waiters () == -1)
00703     return -1;
00704   else
00705     return this->cur_count_;
00706 }
00707 
00708 // Actually put the node at its proper position relative to its
00709 // priority.
00710 
00711 template <ACE_SYNCH_DECL> int
00712 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
00713 {
00714   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
00715 
00716   if (new_item == 0)
00717     return -1;
00718 
00719   if (this->head_ == 0)
00720     // Check for simple case of an empty queue, where all we need to
00721     // do is insert <new_item> into the head.
00722     return this->enqueue_head_i (new_item);
00723   else
00724     {
00725       ACE_Message_Block *temp;
00726 
00727       // Figure out where the new item goes relative to its priority.
00728       // We start looking from the highest priority to the lowest
00729       // priority.
00730 
00731       for (temp = this->tail_;
00732            temp != 0;
00733            temp = temp->prev ())
00734         if (temp->msg_priority () >= new_item->msg_priority ())
00735           // Break out when we've located an item that has
00736           // greater or equal priority.
00737           break;
00738 
00739       if (temp == 0)
00740         // Check for simple case of inserting at the head of the queue,
00741         // where all we need to do is insert <new_item> before the
00742         // current head.
00743         return this->enqueue_head_i (new_item);
00744       else if (temp->next () == 0)
00745         // Check for simple case of inserting at the tail of the
00746         // queue, where all we need to do is insert <new_item> after
00747         // the current tail.
00748         return this->enqueue_tail_i (new_item);
00749       else
00750         {
00751           // Insert the new message behind the message of
00752           // greater or equal priority.  This ensures that FIFO order is
00753           // maintained when messages of the same priority are
00754           // inserted consecutively.
00755           new_item->prev (temp);
00756           new_item->next (temp->next ());
00757           temp->next ()->prev (new_item);
00758           temp->next (new_item);
00759         }
00760     }
00761 
00762   // Make sure to count all the bytes in a composite message!!!
00763   new_item->total_size_and_length (this->cur_bytes_,
00764                                    this->cur_length_);
00765   this->cur_count_++;
00766 
00767   if (this->signal_dequeue_waiters () == -1)
00768     return -1;
00769   else
00770     return this->cur_count_;
00771 }
00772 
00773 // Actually put the node at its proper position relative to its
00774 // deadline time.
00775 
00776 template <ACE_SYNCH_DECL> int
00777 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item)
00778 {
00779 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
00780   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i");
00781 
00782   if (new_item == 0)
00783     return -1;
00784 
00785   if (this->head_ == 0)
00786     // Check for simple case of an empty queue, where all we need to
00787     // do is insert <new_item> into the head.
00788     return this->enqueue_head_i (new_item);
00789   else
00790     {
00791       ACE_Message_Block *temp;
00792 
00793       // Figure out where the new item goes relative to its priority.
00794       // We start looking from the smallest deadline to the highest
00795       // deadline.
00796 
00797       for (temp = this->head_;
00798            temp != 0;
00799            temp = temp->next ())
00800         if (new_item->msg_deadline_time () < temp->msg_deadline_time ())
00801           // Break out when we've located an item that has
00802           // greater or equal priority.
00803           break;
00804 
00805       if (temp == 0 || temp->next () == 0)
00806         // Check for simple case of inserting at the tail of the queue,
00807         // where all we need to do is insert <new_item> after the
00808         // current tail.
00809         return this->enqueue_tail_i (new_item);
00810       else
00811         {
00812           // Insert the new message behind the message of
00813           // lesser or equal deadline time.  This ensures that FIFO order is
00814           // maintained when messages of the same priority are
00815           // inserted consecutively.
00816           new_item->prev (temp);
00817           new_item->next (temp->next ());
00818           temp->next ()->prev (new_item);
00819           temp->next (new_item);
00820         }
00821     }
00822 
00823   // Make sure to count all the bytes in a composite message!!!
00824   new_item->total_size_and_length (this->cur_bytes_,
00825                                    this->cur_length_);
00826   this->cur_count_++;
00827 
00828   if (this->signal_dequeue_waiters () == -1)
00829     return -1;
00830   else
00831     return this->cur_count_;
00832 #else
00833   return this->enqueue_tail_i (new_item);
00834 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
00835 }
00836 
00837 // Actually get the first ACE_Message_Block (no locking, so must be
00838 // called with locks held).  This method assumes that the queue has at
00839 // least one item in it when it is called.
00840 
00841 template <ACE_SYNCH_DECL> int
00842 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
00843 {
00844   if (this->head_ ==0)
00845     ACE_ERROR_RETURN ((LM_ERROR,
00846                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00847                       -1);
00848   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
00849   first_item = this->head_;
00850   this->head_ = this->head_->next ();
00851 
00852   if (this->head_ == 0)
00853     this->tail_ = 0;
00854   else
00855     // The prev pointer of the first message block has to point to
00856     // NULL...
00857     this->head_->prev (0);
00858 
00859   size_t mb_bytes = 0;
00860   size_t mb_length = 0;
00861   first_item->total_size_and_length (mb_bytes,
00862                                      mb_length);
00863   // Subtract off all of the bytes associated with this message.
00864   this->cur_bytes_ -= mb_bytes;
00865   this->cur_length_ -= mb_length;
00866   this->cur_count_--;
00867 
00868   if (this->cur_count_ == 0 && this->head_ == this->tail_)
00869     this->head_ = this->tail_ = 0;
00870 
00871   // Only signal enqueueing threads if we've fallen below the low
00872   // water mark.
00873   if (this->cur_bytes_ <= this->low_water_mark_
00874       && this->signal_enqueue_waiters () == -1)
00875     return -1;
00876   else
00877     return this->cur_count_;
00878 }
00879 
00880 // Actually get the ACE_Message_Block with the lowest priority (no locking,
00881 // so must be called with locks held).  This method assumes that the queue
00882 // has at least one item in it when it is called.
00883 
00884 template <ACE_SYNCH_DECL> int
00885 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued)
00886 {
00887   if (this->head_ == 0)
00888     ACE_ERROR_RETURN ((LM_ERROR,
00889                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00890                       -1);
00891   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i");
00892 
00893   // Find the last message enqueued with the lowest priority
00894   ACE_Message_Block* chosen = 0;
00895   u_long priority = ULONG_MAX;
00896   for (ACE_Message_Block *temp = this->tail_; temp != 0; temp = temp->prev ())
00897     {
00898       if (temp->msg_priority () < priority)
00899         {
00900           priority = temp->msg_priority ();
00901           chosen = temp;
00902         }
00903     }
00904 
00905   // If every message block is the same priority, pass back the first one
00906   if (chosen == 0)
00907     {
00908       chosen = this->head_;
00909     }
00910 
00911   // Patch up the queue.  If we don't have a previous
00912   // then we are at the head of the queue.
00913   if (chosen->prev () == 0)
00914     {
00915       this->head_ = chosen->next ();
00916     }
00917   else
00918     {
00919       chosen->prev ()->next (chosen->next ());
00920     }
00921 
00922   if (chosen->next () == 0)
00923     {
00924       this->tail_ = chosen->prev ();
00925     }
00926   else
00927     {
00928       chosen->next ()->prev (chosen->prev ());
00929     }
00930 
00931   // Pass back the chosen block
00932   dequeued = chosen;
00933 
00934   size_t mb_bytes = 0;
00935   size_t mb_length = 0;
00936   dequeued->total_size_and_length (mb_bytes,
00937                                    mb_length);
00938   // Subtract off all of the bytes associated with this message.
00939   this->cur_bytes_ -= mb_bytes;
00940   this->cur_length_ -= mb_length;
00941   this->cur_count_--;
00942 
00943   if (this->cur_count_ == 0 && this->head_ == this->tail_)
00944     this->head_ = this->tail_ = 0;
00945 
00946   // Only signal enqueueing threads if we've fallen below the low
00947   // water mark.
00948   if (this->cur_bytes_ <= this->low_water_mark_
00949       && this->signal_enqueue_waiters () == -1)
00950     return -1;
00951   else
00952     return this->cur_count_;
00953 }
00954 
00955 // Actually get the last ACE_Message_Block (no locking, so must be
00956 // called with locks held).  This method assumes that the queue has at
00957 // least one item in it when it is called.
00958 
00959 template <ACE_SYNCH_DECL> int
00960 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued)
00961 {
00962   if (this->head_ == 0)
00963     ACE_ERROR_RETURN ((LM_ERROR,
00964                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
00965                       -1);
00966   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i");
00967   dequeued = this->tail_;
00968   if (this->tail_->prev () == 0)
00969     {
00970       this->head_ = 0;
00971       this->tail_ = 0;
00972     }
00973   else
00974     {
00975       this->tail_->prev ()->next (0);
00976       this->tail_ = this->tail_->prev ();
00977     }
00978 
00979   size_t mb_bytes = 0;
00980   size_t mb_length = 0;
00981   dequeued->total_size_and_length (mb_bytes,
00982                                    mb_length);
00983   // Subtract off all of the bytes associated with this message.
00984   this->cur_bytes_ -= mb_bytes;
00985   this->cur_length_ -= mb_length;
00986   this->cur_count_--;
00987 
00988   if (this->cur_count_ == 0 && this->head_ == this->tail_)
00989     this->head_ = this->tail_ = 0;
00990 
00991   // Only signal enqueueing threads if we've fallen below the low
00992   // water mark.
00993   if (this->cur_bytes_ <= this->low_water_mark_
00994       && this->signal_enqueue_waiters () == -1)
00995     return -1;
00996   else
00997     return this->cur_count_;
00998 }
00999 
01000 // Actually get the ACE_Message_Block with the lowest deadline time
01001 // (no locking, so must be called with locks held).  This method assumes
01002 // that the queue has at least one item in it when it is called.
01003 
01004 template <ACE_SYNCH_DECL> int
01005 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued)
01006 {
01007 #if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
01008   if (this->head_ == 0)
01009     ACE_ERROR_RETURN ((LM_ERROR,
01010                        ACE_LIB_TEXT ("Attempting to dequeue from empty queue")),
01011                       -1);
01012   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i");
01013 
01014   // Find the last message enqueued with the lowest deadline time
01015   ACE_Message_Block* chosen = 0;
01016   ACE_Time_Value deadline = ACE_Time_Value::max_time;
01017   for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ())
01018     {
01019       if (temp->msg_deadline_time () < deadline)
01020         {
01021           deadline = temp->msg_deadline_time ();
01022           chosen = temp;
01023         }
01024     }
01025 
01026   // If every message block is the same deadline time,
01027   // pass back the first one
01028   if (chosen == 0)
01029     {
01030       chosen = this->head_;
01031     }
01032 
01033   // Patch up the queue.  If we don't have a previous
01034   // then we are at the head of the queue.
01035   if (chosen->prev () == 0)
01036     {
01037       this->head_ = chosen->next ();
01038     }
01039   else
01040     {
01041       chosen->prev ()->next (chosen->next ());
01042     }
01043 
01044   if (chosen->next () == 0)
01045     {
01046       this->tail_ = chosen->prev ();
01047     }
01048   else
01049     {
01050       chosen->next ()->prev (chosen->prev ());
01051     }
01052 
01053   // Pass back the chosen block
01054   dequeued = chosen;
01055 
01056   size_t mb_bytes = 0;
01057   size_t mb_length = 0;
01058   dequeued->total_size_and_length (mb_bytes,
01059                                    mb_length);
01060   // Subtract off all of the bytes associated with this message.
01061   this->cur_bytes_ -= mb_bytes;
01062   this->cur_length_ -= mb_length;
01063   this->cur_count_--;
01064 
01065   if (this->cur_count_ == 0 && this->head_ == this->tail_)
01066     this->head_ = this->tail_ = 0;
01067 
01068   // Only signal enqueueing threads if we've fallen below the low
01069   // water mark.
01070   if (this->cur_bytes_ <= this->low_water_mark_
01071       && this->signal_enqueue_waiters () == -1)
01072     return -1;
01073   else
01074     return this->cur_count_;
01075 #else
01076   return this->dequeue_head_i (dequeued);
01077 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
01078 }
01079 
01080 // Take a look at the first item without removing it.
01081 
01082 template <ACE_SYNCH_DECL> int
01083 ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
01084                                                      ACE_Time_Value *timeout)
01085 {
01086   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
01087   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01088 
01089   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01090     {
01091       errno = ESHUTDOWN;
01092       return -1;
01093     }
01094 
01095   // Wait for at least one item to become available.
01096 
01097   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01098     return -1;
01099 
01100   first_item = this->head_;
01101   return this->cur_count_;
01102 }
01103 
01104 template <ACE_SYNCH_DECL> int
01105 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond
01106     (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01107 {
01108   int result = 0;
01109 
01110   // Wait while the queue is full.
01111 
01112   while (this->is_full_i ())
01113     {
01114       if (this->not_full_cond_.wait (timeout) == -1)
01115         {
01116           if (errno == ETIME)
01117             errno = EWOULDBLOCK;
01118           result = -1;
01119           break;
01120         }
01121       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01122         {
01123           errno = ESHUTDOWN;
01124           result = -1;
01125           break;
01126         }
01127     }
01128   return result;
01129 }
01130 
01131 template <ACE_SYNCH_DECL> int
01132 ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
01133     (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
01134 {
01135   int result = 0;
01136 
01137   // Wait while the queue is empty.
01138 
01139   while (this->is_empty_i ())
01140     {
01141       if (this->not_empty_cond_.wait (timeout) == -1)
01142         {
01143           if (errno == ETIME)
01144             errno = EWOULDBLOCK;
01145           result = -1;
01146           break;
01147         }
01148       if (this->state_ != ACE_Message_Queue_Base::ACTIVATED)
01149         {
01150           errno = ESHUTDOWN;
01151           result = -1;
01152           break;
01153         }
01154     }
01155   return result;
01156 }
01157 
01158 // Block indefinitely waiting for an item to arrive, does not ignore
01159 // alerts (e.g., signals).
01160 
01161 template <ACE_SYNCH_DECL> int
01162 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
01163                                                 ACE_Time_Value *timeout)
01164 {
01165   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
01166   int queue_count = 0;
01167   {
01168     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01169 
01170     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01171       {
01172         errno = ESHUTDOWN;
01173         return -1;
01174       }
01175 
01176     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01177       return -1;
01178 
01179     queue_count = this->enqueue_head_i (new_item);
01180 
01181     if (queue_count == -1)
01182       return -1;
01183 
01184     this->notify ();
01185   }
01186   return queue_count;
01187 }
01188 
01189 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01190 // accordance with its <msg_priority> (0 is lowest priority).  Returns
01191 // -1 on failure, else the number of items still on the queue.
01192 
01193 template <ACE_SYNCH_DECL> int
01194 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
01195                                                 ACE_Time_Value *timeout)
01196 {
01197   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
01198   int queue_count = 0;
01199   {
01200     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01201 
01202     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01203       {
01204         errno = ESHUTDOWN;
01205         return -1;
01206       }
01207 
01208     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01209       return -1;
01210 
01211     queue_count = this->enqueue_i (new_item);
01212 
01213     if (queue_count == -1)
01214       return -1;
01215 
01216     this->notify ();
01217   }
01218   return queue_count;
01219 }
01220 
01221 // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
01222 // accordance with its <msg_deadline_time>.  Returns
01223 // -1 on failure, else the number of items still on the queue.
01224 
01225 template <ACE_SYNCH_DECL> int
01226 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item,
01227                                                     ACE_Time_Value *timeout)
01228 {
01229   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline");
01230   int queue_count = 0;
01231   {
01232     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01233 
01234     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01235       {
01236         errno = ESHUTDOWN;
01237         return -1;
01238       }
01239 
01240     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01241       return -1;
01242 
01243     queue_count = this->enqueue_deadline_i (new_item);
01244 
01245     if (queue_count == -1)
01246       return -1;
01247 
01248     this->notify ();
01249   }
01250   return queue_count;
01251 }
01252 
01253 template <ACE_SYNCH_DECL> int
01254 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
01255                                            ACE_Time_Value *timeout)
01256 {
01257   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
01258   return this->enqueue_prio (new_item, timeout);
01259 }
01260 
01261 // Block indefinitely waiting for an item to arrive,
01262 // does not ignore alerts (e.g., signals).
01263 
01264 template <ACE_SYNCH_DECL> int
01265 ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
01266                                               ACE_Time_Value *timeout)
01267 {
01268   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
01269   int queue_count = 0;
01270   {
01271     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01272 
01273     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01274       {
01275         errno = ESHUTDOWN;
01276         return -1;
01277       }
01278 
01279     if (this->wait_not_full_cond (ace_mon, timeout) == -1)
01280       return -1;
01281 
01282     queue_count = this->enqueue_tail_i (new_item);
01283 
01284     if (queue_count == -1)
01285       return -1;
01286 
01287     this->notify ();
01288   }
01289   return queue_count;
01290 }
01291 
01292 // Remove an item from the front of the queue.  If timeout == 0 block
01293 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01294 // the amount of time specified by timeout.
01295 
01296 template <ACE_SYNCH_DECL> int
01297 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01298                                                 ACE_Time_Value *timeout)
01299 {
01300   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01301   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01302 
01303   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01304     {
01305       errno = ESHUTDOWN;
01306       return -1;
01307     }
01308 
01309   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01310     return -1;
01311 
01312   return this->dequeue_head_i (first_item);
01313 }
01314 
01315 // Remove item with the lowest priority from the queue.  If timeout == 0 block
01316 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01317 // the amount of time specified by timeout.
01318 
01319 template <ACE_SYNCH_DECL> int
01320 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued,
01321                                                 ACE_Time_Value *timeout)
01322 {
01323   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio");
01324   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01325 
01326   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01327     {
01328       errno = ESHUTDOWN;
01329       return -1;
01330     }
01331 
01332   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01333     return -1;
01334 
01335   return this->dequeue_prio_i (dequeued);
01336 }
01337 
01338 // Remove an item from the end of the queue.  If timeout == 0 block
01339 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01340 // the amount of time specified by timeout.
01341 
01342 template <ACE_SYNCH_DECL> int
01343 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued,
01344                                                 ACE_Time_Value *timeout)
01345 {
01346   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail");
01347   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01348 
01349   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01350     {
01351       errno = ESHUTDOWN;
01352       return -1;
01353     }
01354 
01355   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01356     return -1;
01357 
01358   return this->dequeue_tail_i (dequeued);
01359 }
01360 
01361 // Remove an item with the lowest deadline time.  If timeout == 0 block
01362 // indefinitely (or until an alert occurs).  Otherwise, block for upto
01363 // the amount of time specified by timeout.
01364 
01365 template <ACE_SYNCH_DECL> int
01366 ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued,
01367                                                     ACE_Time_Value *timeout)
01368 {
01369   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline");
01370   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01371 
01372   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01373     {
01374       errno = ESHUTDOWN;
01375       return -1;
01376     }
01377 
01378   if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
01379     return -1;
01380 
01381   return this->dequeue_deadline_i (dequeued);
01382 }
01383 
01384 template <ACE_SYNCH_DECL> int
01385 ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
01386 {
01387   ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
01388 
01389   // By default, don't do anything.
01390   if (this->notification_strategy_ == 0)
01391     return 0;
01392   else
01393     return this->notification_strategy_->notify ();
01394 }
01395 
01396 
01397 // = Initialization and termination methods.
01398 template <ACE_SYNCH_DECL>
01399 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
01400                                                                      size_t hwm,
01401                                                                      size_t lwm,
01402                                                                      ACE_Notification_Strategy *ns)
01403   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01404     pending_head_ (0),
01405     pending_tail_ (0),
01406     late_head_ (0),
01407     late_tail_ (0),
01408     beyond_late_head_ (0),
01409     beyond_late_tail_ (0),
01410     message_strategy_ (message_strategy)
01411 {
01412   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01413   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01414   // it in its own dtor
01415 }
01416 
01417 // dtor: free message strategy and let base class dtor do the rest.
01418 
01419 template <ACE_SYNCH_DECL>
01420 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
01421 {
01422   delete &this->message_strategy_;
01423 }
01424 
01425 template <ACE_SYNCH_DECL> int
01426 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
01427                                                            ACE_Message_Block *&list_tail,
01428                                                            u_int status_flags)
01429 {
01430   // start with an empty list
01431   list_head = 0;
01432   list_tail = 0;
01433 
01434   // Get the current time
01435   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01436 
01437   // Refresh priority status boundaries in the queue.
01438   int result = this->refresh_queue (current_time);
01439   if (result < 0)
01440     return result;
01441 
01442   if (ACE_BIT_ENABLED (status_flags,
01443                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01444       && this->pending_head_
01445       && this->pending_tail_)
01446     {
01447       // patch up pointers for the new tail of the queue
01448       if (this->pending_head_->prev ())
01449         {
01450           this->tail_ = this->pending_head_->prev ();
01451           this->pending_head_->prev ()->next (0);
01452         }
01453       else
01454         {
01455           // the list has become empty
01456           this->head_ = 0;
01457           this->tail_ = 0;
01458         }
01459 
01460       // point to the head and tail of the list
01461       list_head = this->pending_head_;
01462       list_tail = this->pending_tail_;
01463 
01464       // cut the pending messages out of the queue entirely
01465       this->pending_head_->prev (0);
01466       this->pending_head_ = 0;
01467       this->pending_tail_ = 0;
01468     }
01469 
01470   if (ACE_BIT_ENABLED (status_flags,
01471                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01472       && this->late_head_
01473       && this->late_tail_)
01474     {
01475       // Patch up pointers for the (possibly) new head and tail of the
01476       // queue.
01477       if (this->late_tail_->next ())
01478         this->late_tail_->next ()->prev (this->late_head_->prev ());
01479       else
01480         this->tail_ = this->late_head_->prev ();
01481 
01482       if (this->late_head_->prev ())
01483         this->late_head_->prev ()->next (this->late_tail_->next ());
01484       else
01485         this->head_ = this->late_tail_->next ();
01486 
01487       // put late messages behind pending messages (if any) being returned
01488       this->late_head_->prev (list_tail);
01489       if (list_tail)
01490         list_tail->next (this->late_head_);
01491       else
01492         list_head = this->late_head_;
01493 
01494       list_tail = this->late_tail_;
01495 
01496       this->late_tail_->next (0);
01497       this->late_head_ = 0;
01498       this->late_tail_ = 0;
01499     }
01500 
01501   if (ACE_BIT_ENABLED (status_flags,
01502       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01503       && this->beyond_late_head_
01504       && this->beyond_late_tail_)
01505     {
01506       // Patch up pointers for the new tail of the queue
01507       if (this->beyond_late_tail_->next ())
01508         {
01509           this->head_ = this->beyond_late_tail_->next ();
01510           this->beyond_late_tail_->next ()->prev (0);
01511         }
01512       else
01513         {
01514           // the list has become empty
01515           this->head_ = 0;
01516           this->tail_ = 0;
01517         }
01518 
01519       // Put beyond late messages at the end of the list being
01520       // returned.
01521       if (list_tail)
01522         {
01523           this->beyond_late_head_->prev (list_tail);
01524           list_tail->next (this->beyond_late_head_);
01525         }
01526       else
01527         list_head = this->beyond_late_head_;
01528 
01529       list_tail = this->beyond_late_tail_;
01530 
01531       this->beyond_late_tail_->next (0);
01532       this->beyond_late_head_ = 0;
01533       this->beyond_late_tail_ = 0;
01534     }
01535 
01536   // Decrement message and size counts for removed messages.
01537   ACE_Message_Block *temp1;
01538 
01539   for (temp1 = list_head;
01540        temp1 != 0;
01541        temp1 = temp1->next ())
01542     {
01543       this->cur_count_--;
01544 
01545       size_t mb_bytes = 0;
01546       size_t mb_length = 0;
01547       temp1->total_size_and_length (mb_bytes,
01548                                     mb_length);
01549       // Subtract off all of the bytes associated with this message.
01550       this->cur_bytes_ -= mb_bytes;
01551       this->cur_length_ -= mb_length;
01552     }
01553 
01554   return result;
01555 }
01556 
01557 // Detach all messages with status given in the passed flags from the
01558 // queue and return them by setting passed head and tail pointers to
01559 // the linked list they comprise.  This method is intended primarily
01560 // as a means of periodically harvesting messages that have missed
01561 // their deadlines, but is available in its most general form.  All
01562 // messages are returned in priority order, from head to tail, as of
01563 // the time this method was called.
01564 
01565 template <ACE_SYNCH_DECL> int
01566 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
01567                                                         ACE_Time_Value *timeout)
01568 {
01569   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01570 
01571   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01572 
01573   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01574     {
01575       errno = ESHUTDOWN;
01576       return -1;
01577     }
01578 
01579   int result;
01580 
01581   // get the current time
01582   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01583 
01584   // refresh priority status boundaries in the queue
01585   result = this->refresh_queue (current_time);
01586   if (result < 0)
01587     return result;
01588 
01589   // *now* it's appropriate to wait for an enqueued item
01590   result = this->wait_not_empty_cond (ace_mon, timeout);
01591   if (result == -1)
01592     return result;
01593 
01594   // call the internal dequeue method, which selects an item from the
01595   // highest priority status portion of the queue that has messages
01596   // enqueued.
01597   result = this->dequeue_head_i (first_item);
01598 
01599   return result;
01600 }
01601 
01602 // Dequeue and return the <ACE_Message_Block *> at the (logical) head
01603 // of the queue.
01604 
01605 template <ACE_SYNCH_DECL> void
01606 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
01607 {
01608   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01609   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01610 
01611   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01612   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01613 
01614   ACE_DEBUG ((LM_DEBUG,
01615               ACE_LIB_TEXT ("pending_head_ = %u\n")
01616               ACE_LIB_TEXT ("pending_tail_ = %u\n")
01617               ACE_LIB_TEXT ("late_head_ = %u\n")
01618               ACE_LIB_TEXT ("late_tail_ = %u\n")
01619               ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01620               ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01621               this->pending_head_,
01622               this->pending_tail_,
01623               this->late_head_,
01624               this->late_tail_,
01625               this->beyond_late_head_,
01626               this->beyond_late_tail_));
01627 
01628   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01629   message_strategy_.dump ();
01630 
01631   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01632 }
01633   // dump the state of the queue
01634 
01635 template <ACE_SYNCH_DECL> int
01636 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
01637 {
01638   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01639 
01640   if (new_item == 0)
01641     return -1;
01642 
01643   int result = 0;
01644 
01645   // Get the current time.
01646   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01647 
01648   // Refresh priority status boundaries in the queue.
01649 
01650   result = this->refresh_queue (current_time);
01651   if (result < 0)
01652     return result;
01653 
01654   // Where we enqueue depends on the message's priority status.
01655   switch (message_strategy_.priority_status (*new_item,
01656                                              current_time))
01657     {
01658     case ACE_Dynamic_Message_Strategy::PENDING:
01659       if (this->pending_tail_ == 0)
01660         {
01661           // Check for simple case of an empty pending queue, where
01662           // all we need to do is insert <new_item> into the tail of
01663           // the queue.
01664           pending_head_ = new_item;
01665           pending_tail_ = pending_head_;
01666           return this->enqueue_tail_i (new_item);
01667         }
01668       else
01669         {
01670           // Enqueue the new message in priority order in the pending
01671           // sublist
01672           result = sublist_enqueue_i (new_item,
01673                                       current_time,
01674                                       this->pending_head_,
01675                                       this->pending_tail_,
01676                                       ACE_Dynamic_Message_Strategy::PENDING);
01677         }
01678       break;
01679 
01680     case ACE_Dynamic_Message_Strategy::LATE:
01681       if (this->late_tail_ == 0)
01682         {
01683           late_head_ = new_item;
01684           late_tail_ = late_head_;
01685 
01686           if (this->pending_head_ == 0)
01687             // Check for simple case of an empty pending queue,
01688             // where all we need to do is insert <new_item> into the
01689             // tail of the queue.
01690             return this->enqueue_tail_i (new_item);
01691           else if (this->beyond_late_tail_ == 0)
01692             // Check for simple case of an empty beyond late queue, where all
01693             // we need to do is insert <new_item> into the head of the queue.
01694             return this->enqueue_head_i (new_item);
01695           else
01696             {
01697               // Otherwise, we can just splice the new message in
01698               // between the pending and beyond late portions of the
01699               // queue.
01700               this->beyond_late_tail_->next (new_item);
01701               new_item->prev (this->beyond_late_tail_);
01702               this->pending_head_->prev (new_item);
01703               new_item->next (this->pending_head_);
01704             }
01705         }
01706       else
01707         {
01708           // Enqueue the new message in priority order in the late
01709           // sublist
01710           result = sublist_enqueue_i (new_item,
01711                                       current_time,
01712                                       this->late_head_,
01713                                       this->late_tail_,
01714                                       ACE_Dynamic_Message_Strategy::LATE);
01715         }
01716       break;
01717 
01718     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01719       if (this->beyond_late_tail_ == 0)
01720         {
01721           // Check for simple case of an empty beyond late queue,
01722           // where all we need to do is insert <new_item> into the
01723           // head of the queue.
01724           beyond_late_head_ = new_item;
01725           beyond_late_tail_ = beyond_late_head_;
01726           return this->enqueue_head_i (new_item);
01727         }
01728       else
01729         {
01730           // all beyond late messages have the same (zero) priority,
01731           // so just put the new one at the end of the beyond late
01732           // messages
01733           if (this->beyond_late_tail_->next ())
01734             this->beyond_late_tail_->next ()->prev (new_item);
01735           else
01736             this->tail_ = new_item;
01737 
01738           new_item->next (this->beyond_late_tail_->next ());
01739           this->beyond_late_tail_->next (new_item);
01740           new_item->prev (this->beyond_late_tail_);
01741           this->beyond_late_tail_ = new_item;
01742         }
01743 
01744       break;
01745 
01746       // should never get here, but just in case...
01747     default:
01748       result = -1;
01749       break;
01750     }
01751 
01752   if (result < 0)
01753     return result;
01754 
01755   size_t mb_bytes = 0;
01756   size_t mb_length = 0;
01757   new_item->total_size_and_length (mb_bytes,
01758                                    mb_length);
01759   // Subtract off all of the bytes associated with this message.
01760   this->cur_bytes_ -= mb_bytes;
01761   this->cur_length_ -= mb_length;
01762   this->cur_count_++;
01763 
01764   if (this->signal_dequeue_waiters () == -1)
01765     return -1;
01766   else
01767     return this->cur_count_;
01768 }
01769 
01770 // Enqueue an <ACE_Message_Block *> in accordance with its priority.
01771 // The priority may be *dynamic*, *static*, or a combination of
01772 // *both*.  It calls the priority evaluation function passed into the
01773 // Dynamic Message Queue constructor to update the priorities of all
01774 // enqueued messages.
01775 
01776 template <ACE_SYNCH_DECL> int
01777 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
01778                                                              const ACE_Time_Value &current_time,
01779                                                              ACE_Message_Block *&sublist_head,
01780                                                              ACE_Message_Block *&sublist_tail,
01781                                                              ACE_Dynamic_Message_Strategy::Priority_Status status)
01782 {
01783   int result = 0;
01784   ACE_Message_Block *current_item = 0;
01785 
01786   // Find message after which to enqueue new item, based on message
01787   // priority and priority status.
01788   for (current_item = sublist_tail;
01789        current_item;
01790        current_item = current_item->prev ())
01791     {
01792       if (message_strategy_.priority_status (*current_item, current_time) == status)
01793         {
01794           if (current_item->msg_priority () >= new_item->msg_priority ())
01795             break;
01796         }
01797       else
01798         {
01799           sublist_head = new_item;
01800           break;
01801         }
01802     }
01803 
01804   if (current_item == 0)
01805     {
01806       // If the new message has highest priority of any, put it at the
01807       // head of the list (and sublist).
01808       new_item->prev (0);
01809       new_item->next (this->head_);
01810       if (this->head_ != 0)
01811         this->head_->prev (new_item);
01812       else
01813         {
01814           this->tail_ = new_item;
01815           sublist_tail = new_item;
01816         }
01817       this->head_ = new_item;
01818       sublist_head = new_item;
01819     }
01820   else
01821     {
01822       // insert the new item into the list
01823       new_item->next (current_item->next ());
01824       new_item->prev (current_item);
01825 
01826       if (current_item->next ())
01827         current_item->next ()->prev (new_item);
01828       else
01829         this->tail_ = new_item;
01830 
01831       current_item->next (new_item);
01832 
01833       // If the new item has lowest priority of any in the sublist,
01834       // move the tail pointer of the sublist back to the new item
01835       if (current_item == sublist_tail)
01836         sublist_tail = new_item;
01837     }
01838 
01839   return result;
01840 }
01841 
01842 // Enqueue a message in priority order within a given priority status
01843 // sublist.
01844 
01845 template <ACE_SYNCH_DECL> int
01846 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
01847 {
01848   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01849 
01850   int result = 0;
01851   int last_in_subqueue = 0;
01852 
01853   // first, try to dequeue from the head of the pending list
01854   if (this->pending_head_)
01855     {
01856       first_item = this->pending_head_;
01857 
01858       if (0 == this->pending_head_->prev ())
01859         this->head_ = this->pending_head_->next ();
01860       else
01861         this->pending_head_->prev ()->next (this->pending_head_->next ());
01862 
01863       if (0 == this->pending_head_->next ())
01864         {
01865           this->tail_ = this->pending_head_->prev ();
01866           this->pending_head_ = 0;
01867           this->pending_tail_ = 0;
01868         }
01869       else
01870         {
01871           this->pending_head_->next ()->prev (this->pending_head_->prev ());
01872           this->pending_head_ = this->pending_head_->next ();
01873         }
01874 
01875       first_item->prev (0);
01876       first_item->next (0);
01877     }
01878 
01879   // Second, try to dequeue from the head of the late list
01880   else if (this->late_head_)
01881     {
01882       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
01883 
01884       first_item = this->late_head_;
01885 
01886       if (0 == this->late_head_->prev ())
01887         this->head_ = this->late_head_->next ();
01888       else
01889         this->late_head_->prev ()->next (this->late_head_->next ());
01890 
01891       if (0 == this->late_head_->next ())
01892         this->tail_ = this->late_head_->prev ();
01893       else
01894         {
01895           this->late_head_->next ()->prev (this->late_head_->prev ());
01896           this->late_head_ = this->late_head_->next ();
01897         }
01898 
01899       if (last_in_subqueue)
01900         {
01901           this->late_head_ = 0;
01902           this->late_tail_ = 0;
01903         }
01904 
01905       first_item->prev (0);
01906       first_item->next (0);
01907     }
01908   // finally, try to dequeue from the head of the beyond late list
01909   else if (this->beyond_late_head_)
01910     {
01911       last_in_subqueue =
01912         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
01913 
01914       first_item = this->beyond_late_head_;
01915       this->head_ = this->beyond_late_head_->next ();
01916 
01917       if (0 == this->beyond_late_head_->next ())
01918         this->tail_ = this->beyond_late_head_->prev ();
01919       else
01920         {
01921           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
01922           this->beyond_late_head_ = this->beyond_late_head_->next ();
01923         }
01924 
01925       if (last_in_subqueue)
01926         {
01927           this->beyond_late_head_ = 0;
01928           this->beyond_late_tail_ = 0;
01929         }
01930 
01931       first_item->prev (0);
01932       first_item->next (0);
01933     }
01934   else
01935     {
01936       // nothing to dequeue: set the pointer to zero and return an error code
01937       first_item = 0;
01938       result = -1;
01939     }
01940 
01941   if (result < 0)
01942     return result;
01943 
01944   size_t mb_bytes = 0;
01945   size_t mb_length = 0;
01946   first_item->total_size_and_length (mb_bytes,
01947                                      mb_length);
01948   // Subtract off all of the bytes associated with this message.
01949   this->cur_bytes_ -= mb_bytes;
01950   this->cur_length_ -= mb_length;
01951   this->cur_count_--;
01952 
01953   // Only signal enqueueing threads if we've fallen below the low
01954   // water mark.
01955   if (this->cur_bytes_ <= this->low_water_mark_
01956       && this->signal_enqueue_waiters () == -1)
01957     return -1;
01958   else
01959     return this->cur_count_;
01960 }
01961 
01962 // Dequeue and return the <ACE_Message_Block *> at the head of the
01963 // logical queue.  Attempts first to dequeue from the pending portion
01964 // of the queue, or if that is empty from the late portion, or if that
01965 // is empty from the beyond late portion, or if that is empty just
01966 // sets the passed pointer to zero and returns -1.
01967 
01968 template <ACE_SYNCH_DECL> int
01969 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
01970 {
01971   int result;
01972 
01973   result = refresh_pending_queue (current_time);
01974 
01975   if (result != -1)
01976     result = refresh_late_queue (current_time);
01977 
01978   return result;
01979 }
01980 
01981 // Refresh the queue using the strategy specific priority status
01982 // function.
01983 
01984 template <ACE_SYNCH_DECL> int
01985 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
01986 {
01987   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
01988 
01989   // refresh priority status boundaries in the queue
01990   if (this->pending_head_)
01991     {
01992       current_status = message_strategy_.priority_status (*this->pending_head_,
01993                                                           current_time);
01994       switch (current_status)
01995         {
01996         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01997           // Make sure the head of the beyond late queue is set (there
01998           // may not have been any beyond late messages previously)
01999           this->beyond_late_head_ = this->head_;
02000 
02001           // Zero out the late queue pointers, and set them only if
02002           // there turn out to be late messages in the pending sublist
02003           this->late_head_ = 0;
02004           this->late_tail_ = 0;
02005 
02006           // Advance through the beyond late messages in the pending queue
02007           do
02008             {
02009               this->pending_head_ = this->pending_head_->next ();
02010 
02011               if (this->pending_head_)
02012                 current_status = message_strategy_.priority_status (*this->pending_head_,
02013                                                                     current_time);
02014               else
02015                 break;  // do while
02016 
02017             }
02018           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02019 
02020           if (this->pending_head_)
02021             {
02022               // point tail of beyond late sublist to previous item
02023               this->beyond_late_tail_ = this->pending_head_->prev ();
02024 
02025               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02026                 // there are no late messages left in the queue
02027                 break; // switch
02028               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02029                 {
02030                   // if we got here, something is *seriously* wrong with the queue
02031                   ACE_ERROR_RETURN ((LM_ERROR,
02032                                      ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02033                                      (int) current_status),
02034                                     -1);
02035                 }
02036               /* FALLTHRU */
02037             }
02038           else
02039             {
02040               // There are no pending or late messages left in the
02041               // queue.
02042               this->beyond_late_tail_ = this->tail_;
02043               this->pending_head_ = 0;
02044               this->pending_tail_ = 0;
02045               break; // switch
02046             }
02047 
02048         case ACE_Dynamic_Message_Strategy::LATE:
02049           // Make sure the head of the late queue is set (there may
02050           // not have been any late messages previously, or they may
02051           // have all become beyond late).
02052           if (this->late_head_ == 0)
02053             this->late_head_ = this->pending_head_;
02054 
02055           // advance through the beyond late messages in the pending queue
02056           do
02057             {
02058               this->pending_head_ = this->pending_head_->next ();
02059 
02060               if (this->pending_head_)
02061                 current_status = message_strategy_.priority_status (*this->pending_head_,
02062                                                                     current_time);
02063               else
02064                 break;  // do while
02065 
02066             }
02067           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02068 
02069           if (this->pending_head_)
02070             {
02071               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02072                 // if we got here, something is *seriously* wrong with the queue
02073                 ACE_ERROR_RETURN((LM_ERROR,
02074                                   ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02075                                   (int) current_status),
02076                                  -1);
02077 
02078               // Point tail of late sublist to previous item
02079               this->late_tail_ = this->pending_head_->prev ();
02080             }
02081           else
02082             {
02083               // there are no pending messages left in the queue
02084               this->late_tail_ = this->tail_;
02085               this->pending_head_ = 0;
02086               this->pending_tail_ = 0;
02087             }
02088 
02089           break; // switch
02090         case ACE_Dynamic_Message_Strategy::PENDING:
02091           // do nothing - the pending queue is unchanged
02092           break; // switch
02093         default:
02094           // if we got here, something is *seriously* wrong with the queue
02095           ACE_ERROR_RETURN((LM_ERROR,
02096                             ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02097                             (int) current_status),
02098                            -1);
02099         }
02100     }
02101   return 0;
02102 }
02103 
02104 // Refresh the pending queue using the strategy specific priority
02105 // status function.
02106 
02107 template <ACE_SYNCH_DECL> int
02108 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
02109 {
02110   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02111 
02112   if (this->late_head_)
02113     {
02114       current_status = message_strategy_.priority_status (*this->late_head_,
02115                                                           current_time);
02116       switch (current_status)
02117         {
02118         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02119 
02120           // make sure the head of the beyond late queue is set
02121           // (there may not have been any beyond late messages previously)
02122           this->beyond_late_head_ = this->head_;
02123 
02124           // advance through the beyond late messages in the late queue
02125           do
02126             {
02127               this->late_head_ = this->late_head_->next ();
02128 
02129               if (this->late_head_)
02130                 current_status = message_strategy_.priority_status (*this->late_head_,
02131                                                                     current_time);
02132               else
02133                 break;  // do while
02134 
02135             }
02136           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02137 
02138           if (this->late_head_)
02139             {
02140               // point tail of beyond late sublist to previous item
02141               this->beyond_late_tail_ = this->late_head_->prev ();
02142 
02143               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02144                 {
02145                   // there are no late messages left in the queue
02146                   this->late_head_ = 0;
02147                   this->late_tail_ = 0;
02148                 }
02149               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02150                 // if we got here, something is *seriously* wrong with the queue
02151                 ACE_ERROR_RETURN ((LM_ERROR,
02152                                    ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02153                                    (int) current_status),
02154                                   -1);
02155             }
02156           else
02157             {
02158               // there are no late messages left in the queue
02159               this->beyond_late_tail_ = this->tail_;
02160               this->late_head_ = 0;
02161               this->late_tail_ = 0;
02162             }
02163 
02164           break;  // switch
02165 
02166         case ACE_Dynamic_Message_Strategy::LATE:
02167           // do nothing - the late queue is unchanged
02168           break; // switch
02169 
02170         case ACE_Dynamic_Message_Strategy::PENDING:
02171           // if we got here, something is *seriously* wrong with the queue
02172           ACE_ERROR_RETURN ((LM_ERROR,
02173                              ACE_LIB_TEXT ("Unexpected message priority status ")
02174                              ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02175                              (int) current_status),
02176                             -1);
02177         default:
02178           // if we got here, something is *seriously* wrong with the queue
02179           ACE_ERROR_RETURN ((LM_ERROR,
02180                              ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02181                              (int) current_status),
02182                             -1);
02183         }
02184     }
02185 
02186   return 0;
02187 }
02188 
02189 // Refresh the late queue using the strategy specific priority status
02190 // function.
02191 
02192 template <ACE_SYNCH_DECL> int
02193 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
02194                                                              ACE_Time_Value *timeout)
02195 {
02196   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02197                                                               timeout);
02198 }
02199 
02200 // Private method to hide public base class method: just calls base
02201 // class method.
02202 
02203 template <ACE_SYNCH_DECL> int
02204 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
02205                                                         ACE_Time_Value *timeout)
02206 {
02207   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02208   return this->enqueue_prio (new_item, timeout);
02209 }
02210 
02211 // Just call priority enqueue method: tail enqueue semantics for
02212 // dynamic message queues are unstable: the message may or may not be
02213 // where it was placed after the queue is refreshed prior to the next
02214 // enqueue or dequeue operation.
02215 
02216 template <ACE_SYNCH_DECL> int
02217 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
02218                                                         ACE_Time_Value *timeout)
02219 {
02220   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02221   return this->enqueue_prio (new_item, timeout);
02222 }
02223 
02224 // Just call priority enqueue method: head enqueue semantics for
02225 // dynamic message queues are unstable: the message may or may not be
02226 // where it was placed after the queue is refreshed prior to the next
02227 // enqueue or dequeue operation.
02228 
02229 template <ACE_SYNCH_DECL>
02230 ACE_Message_Queue<ACE_SYNCH_USE> *
02231 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
02232                                                                        size_t lwm,
02233                                                                        ACE_Notification_Strategy *ns)
02234 {
02235   ACE_Message_Queue<ACE_SYNCH_USE> *tmp;
02236 
02237   ACE_NEW_RETURN (tmp,
02238                   ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
02239                   0);
02240   return tmp;
02241 }
02242 
02243 // Factory method for a statically prioritized ACE_Message_Queue.
02244 
02245 template <ACE_SYNCH_DECL>
02246 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02247 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
02248                                                                          size_t lwm,
02249                                                                          ACE_Notification_Strategy *ns,
02250                                                                          u_long static_bit_field_mask,
02251                                                                          u_long static_bit_field_shift,
02252                                                                          u_long dynamic_priority_max,
02253                                                                          u_long dynamic_priority_offset)
02254 {
02255   ACE_Deadline_Message_Strategy *adms;
02256 
02257   ACE_NEW_RETURN (adms,
02258                   ACE_Deadline_Message_Strategy (static_bit_field_mask,
02259                                                  static_bit_field_shift,
02260                                                  dynamic_priority_max,
02261                                                  dynamic_priority_offset),
02262                   0);
02263 
02264   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp;
02265   ACE_NEW_RETURN (tmp,
02266                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns),
02267                   0);
02268   return tmp;
02269 }
02270 
02271 // Factory method for a dynamically prioritized (by time to deadline)
02272 // ACE_Dynamic_Message_Queue.
02273 
02274 template <ACE_SYNCH_DECL>
02275 ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
02276 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
02277                                                                        size_t lwm,
02278                                                                        ACE_Notification_Strategy *ns,
02279                                                                        u_long static_bit_field_mask,
02280                                                                        u_long static_bit_field_shift,
02281                                                                        u_long dynamic_priority_max,
02282                                                                        u_long dynamic_priority_offset)
02283 {
02284   ACE_Laxity_Message_Strategy *alms;
02285 
02286   ACE_NEW_RETURN (alms,
02287                   ACE_Laxity_Message_Strategy (static_bit_field_mask,
02288                                                static_bit_field_shift,
02289                                                dynamic_priority_max,
02290                                                dynamic_priority_offset),
02291                   0);
02292 
02293   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp;
02294   ACE_NEW_RETURN (tmp,
02295                   ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns),
02296                   0);
02297   return tmp;
02298 }
02299 
02300 // Factory method for a dynamically prioritized (by laxity)
02301 // <ACE_Dynamic_Message_Queue>.
02302 
02303 #if defined (VXWORKS)
02304 
02305 template <ACE_SYNCH_DECL>
02306 ACE_Message_Queue_Vx *
02307 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
02308                                                                    size_t max_message_length,
02309                                                                    ACE_Notification_Strategy *ns)
02310 {
02311   ACE_Message_Queue_Vx *tmp;
02312 
02313   ACE_NEW_RETURN (tmp,
02314                   ACE_Message_Queue_Vx (max_messages, max_message_length, ns),
02315                   0);
02316   return tmp;
02317 }
02318   // factory method for a wrapped VxWorks message queue
02319 
02320 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
02321 
02322 template <ACE_SYNCH_DECL>
02323 ACE_Message_Queue_NT *
02324 ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
02325 {
02326   ACE_Message_Queue_NT *tmp;
02327 
02328   ACE_NEW_RETURN (tmp,
02329                   ACE_Message_Queue_NT (max_threads);
02330                   0);
02331   return tmp;
02332 }
02333 
02334 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
02335 #endif /* defined (VXWORKS) */
02336 #endif /* ACE_MESSAGE_QUEUE_T_C */

Generated on Mon Jun 16 11:20:25 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002