#include <Message_Queue_T.h>
[Inheritance diagram for ACE_Dynamic_Message_Queue]
Public Methods
| | ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0) |
| virtual | ~ACE_Dynamic_Message_Queue (void) |
| | Close down the message queue and release all resources. |
| virtual int | remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) |
| virtual int | dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| virtual void | dump (void) const |
| | Dump the state of the queue. |
| virtual int | enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
| virtual int | enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0) |
Public Attributes
| | ACE_ALLOC_HOOK_DECLARE |
| | Declare the dynamic allocation hooks. |
Protected Methods
| virtual int | enqueue_i (ACE_Message_Block *new_item) |
| virtual int | sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) |
| | Enqueue a message in priority order within a given priority status sublist. |
| virtual int | dequeue_head_i (ACE_Message_Block *&first_item) |
| virtual int | refresh_queue (const ACE_Time_Value &current_time) |
| | Refresh the queue using the strategy specific priority status function. |
| virtual int | refresh_pending_queue (const ACE_Time_Value &current_time) |
| | Refresh the pending queue using the strategy specific priority status function. |
| virtual int | refresh_late_queue (const ACE_Time_Value &current_time) |
| | Refresh the late queue using the strategy specific priority status function. |
Protected Attributes
| ACE_Message_Block * | pending_head_ |
| | Pointer to head of the pending messages. |
| ACE_Message_Block * | pending_tail_ |
| | Pointer to tail of the pending messages. |
| ACE_Message_Block * | late_head_ |
| | Pointer to head of the late messages. |
| ACE_Message_Block * | late_tail_ |
| | Pointer to tail of the late messages. |
| ACE_Message_Block * | beyond_late_head_ |
| | Pointer to head of the beyond late messages. |
| ACE_Message_Block * | beyond_late_tail_ |
| | Pointer to tail of the beyond late messages. |
| ACE_Dynamic_Message_Strategy & | message_strategy_ |
| | Reference to the dynamic priority evaluation strategy. |
Private Methods
| void | operator= (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
| | ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue< _ACE_SYNCH > &) |
| virtual int | peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0) |
| | Private method to hide public base class method: just calls base class method. |
The messages in the queue are managed so as to preserve a logical ordering with minimal overhead per enqueue and dequeue operation. For this reason, the actual order of messages in the linked list of the queue may differ from their priority order. As time passes, a message may change from pending status to late status, and eventually to beyond late status.

To minimize reordering overhead under this design force, three separate boundaries are maintained within the linked list of messages. Messages are dequeued preferentially from the head of the pending portion, then the head of the late portion, and finally from the head of the beyond late portion. In this way, only the boundaries need to be maintained, with no reordering of the messages themselves, while still providing correct priority-ordered dequeueing semantics. Maintaining the boundaries is efficient because aging messages keep the same linked list order as they progress from one status to the next.

The head and tail enqueue methods inherited from ACE_Message_Queue are overridden to call the priority enqueue method, which prevents out-of-order messages from confusing management of the various portions of the queue.

Messages in the pending portion of the queue whose priority becomes late (according to the specific dynamic strategy) advance into the late portion of the queue. Messages in the late portion of the queue whose priority becomes later than can be represented advance to the beyond late portion of the queue. These behaviors support a limited schedule overrun, with pending messages prioritized ahead of late messages, and late messages ahead of beyond late messages. These behaviors can be modified in derived classes by providing alternative definitions for the appropriate virtual methods.
When filled with messages, the queue's linked list should look like:

    H                                           T
    |                                           |
    B - B - B - B - L - L - L - P - P - P - P - P
    |           |   |       |   |               |
    BH          BT  LH      LT  PH              PT

Where the symbols are as follows:

    H  = Head of the entire list
    T  = Tail of the entire list
    B  = Beyond late message
    BH = Beyond late messages Head
    BT = Beyond late messages Tail
    L  = Late message
    LH = Late messages Head
    LT = Late messages Tail
    P  = Pending message
    PH = Pending messages Head
    PT = Pending messages Tail

Caveat: the virtual methods enqueue_tail, enqueue_head, and peek_dequeue_head have semantics for the static message queues that cannot be guaranteed for dynamic message queues. The peek_dequeue_head method just calls the base class method, while the two enqueue methods call the priority enqueue method. The order of messages in the dynamic queue is a function of message deadlines and how long they are in the queues. You can manipulate these in some cases to ensure the correct semantics, but that is not a very stable or portable approach (discouraged).
Definition at line 668 of file Message_Queue_T.h.
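The dequeue preference described above (pending first, then late, then beyond late) can be illustrated with a small standard-library model. This is only a sketch under stated assumptions, not ACE code: `Msg`, `Status`, `classify`, and the `overrun` parameter are hypothetical stand-ins for ACE_Message_Block and the strategy's priority status function, and the model scans the list where the real class keeps head pointers for each portion.

```cpp
#include <cassert>
#include <list>

// Hypothetical stand-ins; the real class works on ACE_Message_Block
// and asks an ACE_Dynamic_Message_Strategy for the status.
enum class Status { Pending, Late, BeyondLate };

struct Msg {
  int id;
  long deadline;  // absolute time, arbitrary units
};

// Assumed rule for this sketch only: a message is late once its
// deadline has passed, and beyond late `overrun` units after that.
Status classify(const Msg &m, long now, long overrun) {
  if (now <= m.deadline) return Status::Pending;
  if (now <= m.deadline + overrun) return Status::Late;
  return Status::BeyondLate;
}

// Remove and return the id of the logically first message:
// first pending, else first late, else first beyond late.
// Returns -1 if the list is empty.
int dequeue_logical_head(std::list<Msg> &q, long now, long overrun) {
  for (Status wanted : {Status::Pending, Status::Late, Status::BeyondLate}) {
    for (auto it = q.begin(); it != q.end(); ++it) {
      if (classify(*it, now, overrun) == wanted) {
        int id = it->id;
        q.erase(it);
        return id;
      }
    }
  }
  return -1;
}
```

With the list ordered as in the diagram (beyond late at the head, pending at the tail), the physical head of the list is the last message to be dequeued.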
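ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy &message_strategy, size_t hwm=ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm=ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy *=0)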
Definition at line 1399 of file Message_Queue_T.cpp. References ACE_SYNCH_USE.
01403 : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns),
01404 pending_head_ (0),
01405 pending_tail_ (0),
01406 late_head_ (0),
01407 late_tail_ (0),
01408 beyond_late_head_ (0),
01409 beyond_late_tail_ (0),
01410 message_strategy_ (message_strategy)
01411 {
01412 // Note, the ACE_Dynamic_Message_Queue assumes full responsibility
01413 // for the passed ACE_Dynamic_Message_Strategy object, and deletes
01414 // it in its own dtor
01415 }
virtual ~ACE_Dynamic_Message_Queue (void)
Close down the message queue and release all resources.
Definition at line 1420 of file Message_Queue_T.cpp. References message_strategy_.
01421 {
01422 delete &this->message_strategy_;
01423 }
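Because the destructor deletes the strategy object it received by reference, a caller would presumably need to allocate that object with new and must not delete it again. The following sketch shows the ownership rule in isolation; `Strategy` and `OwningQueue` are hypothetical types invented for illustration, not the ACE classes.

```cpp
#include <cassert>

// Hypothetical strategy type; counts destructions so the effect is visible.
struct Strategy {
  static int destroyed;
  ~Strategy() { ++destroyed; }
};
int Strategy::destroyed = 0;

// Hypothetical queue that mirrors the ownership rule in the listing:
// it stores a reference but deletes the referenced object when destroyed.
class OwningQueue {
public:
  explicit OwningQueue(Strategy &s) : strategy_(s) {}
  ~OwningQueue() { delete &strategy_; }
  OwningQueue(const OwningQueue &) = delete;
  OwningQueue &operator=(const OwningQueue &) = delete;

private:
  Strategy &strategy_;
};

void ownership_example() {
  Strategy *s = new Strategy;  // must come from new, never from the stack
  {
    OwningQueue q(*s);
  }  // q's destructor deletes *s here; the caller must not delete it again
  assert(Strategy::destroyed == 1);
}
```

Passing a stack-allocated or shared strategy to such a queue would lead to an invalid or double delete, which is why the sketch also disables copying.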
virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Dequeue and return the <ACE_Message_Block *> at the head of the queue. Returns -1 on failure, else the number of items still on the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1566 of file Message_Queue_T.cpp. References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX_T, ACE_TRACE, ACE_Message_Queue_Base::DEACTIVATED, dequeue_head_i, ESHUTDOWN, ACE_OS::gettimeofday, refresh_queue, ACE_Message_Queue_Base::state_, and ACE_Message_Queue< ACE_SYNCH_USE >::wait_not_empty_cond.
01568 {
01569 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
01570
01571 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
01572
01573 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
01574 {
01575 errno = ESHUTDOWN;
01576 return -1;
01577 }
01578
01579 int result;
01580
01581 // get the current time
01582 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01583
01584 // refresh priority status boundaries in the queue
01585 result = this->refresh_queue (current_time);
01586 if (result < 0)
01587 return result;
01588
01589 // *now* it's appropriate to wait for an enqueued item
01590 result = this->wait_not_empty_cond (ace_mon, timeout);
01591 if (result == -1)
01592 return result;
01593
01594 // call the internal dequeue method, which selects an item from the
01595 // highest priority status portion of the queue that has messages
01596 // enqueued.
01597 result = this->dequeue_head_i (first_item);
01598
01599 return result;
01600 }
virtual int dequeue_head_i (ACE_Message_Block *&first_item)
Dequeue and return the <ACE_Message_Block *> at the head of the logical queue. Attempts first to dequeue from the pending portion of the queue, or if that is empty from the late portion, or if that is empty from the beyond late portion, or if that is empty just sets the passed pointer to zero and returns -1.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1846 of file Message_Queue_T.cpp. References ACE_TRACE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, late_head_, late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::low_water_mark_, ACE_Message_Block::next, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Message_Queue< ACE_SYNCH_USE >::signal_enqueue_waiters, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length. Referenced by dequeue_head.
01847 {
01848 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
01849
01850 int result = 0;
01851 int last_in_subqueue = 0;
01852
01853 // first, try to dequeue from the head of the pending list
01854 if (this->pending_head_)
01855 {
01856 first_item = this->pending_head_;
01857
01858 if (0 == this->pending_head_->prev ())
01859 this->head_ = this->pending_head_->next ();
01860 else
01861 this->pending_head_->prev ()->next (this->pending_head_->next ());
01862
01863 if (0 == this->pending_head_->next ())
01864 {
01865 this->tail_ = this->pending_head_->prev ();
01866 this->pending_head_ = 0;
01867 this->pending_tail_ = 0;
01868 }
01869 else
01870 {
01871 this->pending_head_->next ()->prev (this->pending_head_->prev ());
01872 this->pending_head_ = this->pending_head_->next ();
01873 }
01874
01875 first_item->prev (0);
01876 first_item->next (0);
01877 }
01878
01879 // Second, try to dequeue from the head of the late list
01880 else if (this->late_head_)
01881 {
01882 last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
01883
01884 first_item = this->late_head_;
01885
01886 if (0 == this->late_head_->prev ())
01887 this->head_ = this->late_head_->next ();
01888 else
01889 this->late_head_->prev ()->next (this->late_head_->next ());
01890
01891 if (0 == this->late_head_->next ())
01892 this->tail_ = this->late_head_->prev ();
01893 else
01894 {
01895 this->late_head_->next ()->prev (this->late_head_->prev ());
01896 this->late_head_ = this->late_head_->next ();
01897 }
01898
01899 if (last_in_subqueue)
01900 {
01901 this->late_head_ = 0;
01902 this->late_tail_ = 0;
01903 }
01904
01905 first_item->prev (0);
01906 first_item->next (0);
01907 }
01908 // finally, try to dequeue from the head of the beyond late list
01909 else if (this->beyond_late_head_)
01910 {
01911 last_in_subqueue =
01912 (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
01913
01914 first_item = this->beyond_late_head_;
01915 this->head_ = this->beyond_late_head_->next ();
01916
01917 if (0 == this->beyond_late_head_->next ())
01918 this->tail_ = this->beyond_late_head_->prev ();
01919 else
01920 {
01921 this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
01922 this->beyond_late_head_ = this->beyond_late_head_->next ();
01923 }
01924
01925 if (last_in_subqueue)
01926 {
01927 this->beyond_late_head_ = 0;
01928 this->beyond_late_tail_ = 0;
01929 }
01930
01931 first_item->prev (0);
01932 first_item->next (0);
01933 }
01934 else
01935 {
01936 // nothing to dequeue: set the pointer to zero and return an error code
01937 first_item = 0;
01938 result = -1;
01939 }
01940
01941 if (result < 0)
01942 return result;
01943
01944 size_t mb_bytes = 0;
01945 size_t mb_length = 0;
01946 first_item->total_size_and_length (mb_bytes,
01947 mb_length);
01948 // Subtract off all of the bytes associated with this message.
01949 this->cur_bytes_ -= mb_bytes;
01950 this->cur_length_ -= mb_length;
01951 this->cur_count_--;
01952
01953 // Only signal enqueueing threads if we've fallen below the low
01954 // water mark.
01955 if (this->cur_bytes_ <= this->low_water_mark_
01956 && this->signal_enqueue_waiters () == -1)
01957 return -1;
01958 else
01959 return this->cur_count_;
01960 }
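Each branch of the listing above performs the same basic step: unlinking one node from a doubly linked list while keeping the list's head and tail pointers valid. A minimal sketch of that step follows, using hypothetical `Node` and `List` types rather than ACE_Message_Block; the real method additionally updates the head and tail pointers of the affected sublist.

```cpp
#include <cassert>

// Hypothetical node and list types for illustration only.
struct Node {
  explicit Node(int v) : value(v) {}
  int value;
  Node *prev = nullptr;
  Node *next = nullptr;
};

struct List {
  Node *head = nullptr;
  Node *tail = nullptr;
};

// Append n at the tail of l.
void push_back(List &l, Node &n) {
  n.prev = l.tail;
  n.next = nullptr;
  if (l.tail) l.tail->next = &n; else l.head = &n;
  l.tail = &n;
}

// Unlink n from l, patching either the neighbours or the list's own
// head/tail pointers, then clear n's links as the listing does.
void unlink(List &l, Node &n) {
  if (n.prev == nullptr) l.head = n.next; else n.prev->next = n.next;
  if (n.next == nullptr) l.tail = n.prev; else n.next->prev = n.prev;
  n.prev = nullptr;
  n.next = nullptr;
}

void unlink_example() {
  List l;
  Node a(1), b(2), c(3);
  push_back(l, a);
  push_back(l, b);
  push_back(l, c);
  unlink(l, b);  // remove from the middle, like a pending head behind late messages
  assert(l.head == &a && l.tail == &c && a.next == &c && c.prev == &a);
  unlink(l, c);  // remove the tail
  assert(l.tail == &a && a.next == nullptr);
  unlink(l, a);  // remove the last remaining node
  assert(l.head == nullptr && l.tail == nullptr);
}
```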
virtual void dump (void) const
Dump the state of the queue.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1606 of file Message_Queue_T.cpp. References ACE_BEGIN_DUMP, ACE_DEBUG, ACE_END_DUMP, ACE_LIB_TEXT, ACE_TRACE, ACE_Dynamic_Message_Strategy::dump, ACE_Message_Queue::dump, LM_DEBUG, and message_strategy_.
01607 {
01608 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
01609 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
01610
01611 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
01612 this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
01613
01614 ACE_DEBUG ((LM_DEBUG,
01615 ACE_LIB_TEXT ("pending_head_ = %u\n")
01616 ACE_LIB_TEXT ("pending_tail_ = %u\n")
01617 ACE_LIB_TEXT ("late_head_ = %u\n")
01618 ACE_LIB_TEXT ("late_tail_ = %u\n")
01619 ACE_LIB_TEXT ("beyond_late_head_ = %u\n")
01620 ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"),
01621 this->pending_head_,
01622 this->pending_tail_,
01623 this->late_head_,
01624 this->late_tail_,
01625 this->beyond_late_head_,
01626 this->beyond_late_tail_));
01627
01628 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n")));
01629 message_strategy_.dump ();
01630
01631 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
01632 }
virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just calls the priority enqueue method. Head enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2217 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio.
02219 {
02220 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
02221 return this->enqueue_prio (new_item, timeout);
02222 }
virtual int enqueue_i (ACE_Message_Block *new_item)
Enqueue an <ACE_Message_Block *> in accordance with its priority. The priority may be dynamic, static, or a combination of both. This method calls the priority evaluation function passed into the Dynamic Message Queue constructor to update the priorities of all enqueued messages.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 1636 of file Message_Queue_T.cpp. References ACE_TRACE, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_head_i, ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_tail_i, ACE_OS::gettimeofday, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, refresh_queue, ACE_Message_Queue< ACE_SYNCH_USE >::signal_dequeue_waiters, sublist_enqueue_i, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length.
01637 {
01638 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
01639
01640 if (new_item == 0)
01641 return -1;
01642
01643 int result = 0;
01644
01645 // Get the current time.
01646 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01647
01648 // Refresh priority status boundaries in the queue.
01649
01650 result = this->refresh_queue (current_time);
01651 if (result < 0)
01652 return result;
01653
01654 // Where we enqueue depends on the message's priority status.
01655 switch (message_strategy_.priority_status (*new_item,
01656 current_time))
01657 {
01658 case ACE_Dynamic_Message_Strategy::PENDING:
01659 if (this->pending_tail_ == 0)
01660 {
01661 // Check for simple case of an empty pending queue, where
01662 // all we need to do is insert <new_item> into the tail of
01663 // the queue.
01664 pending_head_ = new_item;
01665 pending_tail_ = pending_head_;
01666 return this->enqueue_tail_i (new_item);
01667 }
01668 else
01669 {
01670 // Enqueue the new message in priority order in the pending
01671 // sublist
01672 result = sublist_enqueue_i (new_item,
01673 current_time,
01674 this->pending_head_,
01675 this->pending_tail_,
01676 ACE_Dynamic_Message_Strategy::PENDING);
01677 }
01678 break;
01679
01680 case ACE_Dynamic_Message_Strategy::LATE:
01681 if (this->late_tail_ == 0)
01682 {
01683 late_head_ = new_item;
01684 late_tail_ = late_head_;
01685
01686 if (this->pending_head_ == 0)
01687 // Check for simple case of an empty pending queue,
01688 // where all we need to do is insert <new_item> into the
01689 // tail of the queue.
01690 return this->enqueue_tail_i (new_item);
01691 else if (this->beyond_late_tail_ == 0)
01692 // Check for simple case of an empty beyond late queue, where all
01693 // we need to do is insert <new_item> into the head of the queue.
01694 return this->enqueue_head_i (new_item);
01695 else
01696 {
01697 // Otherwise, we can just splice the new message in
01698 // between the pending and beyond late portions of the
01699 // queue.
01700 this->beyond_late_tail_->next (new_item);
01701 new_item->prev (this->beyond_late_tail_);
01702 this->pending_head_->prev (new_item);
01703 new_item->next (this->pending_head_);
01704 }
01705 }
01706 else
01707 {
01708 // Enqueue the new message in priority order in the late
01709 // sublist
01710 result = sublist_enqueue_i (new_item,
01711 current_time,
01712 this->late_head_,
01713 this->late_tail_,
01714 ACE_Dynamic_Message_Strategy::LATE);
01715 }
01716 break;
01717
01718 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01719 if (this->beyond_late_tail_ == 0)
01720 {
01721 // Check for simple case of an empty beyond late queue,
01722 // where all we need to do is insert <new_item> into the
01723 // head of the queue.
01724 beyond_late_head_ = new_item;
01725 beyond_late_tail_ = beyond_late_head_;
01726 return this->enqueue_head_i (new_item);
01727 }
01728 else
01729 {
01730 // all beyond late messages have the same (zero) priority,
01731 // so just put the new one at the end of the beyond late
01732 // messages
01733 if (this->beyond_late_tail_->next ())
01734 this->beyond_late_tail_->next ()->prev (new_item);
01735 else
01736 this->tail_ = new_item;
01737
01738 new_item->next (this->beyond_late_tail_->next ());
01739 this->beyond_late_tail_->next (new_item);
01740 new_item->prev (this->beyond_late_tail_);
01741 this->beyond_late_tail_ = new_item;
01742 }
01743
01744 break;
01745
01746 // should never get here, but just in case...
01747 default:
01748 result = -1;
01749 break;
01750 }
01751
01752 if (result < 0)
01753 return result;
01754
01755 size_t mb_bytes = 0;
01756 size_t mb_length = 0;
01757 new_item->total_size_and_length (mb_bytes,
01758 mb_length);
01759 // Add all of the bytes associated with this message.
01760 this->cur_bytes_ += mb_bytes;
01761 this->cur_length_ += mb_length;
01762 this->cur_count_++;
01763
01764 if (this->signal_dequeue_waiters () == -1)
01765 return -1;
01766 else
01767 return this->cur_count_;
01768 }
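The LATE case in the listing above splices the first late message between the tail of the beyond late portion and the head of the pending portion. The pointer updates can be sketched on their own as follows; `Node` and `link_between` are hypothetical names for illustration, and the sketch assumes the two neighbours are adjacent before the call.

```cpp
#include <cassert>

// Hypothetical node type; `kind` is 'B', 'L' or 'P' as in the diagram.
struct Node {
  explicit Node(char k) : kind(k) {}
  char kind;
  Node *prev = nullptr;
  Node *next = nullptr;
};

// Link `item` between `left` and `right`, which must be adjacent.
// The four assignments correspond to the four pointer updates
// performed in the splice branch of the listing.
void link_between(Node &left, Node &item, Node &right) {
  assert(left.next == &right && right.prev == &left);
  left.next = &item;
  item.prev = &left;
  right.prev = &item;
  item.next = &right;
}

void splice_example() {
  Node b('B'), p('P'), l('L');
  b.next = &p;  // B is the beyond late tail, P the pending head
  p.prev = &b;
  link_between(b, l, p);
  assert(b.next == &l && l.next == &p);
  assert(p.prev == &l && l.prev == &b);
}
```

Since neither end of the list is touched, the list's overall head and tail pointers need no update in this case.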
virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout=0)
Just calls the priority enqueue method. Tail enqueue semantics for dynamic message queues are unstable: the message may or may not be where it was placed after the queue is refreshed prior to the next enqueue or dequeue operation.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2204 of file Message_Queue_T.cpp. References ACE_TRACE, and ACE_Message_Queue< ACE_SYNCH_USE >::enqueue_prio.
02206 {
02207 ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
02208 return this->enqueue_prio (new_item, timeout);
02209 }
virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout=0)
Private method to hide public base class method: just calls base class method.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 2193 of file Message_Queue_T.cpp. References ACE_Message_Queue::peek_dequeue_head.
02195 {
02196 return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
02197 timeout);
02198 }
virtual int refresh_late_queue (const ACE_Time_Value &current_time)
Refresh the late queue using the strategy specific priority status function.
Definition at line 2108 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, LM_ERROR, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_. Referenced by refresh_queue.
02109 {
02110 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
02111
02112 if (this->late_head_)
02113 {
02114 current_status = message_strategy_.priority_status (*this->late_head_,
02115 current_time);
02116 switch (current_status)
02117 {
02118 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
02119
02120 // make sure the head of the beyond late queue is set
02121 // (there may not have been any beyond late messages previously)
02122 this->beyond_late_head_ = this->head_;
02123
02124 // advance through the beyond late messages in the late queue
02125 do
02126 {
02127 this->late_head_ = this->late_head_->next ();
02128
02129 if (this->late_head_)
02130 current_status = message_strategy_.priority_status (*this->late_head_,
02131 current_time);
02132 else
02133 break; // do while
02134
02135 }
02136 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02137
02138 if (this->late_head_)
02139 {
02140 // point tail of beyond late sublist to previous item
02141 this->beyond_late_tail_ = this->late_head_->prev ();
02142
02143 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02144 {
02145 // there are no late messages left in the queue
02146 this->late_head_ = 0;
02147 this->late_tail_ = 0;
02148 }
02149 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02150 // if we got here, something is *seriously* wrong with the queue
02151 ACE_ERROR_RETURN ((LM_ERROR,
02152 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02153 (int) current_status),
02154 -1);
02155 }
02156 else
02157 {
02158 // there are no late messages left in the queue
02159 this->beyond_late_tail_ = this->tail_;
02160 this->late_head_ = 0;
02161 this->late_tail_ = 0;
02162 }
02163
02164 break; // switch
02165
02166 case ACE_Dynamic_Message_Strategy::LATE:
02167 // do nothing - the late queue is unchanged
02168 break; // switch
02169
02170 case ACE_Dynamic_Message_Strategy::PENDING:
02171 // if we got here, something is *seriously* wrong with the queue
02172 ACE_ERROR_RETURN ((LM_ERROR,
02173 ACE_LIB_TEXT ("Unexpected message priority status ")
02174 ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"),
02175 (int) current_status),
02176 -1);
02177 default:
02178 // if we got here, something is *seriously* wrong with the queue
02179 ACE_ERROR_RETURN ((LM_ERROR,
02180 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02181 (int) current_status),
02182 -1);
02183 }
02184 }
02185
02186 return 0;
02187 }
virtual int refresh_pending_queue (const ACE_Time_Value &current_time)
Refresh the pending queue using the strategy specific priority status function.
Definition at line 1985 of file Message_Queue_T.cpp. References ACE_ERROR_RETURN, ACE_LIB_TEXT, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, LM_ERROR, message_strategy_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_. Referenced by refresh_queue.
01986 {
01987 ACE_Dynamic_Message_Strategy::Priority_Status current_status;
01988
01989 // refresh priority status boundaries in the queue
01990 if (this->pending_head_)
01991 {
01992 current_status = message_strategy_.priority_status (*this->pending_head_,
01993 current_time);
01994 switch (current_status)
01995 {
01996 case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
01997 // Make sure the head of the beyond late queue is set (there
01998 // may not have been any beyond late messages previously)
01999 this->beyond_late_head_ = this->head_;
02000
02001 // Zero out the late queue pointers, and set them only if
02002 // there turn out to be late messages in the pending sublist
02003 this->late_head_ = 0;
02004 this->late_tail_ = 0;
02005
02006 // Advance through the beyond late messages in the pending queue
02007 do
02008 {
02009 this->pending_head_ = this->pending_head_->next ();
02010
02011 if (this->pending_head_)
02012 current_status = message_strategy_.priority_status (*this->pending_head_,
02013 current_time);
02014 else
02015 break; // do while
02016
02017 }
02018 while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
02019
02020 if (this->pending_head_)
02021 {
02022 // point tail of beyond late sublist to previous item
02023 this->beyond_late_tail_ = this->pending_head_->prev ();
02024
02025 if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
02026 // there are no late messages left in the queue
02027 break; // switch
02028 else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
02029 {
02030 // if we got here, something is *seriously* wrong with the queue
02031 ACE_ERROR_RETURN ((LM_ERROR,
02032 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
02033 (int) current_status),
02034 -1);
02035 }
02036 /* FALLTHRU */
02037 }
02038 else
02039 {
02040 // There are no pending or late messages left in the
02041 // queue.
02042 this->beyond_late_tail_ = this->tail_;
02043 this->pending_head_ = 0;
02044 this->pending_tail_ = 0;
02045 break; // switch
02046 }
02047
02048 case ACE_Dynamic_Message_Strategy::LATE:
02049 // Make sure the head of the late queue is set (there may
02050 // not have been any late messages previously, or they may
02051 // have all become beyond late).
02052 if (this->late_head_ == 0)
02053 this->late_head_ = this->pending_head_;
02054
02055 // advance through the late messages in the pending queue
02056 do
02057 {
02058 this->pending_head_ = this->pending_head_->next ();
02059
02060 if (this->pending_head_)
02061 current_status = message_strategy_.priority_status (*this->pending_head_,
02062 current_time);
02063 else
02064 break; // do while
02065
02066 }
02067 while (current_status == ACE_Dynamic_Message_Strategy::LATE);
02068
02069 if (this->pending_head_)
02070 {
02071 if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
02072 // if we got here, something is *seriously* wrong with the queue
02073 ACE_ERROR_RETURN((LM_ERROR,
02074 ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
02075 (int) current_status),
02076 -1);
02077
02078 // Point tail of late sublist to previous item
02079 this->late_tail_ = this->pending_head_->prev ();
02080 }
02081 else
02082 {
02083 // there are no pending messages left in the queue
02084 this->late_tail_ = this->tail_;
02085 this->pending_head_ = 0;
02086 this->pending_tail_ = 0;
02087 }
02088
02089 break; // switch
02090 case ACE_Dynamic_Message_Strategy::PENDING:
02091 // do nothing - the pending queue is unchanged
02092 break; // switch
02093 default:
02094 // if we got here, something is *seriously* wrong with the queue
02095 ACE_ERROR_RETURN((LM_ERROR,
02096 ACE_LIB_TEXT ("Unknown message priority status [%d]"),
02097 (int) current_status),
02098 -1);
02099 }
02100 }
02101 return 0;
02102 }
virtual int refresh_queue (const ACE_Time_Value &current_time)
Refresh the queue using the strategy specific priority status function.
Definition at line 1969 of file Message_Queue_T.cpp. References refresh_late_queue, and refresh_pending_queue. Referenced by dequeue_head, enqueue_i, and remove_messages.
01970 {
01971 int result;
01972
01973 result = refresh_pending_queue (current_time);
01974
01975 if (result != -1)
01976 result = refresh_late_queue (current_time);
01977
01978 return result;
01979 }
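The refresh step moves only the boundaries between the three portions; the messages themselves stay where they are. A simplified index-based model of that idea is sketched below. It is not the ACE implementation: `Regions`, `refresh`, and `overrun` are hypothetical, deadlines are assumed to be non-decreasing in list order, and a message is assumed to become beyond late `overrun` time units after its deadline.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model: indices play the role of the sublist head pointers.
struct Regions {
  std::vector<long> deadlines;    // list order, assumed non-decreasing
  std::size_t late_begin = 0;     // [0, late_begin) is beyond late
  std::size_t pending_begin = 0;  // [late_begin, pending_begin) is late
                                  // [pending_begin, size) is pending
};

// Refresh the pending boundary first, then the late boundary, in the
// same order as refresh_queue calls its two helpers.
void refresh(Regions &r, long now, long overrun) {
  while (r.pending_begin < r.deadlines.size() &&
         now > r.deadlines[r.pending_begin])
    ++r.pending_begin;  // this message is no longer pending
  while (r.late_begin < r.pending_begin &&
         now > r.deadlines[r.late_begin] + overrun)
    ++r.late_begin;     // this message is now beyond late
}
```

The cost of a refresh in this model is proportional to the number of messages that change status, not to the length of the queue.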
virtual int remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags)
Detach all messages with status given in the passed flags from the queue and return them by setting passed head and tail pointers to the linked list they comprise. This method is intended primarily as a means of periodically harvesting messages that have missed their deadlines, but is available in its most general form. All messages are returned in priority order, from head to tail, as of the time this method was called.
Definition at line 1426 of file Message_Queue_T.cpp. References ACE_BIT_ENABLED, ACE_Dynamic_Message_Strategy::BEYOND_LATE, beyond_late_head_, beyond_late_tail_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_bytes_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_count_, ACE_Message_Queue< ACE_SYNCH_USE >::cur_length_, ACE_OS::gettimeofday, ACE_Message_Queue< ACE_SYNCH_USE >::head_, ACE_Dynamic_Message_Strategy::LATE, late_head_, late_tail_, ACE_Message_Block::next, ACE_Dynamic_Message_Strategy::PENDING, pending_head_, pending_tail_, ACE_Message_Block::prev, refresh_queue, ACE_Message_Queue< ACE_SYNCH_USE >::tail_, and ACE_Message_Block::total_size_and_length.
01429 {
01430 // start with an empty list
01431 list_head = 0;
01432 list_tail = 0;
01433
01434 // Get the current time
01435 ACE_Time_Value current_time = ACE_OS::gettimeofday ();
01436
01437 // Refresh priority status boundaries in the queue.
01438 int result = this->refresh_queue (current_time);
01439 if (result < 0)
01440 return result;
01441
01442 if (ACE_BIT_ENABLED (status_flags,
01443 (u_int) ACE_Dynamic_Message_Strategy::PENDING)
01444 && this->pending_head_
01445 && this->pending_tail_)
01446 {
01447 // patch up pointers for the new tail of the queue
01448 if (this->pending_head_->prev ())
01449 {
01450 this->tail_ = this->pending_head_->prev ();
01451 this->pending_head_->prev ()->next (0);
01452 }
01453 else
01454 {
01455 // the list has become empty
01456 this->head_ = 0;
01457 this->tail_ = 0;
01458 }
01459
01460 // point to the head and tail of the list
01461 list_head = this->pending_head_;
01462 list_tail = this->pending_tail_;
01463
01464 // cut the pending messages out of the queue entirely
01465 this->pending_head_->prev (0);
01466 this->pending_head_ = 0;
01467 this->pending_tail_ = 0;
01468 }
01469
01470 if (ACE_BIT_ENABLED (status_flags,
01471 (u_int) ACE_Dynamic_Message_Strategy::LATE)
01472 && this->late_head_
01473 && this->late_tail_)
01474 {
01475 // Patch up pointers for the (possibly) new head and tail of the
01476 // queue.
01477 if (this->late_tail_->next ())
01478 this->late_tail_->next ()->prev (this->late_head_->prev ());
01479 else
01480 this->tail_ = this->late_head_->prev ();
01481
01482 if (this->late_head_->prev ())
01483 this->late_head_->prev ()->next (this->late_tail_->next ());
01484 else
01485 this->head_ = this->late_tail_->next ();
01486
01487 // put late messages behind pending messages (if any) being returned
01488 this->late_head_->prev (list_tail);
01489 if (list_tail)
01490 list_tail->next (this->late_head_);
01491 else
01492 list_head = this->late_head_;
01493
01494 list_tail = this->late_tail_;
01495
01496 this->late_tail_->next (0);
01497 this->late_head_ = 0;
01498 this->late_tail_ = 0;
01499 }
01500
01501 if (ACE_BIT_ENABLED (status_flags,
01502 (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
01503 && this->beyond_late_head_
01504 && this->beyond_late_tail_)
01505 {
01506 // Patch up pointers for the new head of the queue
01507 if (this->beyond_late_tail_->next ())
01508 {
01509 this->head_ = this->beyond_late_tail_->next ();
01510 this->beyond_late_tail_->next ()->prev (0);
01511 }
01512 else
01513 {
01514 // the list has become empty
01515 this->head_ = 0;
01516 this->tail_ = 0;
01517 }
01518
01519 // Put beyond late messages at the end of the list being
01520 // returned.
01521 if (list_tail)
01522 {
01523 this->beyond_late_head_->prev (list_tail);
01524 list_tail->next (this->beyond_late_head_);
01525 }
01526 else
01527 list_head = this->beyond_late_head_;
01528
01529 list_tail = this->beyond_late_tail_;
01530
01531 this->beyond_late_tail_->next (0);
01532 this->beyond_late_head_ = 0;
01533 this->beyond_late_tail_ = 0;
01534 }
01535
01536 // Decrement message and size counts for removed messages.
01537 ACE_Message_Block *temp1;
01538
01539 for (temp1 = list_head;
01540 temp1 != 0;
01541 temp1 = temp1->next ())
01542 {
01543 this->cur_count_--;
01544
01545 size_t mb_bytes = 0;
01546 size_t mb_length = 0;
01547 temp1->total_size_and_length (mb_bytes,
01548 mb_length);
01549 // Subtract off all of the bytes associated with this message.
01550 this->cur_bytes_ -= mb_bytes;
01551 this->cur_length_ -= mb_length;
01552 }
01553
01554 return result;
01555 }
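
The pointer patching in the listing above follows one general pattern: cut a contiguous run of nodes out of a doubly linked list, then subtract the removed nodes from the queue's counters. The following is a minimal standalone sketch of that pattern, not the ACE implementation. `Node`, `Queue`, `push_back`, and `splice_out` are hypothetical names introduced only for illustration, and the sketch tracks a single byte counter rather than separate size and length counts.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: payload size plus links.
struct Node
{
  std::size_t bytes;
  Node *prev;
  Node *next;
  explicit Node (std::size_t b) : bytes (b), prev (0), next (0) {}
};

// Hypothetical stand-in for the queue's head_, tail_, cur_count_ and
// cur_bytes_ members.
struct Queue
{
  Node *head;
  Node *tail;
  std::size_t count;
  std::size_t bytes;
  Queue () : head (0), tail (0), count (0), bytes (0) {}
};

// Append a node at the tail and update the counters.
void push_back (Queue &q, Node *n)
{
  assert (n != 0);
  n->prev = q.tail;
  n->next = 0;
  if (q.tail != 0)
    q.tail->next = n;
  else
    q.head = n;
  q.tail = n;
  ++q.count;
  q.bytes += n->bytes;
}

// Cut the contiguous run [first, last] out of q.  Afterwards first and
// last delimit a standalone, null-terminated list.
void splice_out (Queue &q, Node *first, Node *last)
{
  assert (first != 0 && last != 0);

  // Reconnect the neighbours on either side of the run, or move the
  // queue's head/tail if the run touched an end of the queue.
  if (first->prev != 0)
    first->prev->next = last->next;
  else
    q.head = last->next;

  if (last->next != 0)
    last->next->prev = first->prev;
  else
    q.tail = first->prev;

  // Detach the run from the queue.
  first->prev = 0;
  last->next = 0;

  // Subtract the removed nodes from the queue's counters.
  for (Node *n = first; n != 0; n = n->next)
    {
      --q.count;
      q.bytes -= n->bytes;
    }
}
```

The listing handles the pending and beyond late cases with shorter code because, as its comments indicate, those sublists sit at the tail and head of the queue respectively. The late case is the general one shown in this sketch.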
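
virtual int sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value &current_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) [protected]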
Enqueue a message in priority order within a given priority status sublist.
Definition at line 1777 of file Message_Queue_T.cpp. References ACE_Message_Queue< ACE_SYNCH_USE >::head_, message_strategy_, ACE_Message_Block::msg_priority, ACE_Message_Block::next, ACE_Message_Block::prev, ACE_Dynamic_Message_Strategy::priority_status, ACE_Dynamic_Message_Strategy::Priority_Status, and ACE_Message_Queue< ACE_SYNCH_USE >::tail_. Referenced by enqueue_i.
{
  int result = 0;
  ACE_Message_Block *current_item = 0;

  // Find message after which to enqueue new item, based on message
  // priority and priority status.
  for (current_item = sublist_tail;
       current_item;
       current_item = current_item->prev ())
    {
      if (message_strategy_.priority_status (*current_item, current_time) == status)
        {
          if (current_item->msg_priority () >= new_item->msg_priority ())
            break;
        }
      else
        {
          sublist_head = new_item;
          break;
        }
    }

  if (current_item == 0)
    {
      // If the new message has highest priority of any, put it at the
      // head of the list (and sublist).
      new_item->prev (0);
      new_item->next (this->head_);
      if (this->head_ != 0)
        this->head_->prev (new_item);
      else
        {
          this->tail_ = new_item;
          sublist_tail = new_item;
        }
      this->head_ = new_item;
      sublist_head = new_item;
    }
  else
    {
      // insert the new item into the list
      new_item->next (current_item->next ());
      new_item->prev (current_item);

      if (current_item->next ())
        current_item->next ()->prev (new_item);
      else
        this->tail_ = new_item;

      current_item->next (new_item);

      // If the new item has lowest priority of any in the sublist,
      // move the tail pointer of the sublist back to the new item
      if (current_item == sublist_tail)
        sublist_tail = new_item;
    }

  return result;
}
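
The core of the listing above is a tail-to-head scan that finds the last node whose priority is at least that of the new item, then links the new item directly after it. The following simplified sketch shows only that part on a plain doubly linked list. It deliberately ignores the priority status check and the sublist head and tail bookkeeping, and `Node`, `List`, and `insert_by_priority` are hypothetical names, not part of ACE.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: only priority and links.
struct Node
{
  unsigned long priority;
  Node *prev;
  Node *next;
  explicit Node (unsigned long p) : priority (p), prev (0), next (0) {}
};

// Hypothetical stand-in for the queue's head_ and tail_ members.
struct List
{
  Node *head;
  Node *tail;
  List () : head (0), tail (0) {}
};

// Insert new_item so that priorities are non-increasing from head to
// tail; among equal priorities the new item goes last (FIFO order).
void insert_by_priority (List &list, Node *new_item)
{
  assert (new_item != 0);

  // Scan backwards for the first node whose priority is at least as
  // high as the new item's.
  Node *current = list.tail;
  while (current != 0 && current->priority < new_item->priority)
    current = current->prev;

  if (current == 0)
    {
      // The new item outranks everything, or the list is empty:
      // it becomes the new head.
      new_item->prev = 0;
      new_item->next = list.head;
      if (list.head != 0)
        list.head->prev = new_item;
      else
        list.tail = new_item;
      list.head = new_item;
    }
  else
    {
      // Link the new item directly after current.
      new_item->prev = current;
      new_item->next = current->next;
      if (current->next != 0)
        current->next->prev = new_item;
      else
        list.tail = new_item;
      current->next = new_item;
    }
}
```

Scanning from the tail keeps insertion cheap when most new items have low priority relative to what is already queued, and the `>=` style comparison preserves arrival order among items of equal priority.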

ACE_ALLOC_HOOK_DECLARE
Declare the dynamic allocation hooks.
Reimplemented from ACE_Message_Queue< ACE_SYNCH_USE >. Definition at line 724 of file Message_Queue_T.h.

ACE_Message_Block * beyond_late_head_ [protected]
Pointer to head of the beyond late messages.
Definition at line 778 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

ACE_Message_Block * beyond_late_tail_ [protected]
Pointer to tail of the beyond late messages.
Definition at line 781 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

ACE_Message_Block * late_head_ [protected]
Pointer to head of the late messages.
Definition at line 772 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

ACE_Message_Block * late_tail_ [protected]
Pointer to tail of the late messages.
Definition at line 775 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_late_queue, refresh_pending_queue, and remove_messages.

ACE_Dynamic_Message_Strategy & message_strategy_ [protected]
Reference to the dynamic message strategy used to evaluate the priority status of messages.
Definition at line 784 of file Message_Queue_T.h. Referenced by dump, enqueue_i, refresh_late_queue, refresh_pending_queue, sublist_enqueue_i, and ~ACE_Dynamic_Message_Queue.

ACE_Message_Block * pending_head_ [protected]
Pointer to head of the pending messages.
Definition at line 766 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_pending_queue, and remove_messages.

ACE_Message_Block * pending_tail_ [protected]
Pointer to tail of the pending messages.
Definition at line 769 of file Message_Queue_T.h. Referenced by dequeue_head_i, enqueue_i, refresh_pending_queue, and remove_messages.