00001 #include "ace_pch.h"
00002
00003
00004
00005
00006 #include "ace/WIN32_Proactor.h"
00007
00008 #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
00009
00010
00011 #include "ace/Log_Msg.h"
00012 #include "ace/Object_Manager.h"
00013
00014
00015
00016
00017
00018
00019
00020
00021 class ACE_Export ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result
00022 {
00023
00024 public:
00025
00026 ACE_WIN32_Wakeup_Completion (ACE_Handler &handler,
00027 const void *act = 0,
00028 ACE_HANDLE event = ACE_INVALID_HANDLE,
00029 int priority = 0,
00030 int signal_number = ACE_SIGRTMIN);
00031
00032
00033 virtual ~ACE_WIN32_Wakeup_Completion (void);
00034
00035
00036 virtual void complete (size_t bytes_transferred = 0,
00037 int success = 1,
00038 const void *completion_key = 0,
00039 u_long error = 0);
00040 };
00041
00042 ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads,
00043 int used_with_reactor_event_loop)
00044 : completion_port_ (0),
00045
00046 number_of_threads_ (ACE_static_cast (DWORD, number_of_threads)),
00047 used_with_reactor_event_loop_ (used_with_reactor_event_loop)
00048 {
00049
00050 this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
00051 0,
00052 0,
00053 this->number_of_threads_);
00054 if (this->completion_port_ == 0)
00055 ACE_ERROR ((LM_ERROR,
00056 ACE_LIB_TEXT ("%p\n"),
00057 ACE_LIB_TEXT ("CreateIoCompletionPort")));
00058
00059 this->get_asynch_pseudo_task ().start ();
00060 }
00061
00062 ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void)
00063 {
00064 this->get_asynch_pseudo_task ().stop ();
00065
00066 this->close ();
00067 }
00068
00069 ACE_Asynch_Pseudo_Task &
00070 ACE_WIN32_Proactor::get_asynch_pseudo_task ()
00071 {
00072 return this->pseudo_task_;
00073 }
00074
00075 int
00076 ACE_WIN32_Proactor::close (void)
00077 {
00078
00079 if (this->completion_port_ != 0)
00080 {
00081
00082
00083 for (;;)
00084 {
00085 ACE_OVERLAPPED *overlapped = 0;
00086 u_long bytes_transferred = 0;
00087 #if defined (ACE_WIN64)
00088 ULONG_PTR completion_key = 0;
00089 #else
00090 ULONG completion_key = 0;
00091 #endif
00092
00093
00094 BOOL res = ::GetQueuedCompletionStatus
00095 (this->completion_port_,
00096 &bytes_transferred,
00097 &completion_key,
00098 &overlapped,
00099 0);
00100
00101 if (overlapped == 0)
00102 break;
00103
00104 ACE_WIN32_Asynch_Result *asynch_result =
00105 (ACE_WIN32_Asynch_Result *) overlapped;
00106
00107 delete asynch_result;
00108 }
00109
00110 int result = ACE_OS::close (this->completion_port_);
00111 this->completion_port_ = 0;
00112 return result;
00113 }
00114
00115 return 0;
00116 }
00117
00118 int
00119 ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle,
00120 const void *completion_key)
00121 {
00122 #if defined (ACE_WIN64)
00123 ULONG_PTR comp_key (ACE_static_cast (ULONG_PTR, completion_key));
00124 #else
00125 ULONG comp_key (ACE_reinterpret_cast (ULONG, completion_key));
00126 #endif
00127
00128
00129 ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
00130 this->completion_port_,
00131 comp_key,
00132 this->number_of_threads_);
00133 if (cp == 0)
00134 {
00135 ACE_OS::set_errno_to_last_error ();
00136
00137
00138 if (errno != ERROR_INVALID_PARAMETER)
00139 {
00140 if (ACE::debug ())
00141 {
00142 ACE_DEBUG ((LM_ERROR,
00143 ACE_LIB_TEXT ("%p\n"),
00144 ACE_LIB_TEXT ("CreateIoCompletionPort")));
00145 }
00146 return -1;
00147 }
00148 }
00149 return 0;
00150 }
00151
00152 ACE_Asynch_Read_Stream_Impl *
00153 ACE_WIN32_Proactor::create_asynch_read_stream (void)
00154 {
00155 ACE_Asynch_Read_Stream_Impl *implementation = 0;
00156 ACE_NEW_RETURN (implementation,
00157 ACE_WIN32_Asynch_Read_Stream (this),
00158 0);
00159 return implementation;
00160 }
00161
00162 ACE_Asynch_Write_Stream_Impl *
00163 ACE_WIN32_Proactor::create_asynch_write_stream (void)
00164 {
00165 ACE_Asynch_Write_Stream_Impl *implementation = 0;
00166 ACE_NEW_RETURN (implementation,
00167 ACE_WIN32_Asynch_Write_Stream (this),
00168 0);
00169 return implementation;
00170 }
00171
00172 ACE_Asynch_Read_Dgram_Impl *
00173 ACE_WIN32_Proactor::create_asynch_read_dgram (void)
00174 {
00175 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
00176 ACE_NEW_RETURN (implementation,
00177 ACE_WIN32_Asynch_Read_Dgram (this),
00178 0);
00179 return implementation;
00180 }
00181
00182 ACE_Asynch_Write_Dgram_Impl *
00183 ACE_WIN32_Proactor::create_asynch_write_dgram (void)
00184 {
00185 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
00186 ACE_NEW_RETURN (implementation,
00187 ACE_WIN32_Asynch_Write_Dgram (this),
00188 0);
00189 return implementation;
00190 }
00191
00192 ACE_Asynch_Read_File_Impl *
00193 ACE_WIN32_Proactor::create_asynch_read_file (void)
00194 {
00195 ACE_Asynch_Read_File_Impl *implementation = 0;
00196 ACE_NEW_RETURN (implementation,
00197 ACE_WIN32_Asynch_Read_File (this),
00198 0);
00199 return implementation;
00200 }
00201
00202 ACE_Asynch_Write_File_Impl *
00203 ACE_WIN32_Proactor::create_asynch_write_file (void)
00204 {
00205 ACE_Asynch_Write_File_Impl *implementation = 0;
00206 ACE_NEW_RETURN (implementation,
00207 ACE_WIN32_Asynch_Write_File (this),
00208 0);
00209 return implementation;
00210 }
00211
00212 ACE_Asynch_Accept_Impl *
00213 ACE_WIN32_Proactor::create_asynch_accept (void)
00214 {
00215 ACE_Asynch_Accept_Impl *implementation = 0;
00216 ACE_NEW_RETURN (implementation,
00217 ACE_WIN32_Asynch_Accept (this),
00218 0);
00219 return implementation;
00220 }
00221
00222 ACE_Asynch_Connect_Impl *
00223 ACE_WIN32_Proactor::create_asynch_connect (void)
00224 {
00225 ACE_Asynch_Connect_Impl *implementation = 0;
00226 ACE_NEW_RETURN (implementation,
00227 ACE_WIN32_Asynch_Connect (this),
00228 0);
00229 return implementation;
00230 }
00231
00232 ACE_Asynch_Transmit_File_Impl *
00233 ACE_WIN32_Proactor::create_asynch_transmit_file (void)
00234 {
00235 ACE_Asynch_Transmit_File_Impl *implementation = 0;
00236 ACE_NEW_RETURN (implementation,
00237 ACE_WIN32_Asynch_Transmit_File (this),
00238 0);
00239 return implementation;
00240 }
00241
00242 ACE_Asynch_Read_Stream_Result_Impl *
00243 ACE_WIN32_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
00244 ACE_HANDLE handle,
00245 ACE_Message_Block &message_block,
00246 size_t bytes_to_read,
00247 const void* act,
00248 ACE_HANDLE event,
00249 int priority,
00250 int signal_number)
00251 {
00252 ACE_Asynch_Read_Stream_Result_Impl *implementation = 0;
00253 ACE_NEW_RETURN (implementation,
00254 ACE_WIN32_Asynch_Read_Stream_Result (handler,
00255 handle,
00256 message_block,
00257 bytes_to_read,
00258 act,
00259 event,
00260 priority,
00261 signal_number),
00262 0);
00263 return implementation;
00264 }
00265
00266 ACE_Asynch_Write_Stream_Result_Impl *
00267 ACE_WIN32_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
00268 ACE_HANDLE handle,
00269 ACE_Message_Block &message_block,
00270 size_t bytes_to_write,
00271 const void* act,
00272 ACE_HANDLE event,
00273 int priority,
00274 int signal_number)
00275 {
00276 ACE_Asynch_Write_Stream_Result_Impl *implementation = 0;
00277 ACE_NEW_RETURN (implementation,
00278 ACE_WIN32_Asynch_Write_Stream_Result (handler,
00279 handle,
00280 message_block,
00281 bytes_to_write,
00282 act,
00283 event,
00284 priority,
00285 signal_number),
00286 0);
00287 return implementation;
00288 }
00289
00290 ACE_Asynch_Read_File_Result_Impl *
00291 ACE_WIN32_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
00292 ACE_HANDLE handle,
00293 ACE_Message_Block &message_block,
00294 size_t bytes_to_read,
00295 const void* act,
00296 u_long offset,
00297 u_long offset_high,
00298 ACE_HANDLE event,
00299 int priority,
00300 int signal_number)
00301 {
00302 ACE_Asynch_Read_File_Result_Impl *implementation = 0;
00303 ACE_NEW_RETURN (implementation,
00304 ACE_WIN32_Asynch_Read_File_Result (handler,
00305 handle,
00306 message_block,
00307 bytes_to_read,
00308 act,
00309 offset,
00310 offset_high,
00311 event,
00312 priority,
00313 signal_number),
00314 0);
00315 return implementation;
00316 }
00317
00318 ACE_Asynch_Write_File_Result_Impl *
00319 ACE_WIN32_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
00320 ACE_HANDLE handle,
00321 ACE_Message_Block &message_block,
00322 size_t bytes_to_write,
00323 const void* act,
00324 u_long offset,
00325 u_long offset_high,
00326 ACE_HANDLE event,
00327 int priority,
00328 int signal_number)
00329 {
00330 ACE_Asynch_Write_File_Result_Impl *implementation = 0;
00331 ACE_NEW_RETURN (implementation,
00332 ACE_WIN32_Asynch_Write_File_Result (handler,
00333 handle,
00334 message_block,
00335 bytes_to_write,
00336 act,
00337 offset,
00338 offset_high,
00339 event,
00340 priority,
00341 signal_number),
00342 0);
00343 return implementation;
00344 }
00345
00346 ACE_Asynch_Read_Dgram_Result_Impl *
00347 ACE_WIN32_Proactor::create_asynch_read_dgram_result (ACE_Handler &handler,
00348 ACE_HANDLE handle,
00349 ACE_Message_Block *message_block,
00350 size_t bytes_to_read,
00351 int flags,
00352 int protocol_family,
00353 const void* act,
00354 ACE_HANDLE event ,
00355 int priority ,
00356 int signal_number)
00357 {
00358 ACE_Asynch_Read_Dgram_Result_Impl *implementation = 0;
00359 ACE_NEW_RETURN (implementation,
00360 ACE_WIN32_Asynch_Read_Dgram_Result (handler,
00361 handle,
00362 message_block,
00363 bytes_to_read,
00364 flags,
00365 protocol_family,
00366 act,
00367 event,
00368 priority,
00369 signal_number),
00370 0);
00371 return implementation;
00372 }
00373
00374 ACE_Asynch_Write_Dgram_Result_Impl *
00375 ACE_WIN32_Proactor::create_asynch_write_dgram_result (ACE_Handler &handler,
00376 ACE_HANDLE handle,
00377 ACE_Message_Block *message_block,
00378 size_t bytes_to_read,
00379 int flags,
00380 const void* act,
00381 ACE_HANDLE event ,
00382 int priority ,
00383 int signal_number)
00384 {
00385 ACE_Asynch_Write_Dgram_Result_Impl *implementation = 0;
00386 ACE_NEW_RETURN (implementation,
00387 ACE_WIN32_Asynch_Write_Dgram_Result(handler,
00388 handle,
00389 message_block,
00390 bytes_to_read,
00391 flags,
00392 act,
00393 event,
00394 priority,
00395 signal_number),
00396 0);
00397 return implementation;
00398 }
00399
00400 ACE_Asynch_Accept_Result_Impl *
00401 ACE_WIN32_Proactor::create_asynch_accept_result (ACE_Handler &handler,
00402 ACE_HANDLE listen_handle,
00403 ACE_HANDLE accept_handle,
00404 ACE_Message_Block &message_block,
00405 size_t bytes_to_read,
00406 const void* act,
00407 ACE_HANDLE event,
00408 int priority,
00409 int signal_number)
00410 {
00411 ACE_Asynch_Accept_Result_Impl *implementation = 0;
00412 ACE_NEW_RETURN (implementation,
00413 ACE_WIN32_Asynch_Accept_Result (handler,
00414 listen_handle,
00415 accept_handle,
00416 message_block,
00417 bytes_to_read,
00418 act,
00419 event,
00420 priority,
00421 signal_number),
00422 0);
00423 return implementation;
00424 }
00425
00426 ACE_Asynch_Connect_Result_Impl *
00427 ACE_WIN32_Proactor::create_asynch_connect_result (ACE_Handler & handler,
00428 ACE_HANDLE connect_handle,
00429 const void *act,
00430 ACE_HANDLE event,
00431 int priority ,
00432 int signal_number)
00433 {
00434 ACE_Asynch_Connect_Result_Impl *implementation = 0;
00435 ACE_NEW_RETURN (implementation,
00436 ACE_WIN32_Asynch_Connect_Result (handler,
00437 connect_handle,
00438 act,
00439 event,
00440 priority,
00441 signal_number),
00442 0);
00443 return implementation;
00444 }
00445
00446 ACE_Asynch_Transmit_File_Result_Impl *
00447 ACE_WIN32_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
00448 ACE_HANDLE socket,
00449 ACE_HANDLE file,
00450 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
00451 size_t bytes_to_write,
00452 u_long offset,
00453 u_long offset_high,
00454 size_t bytes_per_send,
00455 u_long flags,
00456 const void *act,
00457 ACE_HANDLE event,
00458 int priority,
00459 int signal_number)
00460 {
00461 ACE_Asynch_Transmit_File_Result_Impl *implementation = 0;
00462 ACE_NEW_RETURN (implementation,
00463 ACE_WIN32_Asynch_Transmit_File_Result (handler,
00464 socket,
00465 file,
00466 header_and_trailer,
00467 bytes_to_write,
00468 offset,
00469 offset_high,
00470 bytes_per_send,
00471 flags,
00472 act,
00473 event,
00474 priority,
00475 signal_number),
00476 0);
00477 return implementation;
00478 }
00479
00480 ACE_Asynch_Result_Impl *
00481 ACE_WIN32_Proactor::create_asynch_timer (ACE_Handler &handler,
00482 const void *act,
00483 const ACE_Time_Value &tv,
00484 ACE_HANDLE event,
00485 int priority,
00486 int signal_number)
00487 {
00488 ACE_Asynch_Result_Impl *implementation = 0;
00489 ACE_NEW_RETURN (implementation,
00490 ACE_WIN32_Asynch_Timer (handler,
00491 act,
00492 tv,
00493 event,
00494 priority,
00495 signal_number),
00496 0);
00497 return implementation;
00498 }
00499
00500 int
00501 ACE_WIN32_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
00502 {
00503
00504
00505
00506 int result = 0;
00507
00508 for (ACE_Time_Value timeout (0, 0);
00509 ;
00510 )
00511 {
00512 result = this->handle_events (timeout);
00513
00514 if (result != 1)
00515 break;
00516 }
00517
00518
00519
00520 return result == -1 ? -1 : 0;
00521 }
00522
00523 int
00524 ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle,
00525 ACE_Reactor_Mask close_mask)
00526 {
00527 ACE_UNUSED_ARG (close_mask);
00528 ACE_UNUSED_ARG (handle);
00529
00530 return this->close ();
00531 }
00532
00533 ACE_HANDLE
00534 ACE_WIN32_Proactor::get_handle (void) const
00535 {
00536 if (this->used_with_reactor_event_loop_)
00537 return this->event_.handle ();
00538 else
00539 return 0;
00540 }
00541
00542 int
00543 ACE_WIN32_Proactor::handle_events (ACE_Time_Value &wait_time)
00544 {
00545
00546 ACE_Countdown_Time countdown (&wait_time);
00547 return this->handle_events (wait_time.msec ());
00548 }
00549
00550 int
00551 ACE_WIN32_Proactor::handle_events (void)
00552 {
00553 return this->handle_events (ACE_INFINITE);
00554 }
00555
00556 int
00557 ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds)
00558 {
00559 ACE_OVERLAPPED *overlapped = 0;
00560 u_long bytes_transferred = 0;
00561 #if defined (ACE_WIN64)
00562 ULONG_PTR completion_key = 0;
00563 #else
00564 ULONG completion_key = 0;
00565 #endif
00566
00567
00568 BOOL result = ::GetQueuedCompletionStatus (this->completion_port_,
00569 &bytes_transferred,
00570 &completion_key,
00571 &overlapped,
00572 milli_seconds);
00573 if (result == FALSE && overlapped == 0)
00574 {
00575 ACE_OS::set_errno_to_last_error ();
00576
00577 switch (errno)
00578 {
00579 case WAIT_TIMEOUT:
00580 errno = ETIME;
00581 return 0;
00582
00583 case ERROR_SUCCESS:
00584
00585
00586
00587 return 0;
00588
00589 default:
00590 if (ACE::debug ())
00591 ACE_DEBUG ((LM_ERROR,
00592 ACE_LIB_TEXT ("%p\n"),
00593 ACE_LIB_TEXT ("GetQueuedCompletionStatus")));
00594 return -1;
00595 }
00596 }
00597 else
00598 {
00599
00600 ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped;
00601
00602
00603 if (result == FALSE)
00604 ACE_OS::set_errno_to_last_error ();
00605 else
00606 errno = 0;
00607
00608 u_long result_err = asynch_result->error ();
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624 if ( result_err == 0 )
00625 result_err = errno ;
00626
00627 this->application_specific_code (asynch_result,
00628 ACE_static_cast (size_t,
00629 bytes_transferred),
00630 (void *) completion_key,
00631 result_err);
00632 }
00633 return 1;
00634 }
00635
00636 void
00637 ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result *asynch_result,
00638 size_t bytes_transferred,
00639 const void *completion_key,
00640 u_long error)
00641 {
00642 ACE_SEH_TRY
00643 {
00644
00645 asynch_result->complete (bytes_transferred,
00646 error ? 0 : 1,
00647 (void *) completion_key,
00648 error);
00649 }
00650 ACE_SEH_FINALLY
00651 {
00652
00653 delete asynch_result;
00654 }
00655 }
00656
00657 int
00658 ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result)
00659 {
00660
00661 HANDLE handle = this->get_handle ();
00662
00663
00664 if (handle != ACE_INVALID_HANDLE &&
00665 handle != 0)
00666 ACE_OS::event_signal (&handle);
00667
00668
00669
00670
00671
00672
00673
00674 DWORD bytes_transferred = 0;
00675 const void * completion_key = 0 ;
00676
00677 if ( result != 0 )
00678 {
00679
00680
00681 bytes_transferred = ACE_static_cast (DWORD,
00682 result->bytes_transferred ());
00683 completion_key = result->completion_key();
00684 }
00685 #if defined (ACE_WIN64)
00686 ULONG_PTR comp_key (ACE_static_cast (ULONG_PTR, completion_key));
00687 #else
00688 ULONG comp_key (ACE_reinterpret_cast (ULONG, completion_key));
00689 #endif
00690
00691
00692 if (::PostQueuedCompletionStatus (this->completion_port_,
00693 bytes_transferred,
00694 comp_key,
00695 result
00696 ) == FALSE)
00697 {
00698 delete result;
00699
00700 if (ACE::debug ())
00701 {
00702 ACE_DEBUG ((LM_ERROR,
00703 ACE_LIB_TEXT ("%p\n"),
00704 ACE_LIB_TEXT ("PostQueuedCompletionStatus failed")));
00705 }
00706 return -1;
00707 }
00708
00709 return 0;
00710 }
00711
00712 int
00713 ACE_WIN32_Proactor::post_wakeup_completions (int how_many)
00714 {
00715 ACE_WIN32_Wakeup_Completion *wakeup_completion = 0;
00716
00717 for (ssize_t ci = 0; ci < how_many; ci++)
00718 {
00719 ACE_NEW_RETURN (wakeup_completion,
00720 ACE_WIN32_Wakeup_Completion (this->wakeup_handler_),
00721 -1);
00722
00723 if (wakeup_completion->post_completion (this) == -1)
00724 return -1;
00725 }
00726
00727 return 0;
00728 }
00729
00730 int
00731 ACE_WIN32_Proactor::wake_up_dispatch_threads (void)
00732 {
00733 return 0;
00734 }
00735
00736 int
00737 ACE_WIN32_Proactor::close_dispatch_threads (int)
00738 {
00739 return 0;
00740 }
00741
00742 size_t
00743 ACE_WIN32_Proactor::number_of_threads (void) const
00744 {
00745 return ACE_static_cast (size_t, this->number_of_threads_);
00746 }
00747
00748 void
00749 ACE_WIN32_Proactor::number_of_threads (size_t threads)
00750 {
00751 this->number_of_threads_ = ACE_static_cast (DWORD, threads);
00752 }
00753
00754 ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer (ACE_Handler &handler,
00755 const void *act,
00756 const ACE_Time_Value &tv,
00757 ACE_HANDLE event,
00758 int priority,
00759 int signal_number)
00760 : ACE_Asynch_Result_Impl (),
00761 ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority,
00762 signal_number),
00763 time_ (tv)
00764 {
00765 }
00766
00767 void
00768 ACE_WIN32_Asynch_Timer::complete (size_t bytes_transferred,
00769 int success,
00770 const void *completion_key,
00771 u_long error)
00772 {
00773 ACE_UNUSED_ARG (error);
00774 ACE_UNUSED_ARG (completion_key);
00775 ACE_UNUSED_ARG (success);
00776 ACE_UNUSED_ARG (bytes_transferred);
00777
00778 this->handler_.handle_time_out (this->time_, this->act ());
00779 }
00780
00781 ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion (ACE_Handler &handler,
00782 const void *act,
00783 ACE_HANDLE event,
00784 int priority,
00785 int signal_number)
00786 : ACE_Asynch_Result_Impl (),
00787 ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
00788 {
00789 }
00790
00791 ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void)
00792 {
00793 }
00794
00795 void
00796 ACE_WIN32_Wakeup_Completion::complete (size_t ,
00797 int ,
00798 const void * ,
00799 u_long )
00800 {
00801 this->handler_.handle_wakeup ();
00802 }
00803
00804 #endif