Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ACE_POSIX_AIOCB_Proactor Class Reference

This Proactor makes use of Asynchronous I/O Control Blocks (AIOCB) to notify/get the completion status of the <aio_> operations issued. More...

#include <POSIX_Proactor.h>

Inheritance diagram for ACE_POSIX_AIOCB_Proactor:

Inheritance graph
[legend]
Collaboration diagram for ACE_POSIX_AIOCB_Proactor:

Collaboration graph
[legend]
List of all members.

Public Methods

 ACE_POSIX_AIOCB_Proactor (size_t nmaxop=ACE_AIO_DEFAULT_SIZE)
 Constructor defines max number asynchronous operations which can be started at the same time. More...

virtual Proactor_Type get_impl_type (void)
virtual ~ACE_POSIX_AIOCB_Proactor (void)
 Destructor. More...

virtual int close (void)
 Close down the Proactor. More...

virtual int handle_events (ACE_Time_Value &wait_time)
virtual int handle_events (void)
virtual int post_completion (ACE_POSIX_Asynch_Result *result)
 Post a result to the completion port of the Proactor. More...

virtual int start_aio (ACE_POSIX_Asynch_Result *result, ACE_POSIX_Proactor::Opcode op)
virtual int cancel_aio (ACE_HANDLE h)

Protected Methods

 ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype)
 Special constructor for ACE_SUN_Proactor and ACE_POSIX_SIG_Proactor. More...

virtual int get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, size_t &transfer_count)
 Check AIO for completion, error and result status Return: 1 - AIO completed , 0 - not completed yet. More...

int create_result_aiocb_list (void)
 Create aiocb list. More...

int delete_result_aiocb_list (void)
 Call this method from derived class when virtual table is built. More...

void create_notify_manager (void)
 Call these methods from derived class when virtual table is built. More...

void delete_notify_manager (void)
void check_max_aio_num (void)
 Define the maximum number of asynchronous I/O requests for the current OS. More...

void set_notify_handle (ACE_HANDLE h)
 To identify requests from Notify_Pipe_Manager. More...

int handle_events_i (u_long milli_seconds)
int start_deferred_aio (void)
 Start deferred AIO if necessary. More...

virtual int cancel_aiocb (ACE_POSIX_Asynch_Result *result)
 Cancel running or deferred AIO. More...

ACE_POSIX_Asynch_Resultfind_completed_aio (int &error_status, size_t &transfer_count, size_t &index, size_t &count)
 Extract the results of aio. More...

virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
 Find free slot to store result and aiocb pointer. More...

virtual int start_aio_i (ACE_POSIX_Asynch_Result *result)
 Initiate an aio operation. More...

virtual int notify_completion (int sig_num)
 Notify queue of "post_completed" ACE_POSIX_Asynch_Results called from post_completion method. More...

int putq_result (ACE_POSIX_Asynch_Result *result)
 Put "post_completed" result into the internal queue. More...

ACE_POSIX_Asynch_Resultgetq_result (void)
 Get "post_completed" result from the internal queue. More...

int clear_result_queue (void)
 Clear the internal results queue. More...

int process_result_queue (void)
 Process the internal results queue. More...


Protected Attributes

ACE_AIOCB_Notify_Pipe_Manageraiocb_notify_pipe_manager_
 This class takes care of doing <accept> when we use AIO_CONTROL_BLOCKS strategy. More...

aiocb ** aiocb_list_
 Use a dynamically allocated array to keep track of all the aio's issued currently. More...

ACE_POSIX_Asynch_Result ** result_list_
size_t aiocb_list_max_size_
 To maintain the maximum size of the array (list). More...

size_t aiocb_list_cur_size_
 To maintain the current size of the array (list). More...

ACE_SYNCH_MUTEX mutex_
 Mutex to protect work with lists. More...

ACE_HANDLE notify_pipe_read_handle_
 The purpose of this member is only to identify asynchronous request from NotifyManager. We will reserve for it always slot 0 in the list of aiocb's to be sure that don't lose notifications. More...

size_t num_deferred_aiocb_
 Number of ACE_POSIX_Asynch_Result's waiting for start i.e. deferred AIOs. More...

size_t num_started_aio_
 Number active,i.e. running requests. More...

ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * > result_queue_
 Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's. More...


Friends

class ACE_AIOCB_Notify_Pipe_Manager
 Handler needs to call application specific code. More...

class ACE_POSIX_Asynch_Operation
 This class does the registering of Asynch Operations with the Proactor which is necessary in the AIOCB strategy. More...

class ACE_POSIX_Asynch_Accept
class ACE_POSIX_Asynch_Connect

Detailed Description

This Proactor makes use of Asynchronous I/O Control Blocks (AIOCB) to notify/get the completion status of the <aio_> operations issued.

Definition at line 326 of file POSIX_Proactor.h.


Constructor & Destructor Documentation

ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor size_t    nmaxop = ACE_AIO_DEFAULT_SIZE
 

Constructor defines max number asynchronous operations which can be started at the same time.

Definition at line 752 of file POSIX_Proactor.cpp.

References check_max_aio_num, create_notify_manager, create_result_aiocb_list, ACE_POSIX_Proactor::get_asynch_pseudo_task, and ACE_Asynch_Pseudo_Task::start.

00753   : aiocb_notify_pipe_manager_ (0),
00754     aiocb_list_ (0),
00755     result_list_ (0),
00756     aiocb_list_max_size_ (max_aio_operations),
00757     aiocb_list_cur_size_ (0),
00758     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00759     num_deferred_aiocb_ (0),
00760     num_started_aio_ (0)
00761 {
00762   // Check for correct value for max_aio_operations
00763   check_max_aio_num ();
00764 
00765   this->create_result_aiocb_list ();
00766 
00767   this->create_notify_manager ();
00768 
00769   // start pseudo-asynchronous accept task
00770   // one per all future acceptors
00771   this->get_asynch_pseudo_task().start ();
00772 
00773 }

ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor void    [virtual]
 

Destructor.

Definition at line 797 of file POSIX_Proactor.cpp.

References close.

00798 {
00799   this->close();
00800 }

ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor size_t    nmaxop,
ACE_POSIX_Proactor::Proactor_Type    ptype
[protected]
 

Special constructor for ACE_SUN_Proactor and ACE_POSIX_SIG_Proactor.

Definition at line 776 of file POSIX_Proactor.cpp.

References check_max_aio_num, create_result_aiocb_list, and ACE_POSIX_Proactor::Proactor_Type.

00778   : aiocb_notify_pipe_manager_ (0),
00779     aiocb_list_ (0),
00780     result_list_ (0),
00781     aiocb_list_max_size_ (max_aio_operations),
00782     aiocb_list_cur_size_ (0),
00783     notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
00784     num_deferred_aiocb_ (0),
00785     num_started_aio_ (0)
00786 {
00787   //check for correct value for max_aio_operations
00788   this->check_max_aio_num ();
00789 
00790   this->create_result_aiocb_list ();
00791 
00792   // @@ We should create Notify_Pipe_Manager in the derived class to
00793   // provide correct calls for virtual functions !!!
00794 }


Member Function Documentation

ssize_t ACE_POSIX_AIOCB_Proactor::allocate_aio_slot ACE_POSIX_Asynch_Result   result [protected, virtual]
 

Find free slot to store result and aiocb pointer.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1340 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, aiocb_list_max_size_, LM_ERROR, notify_pipe_read_handle_, result_list_, and ssize_t.

Referenced by ACE_POSIX_CB_Proactor::allocate_aio_slot, and start_aio.

01341 {
01342   size_t i = 0;
01343 
01344   // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
01345   // so make check for ACE_AIOCB_Notify_Pipe_Manager request
01346 
01347   if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ?
01348     {                                       // should be free,
01349       if (result_list_[i] != 0)           // only 1 request
01350         {                                   // is allowed
01351           errno   = EAGAIN;
01352           ACE_ERROR_RETURN ((LM_ERROR,
01353                      "%N:%l:(%P | %t)::\n"
01354                      "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01355                      "internal Proactor error 0\n"),
01356                      -1);
01357         }
01358     }
01359   else  //try to find free slot as usual, but starting from 1
01360     {
01361       for (i= 1; i < this->aiocb_list_max_size_; i++)
01362         if (result_list_[i] == 0)
01363           break;
01364     }
01365 
01366   if (i >= this->aiocb_list_max_size_)
01367     ACE_ERROR_RETURN ((LM_ERROR,
01368               "%N:%l:(%P | %t)::\n"
01369               "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
01370               "internal Proactor error 1\n"),
01371               -1);
01372 
01373   //setup OS notification methods for this aio
01374   result->aio_sigevent.sigev_notify = SIGEV_NONE;
01375 
01376   return ACE_static_cast (ssize_t, i);
01377 }

int ACE_POSIX_AIOCB_Proactor::cancel_aio ACE_HANDLE    h [virtual]
 

This method should be called from ACE_POSIX_Asynch_Operation::cancel() instead of usual aio_cancel. For all deferred AIO requests with handle "h" it removes its from the lists and notifies user. For all running AIO requests with handle "h" it calls aio_cancel. According to the POSIX standards we will receive ECANCELED for all aio_canceled AIO requests later on return from aio_suspend

Implements ACE_POSIX_Proactor.

Definition at line 1490 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_MT, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, cancel_aiocb, num_deferred_aiocb_, putq_result, result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred, and ACE_POSIX_Asynch_Result::set_error.

01491 {
01492   // This new method should be called from
01493   // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
01494   // It scans the result_list_ and defines all AIO requests
01495   // that were issued for handle "handle"
01496   //
01497   // For all deferred AIO requests with handle "handle"
01498   // it removes its from the lists and notifies user
01499   //
01500   // For all running AIO requests with handle "handle"
01501   // it calls ::aio_cancel. According to the POSIX standards
01502   // we will receive ECANCELED  for all ::aio_canceled AIO requests
01503   // later on return from ::aio_suspend
01504 
01505   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
01506 
01507   int    num_total     = 0;
01508   int    num_cancelled = 0;
01509 
01510   {
01511     ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01512 
01513     size_t ai = 0;
01514 
01515     for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
01516       {
01517         if (this->result_list_[ai] == 0)    // Skip empty slot
01518           continue;
01519 
01520         if (this->result_list_[ai]->aio_fildes != handle)  // Not ours
01521           continue;
01522 
01523         num_total++;
01524 
01525         ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
01526 
01527         if (this->aiocb_list_[ai] == 0)  // Canceling a deferred operation
01528           {
01529             num_cancelled++;
01530             this->num_deferred_aiocb_--;
01531 
01532             this->aiocb_list_[ai] = 0;
01533             this->result_list_[ai] = 0;
01534             this->aiocb_list_cur_size_--;
01535 
01536             asynch_result->set_error (ECANCELED);
01537             asynch_result->set_bytes_transferred (0);
01538             this->putq_result (asynch_result);
01539             // we are with locked mutex_ here !
01540           }
01541         else      // Cancel started aio
01542           {
01543             int rc_cancel = this->cancel_aiocb (asynch_result);
01544 
01545             if (rc_cancel == 0)    //notification in the future
01546               num_cancelled++;     //it is OS responsiblity
01547           }
01548       }
01549 
01550   } // release mutex_
01551 
01552   if (num_total == 0)
01553     return 1;  // ALLDONE
01554 
01555   if (num_cancelled == num_total)
01556     return 0;  // CANCELLED
01557 
01558   return 2; // NOT CANCELLED
01559 }

int ACE_POSIX_AIOCB_Proactor::cancel_aiocb ACE_POSIX_Asynch_Result   result [protected, virtual]
 

Cancel running or deferred AIO.

Definition at line 1562 of file POSIX_Proactor.cpp.

Referenced by cancel_aio, and delete_result_aiocb_list.

01563 {
01564   // This method is called from cancel_aio
01565   // to cancel a previously submitted AIO request
01566   int rc = ::aio_cancel (0, result);
01567 
01568   // Check the return value and return 0/1/2 appropriately.
01569   if (rc == AIO_CANCELED)
01570     return 0;
01571   else if (rc == AIO_ALLDONE)
01572     return 1;
01573   else // (rc == AIO_NOTCANCELED)
01574     return 2;
01575 }

void ACE_POSIX_AIOCB_Proactor::check_max_aio_num void    [protected]
 

Define the maximum number of asynchronous I/O requests for the current OS.

Definition at line 919 of file POSIX_Proactor.cpp.

References _SC_AIO_MAX, ACE_AIO_MAX_SIZE, ACE_DEBUG, ACE_LIB_TEXT, aiocb_list_max_size_, LM_DEBUG, ACE::max_handles, ACE::set_handle_limit, and ACE_OS::sysconf.

Referenced by ACE_POSIX_AIOCB_Proactor.

00920 {
00921   long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
00922 
00923   // Define max limit AIO's for concrete OS
00924   // -1 means that there is no limit, but it is not true
00925   // (example, SunOS 5.6)
00926 
00927   if (max_os_aio_num > 0 &&
00928       aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
00929      aiocb_list_max_size_ = max_os_aio_num;
00930 
00931 #if defined (HPUX)
00932   // Although HPUX 11.00 allows to start 2048 AIO's
00933   // for all process in system
00934   // it has a limit 256 max elements for aio_suspend ()
00935   // It is a pity, but ...
00936 
00937   long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
00938   if (max_os_listio_num > 0
00939       && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
00940     aiocb_list_max_size_ = max_os_listio_num;
00941 #endif /* HPUX */
00942 
00943   // check for user-defined value
00944   // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
00945 
00946   if (aiocb_list_max_size_ <= 0
00947      || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
00948      aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
00949 
00950   // check for max number files to open
00951 
00952   int max_num_files = ACE::max_handles ();
00953 
00954   if (max_num_files > 0
00955       && aiocb_list_max_size_ > (unsigned long) max_num_files)
00956     {
00957       ACE::set_handle_limit (aiocb_list_max_size_);
00958 
00959       max_num_files = ACE::max_handles ();
00960     }
00961 
00962   if (max_num_files > 0
00963       && aiocb_list_max_size_ > (unsigned long) max_num_files)
00964     aiocb_list_max_size_ = (unsigned long) max_num_files;
00965 
00966   ACE_DEBUG ((LM_DEBUG,
00967              "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
00968               aiocb_list_max_size_));
00969 
00970 #if defined(__sgi)
00971 
00972    ACE_DEBUG((LM_DEBUG,
00973               ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
00974 
00975 //typedef struct aioinit {
00976 //    int aio_threads;  /* The number of aio threads to start (5) */
00977 //    int aio_locks;    /* Initial number of preallocated locks (3) */
00978 //    int aio_num;      /* estimated total simultanious aiobc structs (1000) */
00979 //    int aio_usedba;   /* Try to use DBA for raw I/O in lio_listio (0) */
00980 //    int aio_debug;    /* turn on debugging (0) */
00981 //    int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
00982 //    int aio_reserved[3];
00983 //} aioinit_t;
00984 
00985     aioinit_t  aioinit;
00986 
00987     aioinit.aio_threads = 10; /* The number of aio threads to start (5) */
00988     aioinit.aio_locks = 20;   /* Initial number of preallocated locks (3) */
00989                        /* estimated total simultaneous aiobc structs (1000) */
00990     aioinit.aio_num = aiocb_list_max_size_;
00991     aioinit.aio_usedba = 0;   /* Try to use DBA for raw IO in lio_listio (0) */
00992     aioinit.aio_debug = 0;    /* turn on debugging (0) */
00993     aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */
00994     aioinit.aio_reserved[0] = 0;
00995     aioinit.aio_reserved[1] = 0;
00996     aioinit.aio_reserved[2] = 0;
00997 
00998     aio_sgi_init (&aioinit);
00999 
01000 #endif
01001 
01002     return;
01003 }

int ACE_POSIX_AIOCB_Proactor::clear_result_queue void    [protected]
 

Clear the internal results queue.

Definition at line 1100 of file POSIX_Proactor.cpp.

References getq_result.

Referenced by close.

01101 {
01102   int ret_val = 0;
01103   ACE_POSIX_Asynch_Result* result = 0;
01104 
01105   while ((result = this->getq_result ()) != 0)
01106     {
01107       delete result;
01108       ret_val++;
01109     }
01110 
01111   return ret_val;
01112 }

int ACE_POSIX_AIOCB_Proactor::close void    [virtual]
 

Close down the Proactor.

Reimplemented from ACE_POSIX_Proactor.

Definition at line 803 of file POSIX_Proactor.cpp.

References clear_result_queue, delete_notify_manager, delete_result_aiocb_list, ACE_POSIX_Proactor::get_asynch_pseudo_task, and ACE_Asynch_Pseudo_Task::stop.

Referenced by ~ACE_POSIX_AIOCB_Proactor, and ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor.

00804 {
00805   // stop asynch accept task
00806   this->get_asynch_pseudo_task().stop ();
00807 
00808   this->delete_notify_manager ();
00809 
00810   this->clear_result_queue ();
00811 
00812   return this->delete_result_aiocb_list ();
00813 }

void ACE_POSIX_AIOCB_Proactor::create_notify_manager void    [protected]
 

Call these methods from derived class when virtual table is built.

Definition at line 1006 of file POSIX_Proactor.cpp.

References ACE_NEW, and aiocb_notify_pipe_manager_.

Referenced by ACE_POSIX_AIOCB_Proactor.

01007 {
01008   // Remember! this issues a Asynch_Read
01009   // on the notify pipe for doing the Asynch_Accept/Connect.
01010 
01011   if (aiocb_notify_pipe_manager_ == 0)
01012     ACE_NEW (aiocb_notify_pipe_manager_,
01013              ACE_AIOCB_Notify_Pipe_Manager (this));
01014 }

int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list void    [protected]
 

Create aiocb list.

Definition at line 820 of file POSIX_Proactor.cpp.

References ACE_NEW_RETURN, aiocb_list_, aiocb_list_max_size_, and result_list_.

Referenced by ACE_POSIX_AIOCB_Proactor.

00821 {
00822   if (aiocb_list_ != 0)
00823     return 0;
00824 
00825   ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
00826 
00827   ACE_NEW_RETURN (result_list_,
00828                   ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
00829                   -1);
00830 
00831   // Initialize the array.
00832   for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
00833     {
00834       aiocb_list_[ai] = 0;
00835       result_list_[ai] = 0;
00836     }
00837 
00838   return 0;
00839 }

void ACE_POSIX_AIOCB_Proactor::delete_notify_manager void    [protected]
 

Definition at line 1017 of file POSIX_Proactor.cpp.

References aiocb_notify_pipe_manager_.

Referenced by close.

01018 {
01019   // We are responsible for delete as all pointers set to 0 after
01020   // delete, it is save to delete twice
01021 
01022   delete aiocb_notify_pipe_manager_;
01023   aiocb_notify_pipe_manager_ = 0;
01024 }

int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list void    [protected]
 

Call this method from derived class when virtual table is built.

Definition at line 841 of file POSIX_Proactor.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_LIB_TEXT, aiocb_list_, aiocb_list_max_size_, cancel_aiocb, get_result_status, LM_DEBUG, LM_ERROR, result_list_, and ACE_OS_String::strerror.

Referenced by close.

00842 {
00843   if (aiocb_list_ == 0)  // already deleted
00844     return 0;
00845 
00846   size_t ai;
00847 
00848   // Try to cancel all uncomlpeted operarion POSIX systems may have
00849   // hidden system threads that still can work with our aiocb's!
00850   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00851     if (this->aiocb_list_[ai] != 0)  // active operation
00852       this->cancel_aiocb (result_list_[ai]);
00853 
00854   int num_pending = 0;
00855 
00856   for (ai = 0; ai < aiocb_list_max_size_; ai++)
00857     {
00858       if (this->aiocb_list_[ai] == 0 ) //  not active operation
00859         continue;
00860 
00861       // Get the error and return status of the aio_ operation.
00862       int error_status  = 0;
00863       size_t transfer_count = 0;
00864       int flg_completed = this->get_result_status (result_list_[ai],
00865                                                    error_status,
00866                                                    transfer_count);
00867 
00868       //don't delete uncompleted AIOCB's
00869       if (flg_completed == 0)  // not completed !!!
00870         {
00871           num_pending++;
00872 #if 0
00873           char * errtxt = ACE_OS::strerror (error_status);
00874           if (errtxt == 0)
00875               errtxt ="?????????";
00876 
00877           char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
00878             "WRITE":"READ" ;
00879 
00880 
00881           ACE_ERROR ((LM_ERROR,
00882                       ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
00883                       ai,
00884                       op,
00885                       error_status,
00886                       transfer_count,
00887                       errtxt));
00888 #endif /* 0 */
00889         }
00890       else // completed , OK
00891         {
00892           delete this->result_list_[ai];
00893           this->result_list_[ai] = 0;
00894           this->aiocb_list_[ai] = 0;
00895         }
00896     }
00897 
00898   // If it is not possible cancel some operation (num_pending > 0 ),
00899   // we can do only one thing -report about this
00900   // and complain about POSIX implementation.
00901   // We know that we have memory leaks, but it is better than
00902   // segmentation fault!
00903   ACE_DEBUG
00904     ((LM_DEBUG,
00905       ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
00906       ACE_LIB_TEXT(" number pending AIO=%d\n"),
00907       num_pending));
00908 
00909   delete [] this->aiocb_list_;
00910   this->aiocb_list_ = 0;
00911 
00912   delete [] this->result_list_;
00913   this->result_list_ = 0;
00914 
00915   return (num_pending == 0 ? 0 : -1);
00916   // ?? or just always return 0;
00917 }

ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio int &    error_status,
size_t &    transfer_count,
size_t &    index,
size_t &    count
[protected]
 

Extract the results of aio.

Definition at line 1220 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_MT, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, get_result_status, num_started_aio_, result_list_, and start_deferred_aio.

Referenced by handle_events_i.

01224 {
01225   // parameter index defines initial slot to scan
01226   // parameter count tells us how many slots should we scan
01227 
01228   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
01229 
01230   ACE_POSIX_Asynch_Result *asynch_result = 0;
01231 
01232   if (num_started_aio_ == 0)  // save time
01233     return 0;
01234 
01235   for (; count > 0; index++ , count--)
01236     {
01237       if (index >= aiocb_list_max_size_) // like a wheel
01238         index = 0;
01239 
01240       if (aiocb_list_[index] == 0) // Dont process null blocks.
01241         continue;
01242 
01243       if (0 != this->get_result_status (result_list_[index],
01244                                         error_status,
01245                                         transfer_count))  // completed
01246         break;
01247 
01248     } // end for
01249 
01250   if (count == 0) // all processed , nothing found
01251     return 0;
01252   asynch_result = result_list_[index];
01253 
01254   aiocb_list_[index] = 0;
01255   result_list_[index] = 0;
01256   aiocb_list_cur_size_--;
01257 
01258   num_started_aio_--;  // decrement count active aios
01259   index++;             // for next iteration
01260   count--;             // for next iteration
01261 
01262   this->start_deferred_aio ();
01263   //make attempt to start deferred AIO
01264   //It is safe as we are protected by mutex_
01265 
01266   return asynch_result;
01267 }

ACE_INLINE ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_AIOCB_Proactor::get_impl_type void    [virtual]
 

Reimplemented from ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 11 of file POSIX_Proactor.i.

References ACE_POSIX_Proactor::PROACTOR_AIOCB, and ACE_POSIX_Proactor::Proactor_Type.

00012 {
00013   return PROACTOR_AIOCB;
00014 } 

int ACE_POSIX_AIOCB_Proactor::get_result_status ACE_POSIX_Asynch_Result   asynch_result,
int &    error_status,
size_t &    transfer_count
[protected, virtual]
 

Check AIO for completion, error and result status Return: 1 - AIO completed , 0 - not completed yet.

Definition at line 1201 of file POSIX_Proactor.cpp.

References EINPROGRESS, and ssize_t.

Referenced by delete_result_aiocb_list, and find_completed_aio.

01204 {
01205   transfer_count = 0;
01206 
01207   // Get the error status of the aio_ operation.
01208   error_status  = aio_error (asynch_result);
01209   if (error_status == EINPROGRESS)
01210     return 0;  // not completed
01211 
01212   ssize_t op_return = aio_return (asynch_result);
01213   if (op_return > 0)
01214     transfer_count = ACE_static_cast (size_t, op_return);
01215   // else transfer_count is already 0, error_status reports the error.
01216   return 1; // completed
01217 }

ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result void    [protected]
 

Get "post_completed" result from the internal queue.

Definition at line 1080 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_MT, ACE_SYNCH_MUTEX, ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::dequeue_head, and result_queue_.

Referenced by clear_result_queue, and process_result_queue.

01081 {
01082   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
01083 
01084 
01085   ACE_POSIX_Asynch_Result* result = 0;
01086 
01087   if (this->result_queue_.dequeue_head (result) != 0)
01088     return 0;
01089 
01090 //  don;t waste time if queue is empty - it is normal
01091 //  or check queue size before dequeue_head
01092 //    ACE_ERROR_RETURN ((LM_ERROR,
01093 //                       "%N:%l:(%P | %t):%p\n",
01094 //                       "ACE_POSIX_AIOCB_Proactor::getq_result failed"),
01095 //                      0);
01096 
01097   return result;
01098 }

int ACE_POSIX_AIOCB_Proactor::handle_events void    [virtual]
 

Block indefinitely until at least one event is dispatched. Dispatch a single set of events. If <wait_time> elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly.

Implements ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1035 of file POSIX_Proactor.cpp.

References ACE_INFINITE, and handle_events_i.

01036 {
01037   return this->handle_events_i (ACE_INFINITE);
01038 }

int ACE_POSIX_AIOCB_Proactor::handle_events ACE_Time_Value   wait_time [virtual]
 

Dispatch a single set of events. If <wait_time> elapses before any events occur, return 0. Return 1 on success i.e., when a completion is dispatched, non-zero (-1) on errors and errno is set accordingly.

Implements ACE_POSIX_Proactor.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1027 of file POSIX_Proactor.cpp.

References handle_events_i, and ACE_Time_Value::msec.

01028 {
01029   // Decrement <wait_time> with the amount of time spent in the method
01030   ACE_Countdown_Time countdown (&wait_time);
01031   return this->handle_events_i (wait_time.msec ());
01032 }

int ACE_POSIX_AIOCB_Proactor::handle_events_i u_long    milli_seconds [protected]
 

Dispatch a single set of events. If <milli_seconds> elapses before any events occur, return 0. Return 1 if a completion dispatched. Return -1 on errors.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1134 of file POSIX_Proactor.cpp.

References ACE_ERROR, ACE_INFINITE, aiocb_list_, aiocb_list_max_size_, ACE_POSIX_Proactor::application_specific_code, find_completed_aio, LM_ERROR, process_result_queue, timespec::tv_nsec, and timespec::tv_sec.

Referenced by handle_events.

01135 {
01136   int result_suspend = 0;
01137   int retval= 0;
01138 
01139   if (milli_seconds == ACE_INFINITE)
01140     // Indefinite blocking.
01141     result_suspend = aio_suspend (aiocb_list_,
01142                                   aiocb_list_max_size_,
01143                                   0);
01144   else
01145     {
01146       // Block on <aio_suspend> for <milli_seconds>
01147       timespec timeout;
01148       timeout.tv_sec = milli_seconds / 1000;
01149       timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000;
01150       result_suspend = aio_suspend (aiocb_list_,
01151                                     aiocb_list_max_size_,
01152                                     &timeout);
01153     }
01154 
01155   // Check for errors
01156   if (result_suspend == -1)
01157     {
01158       if (errno != EAGAIN &&   // Timeout
01159           errno != EINTR )    // Interrupted call
01160           ACE_ERROR ((LM_ERROR,
01161                       "%N:%l:(%P | %t)::%p\n",
01162                       "ACE_POSIX_AIOCB_Proactor::handle_events:"
01163                       "aio_suspend failed\n"));
01164 
01165       // let continue work
01166       // we should check "post_completed" queue
01167     }
01168   else
01169     {
01170       size_t index = 0;
01171       size_t count = aiocb_list_max_size_;  // max number to iterate
01172       int error_status = 0;
01173       size_t transfer_count = 0;
01174 
01175       for (;; retval++)
01176         {
01177           ACE_POSIX_Asynch_Result *asynch_result =
01178             find_completed_aio (error_status,
01179                                 transfer_count,
01180                                 index,
01181                                 count);
01182 
01183           if (asynch_result == 0)
01184             break;
01185 
01186           // Call the application code.
01187           this->application_specific_code (asynch_result,
01188                                            transfer_count,
01189                                            0,             // No completion key.
01190                                            error_status);
01191         }
01192     }
01193 
01194   // process post_completed results
01195   retval += this->process_result_queue ();
01196 
01197   return retval > 0 ? 1 : 0;
01198 }

int ACE_POSIX_AIOCB_Proactor::notify_completion int    sig_num [protected, virtual]
 

Notify queue of "post_completed" ACE_POSIX_Asynch_Results called from post_completion method.

Reimplemented in ACE_POSIX_CB_Proactor.

Definition at line 1041 of file POSIX_Proactor.cpp.

References aiocb_notify_pipe_manager_, and ACE_AIOCB_Notify_Pipe_Manager::notify.

Referenced by putq_result.

01042 {
01043   ACE_UNUSED_ARG (sig_num);
01044 
01045   return this->aiocb_notify_pipe_manager_->notify ();
01046 }

int ACE_POSIX_AIOCB_Proactor::post_completion ACE_POSIX_Asynch_Result   result [virtual]
 

Post a result to the completion port of the Proactor.

Implements ACE_POSIX_Proactor.

Definition at line 1049 of file POSIX_Proactor.cpp.

References ACE_GUARD_RETURN, ACE_MT, ACE_SYNCH_MUTEX, and putq_result.

01050 {
01051   ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
01052 
01053   int ret_val = this->putq_result (result);
01054 
01055   return ret_val;
01056 }

int ACE_POSIX_AIOCB_Proactor::process_result_queue void    [protected]
 

Process the internal results queue.

Definition at line 1114 of file POSIX_Proactor.cpp.

References ACE_POSIX_Proactor::application_specific_code, ACE_POSIX_Asynch_Result::bytes_transferred, ACE_POSIX_Asynch_Result::error, and getq_result.

Referenced by handle_events_i.

01115 {
01116   int ret_val = 0;
01117   ACE_POSIX_Asynch_Result* result = 0;
01118 
01119   while ((result = this->getq_result ()) != 0)
01120     {
01121       this->application_specific_code
01122             (result,
01123              result->bytes_transferred(), // 0, No bytes transferred.
01124              0,  // No completion key.
01125              result->error());   //0, No error.
01126 
01127       ret_val++;
01128     }
01129 
01130   return ret_val;
01131 }

int ACE_POSIX_AIOCB_Proactor::putq_result ACE_POSIX_Asynch_Result   result [protected]
 

Put "post_completed" result into the internal queue.

Definition at line 1059 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_Unbounded_Queue< ACE_POSIX_Asynch_Result * >::enqueue_tail, LM_ERROR, notify_completion, result_queue_, and ACE_POSIX_Asynch_Result::signal_number.

Referenced by cancel_aio, post_completion, and start_deferred_aio.

01060 {
01061   // this protected method should be called with locked mutex_
01062   // we can't use GUARD as Proactor uses non-recursive mutex
01063 
01064   if (!result)
01065     return -1;
01066 
01067   int sig_num = result->signal_number ();
01068   int ret_val = this->result_queue_.enqueue_tail (result);
01069 
01070   if (ret_val == -1)
01071     ACE_ERROR_RETURN ((LM_ERROR,
01072                        "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
01073                       -1);
01074 
01075   this->notify_completion (sig_num);
01076 
01077   return 0;
01078 }

void ACE_POSIX_AIOCB_Proactor::set_notify_handle ACE_HANDLE    h [protected]
 

To identify requests from Notify_Pipe_Manager.

Definition at line 815 of file POSIX_Proactor.cpp.

References notify_pipe_read_handle_.

Referenced by ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager.

00816 {
00817   notify_pipe_read_handle_ = h;
00818 }

int ACE_POSIX_AIOCB_Proactor::start_aio ACE_POSIX_Asynch_Result   result,
ACE_POSIX_Proactor::Opcode    op
[virtual]
 

Definition at line 1271 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_MT, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, allocate_aio_slot, LM_ERROR, num_deferred_aiocb_, ACE_POSIX_Proactor::Opcode, ACE_POSIX_Proactor::READ, result_list_, ssize_t, start_aio_i, and ACE_POSIX_Proactor::WRITE.

01273 {
01274   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
01275 
01276   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
01277 
01278   int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
01279 
01280   if (result == 0) // Just check the status of the list
01281     return ret_val;
01282 
01283   // Save operation code in the aiocb
01284   switch (op)
01285     {
01286     case ACE_POSIX_Proactor::READ:
01287       result->aio_lio_opcode = LIO_READ;
01288       break;
01289 
01290     case ACE_POSIX_Proactor::WRITE:
01291       result->aio_lio_opcode = LIO_WRITE;
01292       break;
01293 
01294     default:
01295       ACE_ERROR_RETURN ((LM_ERROR,
01296                          "%N:%l:(%P | %t)::\n"
01297                          "start_aio: Invalid operation code\n"),
01298                         -1);
01299     }
01300 
01301   if (ret_val != 0)   // No free slot
01302     {
01303       errno = EAGAIN;
01304       return -1;
01305     }
01306 
01307   // Find a free slot and store.
01308 
01309   ssize_t slot = allocate_aio_slot (result);
01310 
01311   if (slot < 0)
01312     return -1;
01313 
01314   size_t index = ACE_static_cast (size_t, slot);
01315 
01316   result_list_[index] = result;   //Store result ptr anyway
01317   aiocb_list_cur_size_++;
01318 
01319   ret_val = start_aio_i (result);
01320   switch (ret_val)
01321     {
01322     case 0:     // started OK
01323       aiocb_list_[index] = result;
01324       return 0;
01325 
01326     case 1:     // OS AIO queue overflow
01327       num_deferred_aiocb_ ++;
01328       return 0;
01329 
01330     default:    // Invalid request, there is no point
01331       break;    // to start it later
01332     }
01333 
01334   result_list_[index] = 0;
01335   aiocb_list_cur_size_--;
01336   return -1;
01337 }

int ACE_POSIX_AIOCB_Proactor::start_aio_i ACE_POSIX_Asynch_Result   result [protected, virtual]
 

Initiate an aio operation.

Definition at line 1385 of file POSIX_Proactor.cpp.

References ACE_ERROR, ACE_LIB_TEXT, ACE_TCHAR, ACE_TRACE, LM_ERROR, and num_started_aio_.

Referenced by start_aio, and start_deferred_aio.

01386 {
01387   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
01388 
01389   int ret_val;
01390   const ACE_TCHAR *ptype;
01391 
01392   // Start IO
01393 
01394   switch (result->aio_lio_opcode )
01395     {
01396     case LIO_READ :
01397       ptype = ACE_LIB_TEXT ("read ");
01398       ret_val = aio_read (result);
01399       break;
01400     case LIO_WRITE :
01401       ptype = ACE_LIB_TEXT ("write");
01402       ret_val = aio_write (result);
01403       break;
01404     default:
01405       ptype = ACE_LIB_TEXT ("?????");
01406       ret_val = -1;
01407       break;
01408     }
01409 
01410   if (ret_val == 0)
01411     this->num_started_aio_++;
01412   else // if (ret_val == -1)
01413     {
01414       if (errno == EAGAIN || errno == ENOMEM)  //Ok, it will be deferred AIO
01415         ret_val = 1;
01416       else
01417         ACE_ERROR ((LM_ERROR,
01418                     ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
01419                     ptype,
01420                     ACE_LIB_TEXT ("queueing failed\n")));
01421     }
01422 
01423   return ret_val;
01424 }

int ACE_POSIX_AIOCB_Proactor::start_deferred_aio void    [protected]
 

Start deferred AIO if necessary.

Definition at line 1428 of file POSIX_Proactor.cpp.

References ACE_ERROR_RETURN, ACE_TRACE, aiocb_list_, aiocb_list_cur_size_, aiocb_list_max_size_, LM_ERROR, num_deferred_aiocb_, putq_result, result_list_, ACE_POSIX_Asynch_Result::set_bytes_transferred, ACE_POSIX_Asynch_Result::set_error, and start_aio_i.

Referenced by find_completed_aio.

01429 {
01430   ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
01431 
01432   // This protected method is called from
01433   // find_completed_aio after any AIO completion
01434   // We should call this method always with locked
01435   // ACE_POSIX_AIOCB_Proactor::mutex_
01436   //
01437   // It tries to start the first deferred AIO
01438   // if such exists
01439 
01440   if (num_deferred_aiocb_ == 0)
01441     return 0;  //  nothing to do
01442 
01443   size_t i = 0;
01444 
01445   for (i= 0; i < this->aiocb_list_max_size_; i++)
01446     if (result_list_[i] !=0       // check for
01447        && aiocb_list_[i]  ==0)     // deferred AIO
01448       break;
01449 
01450   if (i >= this->aiocb_list_max_size_)
01451     ACE_ERROR_RETURN ((LM_ERROR,
01452                  "%N:%l:(%P | %t)::\n"
01453                  "start_deferred_aio:"
01454                  "internal Proactor error 3\n"),
01455                  -1);
01456 
01457   ACE_POSIX_Asynch_Result *result = result_list_[i];
01458 
01459   int ret_val = start_aio_i (result);
01460 
01461   switch (ret_val)
01462     {
01463     case 0 :    //started OK , decrement count of deferred AIOs
01464       aiocb_list_[i] = result;
01465       num_deferred_aiocb_ --;
01466       return 0;
01467 
01468     case 1 :
01469       return 0;  //try again later
01470 
01471     default :     // Invalid Parameters , should never be
01472       break;
01473     }
01474 
01475   //AL notify  user
01476 
01477   result_list_[i] = 0;
01478   aiocb_list_cur_size_--;
01479 
01480   num_deferred_aiocb_ --;
01481 
01482   result->set_error (errno);
01483   result->set_bytes_transferred (0);
01484   this->putq_result (result);  // we are with locked mutex_ here !
01485 
01486   return -1;
01487 }


Friends And Related Function Documentation

friend class ACE_AIOCB_Notify_Pipe_Manager [friend]
 

Handler needs to call application specific code.

Definition at line 330 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Accept [friend]
 

Definition at line 335 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Connect [friend]
 

Definition at line 336 of file POSIX_Proactor.h.

friend class ACE_POSIX_Asynch_Operation [friend]
 

This class does the registering of Asynch Operations with the Proactor which is necessary in the AIOCB strategy.

Definition at line 334 of file POSIX_Proactor.h.


Member Data Documentation

aiocb** ACE_POSIX_AIOCB_Proactor::aiocb_list_ [protected]
 

Use a dynamically allocated array to keep track of all the aio's issued currently.

Definition at line 468 of file POSIX_Proactor.h.

Referenced by cancel_aio, create_result_aiocb_list, delete_result_aiocb_list, find_completed_aio, handle_events_i, start_aio, and start_deferred_aio.

size_t ACE_POSIX_AIOCB_Proactor::aiocb_list_cur_size_ [protected]
 

To maintain the current size of the array (list).

Definition at line 475 of file POSIX_Proactor.h.

Referenced by cancel_aio, find_completed_aio, start_aio, and start_deferred_aio.

size_t ACE_POSIX_AIOCB_Proactor::aiocb_list_max_size_ [protected]
 

To maintain the maximum size of the array (list).

Definition at line 472 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot, cancel_aio, check_max_aio_num, create_result_aiocb_list, delete_result_aiocb_list, find_completed_aio, handle_events_i, start_aio, and start_deferred_aio.

ACE_AIOCB_Notify_Pipe_Manager* ACE_POSIX_AIOCB_Proactor::aiocb_notify_pipe_manager_ [protected]
 

This class takes care of doing <accept> when we use AIO_CONTROL_BLOCKS strategy.

Definition at line 464 of file POSIX_Proactor.h.

Referenced by create_notify_manager, delete_notify_manager, and notify_completion.

ACE_SYNCH_MUTEX ACE_POSIX_AIOCB_Proactor::mutex_ [protected]
 

Mutex to protect work with lists.

Definition at line 478 of file POSIX_Proactor.h.

ACE_HANDLE ACE_POSIX_AIOCB_Proactor::notify_pipe_read_handle_ [protected]
 

The purpose of this member is only to identify asynchronous request from NotifyManager. We will reserve for it always slot 0 in the list of aiocb's to be sure that don't lose notifications.

Definition at line 483 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot, and set_notify_handle.

size_t ACE_POSIX_AIOCB_Proactor::num_deferred_aiocb_ [protected]
 

Number of ACE_POSIX_Asynch_Result's waiting for start i.e. deferred AIOs.

Definition at line 487 of file POSIX_Proactor.h.

Referenced by cancel_aio, start_aio, and start_deferred_aio.

size_t ACE_POSIX_AIOCB_Proactor::num_started_aio_ [protected]
 

Number active,i.e. running requests.

Definition at line 490 of file POSIX_Proactor.h.

Referenced by find_completed_aio, and start_aio_i.

ACE_POSIX_Asynch_Result** ACE_POSIX_AIOCB_Proactor::result_list_ [protected]
 

Definition at line 469 of file POSIX_Proactor.h.

Referenced by allocate_aio_slot, cancel_aio, create_result_aiocb_list, delete_result_aiocb_list, find_completed_aio, start_aio, and start_deferred_aio.

ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> ACE_POSIX_AIOCB_Proactor::result_queue_ [protected]
 

Queue which keeps "post_completed" ACE_POSIX_Asynch_Result's.

Definition at line 493 of file POSIX_Proactor.h.

Referenced by getq_result, and putq_result.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 12:52:10 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002