Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

POSIX_Asynch_IO.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 /* -*- C++ -*- */
00003 // $Id: POSIX_Asynch_IO.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $
00004 
00005 #include "ace/POSIX_Asynch_IO.h"
00006 
00007 #if defined (ACE_HAS_AIO_CALLS)
00008 
00009 #include "ace/Proactor.h"
00010 #include "ace/Message_Block.h"
00011 #include "ace/INET_Addr.h"
00012 #include "ace/Asynch_Pseudo_Task.h"
00013 #include "ace/POSIX_Proactor.h"
00014 
00015 #if !defined (__ACE_INLINE__)
00016 #include "ace/POSIX_Asynch_IO.i"
00017 #endif /* __ACE_INLINE__ */
00018 
00019 size_t
00020 ACE_POSIX_Asynch_Result::bytes_transferred (void) const
00021 {
00022   return this->bytes_transferred_;
00023 }
00024 
00025 void
00026 ACE_POSIX_Asynch_Result::set_bytes_transferred (size_t nbytes)
00027 {
00028   this->bytes_transferred_= nbytes;
00029 }
00030 
00031 const void *
00032 ACE_POSIX_Asynch_Result::act (void) const
00033 {
00034   return this->act_;
00035 }
00036 
00037 int
00038 ACE_POSIX_Asynch_Result::success (void) const
00039 {
00040   return this->success_;
00041 }
00042 
00043 const void *
00044 ACE_POSIX_Asynch_Result::completion_key (void) const
00045 {
00046   return this->completion_key_;
00047 }
00048 
00049 u_long
00050 ACE_POSIX_Asynch_Result::error (void) const
00051 {
00052   return this->error_;
00053 }
00054 
00055 void
00056 ACE_POSIX_Asynch_Result::set_error (u_long errcode)
00057 {
00058   this->error_=errcode;
00059 }
00060 ACE_HANDLE
00061 ACE_POSIX_Asynch_Result::event (void) const
00062 {
00063   return ACE_INVALID_HANDLE;
00064 }
00065 
00066 u_long
00067 ACE_POSIX_Asynch_Result::offset (void) const
00068 {
00069   return this->aio_offset;
00070 }
00071 
00072 u_long
00073 ACE_POSIX_Asynch_Result::offset_high (void) const
00074 {
00075   //
00076   // @@ Support aiocb64??
00077   //
00078   ACE_NOTSUP_RETURN (0);
00079 }
00080 
00081 int
00082 ACE_POSIX_Asynch_Result::priority (void) const
00083 {
00084   return this->aio_reqprio;
00085 }
00086 
00087 int
00088 ACE_POSIX_Asynch_Result::signal_number (void) const
00089 {
00090   return this->aio_sigevent.sigev_signo;
00091 }
00092 
00093 int
00094 ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl)
00095 {
00096   // Get to the platform specific implementation.
00097   ACE_POSIX_Proactor *posix_proactor = ACE_dynamic_cast (ACE_POSIX_Proactor *,
00098                                                          proactor_impl);
00099 
00100   if (posix_proactor == 0)
00101     ACE_ERROR_RETURN ((LM_ERROR, "Dynamic cast to POSIX Proactor failed\n"), -1);
00102 
00103   // Post myself.
00104   return posix_proactor->post_completion (this);
00105 }
00106 
00107 ACE_POSIX_Asynch_Result::~ACE_POSIX_Asynch_Result (void)
00108 {
00109 }
00110 
00111 ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result (ACE_Handler &handler,
00112                                                   const void* act,
00113                                                   ACE_HANDLE event,
00114                                                   u_long offset,
00115                                                   u_long offset_high,
00116                                                   int priority,
00117                                                   int signal_number)
00118   : ACE_Asynch_Result_Impl (),
00119     aiocb (),
00120     handler_ (handler),
00121     act_ (act),
00122     bytes_transferred_ (0),
00123     success_ (0),
00124     completion_key_ (0),
00125     error_ (0)
00126 {
00127   aio_offset = offset;
00128   aio_reqprio = priority;
00129   aio_sigevent.sigev_signo = signal_number;
00130 
00131   // Event is not used on POSIX.
00132   ACE_UNUSED_ARG (event);
00133 
00134   //
00135   // @@ Support offset_high with aiocb64.
00136   //
00137   ACE_UNUSED_ARG (offset_high);
00138 
00139   // Other fields in the <aiocb> will be initialized by the
00140   // subclasses.
00141 }
00142 
00143 // ****************************************************************
00144 
00145 int
00146 ACE_POSIX_Asynch_Operation::open (ACE_Handler &handler,
00147                                   ACE_HANDLE handle,
00148                                   const void *completion_key,
00149                                   ACE_Proactor *proactor)
00150 {
00151   this->proactor_ = proactor;
00152   this->handler_ = &handler;
00153   this->handle_ = handle;
00154 
00155   // Grab the handle from the <handler> if <handle> is invalid
00156   if (this->handle_ == ACE_INVALID_HANDLE)
00157     this->handle_ = this->handler_->handle ();
00158   if (this->handle_ == ACE_INVALID_HANDLE)
00159     return -1;
00160 
00161 #if 0
00162   // @@ If <proactor> is 0, let us not bother about getting this
00163   // Proactor, we have already got the specific implementation
00164   // Proactor.
00165 
00166   // If no proactor was passed
00167   if (this->proactor_ == 0)
00168     {
00169       // Grab the proactor from the <Service_Config> if
00170       // <handler->proactor> is zero
00171       this->proactor_ = this->handler_->proactor ();
00172       if (this->proactor_ == 0)
00173         this->proactor_ = ACE_Proactor::instance();
00174     }
00175 #endif /* 0 */
00176 
00177   // AIO stuff is present. So no registering.
00178   ACE_UNUSED_ARG (completion_key);
00179   return 0;
00180 }
00181 
00182 int
00183 ACE_POSIX_Asynch_Operation::cancel (void)
00184 {
00185   if (!posix_proactor_)
00186     return -1;
00187   return posix_proactor_->cancel_aio (this->handle_);
00188 }
00189 
00190 ACE_Proactor *
00191 ACE_POSIX_Asynch_Operation::proactor (void) const
00192 {
00193   return this->proactor_;
00194 }
00195 
00196 ACE_POSIX_Proactor *
00197 ACE_POSIX_Asynch_Operation::posix_proactor (void) const
00198 {
00199   return this->posix_proactor_;
00200 }
00201 
00202 ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation (void)
00203 {
00204 }
00205 
00206 ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (ACE_POSIX_Proactor *posix_proactor)
00207   : ACE_Asynch_Operation_Impl (),
00208     posix_proactor_ (posix_proactor),
00209     handler_ (0),
00210     handle_  (ACE_INVALID_HANDLE)
00211 {
00212 }
00213 
00214 // *********************************************************************
00215 
00216 size_t
00217 ACE_POSIX_Asynch_Read_Stream_Result::bytes_to_read (void) const
00218 {
00219   return this->aio_nbytes;
00220 }
00221 
00222 ACE_Message_Block &
00223 ACE_POSIX_Asynch_Read_Stream_Result::message_block (void) const
00224 {
00225   return this->message_block_;
00226 }
00227 
00228 ACE_HANDLE
00229 ACE_POSIX_Asynch_Read_Stream_Result::handle (void) const
00230 {
00231   return this->aio_fildes;
00232 }
00233 
00234 ACE_POSIX_Asynch_Read_Stream_Result::ACE_POSIX_Asynch_Read_Stream_Result (ACE_Handler &handler,
00235                                                                           ACE_HANDLE handle,
00236                                                                           ACE_Message_Block &message_block,
00237                                                                           size_t bytes_to_read,
00238                                                                           const void* act,
00239                                                                           ACE_HANDLE event,
00240                                                                           int priority,
00241                                                                           int signal_number)
00242   : ACE_Asynch_Result_Impl (),
00243     ACE_Asynch_Read_Stream_Result_Impl (),
00244     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
00245     message_block_ (message_block)
00246 {
00247   this->aio_fildes = handle;
00248   this->aio_buf = message_block.wr_ptr ();
00249   this->aio_nbytes = bytes_to_read;
00250   ACE_UNUSED_ARG (event);
00251 }
00252 
00253 void
00254 ACE_POSIX_Asynch_Read_Stream_Result::complete (size_t bytes_transferred,
00255                                                int success,
00256                                                const void *completion_key,
00257                                                u_long error)
00258 {
00259   this->bytes_transferred_ = bytes_transferred;
00260   this->success_ = success;
00261   this->completion_key_ = completion_key;
00262   this->error_ = error;
00263 
00264   // <errno> is available in the aiocb.
00265   ACE_UNUSED_ARG (error);
00266 
00267   // Appropriately move the pointers in the message block.
00268   this->message_block_.wr_ptr (bytes_transferred);
00269 
00270   // Create the interface result class.
00271   ACE_Asynch_Read_Stream::Result result (this);
00272 
00273   // Call the application handler.
00274   this->handler_.handle_read_stream (result);
00275 }
00276 
00277 ACE_POSIX_Asynch_Read_Stream_Result::~ACE_POSIX_Asynch_Read_Stream_Result (void)
00278 {
00279 }
00280 
00281 // ************************************************************
00282 
00283 ACE_POSIX_Asynch_Read_Stream::ACE_POSIX_Asynch_Read_Stream (ACE_POSIX_Proactor  *posix_proactor)
00284   : ACE_Asynch_Operation_Impl (),
00285     ACE_Asynch_Read_Stream_Impl (),
00286     ACE_POSIX_Asynch_Operation (posix_proactor)
00287 {
00288 }
00289 
00290 int
00291 ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
00292                                     size_t bytes_to_read,
00293                                     const void *act,
00294                                     int priority,
00295                                     int signal_number)
00296 {
00297   size_t space = message_block.space ();
00298   if (bytes_to_read > space)
00299      bytes_to_read=space;
00300 
00301   if (bytes_to_read == 0)
00302     {
00303       errno = ENOSPC;
00304       return -1;
00305     }
00306 
00307   // Create the Asynch_Result.
00308   ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
00309   ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00310   ACE_NEW_RETURN (result,
00311                   ACE_POSIX_Asynch_Read_Stream_Result (*this->handler_,
00312                                                        this->handle_,
00313                                                        message_block,
00314                                                        bytes_to_read,
00315                                                        act,
00316                                                        proactor->get_handle (),
00317                                                        priority,
00318                                                        signal_number),
00319                   -1);
00320 
00321   int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::READ);
00322   if (return_val == -1)
00323     delete result;
00324 
00325   return return_val;
00326 }
00327 
00328 ACE_POSIX_Asynch_Read_Stream::~ACE_POSIX_Asynch_Read_Stream (void)
00329 {
00330 }
00331 
00332 // *********************************************************************
00333 
00334 size_t
00335 ACE_POSIX_Asynch_Write_Stream_Result::bytes_to_write (void) const
00336 {
00337   return this->aio_nbytes;
00338 }
00339 
00340 ACE_Message_Block &
00341 ACE_POSIX_Asynch_Write_Stream_Result::message_block (void) const
00342 {
00343   return this->message_block_;
00344 }
00345 
00346 ACE_HANDLE
00347 ACE_POSIX_Asynch_Write_Stream_Result::handle (void) const
00348 {
00349   return this->aio_fildes;
00350 }
00351 
00352 ACE_POSIX_Asynch_Write_Stream_Result::ACE_POSIX_Asynch_Write_Stream_Result
00353   (ACE_Handler &handler,
00354    ACE_HANDLE handle,
00355    ACE_Message_Block &message_block,
00356    size_t bytes_to_write,
00357    const void* act,
00358    ACE_HANDLE event,
00359    int priority,
00360    int signal_number)
00361   : ACE_Asynch_Result_Impl (),
00362     ACE_Asynch_Write_Stream_Result_Impl (),
00363     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
00364     message_block_ (message_block)
00365 {
00366   this->aio_fildes = handle;
00367   this->aio_buf = message_block.rd_ptr ();
00368   this->aio_nbytes = bytes_to_write;
00369   ACE_UNUSED_ARG (event);
00370 }
00371 
00372 void
00373 ACE_POSIX_Asynch_Write_Stream_Result::complete (size_t bytes_transferred,
00374                                                 int success,
00375                                                 const void *completion_key,
00376                                                 u_long error)
00377 {
00378   // Get all the data copied.
00379   this->bytes_transferred_ = bytes_transferred;
00380   this->success_ = success;
00381   this->completion_key_ = completion_key;
00382   this->error_ = error;
00383 
00384   // <errno> is available in the aiocb.
00385   ACE_UNUSED_ARG (error);
00386 
00387   // Appropriately move the pointers in the message block.
00388   this->message_block_.rd_ptr (bytes_transferred);
00389 
00390   // Create the interface result class.
00391   ACE_Asynch_Write_Stream::Result result (this);
00392 
00393   // Call the application handler.
00394   this->handler_.handle_write_stream (result);
00395 }
00396 
00397 ACE_POSIX_Asynch_Write_Stream_Result::~ACE_POSIX_Asynch_Write_Stream_Result (void)
00398 {
00399 }
00400 
00401 // *********************************************************************
00402 
00403 ACE_POSIX_Asynch_Write_Stream::ACE_POSIX_Asynch_Write_Stream (ACE_POSIX_Proactor *posix_proactor)
00404   : ACE_Asynch_Operation_Impl (),
00405     ACE_Asynch_Write_Stream_Impl (),
00406     ACE_POSIX_Asynch_Operation (posix_proactor)
00407 {
00408 }
00409 
00410 int
00411 ACE_POSIX_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
00412                                       size_t bytes_to_write,
00413                                       const void *act,
00414                                       int priority,
00415                                       int signal_number)
00416 {
00417   size_t len = message_block.length ();
00418   if (bytes_to_write > len)
00419      bytes_to_write = len;
00420 
00421   if (bytes_to_write == 0)
00422     ACE_ERROR_RETURN 
00423       ((LM_ERROR,
00424         ACE_LIB_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
00425         ACE_LIB_TEXT ("Attempt to write 0 bytes\n")),
00426       -1);
00427 
00428   ACE_POSIX_Asynch_Write_Stream_Result *result = 0;
00429   ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00430   ACE_NEW_RETURN (result,
00431                   ACE_POSIX_Asynch_Write_Stream_Result (*this->handler_,
00432                                                         this->handle_,
00433                                                         message_block,
00434                                                         bytes_to_write,
00435                                                         act,
00436                                                         proactor->get_handle (),
00437                                                         priority,
00438                                                         signal_number),
00439                   -1);
00440 
00441   int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::WRITE);
00442   if (return_val == -1)
00443     delete result;
00444 
00445   return return_val;
00446 }
00447 
00448 ACE_POSIX_Asynch_Write_Stream::~ACE_POSIX_Asynch_Write_Stream (void)
00449 {
00450 }
00451 
00452 // *********************************************************************
00453 
00454 ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result
00455   (ACE_Handler &handler,
00456    ACE_HANDLE handle,
00457    ACE_Message_Block &message_block,
00458    size_t bytes_to_read,
00459    const void* act,
00460    u_long offset,
00461    u_long offset_high,
00462    ACE_HANDLE event,
00463    int priority,
00464    int signal_number)
00465   : ACE_Asynch_Result_Impl (),
00466     ACE_Asynch_Read_Stream_Result_Impl (),
00467     ACE_Asynch_Read_File_Result_Impl (),
00468     ACE_POSIX_Asynch_Read_Stream_Result (handler,
00469                                          handle,
00470                                          message_block,
00471                                          bytes_to_read,
00472                                          act,
00473                                          event,
00474                                          priority,
00475                                          signal_number)
00476 {
00477   this->aio_offset = offset;
00478   //
00479   // @@ Use aiocb64??
00480   //
00481   ACE_UNUSED_ARG (offset_high);
00482 }
00483 
00484 void
00485 ACE_POSIX_Asynch_Read_File_Result::complete (size_t bytes_transferred,
00486                                              int success,
00487                                              const void *completion_key,
00488                                              u_long error)
00489 {
00490   // Copy all the data.
00491   this->bytes_transferred_ = bytes_transferred;
00492   this->success_ = success;
00493   this->completion_key_ = completion_key;
00494   this->error_ = error;
00495 
00496   // <errno> is available in the aiocb.
00497   ACE_UNUSED_ARG (error);
00498 
00499   // Appropriately move the pointers in the message block.
00500   this->message_block_.wr_ptr (bytes_transferred);
00501 
00502   // Create the interface result class.
00503   ACE_Asynch_Read_File::Result result (this);
00504 
00505   // Call the application handler.
00506   this->handler_.handle_read_file (result);
00507 }
00508 
00509 ACE_POSIX_Asynch_Read_File_Result::~ACE_POSIX_Asynch_Read_File_Result (void)
00510 {
00511 }
00512 
00513 // *********************************************************************
00514 
00515 ACE_POSIX_Asynch_Read_File::ACE_POSIX_Asynch_Read_File (ACE_POSIX_Proactor *posix_proactor)
00516   : ACE_Asynch_Operation_Impl (),
00517     ACE_Asynch_Read_Stream_Impl (),
00518     ACE_Asynch_Read_File_Impl (),
00519     ACE_POSIX_Asynch_Read_Stream (posix_proactor)
00520 {
00521 }
00522 
00523 int
00524 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
00525                                   size_t bytes_to_read,
00526                                   u_long offset,
00527                                   u_long offset_high,
00528                                   const void *act,
00529                                   int priority,
00530                                   int signal_number)
00531 {
00532   size_t space = message_block.space ();
00533   if ( bytes_to_read > space )
00534      bytes_to_read=space;
00535 
00536   if ( bytes_to_read == 0 )
00537     ACE_ERROR_RETURN 
00538       ((LM_ERROR,
00539         ACE_LIB_TEXT ("ACE_POSIX_Asynch_Read_File::read:")
00540         ACE_LIB_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
00541        -1);
00542 
00543   ACE_POSIX_Asynch_Read_File_Result *result = 0;
00544   ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00545   ACE_NEW_RETURN (result,
00546                   ACE_POSIX_Asynch_Read_File_Result (*this->handler_,
00547                                                      this->handle_,
00548                                                      message_block,
00549                                                      bytes_to_read,
00550                                                      act,
00551                                                      offset,
00552                                                      offset_high,
00553                                                      posix_proactor ()->get_handle (),
00554                                                      priority,
00555                                                      signal_number),
00556                   -1);
00557 
00558   int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::READ);
00559   if (return_val == -1)
00560     delete result;
00561 
00562   return return_val;
00563 }
00564 
00565 ACE_POSIX_Asynch_Read_File::~ACE_POSIX_Asynch_Read_File (void)
00566 {
00567 }
00568 
00569 int
00570 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
00571                                   size_t bytes_to_read,
00572                                   const void *act,
00573                                   int priority,
00574                                   int signal_number)
00575 {
00576   return ACE_POSIX_Asynch_Read_Stream::read (message_block,
00577                                                    bytes_to_read,
00578                                                    act,
00579                                                    priority,
00580                                                    signal_number);
00581 }
00582 
00583 // ************************************************************
00584 
00585 ACE_POSIX_Asynch_Write_File_Result::ACE_POSIX_Asynch_Write_File_Result
00586   (ACE_Handler &handler,
00587    ACE_HANDLE handle,
00588    ACE_Message_Block &message_block,
00589    size_t bytes_to_write,
00590    const void* act,
00591    u_long offset,
00592    u_long offset_high,
00593    ACE_HANDLE event,
00594    int priority,
00595    int signal_number)
00596   : ACE_Asynch_Result_Impl (),
00597     ACE_Asynch_Write_Stream_Result_Impl (),
00598     ACE_Asynch_Write_File_Result_Impl (),
00599     ACE_POSIX_Asynch_Write_Stream_Result (handler,
00600                                           handle,
00601                                           message_block,
00602                                           bytes_to_write,
00603                                           act,
00604                                           event,
00605                                           priority,
00606                                           signal_number)
00607 {
00608   this->aio_offset = offset;
00609   //
00610   // @@ Support offset_high with aiocb64.
00611   //
00612   ACE_UNUSED_ARG (offset_high);
00613 }
00614 
00615 void
00616 ACE_POSIX_Asynch_Write_File_Result::complete (size_t bytes_transferred,
00617                                               int success,
00618                                               const void *completion_key,
00619                                               u_long error)
00620 {
00621   // Copy the data.
00622   this->bytes_transferred_ = bytes_transferred;
00623   this->success_ = success;
00624   this->completion_key_ = completion_key;
00625   this->error_ = error;
00626 
00627   // <error> is available in <aio_resultp.aio_error>
00628   ACE_UNUSED_ARG (error);
00629 
00630   // Appropriately move the pointers in the message block.
00631   this->message_block_.rd_ptr (bytes_transferred);
00632 
00633   // Create the interface result class.
00634   ACE_Asynch_Write_File::Result result (this);
00635 
00636   // Call the application handler.
00637   this->handler_.handle_write_file (result);
00638 }
00639 
00640 ACE_POSIX_Asynch_Write_File_Result::~ACE_POSIX_Asynch_Write_File_Result  (void)
00641 {
00642 }
00643 
00644 // *********************************************************************
00645 
00646 ACE_POSIX_Asynch_Write_File::ACE_POSIX_Asynch_Write_File (ACE_POSIX_Proactor *posix_proactor)
00647   : ACE_Asynch_Operation_Impl (),
00648     ACE_Asynch_Write_Stream_Impl (),
00649     ACE_Asynch_Write_File_Impl (),
00650     ACE_POSIX_Asynch_Write_Stream (posix_proactor)
00651 {
00652 }
00653 
00654 int
00655 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
00656                                     size_t bytes_to_write,
00657                                     u_long offset,
00658                                     u_long offset_high,
00659                                     const void *act,
00660                                     int priority,
00661                                     int signal_number)
00662 {
00663   size_t len = message_block.length ();
00664   if (bytes_to_write > len)
00665      bytes_to_write = len;
00666 
00667   if (bytes_to_write == 0)
00668     ACE_ERROR_RETURN 
00669       ((LM_ERROR,
00670         ACE_LIB_TEXT ("ACE_POSIX_Asynch_Write_File::write:")
00671         ACE_LIB_TEXT ("Attempt to write 0 bytes\n")),
00672       -1);
00673 
00674   ACE_POSIX_Asynch_Write_File_Result *result = 0;
00675   ACE_POSIX_Proactor *proactor = this->posix_proactor ();
00676   ACE_NEW_RETURN (result,
00677                   ACE_POSIX_Asynch_Write_File_Result (*this->handler_,
00678                                                       this->handle_,
00679                                                       message_block,
00680                                                       bytes_to_write,
00681                                                       act,
00682                                                       offset,
00683                                                       offset_high,
00684                                                       proactor->get_handle (),
00685                                                       priority,
00686                                                       signal_number),
00687                   -1);
00688 
00689   int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::WRITE);
00690   if (return_val == -1)
00691     delete result;
00692 
00693   return return_val;
00694 }
00695 
00696 ACE_POSIX_Asynch_Write_File::~ACE_POSIX_Asynch_Write_File (void)
00697 {
00698 }
00699 
00700 int
00701 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
00702                                     size_t bytes_to_write,
00703                                     const void *act,
00704                                     int priority,
00705                                     int signal_number)
00706 {
00707   return ACE_POSIX_Asynch_Write_Stream::write (message_block,
00708                                                      bytes_to_write,
00709                                                      act,
00710                                                      priority,
00711                                                      signal_number);
00712 }
00713 
00714 // *********************************************************************
00715 
00716 
00717 size_t
00718 ACE_POSIX_Asynch_Accept_Result::bytes_to_read (void) const
00719 {
00720   return this->aio_nbytes;
00721 }
00722 
00723 ACE_Message_Block &
00724 ACE_POSIX_Asynch_Accept_Result::message_block (void) const
00725 {
00726   return this->message_block_;
00727 }
00728 
00729 ACE_HANDLE
00730 ACE_POSIX_Asynch_Accept_Result::listen_handle (void) const
00731 {
00732   return this->listen_handle_;
00733 }
00734 
00735 ACE_HANDLE
00736 ACE_POSIX_Asynch_Accept_Result::accept_handle (void) const
00737 {
00738   return this->aio_fildes;
00739 }
00740 
00741 ACE_POSIX_Asynch_Accept_Result::ACE_POSIX_Asynch_Accept_Result
00742   (ACE_Handler &handler,
00743    ACE_HANDLE listen_handle,
00744    ACE_HANDLE accept_handle,
00745    ACE_Message_Block &message_block,
00746    size_t bytes_to_read,
00747    const void* act,
00748    ACE_HANDLE event,
00749    int priority,
00750    int signal_number)
00751 
00752   : ACE_Asynch_Result_Impl (),
00753     ACE_Asynch_Accept_Result_Impl (),
00754     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
00755     message_block_ (message_block),
00756     listen_handle_ (listen_handle)
00757 {
00758   this->aio_fildes = accept_handle;
00759   this->aio_nbytes = bytes_to_read;
00760 }
00761 
00762 void
00763 ACE_POSIX_Asynch_Accept_Result::complete (size_t bytes_transferred,
00764                                           int success,
00765                                           const void *completion_key,
00766                                           u_long error)
00767 {
00768   // Copy the data.
00769   this->bytes_transferred_ = bytes_transferred;
00770   this->success_ = success;
00771   this->completion_key_ = completion_key;
00772   this->error_ = error;
00773 
00774   // Appropriately move the pointers in the message block.
00775   this->message_block_.wr_ptr (bytes_transferred);
00776 
00777   // Create the interface result class.
00778   ACE_Asynch_Accept::Result result (this);
00779 
00780   // Call the application handler.
00781   this->handler_.handle_accept (result);
00782 }
00783 
00784 ACE_POSIX_Asynch_Accept_Result::~ACE_POSIX_Asynch_Accept_Result (void)
00785 {
00786 }
00787 
00788 // *********************************************************************
00789 
00790 ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_proactor)
00791   : ACE_Asynch_Operation_Impl (),
00792     ACE_Asynch_Accept_Impl (),
00793     ACE_POSIX_Asynch_Operation (posix_proactor),
00794     flg_open_ (0),
00795     task_lock_count_ (0)
00796 {
00797 }
00798 
00799 ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void)
00800 {
00801   this->close ();
00802   this->reactor(0); // to avoid purge_pending_notifications
00803 }
00804 
00805 ACE_HANDLE
00806 ACE_POSIX_Asynch_Accept::get_handle (void) const
00807 {
00808   return this->handle_;
00809 }
00810 
00811 void
00812 ACE_POSIX_Asynch_Accept::set_handle (ACE_HANDLE handle)
00813 {
00814   ACE_ASSERT (handle_ == ACE_INVALID_HANDLE);
00815   this->handle_ = handle;
00816 }
00817 
00818 int
00819 ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler,
00820                                ACE_HANDLE handle,
00821                                const void *completion_key,
00822                                ACE_Proactor *proactor)
00823 {
00824   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::open\n"));
00825 
00826   int result=0;
00827 
00828   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00829 
00830   // if we are already opened,
00831   // we could not create a new handler without closing the previous
00832 
00833   if (this->flg_open_ != 0)
00834     ACE_ERROR_RETURN ((LM_ERROR,
00835                        ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
00836                        ACE_LIB_TEXT("acceptor already open \n")),
00837                       -1);
00838 
00839   result = ACE_POSIX_Asynch_Operation::open (handler,
00840                                              handle,
00841                                              completion_key,
00842                                              proactor);
00843   if (result == -1)
00844     return result;
00845 
00846   flg_open_ = 1;
00847 
00848   task_lock_count_++;
00849 
00850   // At this moment asynch_accept_task does not know about us,
00851   // so we can lock task's token with our lock_ locked.
00852   // In all other cases we should release our lock_ before
00853   // calling task's methods to avoid deadlock
00854   ACE_Asynch_Pseudo_Task & task =
00855     this->posix_proactor()->get_asynch_pseudo_task();
00856 
00857   result = task.register_io_handler (this->get_handle(),
00858                                      this,
00859                                      ACE_Event_Handler::ACCEPT_MASK,
00860                                      1);  // suspend after register
00861 
00862   task_lock_count_-- ;
00863 
00864   if (result < 0)
00865     {
00866       this->flg_open_= 0;
00867       this->handle_ = ACE_INVALID_HANDLE;
00868       return -1 ;
00869     }
00870 
00871   return 0;
00872 }
00873 
00874 int
00875 ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
00876                                  size_t bytes_to_read,
00877                                  ACE_HANDLE accept_handle,
00878                                  const void *act,
00879                                  int priority,
00880                                  int signal_number)
00881 {
00882   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n"));
00883 
00884   {
00885     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00886 
00887     if (this->flg_open_ == 0)
00888       ACE_ERROR_RETURN ((LM_ERROR,
00889                          ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
00890                          ACE_LIB_TEXT("acceptor was not opened before\n")),
00891                         -1);
00892 
00893     // Sanity check: make sure that enough space has been allocated by
00894     // the caller.
00895     size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr);
00896     size_t space_in_use = message_block.wr_ptr () - message_block.base ();
00897     size_t total_size = message_block.size ();
00898     size_t available_space = total_size - space_in_use;
00899     size_t space_needed = bytes_to_read + 2 * address_size;
00900 
00901     if (available_space < space_needed)
00902       ACE_ERROR_RETURN ((LM_ERROR,
00903                          ACE_LIB_TEXT ("Buffer too small\n")),
00904                         -1);
00905 
00906     // Common code for both WIN and POSIX.
00907     // Create future Asynch_Accept_Result
00908     ACE_POSIX_Asynch_Accept_Result *result = 0;
00909     ACE_NEW_RETURN (result,
00910                     ACE_POSIX_Asynch_Accept_Result (*this->handler_,
00911                                                     this->handle_,
00912                                                     accept_handle,
00913                                                     message_block,
00914                                                     bytes_to_read,
00915                                                     act,
00916                                                     this->posix_proactor()->get_handle (),
00917                                                     priority,
00918                                                     signal_number),
00919                   -1);
00920 
00921     // Enqueue result
00922     if (this->result_queue_.enqueue_tail (result) == -1)
00923       {
00924         delete result;  // to avoid memory  leak
00925 
00926         ACE_ERROR_RETURN ((LM_ERROR,
00927                            ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:")
00928                            ACE_LIB_TEXT("enqueue accept call failed\n")),
00929                           -1);
00930       }
00931 
00932     if (this->result_queue_.size () > 1)
00933       return 0;
00934 
00935     task_lock_count_ ++;
00936   }
00937 
00938   // If this is the only item, then it means there the set was empty
00939   // before. So enable the <handle> in the reactor.
00940 
00941   ACE_Asynch_Pseudo_Task & task =
00942     this->posix_proactor ()->get_asynch_pseudo_task ();
00943 
00944   int rc_task = task.resume_io_handler (this->get_handle());
00945 
00946   {
00947     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
00948 
00949     task_lock_count_ --;
00950 
00951     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
00952       task.unlock_finish ();
00953   }
00954 
00955   if (rc_task < 0)
00956     return -1;
00957 
00958   return 0;
00959 }
00960 
00961 //@@ New method cancel_uncompleted
00962 // It performs cancellation of all pending requests
00963 //
00964 // Parameter flg_notify can be
00965 //     0  - don't send notifications about canceled accepts
00966 //    !0  - notify user about canceled accepts
00967 //          according POSIX standards we should receive notifications
00968 //          on canceled AIO requests
00969 //
00970 //  Return value : number of cancelled requests
00971 //
00972 
00973 int
00974 ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify)
00975 {
00976   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::cancel_uncompleted\n"));
00977 
00978   int retval = 0;
00979 
00980   for (; ; retval++)
00981     {
00982       ACE_POSIX_Asynch_Accept_Result* result = 0;
00983 
00984       this->result_queue_.dequeue_head (result);
00985 
00986       if (result == 0)
00987         break;
00988 
00989       if (this->flg_open_ == 0 || flg_notify == 0) //if we should not notify
00990         delete result ;                            // we have to delete result
00991       else                                 //else notify as any cancelled AIO
00992         {
00993           // Store the new handle.
00994           result->aio_fildes = ACE_INVALID_HANDLE ;
00995           result->set_bytes_transferred (0);
00996           result->set_error (ECANCELED);
00997 
00998           if (this->posix_proactor ()->post_completion (result) == -1)
00999             ACE_ERROR ((LM_ERROR,
01000                         ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
01001                         ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::")
01002                         ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed")
01003                         ));
01004         }
01005     }
01006   return retval;
01007 }
01008 
01009 int
01010 ACE_POSIX_Asynch_Accept::cancel (void)
01011 {
01012   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::cancel\n"));
01013 
01014   //We are not really ACE_POSIX_Asynch_Operation
01015   //so we could not call ::aiocancel ()
01016   // or just write
01017   //return ACE_POSIX_Asynch_Operation::cancel ();
01018   //We delegate real cancelation to cancel_uncompleted (1)
01019 
01020   int rc = -1 ;  // ERRORS
01021 
01022   {
01023     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01024 
01025     int num_cancelled = cancel_uncompleted (flg_open_);
01026 
01027     if (num_cancelled == 0)
01028        rc = 1 ;        // AIO_ALLDONE
01029     else if (num_cancelled > 0)
01030        rc = 0 ;        // AIO_CANCELED
01031 
01032     if (this->flg_open_ == 0)
01033        return rc ;
01034 
01035     task_lock_count_++;
01036   }
01037 
01038   ACE_Asynch_Pseudo_Task & task =
01039     this->posix_proactor ()->get_asynch_pseudo_task ();
01040 
01041   int rc_task = task.suspend_io_handler (this->get_handle());
01042 
01043   {
01044     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01045 
01046     task_lock_count_--;
01047 
01048     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
01049        task.unlock_finish ();
01050   }
01051 
01052   return rc;
01053 }
01054 
01055 int
01056 ACE_POSIX_Asynch_Accept::close ()
01057 {
01058   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::close\n"));
01059 
01060   // 1. It performs cancellation of all pending requests
01061   // 2. Removes itself from Reactor ( ACE_Asynch_Pseudo_Task)
01062   // 3. close the socket
01063   //
01064   //  Parameter flg_notify can be
01065   //     0  - don't send notifications about canceled accepts
01066   //    !0  - notify user about canceled accepts
01067   //          according POSIX standards we should receive notifications
01068   //          on canceled AIO requests
01069   //
01070   //  Return codes : 0 - OK ,
01071   //                -1 - Errors
01072 
01073   {
01074     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01075 
01076     this->cancel_uncompleted (flg_open_);
01077 
01078     if (this->flg_open_ == 0)
01079       {
01080         if (this->handle_ != ACE_INVALID_HANDLE)
01081           {
01082             ACE_OS::closesocket (this->handle_);
01083             this->handle_ = ACE_INVALID_HANDLE;
01084           }
01085         return 0;
01086       }
01087 
01088     task_lock_count_++;
01089   }
01090 
01091   ACE_Asynch_Pseudo_Task & task =
01092     this->posix_proactor ()->get_asynch_pseudo_task ();
01093 
01094   int rc_task = task.remove_io_handler (this->get_handle ());
01095 
01096   {
01097     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01098 
01099     task_lock_count_--;
01100 
01101     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
01102       task.unlock_finish ();
01103 
01104     if (this->handle_ != ACE_INVALID_HANDLE)
01105       {
01106         ACE_OS::closesocket (this->handle_);
01107         this->handle_ = ACE_INVALID_HANDLE;
01108       }
01109 
01110     this->flg_open_ = 0;
01111   }
01112 
01113   return 0;
01114 }
01115 
01116 int
01117 ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
01118 {
01119   ACE_UNUSED_ARG (handle);
01120   ACE_UNUSED_ARG (close_mask);
01121 
01122   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n"));
01123 
01124   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01125 
01126   // handle_close is called only in one case :
01127   //  when Asynch_accept_task is closing ( i.e. proactor destructor )
01128 
01129   // In all other cases we deregister ourself
01130   // with ACE_Event_Handler::DONT_CALL mask
01131 
01132   this->cancel_uncompleted (0);
01133 
01134   this->flg_open_ = 0;
01135 
01136   // it means other thread is waiting for reactor token_
01137   if (task_lock_count_ > 0)
01138     {
01139       ACE_Asynch_Pseudo_Task & task =
01140          this->posix_proactor ()->get_asynch_pseudo_task ();
01141 
01142       task.lock_finish ();
01143     }
01144 
01145   return 0;
01146 }
01147 
01148 int
01149 ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
01150 {
01151   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input\n"));
01152 
01153   // An <accept> has been sensed on the <listen_handle>. We should be
01154   // able to just go ahead and do the <accept> now on this <fd>. This
01155   // should be the same as the <listen_handle>.
01156 
01157   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01158 
01159   ACE_POSIX_Asynch_Accept_Result* result = 0;
01160 
01161   // Deregister this info pertaining to this <accept> call.
01162   if (this->result_queue_.dequeue_head (result) != 0)
01163     ACE_ERROR ((LM_ERROR,
01164                 ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
01165                 ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
01166                 ACE_LIB_TEXT( " dequeueing failed")));
01167 
01168   // Disable the <handle> in the reactor if no <accept>'s are pending.
01169 
01170   // we allow the following sequence of locks :
01171   //    reactor::token , then our mutex lock_
01172   // to avoid deadlock prohibited reverse sequence
01173 
01174   if (this->result_queue_.size () == 0)
01175     {
01176       ACE_Asynch_Pseudo_Task & task =
01177         this->posix_proactor ()->get_asynch_pseudo_task ();
01178 
01179       task.suspend_io_handler (this->get_handle());
01180     }
01181 
01182   // Issue <accept> now.
01183   // @@ We shouldnt block here since we have already done poll/select
01184   // thru reactor. But are we sure?
01185 
01186   ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0);
01187 
01188   if (result == 0) // there is nobody to notify
01189     {
01190       ACE_OS::closesocket (new_handle);
01191       return 0;
01192     }
01193 
01194   if (new_handle == ACE_INVALID_HANDLE)
01195     {
01196       result->set_error(errno);
01197       ACE_ERROR ((LM_ERROR,
01198                   ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
01199                   ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
01200                   ACE_LIB_TEXT(" <accept> system call failed")));
01201 
01202       // Notify client as usual, "AIO" finished with errors
01203     }
01204 
01205   // Store the new handle.
01206   result->aio_fildes = new_handle;
01207 
01208   // Notify the main process about this completion
01209   // Send the Result through the notification pipe.
01210   if (this->posix_proactor ()->post_completion (result) == -1)
01211     ACE_ERROR ((LM_ERROR,
01212                 ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
01213                 ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
01214                 ACE_LIB_TEXT(" <post_completion> failed")));
01215 
01216   return 0;
01217 }
01218 
01219 // *********************************************************************
01220 
01221 ACE_HANDLE
01222 ACE_POSIX_Asynch_Connect_Result::connect_handle (void) const
01223 {
01224   return this->aio_fildes;
01225 }
01226 
01227 void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle)
01228 {
01229   this->aio_fildes = handle;
01230 }
01231 
01232 
01233 ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result
01234   (ACE_Handler &handler,
01235    ACE_HANDLE connect_handle,
01236    const void* act,
01237    ACE_HANDLE event,
01238    int priority,
01239    int signal_number)
01240 
01241   : ACE_Asynch_Result_Impl (),
01242     ACE_Asynch_Connect_Result_Impl (),
01243     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
01244 {
01245   this->aio_fildes = connect_handle;
01246   this->aio_nbytes = 0;
01247 }
01248 
01249 void
01250 ACE_POSIX_Asynch_Connect_Result::complete (size_t bytes_transferred,
01251                                            int success,
01252                                            const void *completion_key,
01253                                            u_long error)
01254 {
01255   // Copy the data.
01256   this->bytes_transferred_ = bytes_transferred;
01257   this->success_ = success;
01258   this->completion_key_ = completion_key;
01259   this->error_ = error;
01260 
01261   // Create the interface result class.
01262   ACE_Asynch_Connect::Result result (this);
01263 
01264   // Call the application handler.
01265   this->handler_.handle_connect (result);
01266 }
01267 
01268 ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result (void)
01269 {
01270 }
01271 
01272 // *********************************************************************
01273 
01274 ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor * posix_proactor)
01275   : ACE_Asynch_Operation_Impl (),
01276     ACE_Asynch_Connect_Impl (),
01277     ACE_POSIX_Asynch_Operation (posix_proactor),
01278     flg_open_ (0),
01279     task_lock_count_ (0)
01280 {
01281 }
01282 
01283 ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void)
01284 {
01285   this->close ();
01286   this->reactor(0); // to avoid purge_pending_notifications
01287 }
01288 
01289 ACE_HANDLE
01290 ACE_POSIX_Asynch_Connect::get_handle (void) const
01291 {
01292 
01293   ACE_ASSERT (0);
01294   return  ACE_INVALID_HANDLE;
01295 }
01296 
01297 void
01298 ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE)
01299 {
01300   ACE_ASSERT (0) ;
01301 }
01302 
01303 int
01304 ACE_POSIX_Asynch_Connect::open (ACE_Handler &handler,
01305                                 ACE_HANDLE handle,
01306                                 const void *completion_key,
01307                                 ACE_Proactor *proactor)
01308 {
01309   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::open\n"));
01310 
01311   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01312 
01313   // if we are already opened,
01314   // we could not create a new handler without closing the previous
01315 
01316   if (this->flg_open_ != 0)
01317     ACE_ERROR_RETURN ((LM_ERROR,
01318                        ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::open:")
01319                        ACE_LIB_TEXT("connector already open \n")),
01320                       -1);
01321 
01322   //int result =
01323   ACE_POSIX_Asynch_Operation::open (handler,
01324                                     handle,
01325                                     completion_key,
01326                                     proactor);
01327 
01328   // Ignore result as we pass ACE_INVALID_HANDLE
01329   //if (result == -1)
01330   //  return result;
01331 
01332   this->flg_open_ = 1;
01333 
01334   return 0;
01335 }
01336 
01337 int
01338 ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle,
01339                                    const ACE_Addr & remote_sap,
01340                                    const ACE_Addr & local_sap,
01341                                    int reuse_addr,
01342                                    const void *act,
01343                                    int priority,
01344                                    int signal_number)
01345 {
01346   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect\n"));
01347 
01348   {
01349     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01350 
01351     if (this->flg_open_ == 0)
01352       ACE_ERROR_RETURN ((LM_ERROR,
01353                          ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
01354                          ACE_LIB_TEXT("connector was not opened before\n")),
01355                         -1);
01356 
01357     // Common code for both WIN and POSIX.
01358     // Create future Asynch_Connect_Result
01359     ACE_POSIX_Asynch_Connect_Result *result = 0;
01360     ACE_NEW_RETURN (result,
01361                     ACE_POSIX_Asynch_Connect_Result (*this->handler_,
01362                                                     connect_handle,
01363                                                     act,
01364                                                     this->posix_proactor ()->get_handle (),
01365                                                     priority,
01366                                                     signal_number),
01367                   -1);
01368 
01369     int rc = connect_i (result,
01370                         remote_sap,
01371                         local_sap,
01372                         reuse_addr);
01373 
01374     // update handle
01375     connect_handle = result->connect_handle ();
01376 
01377     if (rc != 0)
01378       return post_result (result, 1);
01379 
01380     //  Enqueue result we will wait for completion
01381 
01382     if (this->result_map_.bind (connect_handle, result) == -1)
01383       {
01384         ACE_ERROR  ((LM_ERROR,
01385                      ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:")
01386                      ACE_LIB_TEXT("result map binding failed\n")));
01387 
01388         result->set_error (EFAULT);
01389         return post_result (result, 1);
01390       }
01391 
01392     task_lock_count_ ++;
01393   }
01394 
01395   ACE_Asynch_Pseudo_Task & task =
01396     this->posix_proactor ()->get_asynch_pseudo_task ();
01397 
01398   int rc_task = task.register_io_handler (connect_handle,
01399                                           this,
01400                                           ACE_Event_Handler::CONNECT_MASK,
01401                                           0);  // not to suspend after register
01402   {
01403     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01404 
01405     this->task_lock_count_ --;
01406 
01407     int post_enable = 1;
01408 
01409     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
01410       {
01411         post_enable = 0;
01412         task.unlock_finish ();
01413       }
01414 
01415     if (rc_task < 0)
01416       {
01417         ACE_POSIX_Asynch_Connect_Result *result = 0;
01418 
01419         this->result_map_.unbind (connect_handle, result);
01420 
01421         if (result != 0)
01422           {
01423             result->set_error (EFAULT);
01424 
01425             return post_result (result, post_enable);
01426           }
01427       }
01428   }
01429 
01430   return 0;
01431 }
01432 
01433 int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result,
01434                                            int post_enable)
01435 {
01436   if (this->flg_open_ != 0 && post_enable != 0)
01437     {
01438       if (this->posix_proactor ()->post_completion (result) == 0)
01439         return 0 ;
01440 
01441       ACE_ERROR ((LM_ERROR,
01442                   ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
01443                   ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::post_result: ")
01444                   ACE_LIB_TEXT(" <post_completion> failed")));
01445     }
01446 
01447   ACE_HANDLE handle = result->connect_handle ();
01448 
01449   if (handle != ACE_INVALID_HANDLE)
01450     ACE_OS::closesocket (handle);
01451 
01452    delete result;
01453 
01454    return -1;
01455 }
01456 
01457 //@@ New method connect_i
01458 //  return code :
01459 //   -1   errors  before  attempt to connect
01460 //    0   connect started
01461 //    1   connect finished ( may be unsuccessfully)
01462 
01463 int
01464 ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
01465                                      const ACE_Addr & remote_sap,
01466                                      const ACE_Addr & local_sap,
01467                                      int  reuse_addr)
01468 {
01469   result->set_bytes_transferred (0);
01470 
01471   ACE_HANDLE handle = result->connect_handle ();
01472 
01473   if (handle == ACE_INVALID_HANDLE)
01474     {
01475       int protocol_family = remote_sap.get_type ();
01476 
01477       handle = ACE_OS::socket (protocol_family,
01478                                SOCK_STREAM,
01479                                0);
01480       // save it
01481       result->connect_handle (handle);
01482 
01483       if (handle == ACE_INVALID_HANDLE)
01484         {
01485           result->set_error (errno);
01486 
01487           ACE_ERROR_RETURN ((LM_ERROR,
01488                        ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
01489                        ACE_LIB_TEXT(" ACE_OS::socket failed\n")),
01490                       -1);
01491         }
01492 
01493       // Reuse the address
01494       int one = 1;
01495       if (protocol_family != PF_UNIX  &&
01496            reuse_addr != 0 &&
01497            ACE_OS::setsockopt (handle,
01498                                SOL_SOCKET,
01499                                SO_REUSEADDR,
01500                                (const char*) &one,
01501                                sizeof one) == -1 )
01502         {
01503           result->set_error (errno);
01504 
01505           ACE_ERROR_RETURN ((LM_ERROR,
01506                        ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
01507                        ACE_LIB_TEXT(" ACE_OS::setsockopt failed\n")),
01508                       -1);
01509         }
01510     }
01511 
01512   if (local_sap != ACE_Addr::sap_any)
01513     {
01514       sockaddr * laddr = ACE_reinterpret_cast (sockaddr *,
01515                                                local_sap.get_addr ());
01516       size_t size = local_sap.get_size ();
01517 
01518       if (ACE_OS::bind (handle, laddr, size) == -1)
01519         {
01520            result->set_error (errno);
01521 
01522            ACE_ERROR_RETURN ((LM_ERROR,
01523                        ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
01524                        ACE_LIB_TEXT(" ACE_OS::bind failed\n")),
01525                       -1);
01526         }
01527     }
01528 
01529   // set non blocking mode
01530   if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
01531     {
01532       result->set_error (errno);
01533 
01534       ACE_ERROR_RETURN ((LM_ERROR,
01535                          ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
01536                          ACE_LIB_TEXT(" ACE::set_flags failed\n")),
01537                         -1);
01538     }
01539 
01540   for (;;)
01541     {
01542       int rc = ACE_OS::connect (handle,
01543                                 ACE_reinterpret_cast (sockaddr *,
01544                                                       remote_sap.get_addr ()),
01545                                 remote_sap.get_size ());
01546       if (rc < 0)  // failure
01547         {
01548           if (errno == EWOULDBLOCK || errno == EINPROGRESS)
01549             return 0; // connect started
01550 
01551           if (errno == EINTR)
01552              continue;
01553 
01554           result->set_error (errno);
01555         }
01556 
01557       return 1 ;  // connect finished
01558     }
01559 
01560   ACE_NOTREACHED (return 0);
01561 }
01562 
01563 
01564 //@@ New method cancel_uncompleted
01565 // It performs cancellation of all pending requests
01566 //
01567 // Parameter flg_notify can be
01568 //     0  - don't send notifications about canceled accepts
01569 //    !0  - notify user about canceled accepts
01570 //          according POSIX standards we should receive notifications
01571 //          on canceled AIO requests
01572 //
01573 //  Return value : number of cancelled requests
01574 //
01575 
01576 int
01577 ACE_POSIX_Asynch_Connect::cancel_uncompleted (int flg_notify,
01578                                               ACE_Handle_Set & set)
01579 {
01580   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel_uncompleted\n"));
01581 
01582   int retval = 0;
01583 
01584   MAP_ITERATOR  iter (result_map_);
01585   MAP_ENTRY *   me = 0;
01586 
01587   set.reset ();
01588 
01589   for (; iter.next (me) != 0;  retval++ , iter.advance ())
01590     {
01591        ACE_HANDLE handle = me->ext_id_;
01592        ACE_POSIX_Asynch_Connect_Result* result = me->int_id_ ;
01593 
01594        set.set_bit (handle);
01595 
01596        result->set_bytes_transferred (0);
01597        result->set_error (ECANCELED);
01598        this->post_result (result, flg_notify);
01599     }
01600 
01601   result_map_.unbind_all ();
01602 
01603   return retval;
01604 }
01605 
01606 int
01607 ACE_POSIX_Asynch_Connect::cancel (void)
01608 {
01609   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel\n"));
01610 
01611   //We are not really ACE_POSIX_Asynch_Operation
01612   //so we could not call ::aiocancel ()
01613   // or just write
01614   //return ACE_POSIX_Asynch_Operation::cancel ();
01615   //We delegate real cancelation to cancel_uncompleted (1)
01616 
01617   int rc = -1 ;  // ERRORS
01618 
01619   ACE_Handle_Set set;
01620 
01621   {
01622     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01623 
01624     int num_cancelled = cancel_uncompleted (flg_open_, set);
01625 
01626     if (num_cancelled == 0)
01627        rc = 1 ;        // AIO_ALLDONE
01628     else if (num_cancelled > 0)
01629        rc = 0 ;        // AIO_CANCELED
01630 
01631     if (this->flg_open_ == 0)
01632        return rc ;
01633 
01634     this->task_lock_count_++;
01635   }
01636 
01637   ACE_Asynch_Pseudo_Task & task =
01638     this->posix_proactor ()->get_asynch_pseudo_task ();
01639 
01640   int rc_task = task.remove_io_handler (set);
01641 
01642   {
01643     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01644 
01645     this->task_lock_count_--;
01646 
01647     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
01648       task.unlock_finish ();
01649   }
01650 
01651   return rc;
01652 }
01653 
01654 int
01655 ACE_POSIX_Asynch_Connect::close (void)
01656 {
01657   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::close\n"));
01658 
01659   ACE_Handle_Set set ;
01660 
01661   {
01662     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01663 
01664     int num_cancelled = cancel_uncompleted (flg_open_, set);
01665 
01666     if (num_cancelled == 0 || this->flg_open_ == 0)
01667       {
01668         this->flg_open_ = 0;
01669         return 0;
01670       }
01671 
01672     this->task_lock_count_++;
01673   }
01674 
01675   ACE_Asynch_Pseudo_Task & task =
01676     this->posix_proactor ()->get_asynch_pseudo_task ();
01677 
01678   int rc_task = task.remove_io_handler (set);
01679 
01680   {
01681     ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
01682 
01683     this->task_lock_count_--;
01684 
01685     if (rc_task == -2 && task_lock_count_ == 0)  // task is closing
01686       task.unlock_finish ();
01687 
01688     this->flg_open_ = 0;
01689   }
01690 
01691   return 0;
01692 }
01693 
01694 int
01695 ACE_POSIX_Asynch_Connect::handle_exception (ACE_HANDLE fd)
01696 {
01697   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_exception\n"));
01698   return handle_input (fd);
01699 }
01700 
01701 int
01702 ACE_POSIX_Asynch_Connect::handle_input (ACE_HANDLE fd)
01703 {
01704   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_input\n"));
01705 
01706   return handle_input (fd);
01707 }
01708 
01709 int
01710 ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd)
01711 {
01712   ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_output\n"));
01713 
01714   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01715 
01716   ACE_POSIX_Asynch_Connect_Result* result = 0;
01717 
01718   if (this->result_map_.unbind (fd, result) != 0) // not found
01719     return -1;
01720 
01721   int sockerror  = 0 ;
01722   int lsockerror = sizeof sockerror;
01723 
01724   ACE_OS::getsockopt (fd,
01725                       SOL_SOCKET,
01726                       SO_ERROR,
01727                       (char*) &sockerror,
01728                       &lsockerror);
01729 
01730   result->set_bytes_transferred (0);
01731   result->set_error (sockerror);
01732   this->post_result (result, this->flg_open_);
01733 
01734   return -1;
01735 
01736   //ACE_Asynch_Pseudo_Task & task =
01737   //       this->posix_proactor()->get_asynch_pseudo_task();
01738 
01739   //task.remove_io_handler ( fd );
01740 
01741   //return 0;
01742 }
01743 
01744 
01745 int
01746 ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
01747 {
01748   ACE_TRACE (ACE_LIB_TEXT ("ACE_POSIX_Asynch_Connect::handle_close\n"));
01749 
01750   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
01751 
01752   ACE_Asynch_Pseudo_Task &task =
01753          this->posix_proactor ()->get_asynch_pseudo_task ();
01754 
01755   if (task.is_active() == 0)  // task is closing
01756     {
01757       if (this->flg_open_ !=0)  // we are open
01758         {
01759           this->flg_open_ = 0;
01760 
01761           // it means other thread is waiting for reactor token_
01762           if (task_lock_count_ > 0)
01763             task.lock_finish ();
01764         }
01765 
01766       ACE_Handle_Set set;
01767       this->cancel_uncompleted (0, set);
01768 
01769       return 0;
01770     }
01771 
01772   // remove_io_handler() contains flag DONT_CALL
01773   // so it is save
01774   task.remove_io_handler (fd);
01775 
01776   ACE_POSIX_Asynch_Connect_Result* result = 0;
01777 
01778   if (this->result_map_.unbind (fd, result) != 0 ) // not found
01779     return -1;
01780 
01781   result->set_bytes_transferred (0);
01782   result->set_error (ECANCELED);
01783   this->post_result (result, this->flg_open_);
01784 
01785   return 0;
01786 }
01787 
01788 // *********************************************************************
01789 
01790 ACE_HANDLE
01791 ACE_POSIX_Asynch_Transmit_File_Result::socket (void) const
01792 {
01793   return this->socket_;
01794 }
01795 
01796 ACE_HANDLE
01797 ACE_POSIX_Asynch_Transmit_File_Result::file (void) const
01798 {
01799   return this->aio_fildes;
01800 }
01801 
01802 ACE_Asynch_Transmit_File::Header_And_Trailer *
01803 ACE_POSIX_Asynch_Transmit_File_Result::header_and_trailer (void) const
01804 {
01805   return this->header_and_trailer_;
01806 }
01807 
01808 size_t
01809 ACE_POSIX_Asynch_Transmit_File_Result::bytes_to_write (void) const
01810 {
01811   return this->aio_nbytes;
01812 }
01813 
01814 size_t
01815 ACE_POSIX_Asynch_Transmit_File_Result::bytes_per_send (void) const
01816 {
01817   return this->bytes_per_send_;
01818 }
01819 
01820 u_long
01821 ACE_POSIX_Asynch_Transmit_File_Result::flags (void) const
01822 {
01823   return this->flags_;
01824 }
01825 
01826 ACE_POSIX_Asynch_Transmit_File_Result::ACE_POSIX_Asynch_Transmit_File_Result
01827   (ACE_Handler &handler,
01828    ACE_HANDLE socket,
01829    ACE_HANDLE file,
01830    ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
01831    size_t bytes_to_write,
01832    u_long offset,
01833    u_long offset_high,
01834    size_t bytes_per_send,
01835    u_long flags,
01836    const void *act,
01837    ACE_HANDLE event,
01838    int priority,
01839    int signal_number)
01840 
01841   : ACE_Asynch_Result_Impl (),
01842     ACE_Asynch_Transmit_File_Result_Impl (),
01843     ACE_POSIX_Asynch_Result (handler, act, event, offset, offset_high, priority, signal_number),
01844     socket_ (socket),
01845     header_and_trailer_ (header_and_trailer),
01846     bytes_per_send_ (bytes_per_send),
01847     flags_ (flags)
01848 {
01849   this->aio_fildes = file;
01850   this->aio_nbytes = bytes_to_write;
01851 }
01852 
01853 void
01854 ACE_POSIX_Asynch_Transmit_File_Result::complete (size_t bytes_transferred,
01855                                                  int success,
01856                                                  const void *completion_key,
01857                                                  u_long error)
01858 {
01859   // Copy the data.
01860   this->bytes_transferred_ = bytes_transferred;
01861   this->success_ = success;
01862   this->completion_key_ = completion_key;
01863   this->error_ = error;
01864 
01865   // We will not do this because (a) the header and trailer blocks may
01866   // be the same message_blocks and (b) in cases of failures we have
01867   // no idea how much of what (header, data, trailer) was sent.
01868   /*
01869     if (this->success_ && this->header_and_trailer_ != 0)
01870     {
01871     ACE_Message_Block *header = this->header_and_trailer_->header ();
01872     if (header != 0)
01873     header->rd_ptr (this->header_and_trailer_->header_bytes ());
01874 
01875     ACE_Message_Block *trailer = this->header_and_trailer_->trailer ();
01876     if (trailer != 0)
01877     trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
01878     }
01879   */
01880 
01881   // Create the interface result class.
01882   ACE_Asynch_Transmit_File::Result result (this);
01883 
01884   // Call the application handler.
01885   this->handler_.handle_transmit_file (result);
01886 }
01887 
01888 ACE_POSIX_Asynch_Transmit_File_Result::~ACE_POSIX_Asynch_Transmit_File_Result (void)
01889 {
01890 }
01891 
01892 
01893 // *********************************************************************
01894 
01895 /**
01896  * @class ACE_POSIX_Asynch_Transmit_Handler
01897  *
01898  * @brief Auxillary handler for doing <Asynch_Transmit_File> in
01899  * Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
01900  *
01901  * This is a helper class for implementing
01902  * <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
01903  */
01904 class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
01905 {
01906 public:
01907   /// Constructor. Result pointer will have all the information to do
01908   /// the file transmission (socket, file, application handler, bytes
01909   /// to write).
01910   ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Proactor *posix_proactor,
01911                                      ACE_POSIX_Asynch_Transmit_File_Result *result);
01912 
01913   /// Destructor.
01914   virtual ~ACE_POSIX_Asynch_Transmit_Handler (void);
01915 
01916   /// Do the transmission. All the info to do the transmission is in
01917   /// the <result> member.
01918   int transmit (void);
01919 
01920 protected:
01921 
01922   /// The asynch result pointer made from the initial transmit file
01923   /// request.
01924   ACE_POSIX_Asynch_Transmit_File_Result *result_;
01925 
01926   /// Message bloack used to do the transmission.
01927   ACE_Message_Block *mb_;
01928 
01929   enum ACT
01930   {
01931     HEADER_ACT  = 1,
01932     DATA_ACT    = 2,
01933     TRAILER_ACT = 3
01934   };
01935 
01936   /// ACT to transmit header.
01937   ACT header_act_;
01938 
01939   /// ACT to transmit data.
01940   ACT data_act_;
01941 
01942   /// ACT to transmit trailer.
01943   ACT trailer_act_;
01944 
01945   /// Current offset of the file being transmitted.
01946   size_t file_offset_;
01947 
01948   /// Total size of the file.
01949   size_t file_size_;
01950 
01951   /// Number of bytes transferred on the stream.
01952   size_t bytes_transferred_;
01953 
01954   /// This is called when asynchronous writes from the socket complete.
01955   virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
01956 
01957   /// This is called when asynchronous reads from the file complete.
01958   virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
01959 
01960   /// Issue asynch read from  the file.
01961   int initiate_read_file (void);
01962 
01963   /// To read from the file to be transmitted.
01964   ACE_POSIX_Asynch_Read_File rf_;
01965 
01966   /// Write stream to write the header, trailer and the data.
01967   ACE_POSIX_Asynch_Write_Stream ws_;
01968 };
01969 
01970 // ************************************************************
01971 
01972 // Constructor.
01973 ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler
01974       (ACE_POSIX_Proactor *posix_proactor,
01975        ACE_POSIX_Asynch_Transmit_File_Result *result)
01976   : result_ (result),
01977     mb_ (0),
01978     header_act_ (this->HEADER_ACT),
01979     data_act_ (this->DATA_ACT),
01980     trailer_act_ (this->TRAILER_ACT),
01981     file_offset_ (result->offset ()),
01982     file_size_ (0),
01983     bytes_transferred_ (0),
01984     rf_ (posix_proactor),
01985     ws_ (posix_proactor)
01986 {
01987   // Allocate memory for the message block.
01988   ACE_NEW (this->mb_,
01989            ACE_Message_Block (this->result_->bytes_per_send ()
01990                               + 1));
01991   // Init the file size.
01992   file_size_ = ACE_OS::filesize (this->result_->file ());
01993 }
01994 
01995 // Destructor.
01996 ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler (void)
01997 {
01998   delete result_;
01999   mb_->release ();
02000 }
02001 
02002 
02003 // Do the transmission.
02004 // Initiate transmitting the header. When that completes
02005 // handle_write_stream will be called, there start transmitting the file.
02006 int
02007 ACE_POSIX_Asynch_Transmit_Handler::transmit (void)
02008 {
02009   // No proactor is given for the <open>'s. Because we are using the
02010   // concrete implementations of the  Asynch_Operations, and we have
02011   // already given them the specific proactor, so they wont need the
02012   // general <proactor> interface pointer.
02013 
02014   // Open Asynch_Read_File.
02015   if (this->rf_.open (*this,
02016                       this->result_->file (),
02017                       0,
02018                       0) == -1)
02019     ACE_ERROR_RETURN ((LM_ERROR,
02020                        "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
02021                       -1);
02022 
02023   // Open Asynch_Write_Stream.
02024   if (this->ws_.open (*this,
02025                       this->result_->socket (),
02026                       0,
02027                       0) == -1)
02028     ACE_ERROR_RETURN ((LM_ERROR,
02029                        "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
02030                       -1);
02031 
02032   // Transmit the header.
02033   if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
02034                        this->result_->header_and_trailer ()->header_bytes (),
02035                        ACE_reinterpret_cast (void *, &this->header_act_),
02036                        0) == -1)
02037     ACE_ERROR_RETURN ((LM_ERROR,
02038                        "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
02039                       -1);
02040   return 0;
02041 }
02042 
02043 void
02044 ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
02045 {
02046   // Update bytes transferred so far.
02047   this->bytes_transferred_ += result.bytes_transferred ();
02048 
02049   // Check the success parameter.
02050   if (result.success () == 0)
02051     {
02052       // Failure.
02053 
02054       ACE_ERROR ((LM_ERROR,
02055                   "Asynch_Transmit_File failed.\n"));
02056 
02057       ACE_SEH_TRY
02058         {
02059           this->result_->complete (this->bytes_transferred_,
02060                                    0,      // Failure.
02061                                    0,      // @@ Completion key.
02062                                    0);     // @@ Error no.
02063         }
02064       ACE_SEH_FINALLY
02065         {
02066           // This is crucial to prevent memory leaks. This deletes
02067           // the result pointer also.
02068           delete this;
02069         }
02070     }
02071 
02072   // Write stream successful.
02073 
02074   // Partial write to socket.
02075   int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
02076   if (unsent_data != 0)
02077     {
02078       ACE_DEBUG ((LM_DEBUG,
02079                   "%N:%l:Partial write to socket: Asynch_write called again\n"));
02080 
02081       // Duplicate the message block and retry remaining data
02082       if (this->ws_.write (*result.message_block ().duplicate (),
02083                            unsent_data,
02084                            result.act (),
02085                            this->result_->priority (),
02086                            this->result_->signal_number ()) == -1)
02087         {
02088           // @@ Handle this error.
02089           ACE_ERROR ((LM_ERROR,
02090                       "Asynch_Transmit_Handler:write_stream failed\n"));
02091           return;
02092         }
02093 
02094       // @@ Handling *partial write* to a socket.  Let us not continue
02095       // further before this write finishes. Because proceeding with
02096       // another read and then write might change the order of the
02097       // file transmission, because partial write to the stream is
02098       // always possible.
02099       return;
02100     }
02101 
02102   // Not a partial write. A full write.
02103 
02104   // Check ACT to see what was sent.
02105   ACT act = * (ACT *) result.act ();
02106 
02107   switch (act)
02108     {
02109     case TRAILER_ACT:
02110       // If it is the "trailer" that is just sent, then transmit file
02111       // is complete.
02112       // Call the application handler.
02113       ACE_SEH_TRY
02114         {
02115           this->result_->complete (this->bytes_transferred_,
02116                                    1,      // @@ Success.
02117                                    0,      // @@ Completion key.
02118                                    0);     // @@ Errno.
02119         }
02120       ACE_SEH_FINALLY
02121         {
02122           delete this;
02123         }
02124       break;
02125 
02126     case HEADER_ACT:
02127     case DATA_ACT:
02128       // If header/data was sent, initiate the file data transmission.
02129       if (this->initiate_read_file () == -1)
02130         // @@ Handle this error.
02131         ACE_ERROR ((LM_ERROR,
02132                     "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
02133       break;
02134 
02135     default:
02136       // @@ Handle this error.
02137       ACE_ERROR ((LM_ERROR,
02138                   "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
02139     }
02140 }
02141 
02142 void
02143 ACE_POSIX_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
02144 {
02145   // Failure.
02146   if (result.success () == 0)
02147     {
02148       //
02149       ACE_SEH_TRY
02150         {
02151           this->result_->complete (this->bytes_transferred_,
02152                                    0,      // Failure.
02153                                    0,      // @@ Completion key.
02154                                    errno); // Error no.
02155         }
02156       ACE_SEH_FINALLY
02157         {
02158           delete this;
02159         }
02160       return;
02161     }
02162 
02163   // Read successful.
02164   if (result.bytes_transferred () == 0)
02165     return;
02166 
02167   // Increment offset.
02168   this->file_offset_ += result.bytes_transferred ();
02169 
02170   // Write data to network.
02171   if (this->ws_.write (result.message_block (),
02172                        result.bytes_transferred (),
02173                        (void *)&this->data_act_,
02174                        this->result_->priority (),
02175                        this->result_->signal_number ()) == -1)
02176     {
02177       // @@ Handle this error.
02178       ACE_ERROR ((LM_ERROR,
02179                   "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
02180       return;
02181     }
02182 }
02183 
02184 int
02185 ACE_POSIX_Asynch_Transmit_Handler::initiate_read_file (void)
02186 {
02187   // Is there something to read.
02188   if (this->file_offset_ >= this->file_size_)
02189     {
02190       // File is sent. Send the trailer.
02191       if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
02192                            this->result_->header_and_trailer ()->trailer_bytes (),
02193                            (void *)&this->trailer_act_,
02194                            this->result_->priority (),
02195                            this->result_->signal_number ()) == -1)
02196         ACE_ERROR_RETURN ((LM_ERROR,
02197                            "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
02198                           -1);
02199       return 0;
02200     }
02201   else
02202     {
02203       // @@ Is this right??
02204       // Previous reads and writes are over. For the new read, adjust
02205       // the wr_ptr and the rd_ptr to the beginning.
02206       this->mb_->rd_ptr (this->mb_->base ());
02207       this->mb_->wr_ptr (this->mb_->base ());
02208 
02209       // Inititiate an asynchronous read from the file.
02210       if (this->rf_.read (*this->mb_,
02211                           this->mb_->size () - 1,
02212                           this->file_offset_,
02213                           0, // @@ offset_high !!! if aiocb64 is used.
02214                           0, // Act
02215                           this->result_->priority (),
02216                           this->result_->signal_number ()) == -1)
02217         ACE_ERROR_RETURN ((LM_ERROR,
02218                            "Error:Asynch_Transmit_Handler::read from file failed\n"),
02219                           -1);
02220       return 0;
02221     }
02222 }
02223 
02224 // *********************************************************************
02225 
02226 ACE_POSIX_Asynch_Transmit_File::ACE_POSIX_Asynch_Transmit_File (ACE_POSIX_Proactor *posix_proactor)
02227   : ACE_Asynch_Operation_Impl (),
02228     ACE_Asynch_Transmit_File_Impl (),
02229     ACE_POSIX_Asynch_Operation (posix_proactor)
02230 {
02231 }
02232 
02233 int
02234 ACE_POSIX_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
02235                                                ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
02236                                                size_t bytes_to_write,
02237                                                u_long offset,
02238                                                u_long offset_high,
02239                                                size_t bytes_per_send,
02240                                                u_long flags,
02241                                                const void *act,
02242                                                int priority,
02243                                                int signal_number)
02244 {
02245   // Adjust these parameters if there are default values specified.
02246   ssize_t file_size = ACE_OS::filesize (file);
02247 
02248   if (file_size == -1)
02249     ACE_ERROR_RETURN ((LM_ERROR,
02250                        "Error:%N:%l:%p\n",
02251                        "POSIX_Asynch_Transmit_File:filesize failed"),
02252                       -1);
02253 
02254   if (bytes_to_write == 0)
02255     bytes_to_write = file_size;
02256 
02257   if (offset > (size_t) file_size)
02258     ACE_ERROR_RETURN ((LM_ERROR,
02259                        "Error:%p\n",
02260                        "Asynch_Transmit_File:File size is less than offset"),
02261                       -1);
02262 
02263   if (offset != 0)
02264     bytes_to_write = file_size - offset + 1;
02265 
02266   if (bytes_per_send == 0)
02267     bytes_per_send = bytes_to_write;
02268 
02269   // Configure the result parameter.
02270   ACE_POSIX_Asynch_Transmit_File_Result *result = 0;
02271 
02272   ACE_NEW_RETURN (result,
02273                   ACE_POSIX_Asynch_Transmit_File_Result (*this->handler_,
02274                                                          this->handle_,
02275                                                          file,
02276                                                          header_and_trailer,
02277                                                          bytes_to_write,
02278                                                          offset,
02279                                                          offset_high,
02280                                                          bytes_per_send,
02281                                                          flags,
02282                                                          act,
02283                                                          this->posix_proactor ()->get_handle (),
02284                                                          priority,
02285                                                          signal_number),
02286                   -1);
02287 
02288   // Make the auxillary handler and initiate transmit.
02289   ACE_POSIX_Asynch_Transmit_Handler *transmit_handler = 0;
02290 
02291   ACE_NEW_RETURN (transmit_handler,
02292                   ::ACE_POSIX_Asynch_Transmit_Handler (this->posix_proactor (),
02293                                                              result),
02294                   -1);
02295 
02296   ssize_t return_val = transmit_handler->transmit ();
02297 
02298   if (return_val == -1)
02299     // This deletes the <result> in it too.
02300     delete transmit_handler;
02301 
02302   return 0;
02303 }
02304 
02305 ACE_POSIX_Asynch_Transmit_File::~ACE_POSIX_Asynch_Transmit_File (void)
02306 {
02307 }
02308 
02309 // *********************************************************************
02310 size_t
02311 ACE_POSIX_Asynch_Read_Dgram_Result::bytes_to_read (void) const
02312 {
02313   return this->bytes_to_read_;
02314 }
02315 
02316 int
02317 ACE_POSIX_Asynch_Read_Dgram_Result::remote_address (ACE_Addr& addr) const
02318 {
02319   int retVal = -1;  // failure
02320 
02321   // make sure the addresses are of the same type
02322   if (addr.get_type () == this->remote_address_->get_type ())
02323   { // copy the remote_address_ into addr
02324     addr.set_addr (this->remote_address_->get_addr (),
02325                    this->remote_address_->get_size ());
02326     retVal = 0; // success
02327   }
02328 
02329   return retVal;
02330 }
02331 
02332 sockaddr *
02333 ACE_POSIX_Asynch_Read_Dgram_Result::saddr () const
02334 {
02335   return (sockaddr *) this->remote_address_->get_addr ();
02336 }
02337 
02338 
02339 int
02340 ACE_POSIX_Asynch_Read_Dgram_Result::flags (void) const
02341 {
02342   return this->flags_;
02343 }
02344 
02345 ACE_HANDLE
02346 ACE_POSIX_Asynch_Read_Dgram_Result::handle (void) const
02347 {
02348   return this->handle_;
02349 }
02350 
02351 ACE_Message_Block*
02352 ACE_POSIX_Asynch_Read_Dgram_Result::message_block () const
02353 {
02354   return this->message_block_;
02355 }
02356 
02357 ACE_POSIX_Asynch_Read_Dgram_Result::ACE_POSIX_Asynch_Read_Dgram_Result
02358   (ACE_Handler &handler,
02359    ACE_HANDLE handle,
02360    ACE_Message_Block *message_block,
02361    size_t bytes_to_read,
02362    int flags,
02363    int protocol_family,
02364    const void* act,
02365    ACE_HANDLE event,
02366    int priority,
02367    int signal_number)
02368 
02369   : ACE_Asynch_Result_Impl (),
02370     ACE_Asynch_Read_Dgram_Result_Impl(),
02371     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
02372     bytes_to_read_ (bytes_to_read),
02373     message_block_ (message_block),
02374     remote_address_ (0),
02375     addr_len_ (0),
02376     flags_ (flags),
02377     handle_ (handle)
02378 {
02379   ACE_UNUSED_ARG (protocol_family);
02380   this->aio_fildes = handle;
02381   this->aio_nbytes = bytes_to_read;
02382   ACE_NEW (this->remote_address_, ACE_INET_Addr);
02383 }
02384 
02385 void
02386 ACE_POSIX_Asynch_Read_Dgram_Result::complete (size_t bytes_transferred,
02387                                               int success,
02388                                               const void *completion_key,
02389                                               u_long error)
02390 {
02391   // Copy the data which was returned by GetQueuedCompletionStatus
02392   this->bytes_transferred_ = bytes_transferred;
02393   this->success_ = success;
02394   this->completion_key_ = completion_key;
02395   this->error_ = error;
02396 
02397   // <errno> is available in the aiocb.
02398   ACE_UNUSED_ARG (error);
02399 
02400  this->remote_address_->set_size(this->addr_len_);
02401 
02402   // Create the interface result class.
02403   ACE_Asynch_Read_Dgram::Result result (this);
02404 
02405   // Call the application handler.
02406   this->handler_.handle_read_dgram (result);
02407 }
02408 
02409 ACE_POSIX_Asynch_Read_Dgram_Result::~ACE_POSIX_Asynch_Read_Dgram_Result (void)
02410 {
02411   delete this->remote_address_;
02412 }
02413 
02414 //***************************************************************************
02415 size_t
02416 ACE_POSIX_Asynch_Write_Dgram_Result::bytes_to_write (void) const
02417 {
02418   return this->bytes_to_write_;
02419 }
02420 
02421 int
02422 ACE_POSIX_Asynch_Write_Dgram_Result::flags (void) const
02423 {
02424   return this->flags_;
02425 }
02426 
02427 ACE_HANDLE
02428 ACE_POSIX_Asynch_Write_Dgram_Result::handle (void) const
02429 {
02430   return this->handle_;
02431 }
02432 
02433 
02434 ACE_Message_Block*
02435 ACE_POSIX_Asynch_Write_Dgram_Result::message_block () const
02436 {
02437   return this->message_block_;
02438 }
02439 
02440 ACE_POSIX_Asynch_Write_Dgram_Result::ACE_POSIX_Asynch_Write_Dgram_Result
02441   (ACE_Handler &handler,
02442    ACE_HANDLE handle,
02443    ACE_Message_Block *message_block,
02444    size_t bytes_to_write,
02445    int flags,
02446    const void* act,
02447    ACE_HANDLE event,
02448    int priority,
02449    int signal_number)
02450 
02451   : ACE_Asynch_Result_Impl (),
02452     ACE_Asynch_Write_Dgram_Result_Impl(),
02453     ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
02454     bytes_to_write_ (bytes_to_write),
02455     message_block_ (message_block),
02456     flags_ (flags),
02457     handle_ (handle)
02458 
02459 {
02460   this->aio_fildes = handle;
02461   this->aio_nbytes = bytes_to_write;
02462 }
02463 
02464 void
02465 ACE_POSIX_Asynch_Write_Dgram_Result::complete (size_t bytes_transferred,
02466                                                int success,
02467                                                const void *completion_key,
02468                                                u_long error)
02469 {
02470   // Copy the data which was returned by GetQueuedCompletionStatus
02471   this->bytes_transferred_ = bytes_transferred;
02472   this->success_ = success;
02473   this->completion_key_ = completion_key;
02474   this->error_ = error;
02475 
02476   // <errno> is available in the aiocb.
02477   ACE_UNUSED_ARG (error);
02478 
02479   // Appropriately move the pointers in the message block.
02480   //this->message_block_.wr_ptr (bytes_transferred);
02481 
02482   // Create the interface result class.
02483   ACE_Asynch_Write_Dgram::Result result (this);
02484 
02485   // Call the application handler.
02486   this->handler_.handle_write_dgram (result);
02487 }
02488 
02489 ACE_POSIX_Asynch_Write_Dgram_Result::~ACE_POSIX_Asynch_Write_Dgram_Result (void)
02490 {
02491 }
02492 
02493 /***************************************************************************/
02494 ACE_POSIX_Asynch_Read_Dgram::~ACE_POSIX_Asynch_Read_Dgram (void)
02495 {
02496 }
02497 
02498 ssize_t
02499 ACE_POSIX_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block,
02500                                    size_t &number_of_bytes_recvd,
02501                                    int flags,
02502                                    int protocol_family,
02503                                    const void *act,
02504                                    int priority,
02505                                    int signal_number)
02506 {
02507   ACE_UNUSED_ARG (message_block);
02508   ACE_UNUSED_ARG (number_of_bytes_recvd);
02509   ACE_UNUSED_ARG (flags);
02510   ACE_UNUSED_ARG (protocol_family);
02511   ACE_UNUSED_ARG (act);
02512   ACE_UNUSED_ARG (priority);
02513   ACE_UNUSED_ARG (signal_number);
02514   ACE_NOTSUP_RETURN (-1);
02515 }
02516 
02517 ACE_POSIX_Asynch_Read_Dgram::ACE_POSIX_Asynch_Read_Dgram (ACE_POSIX_Proactor *posix_proactor)
02518   : ACE_Asynch_Operation_Impl (),
02519     ACE_Asynch_Read_Dgram_Impl (),
02520     ACE_POSIX_Asynch_Operation (posix_proactor)
02521 {
02522 }
02523 
02524 //***************************************************************************
02525 
02526 ACE_POSIX_Asynch_Write_Dgram::~ACE_POSIX_Asynch_Write_Dgram (void)
02527 {
02528 }
02529 
02530 ssize_t
02531 ACE_POSIX_Asynch_Write_Dgram::send (ACE_Message_Block *message_block,
02532                                     size_t &number_of_bytes_sent,
02533                                     int flags,
02534                                     const ACE_Addr &addr,
02535                                     const void *act,
02536                                     int priority,
02537                                     int signal_number)
02538 {
02539   ACE_UNUSED_ARG (message_block);
02540   ACE_UNUSED_ARG (number_of_bytes_sent);
02541   ACE_UNUSED_ARG (flags);
02542   ACE_UNUSED_ARG (addr);
02543   ACE_UNUSED_ARG (act);
02544   ACE_UNUSED_ARG (priority);
02545   ACE_UNUSED_ARG (signal_number);
02546   ACE_NOTSUP_RETURN (-1);
02547 }
02548 
02549 ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram
02550   (ACE_POSIX_Proactor *posix_proactor)
02551   : ACE_Asynch_Operation_Impl (),
02552     ACE_Asynch_Write_Dgram_Impl (),
02553     ACE_POSIX_Asynch_Operation (posix_proactor)
02554 {
02555 }
02556 
02557 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
02558 
02559 template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>;
02560 template class ACE_Node<ACE_POSIX_Asynch_Accept_Result *>;
02561 template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *>;
02562 
02563 template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>;
02564 template class ACE_Node<ACE_POSIX_Asynch_Result *>;
02565 template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>;
02566 
02567 template class ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>;
02568 template class ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02569 template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02570 template class ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02571 template class ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02572 template class ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02573 template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
02574 
02575 #elif  defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
02576 
02577 #pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>
02578 #pragma instantiate ACE_Node<ACE_POSIX_Asynch_Accept_Result *>
02579 #pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *>
02580 
02581 #pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>
02582 #pragma instantiate ACE_Node<ACE_POSIX_Asynch_Result *>
02583 #pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>
02584 
02585 #pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>
02586 #pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02587 #pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02588 #pragma instantiate ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02589 #pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02590 #pragma instantiate ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02591 #pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
02592 
02593 
02594 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
02595 
02596 
02597 #endif /* ACE_HAS_AIO_CALLS */

Generated on Mon Jun 16 11:20:50 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002