
TAO_Transport Class Reference

Generic definitions for the Transport class. More...

#include <Transport.h>


Incoming Queue Methods

enum  { TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY }
int generate_locate_request (TAO_Target_Specification &spec, TAO_Operation_Details &opdetails, TAO_OutputCDR &output)
 This is a request for the transport object to write a LocateRequest header before it is sent out. More...

virtual int generate_request_header (TAO_Operation_Details &opd, TAO_Target_Specification &spec, TAO_OutputCDR &msg)
 This is a request for the transport object to write a request header before it sends out the request. More...

int recache_transport (TAO_Transport_Descriptor_Interface *desc)
 recache ourselves in the cache. More...

virtual void connection_handler_closing (void)
 Method for the connection handler to signify that it is being closed and destroyed. More...

virtual int handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time=0, int block=0)
 Callback to read incoming data. More...

void try_to_complete (ACE_Time_Value *max_wait_time)
virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, int message_semantics, ACE_Time_Value *max_time_wait)=0
 Prepare the waiting and demuxing strategy to receive a reply for a new request. More...

virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub=0, int message_semantics=TAO_Transport::TAO_TWOWAY_REQUEST, ACE_Time_Value *max_time_wait=0)=0
 This method formats the stream and then sends the message on the transport. More...

virtual int send_message_shared (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Send the contents of <message_block>. More...

int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time=0)
 Send a message block chain. More...

int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time)
 Send a message block chain, assuming the lock is held. More...

void purge_entry (void)
 Cache management. More...

int make_idle (void)
 Cache management. More...

int update_transport (void)
 Cache management. More...

int handle_timeout (const ACE_Time_Value &current_time, const void *act)
 The timeout callback, invoked when any of the timers related to this transport expire. More...

size_t recv_buffer_size (void)
 Accessor to recv_buffer_size_. More...

size_t sent_byte_count (void)
 Accessor to sent_byte_count_. More...

int enqueue_incoming_message (TAO_Queued_Data *queueable_message)
 Queue up queueable_message as a completely-received incoming message. More...

virtual ACE_Event_Handler * event_handler_i (void)=0
 Return the event handler used to receive notifications from the Reactor. More...

virtual TAO_Connection_Handler * connection_handler_i (void)=0
virtual TAO_Connection_Handler * invalidate_event_handler_i (void)=0
 Called by connection_handler_closing() to signal that the protocol-specific transport should dissociate itself from the protocol-specific connection handler. More...

virtual TAO_Pluggable_Messaging * messaging_object (void)=0
 Return the messaging object that is used to format the data that needs to be sent. More...

virtual ssize_t send_i (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)=0
 Write the complete iovec chain to the connection. More...

virtual ssize_t recv_i (char *buffer, size_t len, const ACE_Time_Value *timeout=0)=0
virtual int register_handler_i (void)=0
 Register the handler with the reactor. More...

int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh)
 Process the message by sending it to the higher layers of the ORB. More...

int send_message_shared_i (TAO_Stub *stub, int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Implement send_message_shared() assuming the handler_lock_ is held. More...

int check_event_handler_i (const char *caller)
 Check if the underlying event handler is still valid. More...


Public Types


Public Methods

 TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core)
 Default creator; requires the tag value to be supplied. More...

virtual ~TAO_Transport (void)
 destructor. More...

CORBA::ULong tag (void) const
 Return the protocol tag. More...

TAO_ORB_Core * orb_core (void) const
 Access the ORB that owns this connection. More...

TAO_Transport_Mux_Strategy * tms (void) const
 Get the TAO_Transport_Mux_Strategy used by this object. More...

TAO_Wait_Strategy * wait_strategy (void) const
 Return the TAO_Wait_Strategy used by this object. More...

int handle_output (void)
 Callback method to reactively drain the outgoing data queue. More...

int bidirectional_flag (void) const
 Get/Set the bidirectional flag. More...

void bidirectional_flag (int flag)
void cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry)
 Set/Get the Cache Map entry. More...

TAO_Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry (void)
size_t id (void) const
 Set and Get the identifier for this transport instance. More...

void id (size_t id)
unsigned long purging_order (void) const
 Get and Set the purging order. The purging strategy uses the set version to set the purging order. More...

void purging_order (unsigned long value)
int queue_is_empty (void)
 Check if there are messages pending in the queue. More...

void provide_handle (ACE_Handle_Set &reactor_registered, TAO_EventHandlerSet &unregistered)
 Fill in a handle_set with any associated handler's reactor handle. More...

int register_handler (void)
 Register the handler with the reactor. More...

ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout=0)
 Write the complete Message_Block chain to the connection. More...

ssize_t recv (char *buffer, size_t len, const ACE_Time_Value *timeout=0)
 Read len bytes from the connection into buffer. More...

virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor)=0
virtual int tear_listen_point_list (TAO_InputCDR &cdr)
 Extracts the list of listen points from the <cdr> stream. The list would have the protocol specific details of the ListenPoints. More...

TAO_Codeset_Translator_Factory * char_translator (void) const
 CodeSet Negotiation - Get the char codeset translator factory. More...

TAO_Codeset_Translator_Factory * wchar_translator (void) const
 CodeSet Negotiation - Get the wchar codeset translator factory. More...

void char_translator (TAO_Codeset_Translator_Factory *)
 CodeSet negotiation - Set the char codeset translator factory. More...

void wchar_translator (TAO_Codeset_Translator_Factory *)
 CodeSet negotiation - Set the wchar codeset translator factory. More...

void assign_translators (TAO_InputCDR *, TAO_OutputCDR *)
 Use the Transport's codeset factories to set the translator for input and output CDRs. More...

CORBA::Boolean is_tcs_set () const
 Return true if the tcs has been set. More...

void first_request_sent ()
 Set the state of the first_request_ flag to 0. More...

Control connection lifecycle
These methods are routed through the TMS object. The TMS strategies implement them correctly.

virtual int idle_after_send (void)
 Request has been just sent, but the reply is not received. Idle the transport now. More...

virtual int idle_after_reply (void)
 Request is sent and the reply is received. Idle the transport now. More...

virtual void close_connection (void)
 Call the implementation method after obtaining the lock. More...


Static Public Methods

TAO_Transport * _duplicate (TAO_Transport *transport)
void release (TAO_Transport *transport)

Protected Attributes

CORBA::ULong tag_
 IOP protocol tag. More...

TAO_ORB_Core * orb_core_
 Global orbcore resource. More...

TAO_Transport_Cache_Manager::HASH_MAP_ENTRY * cache_map_entry_
 Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around. More...

TAO_Transport_Mux_Strategy * tms_
 Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request. More...

TAO_Wait_Strategy * ws_
 Strategy for waiting for the reply after sending the request. More...

int bidirectional_flag_
 Use to check if bidirectional info has been synchronized with the peer. More...

TAO_Queued_Message * head_
 Implement the outgoing data queue. More...

TAO_Queued_Message * tail_
TAO_Incoming_Message_Queue incoming_message_queue_
 Queue of the completely-received incoming messages. More...

TAO_Queued_Data * uncompleted_message_
 Place to hold a partially-received (waiting-to-be-completed) message. More...

ACE_Time_Value current_deadline_
 The queue will start draining no later than <queing_deadline_> *if* the deadline is. More...

long flush_timer_id_
 The timer ID. More...

TAO_Transport_Timer transport_timer_
 The adapter used to receive timeout callbacks from the Reactor. More...

ACE_Lock * handler_lock_
 Lock that ensures that activities that *might* use handler-related resources (such as a connection handler) get serialized. More...

size_t id_
 A unique identifier for the transport. More...

unsigned long purging_order_
 Used by the LRU, LFU and FIFO Connection Purging Strategies. More...

size_t recv_buffer_size_
 Size of the buffer received. More...

size_t sent_byte_count_
 Number of bytes sent. More...


Private Methods

TAO_Transport_Cache_Manager * transport_cache_manager (void)
 Helper method that returns the Transport Cache Manager. More...

int drain_queue (void)
 Send some of the data in the queue. More...

int drain_queue_i (void)
 Implement drain_queue() assuming the lock is held. More...

int queue_is_empty_i (void)
 Check if there are messages pending in the queue. More...

int drain_queue_helper (int &iovcnt, iovec iov[])
 A helper routine used in drain_queue_i(). More...

int schedule_output_i (void)
 Schedule handle_output() callbacks. More...

int cancel_output_i (void)
 Cancel handle_output() callbacks. More...

void cleanup_queue (size_t byte_count)
 Cleanup the queue. More...

int check_buffering_constraints_i (TAO_Stub *stub, int &must_flush)
 Check if the buffering constraints have been reached. More...

int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Send a synchronous message, i.e. block until the message is on the wire. More...

int send_reply_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time)
 Send a reply message, i.e. do not block until the message is on the wire, but just return after adding them to the queue. More...

int send_synch_message_helper_i (TAO_Synch_Queued_Message &s, ACE_Time_Value *max_wait_time)
 A helper method used by <send_synchronous_message_i> and <send_reply_message_i>. Reusable code that could be used by both the methods. More...

int flush_timer_pending (void) const
 Check if the flush timer is still pending. More...

void reset_flush_timer (void)
 The flush timer expired or was explicitly cancelled, mark it as not pending. More...

void report_invalid_event_handler (const char *caller)
 Print out error messages if the event handler is not valid. More...

int process_queue_head (TAO_Resume_Handle &rh)
int notify_reactor (void)
TAO_Connection_Handler * invalidate_event_handler (void)
 Grab the mutex and then call invalidate_event_handler_i(). More...

void send_connection_closed_notifications (void)
 Notify all the components inside a Transport when the underlying connection is closed. More...

void send_connection_closed_notifications_i (void)
 Assume the lock is held. More...

void close_connection_i (void)
 Implement close_connection() assuming the handler_lock_ is held. More...

void close_connection_no_purge (void)
 Close the underlying connection, do not purge the entry from the map (supposedly it was purged already, trust the caller, yuck!). More...

void close_connection_shared (int purge, TAO_Connection_Handler *eh)
 Close the underlying connection, implements the code shared by all the close_connection_* variants. More...

 TAO_Transport (const TAO_Transport &)
 Prohibited. More...

void operator= (const TAO_Transport &)

Private Attributes

TAO_Codeset_Translator_Factory * char_translator_
 Additional member values required to support codeset translation. More...

TAO_Codeset_Translator_Factory * wchar_translator_
CORBA::Boolean tcs_set_
 The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be. More...

CORBA::Boolean first_request_
 First_request_ is true until the first request is sent or received. This is necessary since codeset context information is necessary only on the first request. After that, the translators are fixed for the life of the connection. More...


Friends

class TAO_Block_Flushing_Strategy
 This class needs privileged access to: - queue_is_empty_i() - drain_queue_i(). More...

class TAO_Reactive_Flushing_Strategy
 These classes need privileged access to: - schedule_output_i() - cancel_output_i(). More...

class TAO_Leader_Follower_Flushing_Strategy
class TAO_Transport_Cache_Manager
 This class needs privileged access to: close_connection_no_purge (). More...


Detailed Description

Generic definitions for the Transport class.

The transport object is created in the Service Handler's constructor and deleted in the Service Handler's destructor.

The main responsibility of a Transport object is to encapsulate a connection and to provide a transport-independent way to send and receive data. Since TAO relies heavily on the Reactor for most, if not all, of its I/O, the Transport class is usually implemented with a helper Connection Handler that adapts the generic Transport interface to the Reactor types.

The outgoing data path:

One of the responsibilities of the TAO_Transport class is to send out GIOP messages as efficiently as possible. In most cases messages are put out in FIFO order: the transport object puts out the message using a single system call and returns control to the application. However, for oneways and AMI requests it may be more efficient (or required, if the SYNC_NONE policy is in effect) to queue the messages until a large enough data set is available. Another reason to queue is that some applications cannot block for I/O, yet they want to send messages so large that a single write() operation would not be able to cope with them. In such cases we need to queue the data and use the Reactor to drain the queue.

Therefore, the Transport class may need to use a queue to temporarily hold the messages, and, in some configurations, it may need to use the Reactor to concurrently drain such queues.

Out of order messages:

TAO provides explicit policies to send 'urgent' messages. Such messages may be put at the head of the queue. However, they cannot be sent immediately because the transport may already be sending another message in a reactive fashion.

Consequently, the Transport must also know if the head of the queue has been partially sent. In that case new messages can only follow the head. Only once the head is completely sent we can start sending new messages.
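The placement rule above can be illustrated with a minimal sketch. The types QueuedMessage and OutgoingQueue and their members are hypothetical names invented for this example; they are not the TAO_Queued_Message implementation, and the sketch assumes that "partially sent" can be tracked with a simple byte counter.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for a queued outgoing message (not a TAO type).
struct QueuedMessage {
  std::string data;
  std::size_t bytes_sent;  // > 0 means part of the message is already on the wire
};

// Hypothetical outgoing queue showing where an urgent message may be placed.
class OutgoingQueue {
public:
  // Ordinary messages always go to the tail (FIFO order).
  void push_normal(std::string data) {
    queue_.push_back(QueuedMessage{std::move(data), 0});
  }

  // Urgent messages go to the head, unless the head is partially sent,
  // in which case they can only follow it.
  void push_urgent(std::string data) {
    const bool head_in_flight =
        !queue_.empty() && queue_.front().bytes_sent > 0;
    queue_.insert(queue_.begin() + (head_in_flight ? 1 : 0),
                  QueuedMessage{std::move(data), 0});
  }

  // Record that n more bytes of the head message were written.
  void mark_head_sent(std::size_t n) {
    assert(!queue_.empty());
    queue_.front().bytes_sent += n;
  }

  const std::deque<QueuedMessage>& messages() const { return queue_; }

private:
  std::deque<QueuedMessage> queue_;
};
```

With an untouched head, push_urgent() places the message first; once mark_head_sent() has recorded any progress, the urgent message lands in the second slot so the bytes of the in-flight message are never interleaved with another message.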

Waiting threads:

One or more threads can be blocked waiting for the connection to completely send the message. The thread should return as soon as its message has been sent, so a per-thread condition is required. This suggests that simply using an ACE_Message_Queue would not be enough: there is a significant amount of ancillary information to keep on each message that the Message_Block class does not provide room for.

Blocking I/O is still attractive for some applications. First, by eliminating the Reactor overhead, performance is improved when sending large blocks of data. Second, using the Reactor to send out data opens the door for nested upcalls, yet some applications cannot deal with the reentrancy issues in this case.

Timeouts:

Some or all messages could have a timeout period attached to them. The timeout source could either be some high-level policy or maybe some strategy to prevent denial of service attacks. In any case the timeouts are per-message, and later messages could have shorter timeouts. In fact, some kind of scheduling (such as EDF) could be required in a few applications.
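As an illustration of the per-message deadlines and the EDF (earliest deadline first) ordering mentioned above, here is a minimal sketch built on the standard library. TimedMessage, EdfQueue, pop_earliest and expire are hypothetical names for this example only; TAO itself works with ACE_Time_Value and its own queue types, which are not modelled here.

```cpp
#include <cassert>
#include <chrono>
#include <queue>
#include <string>
#include <vector>

typedef std::chrono::steady_clock Clock;

// Hypothetical message record carrying its own deadline.
struct TimedMessage {
  std::string name;
  Clock::time_point deadline;
};

// Orders the priority queue so that the earliest deadline is on top.
struct LaterDeadline {
  bool operator()(const TimedMessage& a, const TimedMessage& b) const {
    return a.deadline > b.deadline;
  }
};

typedef std::priority_queue<TimedMessage, std::vector<TimedMessage>,
                            LaterDeadline>
    EdfQueue;

// Remove and return the message with the earliest deadline.
TimedMessage pop_earliest(EdfQueue& queue) {
  assert(!queue.empty());
  TimedMessage next = queue.top();
  queue.pop();
  return next;
}

// Remove every message whose deadline is at or before 'now' and
// return their names in deadline order.
std::vector<std::string> expire(EdfQueue& queue, Clock::time_point now) {
  std::vector<std::string> expired;
  while (!queue.empty() && queue.top().deadline <= now) {
    expired.push_back(pop_earliest(queue).name);
  }
  return expired;
}
```

Because a later message may carry a shorter timeout than an earlier one, insertion order alone cannot decide which message times out first; keeping the queue ordered by deadline makes that decision a constant-time look at the top element.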

Conclusions:

The outgoing data path consists of several components:

The Transport object provides a single method to send request messages (send_request_message ()).

The incoming data path:

One of the main responsibilities of the transport is to read and process the incoming GIOP message as quickly and efficiently as possible. There are other forces that need to be given due consideration. They are:

Parsing messages (GIOP) & processing the message:

The messages should be checked for validity and the right information should be sent to the higher layer for processing. The process of doing a sanity check and preparing the messages for the higher layers of the ORB is done by the messaging protocol.

Design forces and Challenges

To keep things as efficient as possible for medium-sized requests, it would be good to minimise data copying and locking along the incoming path, i.e., from the time the data is read from the handle until it reaches the application. We achieve this by creating a buffer on the stack and reading the data from the handle into the buffer. We then pass the same data block (the buffer is encapsulated into a data block) to the higher layers of the ORB. The problems stem from the following:

(a) The data is bigger than the buffer that we have on the stack.

(b) Transports like TCP do not guarantee availability of the whole chunk of data in one shot. Data could trickle in byte by byte.

(c) A single read gives multiple messages.

We solve the problems as follows:

(a) First do a read with the buffer on the stack. Query the underlying messaging object whether the message has any incomplete portion. If so, we just grow the buffer by the missing size and read the rest of the message. We free the handle and then send the message to the higher layers of the ORB for processing.

(b) If we block (i.e., if we receive an EWOULDBLOCK) while trying to do the above (i.e., trying to read after growing the buffer size), we put the message in a queue and return to the reactor. The reactor will call us back when the handle becomes ready for reading.

(c) If we get multiple messages (possible if the client connected to the server sends oneways or AMI requests), we parse and split the messages. Every message is put in the queue. Once the messages are queued, the thread picks up one message to send to the higher layers of the ORB. Before doing that, if it finds more messages, it sends a notify to the reactor without resuming the handle. The next thread picks up a message from the queue and processes that. Once the queue is drained the last thread resumes the handle.
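Cases (b) and (c) can be sketched as a function that splits one read buffer into complete messages and a leftover fragment. The framing used here is an assumption made purely for illustration: a 4-byte big-endian payload length followed by the payload. Real GIOP headers have a different layout, and in TAO the parsing is delegated to the messaging object. The names frame, split_messages and SplitResult are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical framing for illustration only: a 4-byte big-endian payload
// length followed by the payload. This is NOT the GIOP header layout.
const std::size_t kHeaderSize = 4;

struct SplitResult {
  std::vector<std::string> complete;  // fully received payloads, in order
  std::string leftover;               // partial data to keep until more arrives
};

// Helper that builds one framed message (used to create test input).
std::string frame(const std::string& payload) {
  std::string out;
  const std::uint32_t len = static_cast<std::uint32_t>(payload.size());
  for (int shift = 24; shift >= 0; shift -= 8) {
    out.push_back(static_cast<char>((len >> shift) & 0xFF));
  }
  return out + payload;
}

// Split the bytes obtained from a single read into complete messages
// plus whatever incomplete tail remains.
SplitResult split_messages(const std::string& buffer) {
  SplitResult result;
  std::size_t pos = 0;
  while (buffer.size() - pos >= kHeaderSize) {
    std::uint32_t len = 0;
    for (std::size_t i = 0; i < kHeaderSize; ++i) {
      len = (len << 8) | static_cast<unsigned char>(buffer[pos + i]);
    }
    if (buffer.size() - pos - kHeaderSize < len) {
      break;  // payload not fully received yet
    }
    result.complete.push_back(buffer.substr(pos + kHeaderSize, len));
    pos += kHeaderSize + len;
  }
  assert(pos <= buffer.size());
  result.leftover = buffer.substr(pos);
  return result;
}
```

Each entry in complete corresponds to a message that would be queued for the higher layers, while leftover plays the role of the partially received message that is held until the reactor reports the handle readable again.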

Sending Replies

We could use the outgoing path of the ORB to send replies. This would allow us to reuse most of the code in the outgoing data path. We did this until TAO-1.2.3, but we ran into problems. When writing the reply the ORB gets flow controlled, and the ORB tries to flush the message by going into the reactor. This resulted in unnecessary nesting. The thread that gets into the Reactor could potentially handle other messages (incoming or outgoing), and the stack starts growing, leading to crashes.

Solution to the nesting problem

The solution that we (plan to) adopt is pretty straightforward. The thread sending replies will not block to send the replies, but will queue the replies and return to the Reactor. (Note the careful usage of the terms "blocking in the Reactor" as opposed to "returning to the Reactor".)
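The "queue and return" idea can be sketched as follows. ReplyQueue and the TryWrite callback are hypothetical names for this example; the callback stands in for a non-blocking write that reports how many bytes it accepted (0 meaning it would block). It is a sketch of the approach described above, not TAO's send_reply_message_i().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical non-blocking writer: returns how many bytes it accepted,
// which may be fewer than offered (0 means the write would block).
typedef std::function<std::size_t(const std::string&)> TryWrite;

class ReplyQueue {
public:
  explicit ReplyQueue(TryWrite writer) : try_write_(std::move(writer)) {}

  // Called by the thread that produced the reply. It never waits:
  // whatever cannot be written right now simply stays queued.
  void send_reply(const std::string& reply) {
    pending_.push_back(reply);
    drain();
  }

  // Called again later, for example when the event loop reports that
  // the handle is writable.
  void drain() {
    while (!pending_.empty()) {
      std::string& head = pending_.front();
      const std::size_t n = try_write_(head);
      assert(n <= head.size());
      if (n < head.size()) {
        head.erase(0, n);  // keep only the unsent remainder
        return;            // flow controlled: give control back to the caller
      }
      pending_.pop_front();
    }
  }

  std::size_t pending() const { return pending_.size(); }

private:
  TryWrite try_write_;
  std::deque<std::string> pending_;
};
```

The important property is that send_reply() returns as soon as the writer stops accepting bytes, so the calling thread unwinds its stack before any further events are handled instead of entering the event loop from inside the reply path.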

See Also:

http://deuce.doc.wustl.edu/cvsweb/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html

Definition at line 211 of file Transport.h.


Member Enumeration Documentation

anonymous enum
 

Enumeration values:
TAO_ONEWAY_REQUEST 
TAO_TWOWAY_REQUEST 
TAO_REPLY 

Definition at line 558 of file Transport.h.

00559     {
00560       TAO_ONEWAY_REQUEST = 0,
00561       TAO_TWOWAY_REQUEST = 1,
00562       TAO_REPLY
00563     };


Constructor & Destructor Documentation

TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core)

Default creator; requires the tag value to be supplied.

Definition at line 104 of file Transport.cpp.

References TAO_ORB_Core::client_factory, TAO_Resource_Factory::create_cached_connection_lock, TAO_Client_Strategy_Factory::create_transport_mux_strategy, TAO_Client_Strategy_Factory::create_wait_strategy, handler_lock_, orb_core_, TAO_ORB_Core::resource_factory, tms_, and ws_.

00106   : TAO_Synch_Refcountable (orb_core->resource_factory ()->create_cached_connection_lock (), 1)
00107   , tag_ (tag)
00108   , orb_core_ (orb_core)
00109   , cache_map_entry_ (0)
00110   , bidirectional_flag_ (-1)
00111   , head_ (0)
00112   , tail_ (0)
00113   , incoming_message_queue_ (orb_core)
00114   , uncompleted_message_ (0)
00115   , current_deadline_ (ACE_Time_Value::zero)
00116   , flush_timer_id_ (-1)
00117   , transport_timer_ (this)
00118   , id_ ((size_t) this)
00119   , purging_order_ (0)
00120   , recv_buffer_size_ (0)
00121   , sent_byte_count_ (0)
00122   , char_translator_ (0)
00123   , wchar_translator_ (0)
00124   , tcs_set_ (0)
00125   , first_request_ (1)
00126 {
00127   TAO_Client_Strategy_Factory *cf =
00128     this->orb_core_->client_factory ();
00129 
00130   // Create WS now.
00131   this->ws_ = cf->create_wait_strategy (this);
00132 
00133   // Create TMS now.
00134   this->tms_ = cf->create_transport_mux_strategy (this);
00135 
00136   // Create a handler lock
00137   this->handler_lock_ =
00138     this->orb_core_->resource_factory ()->create_cached_connection_lock ();
00139 }

TAO_Transport::~TAO_Transport (void) [virtual]

destructor.

Definition at line 141 of file Transport.cpp.

References ACE_ASSERT, handler_lock_, tms_, and ws_.

00142 {
00143   ACE_ASSERT(this->refcount() == 0);
00144 
00145   delete this->ws_;
00146 
00147   delete this->tms_;
00148 
00149   delete this->handler_lock_;
00150 
00151   // By the time the destructor is reached all the connection stuff
00152   // *must* have been cleaned up
00153   ACE_ASSERT(this->head_ == 0);
00154   ACE_ASSERT(this->cache_map_entry_ == 0);
00155 }

TAO_Transport::TAO_Transport (const TAO_Transport &) [private]

Prohibited.


Member Function Documentation

TAO_Transport * TAO_Transport::_duplicate (TAO_Transport *transport) [static]

Definition at line 164 of file Transport.cpp.

References TAO_Synch_Refcountable::increment.

Referenced by TAO_IIOP_Connector::make_connection, TAO_Cache_IntId::operator=, TAO_Transport_Cache_Manager::purge, TAO_Connection_Handler::transport, and TAO_Asynch_Reply_Dispatcher_Base::transport.

00165 {
00166   if (transport != 0)
00167     {
00168       transport->increment ();
00169     }
00170   return transport;
00171 }
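The duplicate/release pairing can be illustrated with a minimal intrusive reference count. RefCounted is a hypothetical class written for this example. Unlike TAO_Synch_Refcountable, which the constructor above initialises with a lock, this sketch is not thread-safe, and the behaviour of release() shown here (destroying the object when the count reaches zero) is an assumption, since its body is not part of this listing.

```cpp
#include <cassert>

// Hypothetical, single-threaded illustration of an intrusive reference count.
class RefCounted {
public:
  // The creator holds the first reference, mirroring the initial count of 1
  // passed to TAO_Synch_Refcountable in the constructor above.
  RefCounted() : refcount_(1) {}

  // Null-safe: take one more reference and hand the same pointer back.
  static RefCounted* duplicate(RefCounted* object) {
    if (object != nullptr) {
      ++object->refcount_;
    }
    return object;
  }

  // Null-safe: drop one reference; destroy the object on the last one.
  // (Assumed behaviour, see the note above.)
  static void release(RefCounted* object) {
    if (object != nullptr) {
      assert(object->refcount_ > 0);
      if (--object->refcount_ == 0) {
        delete object;
      }
    }
  }

  long refcount() const { return refcount_; }

private:
  ~RefCounted() {}  // only release() may destroy the object
  long refcount_;
};
```

Every call to duplicate() has to be balanced by exactly one release(); the private destructor makes it a compile-time error to delete the object directly.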

void TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)

Use the Transport's codeset factories to set the translator for input and output CDRs.

Definition at line 2164 of file Transport.cpp.

References TAO_Codeset_Translator_Factory::assign, char_translator_, and wchar_translator_.

Referenced by TAO_ServerRequest::init_reply, TAO_GIOP_Synch_Invocation::inp_stream, TAO_GIOP_Message_Lite::process_request_message, TAO_GIOP_Message_Base::process_request_message, and TAO_ServerRequest::tao_send_reply_exception.

02165 {
02166   if (this->char_translator_)
02167     {
02168       this->char_translator_->assign (inp);
02169       this->char_translator_->assign (outp);
02170     }
02171   if (this->wchar_translator_)
02172     {
02173       this->wchar_translator_->assign (inp);
02174       this->wchar_translator_->assign (outp);
02175     }
02176 }

ACE_INLINE void TAO_Transport::bidirectional_flag (int flag)

Definition at line 38 of file Transport.inl.

References bidirectional_flag_.

00039 {
00040   this->bidirectional_flag_ = flag;
00041 }

ACE_INLINE int TAO_Transport::bidirectional_flag (void) const

Get/Set the bidirectional flag.

Definition at line 32 of file Transport.inl.

References bidirectional_flag_.

Referenced by TAO_IIOP_Transport::generate_request_header, TAO_Muxed_TMS::request_id, TAO_Exclusive_TMS::request_id, and TAO_IIOP_Transport::tear_listen_point_list.

00033 {
00034   return this->bidirectional_flag_;
00035 }

ACE_INLINE TAO_Transport_Cache_Manager::HASH_MAP_ENTRY * TAO_Transport::cache_map_entry (void)

Definition at line 44 of file Transport.inl.

References cache_map_entry_.

00045 {
00046   return this->cache_map_entry_;
00047 }

ACE_INLINE void TAO_Transport::cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry)

Set/Get the Cache Map entry.

Definition at line 51 of file Transport.inl.

References cache_map_entry_, and TAO_Transport_Cache_Manager::HASH_MAP_ENTRY.

Referenced by TAO_Transport_Cache_Manager::bind_i, and TAO_Transport_Cache_Manager::purge.

00053 {
00054   this->cache_map_entry_ = entry;
00055 }

int TAO_Transport::cancel_output_i (void) [private]

Cancel handle_output() callbacks.

Definition at line 788 of file Transport.cpp.

References ACE_DEBUG, ACE_Reactor::cancel_wakeup, event_handler_i, LM_DEBUG, ACE_Event_Handler::reactor, and ACE_Event_Handler::WRITE_MASK.

Referenced by TAO_Reactive_Flushing_Strategy::cancel_output, and TAO_Leader_Follower_Flushing_Strategy::cancel_output.

00789 {
00790   ACE_Event_Handler *eh = this->event_handler_i ();
00791   if (eh == 0)
00792     return -1;
00793 
00794   ACE_Reactor *reactor = eh->reactor ();
00795   if (reactor == 0)
00796     return -1;
00797 
00798   if (TAO_debug_level > 3)
00799     {
00800       ACE_DEBUG ((LM_DEBUG,
00801                   "TAO (%P|%t) - Transport[%d]::cancel_output\n",
00802                   this->id ()));
00803     }
00804 
00805   return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00806 }

ACE_INLINE void TAO_Transport::char_translator (TAO_Codeset_Translator_Factory *tf)

CodeSet negotiation - Set the char codeset translator factory.

Definition at line 141 of file Transport.inl.

References char_translator_, and tcs_set_.

00142 {
00143   this->char_translator_ = tf;
00144   this->tcs_set_ = 1;
00145 }

ACE_INLINE TAO_Codeset_Translator_Factory * TAO_Transport::char_translator (void) const

CodeSet Negotiation - Get the char codeset translator factory.

Definition at line 129 of file Transport.inl.

References char_translator_.

Referenced by TAO_Codeset_Manager::generate_service_context, TAO_Codeset_Manager::process_service_context, and TAO_Codeset_Manager::set_tcs.

00130 {
00131   return this->char_translator_;
00132 }

int TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, int &must_flush) [private]

Check if the buffering constraints have been reached.

Definition at line 1048 of file Transport.cpp.

References TAO_Sync_Strategy::buffering_constraints_reached, ACE_Reactor::cancel_timer, current_deadline_, event_handler_i, flush_timer_id_, flush_timer_pending, ACE_OS::gettimeofday, head_, TAO_Queued_Message::message_length, TAO_Queued_Message::next, ACE_Event_Handler::reactor, ACE_Reactor::schedule_timer, and TAO_Stub::sync_strategy.

Referenced by send_message_shared_i.

01050 {
01051   // First let's compute the size of the queue:
01052   size_t msg_count = 0;
01053   size_t total_bytes = 0;
01054   for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
01055     {
01056       msg_count++;
01057       total_bytes += i->message_length ();
01058     }
01059 
01060   int set_timer;
01061   ACE_Time_Value new_deadline;
01062 
01063   int constraints_reached =
01064     stub->sync_strategy ().buffering_constraints_reached (stub,
01065                                                           msg_count,
01066                                                           total_bytes,
01067                                                           must_flush,
01068                                                           this->current_deadline_,
01069                                                           set_timer,
01070                                                           new_deadline);
01071 
01072   // ... set the new timer, also cancel any previous timers ...
01073   if (set_timer)
01074     {
01075       ACE_Event_Handler *eh = this->event_handler_i ();
01076       if (eh != 0)
01077         {
01078           ACE_Reactor *reactor = eh->reactor ();
01079           if (reactor != 0)
01080             {
01081               this->current_deadline_ = new_deadline;
01082               ACE_Time_Value delay =
01083                 new_deadline - ACE_OS::gettimeofday ();
01084 
01085               if (this->flush_timer_pending ())
01086                 {
01087                   (void) reactor->cancel_timer (this->flush_timer_id_);
01088                 }
01089               this->flush_timer_id_ =
01090                 reactor->schedule_timer (&this->transport_timer_,
01091                                          &this->current_deadline_,
01092                                          delay);
01093             }
01094         }
01095     }
01096 
01097   return constraints_reached;
01098 }

ACE_INLINE int TAO_Transport::check_event_handler_i (const char *caller) [protected]

Check if the underlying event handler is still valid.

Returns:
Returns -1 if not, 0 if it is.

Definition at line 111 of file Transport.inl.

References event_handler_i, and report_invalid_event_handler.

Referenced by drain_queue_helper, TAO_IIOP_Transport::get_listen_point, recv, register_handler, send, send_message_block_chain, send_message_shared, TAO_IIOP_Transport::send_message_shared, TAO_IIOP_Transport::send_request, and TAO_IIOP_Transport::tear_listen_point_list.

00112 {
00113   // if there's no associated event handler, then we act like a null
00114   // transport
00115   if (this->event_handler_i () == 0)
00116     {
00117       this->report_invalid_event_handler (caller);
00118       errno = ENOENT;
00119       return -1;
00120     }
00121   return 0;
00122 }

void TAO_Transport::cleanup_queue (size_t byte_count) [private]

Cleanup the queue.

Exactly <byte_count> bytes have been sent; the queue must be cleaned up, as potentially several messages have been completely sent out. It leaves the next message to send out in head_.

Definition at line 1011 of file Transport.cpp.

References ACE_DEBUG, TAO_Queued_Message::all_data_sent, TAO_Queued_Message::bytes_transferred, TAO_Queued_Message::destroy, head_, LM_DEBUG, TAO_Queued_Message::message_length, and TAO_Queued_Message::remove_from_list.

Referenced by drain_queue_helper.

01012 {
01013   while (this->head_ != 0 && byte_count > 0)
01014     {
01015       TAO_Queued_Message *i = this->head_;
01016 
01017       if (TAO_debug_level > 4)
01018         {
01019           ACE_DEBUG ((LM_DEBUG,
01020                       "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
01021                       "byte_count = %d\n",
01022                       this->id (), byte_count));
01023         }
01024 
01025       // Update the state of the first message
01026       i->bytes_transferred (byte_count);
01027 
01028       if (TAO_debug_level > 4)
01029         {
01030           ACE_DEBUG ((LM_DEBUG,
01031                       "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
01032                       "after transfer, bc = %d, all_sent = %d, ml = %d\n",
01033                       this->id (), byte_count, i->all_data_sent (),
01034                       i->message_length ()));
01035         }
01036 
01037       // ... if all the data was sent the message must be removed from
01038       // the queue...
01039       if (i->all_data_sent ())
01040         {
01041           i->remove_from_list (this->head_, this->tail_);
01042           i->destroy ();
01043         }
01044     }
01045 }
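
As a rough illustration of the bookkeeping in cleanup_queue, the sketch below replaces the intrusive list with a std::deque. SketchMessage is a hypothetical type. The sketch assumes that bytes_transferred() reduces the byte count it is given by the amount the message absorbs, which the loop condition in the listing implies but does not show.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical queued message: total length and how much was already sent.
struct SketchMessage
{
  explicit SketchMessage (std::size_t len) : length (len), sent (0) {}

  std::size_t length;
  std::size_t sent;

  // Absorb as much of byte_count as this message still needs and
  // reduce byte_count by that amount (assumed behaviour).
  void bytes_transferred (std::size_t &byte_count)
  {
    std::size_t remaining = length - sent;
    std::size_t used = byte_count < remaining ? byte_count : remaining;
    sent += used;
    byte_count -= used;
  }

  bool all_data_sent () const { return sent == length; }
};

// Drop fully sent messages; a partially sent one stays at the front.
void cleanup_queue (std::deque<SketchMessage> &queue, std::size_t byte_count)
{
  while (!queue.empty () && byte_count > 0)
    {
      SketchMessage &head = queue.front ();
      head.bytes_transferred (byte_count);
      if (head.all_data_sent ())
        queue.pop_front ();
    }
}
```

With messages of 10, 5 and 8 bytes and 17 bytes sent, the first two are removed and the third remains at the front with 2 bytes marked as sent.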

void TAO_Transport::close_connection (void) [virtual]
 

Call the implementation method after obtaining the lock.

Definition at line 288 of file Transport.cpp.

References close_connection_shared, and invalidate_event_handler.

Referenced by TAO_GIOP_Invocation::close_connection, send_message_shared, TAO_IIOP_Transport::send_message_shared, and TAO_Wait_On_Read::wait.

00289 {
00290   TAO_Connection_Handler * eh = this->invalidate_event_handler ();
00291   this->close_connection_shared (1, eh);
00292 }

void TAO_Transport::close_connection_i (void) [private]
 

Implement close_connection() assuming the handler_lock_ is held.

Definition at line 693 of file Transport.cpp.

References close_connection_shared, and invalidate_event_handler_i.

00694 {
00695   TAO_Connection_Handler * eh = this->invalidate_event_handler_i ();
00696   this->close_connection_shared (1, eh);
00697 }

void TAO_Transport::close_connection_no_purge (void) [private]
 

Close the underlying connection, do not purge the entry from the map (supposedly it was purged already, trust the caller, yuck!).

Definition at line 700 of file Transport.cpp.

References close_connection_shared, and invalidate_event_handler.

Referenced by TAO_Transport_Cache_Manager::purge.

00701 {
00702   TAO_Connection_Handler * eh = this->invalidate_event_handler ();
00703 
00704   this->close_connection_shared (0,
00705                                  eh);
00706 }

void TAO_Transport::close_connection_shared (int purge, TAO_Connection_Handler * eh) [private]
 

Close the underlying connection, implements the code shared by all the close_connection_* variants.

Definition at line 709 of file Transport.cpp.

References ACE_DEFAULT_TIMEOUT, TAO_Connection_Handler::close_connection, TAO_Connection_Handler::connection_close_wait, TAO_Connection_Handler::decr_refcount, TAO_ORB_Core::leader_follower, TAO_Wait_Strategy::non_blocking, orb_core_, TAO_Transport_Cache_Manager::purge_entry, send_connection_closed_notifications, TAO_LF_CH_Event::successful, transport_cache_manager, TAO_Leader_Follower::wait_for_event, and ws_.

Referenced by close_connection, close_connection_i, and close_connection_no_purge.

00711 {
00712   // Purge the entry
00713   if (purge)
00714     {
00715       this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00716     }
00717 
00718   if (eh == 0)
00719     {
00720       // The connection was already closed
00721       return;
00722     }
00723 
00724   // Set the event handler in the connection close wait state.
00725   (void) eh->connection_close_wait ();
00726 
00727   // NOTE: If the wait strategy is in blocking mode, then there is no
00728   // chance that it could be inside the reactor. We can safely skip
00729   // driving the LF. If <purge> is 0, then we are cleaned up by the
00730   // cache. So no point in driving the LF either.
00731   if (this->ws_->non_blocking () && purge)
00732     {
00733       // NOTE: This is a work around for BUG 1020. We drive the leader
00734       // follower for a predetermined amount of time. Ideally this
00735       // needs to be an ORB option. But this is just the first
00736       // cut. Doing that will be a todo..
00737 
00738       ACE_Time_Value tv (ACE_DEFAULT_TIMEOUT, 0);
00739       this->orb_core_->leader_follower ().wait_for_event (eh,
00740                                                           this,
00741                                                           &tv);
00742 
00743     }
00744 
00745   // We need to explicitly shut it down to avoid memory leaks.
00746   if (!eh->successful () ||
00747       !this->ws_->non_blocking ())
00748     {
00749       eh->close_connection ();
00750     }
00751 
00752   this->send_connection_closed_notifications ();
00753 
00754   // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport
00755   // REFCNT: Only one of this or connection_handler_closing() run
00756   eh->decr_refcount ();
00757 }

void TAO_Transport::connection_handler_closing (void) [virtual]
 

Method for the connection handler to signify that it is being closed and destroyed.

Definition at line 353 of file Transport.cpp.

References TAO_Connection_Handler::decr_refcount, invalidate_event_handler, and send_connection_closed_notifications.

Referenced by TAO_Connection_Handler::transport.

00354 {
00355   // The connection has closed, we must invalidate the handler to
00356   // ensure that any attempt to use this transport results in an
00357   // error.  Basically all the other methods in the Transport
00358   // cooperate via check_event_handler_i()
00359 
00360   TAO_Connection_Handler * eh = this->invalidate_event_handler ();
00361   this->send_connection_closed_notifications ();
00362 
00363   if (eh != 0)
00364     {
00365       // REFCNT: Matches incr_refcnt in XXX_Transport::XXX_Transport
00366       // REFCNT: Only one of this or close_connection_shared() run
00367       eh->decr_refcount();
00368     }
00369 }
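
The REFCNT comments in close_connection_shared and connection_handler_closing say that only one of the two paths releases the handler reference. One way to get that guarantee is to have the invalidation step hand back the old pointer and clear it, so that later callers see null. The sketch below illustrates that idea only: SketchHandler and ClosableTransport are hypothetical, and the real invalidate_event_handler() is not shown on this page, so its behaviour here is an assumption.

```cpp
#include <cassert>

// Hypothetical reference-counted handler.
struct SketchHandler
{
  int refcount = 1;
  void decr_refcount () { --refcount; }
};

class ClosableTransport
{
public:
  explicit ClosableTransport (SketchHandler *h) : handler_ (h) {}

  // Detach the handler and return it; every later call returns null.
  SketchHandler *invalidate_handler ()
  {
    SketchHandler *old = handler_;
    handler_ = nullptr;
    return old;
  }

  // Both shutdown paths go through the same release step.
  void close_connection () { release (invalidate_handler ()); }
  void connection_handler_closing () { release (invalidate_handler ()); }

private:
  // Drop the reference only if this caller actually detached a handler.
  static void release (SketchHandler *h)
  {
    if (h != nullptr)
      h->decr_refcount ();
  }

  SketchHandler *handler_;
};
```

Whichever path runs first drops the single reference; the other path finds a null pointer and does nothing. A real implementation would also need the lock that the listings take around invalidation.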

virtual TAO_Connection_Handler* TAO_Transport::connection_handler_i (void) [protected, pure virtual]
 

Implemented in TAO_IIOP_Transport.

int TAO_Transport::drain_queue (void) [private]
 

Send some of the data in the queue.

As the outgoing data is drained this method is invoked to send as much of the current message as possible.

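Returns 0 if there is more data to send or if the queue was completely drained (in which case pending output is cancelled through the flushing strategy), and -1 if there was an error. The 1 that drain_queue_i() returns for a fully drained queue is translated to 0 here.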

Definition at line 838 of file Transport.cpp.

References ACE_GUARD_RETURN, TAO_Flushing_Strategy::cancel_output, drain_queue_i, TAO_ORB_Core::flushing_strategy, and orb_core.

Referenced by handle_output.

00839 {
00840   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00841 
00842   int retval = this->drain_queue_i ();
00843 
00844   if (retval == 1)
00845     {
00846       // ... there is no current message or it was completely
00847       // sent, cancel output...
00848       TAO_Flushing_Strategy *flushing_strategy =
00849         this->orb_core ()->flushing_strategy ();
00850 
00851       flushing_strategy->cancel_output (this);
00852 
00853       return 0;
00854     }
00855 
00856   return retval;
00857 }

int TAO_Transport::drain_queue_helper (int & iovcnt, iovec iov[]) [private]
 

A helper routine used in drain_queue_i().

Definition at line 860 of file Transport.cpp.

References ACE_ASSERT, ACE_DEBUG, check_event_handler_i, cleanup_queue, dump_iov, EWOULDBLOCK, head_, LM_DEBUG, send_i, sent_byte_count_, and ssize_t.

Referenced by drain_queue_i.

00861 {
00862   if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1)
00863     return -1;
00864 
00865   size_t byte_count = 0;
00866 
00867   // ... send the message ...
00868   ssize_t retval =
00869     this->send_i (iov, iovcnt, byte_count);
00870 
00871   if (TAO_debug_level == 5)
00872     {
00873       dump_iov (iov, iovcnt, this->id (),
00874                 byte_count, "drain_queue_helper");
00875     }
00876 
00877   // ... now we need to update the queue, removing elements
00878   // that have been sent, and updating the last element if it
00879   // was only partially sent ...
00880   this->cleanup_queue (byte_count);
00881   iovcnt = 0;
00882 
00883   if (retval == 0)
00884     {
00885       if (TAO_debug_level > 4)
00886         {
00887           ACE_DEBUG ((LM_DEBUG,
00888                       "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
00889                       "send() returns 0",
00890                       this->id ()));
00891         }
00892       return -1;
00893     }
00894   else if (retval == -1)
00895     {
00896       if (TAO_debug_level > 4)
00897         {
00898           ACE_DEBUG ((LM_DEBUG,
00899                       "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
00900                       "error during %p\n",
00901                       this->id (), "send_i()"));
00902         }
00903       if (errno == EWOULDBLOCK)
00904         return 0;
00905       return -1;
00906     }
00907 
00908   // ... start over, how do we guarantee progress?  Because if
00909   // no bytes are sent send() can only return 0 or -1
00910   ACE_ASSERT (byte_count != 0);
00911 
00912   // Total no. of bytes sent for a send call
00913   this->sent_byte_count_ += byte_count;
00914 
00915   if (TAO_debug_level > 4)
00916     {
00917       ACE_DEBUG ((LM_DEBUG,
00918                   "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
00919                   "byte_count = %d, head_is_empty = %d\n",
00920                   this->id(), byte_count, (this->head_ == 0)));
00921     }
00922   return 1;
00923 }

int TAO_Transport::drain_queue_i (void) [private]
 

Implement drain_queue() assuming the lock is held.

Definition at line 926 of file Transport.cpp.

References ACE_DEBUG, ACE_IOV_MAX, ACE_Reactor::cancel_timer, drain_queue_helper, event_handler_i, TAO_Queued_Message::fill_iov, flush_timer_pending, head_, LM_DEBUG, TAO_Queued_Message::next, ACE_Event_Handler::reactor, reset_flush_timer, and sent_byte_count_.

Referenced by drain_queue, TAO_Block_Flushing_Strategy::schedule_output, send_message_block_chain_i, and send_synch_message_helper_i.

00927 {
00928   // This is the vector used to send data, it must be declared outside
00929   // the loop because after the loop there may still be data to be
00930   // sent
00931   int iovcnt = 0;
00932   iovec iov[ACE_IOV_MAX];
00933 
00934   // We loop over all the elements in the queue ...
00935   TAO_Queued_Message *i = this->head_;
00936 
00937   // reset the value so that the counting is done for each new send
00938   // call.
00939   this->sent_byte_count_ = 0;
00940 
00941   while (i != 0)
00942     {
00943       // ... each element fills the iovector ...
00944       i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
00945 
00946       // ... the vector is full, no choice but to send some data out.
00947       // We need to loop because a single message can span multiple
00948       // IOV_MAX elements ...
00949       if (iovcnt == ACE_IOV_MAX)
00950         {
00951           int retval =
00952             this->drain_queue_helper (iovcnt, iov);
00953 
00954           if (TAO_debug_level > 4)
00955             {
00956               ACE_DEBUG ((LM_DEBUG,
00957                           "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
00958                           "helper retval = %d\n",
00959                           this->id (), retval));
00960             }
00961           if (retval != 1)
00962             return retval;
00963 
00964           i = this->head_;
00965           continue;
00966         }
00967       // ... notice that this line is only reached if there is still
00968       // room in the iovector ...
00969       i = i->next ();
00970     }
00971 
00972 
00973   if (iovcnt != 0)
00974     {
00975       int retval =
00976         this->drain_queue_helper (iovcnt, iov);
00977 
00978           if (TAO_debug_level > 4)
00979             {
00980               ACE_DEBUG ((LM_DEBUG,
00981                           "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
00982                           "helper retval = %d\n",
00983                           this->id (), retval));
00984             }
00985       if (retval != 1)
00986         return retval;
00987     }
00988 
00989   if (this->head_ == 0)
00990     {
00991       if (this->flush_timer_pending ())
00992         {
00993           ACE_Event_Handler *eh = this->event_handler_i ();
00994           if (eh != 0)
00995             {
00996               ACE_Reactor *reactor = eh->reactor ();
00997               if (reactor != 0)
00998                 {
00999                   (void) reactor->cancel_timer (this->flush_timer_id_);
01000                 }
01001             }
01002           this->reset_flush_timer ();
01003         }
01004       return 1;
01005     }
01006 
01007   return 0;
01008 }
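
The batching idea in drain_queue_i (gather buffers until the vector is full, write, then continue from the new head) can be sketched without ACE. Everything below is a simplified illustration: kBatchMax stands in for ACE_IOV_MAX, Writer is a hypothetical callback, and unlike send_i() it reports "would block" by returning 0 rather than through errno.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <vector>

constexpr std::size_t kBatchMax = 4;   // stand-in for ACE_IOV_MAX

// Hypothetical writer: receives one batch and returns the number of
// bytes accepted, 0 if it would block, or -1 on error.
using Writer = std::function<long (const std::vector<std::string> &)>;

// Returns 1 when the queue is fully drained, 0 when data remains,
// and -1 on error.
int drain (std::deque<std::string> &queue, const Writer &write)
{
  while (!queue.empty ())
    {
      // Gather at most kBatchMax buffers from the front of the queue.
      std::vector<std::string> batch;
      for (std::size_t i = 0;
           i < queue.size () && batch.size () < kBatchMax;
           ++i)
        batch.push_back (queue[i]);

      long n = write (batch);
      if (n < 0)
        return -1;
      if (n == 0)
        return 0;

      // Remove what was written; trim a partially written buffer.
      std::size_t left = static_cast<std::size_t> (n);
      while (left > 0 && !queue.empty ())
        {
          std::string &head = queue.front ();
          if (left >= head.size ())
            {
              left -= head.size ();
              queue.pop_front ();
            }
          else
            {
              head.erase (0, left);
              left = 0;
            }
        }
    }
  return 1;
}
```

Six queued buffers with a batch limit of four take two writes when the writer accepts everything. A writer that accepts only part of a buffer leaves the unsent tail at the front of the queue for the next call.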

int TAO_Transport::enqueue_incoming_message (TAO_Queued_Data * queueable_message)
 

Queue up queueable_message as a completely-received incoming message.

This method queues up a completely-received queueable GIOP message (i.e., it must be dynamically-allocated). It does not assemble a complete GIOP message; that should be done prior to calling this method, and is currently done in handle_input_i.

This does, however, assure that a completely-received GIOP FRAGMENT gets associated with any previously-received related fragments. It does this through collaboration with the messaging object (since fragment reassembly is protocol specific).

Parameters:
queueable_message  instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received
Returns:
0 if queueable_message was successfully enqueued, -1 if it could not be enqueued
Todo:
How do we indicate what may have failed?

Definition at line 1767 of file Transport.cpp.

References ACE_ERROR_RETURN, TAO_Queued_Data::consolidate, ACE_Message_Block::cont, TAO_Incoming_Message_Queue::enqueue_tail, TAO_Incoming_Message_Queue::find_fragment, incoming_message_queue_, LM_ERROR, TAO_Queued_Data::major_version_, TAO_Queued_Data::minor_version_, TAO_Queued_Data::more_fragments_, TAO_Queued_Data::msg_block_, ACE_Message_Block::rd_ptr, TAO_Queued_Data::release, TAO_Queued_Data::request_id_, TAO_GIOP_MESSAGE_FRAGMENT_HEADER, and TAO_GIOP_MESSAGE_HEADER_LEN.

Referenced by handle_input_i.

01768 {
01769   // Get the GIOP version
01770   CORBA::Octet major  = queueable_message->major_version_;
01771   CORBA::Octet minor  = queueable_message->minor_version_;
01772   CORBA::UShort whole = major << 8 | minor;
01773 
01774   // Set up a couple of pointers that are shared by the code
01775   // for the different GIOP versions.
01776   ACE_Message_Block *mb = 0;
01777   TAO_Queued_Data *fragment_message = 0;
01778 
01779   switch(whole)
01780     {
01781     case 0x0100:  // GIOP 1.0
01782       if (!queueable_message->more_fragments_)
01783         return this->incoming_message_queue_.enqueue_tail (
01784                                                   queueable_message);
01785 
01786       // Fragments aren't supported in 1.0.  This is an error and
01787       // we should reject it somehow.  What do we do here?  Do we throw
01788       // an exception to the receiving side?  Do we throw an exception
01789       // to the sending side?
01790       //
01791       // At the very least, we need to log the fact that we received
01792       // nonsense.
01793       ACE_ERROR_RETURN ((LM_ERROR,
01794                          "TAO (%P|%t) - "
01795                          "TAO_Transport::enqueue_incoming_message "
01796                          "detected a fragmented GIOP 1.0 message\n"),
01797                         -1);
01798       break;
01799     case 0x0101:  // GIOP 1.1
01800       // In 1.1, fragments kinda suck because they don't have their
01801       // own message-specific header.  Therefore, we have to do the
01802       // following:
01803       fragment_message =
01804           this->incoming_message_queue_.find_fragment (major, minor);
01805 
01806       // No fragment was found
01807       if (fragment_message == 0)
01808         return this->incoming_message_queue_.enqueue_tail (
01809                                                 queueable_message);
01810 
01811       if (queueable_message->more_fragments_)
01812         {
01813           // Find the last message block in the continuation
01814           mb = fragment_message->msg_block_;
01815           while (mb->cont () != 0)
01816             mb = mb->cont ();
01817 
01818           // Add the current message block to the end of the chain
01819           // after adjusting the read pointer to skip the GIOP header
01820           queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN);
01821           mb->cont (queueable_message->msg_block_);
01822 
01823           // Get rid of the queueable message but save the message block
01824           queueable_message->msg_block_ = 0;
01825           queueable_message->release ();
01826 
01827           // One note is that TAO_Queued_Data contains version numbers,
01828           // but doesn't indicate the actual protocol to which those
01829           // version numbers refer.  That's not a problem, though, because
01830           // instances of TAO_Queued_Data live in a queue, and that queue
01831           // lives in a particular instance of a Transport, and the
01832           // transport instance has an association with a particular
01833           // messaging_object.  The concrete messaging object embodies a
01834           // messaging protocol, and must cover all versions of that
01835           // protocol.  Therefore, we just need to cover the bases of all
01836           // versions of that one protocol.
01837         }
01838       else
01839         {
01840           // There is a complete chain of fragments
01841           fragment_message->consolidate ();
01842 
01843           // Go ahead and enqueue this message
01844           return this->incoming_message_queue_.enqueue_tail (
01845                                                     queueable_message);
01846         }
01847       break;
01848     case 0x0102:  // GIOP 1.2
01849       // In 1.2, we get a little more context.  There's a
01850       // FRAGMENT message-specific header, and inside that is the
01851       // request id with which the fragment is associated.
01852       fragment_message =
01853           this->incoming_message_queue_.find_fragment (
01854                                 queueable_message->request_id_);
01855 
01856       // No fragment was found
01857       if (fragment_message == 0)
01858         return this->incoming_message_queue_.enqueue_tail (
01859                                                 queueable_message);
01860 
01861       if (fragment_message->major_version_ != major ||
01862           fragment_message->minor_version_ != minor)
01863         ACE_ERROR_RETURN ((LM_ERROR,
01864                            "TAO (%P|%t) - "
01865                            "TAO_Transport::enqueue_incoming_message "
01866                            "GIOP versions do not match "
01867                            "(%d.%d != %d.%d)\n",
01868                            fragment_message->major_version_,
01869                            fragment_message->minor_version_,
01870                            major, minor),
01871                           -1);
01872 
01873       // Find the last message block in the continuation
01874       mb = fragment_message->msg_block_;
01875       while (mb->cont () != 0)
01876         mb = mb->cont ();
01877 
01878       // Add the current message block to the end of the chain
01879       // after adjusting the read pointer to skip the GIOP header
01880       queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN +
01881                                             TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
01882       mb->cont (queueable_message->msg_block_);
01883 
01884       // Remove our reference to the message block.  At this point
01885       // the message block of the fragment head owns it as part of a
01886       // chain
01887       queueable_message->msg_block_ = 0;
01888 
01889       if (!queueable_message->more_fragments_)
01890         {
01891           // This is the end of the fragments for this request
01892           fragment_message->consolidate ();
01893         }
01894 
01895       // Get rid of the queueable message
01896       queueable_message->release ();
01897       break;
01898     default:
01899       if (!queueable_message->more_fragments_)                   
01900         return this->incoming_message_queue_.enqueue_tail (
01901                                                   queueable_message);
01902       // This is an unknown GIOP version
01903       ACE_ERROR_RETURN ((LM_ERROR,
01904                          "TAO (%P|%t) - "
01905                          "TAO_Transport::enqueue_incoming_message "
01906                          "can not handle a fragmented GIOP %d.%d "
01907                          "message\n", major, minor),
01908                         -1);
01909     }
01910 
01911   return 0;
01912 }
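
The version dispatch at the top of enqueue_incoming_message packs the major and minor octets into one 16-bit value so that a single switch can select the fragment handling. A minimal standalone sketch of that packing follows. FragmentRule and fragment_rule are hypothetical names; the rule attached to each version paraphrases the comments in the listing.

```cpp
#include <cassert>
#include <cstdint>

// How fragments are associated with earlier ones, per GIOP version.
enum class FragmentRule
{
  NotSupported,      // GIOP 1.0: a fragmented message is an error
  MatchByVersion,    // GIOP 1.1: no fragment header, match on version
  MatchByRequestId,  // GIOP 1.2: fragment header carries the request id
  Unknown            // any other version
};

// Pack major/minor into one value: 1.2 becomes 0x0102.
constexpr std::uint16_t pack_version (std::uint8_t major, std::uint8_t minor)
{
  return static_cast<std::uint16_t> ((major << 8) | minor);
}

FragmentRule fragment_rule (std::uint8_t major, std::uint8_t minor)
{
  switch (pack_version (major, minor))
    {
    case 0x0100: return FragmentRule::NotSupported;
    case 0x0101: return FragmentRule::MatchByVersion;
    case 0x0102: return FragmentRule::MatchByRequestId;
    default:     return FragmentRule::Unknown;
    }
}
```

The explicit parentheses around the shift make the precedence visible; the expression in the listing evaluates the same way because << binds tighter than |.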

virtual ACE_Event_Handler* TAO_Transport::event_handler_i (void) [protected, pure virtual]
 

Return the event handler used to receive notifications from the Reactor.

Normally a concrete TAO_Transport object has an ACE_Event_Handler member that functions as an adapter between the ACE_Reactor framework and the TAO pluggable protocol framework. In all the protocols implemented so far this role is fulfilled by an instance of ACE_Svc_Handler.

Todo:
Since we only use limited functionality of ACE_Svc_Handler, we could probably implement a generic adapter class (TAO_Transport_Event_Handler or something). This would reduce footprint and simplify the process of implementing a pluggable protocol.

Implemented in TAO_IIOP_Transport.

Referenced by cancel_output_i, check_buffering_constraints_i, check_event_handler_i, drain_queue_i, notify_reactor, provide_handle, and schedule_output_i.

ACE_INLINE void TAO_Transport::first_request_sent ()
 

Set the state of the first_request_ flag to 0.

Definition at line 162 of file Transport.inl.

References first_request_.

Referenced by TAO_IIOP_Transport::send_request.

00163 {
00164   this->first_request_ = 0;
00165 }

ACE_INLINE int TAO_Transport::flush_timer_pending (void) const [private]
 

Check if the flush timer is still pending.

Definition at line 98 of file Transport.inl.

References flush_timer_id_.

Referenced by check_buffering_constraints_i, drain_queue_i, and handle_timeout.

00099 {
00100   return this->flush_timer_id_ != -1;
00101 }
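
flush_timer_pending relies on -1 as a sentinel meaning "no timer scheduled". A tiny sketch of that convention is shown below. FlushTimer, schedule and reset are hypothetical names; only the comparison against -1 comes from the listing.

```cpp
#include <cassert>

// Hypothetical holder for a timer id, where -1 means "no timer".
struct FlushTimer
{
  long id = -1;

  bool pending () const { return id != -1; }

  // Remember the id handed out when the timer was scheduled.
  void schedule (long new_id) { id = new_id; }

  // Forget the timer after it fired or was cancelled.
  void reset () { id = -1; }
};
```

The sentinel only works if -1 is never handed out as a valid timer id, which this sketch takes as an assumption.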

int TAO_Transport::generate_locate_request (TAO_Target_Specification & spec, TAO_Operation_Details & opdetails, TAO_OutputCDR & output)
 

This is a request for the transport object to write a LocateRequest header before it is sent out.

Definition at line 295 of file Transport.cpp.

References ACE_DEBUG, TAO_Pluggable_Messaging::generate_locate_request_header, LM_DEBUG, and messaging_object.

00299 {
00300   if (this->messaging_object ()->generate_locate_request_header (opdetails,
00301                                                                  spec,
00302                                                                  output) == -1)
00303     {
00304       if (TAO_debug_level > 0)
00305         ACE_DEBUG ((LM_DEBUG,
00306                     "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
00307                     "error while marshalling the LocateRequest header\n",
00308                     this->id ()));
00309 
00310       return -1;
00311     }
00312 
00313   return 0;
00314 }

int TAO_Transport::generate_request_header (TAO_Operation_Details & opd, TAO_Target_Specification & spec, TAO_OutputCDR & msg) [virtual]
 

This is a request for the transport object to write a request header before it sends out the request.

Reimplemented in TAO_IIOP_Transport.

Definition at line 318 of file Transport.cpp.

References ACE_DEBUG, TAO_ORB_Core::codeset_manager, first_request_, TAO_Pluggable_Messaging::generate_request_header, TAO_Codeset_Manager::generate_service_context, LM_DEBUG, messaging_object, and orb_core.

Referenced by TAO_IIOP_Transport::generate_request_header.

00322 {
00323   // codeset service context is only supposed to be sent in the first request
00324   // on a particular connection.
00325   TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager();
00326   if (csm && this->first_request_)
00327     csm->generate_service_context( opdetails, *this );
00328 
00329   if (this->messaging_object ()->generate_request_header (opdetails,
00330                                                           spec,
00331                                                           output) == -1)
00332     {
00333       if (TAO_debug_level > 0)
00334         ACE_DEBUG ((LM_DEBUG,
00335                     "(%P|%t) - Transport[%d]::generate_request_header, "
00336                     "error while marshalling the Request header\n",
00337                     this->id()));
00338 
00339       return -1;
00340     }
00341 
00342   return 0;
00343 }
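
The first_request_ flag makes the codeset service context a once-per-connection item: generate_request_header adds it only while the flag is set, and first_request_sent() clears the flag. A minimal sketch of that interplay follows. SketchConnection and contexts_for_next_request are hypothetical names, and the string "codeset" merely stands in for the real service context.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical connection that tracks whether a request has gone out yet.
struct SketchConnection
{
  bool first_request = true;

  // Service contexts that would accompany the next request header.
  std::vector<std::string> contexts_for_next_request () const
  {
    std::vector<std::string> contexts;
    if (first_request)
      contexts.push_back ("codeset");
    return contexts;
  }

  // Called once the first request has actually been sent.
  void first_request_sent () { first_request = false; }
};
```

Keeping the flag separate from header generation means that a request that is generated but never sent does not consume the one chance to send the context.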

int TAO_Transport::handle_input_i (TAO_Resume_Handle & rh, ACE_Time_Value * max_wait_time = 0, int block = 0) [virtual]
 

Callback to read incoming data.

The ACE_Event_Handler adapter invokes this method as part of its handle_input() operation.

Todo:
The method name is confusing! Calling it handle_input() would probably make things easier to understand and follow!

Once a complete message is read, the Transport class delegates to the Messaging layer to invoke the right upcall (on the server) or the TAO_Reply_Dispatcher (on the client side).

Parameters:
max_wait_time  In some cases the I/O is synchronous, e.g. a thread-per-connection server or when Wait_On_Read is enabled. In those cases a maximum read time can be specified.
block  Is deprecated and ignored.

Definition at line 1319 of file Transport.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_HEX_DUMP, ACE_MIN, ACE_TEXT, TAO_Queued_Data::COMPLETED, ACE_Message_Block::copy, TAO_Queued_Data::current_state_, ACE_Message_Block::DONT_DELETE, enqueue_incoming_message, ACE_CDR::grow, incoming_message_queue_, TAO_ORB_Core::input_cdr_dblock_allocator, ACE_Message_Block::length, LM_DEBUG, TAO_ORB_Core::locking_strategy, TAO_Queued_Data::make_completed_message, TAO_Queued_Data::make_uncompleted_message, ACE_CDR::mb_align, ACE_Message_Block::MB_DATA, ACE_OS_String::memset, messaging_object, TAO_Queued_Data::missing_data_bytes_, TAO_Queued_Data::msg_block_, orb_core_, TAO_ORB_Core::orb_params, process_queue_head, TAO_Incoming_Message_Queue::queue_length, ACE_Message_Block::rd_ptr, recv, recv_buffer_size_, send_connection_closed_notifications, TAO_ORB_Parameters::single_read_optimization, ACE_Message_Block::size, ACE_Message_Block::space, ssize_t, TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, TAO_MAXBUFSIZE, try_to_complete, uncompleted_message_, TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER, TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD, and ACE_Message_Block::wr_ptr.

Referenced by TAO_Notify_Handler::handle_input, TAO_Connection_Handler::handle_input_eh, TAO_Connection_Handler::svc_i, and TAO_Wait_On_Read::wait.
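
A central step in the handle_input_i listing is completing a partially received message: it copies the smaller of "bytes still missing" and "bytes available in the read buffer" into the pending message and advances the read position by the same amount. The sketch below shows that step in isolation. PartialMessage and absorb are hypothetical names, and std::string replaces ACE_Message_Block.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical pending message that still lacks some bytes.
struct PartialMessage
{
  std::string data;       // bytes gathered so far
  std::size_t missing;    // bytes still needed

  // Take at most `missing` bytes from input, starting at offset
  // (offset must not exceed input.size ()); returns bytes consumed.
  std::size_t absorb (const std::string &input, std::size_t offset)
  {
    std::size_t available = input.size () - offset;
    std::size_t take = std::min (missing, available);
    data.append (input, offset, take);
    missing -= take;
    return take;
  }

  bool complete () const { return missing == 0; }
};
```

The caller adds the returned count to its read offset. Any bytes left in the buffer after the message completes belong to the next message, which is why the listing loops back to its completion label.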

01322 {
01323   CTHack cthack;
01324 
01325   if (TAO_debug_level > 3)
01326     {
01327       ACE_DEBUG ((LM_DEBUG,
01328                   "TAO (%P|%t) - Transport[%d]::handle_input_i\n",
01329                   this->id ()));
01330     }
01331 
01332   // First try to process messages off the head of the incoming queue.
01333   int retval = this->process_queue_head (rh);
01334   if (retval <= 0)
01335     {
01336       if (retval == -1)
01337         {
01338           if (TAO_debug_level > 2)
01339             ACE_DEBUG ((LM_DEBUG,
01340                         "TAO (%P|%t) - Transport[%d]::handle_input_i, "
01341                         "error while parsing the head of the queue\n",
01342                         this->id()));
01343 
01344           this->send_connection_closed_notifications ();
01345         }
01346       return retval;
01347     }
01348 
01349   // If there are no messages then we can go ahead to read from the
01350   // handle for further reading..
01351 
01352   // The buffer on the stack which will be used to hold the input
01353   // messages
01354   char buf[TAO_MAXBUFSIZE];
01355 
01356 #if defined (ACE_HAS_PURIFY)
01357   (void) ACE_OS::memset (buf,
01358                          '\0',
01359                          sizeof buf);
01360 #endif /* ACE_HAS_PURIFY */
01361 
01362   // Create a data block
01363   ACE_Data_Block db (sizeof (buf),
01364                      ACE_Message_Block::MB_DATA,
01365                      buf,
01366                      this->orb_core_->input_cdr_buffer_allocator (),
01367                      this->orb_core_->locking_strategy (),
01368                      ACE_Message_Block::DONT_DELETE,
01369                      this->orb_core_->input_cdr_dblock_allocator ());
01370 
01371   // Create a message block
01372   ACE_Message_Block message_block (&db,
01373                                    ACE_Message_Block::DONT_DELETE,
01374                                    this->orb_core_->input_cdr_msgblock_allocator ());
01375 
01376   // We'll loop trying to complete the message this number of times,
01377   // and that's it.
01378   unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
01379 
01380   unsigned int did_queue_message = 0;
01381 
01382   // Align the message block
01383   ACE_CDR::mb_align (&message_block);
01384 
01385   size_t recv_size = 0;
01386   if (this->orb_core_->orb_params ()->single_read_optimization ())
01387     {
01388       recv_size = message_block.space ();
01389     }
01390   else
01391     {
01392       recv_size = this->messaging_object ()->header_length ();
01393     }
01394 
01395   // Saving the size of the received buffer in case any one needs to
01396   // get the size of the message that's received in the
01397   // context. Obviously the value will be changed for each recv call
01398   // and the user is supposed to invoke the accessor only in the
01399   // invocation context to get meaningful information.
01400   this->recv_buffer_size_ = recv_size;
01401 
01402   // Read the message into the  message block that we have created on
01403   // the stack.
01404   ssize_t n = this->recv (message_block.wr_ptr (),
01405                           recv_size,
01406                           max_wait_time);
01407 
01408   // If there is an error return to the reactor..
01409   if (n <= 0)
01410     {
01411       if (n == -1)
01412         this->send_connection_closed_notifications ();
01413 
01414       return n;
01415     }
01416 
01417   if (TAO_debug_level > 2)
01418     {
01419       ACE_DEBUG ((LM_DEBUG,
01420                   "TAO (%P|%t) - Transport[%d]::handle_input_i: "
01421                   "read %d bytes\n",
01422                   this->id (), n));
01423     }
01424 
01425   // Set the write pointer in the stack buffer
01426   message_block.wr_ptr (n);
01427 
01428   if (TAO_debug_level >= 10)
01429     ACE_HEX_DUMP ((LM_DEBUG,
01430                    (const char *) message_block.rd_ptr (),
01431                    message_block.length (),
01432                    ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
01433 
01434 
01435 complete_message_and_possibly_enqueue:
01436   // Check to see if we're still working to complete a message
01437   if (this->uncompleted_message_)
01438     {
01439       // try to complete it
01440 
01441       // on exit from this frame we have one of the following states:
01442       //
01443       // (a) an uncompleted message still in uncompleted_message_
01444       //     AND message_block is empty
01445       //
01446       // (b) uncompleted_message_ zero, the completed message at the
01447       //     tail of the incoming queue; message_block could be empty
01448       //     or still contain bytes
01449 
01450       // ==> repeat
01451       do
01452         {
01453           /*
01454            * Append the "right number of bytes" to uncompleted_message_
01455            */
01456           // ==> right_number_of_bytes = MIN(bytes missing from
01457           //          uncompleted_message_, length of message_block);
01458           size_t right_number_of_bytes =
01459             ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
01460                      message_block.length () );
01461 
01462           if (TAO_debug_level > 2)
01463             {
01464               ACE_DEBUG ((LM_DEBUG,
01465                           "(%P|%t) Transport[%d]::handle_input_i: "
01466                           "trying to use %u (of %u) "
01467                           "bytes to complete message missing %u bytes\n",
01468                           this->id (),
01469                           right_number_of_bytes,
01470                           message_block.length (),
01471                           this->uncompleted_message_->missing_data_bytes_));
01472             }
01473 
01474           // ==> append right_number_of_bytes from message_block
01475           //     to uncompleted_message_ & update read pointer of
01476           //     message_block;
01477 
01478           // 1. we assume that uncompleted_message_.msg_block_'s
01479           //    wr_ptr is properly maintained
01480           // 2. we presume that uncompleted_message_.msg_block was
01481           //    allocated with enough space to contain the *entire*
01482           //    expected GIOP message, so this copy shouldn't involve an
01483           //    additional allocation
01484           this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
01485                                                         right_number_of_bytes);
01486           this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
01487           message_block.rd_ptr (right_number_of_bytes);
01488 
01489           switch (this->uncompleted_message_->current_state_)
01490             {
01491             case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
01492               if (this->messaging_object()->check_for_valid_header (
01493                     *this->uncompleted_message_->msg_block_))
01494                 {
01495                   // ==> update bytes missing from uncompleted_message_
01496                   //     with size of message from valid header;
01497                   this->messaging_object()->set_queued_data_from_message_header (
01498                     this->uncompleted_message_,
01499                     *this->uncompleted_message_->msg_block_);
01500                   // ==> change state of uncompleted_message_ to
01501                   // WAITING_TO_COMPLETE_PAYLOAD;
01502                   this->uncompleted_message_->current_state_ =
01503                     TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
01504 
01505                   // ==> Resize the message block to have capacity for
01506                   // the rest of the incoming message
01507                   ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
01508                   ACE_CDR::grow (&mb,
01509                                  mb.size ()
01510                                  + this->uncompleted_message_->missing_data_bytes_);
01511 
01512                   if (TAO_debug_level > 2)
01513                     {
01514                       ACE_DEBUG ((LM_DEBUG,
01515                                   "(%P|%t) Transport[%d]::handle_input_i: "
01516                                   "found a valid header in the message; "
01517                                   "waiting for %u bytes to complete payload\n",
01518                                   this->id (),
01519                                   this->uncompleted_message_->missing_data_bytes_));
01520                     }
01521 
01522                   // Continue the loop...
01523                   continue;
01524                 }
01525               else
01526                 {
01527                   // What the heck will we do with a bad header?  Just
01528                   // better to close the connection and let things
01529                   // re-train from there.
01530                   return -1;
01531 #if 0 // I don't think I need this clause, but I'm leaving it just in case.
01532                   // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes;
01533                   this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
01534                   ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0);
01535 #endif
01536                 }
01537               break;
01538 
01539             case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
01540               // Here we have an opportunity to try to finish reading the
01541               // uncompleted message.  This is a Good Idea(TM) because there are
01542               // good odds that either more data came available since the last
01543               // time we read, or that we simply didn't read the whole message on
01544               // the first read.  So, we try to read again.
01545               //
01546               // NOTE! this changes this->uncompleted_message_!
01547               this->try_to_complete (max_wait_time);
01548 
01549               // ==> if (bytes missing from uncompleted_message_ == 0)
01550               if (this->uncompleted_message_->missing_data_bytes_ == 0)
01551                 {
01552                   /*
01553                    * We completed the message!  Hooray!
01554                    */
01555                   // ==> place uncompleted_message_ (which is now
01556                   // complete!) at the tail of the incoming message
01557                   // queue;
01558 
01559                   // ---> NOTE: whoever pulls this off the queue must delete it!
01560                   this->uncompleted_message_->current_state_
01561                     = TAO_Queued_Data::COMPLETED;
01562 
01563                   // @@CJC NEED TO CHECK RETURN VALUE HERE!
01564                   this->enqueue_incoming_message (this->uncompleted_message_);
01565                   did_queue_message = 1;
01566                   // zero out uncompleted_message_;
01567                   this->uncompleted_message_ = 0;
01568 
01569                   if (TAO_debug_level > 2)
01570                     {
01571                       ACE_DEBUG ((LM_DEBUG,
01572                                   "(%P|%t) Transport[%d]::handle_input_i: "
01573                                   "completed and queued message for processing!\n",
01574                                   this->id ()));
01575                     }
01576 
01577                 }
01578               else
01579                 {
01580 
01581                   if (TAO_debug_level > 2)
01582                     {
01583                       ACE_DEBUG ((LM_DEBUG,
01584                                   "(%P|%t) Transport[%d]::handle_input_i: "
01585                                   "still need %u bytes to complete uncompleted message.\n",
01586                                   this->id (),
01587                                   this->uncompleted_message_->missing_data_bytes_));
01588                     }
01589                 }
01590               break;
01591 
01592             default:
01593               // @@CJC What do we do here?!
01594               ACE_ASSERT (! "Transport::handle_input_i: unexpected state "
01595                           "in uncompleted_message_");
01596             }
01597         }
01598       // Does the order of the checks matter?  In both (a) and (b),
01599       // message_block is empty, but only in (b) is there no
01600       // uncompleted_message_.
01601       // ==> until (message_block is empty || there is no uncompleted_message_);
01602       //    or, rewritten in C++ looping constructs
01603       // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
01604       while (message_block.length() != 0 && this->uncompleted_message_);
01605     }
01606 
01607   // *****************************
01608   // @@ CJC
01609   //
01610   // Once upon a time we tried to complete reading the uncompleted
01611   // message here, but testing found that completing later worked
01612   // better.
01613   // *****************************
01614 
01615 
01616   // At this point, there should be nothing in uncompleted_message_.
01617   // We now need to chop up the bytes in message_block and store any
01618   // complete messages in the incoming message queue.
01619   //
01620   // ==> if (message_block still has data)
01621   if (message_block.length () != 0)
01622     {
01623       TAO_Queued_Data *complete_message = 0;
01624       do
01625         {
01626           if (TAO_debug_level >= 10)
01627             {
01628               ACE_DEBUG ((LM_DEBUG,
01629                           ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ")
01630                           ACE_TEXT("extracting complete messages\n")));
01631               ACE_HEX_DUMP ((LM_DEBUG,
01632                              message_block.rd_ptr (),
01633                              message_block.length (),
01634                              ACE_TEXT ("           from this message buffer")));
01635             }
01636 
01637           complete_message =
01638             TAO_Queued_Data::make_completed_message (
01639               message_block, *this->messaging_object ());
01640           if (complete_message)
01641             {
01642               this->enqueue_incoming_message (complete_message);
01643               did_queue_message = 1;
01644             }
01645         }
01646       while (complete_message != 0);
01647       // On exit from this frame we have one of the following states:
01648       // (a) message_block is empty
01649       // (b) message_block contains bytes from a partial message
01650     }
01651 
01652   // If, at this point, there's still data in message_block, it's
01653   // an incomplete message.  Therefore, we stuff it into the
01654   // uncompleted_message_ and clear out message_block.
01655   // ==> if (message_block still has data)
01656   if (message_block.length () != 0)
01657     {
01658       // duplicate message_block remainder into this->uncompleted_message_
01659       ACE_ASSERT (this->uncompleted_message_ == 0);
01660       this->uncompleted_message_ =
01661         TAO_Queued_Data::make_uncompleted_message (&message_block,
01662                                                    *this->messaging_object ());
01663       ACE_ASSERT (this->uncompleted_message_ != 0);
01664     }
01665 
01666   // We should have consumed ALL the bytes by now.
01667   ACE_ASSERT (message_block.length () == 0);
01668 
01669   //
01670   // We don't want to try to re-read earlier because we may not have
01671   // an uncompleted message until we get to this point.  So, if we did
01672   // it earlier, we could have missed the opportunity to complete it
01673   // and dispatch.
01674   //
01675   // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
01676   // to increase throughput!
01677 
01678   if (this->uncompleted_message_)
01679     {
01680       if (number_of_read_attempts--)
01681         {
01682           // We try to read again just in case more data arrived while
01683           // we were doing the stuff above.  This way, we can increase
01684           // throughput without much of a penalty.
01685 
01686           if (TAO_debug_level > 2)
01687             {
01688               ACE_DEBUG ((LM_DEBUG,
01689                           "(%P|%t) Transport[%d]::handle_input_i: "
01690                           "still have an uncompleted message; "
01691                           "will try %d more times before letting "
01692                           "somebody else have a chance.\n",
01693                           this->id (),
01694                           number_of_read_attempts));
01695             }
01696 
01697           goto complete_message_and_possibly_enqueue;
01698         }
01699       else
01700         {
01701           // The queue should be empty because it should have been processed
01702           // above.  But I wonder if I should put a check in here anyway.
01703           if (TAO_debug_level > 2)
01704             {
01705               ACE_DEBUG ((LM_DEBUG,
01706                           "(%P|%t) Transport[%d]::handle_input_i: "
01707                           "giving up reading for now and returning "
01708                           "with incoming queue length = %d\n",
01709                           this->id (),
01710                           this->incoming_message_queue_.queue_length ()));
01711               if (this->uncompleted_message_)
01712                 ACE_DEBUG ((LM_DEBUG,
01713                             "(%P|%t) Transport[%d]::handle_input_i: "
01714                             "missing bytes from uncompleted message = %u\n",
01715                             this->id (), this->uncompleted_message_->missing_data_bytes_));
01716             }
01717           return 1;
01718         }
01719     }
01720 
01721   // **** END   CJC PMG CHANGES ****
01722 
01723   return did_queue_message ? this->process_queue_head (rh) : 1;
01724 }
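
The header-then-payload completion loop in the listing above can be illustrated with a much smaller model. The following is only a sketch under stated assumptions: the `Reassembler` class, its `feed` method, and the 2-byte toy header (`'M'` followed by a payload length) are hypothetical and are not GIOP or part of TAO. It shows the same idea of consuming MIN(missing bytes, available bytes) per step and switching state once the header is valid.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical toy framing, NOT GIOP: a 2-byte header ('M', payload length)
// followed by the payload bytes.
class Reassembler
{
public:
  // Feed bytes as they arrive. Returns false on a bad header, which in the
  // listing above corresponds to returning -1 and closing the connection.
  bool feed (const std::string &chunk)
  {
    std::size_t pos = 0;
    while (pos < chunk.size ())
      {
        // Take only the "right number of bytes": MIN (missing, available).
        std::size_t n = std::min (missing_, chunk.size () - pos);
        partial_.append (chunk, pos, n);
        pos += n;
        missing_ -= n;
        if (missing_ != 0)
          break;  // chunk exhausted, message still incomplete

        if (state_ == WAITING_HEADER)
          {
            if (partial_[0] != 'M')
              return false;
            // A valid header tells us how many payload bytes are missing.
            missing_ = static_cast<unsigned char> (partial_[1]);
            state_ = WAITING_PAYLOAD;
            if (missing_ != 0)
              continue;
          }

        // Payload complete: queue it and start on the next header.
        completed.push_back (partial_.substr (2));
        partial_.clear ();
        missing_ = 2;
        state_ = WAITING_HEADER;
      }
    return true;
  }

  std::deque<std::string> completed;

private:
  enum State { WAITING_HEADER, WAITING_PAYLOAD };
  State state_ = WAITING_HEADER;
  std::size_t missing_ = 2;
  std::string partial_;
};
```

Unlike the real method, this sketch never re-reads from a socket and keeps no separate stack buffer; it only models the bookkeeping of missing bytes and state transitions.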

int TAO_Transport::handle_output (void)
 

Callback method to reactively drain the outgoing data queue.

Definition at line 408 of file Transport.cpp.

References ACE_DEBUG, drain_queue, and LM_DEBUG.

Referenced by TAO_Block_Flushing_Strategy::flush_message, TAO_Block_Flushing_Strategy::flush_transport, and TAO_Connection_Handler::handle_output_eh.

00409 {
00410   if (TAO_debug_level > 3)
00411     {
00412       ACE_DEBUG ((LM_DEBUG,
00413                   "TAO (%P|%t) - Transport[%d]::handle_output\n",
00414                   this->id ()));
00415     }
00416 
00417   // The flushing strategy (potentially via the Reactor) wants to send
00418   // more data, first check if there is a current message that needs
00419   // more sending...
00420   int retval = this->drain_queue ();
00421 
00422   if (TAO_debug_level > 3)
00423     {
00424       ACE_DEBUG ((LM_DEBUG,
00425                   "TAO (%P|%t) - Transport[%d]::handle_output, "
00426                   "drain_queue returns %d/%d\n",
00427                   this->id (),
00428                   retval, errno));
00429     }
00430 
00431   // Any errors are returned directly to the Reactor
00432   return retval;
00433 }

int TAO_Transport::handle_timeout (const ACE_Time_Value &current_time, const void *act)
 

The timeout callback, invoked when any of the timers related to this transport expire.

Parameters:
current_time  The current time as reported from the Reactor
act  The Asynchronous Completion Token. Currently it is interpreted as follows:
  • If the ACT is the address of this->current_deadline_ the queueing timeout has expired and the queue should start flushing.
Returns:
Returns 0 if there are no problems, -1 if there is an error
Todo:
In the future this function could be used to expire messages (oneways) that have been sitting for too long on the queue.

Definition at line 809 of file Transport.cpp.

References ACE_DEBUG, current_deadline_, flush_timer_pending, TAO_ORB_Core::flushing_strategy, LM_DEBUG, orb_core, reset_flush_timer, and TAO_Flushing_Strategy::schedule_output.

Referenced by TAO_Transport_Timer::handle_timeout.

00811 {
00812   if (TAO_debug_level > 6)
00813     {
00814       ACE_DEBUG ((LM_DEBUG,
00815                   "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
00816                   "timer expired\n",
00817                   this->id ()));
00818     }
00819 
00820   /// This is the only legal ACT in the current configuration....
00821   if (act != &this->current_deadline_)
00822     return -1;
00823 
00824   if (this->flush_timer_pending ())
00825     {
00826       // The timer is always a oneshot timer, so mark it as not
00827       // pending.
00828       this->reset_flush_timer ();
00829 
00830       TAO_Flushing_Strategy *flushing_strategy =
00831         this->orb_core ()->flushing_strategy ();
00832       (void) flushing_strategy->schedule_output (this);
00833     }
00834   return 0;
00835 }
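
The ACT check in the listing above relies on pointer identity: the timer was scheduled with the address of a member, and the callback accepts only that address. A minimal sketch of the pattern follows; the `FlushTimer` type and its fields are hypothetical stand-ins, and the `scheduled` counter merely takes the place of the call to `schedule_output`.

```cpp
#include <cassert>

// Hypothetical stand-in for the transport's flush-timer bookkeeping.
struct FlushTimer
{
  long current_deadline = 0;  // its address doubles as the only legal ACT
  bool pending = false;       // true while a one-shot timer is outstanding
  int scheduled = 0;          // counts how often output was scheduled

  int handle_timeout (const void *act)
  {
    // Any ACT other than the address of current_deadline is rejected.
    if (act != &this->current_deadline)
      return -1;

    if (this->pending)
      {
        // One-shot timer: clear the pending mark, then schedule output.
        this->pending = false;
        ++this->scheduled;
      }
    return 0;
  }
};
```

Using the address of a member as the token avoids allocating a separate token object, at the cost of supporting only one kind of timer per member.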

ACE_INLINE void TAO_Transport::id (size_t id)
 

Definition at line 80 of file Transport.inl.

References id, and id_.

00081 {
00082   this->id_ = id;
00083 }

ACE_INLINE size_t TAO_Transport::id (void) const
 

Set and Get the identifier for this transport instance.

If not set, this will return an integer representation of the this pointer for the instance on which it's called.

Definition at line 74 of file Transport.inl.

References id_.

Referenced by TAO_IIOP_Connection_Handler::activate, TAO_Transport_Cache_Manager::bind_i, TAO_Connection_Handler::close_connection_eh, TAO_Connector::connect, TAO_Transport_Cache_Manager::find_i, TAO_Connection_Handler::handle_input_eh, id, TAO_IIOP_Connection_Handler::open, TAO_Transport_Cache_Manager::purge, TAO_GIOP_Utils::read_buffer, release, and TAO_Leader_Follower::wait_for_event.

00075 {
00076   return this->id_;
00077 }
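
Since the getter above simply returns `id_`, the documented default (an integer form of the `this` pointer) has to be established elsewhere. One way this could work, shown purely as an assumption, is to seed `id_` in the constructor. The `Transport` class below is a hypothetical miniature, not the TAO class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical miniature of the set/get pair documented above.
class Transport
{
public:
  // Assumption: the default identifier is derived from the object's address.
  Transport ()
    : id_ (static_cast<std::size_t> (reinterpret_cast<std::uintptr_t> (this)))
  {
  }

  void id (std::size_t id) { this->id_ = id; }
  std::size_t id () const { return this->id_; }

private:
  std::size_t id_;
};
```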

int TAO_Transport::idle_after_reply (void) [virtual]
 

Request is sent and the reply is received. Idle the transport now.

Definition at line 276 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_reply, and tms.

Referenced by TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base, and TAO_GIOP_Synch_Invocation::~TAO_GIOP_Synch_Invocation.

00277 {
00278   return this->tms ()->idle_after_reply ();
00279 }

int TAO_Transport::idle_after_send (void) [virtual]
 

The request has just been sent, but the reply has not yet been received. Idle the transport now.

Definition at line 270 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::idle_after_send, and tms.

Referenced by TAO_IIOP_Transport::send_request.

00271 {
00272   return this->tms ()->idle_after_send ();
00273 }

TAO_Connection_Handler * TAO_Transport::invalidate_event_handler (void) [private]
 

Grab the mutex and then call invalidate_event_handler_i().

Definition at line 1113 of file Transport.cpp.

References ACE_GUARD_RETURN, ACE_MT, and invalidate_event_handler_i.

Referenced by close_connection, close_connection_no_purge, and connection_handler_closing.

01114 {
01115   ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0));
01116 
01117   return this->invalidate_event_handler_i ();
01118 }

virtual TAO_Connection_Handler* TAO_Transport::invalidate_event_handler_i (void) [protected, pure virtual]
 

Called by connection_handler_closing() to signal that the protocol-specific transport should dissociate itself from the protocol-specific connection handler.

Typically, this just sets the pointer to the associated connection handler to zero, although it could also clear out any additional resources associated with the handler association.

Returns:
The old event handler

Implemented in TAO_IIOP_Transport.

Referenced by close_connection_i, and invalidate_event_handler.
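
The split between invalidate_event_handler() and invalidate_event_handler_i() follows a common pattern: a public method takes the lock, and an `_i` method does the work assuming the lock is already held. The sketch below illustrates this with `std::mutex` in place of the ACE lock; the `Handler` and `Transport` types here are hypothetical. Note one difference: `ACE_GUARD_RETURN` returns the supplied value (0 in the listing) when the lock cannot be acquired, whereas `std::lock_guard` has no such failure path.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for a protocol-specific connection handler.
class Handler
{
};

class Transport
{
public:
  explicit Transport (Handler *h) : handler_ (h) {}

  // Locking wrapper: grab the mutex, then delegate to the _i variant.
  Handler *invalidate_event_handler ()
  {
    std::lock_guard<std::mutex> guard (this->lock_);
    return this->invalidate_event_handler_i ();
  }

private:
  // Assumes the lock is held: zero the pointer, return the old handler.
  Handler *invalidate_event_handler_i ()
  {
    Handler *old = this->handler_;
    this->handler_ = nullptr;
    return old;
  }

  std::mutex lock_;
  Handler *handler_;
};
```

Keeping the unlocked variant separate lets callers that already hold the lock, such as close_connection_i in the reference list above, reuse the logic without locking twice.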

ACE_INLINE CORBA::Boolean TAO_Transport::is_tcs_set () const
 

Return true if the tcs has been set.

Definition at line 156 of file Transport.inl.

References tcs_set_.

Referenced by TAO_GIOP_Invocation::perform_call, TAO_Codeset_Manager::process_service_context, and TAO_Codeset_Manager::set_tcs.

00157 {
00158   return tcs_set_;
00159 }

int TAO_Transport::make_idle (void)
 

Cache management.

Definition at line 390 of file Transport.cpp.

References TAO_Transport_Cache_Manager::make_idle, and transport_cache_manager.

Referenced by TAO_Exclusive_TMS::idle_after_reply, TAO_Muxed_TMS::idle_after_send, TAO_GIOP_Invocation::perform_call, and TAO_IIOP_Connection_Handler::process_listen_point_list.

00391 {
00392   return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
00393 }

virtual int TAO_Transport::messaging_init (CORBA::Octet major, CORBA::Octet minor) [pure virtual]
 

Initialise the messaging object. This is used on the connector side; on the acceptor side the connection handler takes care of the messaging objects.

Implemented in TAO_IIOP_Transport.

Referenced by TAO_GIOP_Invocation::perform_call.

virtual TAO_Pluggable_Messaging* TAO_Transport::messaging_object (void) [protected, pure virtual]
 

Return the messaging object that is used to format the data that needs to be sent.

Implemented in TAO_IIOP_Transport.

Referenced by generate_locate_request, generate_request_header, handle_input_i, process_parsed_messages, and send_connection_closed_notifications_i.

int TAO_Transport::notify_reactor (void) [private]
 

Definition at line 2091 of file Transport.cpp.

References ACE_DEBUG, TAO_Notify_Handler::create_handler, event_handler_i, ACE_Event_Handler::get_handle, TAO_Wait_Strategy::is_registered, LM_DEBUG, ACE_Reactor::notify, orb_core, TAO_ORB_Core::reactor, ACE_Event_Handler::READ_MASK, TAO_ORB_Core::transport_message_buffer_allocator, and ws_.

Referenced by process_queue_head.

02092 {
02093   if (!this->ws_->is_registered ())
02094     return 0;
02095 
02096   ACE_Event_Handler *eh =
02097     this->event_handler_i ();
02098 
02099   if (eh == 0)
02100     return -1;
02101 
02102   // Get the reactor associated with the event handler
02103   ACE_Reactor *reactor =
02104     this->orb_core ()->reactor ();
02105 
02106   if (reactor == 0)
02107     return -1;
02108 
02109   // NOTE: Instead of creating the handler separately, it would be
02110   // awesome if we could create the handler when we create the
02111   // TAO_Queued_Data. That would save us an allocation.
02112   TAO_Notify_Handler *nh =
02113     TAO_Notify_Handler::create_handler (
02114         this,
02115         eh->get_handle (),
02116         this->orb_core ()->transport_message_buffer_allocator ());
02117 
02118   if (TAO_debug_level > 0)
02119     {
02120       ACE_DEBUG ((LM_DEBUG,
02121                   "TAO (%P|%t) - Transport[%d]::notify_reactor, "
02122                   "notify to Reactor\n",
02123                   this->id ()));
02124     }
02125 
02126 
02127   // Send a notification to the reactor...
02128   int retval = reactor->notify (nh,
02129                                 ACE_Event_Handler::READ_MASK);
02130 
02131   if (retval < 0 && TAO_debug_level > 2)
02132     {
02133       // @@todo: need to think about what is the action that
02134       // we can take when we get here.
02135       ACE_DEBUG ((LM_DEBUG,
02136                   "TAO (%P|%t) - Transport[%d]::notify_reactor, "
02137                   "notify to the reactor failed..\n",
02138                   this->id ()));
02139     }
02140 
02141   return 1;
02142 }

void TAO_Transport::operator= (const TAO_Transport &) [private]
 

ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core (void) const
 

Access the ORB that owns this connection.

Definition at line 13 of file Transport.inl.

References orb_core_.

Referenced by TAO_Connection_Handler::close_connection_eh, drain_queue, TAO_Reactive_Flushing_Strategy::flush_message, TAO_Leader_Follower_Flushing_Strategy::flush_message, TAO_Reactive_Flushing_Strategy::flush_transport, TAO_Leader_Follower_Flushing_Strategy::flush_transport, generate_request_header, TAO_IIOP_Transport::generate_request_header, handle_timeout, notify_reactor, send_message_shared_i, send_reply_message_i, send_synchronous_message_i, TAO_IIOP_Transport::set_bidir_context_info, TAO_Wait_On_Reactor::wait, and TAO_Wait_On_Leader_Follower::wait.

00014 {
00015   return this->orb_core_;
00016 }

int TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh) [protected]
 

Process the message by sending it to the higher layers of the ORB.

Definition at line 1916 of file Transport.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, TAO_Transport_Mux_Strategy::dispatch_reply, LM_DEBUG, LM_ERROR, messaging_object, TAO_Queued_Data::msg_type_, TAO_Pluggable_Messaging::process_request_message, TAO_Resume_Handle::resume_handle, send_connection_closed_notifications, TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, TAO_PLUGGABLE_MESSAGE_LOCATEREPLY, TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST, TAO_PLUGGABLE_MESSAGE_MESSAGERROR, TAO_PLUGGABLE_MESSAGE_REPLY, TAO_PLUGGABLE_MESSAGE_REQUEST, TAO_Pluggable_Message_Type, and tms.

Referenced by process_queue_head.

01918 {
01919   // Get the <message_type> that we have received
01920   TAO_Pluggable_Message_Type t =  qd->msg_type_;
01921 
01922   int result = 0;
01923   if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
01924     {
01925       if (TAO_debug_level > 0)
01926         ACE_DEBUG ((LM_DEBUG,
01927                     "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
01928                     "received CloseConnection message %p\n",
01929                     this->id(), ""));
01930 
01931       this->send_connection_closed_notifications ();
01932 
01933       // Return a "-1" so that the next stage can take care of
01934       // closing connection and the necessary memory management.
01935       return -1;
01936     }
01937   else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST ||
01938            t == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST)
01939     {
01940       // Ready to process a request. Increment the refcount of <this
01941       // transport>. Theoretically, after handler resumption another
01942       // thread can access this very same transport and can even close
01943       // this. To have a valid Transport object for further processing
01944       // we should increment the refcount. Please see Bug 1382 for
01945       // more details.
01946       // REFCNT: Matched by the release before returning.
01947 
01948       // This generic class takes care of everything.
01949       TAO_Transport_Refcount_Guard rg (this);
01950 
01951       // Let us resume the handle before we go ahead to process the
01952       // request. This will open up the handle for other threads.
01953       rh.resume_handle ();
01954 
01955       if (this->messaging_object ()->process_request_message (
01956             this,
01957             qd) == -1)
01958         {
01959           this->send_connection_closed_notifications ();
01960 
01961 
01962           // Return a "-1" so that the next stage can take care of
01963           // closing connection and the necessary memory management.
01964           return -1;
01965         }
01966     }
01967   else if (t == TAO_PLUGGABLE_MESSAGE_REPLY ||
01968            t == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
01969     {
01970       // Please see ..else if (XXX_REQUEST) for whys and whats..
01971       TAO_Transport_Refcount_Guard rg (this);
01972       rh.resume_handle ();
01973 
01974       // @@todo: Maybe the input_cdr can be constructed from the
01975       // message_block
01976       TAO_Pluggable_Reply_Params params (this->orb_core ());
01977 
01978 
01979       if (this->messaging_object ()->process_reply_message (params,
01980                                                             qd) == -1)
01981         {
01982           if (TAO_debug_level > 0)
01983             ACE_DEBUG ((LM_DEBUG,
01984                         "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
01985                         "error in process_reply_message %p\n",
01986                         this->id (), ""));
01987 
01988           this->send_connection_closed_notifications ();
01989           return -1;
01990         }
01991 
01992       result = this->tms ()->dispatch_reply (params);
01993 
01994       if (result == -1)
01995         {
01996           // Something really critical happened, we will forget about
01997           // every reply on this connection.
01998           if (TAO_debug_level > 0)
01999             ACE_ERROR ((LM_ERROR,
02000                         "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
02001                         "dispatch reply failed\n",
02002                         this->id ()));
02003 
02004           this->send_connection_closed_notifications ();
02005           return -1;
02006         }
02007 
02008     }
02009   else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
02010     {
02011       if (TAO_debug_level)
02012         {
02013           ACE_ERROR_RETURN ((LM_ERROR,
02014                              "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
02015                              "received MessageError, closing connection\n",
02016                              this->id ()),
02017                             -1);
02018         }
02019     }
02020 
02021   // If not, just return back..
02022   return 0;
02023 }
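
The listing above relies on a scoped guard so that the reference taken before resuming the handle is released on every return path, including the early `return -1` branches. A minimal sketch of that RAII idea follows. The `RefCounted` and `RefcountGuard` types and the `process` function are hypothetical and simplified: the real guard is constructed from a transport pointer, and a real reference count would destroy the object when it reaches zero.

```cpp
#include <cassert>

// Hypothetical reference-counted object; destruction at zero is omitted.
class RefCounted
{
public:
  void add_ref () { ++this->refcount_; }
  void release () { --this->refcount_; }
  int count () const { return this->refcount_; }

private:
  int refcount_ = 1;
};

// Scoped guard: takes a reference on construction, drops it on destruction.
class RefcountGuard
{
public:
  explicit RefcountGuard (RefCounted &t) : t_ (t) { this->t_.add_ref (); }
  ~RefcountGuard () { this->t_.release (); }

  RefcountGuard (const RefcountGuard &) = delete;
  RefcountGuard &operator= (const RefcountGuard &) = delete;

private:
  RefCounted &t_;
};

// Every return path releases the extra reference without explicit cleanup.
int process (RefCounted &t, bool fail)
{
  RefcountGuard guard (t);
  if (fail)
    return -1;
  return 0;
}
```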

int TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) [private]
 

Process the message that is in the head of the incoming queue. If there are more messages in the queue, this method calls this->notify_reactor () to wake up a thread.

Returns:
  • -1: An error occurred; this can happen regardless of whether there are messages in the queue.
  • 1: There were no messages in the queue to process; nothing was processed.
  • 0: Messages were in the queue to process and one was processed.

Definition at line 2026 of file Transport.cpp.

References ACE_DEBUG, TAO_Incoming_Message_Queue::dequeue_head, incoming_message_queue_, TAO_Incoming_Message_Queue::is_head_complete, LM_DEBUG, notify_reactor, process_parsed_messages, TAO_Incoming_Message_Queue::queue_length, TAO_Queued_Data::release, TAO_Resume_Handle::set_flag, TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED, and TAO_Resume_Handle::TAO_HANDLE_RESUMABLE.

Referenced by handle_input_i.

02027 {
02028   if (TAO_debug_level > 3)
02029     {
02030       ACE_DEBUG ((LM_DEBUG,
02031                   "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
02032                   this->id ()));
02033     }
02034 
02035   if (this->incoming_message_queue_.is_head_complete () != 1)
02036     return 1;
02037 
02038   // Get the message on the head of the queue..
02039   TAO_Queued_Data *qd =
02040     this->incoming_message_queue_.dequeue_head ();
02041 
02042   if (TAO_debug_level > 3)
02043     {
02044       ACE_DEBUG ((LM_DEBUG,
02045                   "TAO (%P|%t) - Transport[%d]::process_queue_head, "
02046                   "the size of the queue is [%d]\n",
02047                   this->id (),
02048                   this->incoming_message_queue_.queue_length()));
02049     }
02050   // Now that we have pulled one message out of the queue,
02051   // check whether we have one more message in the queue...
02052   if (this->incoming_message_queue_.queue_length () > 0)
02053     {
02054       if (TAO_debug_level > 0)
02055         {
02056           ACE_DEBUG ((LM_DEBUG,
02057                       "TAO (%P|%t) - Transport[%d]::process_queue_head, "
02058                       "notify reactor\n",
02059                       this->id ()));
02060 
02061         }
02062       int retval =
02063         this->notify_reactor ();
02064 
02065       if (retval == 1)
02066         {
02067           // Let the class know that it doesn't need to resume  the
02068           // handle..
02069           rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
02070         }
02071       else if (retval < 0)
02072         return -1;
02073     }
02074   else
02075     {
02076       // As we are ready to process the last message just resume
02077   // the handle. Set the flag in case someone had reset the flag..
02078       rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
02079     }
02080 
02081   // Process the message...
02082   int retval = this->process_parsed_messages (qd, rh);
02083 
02084   // Delete the Queued_Data..
02085   TAO_Queued_Data::release (qd);
02086 
02087   return (retval == -1) ? -1 : 0;
02088 }
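
The three return values of process_queue_head() can be modelled with a plain queue. The sketch below is an illustration only: `process_head`, the integer "messages", and the rule that a negative value means a processing failure are all hypothetical. The `more_pending` flag stands in for the decision to call notify_reactor() when further messages remain.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Returns 1 if nothing was queued, 0 if one message was processed,
// and -1 if processing that message failed.
int process_head (std::deque<int> &queue,
                  std::vector<int> &processed,
                  bool &more_pending)
{
  if (queue.empty ())
    return 1;

  // Dequeue exactly one message per call.
  int msg = queue.front ();
  queue.pop_front ();

  // Stand-in for notify_reactor(): record that another thread should be
  // woken up because messages are still waiting.
  more_pending = !queue.empty ();

  // Hypothetical failure rule for this sketch: negative values fail.
  if (msg < 0)
    return -1;

  processed.push_back (msg);
  return 0;
}
```

Processing one message per call, and signalling for the rest, keeps any single thread from monopolising the connection while the queue drains.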

void TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered, TAO_EventHandlerSet &unregistered)
 

Fill in a handle_set with any associated handler's reactor handle.

Called by the cache when the cache is closing in order to fill in a handle_set in a thread-safe manner.

Parameters:
reactor_registered  the ACE_Handle_Set into which the transport should place any handle registered with the reactor
unregistered  the TAO_EventHandlerSet into which the transport should place any event handler that is not registered with anyone

Definition at line 197 of file Transport.cpp.

References ACE_GUARD, ACE_MT, event_handler_i, ACE_Event_Handler::get_handle, ACE_Unbounded_Set::insert, TAO_Wait_Strategy::is_registered, ACE_Handle_Set::set_bit, and ws_.

00199 {
00200   ACE_MT (ACE_GUARD (ACE_Lock,
00201                      guard,
00202                      *this->handler_lock_));
00203   ACE_Event_Handler *eh = this->event_handler_i ();
00204 
00205   if (eh != 0)
00206     {
00207       if (this->ws_->is_registered ())
00208         {
00209           reactor_registered.set_bit (eh->get_handle ());
00210         }
00211       else
00212         {
00213           unregistered.insert (eh);
00214         }
00215     }
00216 }
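
The sorting step above can be sketched in isolation. This is a minimal illustration only: `Handler`, the `std::set` containers and the free function `provide_handle` are hypothetical stand-ins for `ACE_Event_Handler`, `ACE_Handle_Set` and `TAO_EventHandlerSet`, and the locking done by the real method is omitted.

```cpp
#include <cassert>
#include <set>

// Hypothetical stand-in for an event handler; not the ACE type.
struct Handler {
    int handle;       // descriptor the reactor would watch
    bool registered;  // models ws_->is_registered()
};

// Put the handler's descriptor into `reactor_registered` when the reactor
// owns it; otherwise remember the handler itself in `unregistered`.
inline void provide_handle(const Handler* eh,
                           std::set<int>& reactor_registered,
                           std::set<const Handler*>& unregistered)
{
    if (eh == nullptr)
        return;  // the transport no longer has a handler: nothing to report

    if (eh->registered)
        reactor_registered.insert(eh->handle);
    else
        unregistered.insert(eh);
}
```

The caller ends up with two disjoint sets, so it can treat reactor-owned handles and orphaned handlers differently when the cache closes.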

void TAO_Transport::purge_entry void   
 

Cache management.

Definition at line 384 of file Transport.cpp.

References TAO_Transport_Cache_Manager::purge_entry, and transport_cache_manager.

Referenced by TAO_Connection_Handler::close_connection_eh.

00385 {
00386   (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00387 }

ACE_INLINE void TAO_Transport::purging_order unsigned long    value
 

Definition at line 64 of file Transport.inl.

References purging_order_.

00065 {
00066   // This should only be called by the Transport Cache Manager when
00067   // it is holding its lock.
00068   // The transport should still be here since the cache manager still
00069   // has a reference to it.
00070   this->purging_order_ = value;
00071 }

ACE_INLINE unsigned long TAO_Transport::purging_order void    const
 

Get and Set the purging order. The purging strategy uses the set version to set the purging order.

Definition at line 58 of file Transport.inl.

References purging_order_.

Referenced by TAO_Transport_Cache_Manager::sort_set, and TAO_LRU_Connection_Purging_Strategy::update_item.

00059 {
00060   return this->purging_order_;
00061 }

ACE_INLINE int TAO_Transport::queue_is_empty void   
 

Check if there are messages pending in the queue.

Returns:
1 if the queue is empty

Definition at line 86 of file Transport.inl.

References ACE_GUARD_RETURN, and queue_is_empty_i.

Referenced by TAO_Reactive_Flushing_Strategy::flush_transport, TAO_Leader_Follower_Flushing_Strategy::flush_transport, and TAO_Block_Flushing_Strategy::flush_transport.

00087 {
00088   ACE_GUARD_RETURN (ACE_Lock,
00089                     ace_mon,
00090                     *this->handler_lock_,
00091                     -1);
00092   return this->queue_is_empty_i ();
00093 }

int TAO_Transport::queue_is_empty_i void    [private]
 

Check if there are messages pending in the queue.

This version assumes that the lock is already held. Use with care!

Returns:
1 if the queue is empty

Definition at line 760 of file Transport.cpp.

References head_.

Referenced by queue_is_empty, and TAO_Block_Flushing_Strategy::schedule_output.

00761 {
00762   return (this->head_ == 0);
00763 }
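
The pairing of queue_is_empty and queue_is_empty_i follows a common convention: the public method takes the lock and delegates to an `_i` method that assumes the lock is held. A minimal sketch of that convention, assuming a hypothetical `MiniTransport` class and `std::mutex` in place of `ACE_Lock`:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical miniature of the locked / unlocked ("_i") method pair.
class MiniTransport {
public:
    // Public entry point: acquire the lock, then delegate.
    int queue_is_empty() {
        std::lock_guard<std::mutex> guard(lock_);
        return queue_is_empty_i();
    }

    // Another public method that mutates state under the same lock.
    void enqueue() {
        std::lock_guard<std::mutex> guard(lock_);
        ++pending_;
    }

private:
    // Precondition: the caller already holds lock_.
    int queue_is_empty_i() const { return pending_ == 0 ? 1 : 0; }

    std::mutex lock_;
    int pending_ = 0;
};
```

Keeping the unlocked variant private (or restricted to trusted callers) is what makes "Use with care!" enforceable: code that already holds the lock calls the `_i` form and avoids a recursive acquisition.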

int TAO_Transport::recache_transport TAO_Transport_Descriptor_Interface   desc
 

Recache ourselves in the cache.

Definition at line 373 of file Transport.cpp.

References TAO_Transport_Cache_Manager::cache_transport, TAO_Transport_Cache_Manager::purge_entry, and transport_cache_manager.

Referenced by TAO_IIOP_Connection_Handler::process_listen_point_list.

00374 {
00375   // First purge our entry
00376   this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
00377 
00378   // Then add ourselves to the cache
00379   return this->transport_cache_manager ().cache_transport (desc,
00380                                                            this);
00381 }
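
The purge-then-add sequence can be illustrated with a toy cache. Everything here is an assumption made for the example: `MiniCache`, the string descriptors and the integer transport ids are hypothetical and do not reflect the real `TAO_Transport_Cache_Manager` interface.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical cache that maps an endpoint description to a transport id.
class MiniCache {
public:
    // Remove every entry that refers to the given transport.
    void purge_entry(int transport_id) {
        for (auto it = map_.begin(); it != map_.end(); ) {
            if (it->second == transport_id)
                it = map_.erase(it);
            else
                ++it;
        }
    }

    // Returns 0 on success, -1 if the description is already cached.
    int cache_transport(const std::string& desc, int transport_id) {
        return map_.emplace(desc, transport_id).second ? 0 : -1;
    }

    // Recache: first drop the stale entry, then add the new one.
    int recache(const std::string& new_desc, int transport_id) {
        purge_entry(transport_id);
        return cache_transport(new_desc, transport_id);
    }

    bool contains(const std::string& desc) const {
        return map_.count(desc) != 0;
    }

private:
    std::map<std::string, int> map_;
};
```

Purging first ensures the transport is never reachable under two descriptions at once.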

ssize_t TAO_Transport::recv char *    buffer,
size_t    len,
const ACE_Time_Value   timeout = 0
 

Read len bytes into buffer.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance at a time.

Parameters:
buffer  ORB-allocated buffer where the data should be placed. Note: the ACE_Time_Value * argument is just a placeholder for now. It is not clear that this is the best place to specify it. The actual timeout values will be kept in the Policies.

Definition at line 252 of file Transport.cpp.

References ACE_GUARD_RETURN, ACE_MT, check_event_handler_i, and recv_i.

Referenced by handle_input_i, TAO_GIOP_Utils::read_buffer, TAO_GIOP_Utils::read_bytes_input, and try_to_complete.

00255 {
00256   ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00257                             guard,
00258                             *this->handler_lock_,
00259                             -1));
00260 
00261   if (this->check_event_handler_i ("Transport::recv") == -1)
00262     return -1;
00263 
00264   // now call the template method
00265   return this->recv_i (buffer, len, timeout);
00266 }

size_t TAO_Transport::recv_buffer_size void   
 

Accessor to recv_buffer_size_.

Definition at line 2152 of file Transport.cpp.

References recv_buffer_size_.

02153 {
02154   return this->recv_buffer_size_;
02155 }

virtual ssize_t TAO_Transport::recv_i char *    buffer,
size_t    len,
const ACE_Time_Value   timeout = 0
[protected, pure virtual]
 

Parameters:
buffer  ORB-allocated buffer where the data should be placed. Note: the ACE_Time_Value * argument is just a placeholder for now. It is not clear that this is the best place to specify it. The actual timeout values will be kept in the Policies.

Implemented in TAO_IIOP_Transport.

Referenced by recv.

int TAO_Transport::register_handler void   
 

Register the handler with the reactor.

This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Definition at line 220 of file Transport.cpp.

References ACE_GUARD_RETURN, ACE_MT, check_event_handler_i, and register_handler_i.

Referenced by TAO_Wait_On_Reactor::register_handler, TAO_Wait_On_Leader_Follower::register_handler, and TAO_Wait_On_Leader_Follower::sending_request.

00221 {
00222   ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00223                             guard,
00224                             *this->handler_lock_,
00225                             -1));
00226   if (this->check_event_handler_i ("Transport::register_handler") == -1)
00227     return -1;
00228 
00229   return this->register_handler_i ();
00230 }

virtual int TAO_Transport::register_handler_i void    [protected, pure virtual]
 

Register the handler with the reactor.

This method is used by the Wait_On_Reactor strategy. The transport must register its event handler with the ORB's Reactor.

Todo:
I think this method is pretty much useless; the connections are *always* registered with the Reactor, except in thread-per-connection mode. In that case putting the connection in the Reactor would produce unpredictable results anyway.

Implemented in TAO_IIOP_Transport.

Referenced by register_handler.

void TAO_Transport::release TAO_Transport *    transport [static]
 

Definition at line 174 of file Transport.cpp.

References ACE_OS::abort, ACE_ERROR, TAO_Synch_Refcountable::decrement, id, and LM_ERROR.

Referenced by TAO_GIOP_Invocation::close_connection, TAO_Cache_IntId::operator=, TAO_GIOP_Invocation::perform_call, TAO_Transport_Cache_Manager::purge, TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler, TAO_Connection_Handler::transport, TAO_Asynch_Reply_Dispatcher_Base::transport, TAO_Asynch_Reply_Dispatcher_Base::~TAO_Asynch_Reply_Dispatcher_Base, TAO_Cache_IntId::~TAO_Cache_IntId, TAO_GIOP_Invocation::~TAO_GIOP_Invocation, TAO_Notify_Handler::~TAO_Notify_Handler, and TAO_Transport_Refcount_Guard::~TAO_Transport_Refcount_Guard.

00175 {
00176   if (transport != 0)
00177     {
00178       int count = transport->decrement ();
00179 
00180       if (count == 0)
00181         {
00182           delete transport;
00183         }
00184       else if (count < 0)
00185         {
00186           ACE_ERROR ((LM_ERROR,
00187                       "TAO (%P|%t) - Transport[%d]::release, "
00188                       "reference count is less than zero: %d\n",
00189                       transport->id (), count));
00190           ACE_OS::abort ();
00191         }
00192     }
00193 }
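
The release logic above is a conventional intrusive reference count: decrement, delete on zero, and treat a negative count as a fatal bookkeeping error. A minimal sketch under stated assumptions: `Counted` is a hypothetical class, `std::atomic<int>` stands in for `TAO_Synch_Refcountable`, and the `destroyed` flag exists only so the example can observe the deletion.

```cpp
#include <atomic>
#include <cassert>
#include <cstdlib>

// Hypothetical reference-counted object; starts with one reference.
class Counted {
public:
    explicit Counted(bool* destroyed) : destroyed_(destroyed) {}
    ~Counted() { *destroyed_ = true; }

    void add_ref() { ++count_; }

    // Returns the count after the decrement.
    int decrement() { return --count_; }

    // Mirrors the shape of a static release(): null-safe, deletes on zero,
    // aborts when the count goes negative (an over-release).
    static void release(Counted* obj) {
        if (obj == nullptr)
            return;

        int count = obj->decrement();
        if (count == 0)
            delete obj;
        else if (count < 0)
            std::abort();
    }

private:
    std::atomic<int> count_{1};
    bool* destroyed_;
};
```

Making release() static and null-tolerant lets callers release a pointer without first checking it.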

void TAO_Transport::report_invalid_event_handler const char *    caller [private]
 

Print out error messages if the event handler is not valid.

Definition at line 1101 of file Transport.cpp.

References ACE_DEBUG, LM_DEBUG, and tag_.

Referenced by check_event_handler_i.

01102 {
01103   if (TAO_debug_level > 0)
01104     {
01105       ACE_DEBUG ((LM_DEBUG,
01106                   "TAO (%P|%t) - Transport[%d]::report_invalid_event_handler"
01107                   "(%s) no longer associated with handler [tag=%d]\n",
01108                   this->id (), caller, this->tag_));
01109     }
01110 }

ACE_INLINE void TAO_Transport::reset_flush_timer void    [private]
 

The flush timer expired or was explicitly cancelled, mark it as not pending.

Definition at line 104 of file Transport.inl.

References current_deadline_, flush_timer_id_, and ACE_Time_Value::zero.

Referenced by drain_queue_i, and handle_timeout.

00105 {
00106   this->flush_timer_id_ = -1;
00107   this->current_deadline_ = ACE_Time_Value::zero;
00108 }

int TAO_Transport::schedule_output_i void    [private]
 

Schedule handle_output() callbacks.

Definition at line 767 of file Transport.cpp.

References ACE_DEBUG, event_handler_i, LM_DEBUG, ACE_Event_Handler::reactor, ACE_Reactor::schedule_wakeup, and ACE_Event_Handler::WRITE_MASK.

Referenced by TAO_Reactive_Flushing_Strategy::schedule_output, and TAO_Leader_Follower_Flushing_Strategy::schedule_output.

00768 {
00769   ACE_Event_Handler *eh = this->event_handler_i ();
00770   if (eh == 0)
00771     return -1;
00772 
00773   ACE_Reactor *reactor = eh->reactor ();
00774   if (reactor == 0)
00775     return -1;
00776 
00777   if (TAO_debug_level > 3)
00778     {
00779       ACE_DEBUG ((LM_DEBUG,
00780                   "TAO (%P|%t) - Transport[%d]::schedule_output\n",
00781                   this->id ()));
00782     }
00783 
00784   return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
00785 }

ssize_t TAO_Transport::send iovec   iov,
int    iovcnt,
size_t &    bytes_transferred,
const ACE_Time_Value   timeout = 0
 

Write the complete Message_Block chain to the connection.

This method serializes on handler_lock_, guaranteeing that only one thread can execute it on the same instance at a time.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
mblk  contains the data that must be sent. For each message block in the cont() chain all the data between rd_ptr() and wr_ptr() should be delivered to the remote peer.
timeout  is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.
bytes_transferred  should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.
This call can also fail if the transport instance is no longer associated with a connection (e.g., the connection handler closed down). In that case, it returns -1 and sets errno to ENOENT.
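
The bytes_transferred contract can be shown with a simulated connection. This is a sketch under stated assumptions: `FakeSocket` and `send_all` are hypothetical names, the "socket" simply accepts a fixed number of bytes and then reports EAGAIN, and no real I/O or ACE call is involved.

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <vector>

// Hypothetical sink that accepts at most `capacity` bytes, then "blocks".
struct FakeSocket {
    std::size_t capacity;

    // Returns the number of bytes accepted, or -1 with errno = EAGAIN.
    long write(std::size_t len) {
        if (capacity == 0) {
            errno = EAGAIN;
            return -1;
        }
        std::size_t n = std::min(len, capacity);
        capacity -= n;
        return static_cast<long>(n);
    }
};

// Send every chunk in order. On failure return -1 with errno set, but still
// report through bytes_transferred how much was handed to the sink.
inline long send_all(FakeSocket& s,
                     const std::vector<std::size_t>& chunks,
                     std::size_t& bytes_transferred)
{
    bytes_transferred = 0;
    for (std::size_t len : chunks) {
        std::size_t remaining = len;
        while (remaining > 0) {
            long n = s.write(remaining);
            if (n == -1)
                return -1;  // errno was set by write()
            bytes_transferred += static_cast<std::size_t>(n);
            remaining -= static_cast<std::size_t>(n);
        }
    }
    return static_cast<long>(bytes_transferred);
}
```

With a capacity of 10 and three 4-byte chunks, the first two writes succeed, the third is cut short, and the caller sees -1 with 10 bytes reported, which is the information needed to queue only the unsent remainder.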

Definition at line 234 of file Transport.cpp.

References ACE_GUARD_RETURN, ACE_MT, check_event_handler_i, and send_i.

00237 {
00238   ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
00239                             guard,
00240                             *this->handler_lock_,
00241                             -1));
00242 
00243   if (this->check_event_handler_i ("Transport::send") == -1)
00244     return -1;
00245 
00246   // now call the template method
00247   return this->send_i (iov, iovcnt, bytes_transferred, timeout);
00248 }

void TAO_Transport::send_connection_closed_notifications void    [private]
 

Notify all the components inside a Transport when the underlying connection is closed.

Definition at line 1121 of file Transport.cpp.

References ACE_GUARD, ACE_MT, and send_connection_closed_notifications_i.

Referenced by close_connection_shared, connection_handler_closing, handle_input_i, and process_parsed_messages.

01122 {
01123   ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
01124 
01125   this->send_connection_closed_notifications_i ();
01126 }

void TAO_Transport::send_connection_closed_notifications_i void    [private]
 

Assume the lock is held.

Definition at line 1129 of file Transport.cpp.

References TAO_Transport_Mux_Strategy::connection_closed, TAO_Queued_Message::destroy, head_, TAO_LF_Event::LFS_CONNECTION_CLOSED, messaging_object, TAO_Queued_Message::next, TAO_LF_Event::state_changed, and tms.

Referenced by send_connection_closed_notifications.

01130 {
01131   while (this->head_ != 0)
01132     {
01133       TAO_Queued_Message *i = this->head_;
01134 
01135       // @@ This is a good point to insert a flag to indicate that a
01136       //    CloseConnection message was successfully received.
01137       i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
01138 
01139       this->head_ = i->next ();
01140 
01141       i->destroy ();
01142     }
01143 
01144   this->tms ()->connection_closed ();
01145   this->messaging_object ()->reset ();
01146 }

virtual ssize_t TAO_Transport::send_i iovec   iov,
int    iovcnt,
size_t &    bytes_transferred,
const ACE_Time_Value   timeout = 0
[protected, pure virtual]
 

Write the complete iovec chain to the connection.

Often the implementation simply forwards the arguments to the underlying ACE_Svc_Handler class, using the code factored out into ACE.

Be careful with protocols that perform non-trivial transformations of the data, such as SSLIOP or protocols that compress the stream.

Parameters:
iov  contains the data that must be sent.
iovcnt  is the number of iovec structures in the list where iov points.
bytes_transferred  should return the total number of bytes successfully transferred before the connection blocked. This is required because in some platforms and/or protocols multiple system calls may be required to send the chain of message blocks. The first few calls can work successfully, but the final one can fail or signal a flow control situation (via EAGAIN). In this case the ORB expects the function to return -1, errno to be appropriately set and this argument to return the number of bytes already on the OS I/O subsystem.
timeout  is the maximum time that the application is willing to wait for the data to be sent, useful in platforms that implement timed writes. The timeout value is obtained from the policies set by the application.

Implemented in TAO_IIOP_Transport.

Referenced by drain_queue_helper, and send.

virtual int TAO_Transport::send_message TAO_OutputCDR   stream,
TAO_Stub   stub = 0,
int    message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
ACE_Time_Value   max_time_wait = 0
[pure virtual]
 

This method formats the stream and then sends the message on the transport.

Once the ORB is prepared to receive a reply (see send_request() above), and all the arguments have been marshaled the CDR stream must be 'formatted', i.e. the message_size field in the GIOP header can finally be set to the proper value.

Implemented in TAO_IIOP_Transport.

Referenced by TAO_ServerRequest::send_no_exception_reply, TAO_ServerRequest::tao_send_reply, and TAO_ServerRequest::tao_send_reply_exception.

int TAO_Transport::send_message_block_chain const ACE_Message_Block   message_block,
size_t &    bytes_transferred,
ACE_Time_Value   max_wait_time = 0
 

Send a message block chain.

Definition at line 437 of file Transport.cpp.

References ACE_GUARD_RETURN, check_event_handler_i, and send_message_block_chain_i.

00440 {
00441   ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00442 
00443   if (this->check_event_handler_i ("Transport::send_message_block_chain") == -1)
00444     return -1;
00445 
00446   return this->send_message_block_chain_i (mb,
00447                                            bytes_transferred,
00448                                            max_wait_time);
00449 }

int TAO_Transport::send_message_block_chain_i const ACE_Message_Block   message_block,
size_t &    bytes_transferred,
ACE_Time_Value   max_wait_time
 

Send a message block chain, assuming the lock is held.

Definition at line 452 of file Transport.cpp.

References ACE_ASSERT, TAO_Synch_Queued_Message::all_data_sent, drain_queue_i, TAO_Synch_Queued_Message::message_length, TAO_Queued_Message::next, TAO_Queued_Message::prev, TAO_Queued_Message::push_back, TAO_Queued_Message::remove_from_list, and ACE_Message_Block::total_length.

Referenced by send_message_block_chain, and send_message_shared_i.

00455 {
00456   size_t total_length = mb->total_length ();
00457 
00458   // We are going to block, so there is no need to clone
00459   // the message block.
00460   TAO_Synch_Queued_Message synch_message (mb);
00461 
00462   synch_message.push_back (this->head_, this->tail_);
00463 
00464   int n = this->drain_queue_i ();
00465   if (n == -1)
00466     {
00467       synch_message.remove_from_list (this->head_, this->tail_);
00468       ACE_ASSERT (synch_message.next () == 0);
00469       ACE_ASSERT (synch_message.prev () == 0);
00470       return -1; // Error while sending...
00471     }
00472   else if (n == 1)
00473     {
00474       ACE_ASSERT (synch_message.all_data_sent ());
00475       ACE_ASSERT (synch_message.next () == 0);
00476       ACE_ASSERT (synch_message.prev () == 0);
00477       bytes_transferred = total_length;
00478       return 1;  // Empty queue, message was sent..
00479     }
00480 
00481   ACE_ASSERT (n == 0); // Some data sent, but data remains.
00482 
00483   // Remove the temporary message from the queue...
00484   synch_message.remove_from_list (this->head_, this->tail_);
00485 
00486   bytes_transferred =
00487     total_length - synch_message.message_length ();
00488 
00489   return 0;
00490 }

int TAO_Transport::send_message_shared TAO_Stub   stub,
int    message_semantics,
const ACE_Message_Block   message_block,
ACE_Time_Value   max_wait_time
[virtual]
 

Send the contents of <message_block>.

Parameters:
stub  The object reference used for this operation, useful to obtain the current policies.
message_semantics  If this is set to TAO_TWOWAY_REQUEST this method will block until the operation is completely written on the wire. If it is set to any other value, the operation may return before the message has been completely written.
message_block  The CDR encapsulation of the GIOP message that must be sent. The message may consist of multiple Message Blocks chained through the cont() field.
max_wait_time  The maximum time that the operation can block, used in the implementation of timeouts.

Reimplemented in TAO_IIOP_Transport.

Definition at line 493 of file Transport.cpp.

References ACE_GUARD_RETURN, check_event_handler_i, close_connection, and send_message_shared_i.

00497 {
00498   int r;
00499   {
00500     ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
00501 
00502     if (this->check_event_handler_i ("Transport::send_message_shared") == -1)
00503       return -1;
00504 
00505     r = this->send_message_shared_i (stub, message_semantics,
00506                                      message_block, max_wait_time);
00507   }
00508   if (r == -1)
00509   {
00510     this->close_connection ();
00511   }
00512 
00513   return r;
00514 }

int TAO_Transport::send_message_shared_i TAO_Stub   stub,
int    message_semantics,
const ACE_Message_Block   message_block,
ACE_Time_Value   max_wait_time
[protected]
 

Implement send_message_shared() assuming the handler_lock_ is held.

Definition at line 1149 of file Transport.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_GUARD_RETURN, ACE_NEW_RETURN, check_buffering_constraints_i, ACE_Message_Block::cont, ETIME, EWOULDBLOCK, TAO_Flushing_Strategy::flush_transport, TAO_ORB_Core::flushing_strategy, head_, ACE_Message_Block::length, LM_DEBUG, TAO_Sync_Strategy::must_queue, orb_core, TAO_Queued_Message::push_back, TAO_Flushing_Strategy::schedule_output, send_message_block_chain_i, send_reply_message_i, send_synchronous_message_i, ssize_t, TAO_Stub::sync_strategy, TAO_REPLY, TAO_TWOWAY_REQUEST, and ACE_Message_Block::total_length.

Referenced by send_message_shared, and TAO_IIOP_Transport::send_message_shared.

01153 {
01154   if (message_semantics == TAO_Transport::TAO_TWOWAY_REQUEST)
01155     {
01156       return this->send_synchronous_message_i (message_block,
01157                                                max_wait_time);
01158     }
01159   else if (message_semantics == TAO_Transport::TAO_REPLY)
01160     {
01161       return this->send_reply_message_i (message_block,
01162                                          max_wait_time);
01163     }
01164 
01165 
01166   // Let's figure out if the message should be queued without trying
01167   // to send first:
01168   int try_sending_first = 1;
01169 
01170   int queue_empty = (this->head_ == 0);
01171 
01172   if (!queue_empty)
01173     try_sending_first = 0;
01174   else if (stub->sync_strategy ().must_queue (queue_empty))
01175     try_sending_first = 0;
01176 
01177   ssize_t n;
01178 
01179   TAO_Flushing_Strategy *flushing_strategy =
01180     this->orb_core ()->flushing_strategy ();
01181 
01182   if (try_sending_first)
01183     {
01184       size_t byte_count = 0;
01185       // ... in this case we must try to send the message first ...
01186 
01187       size_t total_length = message_block->total_length ();
01188       if (TAO_debug_level > 6)
01189         {
01190           ACE_DEBUG ((LM_DEBUG,
01191                       "TAO (%P|%t) - Transport[%d]::send_message_i, "
01192                       "trying to send the message (ml = %d)\n",
01193                       this->id (), total_length));
01194         }
01195 
01196       // @@ I don't think we want to hold the mutex here, however if
01197       // we release it we need to recheck the status of the transport
01198       // after we return... once I understand the final form for this
01199       // code I will re-visit this decision
01200       n = this->send_message_block_chain_i (message_block,
01201                                             byte_count,
01202                                             max_wait_time);
01203       if (n == -1)
01204         {
01205           // ... if this is just an EWOULDBLOCK we must schedule the
01206           // message for later, if it is ETIME we still have to send
01207           // the complete message, because cutting off the message at
01208           // this point will destroy the synchronization with the
01209           // server ...
01210           if (errno != EWOULDBLOCK && errno != ETIME)
01211             {
01212               if (TAO_debug_level > 0)
01213                 {
01214                   ACE_DEBUG ((LM_DEBUG,
01215                               "TAO (%P|%t) - Transport[%d]::send_message_i, "
01216                               "fatal error in "
01217                               "send_message_block_chain_i %p\n",
01218                               this->id (), ""));
01219                 }
01220               return -1;
01221             }
01222         }
01223 
01224       // ... let's figure out if the complete message was sent ...
01225       if (total_length == byte_count)
01226         {
01227           // Done, just return.  Notice that there are no allocations
01228           // or copies up to this point (though some fancy calling
01229           // back and forth).
01230           // This is the common case for the critical path, it should
01231           // be fast.
01232           return 0;
01233         }
01234 
01235       if (TAO_debug_level > 6)
01236         {
01237           ACE_DEBUG ((LM_DEBUG,
01238                       "TAO (%P|%t) - Transport[%d]::send_message_i, "
01239                       "partial send %d / %d bytes\n",
01240                       this->id (), byte_count, total_length));
01241         }
01242 
01243       // ... part of the data was sent, need to figure out what piece
01244       // of the message block chain must be queued ...
01245       while (message_block != 0 && message_block->length () == 0)
01246         message_block = message_block->cont ();
01247 
01248       // ... at least some portion of the message block chain should
01249       // remain ...
01250       ACE_ASSERT (message_block != 0);
01251     }
01252 
01253   // ... either the message must be queued or we need to queue it
01254   // because it was not completely sent out ...
01255 
01256   if (TAO_debug_level > 6)
01257     {
01258       ACE_DEBUG ((LM_DEBUG,
01259                   "TAO (%P|%t) - Transport[%d]::send_message_i, "
01260                   "message is queued\n",
01261                   this->id ()));
01262     }
01263 
01264   TAO_Queued_Message *queued_message = 0;
01265   ACE_NEW_RETURN (queued_message,
01266                   TAO_Asynch_Queued_Message (message_block,
01267                                              0,
01268                                              1),
01269                   -1);
01270   queued_message->push_back (this->head_, this->tail_);
01271 
01272   // ... if the queue is full we need to activate the output on the
01273   // queue ...
01274   int must_flush = 0;
01275   int constraints_reached =
01276     this->check_buffering_constraints_i (stub,
01277                                          must_flush);
01278 
01279   // ... but we also want to activate it if the message was partially
01280   // sent.... Plus, when we use the blocking flushing strategy the
01281   // queue is flushed as a side-effect of 'schedule_output()'
01282 
01283   if (constraints_reached || try_sending_first)
01284     {
01285       (void) flushing_strategy->schedule_output (this);
01286     }
01287 
01288   if (must_flush)
01289     {
01290       typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
01291       TAO_REVERSE_LOCK reverse (*this->handler_lock_);
01292       ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
01293 
01294       (void) flushing_strategy->flush_transport (this);
01295     }
01296 
01297   return 0;
01298 }
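
The first few branches of the listing above amount to a small decision table. The sketch below restates it as a pure function; `Path`, `choose_path` and the boolean `must_queue` (standing in for the result of `stub->sync_strategy ().must_queue (queue_empty)`) are names invented for this example.

```cpp
#include <cassert>

// Values mirror the enum documented for the class:
// TAO_ONEWAY_REQUEST = 0, TAO_TWOWAY_REQUEST = 1, TAO_REPLY (= 2).
enum Semantics { ONEWAY_REQUEST = 0, TWOWAY_REQUEST = 1, REPLY = 2 };

// Hypothetical labels for the code paths taken by the listing.
enum Path { SEND_SYNCHRONOUS, SEND_REPLY, TRY_SEND_FIRST, QUEUE_ONLY };

inline Path choose_path(int semantics, bool queue_empty, bool must_queue)
{
    if (semantics == TWOWAY_REQUEST)
        return SEND_SYNCHRONOUS;  // block until the message is on the wire
    if (semantics == REPLY)
        return SEND_REPLY;        // try once, queue whatever is left

    // Any other value: send immediately only if nothing is queued ahead of
    // this message and the sync strategy does not insist on queueing.
    if (!queue_empty || must_queue)
        return QUEUE_ONLY;
    return TRY_SEND_FIRST;
}
```

Refusing to send when the queue is non-empty preserves message order on the connection: a new message must not overtake data that is already waiting.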

int TAO_Transport::send_reply_message_i const ACE_Message_Block   message_block,
ACE_Time_Value   max_wait_time
[private]
 

Send a reply message, i.e. do not block until the message is on the wire, but just return after adding it to the queue.

Definition at line 610 of file Transport.cpp.

References ACE_ASSERT, ACE_DEBUG, TAO_Synch_Queued_Message::clone, TAO_ORB_Core::flushing_strategy, LM_DEBUG, orb_core, TAO_Queued_Message::push_back, TAO_Queued_Message::remove_from_list, TAO_Flushing_Strategy::schedule_output, and send_synch_message_helper_i.

Referenced by send_message_shared_i.

00612 {
00613   // Don't clone now.. The message could be sent in one shot!
00614   TAO_Synch_Queued_Message synch_message (mb);
00615 
00616   synch_message.push_back (this->head_,
00617                            this->tail_);
00618 
00619   int n =
00620     this->send_synch_message_helper_i (synch_message,
00621                                        max_wait_time);
00622 
00623   if (n == -1 ||
00624       n == 1)
00625     return n;
00626 
00627   ACE_ASSERT (n == 0);
00628 
00629   if (TAO_debug_level > 3)
00630     {
00631       ACE_DEBUG ((LM_DEBUG,
00632                   "TAO (%P|%t) - Transport[%d]::send_reply_message_i, "
00633                   "preparing to add to queue before leaving \n",
00634                   this->id ()));
00635     }
00636 
00637   // Till this point we shouldn't have any copying and that is the
00638   // point anyway. Now, remove the node from the list
00639   synch_message.remove_from_list (this->head_,
00640                                   this->tail_);
00641 
00642   // Clone the node that we have.
00643   TAO_Queued_Message *msg =
00644     synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
00645 
00646   // Stick it in the queue
00647   msg->push_back (this->head_,
00648                   this->tail_);
00649 
00650   TAO_Flushing_Strategy *flushing_strategy =
00651     this->orb_core ()->flushing_strategy ();
00652 
00653   (void) flushing_strategy->schedule_output (this);
00654 
00655   return 1;
00656 }

virtual int TAO_Transport::send_request TAO_Stub   stub,
TAO_ORB_Core   orb_core,
TAO_OutputCDR   stream,
int    message_semantics,
ACE_Time_Value   max_time_wait
[pure virtual]
 

Prepare the waiting and demuxing strategy to receive a reply for a new request.

Preparing the ORB to receive the reply only after the request is completely sent opens the system to a subtle race condition. Suppose the ORB is running in a multi-threaded configuration: thread A makes a request while thread B is using the Reactor to process all incoming requests. Thread A could be implemented as follows: 1) send the request 2) set up the ORB to receive the reply 3) wait for the reply

but in this case thread B may receive the reply between steps (1) and (2), and drop it as an invalid or unexpected message. Consequently the correct implementation is: 1) set up the ORB to receive the reply 2) send the request 3) wait for the reply

The following method encapsulates this idiom.

Todo:
This is generic code, it should be factored out into the Transport class.

Implemented in TAO_IIOP_Transport.
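
The ordering argument for send_request can be demonstrated with a toy demultiplexer. This is only a sketch: `ReplyDemux`, its methods and the integer request ids are hypothetical, and the example is single-threaded, so it models the two orderings rather than the actual race.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical demultiplexer: maps request ids to reply slots.
class ReplyDemux {
public:
    // Step "set up the ORB to receive the reply".
    void expect(int request_id) { pending_[request_id] = ""; }

    // Called when a reply arrives. Returns false when nobody is waiting
    // for this id, in which case the reply is dropped as unexpected.
    bool dispatch(int request_id, const std::string& body) {
        auto it = pending_.find(request_id);
        if (it == pending_.end())
            return false;
        it->second = body;
        return true;
    }

    // Step "wait for the reply", reduced to a lookup in this sketch.
    const std::string& reply(int request_id) const {
        return pending_.at(request_id);
    }

private:
    std::map<int, std::string> pending_;
};
```

If dispatch() runs before expect(), which is what can happen when the request is sent first, the reply is lost; registering first makes the same dispatch succeed.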

int TAO_Transport::send_synch_message_helper_i TAO_Synch_Queued_Message   s,
ACE_Time_Value   max_wait_time
[private]
 

A helper method used by <send_synchronous_message_i> and <send_reply_message_i>. It factors out the code shared by both methods.

Definition at line 659 of file Transport.cpp.

References ACE_ASSERT, TAO_Synch_Queued_Message::all_data_sent, drain_queue_i, TAO_Queued_Message::next, TAO_Queued_Message::prev, and TAO_Queued_Message::remove_from_list.

Referenced by send_reply_message_i, and send_synchronous_message_i.

00661 {
00662   // @@todo: Need to send timeouts for writing..
00663   int n = this->drain_queue_i ();
00664   if (n == -1)
00665     {
00666       synch_message.remove_from_list (this->head_, this->tail_);
00667       ACE_ASSERT (synch_message.next () == 0);
00668       ACE_ASSERT (synch_message.prev () == 0);
00669       return -1; // Error while sending...
00670     }
00671   else if (n == 1)
00672     {
00673       ACE_ASSERT (synch_message.all_data_sent ());
00674       ACE_ASSERT (synch_message.next () == 0);
00675       ACE_ASSERT (synch_message.prev () == 0);
00676       return 1;  // Empty queue, message was sent..
00677     }
00678 
00679   ACE_ASSERT (n == 0); // Some data sent, but data remains.
00680 
00681   if (synch_message.all_data_sent ())
00682     {
00683       ACE_ASSERT (synch_message.next () == 0);
00684       ACE_ASSERT (synch_message.prev () == 0);
00685       return 1;
00686     }
00687 
00688   return 0;
00689 }

int TAO_Transport::send_synchronous_message_i const ACE_Message_Block   message_block,
ACE_Time_Value   max_wait_time
[private]
 

Send a synchronous message, i.e. block until the message is on the wire.

Definition at line 517 of file Transport.cpp.

References ACE_ASSERT, ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_RETURN, TAO_Synch_Queued_Message::all_data_sent, TAO_Synch_Queued_Message::current_block, ETIME, TAO_Flushing_Strategy::flush_message, TAO_ORB_Core::flushing_strategy, head_, LM_ERROR, TAO_Queued_Message::next, orb_core, TAO_Queued_Message::prev, TAO_Queued_Message::push_back, TAO_Queued_Message::push_front, TAO_Queued_Message::remove_from_list, TAO_Flushing_Strategy::schedule_output, and send_synch_message_helper_i.

Referenced by send_message_shared_i.

00519 {
00520   // We are going to block, so there is no need to clone
00521   // the message block.
00522   TAO_Synch_Queued_Message synch_message (mb);
00523 
00524   synch_message.push_back (this->head_, this->tail_);
00525 
00526   int n =
00527     this->send_synch_message_helper_i (synch_message,
00528                                        max_wait_time);
00529 
00530   if (n == -1 ||
00531       n == 1)
00532     return n;
00533 
00534   ACE_ASSERT (n == 0);
00535 
00536   // @todo: Check for timeouts!
00537   // if (max_wait_time != 0 && errno == ETIME) return -1;
00538   TAO_Flushing_Strategy *flushing_strategy =
00539     this->orb_core ()->flushing_strategy ();
00540   (void) flushing_strategy->schedule_output (this);
00541 
00542   // Release the mutex, other threads may modify the queue as we
00543   // block for a long time writing out data.
00544   int result;
00545   {
00546     typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
00547     TAO_REVERSE_LOCK reverse (*this->handler_lock_);
00548     ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
00549 
00550     result = flushing_strategy->flush_message (this,
00551                                                &synch_message,
00552                                                max_wait_time);
00553   }
00554   if (result == -1)
00555     {
00556       synch_message.remove_from_list (this->head_, this->tail_);
00557       if (errno == ETIME)
00558         {
00559           if (this->head_ == &synch_message)
00560             {
00561               // This is a timeout, there is only one nasty case: the
00562               // message has been partially sent!  We simply cannot take
00563               // the message out of the queue, because that would corrupt
00564               // the connection.
00565               //
00566               // What we do is replace the queued message with an
00567               // asynchronous message, that contains only what remains of
00568               // the timed out request.  If you think about sending
00569               // CancelRequests in this case: there is not much point in
00570               // doing that: the receiving ORB would probably ignore it,
00571               // and figuring out the request ID would be a bit of a
00572               // nightmare.
00573               //
00574 
00575               synch_message.remove_from_list (this->head_, this->tail_);
00576               TAO_Queued_Message *queued_message = 0;
00577               ACE_NEW_RETURN (queued_message,
00578                               TAO_Asynch_Queued_Message (
00579                                   synch_message.current_block (),
00580                                   0,
00581                                   1),
00582                               -1);
00583               queued_message->push_front (this->head_, this->tail_);
00584             }
00585         }
00586 
00587       if (TAO_debug_level > 0)
00588         {
00589           ACE_ERROR ((LM_ERROR,
00590                       "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, "
00591                       "error while flushing message %p\n",
00592                       this->id (), ""));
00593         }
00594 
00595       return -1;
00596     }
00597 
00598   else
00599     {
00600       ACE_ASSERT (synch_message.all_data_sent () != 0);
00601     }
00602 
00603   ACE_ASSERT (synch_message.next () == 0);
00604   ACE_ASSERT (synch_message.prev () == 0);
00605   return 1;
00606 }
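
The listing above releases the handler lock with an ACE_Reverse_Lock guard while it blocks in flush_message, so that other threads can modify the queue in the meantime. The following is a minimal sketch of that pattern in standard C++, not the ACE implementation. ReverseGuard, blocking_flush, and send_with_lock_released are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical helper (not ACE code): releases an already-held mutex on
// construction and re-acquires it on destruction, the inverse of a guard.
class ReverseGuard {
public:
  explicit ReverseGuard(std::mutex &m) : m_(m) { m_.unlock(); }
  ~ReverseGuard() { m_.lock(); }
  ReverseGuard(const ReverseGuard &) = delete;
  ReverseGuard &operator=(const ReverseGuard &) = delete;
private:
  std::mutex &m_;
};

// Hypothetical stand-in for a long blocking flush. It takes the lock itself,
// which only works because the caller has released it. Returns 1 for
// "everything was sent", following the convention in the listing.
inline int blocking_flush(std::mutex &queue_lock, int &queue_len) {
  std::lock_guard<std::mutex> guard(queue_lock);
  queue_len = 0;
  return 1;
}

inline int send_with_lock_released(std::mutex &queue_lock, int &queue_len) {
  std::lock_guard<std::mutex> held(queue_lock);  // normal critical section
  ++queue_len;                                   // queue the message
  int result;
  {
    ReverseGuard released(queue_lock);           // unlock for the slow part
    result = blocking_flush(queue_lock, queue_len);
  }                                              // lock is re-acquired here
  assert(queue_len == 0);
  return result;
}
```

Scoping the reverse guard in its own block keeps the unlocked region as small as possible, and the lock is re-acquired even if the blocking call exits early.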

size_t TAO_Transport::sent_byte_count (void)
 

Accessor to sent_byte_count_.

Definition at line 2158 of file Transport.cpp.

References sent_byte_count_.

02159 {
02160   return this->sent_byte_count_;
02161 }

ACE_INLINE CORBA::ULong TAO_Transport::tag (void) const
 

Return the protocol tag.

The OMG assigns a unique tag (a 32-bit unsigned number) to each protocol. New protocol tags can be obtained free of charge from the OMG; see the documentation in corbafwd.h for more details.

Definition at line 7 of file Transport.inl.

References tag_.

00008 {
00009   return this->tag_;
00010 }

int TAO_Transport::tear_listen_point_list (TAO_InputCDR &cdr) [virtual]
 

Extracts the list of listen points from the <cdr> stream. The list holds the protocol-specific details of the ListenPoints.

Reimplemented in TAO_IIOP_Transport.

Definition at line 282 of file Transport.cpp.

Referenced by TAO_GIOP_Message_Generator_Parser_12::process_bidir_context.

00283 {
00284   ACE_NOTSUP_RETURN (-1);
00285 }

ACE_INLINE TAO_Transport_Mux_Strategy * TAO_Transport::tms (void) const
 

Get the TAO_Transport_Mux_Strategy used by this object.

The role of the TAO_Transport_Mux_Strategy is described in more detail in that class' documentation. Suffice it to say that the class controls how many threads can have pending requests over the same connection. Multiplexing multiple threads over the same connection conserves resources and is almost required for AMI, but having only one pending request per connection is more efficient and reduces the possibility of priority inversions.

Definition at line 19 of file Transport.inl.

References tms_.

Referenced by idle_after_reply, idle_after_send, process_parsed_messages, and send_connection_closed_notifications_i.

00020 {
00021   return tms_;
00022 }

TAO_Transport_Cache_Manager & TAO_Transport::transport_cache_manager (void) [private]
 

Helper method that returns the Transport Cache Manager.

Definition at line 2146 of file Transport.cpp.

References TAO_ORB_Core::lane_resources, orb_core_, and TAO_Thread_Lane_Resources::transport_cache.

Referenced by close_connection_shared, make_idle, purge_entry, recache_transport, and update_transport.

02147 {
02148   return this->orb_core_->lane_resources ().transport_cache ();
02149 }

void TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
 

Definition at line 1728 of file Transport.cpp.

References ACE_DEBUG, LM_DEBUG, TAO_Queued_Data::missing_data_bytes_, TAO_Queued_Data::msg_block_, recv, ssize_t, uncompleted_message_, and ACE_Message_Block::wr_ptr.

Referenced by handle_input_i.

01729 {
01730   if (this->uncompleted_message_ == 0)
01731     return;
01732 
01733   ssize_t n = 0;
01734   size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
01735   ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
01736 
01737   // Try to complete this until we error or block right here...
01738   for (ssize_t bytes = missing_data;
01739        bytes != 0;
01740        bytes -= n)
01741     {
01742       // .. do a read on the socket again.
01743       n = this->recv (mb.wr_ptr (),
01744                       bytes,
01745                       max_wait_time);
01746 
01747       if (TAO_debug_level > 6)
01748         {
01749           ACE_DEBUG ((LM_DEBUG,
01750                       "TAO (%P|%t) - Transport[%d]::handle_input_i, "
01751                       "read %d bytes on attempt\n",
01752                       this->id(), n));
01753         }
01754 
01755       if (n == 0 || n == -1)
01756         {
01757           break;
01758         }
01759 
01760       mb.wr_ptr (n);
01761       missing_data -= n;
01762     }
01763 }
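
The loop in the listing keeps reading until the missing bytes have all arrived or a read returns 0 or -1, and it leaves any remainder for a later call. A minimal sketch of the same control flow in standard C++ follows. PendingMessage, Reader, and this free-standing try_to_complete are hypothetical stand-ins, not the TAO types.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for a partially received message.
struct PendingMessage {
  std::vector<char> data;    // bytes received so far
  std::size_t missing = 0;   // bytes still expected
};

// Hypothetical reader: returns the number of bytes written into buf,
// or 0 / -1 when no progress can be made (as in the listing).
using Reader = std::function<long(char *buf, std::size_t len)>;

inline void try_to_complete(PendingMessage &msg, const Reader &read_some) {
  while (msg.missing != 0) {
    std::vector<char> chunk(msg.missing);
    const long n = read_some(chunk.data(), chunk.size());
    if (n <= 0)
      break;  // nothing read: keep the partial message for the next attempt
    assert(static_cast<std::size_t>(n) <= msg.missing);
    msg.data.insert(msg.data.end(), chunk.begin(), chunk.begin() + n);
    msg.missing -= static_cast<std::size_t>(n);
  }
}
```

The caller can check msg.missing afterwards: zero means the message is complete, anything else means the function should be tried again when more input is available.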

int TAO_Transport::update_transport (void)
 

Cache management.

Definition at line 396 of file Transport.cpp.

References transport_cache_manager, and TAO_Transport_Cache_Manager::update_entry.

Referenced by TAO_Notify_Handler::handle_input, TAO_Connection_Handler::handle_input_eh, and TAO_Connection_Handler::handle_output_eh.

00397 {
00398   return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
00399 }

ACE_INLINE TAO_Wait_Strategy * TAO_Transport::wait_strategy (void) const
 

Return the TAO_Wait_Strategy used by this object.

The role of the TAO_Wait_Strategy is described in more detail in that class' documentation. Suffice it to say that the ORB can wait for a reply by blocking on read(), by using the Reactor to wait for multiple events concurrently, or by using the Leader/Followers protocol.

Definition at line 26 of file Transport.inl.

References ws_.

Referenced by TAO_Connection_Handler::close_connection_eh, TAO_IIOP_Connector::make_connection, and TAO_IIOP_Connection_Handler::open.

00027 {
00028   return this->ws_;
00029 }

ACE_INLINE void TAO_Transport::wchar_translator (TAO_Codeset_Translator_Factory *)
 

CodeSet negotiation - Set the wchar codeset translator factory.

Definition at line 148 of file Transport.inl.

References tcs_set_, and wchar_translator_.

00149 {
00150   this->wchar_translator_ = tf;
00151   this->tcs_set_ = 1;
00152 }

ACE_INLINE TAO_Codeset_Translator_Factory * TAO_Transport::wchar_translator (void) const
 

CodeSet Negotiation - Get the wchar codeset translator factory.

Definition at line 135 of file Transport.inl.

References wchar_translator_.

Referenced by TAO_Codeset_Manager::generate_service_context, TAO_Codeset_Manager::process_service_context, and TAO_Codeset_Manager::set_tcs.

00136 {
00137   return this->wchar_translator_;
00138 }


Friends And Related Function Documentation

friend class TAO_Block_Flushing_Strategy [friend]
 

This class needs privileged access to: queue_is_empty_i() and drain_queue_i().

Definition at line 794 of file Transport.h.

friend class TAO_Leader_Follower_Flushing_Strategy [friend]
 

Definition at line 812 of file Transport.h.

friend class TAO_Reactive_Flushing_Strategy [friend]
 

These classes need privileged access to: schedule_output_i() and cancel_output_i().

Definition at line 811 of file Transport.h.

friend class TAO_Transport_Cache_Manager [friend]
 

This class needs privileged access to: close_connection_no_purge ().

Definition at line 894 of file Transport.h.


Member Data Documentation

int TAO_Transport::bidirectional_flag_ [protected]
 

Used to check whether bidirectional info has been synchronized with the peer.

Have we sent any bidirectional information, or have we received any information about making the connection served by this transport bidirectional? The flag is used as follows:
+ We don't want to send the bidirectional context info more than once on the connection, because that wastes marshalling and demarshalling time on the client.
+ On the server side, once a client that has established the connection asks the server to use the connection both ways, we *don't* want the server to pack service info to the client. That is not allowed, so we need a flag to prevent it from happening.

The value of this flag will be 0 if the client sends info and 1 if the server receives the info.

Definition at line 947 of file Transport.h.

Referenced by bidirectional_flag.

TAO_Transport_Cache_Manager::HASH_MAP_ENTRY* TAO_Transport::cache_map_entry_ [protected]
 

Our entry in the cache. We don't own this. It is here for our convenience. We cannot just change things around.

Definition at line 919 of file Transport.h.

Referenced by cache_map_entry.

TAO_Codeset_Translator_Factory* TAO_Transport::char_translator_ [private]
 

Additional member values required to support codeset translation.

Definition at line 1002 of file Transport.h.

Referenced by assign_translators, and char_translator.

ACE_Time_Value TAO_Transport::current_deadline_ [protected]
 

The queue will start draining no later than <queing_deadline_> *if* the deadline is.

Definition at line 961 of file Transport.h.

Referenced by check_buffering_constraints_i, handle_timeout, and reset_flush_timer.

CORBA::Boolean TAO_Transport::first_request_ [private]
 

first_request_ is true until the first request is sent or received. This is needed because codeset context information is required only on the first request. After that, the translators are fixed for the life of the connection.

Definition at line 1013 of file Transport.h.

Referenced by first_request_sent, and generate_request_header.

long TAO_Transport::flush_timer_id_ [protected]
 

The timer ID.

Definition at line 964 of file Transport.h.

Referenced by check_buffering_constraints_i, flush_timer_pending, and reset_flush_timer.

ACE_Lock* TAO_Transport::handler_lock_ [protected]
 

Lock that ensures that activities that *might* use handler-related resources (such as a connection handler) get serialized.

This is an ACE_Lock that gets initialized from TAO_ORB_Core::resource_factory()->create_cached_connection_lock (). This way, one can use a lock appropriate for the type of system, i.e., a null lock for single-threaded systems, and a real lock for multi-threaded systems.

Definition at line 978 of file Transport.h.

Referenced by TAO_Transport, and ~TAO_Transport.
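
The description above relies on a polymorphic lock, so that the same code runs with a no-op lock on single-threaded systems and a real lock on multi-threaded ones. The sketch below illustrates that idea in standard C++ under stated assumptions. Lock, NullLock, MutexLock, create_connection_lock, and Counter are hypothetical names and do not reproduce the ACE_Lock or resource factory interfaces.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical polymorphic lock interface.
class Lock {
public:
  virtual ~Lock() = default;
  virtual void acquire() = 0;
  virtual void release() = 0;
};

// Single-threaded configuration: locking costs nothing.
class NullLock : public Lock {
public:
  void acquire() override {}
  void release() override {}
};

// Multi-threaded configuration: a real mutex.
class MutexLock : public Lock {
public:
  void acquire() override { mutex_.lock(); }
  void release() override { mutex_.unlock(); }
private:
  std::mutex mutex_;
};

// Hypothetical factory that picks the lock type once, at configuration time.
inline std::unique_ptr<Lock> create_connection_lock(bool multi_threaded) {
  if (multi_threaded)
    return std::unique_ptr<Lock>(new MutexLock);
  return std::unique_ptr<Lock>(new NullLock);
}

// Client code is written once against the interface.
class Counter {
public:
  explicit Counter(std::unique_ptr<Lock> lock) : lock_(std::move(lock)) {
    assert(lock_ != nullptr);
  }
  int increment() {
    lock_->acquire();
    const int value = ++value_;
    lock_->release();
    return value;
  }
private:
  std::unique_ptr<Lock> lock_;
  int value_ = 0;
};
```

The virtual call adds a small indirection on every acquire and release, which is the price for choosing the locking policy at run time rather than at compile time.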

TAO_Queued_Message* TAO_Transport::head_ [protected]
 

Implement the outgoing data queue.

Definition at line 950 of file Transport.h.

Referenced by check_buffering_constraints_i, cleanup_queue, drain_queue_helper, drain_queue_i, queue_is_empty_i, send_connection_closed_notifications_i, send_message_shared_i, and send_synchronous_message_i.
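
The outgoing queue is manipulated through push_back, push_front, and remove_from_list calls that take the head and tail pointers by reference, as the send_synchronous_message_i listing shows. A minimal sketch of such an intrusive doubly linked queue follows. Node is a hypothetical type whose member names mirror the listing; it is not the TAO_Queued_Message implementation.

```cpp
#include <cassert>

// Hypothetical intrusive list node: each message carries its own links,
// and the owner holds only the head and tail pointers.
struct Node {
  Node *next = nullptr;
  Node *prev = nullptr;

  void push_back(Node *&head, Node *&tail) {
    assert(next == nullptr && prev == nullptr);  // must not be queued already
    if (tail == nullptr) { head = tail = this; return; }
    tail->next = this;
    prev = tail;
    tail = this;
  }

  void push_front(Node *&head, Node *&tail) {
    assert(next == nullptr && prev == nullptr);  // must not be queued already
    if (head == nullptr) { head = tail = this; return; }
    head->prev = this;
    next = head;
    head = this;
  }

  // Unlinks this node; calling it on a node that is not queued is a no-op.
  void remove_from_list(Node *&head, Node *&tail) {
    if (prev != nullptr) prev->next = next;
    else if (head == this) head = next;
    if (next != nullptr) next->prev = prev;
    else if (tail == this) tail = prev;
    next = prev = nullptr;
  }
};
```

Because the links live inside the node, a stack-allocated message can be queued without any allocation, which fits a sender that blocks until its message has left the queue.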

size_t TAO_Transport::id_ [protected]
 

A unique identifier for the transport.

This never *never* changes over the lifespan, so we don't have to worry about locking it.

HINT: Protocol-specific transports that use a connection handler might choose to set this to the handle for their connection.

Definition at line 989 of file Transport.h.

Referenced by id.

TAO_Incoming_Message_Queue TAO_Transport::incoming_message_queue_ [protected]
 

Queue of the completely received incoming messages.

Definition at line 954 of file Transport.h.

Referenced by enqueue_incoming_message, handle_input_i, and process_queue_head.

TAO_ORB_Core* TAO_Transport::orb_core_ [protected]
 

Global orbcore resource.

Definition at line 915 of file Transport.h.

Referenced by close_connection_shared, handle_input_i, orb_core, TAO_IIOP_Transport::register_handler_i, TAO_IIOP_Transport::send_request, TAO_Transport, and transport_cache_manager.

unsigned long TAO_Transport::purging_order_ [protected]
 

Used by the LRU, LFU and FIFO Connection Purging Strategies.

Definition at line 992 of file Transport.h.

Referenced by purging_order.

size_t TAO_Transport::recv_buffer_size_ [protected]
 

Size of the buffer received.

Definition at line 995 of file Transport.h.

Referenced by handle_input_i, and recv_buffer_size.

size_t TAO_Transport::sent_byte_count_ [protected]
 

Number of bytes sent.

Definition at line 998 of file Transport.h.

Referenced by drain_queue_helper, drain_queue_i, and sent_byte_count.

CORBA::ULong TAO_Transport::tag_ [protected]
 

IOP protocol tag.

Definition at line 912 of file Transport.h.

Referenced by report_invalid_event_handler, and tag.

TAO_Queued_Message* TAO_Transport::tail_ [protected]
 

Definition at line 951 of file Transport.h.

CORBA::Boolean TAO_Transport::tcs_set_ [private]
 

The tcs_set_ flag indicates that negotiation has occurred and so the translators are correct, since a null translator is valid if both ends are using the same codeset, whatever that codeset might be.

Definition at line 1008 of file Transport.h.

Referenced by char_translator, is_tcs_set, and wchar_translator.

TAO_Transport_Mux_Strategy* TAO_Transport::tms_ [protected]
 

Strategy to decide whether multiple requests can be sent over the same connection or the connection is exclusive for a request.

Definition at line 923 of file Transport.h.

Referenced by TAO_Transport, tms, and ~TAO_Transport.

TAO_Transport_Timer TAO_Transport::transport_timer_ [protected]
 

The adapter used to receive timeout callbacks from the Reactor.

Definition at line 967 of file Transport.h.

TAO_Queued_Data* TAO_Transport::uncompleted_message_ [protected]
 

Place to hold a partially-received (waiting-to-be-completed) message.

Definition at line 957 of file Transport.h.

Referenced by handle_input_i, and try_to_complete.

TAO_Codeset_Translator_Factory* TAO_Transport::wchar_translator_ [private]
 

Definition at line 1003 of file Transport.h.

Referenced by assign_translators, and wchar_translator.

TAO_Wait_Strategy* TAO_Transport::ws_ [protected]
 

Strategy for waiting for the reply after sending the request.

Definition at line 926 of file Transport.h.

Referenced by close_connection_shared, notify_reactor, provide_handle, TAO_IIOP_Transport::register_handler_i, TAO_IIOP_Transport::send_request, TAO_Transport, wait_strategy, and ~TAO_Transport.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 15:43:44 2003 for TAO by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002