00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Message_Queue.h 00006 * 00007 * $Id: Message_Queue.h,v 1.1.1.4 2003/02/21 18:36:32 chad Exp $ 00008 * 00009 * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef ACE_MESSAGE_QUEUE_H 00014 #define ACE_MESSAGE_QUEUE_H 00015 #include "ace/pre.h" 00016 00017 #include "ace/Message_Block.h" 00018 00019 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00020 # pragma once 00021 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00022 00023 #include "ace/IO_Cntl_Msg.h" 00024 #include "ace/Synch.h" 00025 00026 // Forward decls. 00027 class ACE_Notification_Strategy; 00028 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator; 00029 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator; 00030 00031 /** 00032 * @class ACE_Message_Queue_Base 00033 * 00034 * @brief Base class for <ACE_Message_Queue>, which is the central 00035 * queueing facility for messages in the ACE framework. 00036 * 00037 * For all the <ACE_Time_Value> pointer parameters the caller will 00038 * block until action is possible if <timeout> == 0. Otherwise, it 00039 * will wait until the absolute time specified in *<timeout> 00040 * elapses. 00041 * 00042 * A queue is always in one of three states: 00043 * . ACTIVATED 00044 * . DEACTIVATED 00045 * . PULSED 00046 */ 00047 class ACE_Export ACE_Message_Queue_Base 00048 { 00049 public: 00050 enum 00051 { 00052 // Default high and low watermarks. 00053 00054 /// Default high watermark (16 K). 00055 DEFAULT_HWM = 16 * 1024, 00056 /// Default low watermark (same as high water mark). 00057 DEFAULT_LWM = 16 * 1024, 00058 00059 // Queue states. Before PULSED state was added, the activate() 00060 // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE 00061 // to indicate the previous condition. Now those methods 00062 // return the state the queue was previously in. WAS_ACTIVE 00063 // and WAS_INACTIVE are defined to match previous semantics for 00064 // applications that don't use the PULSED state. 00065 00066 WAS_ACTIVE = 1, /* DEPRECATED */ 00067 /// Message queue is active and processing normally 00068 ACTIVATED = 1, 00069 00070 WAS_INACTIVE = 2, /* DEPRECATED */ 00071 /// Queue is deactivated; no enqueue or dequeue operations allowed. 00072 DEACTIVATED = 2, 00073 00074 /// Message queue was pulsed; enqueue and dequeue may proceed normally. 00075 PULSED = 3 00076 00077 }; 00078 00079 ACE_Message_Queue_Base (void); 00080 00081 /// Close down the message queue and release all resources. 00082 virtual int close (void) = 0; 00083 00084 /// Close down the message queue and release all resources. 00085 virtual ~ACE_Message_Queue_Base (void); 00086 00087 // = Enqueue and dequeue methods. 00088 00089 /** 00090 * Retrieve the first <ACE_Message_Block> without removing it. Note 00091 * that <timeout> uses <{absolute}> time rather than <{relative}> 00092 * time. If the <timeout> elapses without receiving a message -1 is 00093 * returned and <errno> is set to <EWOULDBLOCK>. If the queue is 00094 * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. 00095 * Otherwise, returns -1 on failure, else the number of items still 00096 * on the queue. 00097 */ 00098 virtual int peek_dequeue_head (ACE_Message_Block *&first_item, 00099 ACE_Time_Value *timeout = 0) = 0; 00100 00101 /** 00102 * Enqueue a <ACE_Message_Block *> into the tail of the queue. 00103 * Returns number of items in queue if the call succeeds or -1 00104 * otherwise. These calls return -1 when queue is closed, 00105 * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal 00106 * occurs (in which case <errno> == <EINTR>, or if the time 00107 * specified in timeout elapses (in which case <errno> == 00108 * <EWOULDBLOCK>). 00109 */ 00110 virtual int enqueue_tail (ACE_Message_Block *new_item, 00111 ACE_Time_Value *timeout = 0) = 0; 00112 virtual int enqueue (ACE_Message_Block *new_item, 00113 ACE_Time_Value *timeout = 0) = 0; 00114 00115 /** 00116 * Dequeue and return the <ACE_Message_Block *> at the head of the 00117 * queue. Returns number of items in queue if the call succeeds or 00118 * -1 otherwise. These calls return -1 when queue is closed, 00119 * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal 00120 * occurs (in which case <errno> == <EINTR>, or if the time 00121 * specified in timeout elapses (in which case <errno> == 00122 * <EWOULDBLOCK>). 00123 */ 00124 virtual int dequeue_head (ACE_Message_Block *&first_item, 00125 ACE_Time_Value *timeout = 0) = 0; 00126 virtual int dequeue (ACE_Message_Block *&first_item, 00127 ACE_Time_Value *timeout = 0) = 0; 00128 00129 // = Check if queue is full/empty. 00130 /// True if queue is full, else false. 00131 virtual int is_full (void) = 0; 00132 00133 /// True if queue is empty, else false. 00134 virtual int is_empty (void) = 0; 00135 00136 // = Queue statistic methods. 00137 00138 /// Number of total bytes on the queue, i.e., sum of the message 00139 /// block sizes. 00140 virtual size_t message_bytes (void) = 0; 00141 00142 /// Number of total length on the queue, i.e., sum of the message 00143 /// block lengths. 00144 virtual size_t message_length (void) = 0; 00145 00146 /// Number of total messages on the queue. 00147 virtual int message_count (void) = 0; 00148 00149 /// New value of the number of total bytes on the queue, i.e., 00150 /// sum of the message block sizes. 00151 virtual void message_bytes (size_t new_size) = 0; 00152 00153 /// New value of the number of total length on the queue, i.e., 00154 /// sum of the message block lengths. 00155 virtual void message_length (size_t new_length) = 0; 00156 00157 // = Activation control methods. 00158 00159 /** 00160 * Deactivate the queue and wake up all threads waiting on the queue 00161 * so they can continue. No messages are removed from the queue, 00162 * however. Any other operations called until the queue is 00163 * activated again will immediately return -1 with @c errno 00164 * ESHUTDOWN. 00165 * 00166 * @retval The queue's state before this call. 00167 */ 00168 virtual int deactivate (void) = 0; 00169 00170 /** 00171 * Reactivate the queue so that threads can enqueue and dequeue 00172 * messages again. 00173 * 00174 * @retval The queue's state before this call. 00175 */ 00176 virtual int activate (void) = 0; 00177 00178 /** 00179 * Pulse the queue to wake up any waiting threads. Changes the 00180 * queue state to PULSED; future enqueue/dequeue operations proceed 00181 * as in ACTIVATED state. 00182 * 00183 * @retval The queue's state before this call. 00184 */ 00185 virtual int pulse (void) = 0; 00186 00187 /// Returns the current state of the queue. 00188 virtual int state (void); 00189 00190 /// Returns 1 if the state of the queue is DEACTIVATED, 00191 /// and 0 if the queue's state is ACTIVATED or PULSED. 00192 virtual int deactivated (void) = 0; 00193 00194 /// Get the notification strategy for the <Message_Queue> 00195 virtual ACE_Notification_Strategy *notification_strategy (void) = 0; 00196 00197 /// Set the notification strategy for the <Message_Queue> 00198 virtual void notification_strategy (ACE_Notification_Strategy *s) = 0; 00199 00200 // = Notification hook. 00201 00202 /// Dump the state of an object. 00203 virtual void dump (void) const = 0; 00204 00205 /// Declare the dynamic allocation hooks. 00206 ACE_ALLOC_HOOK_DECLARE; 00207 00208 protected: 00209 /// Indicates the state of the queue, which can be 00210 /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>. 00211 int state_; 00212 00213 private: 00214 // = Disallow these operations. 00215 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Base &)) 00216 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Base (const ACE_Message_Queue_Base &)) 00217 }; 00218 00219 // Include the templates here. 00220 #include "ace/Message_Queue_T.h" 00221 00222 #if defined (VXWORKS) 00223 # include /**/ <msgQLib.h> 00224 00225 /** 00226 * @class ACE_Message_Queue_Vx 00227 * 00228 * @brief Wrapper for VxWorks message queues. 00229 * 00230 * Specialization of ACE_Message_Queue to simply wrap VxWorks 00231 * MsgQ. It does not use any synchronization, because it relies 00232 * on the native MsgQ implementation to take care of that. The 00233 * only system calls that it uses are VxWorks msgQLib calls, so 00234 * it is suitable for use in interrupt service routines. 00235 * NOTE: *Many* ACE_Message_Queue features are not supported with 00236 * this specialization, including: 00237 * * The two size arguments to the constructor and <open> are 00238 * interpreted differently. The first is interpreted as the 00239 * maximum number of bytes in a message. The second is 00240 * interpreted as the maximum number of messages that can be 00241 * queued. 00242 * * <dequeue_head> *requires* that the ACE_Message_Block 00243 * pointer argument point to an ACE_Message_Block that was 00244 * allocated by the caller. It must be big enough to support 00245 * the received message, without using continuation. The 00246 * pointer argument is not modified. 00247 * * Message priority. MSG_Q_FIFO is hard-coded. 00248 * * enqueue method timeouts. 00249 * * <peek_dequeue_head>. 00250 * * <ACE_Message_Queue_Iterators>. 00251 * * The ability to change low and high water marks after creation. 00252 * * <Message_Block> chains. The continuation field of <ACE_Message_Block> 00253 * * is ignored; only the first block of a fragment chain is 00254 * * recognized. 00255 */ 00256 class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH> 00257 { 00258 public: 00259 // = Initialization and termination methods. 00260 ACE_Message_Queue_Vx (size_t max_messages, 00261 size_t max_message_length, 00262 ACE_Notification_Strategy * = 0); 00263 00264 // Create a message queue with all the defaults. 00265 /// Create a message queue with all the defaults. 00266 virtual int open (size_t max_messages, 00267 size_t max_message_length, 00268 ACE_Notification_Strategy * = 0); 00269 00270 /// Close down the message queue and release all resources. 00271 virtual int close (void); 00272 00273 /// Close down the message queue and release all resources. 00274 virtual ~ACE_Message_Queue_Vx (void); 00275 00276 // = Queue statistic methods. 00277 /** 00278 * Number of total bytes on the queue, i.e., sum of the message 00279 * block sizes. 00280 */ 00281 virtual size_t message_bytes (void); 00282 00283 /** 00284 * Number of total length on the queue, i.e., sum of the message 00285 * block lengths. 00286 */ 00287 virtual size_t message_length (void); 00288 00289 /** 00290 * Number of total messages on the queue. 00291 */ 00292 virtual int message_count (void); 00293 00294 // = Manual changes to these stats (used when queued message blocks 00295 // change size or lengths). 00296 /** 00297 * New value of the number of total bytes on the queue, i.e., sum of 00298 * the message block sizes. 00299 */ 00300 virtual void message_bytes (size_t new_size); 00301 /** 00302 * New value of the number of total length on the queue, i.e., sum 00303 * of the message block lengths. 00304 */ 00305 virtual void message_length (size_t new_length); 00306 00307 // = Flow control routines 00308 00309 /// Get high watermark. 00310 virtual size_t high_water_mark (void); 00311 00312 /// Set high watermark. 00313 virtual void high_water_mark (size_t hwm); 00314 00315 /// Get low watermark. 00316 virtual size_t low_water_mark (void); 00317 00318 /// Set low watermark. 00319 virtual void low_water_mark (size_t lwm); 00320 00321 // = Activation control methods. 00322 00323 /// Dump the state of an object. 00324 void dump (void) const; 00325 00326 /// Declare the dynamic allocation hooks. 00327 ACE_ALLOC_HOOK_DECLARE; 00328 00329 protected: 00330 /// Enqueue an <ACE_Message_Block *> in accordance with its priority. 00331 virtual int enqueue_i (ACE_Message_Block *new_item); 00332 00333 /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time. 00334 virtual int enqueue_deadline_i (ACE_Message_Block *new_item); 00335 00336 /// Enqueue an <ACE_Message_Block *> at the end of the queue. 00337 virtual int enqueue_tail_i (ACE_Message_Block *new_item); 00338 00339 /// Enqueue an <ACE_Message_Block *> at the head of the queue. 00340 virtual int enqueue_head_i (ACE_Message_Block *new_item); 00341 00342 /// Dequeue and return the <ACE_Message_Block *> at the head of the 00343 /// queue. 00344 virtual int dequeue_head_i (ACE_Message_Block *&first_item); 00345 00346 /// Dequeue and return the <ACE_Message_Block *> with the lowest 00347 /// priority. 00348 virtual int dequeue_prio_i (ACE_Message_Block *&dequeued); 00349 00350 /// Dequeue and return the <ACE_Message_Block *> at the tail of the 00351 /// queue. 00352 virtual int dequeue_tail_i (ACE_Message_Block *&dequeued); 00353 00354 /// Dequeue and return the <ACE_Message_Block *> that has the lowest 00355 /// deadline time. 00356 virtual int dequeue_deadline_i (ACE_Message_Block *&dequeued); 00357 00358 // = Check the boundary conditions (assumes locks are held). 00359 /// True if queue is full, else false. 00360 virtual int is_full_i (void); 00361 00362 /// True if queue is empty, else false. 00363 virtual int is_empty_i (void); 00364 00365 // = Implementation of public <activate>/<deactivate> methods above. 00366 00367 // These methods assume locks are held. 00368 00369 // = Helper methods to factor out common #ifdef code. 00370 /// Wait for the queue to become non-full. 00371 virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon, 00372 ACE_Time_Value *tv); 00373 00374 /// Wait for the queue to become non-empty. 00375 virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon, 00376 ACE_Time_Value *tv); 00377 00378 /// Inform any threads waiting to enqueue that they can procede. 00379 virtual int signal_enqueue_waiters (void); 00380 00381 /// Inform any threads waiting to dequeue that they can procede. 00382 virtual int signal_dequeue_waiters (void); 00383 00384 /// Access the underlying msgQ. 00385 MSG_Q_ID msgq (void); 00386 00387 private: 00388 /// Maximum number of messages that can be queued. 00389 int max_messages_; 00390 00391 /// Maximum message size, in bytes. 00392 int max_message_length_; 00393 00394 /// Native message queue options. 00395 int options_; 00396 00397 // = Disallow these operations. 00398 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Vx &)) 00399 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &)) 00400 00401 ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head 00402 (ACE_Message_Block *&first_item, 00403 ACE_Time_Value *tv = 0)) 00404 }; 00405 #endif /* VXWORKS */ 00406 00407 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) 00408 /** 00409 * @class ACE_Message_Queue_NT 00410 * 00411 * @brief Message Queue implementation using IO completion port on NT. 00412 * 00413 * Implementation of a strip-downed ACE_Message_Queue using NT's 00414 * IO completion port mechanism. 00415 * NOTE: *Many* ACE_Message_Queue features are not supported with 00416 * this implementation, including: 00417 * * <open> method have different signatures. 00418 * * <dequeue_head> *requires* that the <ACE_Message_Block> 00419 * pointer argument point to an <ACE_Message_Block> that was 00420 * allocated by the caller. 00421 * * <peek_dequeue_head>. 00422 * * <ACE_Message_Queue_Iterators>. 00423 * * No flow control. 00424 */ 00425 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base 00426 { 00427 public: 00428 // = Initialization and termination methods. 00429 ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00430 00431 /** 00432 * Initialize the Message Queue by creating a new NT I/O completion 00433 * port. The first arguemnt specifies the number of threads 00434 * released by the MQ that are allowed to run concurrently. Return 00435 * 0 when succeeds, -1 otherwise. 00436 */ 00437 virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); 00438 00439 /// Close down the underlying I/O completion port. You need to 00440 /// re-open the MQ after this function is executed. 00441 virtual int close (void); 00442 00443 /// Close down the message queue and release all resources. 00444 virtual ~ACE_Message_Queue_NT (void); 00445 00446 // = Enqueue and dequeue methods. 00447 00448 /** 00449 * Enqueue an <ACE_Message_Block *> at the end of the queue. 00450 * Returns -1 on failure, else the number of items still on the 00451 * queue. 00452 */ 00453 virtual int enqueue_tail (ACE_Message_Block *new_item, 00454 ACE_Time_Value *timeout = 0); 00455 virtual int enqueue (ACE_Message_Block *new_item, 00456 ACE_Time_Value *timeout = 0); 00457 00458 /** 00459 * Dequeue and return the <ACE_Message_Block *> at the head of the 00460 * queue. Returns -1 on failure, else the number of items still on 00461 * the queue. 00462 */ 00463 virtual int dequeue_head (ACE_Message_Block *&first_item, 00464 ACE_Time_Value *timeout = 0); 00465 virtual int dequeue (ACE_Message_Block *&first_item, 00466 ACE_Time_Value *timeout = 0); 00467 00468 // = Check if queue is full/empty. 00469 /** 00470 * Always return false. 00471 */ 00472 00473 virtual int is_full (void); 00474 /** 00475 * True if queue is empty, else false. Notice the return value is 00476 * only transient. 00477 */ 00478 virtual int is_empty (void); 00479 00480 // = Queue statistic methods (transient.) 00481 /** 00482 * Number of total bytes on the queue, i.e., sum of the message 00483 * block sizes. 00484 */ 00485 virtual size_t message_bytes (void); 00486 00487 /** 00488 * Number of total length on the queue, i.e., sum of the message 00489 * block lengths. 00490 */ 00491 virtual size_t message_length (void); 00492 00493 /** 00494 * Number of total messages on the queue. 00495 */ 00496 virtual int message_count (void); 00497 00498 // = Manual changes to these stats (used when queued message blocks 00499 // change size or lengths). 00500 /** 00501 * New value of the number of total bytes on the queue, i.e., sum of 00502 * the message block sizes. 00503 */ 00504 virtual void message_bytes (size_t new_size); 00505 00506 /** 00507 * New value of the number of total length on the queue, i.e., sum 00508 * of the message block lengths. 00509 */ 00510 virtual void message_length (size_t new_length); 00511 00512 /// Get the max concurrent thread number. 00513 virtual DWORD max_threads (void); 00514 00515 // = Activation control methods. 00516 00517 /** 00518 * Deactivate the queue and wake up all threads waiting on the queue 00519 * so they can continue. No messages are removed from the queue, 00520 * however. Any other operations called until the queue is 00521 * activated again will immediately return -1 with @c errno 00522 * ESHUTDOWN. 00523 * 00524 * @retval The queue's state before this call. 00525 */ 00526 virtual int deactivate (void); 00527 00528 /** 00529 * Reactivate the queue so that threads can enqueue and dequeue 00530 * messages again. Returns the state of the queue before the call. 00531 */ 00532 virtual int activate (void); 00533 00534 /** 00535 * Pulse the queue to wake up any waiting threads. Changes the 00536 * queue state to PULSED; future enqueue/dequeue operations proceed 00537 * as in ACTIVATED state. 00538 * 00539 * @retval The queue's state before this call. 00540 */ 00541 virtual int pulse (void); 00542 00543 /// Returns true if the state of the queue is <DEACTIVATED>, 00544 /// but false if the queue's is <ACTIVATED> or <PULSED>. 00545 virtual int deactivated (void); 00546 00547 // = Not currently implemented... 00548 int peek_dequeue_head (ACE_Message_Block *&first_item, 00549 ACE_Time_Value *timeout = 0); 00550 ACE_Notification_Strategy *notification_strategy (void); 00551 void notification_strategy (ACE_Notification_Strategy *s); 00552 00553 // = Notification hook. 00554 00555 /// Dump the state of an object. 00556 virtual void dump (void) const; 00557 00558 /// Get the handle to the underlying completion port. 00559 virtual ACE_HANDLE completion_port (void); 00560 00561 /// Declare the dynamic allocation hooks. 00562 ACE_ALLOC_HOOK_DECLARE; 00563 00564 private: 00565 // = Internal states. 00566 00567 /// Maximum threads that can be released (and run) concurrently. 00568 DWORD max_cthrs_; 00569 00570 /// Current number of threads waiting to dequeue messages. 00571 DWORD cur_thrs_; 00572 00573 /// Current number of bytes in queue. 00574 size_t cur_bytes_; 00575 00576 /// Current length of messages in queue. 00577 size_t cur_length_; 00578 00579 /// Current number of messages in the queue. 00580 int cur_count_; 00581 00582 /** 00583 * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex 00584 * but since this class is only supported on NT, it's okay to use 00585 * ACE_Thread_Mutex here. 00586 */ 00587 ACE_Thread_Mutex lock_; 00588 00589 /// Underlying NT IoCompletionPort. 00590 ACE_HANDLE completion_port_; 00591 00592 // = Disallow these operations. 00593 ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_NT &)) 00594 ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_NT (const ACE_Message_Queue_NT &)) 00595 }; 00596 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ 00597 00598 00599 #if defined (__ACE_INLINE__) 00600 #include "ace/Message_Queue.i" 00601 #endif /* __ACE_INLINE__ */ 00602 00603 #include "ace/post.h" 00604 #endif /* ACE_MESSAGE_QUEUE_H */
1.2.14 written by Dimitri van Heesch,
© 1997-2002