00001 #include "ace_pch.h"
00002
00003
00004 #include "ace/Select_Reactor_Base.h"
00005 #include "ace/Reactor.h"
00006 #include "ace/Thread.h"
00007 #include "ace/Synch_T.h"
00008 #include "ace/SOCK_Acceptor.h"
00009 #include "ace/SOCK_Connector.h"
00010 #include "ace/Timer_Heap.h"
00011 #include "ace/Log_Msg.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/Select_Reactor_Base.i"
00015 #endif
00016
00017 ACE_RCSID(ace, Select_Reactor_Base, "$Id: Select_Reactor_Base.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00018
00019 #if defined (ACE_WIN32)
00020 #define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_)
00021 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_)
00022 #else
00023 #define ACE_SELECT_REACTOR_HANDLE(H) (H)
00024 #define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)])
00025 #endif
00026
00027
00028
00029 int
00030 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
00031 {
00032 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
00033 #if defined (ACE_WIN32)
00034
00035
00036 if (handle == ACE_INVALID_HANDLE)
00037 #else
00038 if (handle < 0 || handle >= this->max_size_)
00039 #endif
00040 {
00041 errno = EINVAL;
00042 return 1;
00043 }
00044 else
00045 return 0;
00046 }
00047
00048
00049
00050 int
00051 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
00052 {
00053 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
00054 #if defined (ACE_WIN32)
00055
00056
00057 if (handle != ACE_INVALID_HANDLE)
00058 #else
00059 if (handle >= 0 && handle < this->max_handlep1_)
00060 #endif
00061 return 1;
00062 else
00063 {
00064 errno = EINVAL;
00065 return 0;
00066 }
00067 }
00068
00069 size_t
00070 ACE_Select_Reactor_Handler_Repository::max_handlep1 (void)
00071 {
00072 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::max_handlep1");
00073
00074 return this->max_handlep1_;
00075 }
00076
00077 int
00078 ACE_Select_Reactor_Handler_Repository::open (size_t size)
00079 {
00080 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
00081 this->max_size_ = size;
00082 this->max_handlep1_ = 0;
00083
00084 #if defined (ACE_WIN32)
00085
00086 ACE_NEW_RETURN (this->event_handlers_,
00087 ACE_Event_Tuple[size],
00088 -1);
00089
00090
00091 for (size_t h = 0; h < size; h++)
00092 {
00093 ACE_SELECT_REACTOR_HANDLE (h) = ACE_INVALID_HANDLE;
00094 ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00095 }
00096 #else
00097
00098 ACE_NEW_RETURN (this->event_handlers_,
00099 ACE_Event_Handler *[size],
00100 -1);
00101
00102
00103 for (size_t h = 0; h < size; h++)
00104 ACE_SELECT_REACTOR_EVENT_HANDLER (this, h) = 0;
00105 #endif
00106
00107
00108
00109 return ACE::set_handle_limit (ACE_static_cast (int, size));
00110 }
00111
00112
00113
00114 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
00115 : select_reactor_ (select_reactor),
00116 max_size_ (0),
00117 max_handlep1_ (0),
00118 event_handlers_ (0)
00119 {
00120 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
00121 }
00122
00123 int
00124 ACE_Select_Reactor_Handler_Repository::unbind_all (void)
00125 {
00126
00127 for (int handle = 0;
00128 handle < this->max_handlep1_;
00129 handle++)
00130 this->unbind (ACE_SELECT_REACTOR_HANDLE (handle),
00131 ACE_Event_Handler::ALL_EVENTS_MASK);
00132
00133 return 0;
00134 }
00135
00136 int
00137 ACE_Select_Reactor_Handler_Repository::close (void)
00138 {
00139 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
00140
00141 if (this->event_handlers_ != 0)
00142 {
00143 this->unbind_all ();
00144
00145 delete [] this->event_handlers_;
00146 this->event_handlers_ = 0;
00147 }
00148 return 0;
00149 }
00150
00151
00152
00153 ACE_Event_Handler *
00154 ACE_Select_Reactor_Handler_Repository::find (ACE_HANDLE handle,
00155 size_t *index_p)
00156 {
00157 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find");
00158
00159 ACE_Event_Handler *eh = 0;
00160 ssize_t i;
00161
00162
00163 if (this->handle_in_range (handle))
00164 {
00165 #if defined (ACE_WIN32)
00166 i = 0;
00167
00168 for (; i < this->max_handlep1_; i++)
00169 if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00170 {
00171 eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, i);
00172 break;
00173 }
00174 #else
00175 i = handle;
00176
00177 eh = ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle);
00178 #endif
00179 }
00180 else
00181
00182
00183 i = 0;
00184
00185 if (eh != 0)
00186 {
00187 if (index_p != 0)
00188 *index_p = i;
00189 }
00190 else
00191 errno = ENOENT;
00192
00193 return eh;
00194 }
00195
00196
00197
00198 int
00199 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
00200 ACE_Event_Handler *event_handler,
00201 ACE_Reactor_Mask mask)
00202 {
00203 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
00204
00205 if (handle == ACE_INVALID_HANDLE)
00206 handle = event_handler->get_handle ();
00207
00208 if (this->invalid_handle (handle))
00209 return -1;
00210
00211 #if defined (ACE_WIN32)
00212 int assigned_slot = -1;
00213
00214 for (ssize_t i = 0; i < this->max_handlep1_; i++)
00215 {
00216
00217 if (ACE_SELECT_REACTOR_HANDLE (i) == handle)
00218 {
00219 assigned_slot = i;
00220 break;
00221 }
00222
00223 else if (ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE
00224 && assigned_slot == -1)
00225 assigned_slot = i;
00226 }
00227
00228 if (assigned_slot > -1)
00229
00230 {
00231 ACE_SELECT_REACTOR_HANDLE (assigned_slot) = handle;
00232 ACE_SELECT_REACTOR_EVENT_HANDLER (this, assigned_slot) = event_handler;
00233 }
00234 else if (this->max_handlep1_ < this->max_size_)
00235 {
00236
00237 ACE_SELECT_REACTOR_HANDLE (this->max_handlep1_) = handle;
00238 ACE_SELECT_REACTOR_EVENT_HANDLER (this, this->max_handlep1_) = event_handler;
00239 this->max_handlep1_++;
00240 }
00241 else
00242 {
00243
00244 errno = ENOMEM;
00245 return -1;
00246 }
00247 #else
00248 ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = event_handler;
00249
00250 if (this->max_handlep1_ < handle + 1)
00251 this->max_handlep1_ = handle + 1;
00252 #endif
00253
00254 if (this->select_reactor_.is_suspended_i (handle))
00255 {
00256 this->select_reactor_.bit_ops (handle,
00257 mask,
00258 this->select_reactor_.suspend_set_,
00259 ACE_Reactor::ADD_MASK);
00260 }
00261 else
00262 {
00263 this->select_reactor_.bit_ops (handle,
00264 mask,
00265 this->select_reactor_.wait_set_,
00266 ACE_Reactor::ADD_MASK);
00267
00268
00269
00270
00271 this->select_reactor_.state_changed_ = 1;
00272 }
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284 return 0;
00285 }
00286
00287
00288
00289 int
00290 ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle,
00291 ACE_Reactor_Mask mask)
00292 {
00293 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
00294
00295 size_t slot;
00296 ACE_Event_Handler *eh = this->find (handle, &slot);
00297
00298 if (eh == 0)
00299 return -1;
00300
00301
00302 this->select_reactor_.bit_ops (handle,
00303 mask,
00304 this->select_reactor_.wait_set_,
00305 ACE_Reactor::CLR_MASK);
00306
00307
00308 this->select_reactor_.bit_ops (handle,
00309 mask,
00310 this->select_reactor_.suspend_set_,
00311 ACE_Reactor::CLR_MASK);
00312
00313
00314
00315
00316 this->select_reactor_.state_changed_ = 1;
00317
00318
00319
00320 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
00321 eh->handle_close (handle, mask);
00322
00323
00324
00325
00326 int has_any_wait_mask =
00327 (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
00328 || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
00329 || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
00330 int has_any_suspend_mask =
00331 (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
00332 || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
00333 || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
00334
00335 if (!has_any_wait_mask && !has_any_suspend_mask)
00336 #if defined (ACE_WIN32)
00337 {
00338 ACE_SELECT_REACTOR_HANDLE (slot) = ACE_INVALID_HANDLE;
00339 ACE_SELECT_REACTOR_EVENT_HANDLER (this, slot) = 0;
00340
00341 if (this->max_handlep1_ == (int) slot + 1)
00342 {
00343
00344
00345
00346
00347
00348 int i;
00349
00350 for (i = this->max_handlep1_ - 1;
00351 i >= 0 && ACE_SELECT_REACTOR_HANDLE (i) == ACE_INVALID_HANDLE;
00352 i--)
00353 continue;
00354
00355 this->max_handlep1_ = i + 1;
00356 }
00357 }
00358 #else
00359 {
00360 ACE_SELECT_REACTOR_EVENT_HANDLER (this, handle) = 0;
00361
00362 if (this->max_handlep1_ == handle + 1)
00363 {
00364
00365
00366
00367 ACE_HANDLE wait_rd_max = this->select_reactor_.wait_set_.rd_mask_.max_set ();
00368 ACE_HANDLE wait_wr_max = this->select_reactor_.wait_set_.wr_mask_.max_set ();
00369 ACE_HANDLE wait_ex_max = this->select_reactor_.wait_set_.ex_mask_.max_set ();
00370
00371 ACE_HANDLE suspend_rd_max = this->select_reactor_.suspend_set_.rd_mask_.max_set ();
00372 ACE_HANDLE suspend_wr_max = this->select_reactor_.suspend_set_.wr_mask_.max_set ();
00373 ACE_HANDLE suspend_ex_max = this->select_reactor_.suspend_set_.ex_mask_.max_set ();
00374
00375
00376 this->max_handlep1_ = wait_rd_max;
00377 if (this->max_handlep1_ < wait_wr_max)
00378 this->max_handlep1_ = wait_wr_max;
00379 if (this->max_handlep1_ < wait_ex_max)
00380 this->max_handlep1_ = wait_ex_max;
00381
00382 if (this->max_handlep1_ < suspend_rd_max)
00383 this->max_handlep1_ = suspend_rd_max;
00384 if (this->max_handlep1_ < suspend_wr_max)
00385 this->max_handlep1_ = suspend_wr_max;
00386 if (this->max_handlep1_ < suspend_ex_max)
00387 this->max_handlep1_ = suspend_ex_max;
00388
00389 this->max_handlep1_++;
00390 }
00391 }
00392 #endif
00393
00394 return 0;
00395 }
00396
00397 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
00398 (const ACE_Select_Reactor_Handler_Repository *s)
00399 : rep_ (s),
00400 current_ (-1)
00401 {
00402 this->advance ();
00403 }
00404
00405
00406
00407
00408 int
00409 ACE_Select_Reactor_Handler_Repository_Iterator::next (ACE_Event_Handler *&next_item)
00410 {
00411 int result = 1;
00412
00413 if (this->current_ >= this->rep_->max_handlep1_)
00414 result = 0;
00415 else
00416 next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_,
00417 this->current_);
00418 return result;
00419 }
00420
00421 int
00422 ACE_Select_Reactor_Handler_Repository_Iterator::done (void) const
00423 {
00424 return this->current_ >= this->rep_->max_handlep1_;
00425 }
00426
00427
00428
00429 int
00430 ACE_Select_Reactor_Handler_Repository_Iterator::advance (void)
00431 {
00432 if (this->current_ < this->rep_->max_handlep1_)
00433 this->current_++;
00434
00435 while (this->current_ < this->rep_->max_handlep1_)
00436 if (ACE_SELECT_REACTOR_EVENT_HANDLER (this->rep_, this->current_) != 0)
00437 return 1;
00438 else
00439 this->current_++;
00440
00441 return this->current_ < this->rep_->max_handlep1_;
00442 }
00443
00444
00445
00446 void
00447 ACE_Select_Reactor_Handler_Repository_Iterator::dump (void) const
00448 {
00449 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
00450
00451 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00452 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("rep_ = %u"), this->rep_));
00453 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("current_ = %d"), this->current_));
00454 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00455 }
00456
00457 void
00458 ACE_Select_Reactor_Handler_Repository::dump (void) const
00459 {
00460 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
00461
00462 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00463 ACE_DEBUG ((LM_DEBUG,
00464 ACE_LIB_TEXT ("(%t) max_handlep1_ = %d, max_size_ = %d\n"),
00465 this->max_handlep1_, this->max_size_));
00466 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("[")));
00467
00468 ACE_Event_Handler *eh = 0;
00469
00470 for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
00471 iter.next (eh) != 0;
00472 iter.advance ())
00473 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" (eh = %x, eh->handle_ = %d)"),
00474 eh, eh->get_handle ()));
00475
00476 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT (" ]")));
00477 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00478 }
00479
00480 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
00481
00482 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify (void)
00483 : max_notify_iterations_ (-1)
00484 {
00485 }
00486
00487 void
00488 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
00489 {
00490
00491 if (iterations == 0)
00492 iterations = 1;
00493
00494 this->max_notify_iterations_ = iterations;
00495 }
00496
00497 int
00498 ACE_Select_Reactor_Notify::max_notify_iterations (void)
00499 {
00500 return this->max_notify_iterations_;
00501 }
00502
00503
00504
00505
00506
00507
00508
00509 int
00510 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
00511 ACE_Reactor_Mask mask )
00512 {
00513 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
00514
00515 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00516
00517 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00518
00519 if (this->notify_queue_.is_empty ())
00520 return 0;
00521
00522 ACE_Notification_Buffer *temp;
00523 ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
00524
00525 size_t queue_size = this->notify_queue_.size ();
00526 int number_purged = 0;
00527 size_t i;
00528 for (i = 0; i < queue_size; ++i)
00529 {
00530 if (-1 == this->notify_queue_.dequeue_head (temp))
00531 ACE_ERROR_RETURN ((LM_ERROR,
00532 ACE_LIB_TEXT ("%p\n"),
00533 ACE_LIB_TEXT ("dequeue_head")),
00534 -1);
00535
00536
00537
00538
00539
00540 if ((0 != temp->eh_) &&
00541 (0 == eh || eh == temp->eh_) &&
00542 ACE_BIT_DISABLED (temp->mask_, ~mask))
00543
00544
00545 {
00546 if (-1 == this->free_queue_.enqueue_head (temp))
00547 ACE_ERROR_RETURN ((LM_ERROR,
00548 ACE_LIB_TEXT ("%p\n"),
00549 ACE_LIB_TEXT ("enqueue_head")),
00550 -1);
00551 ++number_purged;
00552 }
00553 else
00554 {
00555
00556
00557
00558
00559 if ((0 != temp->eh_) &&
00560 (0 == eh || eh == temp->eh_))
00561 ACE_CLR_BITS(temp->mask_, mask);
00562 if (-1 == local_queue.enqueue_head (temp))
00563 return -1;
00564 }
00565 }
00566
00567 if (this->notify_queue_.size ())
00568 {
00569 ACE_ASSERT (0);
00570 return -1;
00571 }
00572
00573
00574 queue_size = local_queue.size ();
00575 for (i = 0; i < queue_size; ++i)
00576 {
00577 if (-1 == local_queue.dequeue_head (temp))
00578 ACE_ERROR_RETURN ((LM_ERROR,
00579 ACE_LIB_TEXT ("%p\n"),
00580 ACE_LIB_TEXT ("dequeue_head")),
00581 -1);
00582
00583 if (-1 == this->notify_queue_.enqueue_head (temp))
00584 ACE_ERROR_RETURN ((LM_ERROR,
00585 ACE_LIB_TEXT ("%p\n"),
00586 ACE_LIB_TEXT ("enqueue_head")),
00587 -1);
00588 }
00589
00590 return number_purged;
00591
00592 #else
00593 ACE_UNUSED_ARG (eh);
00594 ACE_UNUSED_ARG (mask);
00595 ACE_NOTSUP_RETURN (-1);
00596 #endif
00597 }
00598
00599 void
00600 ACE_Select_Reactor_Notify::dump (void) const
00601 {
00602 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
00603
00604 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
00605 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("select_reactor_ = %x"), this->select_reactor_));
00606 this->notification_pipe_.dump ();
00607 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
00608 }
00609
00610 int
00611 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
00612 ACE_Timer_Queue *,
00613 int disable_notify_pipe)
00614 {
00615 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
00616
00617 if (disable_notify_pipe == 0)
00618 {
00619 this->select_reactor_ =
00620 ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
00621
00622 if (select_reactor_ == 0)
00623 {
00624 errno = EINVAL;
00625 return -1;
00626 }
00627
00628 if (this->notification_pipe_.open () == -1)
00629 return -1;
00630 #if defined (F_SETFD)
00631 ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
00632 ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
00633 #endif
00634
00635 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00636 ACE_Notification_Buffer *temp;
00637
00638 ACE_NEW_RETURN (temp,
00639 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00640 -1);
00641
00642 if (this->alloc_queue_.enqueue_head (temp) == -1)
00643 {
00644 delete [] temp;
00645 return -1;
00646 }
00647
00648 for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
00649 if (free_queue_.enqueue_head (temp + i) == -1)
00650 return -1;
00651
00652 #endif
00653
00654
00655
00656 if (ACE::set_flags (this->notification_pipe_.read_handle (),
00657 ACE_NONBLOCK) == -1)
00658 return -1;
00659 else
00660 return this->select_reactor_->register_handler
00661 (this->notification_pipe_.read_handle (),
00662 this,
00663 ACE_Event_Handler::READ_MASK);
00664 }
00665 else
00666 {
00667 this->select_reactor_ = 0;
00668 return 0;
00669 }
00670 }
00671
00672 int
00673 ACE_Select_Reactor_Notify::close (void)
00674 {
00675 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
00676
00677 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00678
00679 ACE_Notification_Buffer **b;
00680
00681 for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
00682 alloc_iter.next (b) != 0;
00683 alloc_iter.advance ())
00684 {
00685 delete [] *b;
00686 *b = 0;
00687 }
00688
00689 this->alloc_queue_.reset ();
00690 this->notify_queue_.reset ();
00691 this->free_queue_.reset ();
00692 #endif
00693
00694 return this->notification_pipe_.close ();
00695 }
00696
00697 int
00698 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
00699 ACE_Reactor_Mask mask,
00700 ACE_Time_Value *timeout)
00701 {
00702 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
00703
00704
00705
00706 if (this->select_reactor_ == 0)
00707 return 0;
00708
00709 ACE_Notification_Buffer buffer (eh, mask);
00710
00711 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00712
00713 {
00714
00715
00716 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00717
00718
00719
00720
00721
00722
00723
00724 ACE_Notification_Buffer *temp = 0;
00725
00726 if (free_queue_.dequeue_head (temp) == -1)
00727 {
00728
00729 ACE_Notification_Buffer *temp1;
00730
00731 ACE_NEW_RETURN (temp1,
00732 ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
00733 -1);
00734
00735 if (this->alloc_queue_.enqueue_head (temp1) == -1)
00736 {
00737 delete [] temp1;
00738 return -1;
00739 }
00740
00741
00742
00743
00744 for (size_t i = 1;
00745 i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
00746 i++)
00747 this->free_queue_.enqueue_head (temp1 + i);
00748
00749 temp = temp1;
00750 }
00751
00752 ACE_ASSERT (temp != 0);
00753 *temp = buffer;
00754
00755 if (notify_queue_.enqueue_tail (temp) == -1)
00756 return -1;
00757 }
00758 #endif
00759
00760 ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
00761 (char *) &buffer,
00762 sizeof buffer,
00763 timeout);
00764 if (n == -1)
00765 return -1;
00766
00767 return 0;
00768 }
00769
00770
00771
00772
00773 int
00774 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
00775 ACE_Handle_Set &rd_mask)
00776 {
00777 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
00778
00779 ACE_HANDLE read_handle =
00780 this->notification_pipe_.read_handle ();
00781
00782 if (read_handle != ACE_INVALID_HANDLE
00783 && rd_mask.is_set (read_handle))
00784 {
00785 number_of_active_handles--;
00786 rd_mask.clr_bit (read_handle);
00787 return this->handle_input (read_handle);
00788 }
00789 else
00790 return 0;
00791 }
00792
00793
00794 ACE_HANDLE
00795 ACE_Select_Reactor_Notify::notify_handle (void)
00796 {
00797 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
00798
00799 return this->notification_pipe_.read_handle ();
00800 }
00801
00802
00803
00804
00805
00806
00807 int
00808 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
00809 {
00810
00811 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00812 {
00813 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00814
00815 ACE_Notification_Buffer *temp;
00816
00817 ACE_UNUSED_ARG (buffer);
00818
00819
00820 if (notify_queue_.is_empty ())
00821 return 0;
00822
00823 if (this->notify_queue_.dequeue_head (temp) == -1)
00824 ACE_ERROR_RETURN ((LM_ERROR,
00825 ACE_LIB_TEXT ("%p\n"),
00826 ACE_LIB_TEXT ("dequeue_head")),
00827 -1);
00828 if (temp->eh_ != 0)
00829 {
00830
00831
00832 if (this->notify_queue_.enqueue_head (temp) == -1)
00833 {
00834 ACE_ERROR_RETURN ((LM_ERROR,
00835 ACE_LIB_TEXT ("%p\n"),
00836 ACE_LIB_TEXT ("enque_head")),
00837 -1);
00838 }
00839
00840 return 1;
00841 }
00842
00843 if (free_queue_.enqueue_head (temp) == -1)
00844 ACE_ERROR_RETURN ((LM_ERROR,
00845 ACE_LIB_TEXT ("%p\n"),
00846 ACE_LIB_TEXT ("enqueue_head")),
00847 -1);
00848 }
00849 #else
00850
00851
00852
00853
00854
00855 if (buffer.eh_ != 0)
00856 return 1;
00857
00858 #endif
00859
00860
00861 return 0;
00862 }
00863
00864 int
00865 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
00866 {
00867 int result = 0;
00868 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
00869
00870 {
00871
00872
00873 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
00874
00875 ACE_Notification_Buffer *temp;
00876
00877 if (notify_queue_.is_empty ())
00878 return 0;
00879 else if (notify_queue_.dequeue_head (temp) == -1)
00880 ACE_ERROR_RETURN ((LM_ERROR,
00881 ACE_LIB_TEXT ("%p\n"),
00882 ACE_LIB_TEXT ("dequeue_head")),
00883 -1);
00884 buffer = *temp;
00885 if (free_queue_.enqueue_head (temp) == -1)
00886 ACE_ERROR_RETURN ((LM_ERROR,
00887 ACE_LIB_TEXT ("%p\n"),
00888 ACE_LIB_TEXT ("enqueue_head")),
00889 -1);
00890 }
00891
00892
00893
00894
00895
00896
00897 if (buffer.eh_ != 0)
00898 {
00899
00900 switch (buffer.mask_)
00901 {
00902 case ACE_Event_Handler::READ_MASK:
00903 case ACE_Event_Handler::ACCEPT_MASK:
00904 result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00905 break;
00906 case ACE_Event_Handler::WRITE_MASK:
00907 result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00908 break;
00909 case ACE_Event_Handler::EXCEPT_MASK:
00910 result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00911 break;
00912 default:
00913
00914 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
00915 }
00916 if (result == -1)
00917 buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00918 ACE_Event_Handler::EXCEPT_MASK);
00919 }
00920 #else
00921
00922
00923
00924
00925
00926 if (buffer.eh_ != 0)
00927 {
00928 switch (buffer.mask_)
00929 {
00930 case ACE_Event_Handler::READ_MASK:
00931 case ACE_Event_Handler::ACCEPT_MASK:
00932 result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
00933 break;
00934 case ACE_Event_Handler::WRITE_MASK:
00935 result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
00936 break;
00937 case ACE_Event_Handler::EXCEPT_MASK:
00938 result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
00939 break;
00940 case ACE_Event_Handler::QOS_MASK:
00941 result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
00942 break;
00943 case ACE_Event_Handler::GROUP_QOS_MASK:
00944 result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
00945 break;
00946 default:
00947
00948 ACE_ERROR ((LM_ERROR,
00949 ACE_LIB_TEXT ("invalid mask = %d\n"),
00950 buffer.mask_));
00951 }
00952 if (result == -1)
00953 buffer.eh_->handle_close (ACE_INVALID_HANDLE,
00954 ACE_Event_Handler::EXCEPT_MASK);
00955 }
00956
00957 #endif
00958
00959 return 1;
00960 }
00961
00962 int
00963 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
00964 ACE_Notification_Buffer &buffer)
00965 {
00966 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
00967
00968 ssize_t n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
00969
00970 if (n > 0)
00971 {
00972
00973 if (n != sizeof buffer)
00974 {
00975 ssize_t remainder = sizeof buffer - n;
00976
00977
00978
00979
00980
00981 if (ACE::recv (handle,
00982 ((char *) &buffer) + n,
00983 remainder) != remainder)
00984 return -1;
00985 }
00986
00987
00988 return 1;
00989 }
00990
00991
00992 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
00993 return -1;
00994
00995 return 0;
00996 }
00997
00998
00999 int
01000 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
01001 {
01002 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01003
01004
01005
01006 int number_dispatched = 0;
01007 int result = 0;
01008 ACE_Notification_Buffer buffer;
01009
01010 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
01011 {
01012
01013
01014 if (this->dispatch_notify (buffer) > 0)
01015 number_dispatched++;
01016
01017
01018
01019
01020 if (number_dispatched == this->max_notify_iterations_)
01021 break;
01022 }
01023
01024
01025
01026 if (result < 0)
01027 number_dispatched = -1;
01028
01029
01030
01031
01032
01033 this->select_reactor_->renew ();
01034 return number_dispatched;
01035 }
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048 int
01049 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
01050 ACE_Reactor_Mask mask,
01051 ACE_Select_Reactor_Handle_Set &handle_set,
01052 int ops)
01053 {
01054 ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
01055 if (this->handler_rep_.handle_in_range (handle) == 0)
01056 return -1;
01057
01058 #if !defined (ACE_WIN32)
01059 ACE_Sig_Guard sb;
01060 #endif
01061
01062 ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
01063 u_long omask = ACE_Event_Handler::NULL_MASK;
01064
01065
01066
01067 if (handle_set.rd_mask_.is_set (handle))
01068 ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
01069 if (handle_set.wr_mask_.is_set (handle))
01070 ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
01071 if (handle_set.ex_mask_.is_set (handle))
01072 ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
01073
01074 switch (ops)
01075 {
01076 case ACE_Reactor::GET_MASK:
01077
01078
01079 break;
01080 case ACE_Reactor::CLR_MASK:
01081 ptmf = &ACE_Handle_Set::clr_bit;
01082
01083 case ACE_Reactor::SET_MASK:
01084
01085 case ACE_Reactor::ADD_MASK:
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
01097 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
01098 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
01099 {
01100 (handle_set.rd_mask_.*ptmf) (handle);
01101 }
01102 else if (ops == ACE_Reactor::SET_MASK)
01103 handle_set.rd_mask_.clr_bit (handle);
01104
01105
01106 if (ACE_BIT_ENABLED (mask,
01107 ACE_Event_Handler::WRITE_MASK)
01108 || ACE_BIT_ENABLED (mask,
01109 ACE_Event_Handler::CONNECT_MASK))
01110 {
01111 (handle_set.wr_mask_.*ptmf) (handle);
01112 }
01113 else if (ops == ACE_Reactor::SET_MASK)
01114 handle_set.wr_mask_.clr_bit (handle);
01115
01116
01117
01118 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
01119 #if defined (ACE_WIN32)
01120 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
01121 #endif
01122 )
01123 {
01124 (handle_set.ex_mask_.*ptmf) (handle);
01125 }
01126 else if (ops == ACE_Reactor::SET_MASK)
01127 handle_set.ex_mask_.clr_bit (handle);
01128 break;
01129 default:
01130 return -1;
01131 }
01132 return omask;
01133 }
01134
01135 int
01136 ACE_Select_Reactor_Impl::resumable_handler (void)
01137 {
01138
01139
01140
01141 return 0;
01142 }
01143
01144 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
01145 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
01146 template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
01147 template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
01148 template class ACE_Node <ACE_Notification_Buffer *>;
01149 #endif
01150 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
01151 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
01152 #pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
01153 #pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
01154 #pragma instantiate ACE_Node <ACE_Notification_Buffer *>
01155 #endif
01156 #endif