Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Message_Queue.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 
00003 //=============================================================================
00004 /**
00005  *  @file    Message_Queue.h
00006  *
00007  *  $Id: Message_Queue.h,v 1.1.1.4 2003/02/21 18:36:32 chad Exp $
00008  *
00009  *  @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
00010  */
00011 //=============================================================================
00012 
00013 #ifndef ACE_MESSAGE_QUEUE_H
00014 #define ACE_MESSAGE_QUEUE_H
00015 #include "ace/pre.h"
00016 
00017 #include "ace/Message_Block.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 # pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 #include "ace/IO_Cntl_Msg.h"
00024 #include "ace/Synch.h"
00025 
00026 // Forward declarations: this header only uses ACE_Notification_Strategy through pointers, and the iterator templates are named ahead of the "ace/Message_Queue_T.h" include further down.
00027 class ACE_Notification_Strategy;
00028 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Iterator;
00029 template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator;
00030 
00031 /**
00032  * @class ACE_Message_Queue_Base
00033  *
00034  * @brief Abstract base class for <ACE_Message_Queue>, which is the
00035  * central queueing facility for messages in the ACE framework.
00036  *
00037  * For every <ACE_Time_Value> pointer parameter, the caller blocks
00038  * until the action is possible if <timeout> == 0.  Otherwise, the
00039  * caller waits only until the absolute time specified in *<timeout>
00040  * is reached.
00041  *
00042  * A queue is always in exactly one of three states:
00043  * . ACTIVATED
00044  * . DEACTIVATED
00045  * . PULSED
00046  */
00047 class ACE_Export ACE_Message_Queue_Base
00048 {
00049 public:
00050   enum
00051   {
00052     // Default high and low watermarks.
00053 
00054     /// Default high watermark (16 K).
00055     DEFAULT_HWM = 16 * 1024,
00056     /// Default low watermark (same value as the high watermark).
00057     DEFAULT_LWM = 16 * 1024,
00058 
00059     // Queue states.  Before the PULSED state was added, the
00060     // activate() and deactivate() methods returned WAS_INACTIVE or
00061     // WAS_ACTIVE to indicate the previous condition.  Now those
00062     // methods return the state the queue was previously in.
00063     // WAS_ACTIVE and WAS_INACTIVE keep their old values so that
00064     // applications that don't use the PULSED state still work.
00065 
00066     WAS_ACTIVE = 1, /* DEPRECATED: same value as ACTIVATED */
00067     /// Message queue is active and processing normally.
00068     ACTIVATED = 1,
00069 
00070     WAS_INACTIVE = 2, /* DEPRECATED: same value as DEACTIVATED */
00071     /// Queue is deactivated; no enqueue or dequeue operations allowed.
00072     DEACTIVATED = 2,
00073 
00074     /// Message queue was pulsed; enqueue and dequeue may proceed normally.
00075     PULSED = 3
00076 
00077   };
00078 
00079   ACE_Message_Queue_Base (void);
00080 
00081   /// Close down the message queue and release all resources.
00082   virtual int close (void) = 0;
00083 
00084   /// Close down the message queue and release all resources.
00085   virtual ~ACE_Message_Queue_Base (void);
00086 
00087   // = Enqueue and dequeue methods.
00088 
00089   /**
00090    * Retrieve the first <ACE_Message_Block> without removing it.  Note
00091    * that <timeout> uses <{absolute}> time rather than <{relative}>
00092    * time.  If the <timeout> elapses without receiving a message, -1
00093    * is returned and <errno> is set to <EWOULDBLOCK>.  If the queue is
00094    * deactivated, -1 is returned and <errno> is set to <ESHUTDOWN>.
00095    * Otherwise, returns -1 on failure, else the number of items still
00096    * on the queue.
00097    */
00098   virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
00099                                  ACE_Time_Value *timeout = 0) = 0;
00100 
00101   /**
00102    * Enqueue an <ACE_Message_Block *> into the tail of the queue.
00103    * Returns number of items in queue if the call succeeds or -1
00104    * otherwise.  These calls return -1 when the queue is closed or
00105    * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal
00106    * occurs (in which case <errno> == <EINTR>), or if the time
00107    * specified in <timeout> elapses (in which case <errno> ==
00108    * <EWOULDBLOCK>).
00109    */
00110   virtual int enqueue_tail (ACE_Message_Block *new_item,
00111                             ACE_Time_Value *timeout = 0) = 0;
00112   virtual int enqueue (ACE_Message_Block *new_item,
00113                        ACE_Time_Value *timeout = 0) = 0;
00114 
00115   /**
00116    * Dequeue and return the <ACE_Message_Block *> at the head of the
00117    * queue.  Returns number of items in queue if the call succeeds or
00118    * -1 otherwise.  These calls return -1 when the queue is closed or
00119    * deactivated (in which case <errno> == <ESHUTDOWN>), when a signal
00120    * occurs (in which case <errno> == <EINTR>), or if the time
00121    * specified in <timeout> elapses (in which case <errno> ==
00122    * <EWOULDBLOCK>).
00123    */
00124   virtual int dequeue_head (ACE_Message_Block *&first_item,
00125                             ACE_Time_Value *timeout = 0) = 0;
00126   virtual int dequeue (ACE_Message_Block *&first_item,
00127                        ACE_Time_Value *timeout = 0) = 0;
00128 
00129   // = Check if queue is full/empty.
00130   /// True (non-zero) if queue is full, else false (0).
00131   virtual int is_full (void) = 0;
00132 
00133   /// True (non-zero) if queue is empty, else false (0).
00134   virtual int is_empty (void) = 0;
00135 
00136   // = Queue statistic methods.
00137 
00138   /// Total number of bytes on the queue, i.e., sum of the message
00139   /// block sizes.
00140   virtual size_t message_bytes (void) = 0;
00141 
00142   /// Total length of the messages on the queue, i.e., sum of the
00143   /// message block lengths.
00144   virtual size_t message_length (void) = 0;
00145 
00146   /// Total number of messages on the queue.
00147   virtual int message_count (void) = 0;
00148 
00149   /// Set a new value for the total number of bytes on the queue,
00150   /// i.e., the sum of the message block sizes.
00151   virtual void message_bytes (size_t new_size) = 0;
00152 
00153   /// Set a new value for the total length on the queue, i.e., the
00154   /// sum of the message block lengths.
00155   virtual void message_length (size_t new_length) = 0;
00156 
00157   // = Activation control methods.
00158 
00159   /**
00160    * Deactivate the queue and wake up all threads waiting on the queue
00161    * so they can continue.  No messages are removed from the queue,
00162    * however.  Any other operations called until the queue is
00163    * activated again will immediately return -1 with @c errno
00164    * ESHUTDOWN.
00165    *
00166    * @return  The queue's state before this call.
00167    */
00168   virtual int deactivate (void) = 0;
00169 
00170   /**
00171    * Reactivate the queue so that threads can enqueue and dequeue
00172    * messages again.
00173    *
00174    * @return  The queue's state before this call.
00175    */
00176   virtual int activate (void) = 0;
00177 
00178   /**
00179    * Pulse the queue to wake up any waiting threads.  Changes the
00180    * queue state to PULSED; future enqueue/dequeue operations proceed
00181    * as in ACTIVATED state.
00182    *
00183    * @return  The queue's state before this call.
00184    */
00185   virtual int pulse (void) = 0;
00186 
00187   /// Returns the current state of the queue.
00188   virtual int state (void);
00189 
00190   /// Returns 1 if the state of the queue is DEACTIVATED,
00191   /// and 0 if the queue's state is ACTIVATED or PULSED.
00192   virtual int deactivated (void) = 0;
00193 
00194   /// Get the notification strategy for the <Message_Queue>.
00195   virtual ACE_Notification_Strategy *notification_strategy (void) = 0;
00196 
00197   /// Set the notification strategy for the <Message_Queue>.
00198   virtual void notification_strategy (ACE_Notification_Strategy *s) = 0;
00199 
00200   // = Notification hook.
00201 
00202   /// Dump the state of an object.
00203   virtual void dump (void) const = 0;
00204 
00205   /// Declare the dynamic allocation hooks.
00206   ACE_ALLOC_HOOK_DECLARE;
00207 
00208 protected:
00209   /// Indicates the state of the queue, which can be
00210   /// <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
00211   int state_;
00212 
00213 private:
00214   // = Disallow these operations (copy assignment and copy construction).
00215   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Base &))
00216   ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Base (const ACE_Message_Queue_Base &))
00217 };
00218 
00219 // Include the templates here.
00220 #include "ace/Message_Queue_T.h"
00221 
00222 #if defined (VXWORKS)
00223 # include /**/ <msgQLib.h>
00224 
00225 /**
00226  * @class ACE_Message_Queue_Vx
00227  *
00228  * @brief Wrapper for VxWorks message queues.
00229  *
00230  * Specialization of ACE_Message_Queue to simply wrap VxWorks
00231  * MsgQ.  It does not use any synchronization, because it relies
00232  * on the native MsgQ implementation to take care of that.  The
00233  * only system calls that it uses are VxWorks msgQLib calls, so
00234  * it is suitable for use in interrupt service routines.
00235  * NOTE: *Many* ACE_Message_Queue features are not supported with
00236  * this specialization, including:
00237  * * The two size arguments to the constructor and <open> are
00238  * interpreted differently: one is the maximum number of bytes in a
00239  * message, the other the maximum number of messages that can be
00240  * queued.  NOTE(review): the declarations name them <max_messages>
00241  * then <max_message_length>; confirm the order in the implementation.
00242  * * <dequeue_head> *requires* that the ACE_Message_Block
00243  * pointer argument point to an ACE_Message_Block that was
00244  * allocated by the caller.  It must be big enough to support
00245  * the received message, without using continuation. The
00246  * pointer argument is not modified.
00247  * * Message priority.  MSG_Q_FIFO is hard-coded.
00248  * * enqueue method timeouts.
00249  * * <peek_dequeue_head>.
00250  * * <ACE_Message_Queue_Iterators>.
00251  * * The ability to change low and high water marks after creation.
00252  * * <Message_Block> chains.  The continuation field of <ACE_Message_Block>
00253  *   is ignored; only the first block of a fragment chain is
00254  *   recognized.
00255  */
00256 class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH>
00257 {
00258 public:
00259   // = Initialization and termination methods.
00260   ACE_Message_Queue_Vx (size_t max_messages,
00261                         size_t max_message_length,
00262                         ACE_Notification_Strategy * = 0);
00263 
00264   // NOTE(review): the constructor above carries no comment of its own.
00265   /// Create a message queue; only the notification strategy has a default (0).
00266   virtual int open (size_t max_messages,
00267                     size_t max_message_length,
00268                     ACE_Notification_Strategy * = 0);
00269 
00270   /// Close down the message queue and release all resources.
00271   virtual int close (void);
00272 
00273   /// Close down the message queue and release all resources.
00274   virtual ~ACE_Message_Queue_Vx (void);
00275 
00276   // = Queue statistic methods.
00277   /**
00278    * Total number of bytes on the queue, i.e., sum of the message
00279    * block sizes.
00280    */
00281   virtual size_t message_bytes (void);
00282 
00283   /**
00284    * Total length of the messages on the queue, i.e., sum of the
00285    * message block lengths.
00286    */
00287   virtual size_t message_length (void);
00288 
00289   /**
00290    * Total number of messages on the queue.
00291    */
00292   virtual int message_count (void);
00293 
00294   // = Manual changes to these stats (used when queued message blocks
00295   // change size or lengths).
00296   /**
00297    * Set a new value for the total number of bytes on the queue,
00298    * i.e., the sum of the message block sizes.
00299    */
00300   virtual void message_bytes (size_t new_size);
00301   /**
00302    * Set a new value for the total length on the queue, i.e., the
00303    * sum of the message block lengths.
00304    */
00305   virtual void message_length (size_t new_length);
00306 
00307   // = Flow control routines
00308 
00309   /// Get high watermark.
00310   virtual size_t high_water_mark (void);
00311 
00312   /// Set high watermark.
00313   virtual void high_water_mark (size_t hwm);
00314 
00315   /// Get low watermark.
00316   virtual size_t low_water_mark (void);
00317 
00318   /// Set low watermark.
00319   virtual void low_water_mark (size_t lwm);
00320 
00321   // = Activation control methods.
00322 
00323   /// Dump the state of an object.
00324   void dump (void) const;
00325 
00326   /// Declare the dynamic allocation hooks.
00327   ACE_ALLOC_HOOK_DECLARE;
00328 
00329 protected:
00330   /// Enqueue an <ACE_Message_Block *> in accordance with its priority.
00331   virtual int enqueue_i (ACE_Message_Block *new_item);
00332 
00333   /// Enqueue an <ACE_Message_Block *> in accordance with its deadline time.
00334   virtual int enqueue_deadline_i (ACE_Message_Block *new_item);
00335 
00336   /// Enqueue an <ACE_Message_Block *> at the end of the queue.
00337   virtual int enqueue_tail_i (ACE_Message_Block *new_item);
00338 
00339   /// Enqueue an <ACE_Message_Block *> at the head of the queue.
00340   virtual int enqueue_head_i (ACE_Message_Block *new_item);
00341 
00342   /// Dequeue and return the <ACE_Message_Block *> at the head of the
00343   /// queue.
00344   virtual int dequeue_head_i (ACE_Message_Block *&first_item);
00345 
00346   /// Dequeue and return the <ACE_Message_Block *> with the lowest
00347   /// priority.
00348   virtual int dequeue_prio_i (ACE_Message_Block *&dequeued);
00349 
00350   /// Dequeue and return the <ACE_Message_Block *> at the tail of the
00351   /// queue.
00352   virtual int dequeue_tail_i (ACE_Message_Block *&dequeued);
00353 
00354   /// Dequeue and return the <ACE_Message_Block *> that has the lowest
00355   /// deadline time.
00356   virtual int dequeue_deadline_i (ACE_Message_Block *&dequeued);
00357 
00358   // = Check the boundary conditions (assumes locks are held).
00359   /// True (non-zero) if queue is full, else false (0).
00360   virtual int is_full_i (void);
00361 
00362   /// True (non-zero) if queue is empty, else false (0).
00363   virtual int is_empty_i (void);
00364 
00365   // = Implementation of public <activate>/<deactivate> methods above.
00366 
00367   // These methods assume locks are held.
00368 
00369   // = Helper methods to factor out common #ifdef code.
00370   /// Wait for the queue to become non-full.
00371   virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00372                                   ACE_Time_Value *tv);
00373 
00374   /// Wait for the queue to become non-empty.
00375   virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00376                                    ACE_Time_Value *tv);
00377 
00378   /// Inform any threads waiting to enqueue that they can proceed.
00379   virtual int signal_enqueue_waiters (void);
00380 
00381   /// Inform any threads waiting to dequeue that they can proceed.
00382   virtual int signal_dequeue_waiters (void);
00383 
00384   /// Access the underlying msgQ.
00385   MSG_Q_ID msgq (void);
00386 
00387 private:
00388   /// Maximum number of messages that can be queued.
00389   int max_messages_;
00390 
00391   /// Maximum message size, in bytes.
00392   int max_message_length_;
00393 
00394   /// Native message queue options.
00395   int options_;
00396 
00397   // = Disallow these operations (copy assignment and copy construction).
00398   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Vx &))
00399   ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &))
00400   // <peek_dequeue_head> is listed as unsupported in the class comment, so it is declared here with the disallowed operations.
00401   ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head
00402                             (ACE_Message_Block *&first_item,
00403                              ACE_Time_Value *tv = 0))
00404 };
00405 #endif /* VXWORKS */
00406 
00407 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00408 /**
00409  * @class ACE_Message_Queue_NT
00410  *
00411  * @brief Message Queue implementation using IO completion port on NT.
00412  *
00413  * Implementation of a stripped-down ACE_Message_Queue using NT's
00414  * IO completion port mechanism.
00415  * NOTE: *Many* ACE_Message_Queue features are not supported with
00416  * this implementation, including:
00417  * * The <open> method has a different signature.
00418  * * <dequeue_head> *requires* that the <ACE_Message_Block>
00419  * pointer argument point to an <ACE_Message_Block> that was
00420  * allocated by the caller.
00421  * * <peek_dequeue_head>.
00422  * * <ACE_Message_Queue_Iterators>.
00423  * * No flow control.
00424  */
00425 class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
00426 {
00427 public:
00428   // = Initialization and termination methods.
00429   ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00430 
00431   /**
00432    * Initialize the Message Queue by creating a new NT I/O completion
00433    * port.  The first argument specifies the number of threads
00434    * released by the MQ that are allowed to run concurrently.  Returns
00435    * 0 on success, -1 otherwise.
00436    */
00437   virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
00438 
00439   /// Close down the underlying I/O completion port.  You need to
00440   /// re-open the MQ after this function is executed.
00441   virtual int close (void);
00442 
00443   /// Close down the message queue and release all resources.
00444   virtual ~ACE_Message_Queue_NT (void);
00445 
00446   // = Enqueue and dequeue methods.
00447 
00448   /**
00449    * Enqueue an <ACE_Message_Block *> at the end of the queue.
00450    * Returns -1 on failure, else the number of items still on the
00451    * queue.
00452    */
00453   virtual int enqueue_tail (ACE_Message_Block *new_item,
00454                             ACE_Time_Value *timeout = 0);
00455   virtual int enqueue (ACE_Message_Block *new_item,
00456                        ACE_Time_Value *timeout = 0);
00457 
00458   /**
00459    * Dequeue and return the <ACE_Message_Block *> at the head of the
00460    * queue.  Returns -1 on failure, else the number of items still on
00461    * the queue.
00462    */
00463   virtual int dequeue_head (ACE_Message_Block *&first_item,
00464                             ACE_Time_Value *timeout = 0);
00465   virtual int dequeue (ACE_Message_Block *&first_item,
00466                        ACE_Time_Value *timeout = 0);
00467 
00468   // = Check if queue is full/empty.
00469   /**
00470    * Always returns false (the class comment lists "No flow control").
00471    */
00472 
00473   virtual int is_full (void);
00474   /**
00475    * True if queue is empty, else false.  Notice the return value is
00476    * only transient.
00477    */
00478   virtual int is_empty (void);
00479 
00480   // = Queue statistic methods (transient.)
00481   /**
00482    * Total number of bytes on the queue, i.e., sum of the message
00483    * block sizes.
00484    */
00485   virtual size_t message_bytes (void);
00486 
00487   /**
00488    * Total length of the messages on the queue, i.e., sum of the
00489    * message block lengths.
00490    */
00491   virtual size_t message_length (void);
00492 
00493   /**
00494    * Total number of messages on the queue.
00495    */
00496   virtual int message_count (void);
00497 
00498   // = Manual changes to these stats (used when queued message blocks
00499   // change size or lengths).
00500   /**
00501    * Set a new value for the total number of bytes on the queue,
00502    * i.e., the sum of the message block sizes.
00503    */
00504   virtual void message_bytes (size_t new_size);
00505 
00506   /**
00507    * Set a new value for the total length on the queue, i.e., the
00508    * sum of the message block lengths.
00509    */
00510   virtual void message_length (size_t new_length);
00511 
00512   /// Get the max concurrent thread number.
00513   virtual DWORD max_threads (void);
00514 
00515   // = Activation control methods.
00516 
00517   /**
00518    * Deactivate the queue and wake up all threads waiting on the queue
00519    * so they can continue.  No messages are removed from the queue,
00520    * however.  Any other operations called until the queue is
00521    * activated again will immediately return -1 with @c errno
00522    * ESHUTDOWN.
00523    *
00524    * @return  The queue's state before this call.
00525    */
00526   virtual int deactivate (void);
00527 
00528   /**
00529    * Reactivate the queue so that threads can enqueue and dequeue
00530    * messages again.  Returns the state of the queue before the call.
00531    */
00532   virtual int activate (void);
00533 
00534   /**
00535    * Pulse the queue to wake up any waiting threads.  Changes the
00536    * queue state to PULSED; future enqueue/dequeue operations proceed
00537    * as in ACTIVATED state.
00538    *
00539    * @return  The queue's state before this call.
00540    */
00541   virtual int pulse (void);
00542 
00543   /// Returns true if the state of the queue is <DEACTIVATED>,
00544   /// but false if the queue's state is <ACTIVATED> or <PULSED>.
00545   virtual int deactivated (void);
00546 
00547   // = Not currently implemented...
00548   int peek_dequeue_head (ACE_Message_Block *&first_item,
00549                          ACE_Time_Value *timeout = 0);
00550   ACE_Notification_Strategy *notification_strategy (void);
00551   void notification_strategy (ACE_Notification_Strategy *s);
00552 
00553   // = Notification hook.
00554 
00555   /// Dump the state of an object.
00556   virtual void dump (void) const;
00557 
00558   /// Get the handle to the underlying completion port.
00559   virtual ACE_HANDLE completion_port (void);
00560 
00561   /// Declare the dynamic allocation hooks.
00562   ACE_ALLOC_HOOK_DECLARE;
00563 
00564 private:
00565   // = Internal states.
00566 
00567   /// Maximum threads that can be released (and run) concurrently.
00568   DWORD max_cthrs_;
00569 
00570   /// Current number of threads waiting to dequeue messages.
00571   DWORD cur_thrs_;
00572 
00573   /// Current number of bytes in queue.
00574   size_t cur_bytes_;
00575 
00576   /// Current length of messages in queue.
00577   size_t cur_length_;
00578 
00579   /// Current number of messages in the queue.
00580   int cur_count_;
00581 
00582   /**
00583    * Synchronizer.  This should really be an ACE_Recursive_Thread_Mutex
00584    * but since this class is only supported on NT, it's okay to use
00585    * ACE_Thread_Mutex here.
00586    */
00587   ACE_Thread_Mutex lock_;
00588 
00589   /// Underlying NT IoCompletionPort.
00590   ACE_HANDLE completion_port_;
00591 
00592   // = Disallow these operations (copy assignment and copy construction).
00593   ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_NT &))
00594   ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_NT (const ACE_Message_Queue_NT &))
00595 };
00596 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
00597 
00598 
00599 #if defined (__ACE_INLINE__)
00600 #include "ace/Message_Queue.i"
00601 #endif /* __ACE_INLINE__ */
00602 
00603 #include "ace/post.h"
00604 #endif /* ACE_MESSAGE_QUEUE_H */

Generated on Mon Jun 16 11:20:23 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002