00001 /* -*- C++ -*- */ 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue_T.h 00006 * 00007 * $Id: Message_Queue_T.h,v 1.1.1.4 2003/02/21 18:36:32 chad Exp $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_T_H 00014 #define ACE_MESSAGE_QUEUE_T_H 00015 #include "ace/pre.h" 00016 00017 #include "ace/Message_Queue.h" 00018 #include "ace/Synch.h" 00019 00020 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00021 # pragma once 00022 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00023 00024 #if defined (VXWORKS) 00025 class ACE_Message_Queue_Vx; 00026 #endif /* defined (VXWORKS) */ 00027 00028 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00029 class ACE_Message_Queue_NT; 00030 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00031 00032 /** 00033 * @class ACE_Message_Queue 00034 * 00035 * @brief A threaded message queueing facility, modeled after the 00036 * queueing facilities in System V STREAMs. 00037 * 00038 * An <ACE_Message_Queue> is the central queueing facility for 00039 * messages in the ACE framework. If <ACE_SYNCH_DECL> is 00040 * <ACE_MT_SYNCH> then all operations are thread-safe. 00041 * Otherwise, if it's <ACE_NULL_SYNCH> then there's no locking 00042 * overhead. 00043 */ 00044 template <ACE_SYNCH_DECL> 00045 class ACE_Message_Queue : public ACE_Message_Queue_Base 00046 { 00047 public: 00048 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00049 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00050 00051 // = Traits 00052 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00053 ITERATOR; 00054 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00055 REVERSE_ITERATOR; 00056 00057 // = Initialization and termination methods. 00058 /** 00059 * Initialize an <ACE_Message_Queue>. The <high_water_mark> 00060 * determines how many bytes can be stored in a queue before it's 00061 * considered "full." Supplier threads must block until the queue 00062 * is no longer full. The <low_water_mark> determines how many 00063 * bytes must be in the queue before supplier threads are allowed to 00064 * enqueue additional <ACE_Message_Block>s. By default, the 00065 * <high_water_mark> equals the <low_water_mark>, which means that 00066 * suppliers will be able to enqueue new messages as soon as a 00067 * consumer removes any message from the queue. Making the 00068 * <low_water_mark> smaller than the <high_water_mark> forces 00069 * consumers to drain more messages from the queue before suppliers 00070 * can enqueue new messages, which can minimize the "silly window 00071 * syndrome." 00072 */ 00073 ACE_Message_Queue (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 00074 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 00075 ACE_Notification_Strategy * = 0); 00076 00077 /** 00078 * Initialize an <ACE_Message_Queue>. The <high_water_mark> 00079 * determines how many bytes can be stored in a queue before it's 00080 * considered "full." Supplier threads must block until the queue 00081 * is no longer full. The <low_water_mark> determines how many 00082 * bytes must be in the queue before supplier threads are allowed to 00083 * enqueue additional <ACE_Message_Block>s. By default, the 00084 * <high_water_mark> equals the <low_water_mark>, which means that 00085 * suppliers will be able to enqueue new messages as soon as a 00086 * consumer removes any message from the queue. Making the 00087 * <low_water_mark> smaller than the <high_water_mark> forces 00088 * consumers to drain more messages from the queue before suppliers 00089 * can enqueue new messages, which can minimize the "silly window 00090 * syndrome." 00091 */ 00092 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00093 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00094 ACE_Notification_Strategy * = 0); 00095 00096 /// Release all resources from the message queue and mark it as deactivated. 00097 /// Returns the number of messages released from the queue. 00098 virtual int close (void); 00099 00100 /// Release all resources from the message queue and mark it as deactivated. 00101 virtual ~ACE_Message_Queue (void); 00102 00103 /// Release all resources from the message queue but do not mark it 00104 /// as deactivated. 00105 /** 00106 * This method holds the queue lock during this operation. 00107 * 00108 * @return The number of messages flushed. 00109 */ 00110 virtual int flush (void); 00111 00112 /// Release all resources from the message queue but do not mark it 00113 /// as deactivated. 00114 /** 00115 * The caller must be holding the queue lock before calling this 00116 * method. 00117 * 00118 * @return The number of messages flushed. 00119 */ 00120 virtual int flush_i (void); 00121 00122 // = Enqueue and dequeue methods. 00123 00124 // For the following enqueue and dequeue methods if <timeout> == 0, 00125 // the caller will block until action is possible, else will wait 00126 // until the absolute time specified in *<timeout> elapses). These 00127 // calls will return, however, when queue is closed, deactivated, 00128 // when a signal occurs, or if the time specified in timeout 00129 // elapses, (in which case errno = EWOULDBLOCK). 00130 00131 /** 00132 * Retrieve a pointer to the first ACE_Message_Block in the queue 00133 * without removing it. 00134 * 00135 * @param first_item Reference to an ACE_Message_Block * that will 00136 * point to the first block on the queue. The block 00137 * remains on the queue until this or another thread 00138 * dequeues it. 00139 * @param timeout The absolute time the caller will wait until 00140 * for a block to be queued. 00141 * 00142 * @retval >0 The number of ACE_Message_Blocks on the queue. 00143 * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK, 00144 * the timeout elapsed. If ESHUTDOWN, the queue was 00145 * deactivated or pulsed. 00146 */ 00147 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00148 ACE_Time_Value *timeout = 0); 00149 00150 /** 00151 * Enqueue an ACE_Message_Block into the queue in accordance with 00152 * the ACE_Message_Block's priority (0 is lowest priority). FIFO 00153 * order is maintained when messages of the same priority are 00154 * inserted consecutively. 00155 * 00156 * @param new_item Pointer to an ACE_Message_Block that will be 00157 * added to the queue. The block's @c msg_priority() 00158 * method will be called to obtain the queueing priority. 00159 * @param timeout The absolute time the caller will wait until 00160 * for the block to be queued. 00161 * 00162 * @retval >0 The number of ACE_Message_Blocks on the queue after adding 00163 * the specified block. 00164 * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK, 00165 * the timeout elapsed. If ESHUTDOWN, the queue was 00166 * deactivated or pulsed. 00167 */ 00168 virtual int enqueue_prio (ACE_Message_Block *new_item, 00169 ACE_Time_Value *timeout = 0); 00170 00171 /** 00172 * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in 00173 * accordance with its <msg_deadline_time>. FIFO 00174 * order is maintained when messages of the same deadline time are 00175 * inserted consecutively. Note that <timeout> uses <{absolute}> 00176 * time rather than <{relative}> time. If the <timeout> elapses 00177 * without receiving a message -1 is returned and <errno> is set to 00178 * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and 00179 * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, 00180 * else the number of items still on the queue. 00181 */ 00182 virtual int enqueue_deadline (ACE_Message_Block *new_item, 00183 ACE_Time_Value *timeout = 0); 00184 00185 /** 00186 * This is an alias for <enqueue_prio>. It's only here for 00187 * backwards compatibility and will go away in a subsequent release. 00188 * Please use <enqueue_prio> instead. Note that <timeout> uses 00189 * <{absolute}> time rather than <{relative}> time. 00190 */ 00191 virtual int enqueue (ACE_Message_Block *new_item, 00192 ACE_Time_Value *timeout = 0); 00193 00194 /** 00195 * Enqueue an <ACE_Message_Block *> at the end of the queue. Note 00196 * that <timeout> uses <{absolute}> time rather than <{relative}> 00197 * time. If the <timeout> elapses without receiving a message -1 is 00198 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 00199 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 00200 * Otherwise, returns -1 on failure, else the number of items still 00201 * on the queue. 00202 */ 00203 virtual int enqueue_tail (ACE_Message_Block *new_item, 00204 ACE_Time_Value *timeout = 0); 00205 00206 /** 00207 * Enqueue an <ACE_Message_Block *> at the head of the queue. Note 00208 * that <timeout> uses <{absolute}> time rather than <{relative}> 00209 * time. If the <timeout> elapses without receiving a message -1 is 00210 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 00211 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 00212 * Otherwise, returns -1 on failure, else the number of items still 00213 * on the queue. 00214 */ 00215 virtual int enqueue_head (ACE_Message_Block *new_item, 00216 ACE_Time_Value *timeout = 0); 00217 00218 /// This method is an alias for the following <dequeue_head> method. 00219 virtual int dequeue (ACE_Message_Block *&first_item, 00220 ACE_Time_Value *timeout = 0); 00221 00222 /** 00223 * Dequeue and return the <ACE_Message_Block *> at the head of the 00224 * queue. Note that <timeout> uses <{absolute}> time rather than 00225 * <{relative}> time. If the <timeout> elapses without receiving a 00226 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 00227 * the queue is deactivated -1 is returned and <errno> is set to 00228 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 00229 * of items still on the queue. 00230 */ 00231 virtual int dequeue_head (ACE_Message_Block *&first_item, 00232 ACE_Time_Value *timeout = 0); 00233 00234 /** 00235 * Dequeue and return the <ACE_Message_Block *> that has the lowest 00236 * priority. Note that <timeout> uses <{absolute}> time rather than 00237 * <{relative}> time. If the <timeout> elapses without receiving a 00238 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 00239 * the queue is deactivated -1 is returned and <errno> is set to 00240 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 00241 * of items still on the queue. 00242 */ 00243 virtual int dequeue_prio (ACE_Message_Block *&first_item, 00244 ACE_Time_Value *timeout = 0); 00245 00246 /** 00247 * Dequeue and return the <ACE_Message_Block *> at the tail of the 00248 * queue. Note that <timeout> uses <{absolute}> time rather than 00249 * <{relative}> time. If the <timeout> elapses without receiving a 00250 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 00251 * the queue is deactivated -1 is returned and <errno> is set to 00252 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 00253 * of items still on the queue. 00254 */ 00255 virtual int dequeue_tail (ACE_Message_Block *&dequeued, 00256 ACE_Time_Value *timeout = 0); 00257 00258 /** 00259 * Dequeue and return the <ACE_Message_Block *> with the lowest 00260 * deadlien time. Note that <timeout> uses <{absolute}> time rather than 00261 * <{relative}> time. If the <timeout> elapses without receiving a 00262 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 00263 * the queue is deactivated -1 is returned and <errno> is set to 00264 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 00265 * of items still on the queue. 00266 */ 00267 virtual int dequeue_deadline (ACE_Message_Block *&dequeued, 00268 ACE_Time_Value *timeout = 0); 00269 00270 // = Check if queue is full/empty. 00271 /// True if queue is full, else false. 00272 virtual int is_full (void); 00273 /// True if queue is empty, else false. 00274 virtual int is_empty (void); 00275 00276 // = Queue statistic methods. 00277 /** 00278 * Number of total bytes on the queue, i.e., sum of the message 00279 * block sizes. 00280 */ 00281 virtual size_t message_bytes (void); 00282 /** 00283 * Number of total length on the queue, i.e., sum of the message 00284 * block lengths. 00285 */ 00286 virtual size_t message_length (void); 00287 /** 00288 * Number of total messages on the queue. 00289 */ 00290 virtual int message_count (void); 00291 00292 // = Manual changes to these stats (used when queued message blocks 00293 // change size or lengths). 00294 /** 00295 * New value of the number of total bytes on the queue, i.e., sum of 00296 * the message block sizes. 00297 */ 00298 virtual void message_bytes (size_t new_size); 00299 /** 00300 * New value of the number of total length on the queue, i.e., sum 00301 * of the message block lengths. 00302 */ 00303 virtual void message_length (size_t new_length); 00304 00305 // = Flow control methods. 00306 00307 /** 00308 * Get high watermark. 00309 */ 00310 virtual size_t high_water_mark (void); 00311 /** 00312 * Set the high watermark, which determines how many bytes can be 00313 * stored in a queue before it's considered "full." 00314 */ 00315 virtual void high_water_mark (size_t hwm); 00316 00317 /** 00318 * Get low watermark. 00319 */ 00320 virtual size_t low_water_mark (void); 00321 /** 00322 * Set the low watermark, which determines how many bytes must be in 00323 * the queue before supplier threads are allowed to enqueue 00324 * additional <ACE_Message_Block>s. 00325 */ 00326 virtual void low_water_mark (size_t lwm); 00327 00328 // = Activation control methods. 00329 00330 /** 00331 * Deactivate the queue and wakeup all threads waiting on the queue 00332 * so they can continue. No messages are removed from the queue, 00333 * however. Any other operations called until the queue is 00334 * activated again will immediately return -1 with <errno> == 00335 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 00336 * call and WAS_ACTIVE if queue was active before the call. 00337 */ 00338 virtual int deactivate (void); 00339 00340 /** 00341 * Reactivate the queue so that threads can enqueue and dequeue 00342 * messages again. Returns the state of the queue before the call. 00343 */ 00344 virtual int activate (void); 00345 00346 /** 00347 * Pulse the queue to wake up any waiting threads. Changes the 00348 * queue state to PULSED; future enqueue/dequeue operations proceed 00349 * as in ACTIVATED state. 00350 * 00351 * @return The queue's state before this call. 00352 */ 00353 virtual int pulse (void); 00354 00355 /// Returns the current state of the queue, which can be one of 00356 /// ACTIVATED, DEACTIVATED, or PULSED. 00357 virtual int state (void); 00358 00359 /// Returns true if the state of the queue is <DEACTIVATED>, 00360 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00361 virtual int deactivated (void); 00362 00363 // = Notification hook. 00364 00365 /** 00366 * This hook is automatically invoked by <enqueue_head>, 00367 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 00368 * into the queue. Subclasses can override this method to perform 00369 * specific notification strategies (e.g., signaling events for a 00370 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 00371 * multi-threaded application with concurrent consumers, there is no 00372 * guarantee that the queue will be still be non-empty by the time 00373 * the notification occurs. 00374 */ 00375 virtual int notify (void); 00376 00377 /// Get the notification strategy for the <Message_Queue> 00378 virtual ACE_Notification_Strategy *notification_strategy (void); 00379 00380 /// Set the notification strategy for the <Message_Queue> 00381 virtual void notification_strategy (ACE_Notification_Strategy *s); 00382 00383 /// Returns a reference to the lock used by the <ACE_Message_Queue>. 00384 virtual ACE_SYNCH_MUTEX_T &lock (void) 00385 { 00386 // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the 00387 // header file (j.russell.noseworthy@objectsciences.com) 00388 return this->lock_; 00389 } 00390 00391 /// Dump the state of an object. 00392 virtual void dump (void) const; 00393 00394 /// Declare the dynamic allocation hooks. 00395 ACE_ALLOC_HOOK_DECLARE; 00396 00397 protected: 00398 // = Routines that actually do the enqueueing and dequeueing. 00399 00400 // These routines assume that locks are held by the corresponding 00401 // public methods. Since they are virtual, you can change the 00402 // queueing mechanism by subclassing from <ACE_Message_Queue>. 00403 00404 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00405 virtual int enqueue_i (ACE_Message_Block *new_item); 00406 00407 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00408 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00409 00410 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00411 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00412 00413 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00414 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00415 00416 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00417 /// queue. 00418 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00419 00420 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00421 /// priority. 00422 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00423 00424 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00425 /// queue. 00426 virtual int dequeue_tail_i (ACE_Message_Block *&first_item); 00427 00428 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00429 /// deadline time. 00430 virtual int dequeue_deadline_i (ACE_Message_Block *&first_item); 00431 00432 // = Check the boundary conditions (assumes locks are held). 00433 00434 /// True if queue is full, else false. 00435 virtual int is_full_i (void); 00436 00437 /// True if queue is empty, else false. 00438 virtual int is_empty_i (void); 00439 00440 // = Implementation of the public <activate> and <deactivate> methods. 00441 00442 // These methods assume locks are held. 00443 00444 /** 00445 * Notifies all waiting threads that the queue has been deactivated 00446 * so they can wakeup and continue other processing. 00447 * No messages are removed from the queue. 00448 * 00449 * @param pulse If 0, the queue's state is changed to DEACTIVATED 00450 * and any other operations called until the queue is 00451 * reactivated will immediately return -1 with 00452 * errno == ESHUTDOWN. 00453 * If not zero, only the waiting threads are notified and 00454 * the queue's state changes to PULSED. 00455 * 00456 * @return The state of the queue before the call. 00457 */ 00458 virtual int deactivate_i (int pulse = 0); 00459 00460 /// Activate the queue. 00461 virtual int activate_i (void); 00462 00463 // = Helper methods to factor out common #ifdef code. 00464 00465 /// Wait for the queue to become non-full. 00466 virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00467 ACE_Time_Value *timeout); 00468 00469 /// Wait for the queue to become non-empty. 00470 virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, 00471 ACE_Time_Value *timeout); 00472 00473 /// Inform any threads waiting to enqueue that they can procede. 00474 virtual int signal_enqueue_waiters (void); 00475 00476 /// Inform any threads waiting to dequeue that they can procede. 00477 virtual int signal_dequeue_waiters (void); 00478 00479 /// Pointer to head of ACE_Message_Block list. 00480 ACE_Message_Block *head_; 00481 00482 /// Pointer to tail of ACE_Message_Block list. 00483 ACE_Message_Block *tail_; 00484 00485 /// Lowest number before unblocking occurs. 00486 size_t low_water_mark_; 00487 00488 /// Greatest number of bytes before blocking. 00489 size_t high_water_mark_; 00490 00491 /// Current number of bytes in the queue. 00492 size_t cur_bytes_; 00493 00494 /// Current length of messages in the queue. 00495 size_t cur_length_; 00496 00497 /// Current number of messages in the queue. 00498 int cur_count_; 00499 00500 /// The notification strategy used when a new message is enqueued. 00501 ACE_Notification_Strategy *notification_strategy_; 00502 00503 // = Synchronization primitives for controlling concurrent access. 00504 /// Protect queue from concurrent access. 00505 ACE_SYNCH_MUTEX_T lock_; 00506 00507 /// Used to make threads sleep until the queue is no longer empty. 00508 ACE_SYNCH_CONDITION_T not_empty_cond_; 00509 00510 /// Used to make threads sleep until the queue is no longer full. 00511 ACE_SYNCH_CONDITION_T not_full_cond_; 00512 00513 private: 00514 00515 // = Disallow these operations. 00516 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00517 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) 00518 }; 00519 00520 // This typedef is used to get around a compiler bug in g++/vxworks. 00521 typedef ACE_Message_Queue<ACE_SYNCH> ACE_DEFAULT_MESSAGE_QUEUE_TYPE; 00522 00523 00524 /** 00525 * @class ACE_Message_Queue_Iterator 00526 * 00527 * @brief Iterator for the <ACE_Message_Queue>. 00528 */ 00529 template <ACE_SYNCH_DECL> 00530 class ACE_Message_Queue_Iterator 00531 { 00532 public: 00533 // = Initialization method. 00534 ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00535 00536 // = Iteration methods. 00537 /// Pass back the <entry> that hasn't been seen in the queue. 00538 /// Returns 0 when all items have been seen, else 1. 00539 int next (ACE_Message_Block *&entry); 00540 00541 /// Returns 1 when all items have been seen, else 0. 00542 int done (void) const; 00543 00544 /// Move forward by one element in the queue. Returns 0 when all the 00545 /// items in the set have been seen, else 1. 00546 int advance (void); 00547 00548 /// Dump the state of an object. 00549 void dump (void) const; 00550 00551 /// Declare the dynamic allocation hooks. 00552 ACE_ALLOC_HOOK_DECLARE; 00553 00554 private: 00555 /// Message_Queue we are iterating over. 00556 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00557 00558 /// Keeps track of how far we've advanced... 00559 ACE_Message_Block *curr_; 00560 }; 00561 00562 /** 00563 * @class ACE_Message_Queue_Reverse_Iterator 00564 * 00565 * @brief Reverse Iterator for the <ACE_Message_Queue>. 00566 */ 00567 template <ACE_SYNCH_DECL> 00568 class ACE_Message_Queue_Reverse_Iterator 00569 { 00570 public: 00571 // = Initialization method. 00572 ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); 00573 00574 // = Iteration methods. 00575 /// Pass back the <entry> that hasn't been seen in the queue. 00576 /// Returns 0 when all items have been seen, else 1. 00577 int next (ACE_Message_Block *&entry); 00578 00579 /// Returns 1 when all items have been seen, else 0. 00580 int done (void) const; 00581 00582 /// Move forward by one element in the queue. Returns 0 when all the 00583 /// items in the set have been seen, else 1. 00584 int advance (void); 00585 00586 /// Dump the state of an object. 00587 void dump (void) const; 00588 00589 /// Declare the dynamic allocation hooks. 00590 ACE_ALLOC_HOOK_DECLARE; 00591 00592 private: 00593 /// Message_Queue we are iterating over. 00594 ACE_Message_Queue <ACE_SYNCH_USE> &queue_; 00595 00596 /// Keeps track of how far we've advanced... 00597 ACE_Message_Block *curr_; 00598 }; 00599 00600 /** 00601 * @class ACE_Dynamic_Message_Queue 00602 * 00603 * @brief A derived class which adapts the <ACE_Message_Queue> 00604 * class in order to maintain dynamic priorities for enqueued 00605 * <ACE_Message_Blocks> and manage the queue order according 00606 * to these dynamic priorities. 00607 * 00608 * The messages in the queue are managed so as to preserve 00609 * a logical ordering with minimal overhead per enqueue and 00610 * dequeue operation. For this reason, the actual order of 00611 * messages in the linked list of the queue may differ from 00612 * their priority order. As time passes, a message may change 00613 * from pending status to late status, and eventually to beyond 00614 * late status. To minimize reordering overhead under this 00615 * design force, three separate boundaries are maintained 00616 * within the linked list of messages. Messages are dequeued 00617 * preferentially from the head of the pending portion, then 00618 * the head of the late portion, and finally from the head 00619 * of the beyond late portion. In this way, only the boundaries 00620 * need to be maintained (which can be done efficiently, as 00621 * aging messages maintain the same linked list order as they 00622 * progress from one status to the next), with no reordering 00623 * of the messages themselves, while providing correct priority 00624 * ordered dequeueing semantics. 00625 * Head and tail enqueue methods inherited from ACE_Message_Queue 00626 * are made private to prevent out-of-order messages from confusing 00627 * management of the various portions of the queue. Messages in 00628 * the pending portion of the queue whose priority becomes late 00629 * (according to the specific dynamic strategy) advance into 00630 * the late portion of the queue. Messages in the late portion 00631 * of the queue whose priority becomes later than can be represented 00632 * advance to the beyond_late portion of the queue. These behaviors 00633 * support a limited schedule overrun, with pending messages prioritized 00634 * ahead of late messages, and late messages ahead of beyond late 00635 * messages. These behaviors can be modified in derived classes by 00636 * providing alternative definitions for the appropriate virtual methods. 00637 * When filled with messages, the queue's linked list should look like: 00638 * H T 00639 * | | 00640 * B - B - B - B - L - L - L - P - P - P - P - P 00641 * | | | | | | 00642 * BH BT LH LT PH PT 00643 * Where the symbols are as follows: 00644 * H = Head of the entire list 00645 * T = Tail of the entire list 00646 * B = Beyond late message 00647 * BH = Beyond late messages Head 00648 * BT = Beyond late messages Tail 00649 * L = Late message 00650 * LH = Late messages Head 00651 * LT = Late messages Tail 00652 * P = Pending message 00653 * PH = Pending messages Head 00654 * PT = Pending messages Tail 00655 * Caveat: the virtual methods enqueue_tail, enqueue_head, 00656 * and peek_dequeue_head have semantics for the static 00657 * message queues that cannot be guaranteed for dynamic 00658 * message queues. The peek_dequeue_head method just 00659 * calls the base class method, while the two enqueue 00660 * methods call the priority enqueue method. The 00661 * order of messages in the dynamic queue is a function 00662 * of message deadlines and how long they are in the 00663 * queues. You can manipulate these in some cases to 00664 * ensure the correct semantics, but that is not a 00665 * very stable or portable approach (discouraged). 00666 */ 00667 template <ACE_SYNCH_DECL> 00668 class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> 00669 { 00670 public: 00671 // = Initialization and termination methods. 00672 ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, 00673 size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00674 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00675 ACE_Notification_Strategy * = 0); 00676 00677 /// Close down the message queue and release all resources. 00678 virtual ~ACE_Dynamic_Message_Queue (void); 00679 00680 /** 00681 * Detach all messages with status given in the passed flags from 00682 * the queue and return them by setting passed head and tail pointers 00683 * to the linked list they comprise. This method is intended primarily 00684 * as a means of periodically harvesting messages that have missed 00685 * their deadlines, but is available in its most general form. All 00686 * messages are returned in priority order, from head to tail, as of 00687 * the time this method was called. 00688 */ 00689 virtual int remove_messages (ACE_Message_Block *&list_head, 00690 ACE_Message_Block *&list_tail, 00691 u_int status_flags); 00692 00693 /** 00694 * Dequeue and return the <ACE_Message_Block *> at the head of the 00695 * queue. Returns -1 on failure, else the number of items still on 00696 * the queue. 00697 */ 00698 virtual int dequeue_head (ACE_Message_Block *&first_item, 00699 ACE_Time_Value *timeout = 0); 00700 00701 /// Dump the state of the queue. 00702 virtual void dump (void) const; 00703 00704 /** 00705 * Just call priority enqueue method: tail enqueue semantics for dynamic 00706 * message queues are unstable: the message may or may not be where 00707 * it was placed after the queue is refreshed prior to the next 00708 * enqueue or dequeue operation. 00709 */ 00710 virtual int enqueue_tail (ACE_Message_Block *new_item, 00711 ACE_Time_Value *timeout = 0); 00712 00713 /** 00714 * Just call priority enqueue method: head enqueue semantics for dynamic 00715 * message queues are unstable: the message may or may not be where 00716 * it was placed after the queue is refreshed prior to the next 00717 * enqueue or dequeue operation. 00718 */ 00719 virtual int enqueue_head (ACE_Message_Block *new_item, 00720 ACE_Time_Value *timeout = 0); 00721 00722 00723 /// Declare the dynamic allocation hooks. 00724 ACE_ALLOC_HOOK_DECLARE; 00725 00726 protected: 00727 00728 /** 00729 * Enqueue an <ACE_Message_Block *> in accordance with its priority. 00730 * priority may be *dynamic* or *static* or a combination or *both* 00731 * It calls the priority evaluation function passed into the Dynamic 00732 * Message Queue constructor to update the priorities of all 00733 * enqueued messages. 00734 */ 00735 virtual int enqueue_i (ACE_Message_Block *new_item); 00736 00737 /// Enqueue a message in priority order within a given priority status sublist 00738 virtual int sublist_enqueue_i (ACE_Message_Block *new_item, 00739 const ACE_Time_Value ¤t_time, 00740 ACE_Message_Block *&sublist_head, 00741 ACE_Message_Block *&sublist_tail, 00742 ACE_Dynamic_Message_Strategy::Priority_Status status); 00743 00744 /** 00745 * Dequeue and return the <ACE_Message_Block *> at the head of the 00746 * logical queue. Attempts first to dequeue from the pending 00747 * portion of the queue, or if that is empty from the late portion, 00748 * or if that is empty from the beyond late portion, or if that is 00749 * empty just sets the passed pointer to zero and returns -1. 00750 */ 00751 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00752 00753 /// Refresh the queue using the strategy 00754 /// specific priority status function. 00755 virtual int refresh_queue (const ACE_Time_Value & current_time); 00756 00757 /// Refresh the pending queue using the strategy 00758 /// specific priority status function. 00759 virtual int refresh_pending_queue (const ACE_Time_Value & current_time); 00760 00761 /// Refresh the late queue using the strategy 00762 /// specific priority status function. 00763 virtual int refresh_late_queue (const ACE_Time_Value & current_time); 00764 00765 /// Pointer to head of the pending messages 00766 ACE_Message_Block *pending_head_; 00767 00768 /// Pointer to tail of the pending messages 00769 ACE_Message_Block *pending_tail_; 00770 00771 /// Pointer to head of the late messages 00772 ACE_Message_Block *late_head_; 00773 00774 /// Pointer to tail of the late messages 00775 ACE_Message_Block *late_tail_; 00776 00777 /// Pointer to head of the beyond late messages 00778 ACE_Message_Block *beyond_late_head_; 00779 00780 /// Pointer to tail of the beyond late messages 00781 ACE_Message_Block *beyond_late_tail_; 00782 00783 /// Pointer to a dynamic priority evaluation function. 00784 ACE_Dynamic_Message_Strategy &message_strategy_; 00785 00786 private: 00787 // = Disallow public access to these operations. 00788 00789 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00790 ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) 00791 00792 // provide definitions for these (just call base class method), 00793 // but make them private so they're not accessible outside the class 00794 00795 /// Private method to hide public base class method: just calls base class method 00796 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00797 ACE_Time_Value *timeout = 0); 00798 00799 }; 00800 00801 /** 00802 * @class ACE_Message_Queue_Factory 00803 * 00804 * @brief ACE_Message_Queue_Factory is a static factory class template which 00805 * provides a separate factory method for each of the major kinds of 00806 * priority based message dispatching: static, earliest deadline first 00807 * (EDF), and minimum laxity first (MLF). 00808 * 00809 * The ACE_Dynamic_Message_Queue class assumes responsibility for 00810 * releasing the resources of the strategy with which it was 00811 * constructed: the user of a message queue constructed by 00812 * any of these factory methods is only responsible for 00813 * ensuring destruction of the message queue itself. 00814 */ 00815 template <ACE_SYNCH_DECL> 00816 class ACE_Message_Queue_Factory 00817 { 00818 public: 00819 /// Factory method for a statically prioritized ACE_Message_Queue 00820 static ACE_Message_Queue<ACE_SYNCH_USE> * 00821 create_static_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00822 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00823 ACE_Notification_Strategy * = 0); 00824 00825 /// Factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue 00826 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00827 create_deadline_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00828 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00829 ACE_Notification_Strategy * = 0, 00830 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00831 u_long static_bit_field_shift = 10, // 10 low order bits 00832 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00833 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00834 00835 /// Factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue 00836 static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * 00837 create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00838 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00839 ACE_Notification_Strategy * = 0, 00840 u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 00841 u_long static_bit_field_shift = 10, // 10 low order bits 00842 u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 00843 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) 00844 00845 00846 #if defined (VXWORKS) 00847 00848 /// Factory method for a wrapped VxWorks message queue 00849 static ACE_Message_Queue_Vx * 00850 create_Vx_message_queue (size_t max_messages, size_t max_message_length, 00851 ACE_Notification_Strategy *ns = 0); 00852 00853 #endif /* defined (VXWORKS) */ 00854 00855 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00856 00857 /// Factory method for a NT message queue. 00858 static ACE_Message_Queue_NT * 00859 create_NT_message_queue (size_t max_threads); 00860 00861 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00862 }; 00863 00864 /** 00865 * @class ACE_Message_Queue_Ex 00866 * 00867 * @brief A threaded message queueing facility, modeled after the 00868 * queueing facilities in System V STREAMs. 00869 * 00870 * An <ACE_Message_Queue_Ex> is a strongly-typed version of the 00871 * <ACE_Message_Queue>. If 00872 * <ACE_SYNCH_DECL> is <ACE_MT_SYNCH> then all operations are 00873 * thread-safe. Otherwise, if it's <ACE_NULL_SYNCH> then there's no 00874 * locking overhead. 00875 */ 00876 template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> 00877 class ACE_Message_Queue_Ex 00878 { 00879 public: 00880 00881 // = Default priority value. 00882 enum 00883 { 00884 DEFAULT_PRIORITY = 0 00885 }; 00886 00887 #if 0 00888 // @@ Iterators are not implemented yet... 00889 00890 friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; 00891 friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; 00892 00893 // = Traits 00894 typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> 00895 ITERATOR; 00896 typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> 00897 REVERSE_ITERATOR; 00898 #endif /* 0 */ 00899 00900 // = Initialization and termination methods. 00901 00902 /** 00903 * Initialize an <ACE_Message_Queue>. The <high_water_mark> 00904 * determines how many bytes can be stored in a queue before it's 00905 * considered "full." Supplier threads must block until the queue 00906 * is no longer full. The <low_water_mark> determines how many 00907 * bytes must be in the queue before supplier threads are allowed to 00908 * enqueue additional <ACE_Message_Block>s. By default, the 00909 * <high_water_mark> equals the <low_water_mark>, which means that 00910 * suppliers will be able to enqueue new messages as soon as a 00911 * consumer removes any message from the queue. Making the 00912 * <low_water_mark> smaller than the <high_water_mark> forces 00913 * consumers to drain more messages from the queue before suppliers 00914 * can enqueue new messages, which can minimize the "silly window 00915 * syndrome." 00916 */ 00917 ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, 00918 size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, 00919 ACE_Notification_Strategy * = 0); 00920 00921 /** 00922 * Initialize an <ACE_Message_Queue>. The <high_water_mark> 00923 * determines how many bytes can be stored in a queue before it's 00924 * considered "full." Supplier threads must block until the queue 00925 * is no longer full. The <low_water_mark> determines how many 00926 * bytes must be in the queue before supplier threads are allowed to 00927 * enqueue additional <ACE_Message_Block>s. By default, the 00928 * <high_water_mark> equals the <low_water_mark>, which means that 00929 * suppliers will be able to enqueue new messages as soon as a 00930 * consumer removes any message from the queue. Making the 00931 * <low_water_mark> smaller than the <high_water_mark> forces 00932 * consumers to drain more messages from the queue before suppliers 00933 * can enqueue new messages, which can minimize the "silly window 00934 * syndrome." 00935 */ 00936 virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, 00937 size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, 00938 ACE_Notification_Strategy * = 0); 00939 00940 /// Close down the message queue and release all resources. 00941 virtual int close (void); 00942 00943 /// Close down the message queue and release all resources. 00944 virtual ~ACE_Message_Queue_Ex (void); 00945 00946 /// Release all resources from the message queue but do not mark it as deactivated. 00947 /// This method holds the queue lock during this operation. Returns the number of 00948 /// messages flushed. 00949 virtual int flush (void); 00950 00951 /// Release all resources from the message queue but do not mark it as deactivated. 00952 /// This method does not hold the queue lock during this operation, i.e., it assume 00953 /// the lock is held externally. Returns the number of messages flushed. 00954 virtual int flush_i (void); 00955 00956 // = Enqueue and dequeue methods. 00957 00958 // For the following enqueue and dequeue methods if <timeout> == 0, 00959 // the caller will block until action is possible, else will wait 00960 // until the absolute time specified in *<timeout> elapses). These 00961 // calls will return, however, when queue is closed, deactivated, 00962 // when a signal occurs, or if the time specified in timeout 00963 // elapses, (in which case errno = EWOULDBLOCK). 00964 00965 /** 00966 * Retrieve the first <ACE_MESSAGE_TYPE> without removing it. Note 00967 * that <timeout> uses <{absolute}> time rather than <{relative}> 00968 * time. If the <timeout> elapses without receiving a message -1 is 00969 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 00970 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 00971 * Otherwise, returns -1 on failure, else the number of items still 00972 * on the queue. 00973 */ 00974 virtual int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, 00975 ACE_Time_Value *timeout = 0); 00976 00977 /** 00978 * Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in 00979 * accordance with its <msg_priority> (0 is lowest priority). FIFO 00980 * order is maintained when messages of the same priority are 00981 * inserted consecutively. Note that <timeout> uses <{absolute}> 00982 * time rather than <{relative}> time. If the <timeout> elapses 00983 * without receiving a message -1 is returned and <errno> is set to 00984 * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and 00985 * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, 00986 * else the number of items still on the queue. 00987 */ 00988 virtual int enqueue_prio (ACE_MESSAGE_TYPE *new_item, 00989 ACE_Time_Value *timeout = 0); 00990 00991 /** 00992 * Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in 00993 * accordance with its <msg_deadline_time>. FIFO 00994 * order is maintained when messages of the same deadline time are 00995 * inserted consecutively. Note that <timeout> uses <{absolute}> 00996 * time rather than <{relative}> time. If the <timeout> elapses 00997 * without receiving a message -1 is returned and <errno> is set to 00998 * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and 00999 * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, 01000 * else the number of items still on the queue. 01001 */ 01002 virtual int enqueue_deadline (ACE_MESSAGE_TYPE *new_item, 01003 ACE_Time_Value *timeout = 0); 01004 01005 /** 01006 * This is an alias for <enqueue_prio>. It's only here for 01007 * backwards compatibility and will go away in a subsequent release. 01008 * Please use <enqueue_prio> instead. Note that <timeout> uses 01009 * <{absolute}> time rather than <{relative}> time. 01010 */ 01011 virtual int enqueue (ACE_MESSAGE_TYPE *new_item, 01012 ACE_Time_Value *timeout = 0); 01013 01014 /** 01015 * Enqueue an <ACE_MESSAGE_TYPE *> at the end of the queue. Note 01016 * that <timeout> uses <{absolute}> time rather than <{relative}> 01017 * time. If the <timeout> elapses without receiving a message -1 is 01018 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 01019 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 01020 * Otherwise, returns -1 on failure, else the number of items still 01021 * on the queue. 01022 */ 01023 virtual int enqueue_tail (ACE_MESSAGE_TYPE *new_item, 01024 ACE_Time_Value *timeout = 0); 01025 01026 /** 01027 * Enqueue an <ACE_MESSAGE_TYPE *> at the head of the queue. Note 01028 * that <timeout> uses <{absolute}> time rather than <{relative}> 01029 * time. If the <timeout> elapses without receiving a message -1 is 01030 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 01031 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 01032 * Otherwise, returns -1 on failure, else the number of items still 01033 * on the queue. 01034 */ 01035 virtual int enqueue_head (ACE_MESSAGE_TYPE *new_item, 01036 ACE_Time_Value *timeout = 0); 01037 01038 /// This method is an alias for the following <dequeue_head> method. 01039 virtual int dequeue (ACE_MESSAGE_TYPE *&first_item, 01040 ACE_Time_Value *timeout = 0); 01041 // This method is an alias for the following <dequeue_head> method. 01042 01043 /** 01044 * Dequeue and return the <ACE_MESSAGE_TYPE *> at the head of the 01045 * queue. Note that <timeout> uses <{absolute}> time rather than 01046 * <{relative}> time. If the <timeout> elapses without receiving a 01047 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01048 * the queue is deactivated -1 is returned and <errno> is set to 01049 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01050 * of items still on the queue. 01051 */ 01052 virtual int dequeue_head (ACE_MESSAGE_TYPE *&first_item, 01053 ACE_Time_Value *timeout = 0); 01054 01055 /** 01056 * Dequeue and return the <ACE_MESSAGE_TYPE *> that has the lowest 01057 * priority. Note that <timeout> uses <{absolute}> time rather than 01058 * <{relative}> time. If the <timeout> elapses without receiving a 01059 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01060 * the queue is deactivated -1 is returned and <errno> is set to 01061 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01062 * of items still on the queue. 01063 */ 01064 virtual int dequeue_prio (ACE_MESSAGE_TYPE *&dequeued, 01065 ACE_Time_Value *timeout = 0); 01066 01067 /** 01068 * Dequeue and return the <ACE_MESSAGE_TYPE *> at the tail of the 01069 * queue. Note that <timeout> uses <{absolute}> time rather than 01070 * <{relative}> time. If the <timeout> elapses without receiving a 01071 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01072 * the queue is deactivated -1 is returned and <errno> is set to 01073 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01074 * of items still on the queue. 01075 */ 01076 virtual int dequeue_tail (ACE_MESSAGE_TYPE *&dequeued, 01077 ACE_Time_Value *timeout = 0); 01078 01079 /** 01080 * Dequeue and return the <ACE_MESSAGE_TYPE *> with the lowest 01081 * deadline time. Note that <timeout> uses <{absolute}> time rather than 01082 * <{relative}> time. If the <timeout> elapses without receiving a 01083 * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If 01084 * the queue is deactivated -1 is returned and <errno> is set to 01085 * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number 01086 * of items still on the queue. 01087 */ 01088 virtual int dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued, 01089 ACE_Time_Value *timeout = 0); 01090 01091 // = Check if queue is full/empty. 01092 /// True if queue is full, else false. 01093 virtual int is_full (void); 01094 /// True if queue is empty, else false. 01095 virtual int is_empty (void); 01096 01097 01098 // = Queue statistic methods. 01099 /** 01100 * Number of total bytes on the queue, i.e., sum of the message 01101 * block sizes. 01102 */ 01103 virtual size_t message_bytes (void); 01104 /** 01105 * Number of total length on the queue, i.e., sum of the message 01106 * block lengths. 01107 */ 01108 virtual size_t message_length (void); 01109 /** 01110 * Number of total messages on the queue. 01111 */ 01112 virtual int message_count (void); 01113 01114 // = Manual changes to these stats (used when queued message blocks 01115 // change size or lengths). 01116 /** 01117 * New value of the number of total bytes on the queue, i.e., sum of 01118 * the message block sizes. 01119 */ 01120 virtual void message_bytes (size_t new_size); 01121 /** 01122 * New value of the number of total length on the queue, i.e., sum 01123 * of the message block lengths. 01124 */ 01125 virtual void message_length (size_t new_length); 01126 01127 // = Flow control methods. 01128 /** 01129 * Get high watermark. 01130 */ 01131 virtual size_t high_water_mark (void); 01132 /** 01133 * Set the high watermark, which determines how many bytes can be 01134 * stored in a queue before it's considered "full." 01135 */ 01136 virtual void high_water_mark (size_t hwm); 01137 01138 /** 01139 * Get low watermark. 01140 */ 01141 virtual size_t low_water_mark (void); 01142 /** 01143 * Set the low watermark, which determines how many bytes must be in 01144 * the queue before supplier threads are allowed to enqueue 01145 * additional <ACE_MESSAGE_TYPE>s. 01146 */ 01147 virtual void low_water_mark (size_t lwm); 01148 01149 // = Activation control methods. 01150 01151 /** 01152 * Deactivate the queue and wakeup all threads waiting on the queue 01153 * so they can continue. No messages are removed from the queue, 01154 * however. Any other operations called until the queue is 01155 * activated again will immediately return -1 with <errno> == 01156 * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the 01157 * call and WAS_ACTIVE if queue was active before the call. 01158 */ 01159 virtual int deactivate (void); 01160 01161 /** 01162 * Reactivate the queue so that threads can enqueue and dequeue 01163 * messages again. Returns the state of the queue before the call. 01164 */ 01165 virtual int activate (void); 01166 01167 /** 01168 * Pulse the queue to wake up any waiting threads. Changes the 01169 * queue state to PULSED; future enqueue/dequeue operations proceed 01170 * as in ACTIVATED state. 01171 * 01172 * @retval The queue's state before this call. 01173 */ 01174 virtual int pulse (void); 01175 01176 /// Returns the current state of the queue, which can be one of 01177 /// ACTIVATED, DEACTIVATED, or PULSED. 01178 virtual int state (void); 01179 01180 /// Returns true if the state of the queue is DEACTIVATED, 01181 /// but false if the queue's state is ACTIVATED or PULSED. 01182 virtual int deactivated (void); 01183 01184 // = Notification hook. 01185 01186 /** 01187 * This hook is automatically invoked by <enqueue_head>, 01188 * <enqueue_tail>, and <enqueue_prio> when a new item is inserted 01189 * into the queue. Subclasses can override this method to perform 01190 * specific notification strategies (e.g., signaling events for a 01191 * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a 01192 * multi-threaded application with concurrent consumers, there is no 01193 * guarantee that the queue will be still be non-empty by the time 01194 * the notification occurs. 01195 */ 01196 virtual int notify (void); 01197 01198 /// Get the notification strategy for the <Message_Queue> 01199 virtual ACE_Notification_Strategy *notification_strategy (void); 01200 01201 /// Set the notification strategy for the <Message_Queue> 01202 virtual void notification_strategy (ACE_Notification_Strategy *s); 01203 01204 /// Returns a reference to the lock used by the <ACE_Message_Queue_Ex>. 01205 virtual ACE_SYNCH_MUTEX_T &lock (void) 01206 { 01207 // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the 01208 // header file (j.russell.noseworthy@objectsciences.com) 01209 return this->queue_.lock (); 01210 } 01211 01212 /// Dump the state of an object. 01213 virtual void dump (void) const; 01214 01215 /// Declare the dynamic allocation hooks. 01216 ACE_ALLOC_HOOK_DECLARE; 01217 01218 private: 01219 /// Implement this via an <ACE_Message_Queue>. 01220 ACE_Message_Queue<ACE_SYNCH_USE> queue_; 01221 }; 01222 01223 #if defined (__ACE_INLINE__) 01224 #include "ace/Message_Queue_T.i" 01225 #endif /* __ACE_INLINE__ */ 01226 01227 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 01228 #include "ace/Message_Queue_T.cpp" 01229 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 01230 01231 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 01232 #pragma implementation ("Message_Queue_T.cpp") 01233 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 01234 01235 #include "ace/post.h" 01236 #endif /* ACE_MESSAGE_QUEUE_T_H */
1.2.14 written by Dimitri van Heesch,
© 1997-2002