00001 #include "ace_pch.h"
00002
00003
00004
00005 #include "ace/POSIX_Proactor.h"
00006
00007 #if defined (ACE_HAS_AIO_CALLS)
00008
00009 #include "ace/ACE.h"
00010 #include "ace/Task_T.h"
00011 #include "ace/Log_Msg.h"
00012 #include "ace/Object_Manager.h"
00013
00014 #if !defined (__ACE_INLINE__)
00015 #include "ace/POSIX_Proactor.i"
00016 #endif
00017
00018 # if defined (ACE_HAS_SYSINFO)
00019 # include <sys/systeminfo.h>
00020 # endif
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
00031 {
00032 public:
00033
00034 ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
00035 const void *act = 0,
00036 ACE_HANDLE event = ACE_INVALID_HANDLE,
00037 int priority = 0,
00038 int signal_number = ACE_SIGRTMIN);
00039
00040
00041 virtual ~ACE_POSIX_Wakeup_Completion (void);
00042
00043
00044
00045 virtual void complete (size_t bytes_transferred = 0,
00046 int success = 1,
00047 const void *completion_key = 0,
00048 u_long error = 0);
00049 };
00050
00051
00052 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
00053 : os_id_ (OS_UNDEFINED)
00054 {
00055 #if defined(sun)
00056
00057 os_id_ = OS_SUN;
00058
00059 char Buf [32];
00060
00061 ::memset(Buf,0,sizeof(Buf));
00062
00063 ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
00064
00065 if (ACE_OS_String::strcasecmp (Buf , "5.6") == 0)
00066 os_id_ = OS_SUN_56;
00067 else if (ACE_OS_String::strcasecmp (Buf , "5.7") == 0)
00068 os_id_ = OS_SUN_57;
00069 else if (ACE_OS_String::strcasecmp (Buf , "5.8") == 0)
00070 os_id_ = OS_SUN_58;
00071
00072 #elif defined(HPUX)
00073
00074 os_id_ = OS_HPUX;
00075
00076 #elif defined(__sgi)
00077
00078 os_id_ = OS_IRIX;
00079
00080 #elif defined(__OpenBSD)
00081
00082 os_id_ = OS_OPENBSD;
00083
00084
00085
00086
00087
00088 #endif
00089 }
00090
00091 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
00092 {
00093 this->close ();
00094 }
00095
00096 int
00097 ACE_POSIX_Proactor::close (void)
00098 {
00099 return 0;
00100 }
00101
00102 int
00103 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
00104 const void *completion_key)
00105 {
00106 ACE_UNUSED_ARG (handle);
00107 ACE_UNUSED_ARG (completion_key);
00108 return 0;
00109 }
00110
00111 int
00112 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
00113 {
00114 return 0;
00115 }
00116
00117 int
00118 ACE_POSIX_Proactor::close_dispatch_threads (int)
00119 {
00120 return 0;
00121 }
00122
00123 size_t
00124 ACE_POSIX_Proactor::number_of_threads (void) const
00125 {
00126
00127 ACE_NOTSUP_RETURN (0);
00128 }
00129
00130 void
00131 ACE_POSIX_Proactor::number_of_threads (size_t threads)
00132 {
00133
00134 ACE_UNUSED_ARG (threads);
00135 }
00136
00137 ACE_HANDLE
00138 ACE_POSIX_Proactor::get_handle (void) const
00139 {
00140 return ACE_INVALID_HANDLE;
00141 }
00142
00143 ACE_Asynch_Read_Stream_Impl *
00144 ACE_POSIX_Proactor::create_asynch_read_stream (void)
00145 {
00146 ACE_Asynch_Read_Stream_Impl *implementation = 0;
00147 ACE_NEW_RETURN (implementation,
00148 ACE_POSIX_Asynch_Read_Stream (this),
00149 0);
00150 return implementation;
00151 }
00152
00153 ACE_Asynch_Read_Stream_Result_Impl *
00154 ACE_POSIX_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
00155 ACE_HANDLE handle,
00156 ACE_Message_Block &message_block,
00157 size_t bytes_to_read,
00158 const void* act,
00159 ACE_HANDLE event,
00160 int priority,
00161 int signal_number)
00162 {
00163 ACE_Asynch_Read_Stream_Result_Impl *implementation;
00164 ACE_NEW_RETURN (implementation,
00165 ACE_POSIX_Asynch_Read_Stream_Result (handler,
00166 handle,
00167 message_block,
00168 bytes_to_read,
00169 act,
00170 event,
00171 priority,
00172 signal_number),
00173 0);
00174 return implementation;
00175 }
00176
00177
00178 ACE_Asynch_Write_Stream_Impl *
00179 ACE_POSIX_Proactor::create_asynch_write_stream (void)
00180 {
00181 ACE_Asynch_Write_Stream_Impl *implementation = 0;
00182 ACE_NEW_RETURN (implementation,
00183 ACE_POSIX_Asynch_Write_Stream (this),
00184 0);
00185 return implementation;
00186 }
00187
00188 ACE_Asynch_Write_Stream_Result_Impl *
00189 ACE_POSIX_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
00190 ACE_HANDLE handle,
00191 ACE_Message_Block &message_block,
00192 size_t bytes_to_write,
00193 const void* act,
00194 ACE_HANDLE event,
00195 int priority,
00196 int signal_number)
00197 {
00198 ACE_Asynch_Write_Stream_Result_Impl *implementation;
00199 ACE_NEW_RETURN (implementation,
00200 ACE_POSIX_Asynch_Write_Stream_Result (handler,
00201 handle,
00202 message_block,
00203 bytes_to_write,
00204 act,
00205 event,
00206 priority,
00207 signal_number),
00208 0);
00209 return implementation;
00210 }
00211
00212
00213 ACE_Asynch_Read_File_Impl *
00214 ACE_POSIX_Proactor::create_asynch_read_file (void)
00215 {
00216 ACE_Asynch_Read_File_Impl *implementation = 0;
00217 ACE_NEW_RETURN (implementation,
00218 ACE_POSIX_Asynch_Read_File (this),
00219 0);
00220 return implementation;
00221 }
00222
00223 ACE_Asynch_Read_File_Result_Impl *
00224 ACE_POSIX_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
00225 ACE_HANDLE handle,
00226 ACE_Message_Block &message_block,
00227 size_t bytes_to_read,
00228 const void* act,
00229 u_long offset,
00230 u_long offset_high,
00231 ACE_HANDLE event,
00232 int priority,
00233 int signal_number)
00234 {
00235 ACE_Asynch_Read_File_Result_Impl *implementation;
00236 ACE_NEW_RETURN (implementation,
00237 ACE_POSIX_Asynch_Read_File_Result (handler,
00238 handle,
00239 message_block,
00240 bytes_to_read,
00241 act,
00242 offset,
00243 offset_high,
00244 event,
00245 priority,
00246 signal_number),
00247 0);
00248 return implementation;
00249 }
00250
00251
00252 ACE_Asynch_Write_File_Impl *
00253 ACE_POSIX_Proactor::create_asynch_write_file (void)
00254 {
00255 ACE_Asynch_Write_File_Impl *implementation = 0;
00256 ACE_NEW_RETURN (implementation,
00257 ACE_POSIX_Asynch_Write_File (this),
00258 0);
00259 return implementation;
00260 }
00261
00262 ACE_Asynch_Write_File_Result_Impl *
00263 ACE_POSIX_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
00264 ACE_HANDLE handle,
00265 ACE_Message_Block &message_block,
00266 size_t bytes_to_write,
00267 const void* act,
00268 u_long offset,
00269 u_long offset_high,
00270 ACE_HANDLE event,
00271 int priority,
00272 int signal_number)
00273 {
00274 ACE_Asynch_Write_File_Result_Impl *implementation;
00275 ACE_NEW_RETURN (implementation,
00276 ACE_POSIX_Asynch_Write_File_Result (handler,
00277 handle,
00278 message_block,
00279 bytes_to_write,
00280 act,
00281 offset,
00282 offset_high,
00283 event,
00284 priority,
00285 signal_number),
00286 0);
00287 return implementation;
00288 }
00289
00290
00291 ACE_Asynch_Read_Dgram_Impl *
00292 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
00293 {
00294 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00295 ACE_NEW_RETURN (implementation,
00296 ACE_POSIX_Asynch_Read_Dgram (this),
00297 0);
00298 return implementation;
00299 }
00300
00301 ACE_Asynch_Read_Dgram_Result_Impl *
00302 ACE_POSIX_Proactor::create_asynch_read_dgram_result (ACE_Handler &handler,
00303 ACE_HANDLE handle,
00304 ACE_Message_Block *message_block,
00305 size_t bytes_to_read,
00306 int flags,
00307 int protocol_family,
00308 const void* act,
00309 ACE_HANDLE event ,
00310 int priority ,
00311 int signal_number)
00312 {
00313 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
00314 ACE_NEW_RETURN (implementation,
00315 ACE_POSIX_Asynch_Read_Dgram_Result(handler,
00316 handle,
00317 message_block,
00318 bytes_to_read,
00319 flags,
00320 protocol_family,
00321 act,
00322 event,
00323 priority,
00324 signal_number),
00325 0);
00326
00327 return implementation;
00328 }
00329
00330
00331 ACE_Asynch_Write_Dgram_Impl *
00332 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
00333 {
00334 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00335 ACE_NEW_RETURN (implementation,
00336 ACE_POSIX_Asynch_Write_Dgram (this),
00337 0);
00338
00339 return implementation;
00340 }
00341
00342 ACE_Asynch_Write_Dgram_Result_Impl *
00343 ACE_POSIX_Proactor::create_asynch_write_dgram_result (ACE_Handler &handler,
00344 ACE_HANDLE handle,
00345 ACE_Message_Block *message_block,
00346 size_t bytes_to_write,
00347 int flags,
00348 const void* act,
00349 ACE_HANDLE event,
00350 int priority ,
00351 int signal_number)
00352 {
00353 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
00354 ACE_NEW_RETURN (implementation,
00355 ACE_POSIX_Asynch_Write_Dgram_Result(handler,
00356 handle,
00357 message_block,
00358 bytes_to_write,
00359 flags,
00360 act,
00361 event,
00362 priority,
00363 signal_number),
00364 0);
00365
00366 return implementation;
00367 }
00368
00369
00370 ACE_Asynch_Accept_Impl *
00371 ACE_POSIX_Proactor::create_asynch_accept (void)
00372 {
00373 ACE_Asynch_Accept_Impl *implementation = 0;
00374 ACE_NEW_RETURN (implementation,
00375 ACE_POSIX_Asynch_Accept (this),
00376 0);
00377
00378 return implementation;
00379 }
00380
00381 ACE_Asynch_Accept_Result_Impl *
00382 ACE_POSIX_Proactor::create_asynch_accept_result (ACE_Handler &handler,
00383 ACE_HANDLE listen_handle,
00384 ACE_HANDLE accept_handle,
00385 ACE_Message_Block &message_block,
00386 size_t bytes_to_read,
00387 const void* act,
00388 ACE_HANDLE event,
00389 int priority,
00390 int signal_number)
00391 {
00392 ACE_Asynch_Accept_Result_Impl *implementation;
00393 ACE_NEW_RETURN (implementation,
00394 ACE_POSIX_Asynch_Accept_Result (handler,
00395 listen_handle,
00396 accept_handle,
00397 message_block,
00398 bytes_to_read,
00399 act,
00400 event,
00401 priority,
00402 signal_number),
00403 0);
00404 return implementation;
00405 }
00406
00407
00408 ACE_Asynch_Connect_Impl *
00409 ACE_POSIX_Proactor::create_asynch_connect (void)
00410 {
00411 ACE_Asynch_Connect_Impl *implementation = 0;
00412 ACE_NEW_RETURN (implementation,
00413 ACE_POSIX_Asynch_Connect (this),
00414 0);
00415
00416 return implementation;
00417 }
00418
00419 ACE_Asynch_Connect_Result_Impl *
00420 ACE_POSIX_Proactor::create_asynch_connect_result (ACE_Handler &handler,
00421 ACE_HANDLE connect_handle,
00422 const void* act,
00423 ACE_HANDLE event,
00424 int priority,
00425 int signal_number)
00426 {
00427 ACE_Asynch_Connect_Result_Impl *implementation;
00428 ACE_NEW_RETURN (implementation,
00429 ACE_POSIX_Asynch_Connect_Result (handler,
00430 connect_handle,
00431 act,
00432 event,
00433 priority,
00434 signal_number),
00435 0);
00436 return implementation;
00437 }
00438
00439
00440 ACE_Asynch_Transmit_File_Impl *
00441 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
00442 {
00443 ACE_Asynch_Transmit_File_Impl *implementation = 0;
00444 ACE_NEW_RETURN (implementation,
00445 ACE_POSIX_Asynch_Transmit_File (this),
00446 0);
00447 return implementation;
00448 }
00449
00450 ACE_Asynch_Transmit_File_Result_Impl *
00451 ACE_POSIX_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
00452 ACE_HANDLE socket,
00453 ACE_HANDLE file,
00454 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00455 size_t bytes_to_write,
00456 u_long offset,
00457 u_long offset_high,
00458 size_t bytes_per_send,
00459 u_long flags,
00460 const void *act,
00461 ACE_HANDLE event,
00462 int priority,
00463 int signal_number)
00464 {
00465 ACE_Asynch_Transmit_File_Result_Impl *implementation;
00466 ACE_NEW_RETURN (implementation,
00467 ACE_POSIX_Asynch_Transmit_File_Result (handler,
00468 socket,
00469 file,
00470 header_and_trailer,
00471 bytes_to_write,
00472 offset,
00473 offset_high,
00474 bytes_per_send,
00475 flags,
00476 act,
00477 event,
00478 priority,
00479 signal_number),
00480 0);
00481 return implementation;
00482 }
00483
00484 ACE_Asynch_Result_Impl *
00485 ACE_POSIX_Proactor::create_asynch_timer (ACE_Handler &handler,
00486 const void *act,
00487 const ACE_Time_Value &tv,
00488 ACE_HANDLE event,
00489 int priority,
00490 int signal_number)
00491 {
00492 ACE_Asynch_Result_Impl *implementation;
00493 ACE_NEW_RETURN (implementation,
00494 ACE_POSIX_Asynch_Timer (handler,
00495 act,
00496 tv,
00497 event,
00498 priority,
00499 signal_number),
00500 0);
00501 return implementation;
00502 }
00503
00504 #if 0
00505 int
00506 ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
00507 {
00508
00509
00510
00511 ACE_Time_Value timeout (0, 0);
00512 int result = 0;
00513
00514 for (;;)
00515 {
00516 result = this->handle_events (timeout);
00517 if (result != 0 || errno == ETIME)
00518 break;
00519 }
00520
00521
00522
00523 return result == -1 ? -1 : 0;
00524 }
00525
00526 int
00527 ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
00528 ACE_Reactor_Mask close_mask)
00529 {
00530 ACE_UNUSED_ARG (close_mask);
00531 ACE_UNUSED_ARG (handle);
00532
00533 return this->close ();
00534 }
00535 #endif
00536
00537 void
00538 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
00539 size_t bytes_transferred,
00540 const void *,
00541 u_long error)
00542 {
00543 ACE_SEH_TRY
00544 {
00545
00546 asynch_result->complete (bytes_transferred,
00547 error ? 0 : 1,
00548 0,
00549 error);
00550 }
00551 ACE_SEH_FINALLY
00552 {
00553
00554 delete asynch_result;
00555 }
00556 }
00557
00558 int
00559 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
00560 {
00561 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
00562
00563 for (int ci = 0; ci < how_many; ci++)
00564 {
00565 ACE_NEW_RETURN (wakeup_completion,
00566 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_),
00567 -1);
00568 if (this->post_completion (wakeup_completion) == -1)
00569 return -1;
00570 }
00571
00572 return 0;
00573 }
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602 class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
00603 {
00604 public:
00605
00606
00607 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
00608
00609
00610 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
00611
00612
00613 int notify ();
00614
00615
00616
00617 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
00618
00619 private:
00620
00621 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
00622
00623
00624 ACE_Message_Block message_block_;
00625
00626
00627
00628 ACE_Pipe pipe_;
00629
00630
00631 ACE_POSIX_Asynch_Read_Stream read_stream_;
00632
00633
00634 ACE_AIOCB_Notify_Pipe_Manager (void);
00635 };
00636
00637 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
00638 : posix_aiocb_proactor_ (posix_aiocb_proactor),
00639 message_block_ (sizeof (2)),
00640 read_stream_ (posix_aiocb_proactor)
00641 {
00642
00643 this->pipe_.open ();
00644
00645
00646 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
00647
00648
00649 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
00650
00651
00652 if (this->read_stream_.open (*this,
00653 this->pipe_.read_handle (),
00654 0,
00655 0)
00656 == -1)
00657 ACE_ERROR ((LM_ERROR,
00658 "%N:%l:%p\n",
00659 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00660 "Open on Read Stream failed"));
00661
00662
00663 if (this->read_stream_.read (this->message_block_,
00664 1,
00665 0,
00666 0)
00667 == -1)
00668 ACE_ERROR ((LM_ERROR,
00669 "%N:%l:%p\n",
00670 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
00671 "Read from pipe failed"));
00672 }
00673
00674 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
00675 {
00676
00677 this->read_stream_.cancel ();
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689 ACE_HANDLE h = this->pipe_.write_handle ();
00690 if (h != ACE_INVALID_HANDLE)
00691 ACE_OS::closesocket (h);
00692
00693 h = this->pipe_.read_handle ();
00694 if ( h != ACE_INVALID_HANDLE)
00695 ACE_OS::closesocket (h);
00696
00697 }
00698
00699
00700 int
00701 ACE_AIOCB_Notify_Pipe_Manager::notify ()
00702 {
00703
00704 char char_send = 0;
00705 int ret_val = ACE::send (this->pipe_.write_handle (),
00706 &char_send,
00707 sizeof (char_send));
00708
00709 if (ret_val < 0)
00710 {
00711 if (errno != EWOULDBLOCK)
00712 #if 0
00713 ACE_ERROR ((LM_ERROR,
00714 ACE_LIB_TEXT ("(%P %t):%p\n"),
00715 ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
00716 ACE_LIB_TEXT ("Error:Writing on to notify pipe failed")));
00717 #endif
00718 return -1;
00719 }
00720
00721 return 0;
00722 }
00723
00724 void
00725 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
00726 (const ACE_Asynch_Read_Stream::Result & )
00727 {
00728
00729
00730
00731
00732 if (this->message_block_.length () > 0)
00733 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
00734
00735
00736
00737 if (-1 == this->read_stream_.read (this->message_block_,
00738 1,
00739 0,
00740 0))
00741 ACE_ERROR ((LM_ERROR,
00742 ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"),
00743 ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
00744 ACE_LIB_TEXT ("Read from pipe failed")));
00745
00746
00747
00748
00749 }
00750
00751
00752 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
00753 : aiocb_notify_pipe_manager_ (0),
00754 aiocb_list_ (0),
00755 result_list_ (0),
00756 aiocb_list_max_size_ (max_aio_operations),
00757 aiocb_list_cur_size_ (0),
00758 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00759 num_deferred_aiocb_ (0),
00760 num_started_aio_ (0)
00761 {
00762
00763 check_max_aio_num ();
00764
00765 this->create_result_aiocb_list ();
00766
00767 this->create_notify_manager ();
00768
00769
00770
00771 this->get_asynch_pseudo_task().start ();
00772
00773 }
00774
00775
00776 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
00777 ACE_POSIX_Proactor::Proactor_Type)
00778 : aiocb_notify_pipe_manager_ (0),
00779 aiocb_list_ (0),
00780 result_list_ (0),
00781 aiocb_list_max_size_ (max_aio_operations),
00782 aiocb_list_cur_size_ (0),
00783 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00784 num_deferred_aiocb_ (0),
00785 num_started_aio_ (0)
00786 {
00787
00788 this->check_max_aio_num ();
00789
00790 this->create_result_aiocb_list ();
00791
00792
00793
00794 }
00795
00796
00797 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
00798 {
00799 this->close();
00800 }
00801
00802 int
00803 ACE_POSIX_AIOCB_Proactor::close (void)
00804 {
00805
00806 this->get_asynch_pseudo_task().stop ();
00807
00808 this->delete_notify_manager ();
00809
00810 this->clear_result_queue ();
00811
00812 return this->delete_result_aiocb_list ();
00813 }
00814
00815 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
00816 {
00817 notify_pipe_read_handle_ = h;
00818 }
00819
00820 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
00821 {
00822 if (aiocb_list_ != 0)
00823 return 0;
00824
00825 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00826
00827 ACE_NEW_RETURN (result_list_,
00828 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00829 -1);
00830
00831
00832 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00833 {
00834 aiocb_list_[ai] = 0;
00835 result_list_[ai] = 0;
00836 }
00837
00838 return 0;
00839 }
00840
00841 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
00842 {
00843 if (aiocb_list_ == 0)
00844 return 0;
00845
00846 size_t ai;
00847
00848
00849
00850 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00851 if (this->aiocb_list_[ai] != 0)
00852 this->cancel_aiocb (result_list_[ai]);
00853
00854 int num_pending = 0;
00855
00856 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00857 {
00858 if (this->aiocb_list_[ai] == 0 )
00859 continue;
00860
00861
00862 int error_status = 0;
00863 size_t transfer_count = 0;
00864 int flg_completed = this->get_result_status (result_list_[ai],
00865 error_status,
00866 transfer_count);
00867
00868
00869 if (flg_completed == 0)
00870 {
00871 num_pending++;
00872 #if 0
00873 char * errtxt = ACE_OS::strerror (error_status);
00874 if (errtxt == 0)
00875 errtxt ="?????????";
00876
00877 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00878 "WRITE":"READ" ;
00879
00880
00881 ACE_ERROR ((LM_ERROR,
00882 ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00883 ai,
00884 op,
00885 error_status,
00886 transfer_count,
00887 errtxt));
00888 #endif
00889 }
00890 else
00891 {
00892 delete this->result_list_[ai];
00893 this->result_list_[ai] = 0;
00894 this->aiocb_list_[ai] = 0;
00895 }
00896 }
00897
00898
00899
00900
00901
00902
00903 ACE_DEBUG
00904 ((LM_DEBUG,
00905 ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00906 ACE_LIB_TEXT(" number pending AIO=%d\n"),
00907 num_pending));
00908
00909 delete [] this->aiocb_list_;
00910 this->aiocb_list_ = 0;
00911
00912 delete [] this->result_list_;
00913 this->result_list_ = 0;
00914
00915 return (num_pending == 0 ? 0 : -1);
00916
00917 }
00918
00919 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
00920 {
00921 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00922
00923
00924
00925
00926
00927 if (max_os_aio_num > 0 &&
00928 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00929 aiocb_list_max_size_ = max_os_aio_num;
00930
00931 #if defined (HPUX)
00932
00933
00934
00935
00936
00937 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00938 if (max_os_listio_num > 0
00939 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00940 aiocb_list_max_size_ = max_os_listio_num;
00941 #endif
00942
00943
00944
00945
00946 if (aiocb_list_max_size_ <= 0
00947 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00948 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00949
00950
00951
00952 int max_num_files = ACE::max_handles ();
00953
00954 if (max_num_files > 0
00955 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00956 {
00957 ACE::set_handle_limit (aiocb_list_max_size_);
00958
00959 max_num_files = ACE::max_handles ();
00960 }
00961
00962 if (max_num_files > 0
00963 && aiocb_list_max_size_ > (unsigned long) max_num_files)
00964 aiocb_list_max_size_ = (unsigned long) max_num_files;
00965
00966 ACE_DEBUG ((LM_DEBUG,
00967 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
00968 aiocb_list_max_size_));
00969
00970 #if defined(__sgi)
00971
00972 ACE_DEBUG((LM_DEBUG,
00973 ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985 aioinit_t aioinit;
00986
00987 aioinit.aio_threads = 10;
00988 aioinit.aio_locks = 20;
00989
00990 aioinit.aio_num = aiocb_list_max_size_;
00991 aioinit.aio_usedba = 0;
00992 aioinit.aio_debug = 0;
00993 aioinit.aio_numusers = 100;
00994 aioinit.aio_reserved[0] = 0;
00995 aioinit.aio_reserved[1] = 0;
00996 aioinit.aio_reserved[2] = 0;
00997
00998 aio_sgi_init (&aioinit);
00999
01000 #endif
01001
01002 return;
01003 }
01004
01005 void
01006 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
01007 {
01008
01009
01010
01011 if (aiocb_notify_pipe_manager_ == 0)
01012 ACE_NEW (aiocb_notify_pipe_manager_,
01013 ACE_AIOCB_Notify_Pipe_Manager (this));
01014 }
01015
01016 void
01017 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
01018 {
01019
01020
01021
01022 delete aiocb_notify_pipe_manager_;
01023 aiocb_notify_pipe_manager_ = 0;
01024 }
01025
01026 int
01027 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
01028 {
01029
01030 ACE_Countdown_Time countdown (&wait_time);
01031 return this->handle_events_i (wait_time.msec ());
01032 }
01033
01034 int
01035 ACE_POSIX_AIOCB_Proactor::handle_events (void)
01036 {
01037 return this->handle_events_i (ACE_INFINITE);
01038 }
01039
01040 int
01041 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
01042 {
01043 ACE_UNUSED_ARG (sig_num);
01044
01045 return this->aiocb_notify_pipe_manager_->notify ();
01046 }
01047
01048 int
01049 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
01050 {
01051 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01052
01053 int ret_val = this->putq_result (result);
01054
01055 return ret_val;
01056 }
01057
01058 int
01059 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
01060 {
01061
01062
01063
01064 if (!result)
01065 return -1;
01066
01067 int sig_num = result->signal_number ();
01068 int ret_val = this->result_queue_.enqueue_tail (result);
01069
01070 if (ret_val == -1)
01071 ACE_ERROR_RETURN ((LM_ERROR,
01072 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01073 -1);
01074
01075 this->notify_completion (sig_num);
01076
01077 return 0;
01078 }
01079
01080 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
01081 {
01082 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01083
01084
01085 ACE_POSIX_Asynch_Result* result = 0;
01086
01087 if (this->result_queue_.dequeue_head (result) != 0)
01088 return 0;
01089
01090
01091
01092
01093
01094
01095
01096
01097 return result;
01098 }
01099
01100 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
01101 {
01102 int ret_val = 0;
01103 ACE_POSIX_Asynch_Result* result = 0;
01104
01105 while ((result = this->getq_result ()) != 0)
01106 {
01107 delete result;
01108 ret_val++;
01109 }
01110
01111 return ret_val;
01112 }
01113
01114 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
01115 {
01116 int ret_val = 0;
01117 ACE_POSIX_Asynch_Result* result = 0;
01118
01119 while ((result = this->getq_result ()) != 0)
01120 {
01121 this->application_specific_code
01122 (result,
01123 result->bytes_transferred(),
01124 0,
01125 result->error());
01126
01127 ret_val++;
01128 }
01129
01130 return ret_val;
01131 }
01132
01133 int
01134 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
01135 {
01136 int result_suspend = 0;
01137 int retval= 0;
01138
01139 if (milli_seconds == ACE_INFINITE)
01140
01141 result_suspend = aio_suspend (aiocb_list_,
01142 aiocb_list_max_size_,
01143 0);
01144 else
01145 {
01146
01147 timespec timeout;
01148 timeout.tv_sec = milli_seconds / 1000;
01149 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000;
01150 result_suspend = aio_suspend (aiocb_list_,
01151 aiocb_list_max_size_,
01152 &timeout);
01153 }
01154
01155
01156 if (result_suspend == -1)
01157 {
01158 if (errno != EAGAIN &&
01159 errno != EINTR )
01160 ACE_ERROR ((LM_ERROR,
01161 "%N:%l:(%P | %t)::%p\n",
01162 "ACE_POSIX_AIOCB_Proactor::handle_events:"
01163 "aio_suspend failed\n"));
01164
01165
01166
01167 }
01168 else
01169 {
01170 size_t index = 0;
01171 size_t count = aiocb_list_max_size_;
01172 int error_status = 0;
01173 size_t transfer_count = 0;
01174
01175 for (;; retval++)
01176 {
01177 ACE_POSIX_Asynch_Result *asynch_result =
01178 find_completed_aio (error_status,
01179 transfer_count,
01180 index,
01181 count);
01182
01183 if (asynch_result == 0)
01184 break;
01185
01186
01187 this->application_specific_code (asynch_result,
01188 transfer_count,
01189 0,
01190 error_status);
01191 }
01192 }
01193
01194
01195 retval += this->process_result_queue ();
01196
01197 return retval > 0 ? 1 : 0;
01198 }
01199
01200 int
01201 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
01202 int &error_status,
01203 size_t &transfer_count)
01204 {
01205 transfer_count = 0;
01206
01207
01208 error_status = aio_error (asynch_result);
01209 if (error_status == EINPROGRESS)
01210 return 0;
01211
01212 ssize_t op_return = aio_return (asynch_result);
01213 if (op_return > 0)
01214 transfer_count = ACE_static_cast (size_t, op_return);
01215
01216 return 1;
01217 }
01218
01219 ACE_POSIX_Asynch_Result *
01220 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
01221 size_t &transfer_count,
01222 size_t &index,
01223 size_t &count)
01224 {
01225
01226
01227
01228 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01229
01230 ACE_POSIX_Asynch_Result *asynch_result = 0;
01231
01232 if (num_started_aio_ == 0)
01233 return 0;
01234
01235 for (; count > 0; index++ , count--)
01236 {
01237 if (index >= aiocb_list_max_size_)
01238 index = 0;
01239
01240 if (aiocb_list_[index] == 0)
01241 continue;
01242
01243 if (0 != this->get_result_status (result_list_[index],
01244 error_status,
01245 transfer_count))
01246 break;
01247
01248 }
01249
01250 if (count == 0)
01251 return 0;
01252 asynch_result = result_list_[index];
01253
01254 aiocb_list_[index] = 0;
01255 result_list_[index] = 0;
01256 aiocb_list_cur_size_--;
01257
01258 num_started_aio_--;
01259 index++;
01260 count--;
01261
01262 this->start_deferred_aio ();
01263
01264
01265
01266 return asynch_result;
01267 }
01268
01269
01270 int
01271 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
01272 ACE_POSIX_Proactor::Opcode op)
01273 {
01274 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01275
01276 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01277
01278 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01279
01280 if (result == 0)
01281 return ret_val;
01282
01283
01284 switch (op)
01285 {
01286 case ACE_POSIX_Proactor::READ:
01287 result->aio_lio_opcode = LIO_READ;
01288 break;
01289
01290 case ACE_POSIX_Proactor::WRITE:
01291 result->aio_lio_opcode = LIO_WRITE;
01292 break;
01293
01294 default:
01295 ACE_ERROR_RETURN ((LM_ERROR,
01296 "%N:%l:(%P | %t)::\n"
01297 "start_aio: Invalid operation code\n"),
01298 -1);
01299 }
01300
01301 if (ret_val != 0)
01302 {
01303 errno = EAGAIN;
01304 return -1;
01305 }
01306
01307
01308
01309 ssize_t slot = allocate_aio_slot (result);
01310
01311 if (slot < 0)
01312 return -1;
01313
01314 size_t index = ACE_static_cast (size_t, slot);
01315
01316 result_list_[index] = result;
01317 aiocb_list_cur_size_++;
01318
01319 ret_val = start_aio_i (result);
01320 switch (ret_val)
01321 {
01322 case 0:
01323 aiocb_list_[index] = result;
01324 return 0;
01325
01326 case 1:
01327 num_deferred_aiocb_ ++;
01328 return 0;
01329
01330 default:
01331 break;
01332 }
01333
01334 result_list_[index] = 0;
01335 aiocb_list_cur_size_--;
01336 return -1;
01337 }
01338
01339 ssize_t
01340 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01341 {
01342 size_t i = 0;
01343
01344
01345
01346
01347 if (notify_pipe_read_handle_ == result->aio_fildes)
01348 {
01349 if (result_list_[i] != 0)
01350 {
01351 errno = EAGAIN;
01352 ACE_ERROR_RETURN ((LM_ERROR,
01353 "%N:%l:(%P | %t)::\n"
01354 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01355 "internal Proactor error 0\n"),
01356 -1);
01357 }
01358 }
01359 else
01360 {
01361 for (i= 1; i < this->aiocb_list_max_size_; i++)
01362 if (result_list_[i] == 0)
01363 break;
01364 }
01365
01366 if (i >= this->aiocb_list_max_size_)
01367 ACE_ERROR_RETURN ((LM_ERROR,
01368 "%N:%l:(%P | %t)::\n"
01369 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01370 "internal Proactor error 1\n"),
01371 -1);
01372
01373
01374 result->aio_sigevent.sigev_notify = SIGEV_NONE;
01375
01376 return ACE_static_cast (ssize_t, i);
01377 }
01378
01379
01380
01381
01382
01383
01384 int
01385 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
01386 {
01387 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01388
01389 int ret_val;
01390 const ACE_TCHAR *ptype;
01391
01392
01393
01394 switch (result->aio_lio_opcode )
01395 {
01396 case LIO_READ :
01397 ptype = ACE_LIB_TEXT ("read ");
01398 ret_val = aio_read (result);
01399 break;
01400 case LIO_WRITE :
01401 ptype = ACE_LIB_TEXT ("write");
01402 ret_val = aio_write (result);
01403 break;
01404 default:
01405 ptype = ACE_LIB_TEXT ("?????");
01406 ret_val = -1;
01407 break;
01408 }
01409
01410 if (ret_val == 0)
01411 this->num_started_aio_++;
01412 else
01413 {
01414 if (errno == EAGAIN || errno == ENOMEM)
01415 ret_val = 1;
01416 else
01417 ACE_ERROR ((LM_ERROR,
01418 ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01419 ptype,
01420 ACE_LIB_TEXT ("queueing failed\n")));
01421 }
01422
01423 return ret_val;
01424 }
01425
01426
01427 int
01428 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
01429 {
01430 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440 if (num_deferred_aiocb_ == 0)
01441 return 0;
01442
01443 size_t i = 0;
01444
01445 for (i= 0; i < this->aiocb_list_max_size_; i++)
01446 if (result_list_[i] !=0
01447 && aiocb_list_[i] ==0)
01448 break;
01449
01450 if (i >= this->aiocb_list_max_size_)
01451 ACE_ERROR_RETURN ((LM_ERROR,
01452 "%N:%l:(%P | %t)::\n"
01453 "start_deferred_aio:"
01454 "internal Proactor error 3\n"),
01455 -1);
01456
01457 ACE_POSIX_Asynch_Result *result = result_list_[i];
01458
01459 int ret_val = start_aio_i (result);
01460
01461 switch (ret_val)
01462 {
01463 case 0 :
01464 aiocb_list_[i] = result;
01465 num_deferred_aiocb_ --;
01466 return 0;
01467
01468 case 1 :
01469 return 0;
01470
01471 default :
01472 break;
01473 }
01474
01475
01476
01477 result_list_[i] = 0;
01478 aiocb_list_cur_size_--;
01479
01480 num_deferred_aiocb_ --;
01481
01482 result->set_error (errno);
01483 result->set_bytes_transferred (0);
01484 this->putq_result (result);
01485
01486 return -1;
01487 }
01488
01489 int
01490 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
01491 {
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01506
01507 int num_total = 0;
01508 int num_cancelled = 0;
01509
01510 {
01511 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01512
01513 size_t ai = 0;
01514
01515 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01516 {
01517 if (this->result_list_[ai] == 0)
01518 continue;
01519
01520 if (this->result_list_[ai]->aio_fildes != handle)
01521 continue;
01522
01523 num_total++;
01524
01525 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01526
01527 if (this->aiocb_list_[ai] == 0)
01528 {
01529 num_cancelled++;
01530 this->num_deferred_aiocb_--;
01531
01532 this->aiocb_list_[ai] = 0;
01533 this->result_list_[ai] = 0;
01534 this->aiocb_list_cur_size_--;
01535
01536 asynch_result->set_error (ECANCELED);
01537 asynch_result->set_bytes_transferred (0);
01538 this->putq_result (asynch_result);
01539
01540 }
01541 else
01542 {
01543 int rc_cancel = this->cancel_aiocb (asynch_result);
01544
01545 if (rc_cancel == 0)
01546 num_cancelled++;
01547 }
01548 }
01549
01550 }
01551
01552 if (num_total == 0)
01553 return 1;
01554
01555 if (num_cancelled == num_total)
01556 return 0;
01557
01558 return 2;
01559 }
01560
01561 int
01562 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
01563 {
01564
01565
01566 int rc = ::aio_cancel (0, result);
01567
01568
01569 if (rc == AIO_CANCELED)
01570 return 0;
01571 else if (rc == AIO_ALLDONE)
01572 return 1;
01573 else
01574 return 2;
01575 }
01576
01577
01578
01579
01580 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
01581
01582 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
01583 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01584 ACE_POSIX_Proactor::PROACTOR_SIG)
01585 {
01586
01587
01588
01589
01590
01591 ACE_OS::sigemptyset (&this->RT_completion_signals_);
01592
01593
01594 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
01595 ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
01596 ACE_LIB_TEXT ("sigaddset")));
01597 this->block_signals ();
01598
01599 this->setup_signal_handler (ACE_SIGRTMIN);
01600
01601
01602
01603
01604
01605 this->get_asynch_pseudo_task().start ();
01606 return;
01607 }
01608
01609 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
01610 size_t max_aio_operations)
01611 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
01612 ACE_POSIX_Proactor::PROACTOR_SIG)
01613 {
01614
01615
01616
01617
01618
01619
01620 if (sigemptyset (&this->RT_completion_signals_) == -1)
01621 ACE_ERROR ((LM_ERROR,
01622 "Error:(%P | %t):%p\n",
01623 "sigemptyset failed"));
01624
01625
01626
01627
01628 int member = 0;
01629 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
01630 {
01631 member = sigismember (&signal_set,
01632 si);
01633 if (member == -1)
01634 ACE_ERROR ((LM_ERROR,
01635 "%N:%l:(%P | %t)::%p\n",
01636 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01637 "sigismember failed"));
01638 else if (member == 1)
01639 {
01640 sigaddset (&this->RT_completion_signals_, si);
01641 this->setup_signal_handler (si);
01642 }
01643 }
01644
01645
01646 this->block_signals ();
01647
01648
01649
01650
01651
01652 this->get_asynch_pseudo_task().start ();
01653 return;
01654 }
01655
01656 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
01657 {
01658 this->close ();
01659
01660
01661 }
01662
01663 int
01664 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
01665 {
01666
01667 ACE_Countdown_Time countdown (&wait_time);
01668 return this->handle_events_i (&wait_time);
01669 }
01670
01671 int
01672 ACE_POSIX_SIG_Proactor::handle_events (void)
01673 {
01674 return this->handle_events_i (0);
01675 }
01676
01677 int
01678 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
01679 {
01680
01681 pid_t pid = ACE_OS::getpid ();
01682 if (pid == (pid_t) -1)
01683 ACE_ERROR_RETURN ((LM_ERROR,
01684 "Error:%N:%l(%P | %t):%p",
01685 "<getpid> failed"),
01686 -1);
01687
01688
01689 sigval value = { -1 };
01690 #if defined (__FreeBSD__)
01691 value.sigval_int = -1;
01692 #else
01693 value.sival_int = -1;
01694 #endif
01695
01696
01697 if (sigqueue (pid, sig_num, value) == 0)
01698 return 0;
01699
01700 if (errno != EAGAIN)
01701 ACE_ERROR_RETURN ((LM_ERROR,
01702 "Error:%N:%l:(%P | %t):%p\n",
01703 "<sigqueue> failed"),
01704 -1);
01705 return -1;
01706 }
01707
01708 ACE_Asynch_Result_Impl *
01709 ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler,
01710 const void *act,
01711 const ACE_Time_Value &tv,
01712 ACE_HANDLE event,
01713 int priority,
01714 int signal_number)
01715 {
01716 int is_member = 0;
01717
01718
01719 if (signal_number == -1)
01720 {
01721 int si;
01722 for (si = ACE_SIGRTMAX;
01723 (is_member == 0) && (si >= ACE_SIGRTMIN);
01724 si--)
01725 {
01726 is_member = sigismember (&this->RT_completion_signals_,
01727 si);
01728 if (is_member == -1)
01729 ACE_ERROR_RETURN ((LM_ERROR,
01730 "%N:%l:(%P | %t)::%s\n",
01731 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
01732 "sigismember failed"),
01733 0);
01734 }
01735
01736 if (is_member == 0)
01737 ACE_ERROR_RETURN ((LM_ERROR,
01738 "Error:%N:%l:(%P | %t)::%s\n",
01739 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
01740 "Signal mask set empty"),
01741 0);
01742 else
01743
01744 signal_number = si + 1;
01745 }
01746
01747 ACE_Asynch_Result_Impl *implementation;
01748 ACE_NEW_RETURN (implementation,
01749 ACE_POSIX_Asynch_Timer (handler,
01750 act,
01751 tv,
01752 event,
01753 priority,
01754 signal_number),
01755 0);
01756 return implementation;
01757 }
01758
01759
01760 void sig_handler (int sig_num, siginfo_t *, ucontext_t *)
01761 {
01762
01763 ACE_DEBUG ((LM_DEBUG,
01764 "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
01765 sig_num));
01766 return;
01767 }
01768
01769 int
01770 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
01771 {
01772
01773
01774
01775
01776
01777
01778
01779
01780
01781
01782
01783 struct sigaction reaction;
01784 sigemptyset (&reaction.sa_mask);
01785 reaction.sa_flags = SA_SIGINFO;
01786 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC(sig_handler);
01787 int sigaction_return = ACE_OS::sigaction (signal_number,
01788 &reaction,
01789 0);
01790 if (sigaction_return == -1)
01791 ACE_ERROR_RETURN ((LM_ERROR,
01792 "Error:%p\n",
01793 "Proactor couldnt do sigaction for the RT SIGNAL"),
01794 -1);
01795 return 0;
01796 }
01797
01798
01799 int
01800 ACE_POSIX_SIG_Proactor::block_signals (void) const
01801 {
01802 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
01803 }
01804
01805 ssize_t
01806 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
01807 {
01808 size_t i = 0;
01809
01810
01811 for (i = 0; i < this->aiocb_list_max_size_; i++)
01812 if (result_list_[i] == 0)
01813 break;
01814
01815 if (i >= this->aiocb_list_max_size_)
01816 ACE_ERROR_RETURN ((LM_ERROR,
01817 "%N:%l:(%P | %t)::\n"
01818 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
01819 "internal Proactor error 1\n"),
01820 -1);
01821
01822
01823
01824 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
01825 result->aio_sigevent.sigev_signo = result->signal_number ();
01826 #if defined (__FreeBSD__)
01827 result->aio_sigevent.sigev_value.sigval_int = ACE_static_cast (int, i);
01828 #else
01829 result->aio_sigevent.sigev_value.sival_int = ACE_static_cast (int, i);
01830 #endif
01831
01832 return ACE_static_cast (ssize_t, i);
01833 }
01834
01835 int
01836 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
01837 {
01838 int result_sigwait = 0;
01839 siginfo_t sig_info;
01840
01841
01842 if (timeout == 0)
01843 {
01844 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
01845 &sig_info);
01846 }
01847 else
01848 {
01849 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
01850 &sig_info,
01851 timeout);
01852 if (result_sigwait == -1 && errno == EAGAIN)
01853 return 0;
01854 }
01855
01856
01857
01858 if (result_sigwait == -1)
01859 return -1;
01860
01861
01862
01863
01864 int flg_aio = 0;
01865
01866 size_t index = 0;
01867 size_t count = 1;
01868 int error_status = 0;
01869 size_t transfer_count = 0;
01870
01871 if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == OS_SUN_56)
01872 {
01873 flg_aio = 1;
01874
01875
01876 #if defined (__FreeBSD__)
01877 index = ACE_static_cast (size_t, sig_info.si_value.sigval_int);
01878 #else
01879 index = ACE_static_cast (size_t, sig_info.si_value.sival_int);
01880 #endif
01881
01882
01883
01884
01885 if (os_id_ == OS_SUN_56)
01886 {
01887
01888
01889
01890
01891
01892
01893
01894 count = aiocb_list_max_size_;
01895 }
01896 }
01897 else if (sig_info.si_code != SI_QUEUE)
01898 {
01899
01900
01901
01902
01903 ACE_ERROR ((LM_DEBUG,
01904 ACE_LIB_TEXT ("%N:%l:(%P | %t): ")
01905 ACE_LIB_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
01906 ACE_LIB_TEXT ("Unexpected signal code (%d) returned ")
01907 ACE_LIB_TEXT ("from sigwait; expecting %d\n"),
01908 result_sigwait, sig_info.si_code));
01909 flg_aio = 1;
01910 }
01911
01912 int ret_aio = 0;
01913 int ret_que = 0;
01914
01915 if (flg_aio)
01916 for (;; ret_aio++)
01917 {
01918 ACE_POSIX_Asynch_Result *asynch_result =
01919 find_completed_aio (error_status,
01920 transfer_count,
01921 index,
01922 count);
01923
01924 if (asynch_result == 0)
01925 break;
01926
01927
01928 this->application_specific_code (asynch_result,
01929 transfer_count,
01930 0,
01931 error_status);
01932 }
01933
01934
01935 ret_que = this->process_result_queue ();
01936
01937
01938
01939 #if 0
01940 ACE_DEBUG ((LM_DEBUG,
01941 "(%t) NumAIO=%d NumQueue=%d\n",
01942 ret_aio, ret_que));
01943 #endif
01944
01945 return ret_aio + ret_que > 0 ? 1 : 0;
01946 }
01947
01948 #endif
01949
01950
01951
01952 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer (ACE_Handler &handler,
01953 const void *act,
01954 const ACE_Time_Value &tv,
01955 ACE_HANDLE event,
01956 int priority,
01957 int signal_number)
01958 : ACE_Asynch_Result_Impl (),
01959 ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
01960 time_ (tv)
01961 {
01962 }
01963
01964 void
01965 ACE_POSIX_Asynch_Timer::complete (size_t ,
01966 int ,
01967 const void * ,
01968 u_long )
01969 {
01970 this->handler_.handle_time_out (this->time_, this->act ());
01971 }
01972
01973
01974
01975
01976 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
01977 const void *act,
01978 ACE_HANDLE event,
01979 int priority,
01980 int signal_number)
01981 : ACE_Asynch_Result_Impl (),
01982 ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
01983 {
01984 }
01985
01986 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
01987 {
01988 }
01989
01990 void
01991 ACE_POSIX_Wakeup_Completion::complete (size_t ,
01992 int ,
01993 const void * ,
01994 u_long )
01995 {
01996
01997 this->handler_.handle_wakeup ();
01998 }
01999
02000
02001 #endif