Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Message_Queue.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // $Id: Message_Queue.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:21 chad Exp $
00003 
00004 #if !defined (ACE_MESSAGE_QUEUE_C)
00005 #define ACE_MESSAGE_QUEUE_C
00006 
00007 #include "ace/Message_Queue.h"
00008 #include "ace/Log_Msg.h"
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/Message_Queue.i"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID(ace, Message_Queue, "$Id: Message_Queue.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:21 chad Exp $")
00015 
00016 #if defined (VXWORKS)
00017 
00018 ////////////////////////////////
00019 // class ACE_Message_Queue_Vx //
00020 ////////////////////////////////
00021 
00022 void
00023 ACE_Message_Queue_Vx::dump (void) const
00024 {
00025   ACE_TRACE ("ACE_Message_Queue_Vx::dump");
00026   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00027   switch (this->state_)
00028     {
00029     case ACE_Message_Queue_Base::ACTIVATED:
00030       ACE_DEBUG ((LM_DEBUG,
00031                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00032       break;
00033     case ACE_Message_Queue_Base::DEACTIVATED:
00034       ACE_DEBUG ((LM_DEBUG,
00035                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00036       break;
00037     case ACE_Message_Queue_Base::PULSED:
00038       ACE_DEBUG ((LM_DEBUG,
00039                   ACE_LIB_TEXT ("state = PULSED\n")));
00040       break;
00041     }
00042   ACE_DEBUG ((LM_DEBUG,
00043               ACE_LIB_TEXT ("low_water_mark = %d\n")
00044               ACE_LIB_TEXT ("high_water_mark = %d\n")
00045               ACE_LIB_TEXT ("cur_bytes = %d\n")
00046               ACE_LIB_TEXT ("cur_length = %d\n")
00047               ACE_LIB_TEXT ("cur_count = %d\n")
00048               ACE_LIB_TEXT ("head_ = %u\n")
00049               ACE_LIB_TEXT ("MSG_Q_ID = %u\n"),
00050               this->low_water_mark_,
00051               this->high_water_mark_,
00052               this->cur_bytes_,
00053               this->cur_length_,
00054               this->cur_count_,
00055               this->head_,
00056               this->tail_));
00057   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00058 }
00059 
00060 ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
00061                                             size_t max_message_length,
00062                                             ACE_Notification_Strategy *ns)
00063   : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
00064     max_messages_ (ACE_static_cast (int, max_messages)),
00065     max_message_length_ (ACE_static_cast (int, max_message_length))
00066 {
00067   ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
00068 
00069   if (this->open (max_messages_, max_message_length_, ns) == -1)
00070     ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("open")));
00071 }
00072 
00073 ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
00074 {
00075   ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
00076 
00077   if (this->tail_ != 0  &&  this->close () == -1)
00078     ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("close")));
00079 }
00080 
00081 // Don't bother locking since if someone calls this function more than
00082 // once for the same queue, we're in bigger trouble than just
00083 // concurrency control!
00084 
00085 int
00086 ACE_Message_Queue_Vx::open (size_t max_messages,
00087                             size_t max_message_length,
00088                             ACE_Notification_Strategy *ns)
00089 {
00090   ACE_TRACE ("ACE_Message_Queue_Vx::open");
00091   this->high_water_mark_ = 0;
00092   this->low_water_mark_  = 0;
00093   this->cur_bytes_ = 0;
00094   this->cur_length_ = 0;
00095   this->cur_count_ = 0;
00096   this->head_ = 0;
00097   this->notification_strategy_ = ns;
00098   this->max_messages_ = ACE_static_cast (int, max_messages);
00099   this->max_message_length_ = ACE_static_cast (int, max_message_length);
00100 
00101   if (tail_)
00102     {
00103       // Had already created a msgQ, so delete it.
00104       close ();
00105       activate_i ();
00106     }
00107 
00108   return (this->tail_ =
00109             ACE_reinterpret_cast (ACE_Message_Block *,
00110               ::msgQCreate (max_messages_,
00111                             max_message_length_,
00112                             MSG_Q_FIFO))) == 0 ? -1 : 0;
00113 }
00114 
00115 // Clean up the queue if we have not already done so!
00116 
00117 int
00118 ACE_Message_Queue_Vx::close (void)
00119 {
00120   ACE_TRACE ("ACE_Message_Queue_Vx::close");
00121   // Don't lock, because we don't have a lock.  It shouldn't be
00122   // necessary, anyways.
00123 
00124   this->deactivate_i ();
00125 
00126   // Don't bother to free up the remaining message on the list,
00127   // because we don't have any way to iterate over what's in the
00128   // queue.
00129 
00130   return ::msgQDelete (msgq ());
00131 }
00132 
00133 int
00134 ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
00135 {
00136   // No-op.
00137   return 0;
00138 }
00139 
00140 int
00141 ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
00142 {
00143   // No-op.
00144   return 0;
00145 }
00146 
00147 int
00148 ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
00149 {
00150   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
00151 
00152   if (new_item == 0)
00153     return -1;
00154 
00155   // Don't try to send a composite message!!!!  Only the first
00156   // block will be sent.
00157 
00158   this->cur_count_++;
00159 
00160   // Always use this method to actually send a message on the queue.
00161   if (::msgQSend (msgq (),
00162                   new_item->rd_ptr (),
00163                   new_item->size (),
00164                   WAIT_FOREVER,
00165                   MSG_PRI_NORMAL) == OK)
00166     return ::msgQNumMsgs (msgq ());
00167   else
00168     return -1;
00169 }
00170 
00171 int
00172 ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
00173 {
00174   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
00175 
00176   // Just delegate to enqueue_tail_i.
00177   return enqueue_tail_i (new_item);
00178 }
00179 
00180 int
00181 ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
00182 {
00183   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
00184 
00185   if (new_item == 0)
00186     return -1;
00187 
00188   if (this->head_ == 0)
00189     // Should always take this branch.
00190     return this->enqueue_head_i (new_item);
00191   else
00192     ACE_NOTSUP_RETURN (-1);
00193 }
00194 
00195 int
00196 ACE_Message_Queue_Vx::enqueue_deadline_i (ACE_Message_Block *new_item)
00197 {
00198   ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_deadline_i");
00199 
00200   // Just delegate to enqueue_tail_i.
00201   return enqueue_tail_i (new_item);
00202 }
00203 
00204 // Actually get the first ACE_Message_Block (no locking, so must be
00205 // called with locks held).  This method assumes that the queue has at
00206 // least one item in it when it is called.
00207 
00208 int
00209 ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
00210 {
00211   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
00212 
00213   // We don't allocate a new Message_Block:  the caller must provide
00214   // it, and must ensure that it is big enough (without chaining).
00215 
00216   if (first_item == 0  ||  first_item->wr_ptr () == 0)
00217     return -1;
00218 
00219   if (::msgQReceive (msgq (),
00220                      first_item->wr_ptr (),
00221                      first_item->size (),
00222                      WAIT_FOREVER) == ERROR)
00223     return -1;
00224   else
00225     return ::msgQNumMsgs (msgq ());
00226 }
00227 
00228 int
00229 ACE_Message_Queue_Vx::dequeue_prio_i (ACE_Message_Block *& /*dequeued*/)
00230 {
00231   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_prio_i");
00232   ACE_NOTSUP_RETURN (-1);
00233 }
00234 
00235 int
00236 ACE_Message_Queue_Vx::dequeue_tail_i (ACE_Message_Block *& /*dequeued*/)
00237 {
00238   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_tail_i");
00239   ACE_NOTSUP_RETURN (-1);
00240 }
00241 
00242 int
00243 ACE_Message_Queue_Vx::dequeue_deadline_i (ACE_Message_Block *& /*dequeued*/)
00244 {
00245   ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_deadline_i");
00246   ACE_NOTSUP_RETURN (-1);
00247 }
00248 
00249 // Take a look at the first item without removing it.
00250 
00251 int
00252 ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00253                                           ACE_Time_Value *tv)
00254 {
00255   // Always return here, and let the VxWorks message queue handle blocking.
00256   ACE_UNUSED_ARG (mon);
00257   ACE_UNUSED_ARG (tv);
00258 
00259   return 0;
00260 }
00261 
00262 int
00263 ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00264                                            ACE_Time_Value *tv)
00265 {
00266   // Always return here, and let the VxWorks message queue handle blocking.
00267   ACE_UNUSED_ARG (mon);
00268   ACE_UNUSED_ARG (tv);
00269 
00270   return 0;
00271 }
00272 
00273 #if ! defined (ACE_NEEDS_FUNC_DEFINITIONS)
00274 int
00275 ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
00276                                          ACE_Time_Value *tv)
00277 {
00278   ACE_UNUSED_ARG (tv);
00279   ACE_NOTSUP_RETURN (-1);
00280 }
00281 #endif /* ! ACE_NEEDS_FUNC_DEFINITIONS */
00282 
00283 #endif /* VXWORKS */
00284 
00285 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00286 
00287 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00288   : max_cthrs_ (max_threads),
00289     cur_thrs_ (0),
00290     cur_bytes_ (0),
00291     cur_length_ (0),
00292     cur_count_ (0),
00293     completion_port_ (ACE_INVALID_HANDLE)
00294 {
00295   ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00296   this->open (max_threads);
00297 }
00298 
00299 int
00300 ACE_Message_Queue_NT::open (DWORD max_threads)
00301 {
00302   ACE_TRACE ("ACE_Message_Queue_NT::open");
00303   this->max_cthrs_ = max_threads;
00304   this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00305                                                      0,
00306                                                      ACE_Message_Queue_Base::WAS_ACTIVE,
00307                                                      max_threads);
00308   return (this->completion_port_ == 0 ? -1 : 0);
00309 }
00310 
00311 int
00312 ACE_Message_Queue_NT::close (void)
00313 {
00314   ACE_TRACE ("ACE_Message_Queue_NT::close");
00315   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00316   this->deactivate ();
00317   return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00318 }
00319 
00320 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00321 {
00322   ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00323   this->close ();
00324 }
00325 
00326 int
00327 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00328                                ACE_Time_Value *)
00329 {
00330   ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00331   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00332   if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00333     {
00334       size_t msize = new_item->total_size ();
00335       size_t mlength = new_item->total_length ();
00336       // Note - we send ACTIVATED in the 3rd arg to tell the completion
00337       // routine it's _NOT_ being woken up because of deactivate().
00338 #if defined (ACE_WIN64)
00339       ULONG_PTR state_to_post;
00340 #else
00341       DWORD state_to_post;
00342 #endif /* ACE_WIN64 */
00343       state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00344       if (::PostQueuedCompletionStatus (this->completion_port_,
00345                                         ACE_static_cast (DWORD, msize),
00346                                         state_to_post,
00347                                         ACE_reinterpret_cast (LPOVERLAPPED, new_item)))
00348         {
00349           // Update the states once I succeed.
00350           this->cur_bytes_ += msize;
00351           this->cur_length_ += mlength;
00352           return ++this->cur_count_;
00353         }
00354     }
00355   else
00356     errno = ESHUTDOWN;
00357 
00358   // Fail to enqueue the message.
00359   return -1;
00360 }
00361 
00362 int
00363 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00364                                ACE_Time_Value *timeout)
00365 {
00366   ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00367 
00368   {
00369     ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00370 
00371     // Make sure the MQ is not deactivated before proceeding.
00372     if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00373       {
00374         errno = ESHUTDOWN;      // Operation on deactivated MQ not allowed.
00375         return -1;
00376       }
00377     else
00378       ++this->cur_thrs_;        // Increase the waiting thread count.
00379   }
00380 
00381 #if defined (ACE_WIN64)
00382   ULONG_PTR queue_state;
00383 #else
00384   DWORD queue_state;
00385 #endif /* ACE_WIN64 */
00386   DWORD msize;
00387   // Get a message from the completion port.
00388   int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00389                                           &msize,
00390                                           &queue_state,
00391                                           ACE_reinterpret_cast (LPOVERLAPPED *, &first_item),
00392                                           (timeout == 0 ? INFINITE : timeout->msec ()));
00393   {
00394     ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00395     --this->cur_thrs_;          // Decrease waiting thread count.
00396     if (retv)
00397       {
00398         if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00399           {                     // Really get a valid MB from the queue.
00400             --this->cur_count_;
00401             this->cur_bytes_ -= msize;
00402             this->cur_length_ -= first_item->total_length ();
00403             return this->cur_count_;
00404           }
00405         else                    // Woken up by deactivate () or pulse ().
00406             errno = ESHUTDOWN;
00407       }
00408   }
00409   return -1;
00410 }
00411 
00412 int
00413 ACE_Message_Queue_NT::deactivate (void)
00414 {
00415   ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00416   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00417 
00418   int previous_state = this->state_;
00419   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00420     {
00421       this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00422 
00423       // Get the number of shutdown messages necessary to wake up all
00424       // waiting threads.
00425       DWORD cntr =
00426         this->cur_thrs_ - ACE_static_cast (DWORD, this->cur_count_);
00427       while (cntr-- > 0)
00428         ::PostQueuedCompletionStatus (this->completion_port_,
00429                                       0,
00430                                       this->state_,
00431                                       0);
00432     }
00433   return previous_state;
00434 }
00435 
00436 int
00437 ACE_Message_Queue_NT::activate (void)
00438 {
00439   ACE_TRACE ("ACE_Message_Queue_NT::activate");
00440   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00441   int previous_status = this->state_;
00442   this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00443   return previous_status;
00444 }
00445 
00446 int
00447 ACE_Message_Queue_NT::pulse (void)
00448 {
00449   ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00450   ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00451 
00452   int previous_state = this->state_;
00453   if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00454     {
00455       this->state_ = ACE_Message_Queue_Base::PULSED;
00456 
00457       // Get the number of shutdown messages necessary to wake up all
00458       // waiting threads.
00459 
00460       DWORD cntr =
00461         this->cur_thrs_ - ACE_static_cast (DWORD, this->cur_count_);
00462       while (cntr-- > 0)
00463         ::PostQueuedCompletionStatus (this->completion_port_,
00464                                       0,
00465                                       this->state_,
00466                                       0);
00467     }
00468   return previous_state;
00469 }
00470 
00471 void
00472 ACE_Message_Queue_NT::dump (void) const
00473 {
00474   ACE_TRACE ("ACE_Message_Queue_NT::dump");
00475 
00476   ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00477   switch (this->state_)
00478     {
00479     case ACE_Message_Queue_Base::ACTIVATED:
00480       ACE_DEBUG ((LM_DEBUG,
00481                   ACE_LIB_TEXT ("state = ACTIVATED\n")));
00482       break;
00483     case ACE_Message_Queue_Base::DEACTIVATED:
00484       ACE_DEBUG ((LM_DEBUG,
00485                   ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00486       break;
00487     case ACE_Message_Queue_Base::PULSED:
00488       ACE_DEBUG ((LM_DEBUG,
00489                   ACE_LIB_TEXT ("state = PULSED\n")));
00490       break;
00491     }
00492 
00493   ACE_DEBUG ((LM_DEBUG,
00494               ACE_LIB_TEXT ("max_cthrs_ = %d\n")
00495               ACE_LIB_TEXT ("cur_thrs_ = %d\n")
00496               ACE_LIB_TEXT ("cur_bytes = %d\n")
00497               ACE_LIB_TEXT ("cur_length = %d\n")
00498               ACE_LIB_TEXT ("cur_count = %d\n")
00499               ACE_LIB_TEXT ("completion_port_ = %x\n"),
00500               this->max_cthrs_,
00501               this->cur_thrs_,
00502               this->cur_bytes_,
00503               this->cur_length_,
00504               this->cur_count_,
00505               this->completion_port_));
00506   ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00507 }
00508 
00509 #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
00510 
00511 #endif /* ACE_MESSAGE_QUEUE_C */

Generated on Mon Jun 16 11:20:22 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002