00001 #include "ace_pch.h"
00002
00003
00004
00005 #include "ace/POSIX_CB_Proactor.h"
00006
00007 #if defined (ACE_HAS_AIO_CALLS) && !defined(__sun) && !defined(__Lynx__)
00008
00009 #include "ace/Task_T.h"
00010 #include "ace/Log_Msg.h"
00011 #include "ace/Object_Manager.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/POSIX_CB_Proactor.i"
00015 #endif
00016
00017 ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations)
00018 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
00019 ACE_POSIX_Proactor::PROACTOR_CB),
00020 sema_ (0)
00021 {
00022
00023
00024
00025 this->get_asynch_pseudo_task ().start ();
00026 }
00027
00028
00029 ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void)
00030 {
00031 this->close ();
00032 }
00033
00034 void ACE_POSIX_CB_Proactor::aio_completion_func ( sigval_t cb_data )
00035 {
00036 #if defined (__FreeBSD__)
00037 ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sigval_ptr);
00038 #else
00039 ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sival_ptr);
00040 #endif
00041
00042 if ( impl != 0 )
00043 impl->notify_completion (0);
00044 }
00045
00046 int
00047 ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value &wait_time)
00048 {
00049
00050 ACE_Countdown_Time countdown (&wait_time);
00051 return this->handle_events_i (wait_time.msec ());
00052 }
00053
00054 int
00055 ACE_POSIX_CB_Proactor::handle_events (void)
00056 {
00057 return this->handle_events_i (ACE_INFINITE);
00058 }
00059
00060 int
00061 ACE_POSIX_CB_Proactor::notify_completion (int sig_num)
00062 {
00063 ACE_UNUSED_ARG (sig_num);
00064
00065 return this->sema_.release();
00066 }
00067
00068
00069 ssize_t
00070 ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
00071 {
00072 ssize_t slot = ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (result);
00073 if (slot == -1)
00074 return -1;
00075
00076
00077
00078
00079 #if defined(__sgi)
00080 result->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
00081 result->aio_sigevent.sigev_func = aio_completion_func ;
00082 #else
00083 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
00084 #endif
00085
00086 #if defined (__FreeBSD__)
00087 result->aio_sigevent.sigev_value.sigval_ptr = this ;
00088 #else
00089 result->aio_sigevent.sigev_value.sival_ptr = this ;
00090 #endif
00091
00092 return slot;
00093 }
00094
00095 int
00096 ACE_POSIX_CB_Proactor::handle_events_i (unsigned long milli_seconds)
00097 {
00098
00099 int result_wait=0;
00100
00101
00102 if (milli_seconds == ACE_INFINITE)
00103 {
00104 result_wait = this->sema_.acquire();
00105 }
00106 else
00107 {
00108
00109 ACE_Time_Value abs_time = ACE_OS::gettimeofday ()
00110 + ACE_Time_Value ( milli_seconds);
00111
00112 result_wait = this->sema_.acquire(abs_time);
00113 }
00114
00115
00116
00117
00118 if (result_wait == -1)
00119 {
00120 if (errno != ETIME &&
00121 errno != EINTR )
00122 ACE_ERROR ((LM_ERROR,
00123 "%N:%l:(%P | %t)::%p\n",
00124 "ACE_POSIX_CB_Proactor::handle_events:"
00125 "semaphore acquire failed"
00126 ));
00127 }
00128
00129 size_t index = 0;
00130 size_t count = this->aiocb_list_max_size_;
00131
00132 int error_status = 0;
00133 size_t return_status = 0;
00134
00135 int ret_aio = 0;
00136 int ret_que = 0;
00137
00138 for (;; ret_aio++)
00139 {
00140 ACE_POSIX_Asynch_Result * asynch_result =
00141 this->find_completed_aio (error_status,
00142 return_status,
00143 index,
00144 count);
00145
00146 if (asynch_result == 0)
00147 break;
00148
00149
00150 this->application_specific_code (asynch_result,
00151 return_status,
00152 0,
00153 error_status);
00154 }
00155
00156
00157 ret_que = this->process_result_queue ();
00158
00159
00160
00161
00162
00163
00164
00165 return ret_aio + ret_que > 0 ? 1 : 0;
00166 }
00167
00168 #endif