Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ACE_Dynamic_Message_Queue Class Template Reference

A derived class which adapts the <ACE_Message_Queue> class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities. More...

#include <Message_Queue_T.h>

Inheritance diagram for ACE_Dynamic_Message_Queue:

Inheritance graph
[legend]
Collaboration diagram for ACE_Dynamic_Message_Queue:

Collaboration graph
[legend]
List of all members.

Public Methods

 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)
virtual ~ACE_Dynamic_Message_Queue (void)
 Close down the message queue and release all resources. More...

virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
virtual void dump (void) const
 Dump the state of the queue. More...

virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)

Public Attributes

 ACE_ALLOC_HOOK_DECLARE
 Declare the dynamic allocation hooks. More...


Protected Methods

virtual int enqueue_i (ACE_Message_Block *new_item)
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status)
 Enqueue a message in priority order within a given priority status sublist. More...

virtual int dequeue_head_i (ACE_Message_Block *&first_item)
virtual int refresh_queue (const ACE_Time_Value &current_time)
 Refresh the queue using the strategy specific priority status function. More...

virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
 Refresh the pending queue using the strategy specific priority status function. More...

virtual int refresh_late_queue (const ACE_Time_Value &current_time)
 Refresh the late queue using the strategy specific priority status function. More...


Protected Attributes

ACE_Message_Blockpending_head_
 Pointer to head of the pending messages. More...

ACE_Message_Blockpending_tail_
 Pointer to tail of the pending messages. More...

ACE_Message_Blocklate_head_
 Pointer to head of the late messages. More...

ACE_Message_Blocklate_tail_
 Pointer to tail of the late messages. More...

ACE_Message_Blockbeyond_late_head_
 Pointer to head of the beyond late messages. More...

ACE_Message_Blockbeyond_late_tail_
 Pointer to tail of the beyond late messages. More...

ACE_Dynamic_Message_Strategymessage_strategy_
 Pointer to a dynamic priority evaluation function. More...


Private Methods

void operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
 ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &)
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
 Private method to hide public base class method: just calls base class method. More...


Detailed Description

template<ACE_SYNCH_DECL>
class ACE_Dynamic_Message_Queue<>

A derived class which adapts the <ACE_Message_Queue> class in order to maintain dynamic priorities for enqueued <ACE_Message_Blocks> and manage the queue order according to these dynamic priorities.

The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status. To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained (which can be done efficiently, as aging messages maintain the same linked list order as they progress from one status to the next), with no reordering of the messages themselves, while providing correct priority ordered dequeueing semantics. Head and tail enqueue methods inherited from ACE_Message_Queue are made private to prevent out-of-order messages from confusing management of the various portions of the queue. Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond_late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods. When filled with messages, the queue's linked list should look like: H T | | B - B - B - B - L - L - L - P - P - P - P - P | | | | | | BH BT LH LT PH PT Where the symbols are as follows: H = Head of the entire list T = Tail of the entire list B = Beyond late message BH = Beyond late messages Head BT = Beyond late messages Tail L = Late message LH = Late messages Head LT = Late messages Tail P = Pending message PH = Pending messages Head PT = Pending messages Tail Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).

Definition at line 668 of file Message_Queue_T.h.


Constructor & Destructor Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue ACE_Dynamic_Message_Strategy   message_strategy,
size_t    hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t    lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy   = 0
 

Definition at line 1399 of file Message_Queue_T.cpp.

References ACE_SYNCH_USE.

01403   : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01404     pending_head_ (0),
01405     pending_tail_ (0),
01406     late_head_ (0),
01407     late_tail_ (0),
01408     beyond_late_head_ (0),
01409     beyond_late_tail_ (0),
01410     message_strategy_ (message_strategy)
01411 {
01412   // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01413   // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01414   // it in its own dtor
01415 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::~ACE_Dynamic_Message_Queue void    [virtual]
 

Close down the message queue and release all resources.

Definition at line 1420 of file Message_Queue_T.cpp.

References message_strategy_.

01421 {
01422   delete &this->message_strategy_;
01423 }

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue<>::ACE_Dynamic_Message_Queue const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &    [private]
 


Member Function Documentation

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head ACE_Message_Block *&    first_item,
ACE_Time_Value   timeout = 0
[virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1566 of file Message_Queue_T.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Message_Queue_Base::DEACTIVATED, dequeue_head_i, ESHUTDOWN, ACE_OS::gettimeofday, refresh_queue, ACE_Message_Queue_Base::state_, and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond.

01568 {
01569   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01570 
01571   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01572 
01573   if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01574     {
01575       errno = ESHUTDOWN;
01576       return -1;
01577     }
01578 
01579   int result;
01580 
01581   // get the current time
01582   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01583 
01584   // refresh priority status boundaries in the queue
01585   result = this->refresh_queue (current_time);
01586   if (result < 0)
01587     return result;
01588 
01589   // *now* it's appropriate to wait for an enqueued item
01590   result = this->wait_not_empty_cond (ace_mon, timeout);
01591   if (result == -1)
01592     return result;
01593 
01594   // call the internal dequeue method, which selects an item from the
01595   // highest priority status portion of the queue that has messages
01596   // enqueued.
01597   result = this->dequeue_head_i (first_item);
01598 
01599   return result;
01600 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::dequeue_head_i ACE_Message_Block *&    first_item [protected, virtual]
 

Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1846 of file Message_Queue_T.cpp.

References ACE_TRACE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, late_head_, late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::low_water_mark_, ACE_Message_Block::next, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length.

Referenced by dequeue_head.

01847 {
01848   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01849 
01850   int result = 0;
01851   int last_in_subqueue = 0;
01852 
01853   // first, try to dequeue from the head of the pending list
01854   if (this->pending_head_)
01855     {
01856       first_item = this->pending_head_;
01857 
01858       if (0 == this->pending_head_->prev ())
01859         this->head_ = this->pending_head_->next ();
01860       else
01861         this->pending_head_->prev ()->next (this->pending_head_->next ());
01862 
01863       if (0 == this->pending_head_->next ())
01864         {
01865           this->tail_ = this->pending_head_->prev ();
01866           this->pending_head_ = 0;
01867           this->pending_tail_ = 0;
01868         }
01869       else
01870         {
01871           this->pending_head_->next ()->prev (this->pending_head_->prev ());
01872           this->pending_head_ = this->pending_head_->next ();
01873         }
01874 
01875       first_item->prev (0);
01876       first_item->next (0);
01877     }
01878 
01879   // Second, try to dequeue from the head of the late list
01880   else if (this->late_head_)
01881     {
01882       last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
01883 
01884       first_item = this->late_head_;
01885 
01886       if (0 == this->late_head_->prev ())
01887         this->head_ = this->late_head_->next ();
01888       else
01889         this->late_head_->prev ()->next (this->late_head_->next ());
01890 
01891       if (0 == this->late_head_->next ())
01892         this->tail_ = this->late_head_->prev ();
01893       else
01894         {
01895           this->late_head_->next ()->prev (this->late_head_->prev ());
01896           this->late_head_ = this->late_head_->next ();
01897         }
01898 
01899       if (last_in_subqueue)
01900         {
01901           this->late_head_ = 0;
01902           this->late_tail_ = 0;
01903         }
01904 
01905       first_item->prev (0);
01906       first_item->next (0);
01907     }
01908   // finally, try to dequeue from the head of the beyond late list
01909   else if (this->beyond_late_head_)
01910     {
01911       last_in_subqueue =
01912         (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
01913 
01914       first_item = this->beyond_late_head_;
01915       this->head_ = this->beyond_late_head_->next ();
01916 
01917       if (0 == this->beyond_late_head_->next ())
01918         this->tail_ = this->beyond_late_head_->prev ();
01919       else
01920         {
01921           this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
01922           this->beyond_late_head_ = this->beyond_late_head_->next ();
01923         }
01924 
01925       if (last_in_subqueue)
01926         {
01927           this->beyond_late_head_ = 0;
01928           this->beyond_late_tail_ = 0;
01929         }
01930 
01931       first_item->prev (0);
01932       first_item->next (0);
01933     }
01934   else
01935     {
01936       // nothing to dequeue: set the pointer to zero and return an error code
01937       first_item = 0;
01938       result = -1;
01939     }
01940 
01941   if (result < 0)
01942     return result;
01943 
01944   size_t mb_bytes = 0;
01945   size_t mb_length = 0;
01946   first_item->total_size_and_length (mb_bytes,
01947                                      mb_length);
01948   // Subtract off all of the bytes associated with this message.
01949   this->cur_bytes_ -= mb_bytes;
01950   this->cur_length_ -= mb_length;
01951   this->cur_count_--;
01952 
01953   // Only signal enqueueing threads if we've fallen below the low
01954   // water mark.
01955   if (this->cur_bytes_ <= this->low_water_mark_
01956       && this->signal_enqueue_waiters () == -1)
01957     return -1;
01958   else
01959     return this->cur_count_;
01960 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::dump void    const [virtual]
 

Dump the state of the queue.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1606 of file Message_Queue_T.cpp.

References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump, ACE_Message_Queue::dump, LM_DEBUG, and message_strategy_.

01607 {
01608   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01609   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01610 
01611   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01612   this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01613 
01614   ACE_DEBUG ((LM_DEBUG,
01615               ACE_LIB_TEXT ("pending_head_ = %u\n")
01616               ACE_LIB_TEXT ("pending_tail_ = %u\n")
01617               ACE_LIB_TEXT ("late_head_ = %u\n")
01618               ACE_LIB_TEXT ("late_tail_ = %u\n")
01619               ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01620               ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01621               this->pending_head_,
01622               this->pending_tail_,
01623               this->late_head_,
01624               this->late_tail_,
01625               this->beyond_late_head_,
01626               this->beyond_late_tail_));
01627 
01628   ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01629   message_strategy_.dump ();
01630 
01631   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01632 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_head ACE_Message_Block   new_item,
ACE_Time_Value   timeout = 0
[virtual]
 

Just call priority enqueue method: head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2217 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio.

02219 {
02220   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02221   return this->enqueue_prio (new_item, timeout);
02222 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_i ACE_Message_Block   new_item [protected, virtual]
 

Enqueue an <ACE_Message_Block *> in accordance with its priority. priority may be *dynamic* or *static* or a combination or *both* It calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 1636 of file Message_Queue_T.cpp.

References ACE_TRACE, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i, ACE_OS::gettimeofday, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, refresh_queue, ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters, sublist_enqueue_i, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length.

01637 {
01638   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01639 
01640   if (new_item == 0)
01641     return -1;
01642 
01643   int result = 0;
01644 
01645   // Get the current time.
01646   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01647 
01648   // Refresh priority status boundaries in the queue.
01649 
01650   result = this->refresh_queue (current_time);
01651   if (result < 0)
01652     return result;
01653 
01654   // Where we enqueue depends on the message's priority status.
01655   switch (message_strategy_.priority_status (*new_item,
01656                                              current_time))
01657     {
01658     case ACE_Dynamic_Message_Strategy::PENDING:
01659       if (this->pending_tail_ == 0)
01660         {
01661           // Check for simple case of an empty pending queue, where
01662           // all we need to do is insert <new_item> into the tail of
01663           // the queue.
01664           pending_head_ = new_item;
01665           pending_tail_ = pending_head_;
01666           return this->enqueue_tail_i (new_item);
01667         }
01668       else
01669         {
01670           // Enqueue the new message in priority order in the pending
01671           // sublist
01672           result = sublist_enqueue_i (new_item,
01673                                       current_time,
01674                                       this->pending_head_,
01675                                       this->pending_tail_,
01676                                       ACE_Dynamic_Message_Strategy::PENDING);
01677         }
01678       break;
01679 
01680     case ACE_Dynamic_Message_Strategy::LATE:
01681       if (this->late_tail_ == 0)
01682         {
01683           late_head_ = new_item;
01684           late_tail_ = late_head_;
01685 
01686           if (this->pending_head_ == 0)
01687             // Check for simple case of an empty pending queue,
01688             // where all we need to do is insert <new_item> into the
01689             // tail of the queue.
01690             return this->enqueue_tail_i (new_item);
01691           else if (this->beyond_late_tail_ == 0)
01692             // Check for simple case of an empty beyond late queue, where all
01693             // we need to do is insert <new_item> into the head of the queue.
01694             return this->enqueue_head_i (new_item);
01695           else
01696             {
01697               // Otherwise, we can just splice the new message in
01698               // between the pending and beyond late portions of the
01699               // queue.
01700               this->beyond_late_tail_->next (new_item);
01701               new_item->prev (this->beyond_late_tail_);
01702               this->pending_head_->prev (new_item);
01703               new_item->next (this->pending_head_);
01704             }
01705         }
01706       else
01707         {
01708           // Enqueue the new message in priority order in the late
01709           // sublist
01710           result = sublist_enqueue_i (new_item,
01711                                       current_time,
01712                                       this->late_head_,
01713                                       this->late_tail_,
01714                                       ACE_Dynamic_Message_Strategy::LATE);
01715         }
01716       break;
01717 
01718     case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01719       if (this->beyond_late_tail_ == 0)
01720         {
01721           // Check for simple case of an empty beyond late queue,
01722           // where all we need to do is insert <new_item> into the
01723           // head of the queue.
01724           beyond_late_head_ = new_item;
01725           beyond_late_tail_ = beyond_late_head_;
01726           return this->enqueue_head_i (new_item);
01727         }
01728       else
01729         {
01730           // all beyond late messages have the same (zero) priority,
01731           // so just put the new one at the end of the beyond late
01732           // messages
01733           if (this->beyond_late_tail_->next ())
01734             this->beyond_late_tail_->next ()->prev (new_item);
01735           else
01736             this->tail_ = new_item;
01737 
01738           new_item->next (this->beyond_late_tail_->next ());
01739           this->beyond_late_tail_->next (new_item);
01740           new_item->prev (this->beyond_late_tail_);
01741           this->beyond_late_tail_ = new_item;
01742         }
01743 
01744       break;
01745 
01746       // should never get here, but just in case...
01747     default:
01748       result = -1;
01749       break;
01750     }
01751 
01752   if (result < 0)
01753     return result;
01754 
01755   size_t mb_bytes = 0;
01756   size_t mb_length = 0;
01757   new_item->total_size_and_length (mb_bytes,
01758                                    mb_length);
01759   // Subtract off all of the bytes associated with this message.
01760   this->cur_bytes_ -= mb_bytes;
01761   this->cur_length_ -= mb_length;
01762   this->cur_count_++;
01763 
01764   if (this->signal_dequeue_waiters () == -1)
01765     return -1;
01766   else
01767     return this->cur_count_;
01768 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::enqueue_tail ACE_Message_Block   new_item,
ACE_Time_Value   timeout = 0
[virtual]
 

Just call priority enqueue method: tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2204 of file Message_Queue_T.cpp.

References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio.

02206 {
02207   ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02208   return this->enqueue_prio (new_item, timeout);
02209 }

template<ACE_SYNCH_DECL >
void ACE_Dynamic_Message_Queue<>::operator= const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &    [private]
 

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::peek_dequeue_head ACE_Message_Block *&    first_item,
ACE_Time_Value   timeout = 0
[private, virtual]
 

Private method to hide public base class method: just calls base class method.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 2193 of file Message_Queue_T.cpp.

References ACE_Message_Queue::peek_dequeue_head.

02195 {
02196   return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02197                                                               timeout);
02198 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_late_queue const ACE_Time_Value   current_time [protected, virtual]
 

Refresh the late queue using the strategy specific priority status function.

Definition at line 2108 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, LM_ERROR, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by refresh_queue.

02109 {
02110   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02111 
02112   if (this->late_head_)
02113     {
02114       current_status = message_strategy_.priority_status (*this->late_head_,
02115                                                           current_time);
02116       switch (current_status)
02117         {
02118         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02119 
02120           // make sure the head of the beyond late queue is set
02121           // (there may not have been any beyond late messages previously)
02122           this->beyond_late_head_ = this->head_;
02123 
02124           // advance through the beyond late messages in the late queue
02125           do
02126             {
02127               this->late_head_ = this->late_head_->next ();
02128 
02129               if (this->late_head_)
02130                 current_status = message_strategy_.priority_status (*this->late_head_,
02131                                                                     current_time);
02132               else
02133                 break;  // do while
02134 
02135             }
02136           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02137 
02138           if (this->late_head_)
02139             {
02140               // point tail of beyond late sublist to previous item
02141               this->beyond_late_tail_ = this->late_head_->prev ();
02142 
02143               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02144                 {
02145                   // there are no late messages left in the queue
02146                   this->late_head_ = 0;
02147                   this->late_tail_ = 0;
02148                 }
02149               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02150                 // if we got here, something is *seriously* wrong with the queue
02151                 ACE_ERROR_RETURN ((LM_ERROR,
02152                                    ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02153                                    (int) current_status),
02154                                   -1);
02155             }
02156           else
02157             {
02158               // there are no late messages left in the queue
02159               this->beyond_late_tail_ = this->tail_;
02160               this->late_head_ = 0;
02161               this->late_tail_ = 0;
02162             }
02163 
02164           break;  // switch
02165 
02166         case ACE_Dynamic_Message_Strategy::LATE:
02167           // do nothing - the late queue is unchanged
02168           break; // switch
02169 
02170         case ACE_Dynamic_Message_Strategy::PENDING:
02171           // if we got here, something is *seriously* wrong with the queue
02172           ACE_ERROR_RETURN ((LM_ERROR,
02173                              ACE_LIB_TEXT ("Unexpected message priority status ")
02174                              ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02175                              (int) current_status),
02176                             -1);
02177         default:
02178           // if we got here, something is *seriously* wrong with the queue
02179           ACE_ERROR_RETURN ((LM_ERROR,
02180                              ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02181                              (int) current_status),
02182                             -1);
02183         }
02184     }
02185 
02186   return 0;
02187 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_pending_queue const ACE_Time_Value   current_time [protected, virtual]
 

Refresh the pending queue using the strategy specific priority status function.

Definition at line 1985 of file Message_Queue_T.cpp.

References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, LM_ERROR, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by refresh_queue.

01986 {
01987   ACE_Dynamic_Message_Strategy::Priority_Status current_status;
01988 
01989   // refresh priority status boundaries in the queue
01990   if (this->pending_head_)
01991     {
01992       current_status = message_strategy_.priority_status (*this->pending_head_,
01993                                                           current_time);
01994       switch (current_status)
01995         {
01996         case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01997           // Make sure the head of the beyond late queue is set (there
01998           // may not have been any beyond late messages previously)
01999           this->beyond_late_head_ = this->head_;
02000 
02001           // Zero out the late queue pointers, and set them only if
02002           // there turn out to be late messages in the pending sublist
02003           this->late_head_ = 0;
02004           this->late_tail_ = 0;
02005 
02006           // Advance through the beyond late messages in the pending queue
02007           do
02008             {
02009               this->pending_head_ = this->pending_head_->next ();
02010 
02011               if (this->pending_head_)
02012                 current_status = message_strategy_.priority_status (*this->pending_head_,
02013                                                                     current_time);
02014               else
02015                 break;  // do while
02016 
02017             }
02018           while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02019 
02020           if (this->pending_head_)
02021             {
02022               // point tail of beyond late sublist to previous item
02023               this->beyond_late_tail_ = this->pending_head_->prev ();
02024 
02025               if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02026                 // there are no late messages left in the queue
02027                 break; // switch
02028               else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02029                 {
02030                   // if we got here, something is *seriously* wrong with the queue
02031                   ACE_ERROR_RETURN ((LM_ERROR,
02032                                      ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02033                                      (int) current_status),
02034                                     -1);
02035                 }
02036               /* FALLTHRU */
02037             }
02038           else
02039             {
02040               // There are no pending or late messages left in the
02041               // queue.
02042               this->beyond_late_tail_ = this->tail_;
02043               this->pending_head_ = 0;
02044               this->pending_tail_ = 0;
02045               break; // switch
02046             }
02047 
02048         case ACE_Dynamic_Message_Strategy::LATE:
02049           // Make sure the head of the late queue is set (there may
02050           // not have been any late messages previously, or they may
02051           // have all become beyond late).
02052           if (this->late_head_ == 0)
02053             this->late_head_ = this->pending_head_;
02054 
02055           // advance through the beyond late messages in the pending queue
02056           do
02057             {
02058               this->pending_head_ = this->pending_head_->next ();
02059 
02060               if (this->pending_head_)
02061                 current_status = message_strategy_.priority_status (*this->pending_head_,
02062                                                                     current_time);
02063               else
02064                 break;  // do while
02065 
02066             }
02067           while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02068 
02069           if (this->pending_head_)
02070             {
02071               if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02072                 // if we got here, something is *seriously* wrong with the queue
02073                 ACE_ERROR_RETURN((LM_ERROR,
02074                                   ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02075                                   (int) current_status),
02076                                  -1);
02077 
02078               // Point tail of late sublist to previous item
02079               this->late_tail_ = this->pending_head_->prev ();
02080             }
02081           else
02082             {
02083               // there are no pending messages left in the queue
02084               this->late_tail_ = this->tail_;
02085               this->pending_head_ = 0;
02086               this->pending_tail_ = 0;
02087             }
02088 
02089           break; // switch
02090         case ACE_Dynamic_Message_Strategy::PENDING:
02091           // do nothing - the pending queue is unchanged
02092           break; // switch
02093         default:
02094           // if we got here, something is *seriously* wrong with the queue
02095           ACE_ERROR_RETURN((LM_ERROR,
02096                             ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02097                             (int) current_status),
02098                            -1);
02099         }
02100     }
02101   return 0;
02102 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::refresh_queue const ACE_Time_Value   current_time [protected, virtual]
 

Refresh the queue using the strategy specific priority status function.

Definition at line 1969 of file Message_Queue_T.cpp.

References refresh_late_queue, and refresh_pending_queue.

Referenced by dequeue_head, enqueue_i, and remove_messages.

01970 {
01971   int result;
01972 
01973   result = refresh_pending_queue (current_time);
01974 
01975   if (result != -1)
01976     result = refresh_late_queue (current_time);
01977 
01978   return result;
01979 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::remove_messages ACE_Message_Block *&    list_head,
ACE_Message_Block *&    list_tail,
u_int    status_flags
[virtual]
 

Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.

Definition at line 1426 of file Message_Queue_T.cpp.

References ACE_BIT_ENABLED, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_OS::gettimeofday, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, refresh_queue, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length.

01429 {
01430   // start with an empty list
01431   list_head = 0;
01432   list_tail = 0;
01433 
01434   // Get the current time
01435   ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01436 
01437   // Refresh priority status boundaries in the queue.
01438   int result = this->refresh_queue (current_time);
01439   if (result < 0)
01440     return result;
01441 
01442   if (ACE_BIT_ENABLED (status_flags,
01443                        (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01444       && this->pending_head_
01445       && this->pending_tail_)
01446     {
01447       // patch up pointers for the new tail of the queue
01448       if (this->pending_head_->prev ())
01449         {
01450           this->tail_ = this->pending_head_->prev ();
01451           this->pending_head_->prev ()->next (0);
01452         }
01453       else
01454         {
01455           // the list has become empty
01456           this->head_ = 0;
01457           this->tail_ = 0;
01458         }
01459 
01460       // point to the head and tail of the list
01461       list_head = this->pending_head_;
01462       list_tail = this->pending_tail_;
01463 
01464       // cut the pending messages out of the queue entirely
01465       this->pending_head_->prev (0);
01466       this->pending_head_ = 0;
01467       this->pending_tail_ = 0;
01468     }
01469 
01470   if (ACE_BIT_ENABLED (status_flags,
01471                        (u_int) ACE_Dynamic_Message_Strategy::LATE)
01472       && this->late_head_
01473       && this->late_tail_)
01474     {
01475       // Patch up pointers for the (possibly) new head and tail of the
01476       // queue.
01477       if (this->late_tail_->next ())
01478         this->late_tail_->next ()->prev (this->late_head_->prev ());
01479       else
01480         this->tail_ = this->late_head_->prev ();
01481 
01482       if (this->late_head_->prev ())
01483         this->late_head_->prev ()->next (this->late_tail_->next ());
01484       else
01485         this->head_ = this->late_tail_->next ();
01486 
01487       // put late messages behind pending messages (if any) being returned
01488       this->late_head_->prev (list_tail);
01489       if (list_tail)
01490         list_tail->next (this->late_head_);
01491       else
01492         list_head = this->late_head_;
01493 
01494       list_tail = this->late_tail_;
01495 
01496       this->late_tail_->next (0);
01497       this->late_head_ = 0;
01498       this->late_tail_ = 0;
01499     }
01500 
01501   if (ACE_BIT_ENABLED (status_flags,
01502       (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01503       && this->beyond_late_head_
01504       && this->beyond_late_tail_)
01505     {
01506       // Patch up pointers for the new tail of the queue
01507       if (this->beyond_late_tail_->next ())
01508         {
01509           this->head_ = this->beyond_late_tail_->next ();
01510           this->beyond_late_tail_->next ()->prev (0);
01511         }
01512       else
01513         {
01514           // the list has become empty
01515           this->head_ = 0;
01516           this->tail_ = 0;
01517         }
01518 
01519       // Put beyond late messages at the end of the list being
01520       // returned.
01521       if (list_tail)
01522         {
01523           this->beyond_late_head_->prev (list_tail);
01524           list_tail->next (this->beyond_late_head_);
01525         }
01526       else
01527         list_head = this->beyond_late_head_;
01528 
01529       list_tail = this->beyond_late_tail_;
01530 
01531       this->beyond_late_tail_->next (0);
01532       this->beyond_late_head_ = 0;
01533       this->beyond_late_tail_ = 0;
01534     }
01535 
01536   // Decrement message and size counts for removed messages.
01537   ACE_Message_Block *temp1;
01538 
01539   for (temp1 = list_head;
01540        temp1 != 0;
01541        temp1 = temp1->next ())
01542     {
01543       this->cur_count_--;
01544 
01545       size_t mb_bytes = 0;
01546       size_t mb_length = 0;
01547       temp1->total_size_and_length (mb_bytes,
01548                                     mb_length);
01549       // Subtract off all of the bytes associated with this message.
01550       this->cur_bytes_ -= mb_bytes;
01551       this->cur_length_ -= mb_length;
01552     }
01553 
01554   return result;
01555 }

template<ACE_SYNCH_DECL >
int ACE_Dynamic_Message_Queue<>::sublist_enqueue_i ACE_Message_Block   new_item,
const ACE_Time_Value   current_time,
ACE_Message_Block *&    sublist_head,
ACE_Message_Block *&    sublist_tail,
ACE_Dynamic_Message_Strategy::Priority_Status    status
[protected, virtual]
 

Enqueue a message in priority order within a given priority status sublist.

Definition at line 1777 of file Message_Queue_T.cpp.

References ACE_Message_Queue< ACE_SYNCH_USE >::head_, message_strategy_, ACE_Message_Block::msg_priority, ACE_Message_Block::next, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_.

Referenced by enqueue_i.

01782 {
01783   int result = 0;
01784   ACE_Message_Block *current_item = 0;
01785 
01786   // Find message after which to enqueue new item, based on message
01787   // priority and priority status.
01788   for (current_item = sublist_tail;
01789        current_item;
01790        current_item = current_item->prev ())
01791     {
01792       if (message_strategy_.priority_status (*current_item, current_time) == status)
01793         {
01794           if (current_item->msg_priority () >= new_item->msg_priority ())
01795             break;
01796         }
01797       else
01798         {
01799           sublist_head = new_item;
01800           break;
01801         }
01802     }
01803 
01804   if (current_item == 0)
01805     {
01806       // If the new message has highest priority of any, put it at the
01807       // head of the list (and sublist).
01808       new_item->prev (0);
01809       new_item->next (this->head_);
01810       if (this->head_ != 0)
01811         this->head_->prev (new_item);
01812       else
01813         {
01814           this->tail_ = new_item;
01815           sublist_tail = new_item;
01816         }
01817       this->head_ = new_item;
01818       sublist_head = new_item;
01819     }
01820   else
01821     {
01822       // insert the new item into the list
01823       new_item->next (current_item->next ());
01824       new_item->prev (current_item);
01825 
01826       if (current_item->next ())
01827         current_item->next ()->prev (new_item);
01828       else
01829         this->tail_ = new_item;
01830 
01831       current_item->next (new_item);
01832 
01833       // If the new item has lowest priority of any in the sublist,
01834       // move the tail pointer of the sublist back to the new item
01835       if (current_item == sublist_tail)
01836         sublist_tail = new_item;
01837     }
01838 
01839   return result;
01840 }


Member Data Documentation

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Queue::ACE_ALLOC_HOOK_DECLARE
 

Declare the dynamic allocation hooks.

Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >.

Definition at line 724 of file Message_Queue_T.h.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::beyond_late_head_ [protected]
 

Pointer to head of the beyond late messages.

Definition at line 778 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::beyond_late_tail_ [protected]
 

Pointer to tail of the beyond late messages.

Definition at line 781 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::late_head_ [protected]
 

Pointer to head of the late messages.

Definition at line 772 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::late_tail_ [protected]
 

Pointer to tail of the late messages.

Definition at line 775 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

template<ACE_SYNCH_DECL >
ACE_Dynamic_Message_Strategy& ACE_Dynamic_Message_Queue::message_strategy_ [protected]
 

Pointer to a dynamic priority evaluation function.

Definition at line 784 of file Message_Queue_T.h.

Referenced by dump, enqueue_i, refresh_late_queue, refresh_pending_queue, sublist_enqueue_i, and ~ACE_Dynamic_Message_Queue.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::pending_head_ [protected]
 

Pointer to head of the pending messages.

Definition at line 766 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_pending_queue, and remove_messages.

template<ACE_SYNCH_DECL >
ACE_Message_Block* ACE_Dynamic_Message_Queue::pending_tail_ [protected]
 

Pointer to tail of the pending messages.

Definition at line 769 of file Message_Queue_T.h.

Referenced by dequeue_head_i, enqueue_i, refresh_pending_queue, and remove_messages.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 12:47:35 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002