Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

UPIPE_Stream.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // UPIPE_Stream.cpp
00003 // $Id: UPIPE_Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:24 chad Exp $
00004 
00005 #include "ace/UPIPE_Stream.h"
00006 
00007 ACE_RCSID(ace, UPIPE_Stream, "$Id: UPIPE_Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:24 chad Exp $")
00008 
00009 #if defined (ACE_HAS_THREADS)
00010 
00011 #if !defined (__ACE_INLINE__) 
00012 #include "ace/UPIPE_Stream.i"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 
00016 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
00017 
00018 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
00019   : mb_last_ (0),
00020     reference_count_ (0)
00021 {
00022   ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
00023 }
00024 
00025 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
00026 {
00027   if (this->mb_last_ != 0)
00028     {
00029       this->mb_last_->release ();
00030       this->mb_last_ = 0;
00031     }
00032 }
00033 
00034 int
00035 ACE_UPIPE_Stream::control (int cmd,
00036                            void * val) const
00037 {
00038   ACE_TRACE ("ACE_UPIPE_Stream::control");
00039 
00040   return ((ACE_UPIPE_Stream *) this)->stream_.control 
00041     ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
00042 }
00043 
00044 void
00045 ACE_UPIPE_Stream::dump (void) const
00046 {
00047   ACE_TRACE ("ACE_UPIPE_Stream::dump");
00048 }
00049 
00050 int 
00051 ACE_UPIPE_Stream::close (void)
00052 {
00053   ACE_TRACE ("ACE_UPIPE_Stream::close");
00054   ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
00055 
00056   this->reference_count_--;
00057 
00058   if (this->reference_count_ == 0)
00059     {
00060       // Since the UPIPE should have been closed earlier we won't bother
00061       // checking to see if closing it now fails.
00062 
00063       if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
00064         this->ACE_SPIPE::close ();
00065 
00066       // Close down the ACE_stream.
00067       return this->stream_.close ();
00068     }
00069   return 0;
00070 }
00071 
00072 int 
00073 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
00074 {
00075   ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
00076   remote_sap = this->remote_addr_;
00077   return 0;
00078 }
00079 
00080 int
00081 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p, 
00082                         ACE_Time_Value *timeout)
00083 {
00084   ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
00085   return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
00086 }
00087 
00088 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p, 
00089                             ACE_Time_Value *timeout)
00090 {
00091   return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
00092 }
00093 
00094 // Send a buffer.
00095 
00096 ssize_t
00097 ACE_UPIPE_Stream::send (const char *buffer, 
00098                         size_t n, 
00099                         ACE_Time_Value *timeout)
00100 {
00101   ACE_TRACE ("ACE_UPIPE_Stream::send");
00102 
00103   ACE_Message_Block *mb_p;
00104   ACE_NEW_RETURN (mb_p,
00105                   ACE_Message_Block (n),
00106                   -1);
00107   mb_p->copy (buffer, n);
00108   return
00109     this->stream_.put (mb_p, timeout) == -1
00110     ? -1
00111     : ACE_static_cast (ssize_t, n);
00112 }
00113 
00114 // Receive a buffer.
00115 
00116 ssize_t
00117 ACE_UPIPE_Stream::recv (char *buffer, 
00118                         size_t n,
00119                         ACE_Time_Value *timeout)
00120 {
00121   ACE_TRACE ("ACE_UPIPE_Stream::recv");
00122   // Index in buffer.
00123   size_t bytes_read = 0;   
00124 
00125   while (bytes_read < n)
00126     if (this->mb_last_ != 0)
00127       {
00128         // We have remaining data in our last read Message_Buffer.
00129         size_t this_len = this->mb_last_->length ();
00130         if (this_len < n)
00131           {
00132             // The remaining data is not enough.
00133 
00134             ACE_OS::memcpy ((void *) &buffer[bytes_read], 
00135                             this->mb_last_->rd_ptr (), 
00136                             this_len);
00137             bytes_read += this_len;
00138             this->mb_last_ = this->mb_last_->release ();   // mb_last_ now 0
00139             return bytes_read;
00140           }
00141         else
00142           {
00143             // The remaining data is at least enough.  If there's
00144             // more, we'll get it the next time through.
00145             ACE_OS::memcpy (&buffer[bytes_read], 
00146                             this->mb_last_->rd_ptr (), 
00147                             n);
00148             bytes_read += n;
00149 
00150             // Advance rd_ptr.
00151             this->mb_last_->rd_ptr (n);
00152 
00153             if (this->mb_last_->length () == 0)
00154               // Now the Message_Buffer is empty.
00155               this->mb_last_ = this->mb_last_->release ();
00156           }
00157       }
00158     else
00159       {
00160         // We have to get a new Message_Buffer from our stream.
00161         int result = this->stream_.get (this->mb_last_, timeout);
00162 
00163         if (result == -1)
00164           {
00165             if (errno == EWOULDBLOCK && bytes_read > 0)
00166               // Return the number of bytes read before we timed out.
00167               return bytes_read; 
00168             else
00169               return -1;
00170           }
00171       }
00172 
00173   return bytes_read;
00174 }
00175 
00176 ssize_t
00177 ACE_UPIPE_Stream::send_n (const char *buf, 
00178                           size_t n,
00179                           ACE_Time_Value *timeout)
00180 {
00181   ACE_TRACE ("ACE_UPIPE_Stream::send_n");
00182 
00183   size_t bytes_written;
00184   ssize_t len = 0;
00185 
00186   for (bytes_written = 0; 
00187        bytes_written < n;
00188        bytes_written += len)
00189     {
00190       len = this->send (buf + bytes_written,
00191                         n - bytes_written, 
00192                         timeout);
00193 
00194       if (len == -1)
00195         return -1;
00196     }
00197 
00198   return bytes_written;
00199 }
00200 
00201 ssize_t
00202 ACE_UPIPE_Stream::recv_n (char *buf, 
00203                           size_t n, 
00204                           ACE_Time_Value *timeout)
00205 {
00206   ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
00207   size_t bytes_read;
00208   ssize_t len = 0;
00209 
00210   for (bytes_read = 0; 
00211        bytes_read < n;
00212        bytes_read += len)
00213     {
00214       len = this->recv (buf + bytes_read, 
00215                         n - bytes_read,
00216                         timeout);
00217       if (len == -1)
00218         return -1;
00219       else if (len == 0)
00220         break;
00221     }
00222 
00223   return bytes_read;      
00224 }
00225 
00226 
00227 #endif /* ACE_HAS_THREADS */

Generated on Mon Jun 16 11:22:03 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002