00001 #include "ace_pch.h"
00002
00003
00004 #if !defined (ACE_MESSAGE_QUEUE_C)
00005 #define ACE_MESSAGE_QUEUE_C
00006
00007 #include "ace/Message_Queue.h"
00008 #include "ace/Log_Msg.h"
00009
00010 #if !defined (__ACE_INLINE__)
00011 #include "ace/Message_Queue.i"
00012 #endif
00013
00014 ACE_RCSID(ace, Message_Queue, "$Id: Message_Queue.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:21 chad Exp $")
00015
00016 #if defined (VXWORKS)
00017
00018
00019
00020
00021
00022 void
00023 ACE_Message_Queue_Vx::dump (void) const
00024 {
00025 ACE_TRACE ("ACE_Message_Queue_Vx::dump");
00026 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00027 switch (this->state_)
00028 {
00029 case ACE_Message_Queue_Base::ACTIVATED:
00030 ACE_DEBUG ((LM_DEBUG,
00031 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00032 break;
00033 case ACE_Message_Queue_Base::DEACTIVATED:
00034 ACE_DEBUG ((LM_DEBUG,
00035 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00036 break;
00037 case ACE_Message_Queue_Base::PULSED:
00038 ACE_DEBUG ((LM_DEBUG,
00039 ACE_LIB_TEXT ("state = PULSED\n")));
00040 break;
00041 }
00042 ACE_DEBUG ((LM_DEBUG,
00043 ACE_LIB_TEXT ("low_water_mark = %d\n")
00044 ACE_LIB_TEXT ("high_water_mark = %d\n")
00045 ACE_LIB_TEXT ("cur_bytes = %d\n")
00046 ACE_LIB_TEXT ("cur_length = %d\n")
00047 ACE_LIB_TEXT ("cur_count = %d\n")
00048 ACE_LIB_TEXT ("head_ = %u\n")
00049 ACE_LIB_TEXT ("MSG_Q_ID = %u\n"),
00050 this->low_water_mark_,
00051 this->high_water_mark_,
00052 this->cur_bytes_,
00053 this->cur_length_,
00054 this->cur_count_,
00055 this->head_,
00056 this->tail_));
00057 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00058 }
00059
00060 ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
00061 size_t max_message_length,
00062 ACE_Notification_Strategy *ns)
00063 : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
00064 max_messages_ (ACE_static_cast (int, max_messages)),
00065 max_message_length_ (ACE_static_cast (int, max_message_length))
00066 {
00067 ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
00068
00069 if (this->open (max_messages_, max_message_length_, ns) == -1)
00070 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("open")));
00071 }
00072
00073 ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
00074 {
00075 ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
00076
00077 if (this->tail_ != 0 && this->close () == -1)
00078 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("close")));
00079 }
00080
00081
00082
00083
00084
00085 int
00086 ACE_Message_Queue_Vx::open (size_t max_messages,
00087 size_t max_message_length,
00088 ACE_Notification_Strategy *ns)
00089 {
00090 ACE_TRACE ("ACE_Message_Queue_Vx::open");
00091 this->high_water_mark_ = 0;
00092 this->low_water_mark_ = 0;
00093 this->cur_bytes_ = 0;
00094 this->cur_length_ = 0;
00095 this->cur_count_ = 0;
00096 this->head_ = 0;
00097 this->notification_strategy_ = ns;
00098 this->max_messages_ = ACE_static_cast (int, max_messages);
00099 this->max_message_length_ = ACE_static_cast (int, max_message_length);
00100
00101 if (tail_)
00102 {
00103
00104 close ();
00105 activate_i ();
00106 }
00107
00108 return (this->tail_ =
00109 ACE_reinterpret_cast (ACE_Message_Block *,
00110 ::msgQCreate (max_messages_,
00111 max_message_length_,
00112 MSG_Q_FIFO))) == 0 ? -1 : 0;
00113 }
00114
00115
00116
00117 int
00118 ACE_Message_Queue_Vx::close (void)
00119 {
00120 ACE_TRACE ("ACE_Message_Queue_Vx::close");
00121
00122
00123
00124 this->deactivate_i ();
00125
00126
00127
00128
00129
00130 return ::msgQDelete (msgq ());
00131 }
00132
00133 int
00134 ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
00135 {
00136
00137 return 0;
00138 }
00139
00140 int
00141 ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
00142 {
00143
00144 return 0;
00145 }
00146
00147 int
00148 ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
00149 {
00150 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
00151
00152 if (new_item == 0)
00153 return -1;
00154
00155
00156
00157
00158 this->cur_count_++;
00159
00160
00161 if (::msgQSend (msgq (),
00162 new_item->rd_ptr (),
00163 new_item->size (),
00164 WAIT_FOREVER,
00165 MSG_PRI_NORMAL) == OK)
00166 return ::msgQNumMsgs (msgq ());
00167 else
00168 return -1;
00169 }
00170
00171 int
00172 ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
00173 {
00174 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
00175
00176
00177 return enqueue_tail_i (new_item);
00178 }
00179
00180 int
00181 ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
00182 {
00183 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
00184
00185 if (new_item == 0)
00186 return -1;
00187
00188 if (this->head_ == 0)
00189
00190 return this->enqueue_head_i (new_item);
00191 else
00192 ACE_NOTSUP_RETURN (-1);
00193 }
00194
00195 int
00196 ACE_Message_Queue_Vx::enqueue_deadline_i (ACE_Message_Block *new_item)
00197 {
00198 ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_deadline_i");
00199
00200
00201 return enqueue_tail_i (new_item);
00202 }
00203
00204
00205
00206
00207
00208 int
00209 ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
00210 {
00211 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
00212
00213
00214
00215
00216 if (first_item == 0 || first_item->wr_ptr () == 0)
00217 return -1;
00218
00219 if (::msgQReceive (msgq (),
00220 first_item->wr_ptr (),
00221 first_item->size (),
00222 WAIT_FOREVER) == ERROR)
00223 return -1;
00224 else
00225 return ::msgQNumMsgs (msgq ());
00226 }
00227
00228 int
00229 ACE_Message_Queue_Vx::dequeue_prio_i (ACE_Message_Block *& )
00230 {
00231 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_prio_i");
00232 ACE_NOTSUP_RETURN (-1);
00233 }
00234
00235 int
00236 ACE_Message_Queue_Vx::dequeue_tail_i (ACE_Message_Block *& )
00237 {
00238 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_tail_i");
00239 ACE_NOTSUP_RETURN (-1);
00240 }
00241
00242 int
00243 ACE_Message_Queue_Vx::dequeue_deadline_i (ACE_Message_Block *& )
00244 {
00245 ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_deadline_i");
00246 ACE_NOTSUP_RETURN (-1);
00247 }
00248
00249
00250
00251 int
00252 ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00253 ACE_Time_Value *tv)
00254 {
00255
00256 ACE_UNUSED_ARG (mon);
00257 ACE_UNUSED_ARG (tv);
00258
00259 return 0;
00260 }
00261
00262 int
00263 ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
00264 ACE_Time_Value *tv)
00265 {
00266
00267 ACE_UNUSED_ARG (mon);
00268 ACE_UNUSED_ARG (tv);
00269
00270 return 0;
00271 }
00272
00273 #if ! defined (ACE_NEEDS_FUNC_DEFINITIONS)
00274 int
00275 ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
00276 ACE_Time_Value *tv)
00277 {
00278 ACE_UNUSED_ARG (tv);
00279 ACE_NOTSUP_RETURN (-1);
00280 }
00281 #endif
00282
00283 #endif
00284
00285 #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
00286
00287 ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
00288 : max_cthrs_ (max_threads),
00289 cur_thrs_ (0),
00290 cur_bytes_ (0),
00291 cur_length_ (0),
00292 cur_count_ (0),
00293 completion_port_ (ACE_INVALID_HANDLE)
00294 {
00295 ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
00296 this->open (max_threads);
00297 }
00298
00299 int
00300 ACE_Message_Queue_NT::open (DWORD max_threads)
00301 {
00302 ACE_TRACE ("ACE_Message_Queue_NT::open");
00303 this->max_cthrs_ = max_threads;
00304 this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
00305 0,
00306 ACE_Message_Queue_Base::WAS_ACTIVE,
00307 max_threads);
00308 return (this->completion_port_ == 0 ? -1 : 0);
00309 }
00310
00311 int
00312 ACE_Message_Queue_NT::close (void)
00313 {
00314 ACE_TRACE ("ACE_Message_Queue_NT::close");
00315 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00316 this->deactivate ();
00317 return (::CloseHandle (this->completion_port_) ? 0 : -1 );
00318 }
00319
00320 ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
00321 {
00322 ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
00323 this->close ();
00324 }
00325
00326 int
00327 ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
00328 ACE_Time_Value *)
00329 {
00330 ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
00331 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00332 if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
00333 {
00334 size_t msize = new_item->total_size ();
00335 size_t mlength = new_item->total_length ();
00336
00337
00338 #if defined (ACE_WIN64)
00339 ULONG_PTR state_to_post;
00340 #else
00341 DWORD state_to_post;
00342 #endif
00343 state_to_post = ACE_Message_Queue_Base::ACTIVATED;
00344 if (::PostQueuedCompletionStatus (this->completion_port_,
00345 ACE_static_cast (DWORD, msize),
00346 state_to_post,
00347 ACE_reinterpret_cast (LPOVERLAPPED, new_item)))
00348 {
00349
00350 this->cur_bytes_ += msize;
00351 this->cur_length_ += mlength;
00352 return ++this->cur_count_;
00353 }
00354 }
00355 else
00356 errno = ESHUTDOWN;
00357
00358
00359 return -1;
00360 }
00361
00362 int
00363 ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
00364 ACE_Time_Value *timeout)
00365 {
00366 ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
00367
00368 {
00369 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00370
00371
00372 if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
00373 {
00374 errno = ESHUTDOWN;
00375 return -1;
00376 }
00377 else
00378 ++this->cur_thrs_;
00379 }
00380
00381 #if defined (ACE_WIN64)
00382 ULONG_PTR queue_state;
00383 #else
00384 DWORD queue_state;
00385 #endif
00386 DWORD msize;
00387
00388 int retv = ::GetQueuedCompletionStatus (this->completion_port_,
00389 &msize,
00390 &queue_state,
00391 ACE_reinterpret_cast (LPOVERLAPPED *, &first_item),
00392 (timeout == 0 ? INFINITE : timeout->msec ()));
00393 {
00394 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00395 --this->cur_thrs_;
00396 if (retv)
00397 {
00398 if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
00399 {
00400 --this->cur_count_;
00401 this->cur_bytes_ -= msize;
00402 this->cur_length_ -= first_item->total_length ();
00403 return this->cur_count_;
00404 }
00405 else
00406 errno = ESHUTDOWN;
00407 }
00408 }
00409 return -1;
00410 }
00411
00412 int
00413 ACE_Message_Queue_NT::deactivate (void)
00414 {
00415 ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
00416 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00417
00418 int previous_state = this->state_;
00419 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00420 {
00421 this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
00422
00423
00424
00425 DWORD cntr =
00426 this->cur_thrs_ - ACE_static_cast (DWORD, this->cur_count_);
00427 while (cntr-- > 0)
00428 ::PostQueuedCompletionStatus (this->completion_port_,
00429 0,
00430 this->state_,
00431 0);
00432 }
00433 return previous_state;
00434 }
00435
00436 int
00437 ACE_Message_Queue_NT::activate (void)
00438 {
00439 ACE_TRACE ("ACE_Message_Queue_NT::activate");
00440 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00441 int previous_status = this->state_;
00442 this->state_ = ACE_Message_Queue_Base::ACTIVATED;
00443 return previous_status;
00444 }
00445
00446 int
00447 ACE_Message_Queue_NT::pulse (void)
00448 {
00449 ACE_TRACE ("ACE_Message_Queue_NT::pulse");
00450 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
00451
00452 int previous_state = this->state_;
00453 if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
00454 {
00455 this->state_ = ACE_Message_Queue_Base::PULSED;
00456
00457
00458
00459
00460 DWORD cntr =
00461 this->cur_thrs_ - ACE_static_cast (DWORD, this->cur_count_);
00462 while (cntr-- > 0)
00463 ::PostQueuedCompletionStatus (this->completion_port_,
00464 0,
00465 this->state_,
00466 0);
00467 }
00468 return previous_state;
00469 }
00470
00471 void
00472 ACE_Message_Queue_NT::dump (void) const
00473 {
00474 ACE_TRACE ("ACE_Message_Queue_NT::dump");
00475
00476 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00477 switch (this->state_)
00478 {
00479 case ACE_Message_Queue_Base::ACTIVATED:
00480 ACE_DEBUG ((LM_DEBUG,
00481 ACE_LIB_TEXT ("state = ACTIVATED\n")));
00482 break;
00483 case ACE_Message_Queue_Base::DEACTIVATED:
00484 ACE_DEBUG ((LM_DEBUG,
00485 ACE_LIB_TEXT ("state = DEACTIVATED\n")));
00486 break;
00487 case ACE_Message_Queue_Base::PULSED:
00488 ACE_DEBUG ((LM_DEBUG,
00489 ACE_LIB_TEXT ("state = PULSED\n")));
00490 break;
00491 }
00492
00493 ACE_DEBUG ((LM_DEBUG,
00494 ACE_LIB_TEXT ("max_cthrs_ = %d\n")
00495 ACE_LIB_TEXT ("cur_thrs_ = %d\n")
00496 ACE_LIB_TEXT ("cur_bytes = %d\n")
00497 ACE_LIB_TEXT ("cur_length = %d\n")
00498 ACE_LIB_TEXT ("cur_count = %d\n")
00499 ACE_LIB_TEXT ("completion_port_ = %x\n"),
00500 this->max_cthrs_,
00501 this->cur_thrs_,
00502 this->cur_bytes_,
00503 this->cur_length_,
00504 this->cur_count_,
00505 this->completion_port_));
00506 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00507 }
00508
00509 #endif
00510
00511 #endif