00001 #include "ace_pch.h"
00002
00003
00004
00005 #include "ace/SUN_Proactor.h"
00006
00007 #if defined (ACE_HAS_AIO_CALLS) && defined (sun)
00008
00009 #include "ace/Task_T.h"
00010 #include "ace/Log_Msg.h"
00011 #include "ace/Object_Manager.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "ace/SUN_Proactor.i"
00015 #endif
00016
00017 ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
00018 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
00019 ACE_POSIX_Proactor::PROACTOR_SUN),
00020 condition_ (mutex_)
00021 {
00022
00023 create_notify_manager ();
00024
00025
00026
00027
00028 this->get_asynch_pseudo_task ().start ();
00029 }
00030
00031
00032 ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
00033 {
00034 this->close ();
00035 }
00036
00037 int
00038 ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time)
00039 {
00040
00041 ACE_Countdown_Time countdown (&wait_time);
00042 return this->handle_events_i (&wait_time);
00043 }
00044
00045 int
00046 ACE_SUN_Proactor::handle_events (void)
00047 {
00048 return this->handle_events_i (0);
00049 }
00050
00051 int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime)
00052 {
00053 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
00054
00055 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1));
00056
00057 if (this->num_started_aio_ != 0)
00058 return 0;
00059
00060 return this->condition_.wait (abstime);
00061
00062 #else
00063
00064 return 0;
00065
00066 #endif
00067 }
00068
00069 int
00070 ACE_SUN_Proactor::handle_events_i (ACE_Time_Value *delta)
00071 {
00072 int retval = 0;
00073 aio_result_t *result = 0;
00074
00075 if (0 == delta)
00076 {
00077 if (this->num_started_aio_ == 0)
00078 this->wait_for_start (0);
00079
00080 result = aiowait (0);
00081 }
00082 else
00083 {
00084 if (this->num_started_aio_ == 0)
00085 {
00086
00087 ACE_Countdown_Time countdown (delta);
00088 ACE_Time_Value tv (*delta);
00089 tv += ACE_OS::gettimeofday ();
00090 if (this->wait_for_start (&tv) == -1)
00091 return -1;
00092 }
00093 struct timeval delta_tv = *delta;
00094 result = aiowait (&delta_tv);
00095 }
00096
00097 if (result == 0)
00098 {
00099
00100
00101 }
00102 else if (ACE_reinterpret_cast (long, result) == -1)
00103 {
00104
00105 switch (errno)
00106 {
00107 case EINTR :
00108 case EINVAL:
00109 break;
00110
00111 default:
00112 ACE_ERROR_RETURN ((LM_ERROR,
00113 "%N:%l:(%P | %t)::%p \nNumAIO=%d\n",
00114 "ACE_SUN_Proactor::handle_events: aiowait failed",
00115 num_started_aio_),
00116 -1);
00117 }
00118 }
00119 else
00120 {
00121 int error_status = 0;
00122 size_t transfer_count = 0;
00123
00124 ACE_POSIX_Asynch_Result *asynch_result =
00125 find_completed_aio (result,
00126 error_status,
00127 transfer_count);
00128
00129 if (asynch_result != 0)
00130 {
00131
00132 this->application_specific_code (asynch_result,
00133 transfer_count,
00134 0,
00135 error_status);
00136 retval++;
00137 }
00138 }
00139
00140
00141 retval += this->process_result_queue ();
00142
00143 return retval > 0 ? 1 : 0 ;
00144
00145 }
00146
00147 int
00148 ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
00149 int &error_status,
00150 size_t &transfer_count)
00151 {
00152
00153
00154 error_status = asynch_result->aio_resultp.aio_errno;
00155 ssize_t op_return = asynch_result->aio_resultp.aio_return;
00156
00157
00158
00159
00160
00161
00162
00163
00164 if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS)
00165 return 0;
00166
00167 if (op_return < 0)
00168 transfer_count = 0;
00169 else
00170 transfer_count = ACE_static_cast (size_t, op_return);
00171
00172 return 1;
00173 }
00174
00175 ACE_POSIX_Asynch_Result *
00176 ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
00177 int &error_status,
00178 size_t &transfer_count)
00179 {
00180 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
00181
00182 size_t ai;
00183 error_status = -1;
00184 transfer_count = 0;
00185
00186
00187
00188 for (ai = 0; ai < aiocb_list_max_size_; ai++)
00189 if (aiocb_list_[ai] != 0 &&
00190 result == &aiocb_list_[ai]->aio_resultp)
00191 break;
00192
00193 if (ai >= aiocb_list_max_size_)
00194 return 0;
00195
00196 ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
00197
00198 if (this->get_result_status (asynch_result,
00199 error_status,
00200 transfer_count) == 0)
00201 {
00202 ACE_ERROR ((LM_ERROR,
00203 "%N:%l:(%P | %t)::%p\n",
00204 "ACE_SUN_Proactor::find_completed_aio:"
00205 "should never be !!!\n"));
00206 return 0;
00207 }
00208
00209 aiocb_list_[ai] = 0;
00210 result_list_[ai] = 0;
00211 aiocb_list_cur_size_--;
00212
00213 num_started_aio_--;
00214
00215 start_deferred_aio ();
00216
00217
00218
00219 return asynch_result;
00220 }
00221
00222
00223
00224
00225
00226
00227 int
00228 ACE_SUN_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
00229 {
00230 ACE_TRACE ("ACE_SUN_Proactor::start_aio_i");
00231
00232 int ret_val;
00233 const ACE_TCHAR *ptype;
00234
00235
00236
00237
00238
00239
00240
00241 result->aio_resultp.aio_return = AIO_INPROGRESS;
00242 result->aio_resultp.aio_errno = EINPROGRESS;
00243
00244
00245 switch (result->aio_lio_opcode)
00246 {
00247 case LIO_READ :
00248 ptype = ACE_LIB_TEXT ("read");
00249 ret_val = aioread (result->aio_fildes,
00250 (char *) result->aio_buf,
00251 result->aio_nbytes,
00252 result->aio_offset,
00253 SEEK_SET,
00254 &result->aio_resultp);
00255 break;
00256
00257 case LIO_WRITE :
00258 ptype = ACE_LIB_TEXT ("write");
00259 ret_val = aiowrite (result->aio_fildes,
00260 (char *) result->aio_buf,
00261 result->aio_nbytes,
00262 result->aio_offset,
00263 SEEK_SET,
00264 &result->aio_resultp);
00265 break;
00266
00267 default:
00268 ptype = ACE_LIB_TEXT ("?????");
00269 ret_val = -1;
00270 break;
00271 }
00272
00273 if (ret_val == 0)
00274 {
00275 this->num_started_aio_++;
00276 if (this->num_started_aio_ == 1)
00277 this->condition_.broadcast ();
00278 }
00279 else
00280 {
00281 if (errno == EAGAIN || errno == ENOMEM)
00282 ret_val = 1;
00283 else
00284 ACE_ERROR ((LM_ERROR,
00285 ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"),
00286 ptype,
00287 ACE_LIB_TEXT ("queueing failed\n")));
00288 }
00289
00290 return ret_val;
00291 }
00292
00293 int
00294 ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result)
00295 {
00296 ACE_TRACE ("ACE_SUN_Proactor::cancel_aiocb");
00297 int rc = ::aiocancel (&result->aio_resultp);
00298 if (rc == 0)
00299 {
00300
00301
00302
00303
00304
00305 result->set_error (ECANCELED);
00306 result->set_bytes_transferred (0);
00307 this->putq_result (result);
00308 return 0;
00309 }
00310
00311 return 2;
00312 }
00313
00314 #endif