00001 #include "ace_pch.h"
00002
00003
00004
00005 #include "ace/UPIPE_Stream.h"
00006
00007 ACE_RCSID(ace, UPIPE_Stream, "$Id: UPIPE_Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:24 chad Exp $")
00008
00009 #if defined (ACE_HAS_THREADS)
00010
00011 #if !defined (__ACE_INLINE__)
00012 #include "ace/UPIPE_Stream.i"
00013 #endif
00014
00015
00016 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
00017
00018 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
00019 : mb_last_ (0),
00020 reference_count_ (0)
00021 {
00022 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
00023 }
00024
00025 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
00026 {
00027 if (this->mb_last_ != 0)
00028 {
00029 this->mb_last_->release ();
00030 this->mb_last_ = 0;
00031 }
00032 }
00033
00034 int
00035 ACE_UPIPE_Stream::control (int cmd,
00036 void * val) const
00037 {
00038 ACE_TRACE ("ACE_UPIPE_Stream::control");
00039
00040 return ((ACE_UPIPE_Stream *) this)->stream_.control
00041 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
00042 }
00043
00044 void
00045 ACE_UPIPE_Stream::dump (void) const
00046 {
00047 ACE_TRACE ("ACE_UPIPE_Stream::dump");
00048 }
00049
00050 int
00051 ACE_UPIPE_Stream::close (void)
00052 {
00053 ACE_TRACE ("ACE_UPIPE_Stream::close");
00054 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
00055
00056 this->reference_count_--;
00057
00058 if (this->reference_count_ == 0)
00059 {
00060
00061
00062
00063 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
00064 this->ACE_SPIPE::close ();
00065
00066
00067 return this->stream_.close ();
00068 }
00069 return 0;
00070 }
00071
00072 int
00073 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
00074 {
00075 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
00076 remote_sap = this->remote_addr_;
00077 return 0;
00078 }
00079
00080 int
00081 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p,
00082 ACE_Time_Value *timeout)
00083 {
00084 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
00085 return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
00086 }
00087
00088 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p,
00089 ACE_Time_Value *timeout)
00090 {
00091 return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
00092 }
00093
00094
00095
00096 ssize_t
00097 ACE_UPIPE_Stream::send (const char *buffer,
00098 size_t n,
00099 ACE_Time_Value *timeout)
00100 {
00101 ACE_TRACE ("ACE_UPIPE_Stream::send");
00102
00103 ACE_Message_Block *mb_p;
00104 ACE_NEW_RETURN (mb_p,
00105 ACE_Message_Block (n),
00106 -1);
00107 mb_p->copy (buffer, n);
00108 return
00109 this->stream_.put (mb_p, timeout) == -1
00110 ? -1
00111 : ACE_static_cast (ssize_t, n);
00112 }
00113
00114
00115
00116 ssize_t
00117 ACE_UPIPE_Stream::recv (char *buffer,
00118 size_t n,
00119 ACE_Time_Value *timeout)
00120 {
00121 ACE_TRACE ("ACE_UPIPE_Stream::recv");
00122
00123 size_t bytes_read = 0;
00124
00125 while (bytes_read < n)
00126 if (this->mb_last_ != 0)
00127 {
00128
00129 size_t this_len = this->mb_last_->length ();
00130 if (this_len < n)
00131 {
00132
00133
00134 ACE_OS::memcpy ((void *) &buffer[bytes_read],
00135 this->mb_last_->rd_ptr (),
00136 this_len);
00137 bytes_read += this_len;
00138 this->mb_last_ = this->mb_last_->release ();
00139 return bytes_read;
00140 }
00141 else
00142 {
00143
00144
00145 ACE_OS::memcpy (&buffer[bytes_read],
00146 this->mb_last_->rd_ptr (),
00147 n);
00148 bytes_read += n;
00149
00150
00151 this->mb_last_->rd_ptr (n);
00152
00153 if (this->mb_last_->length () == 0)
00154
00155 this->mb_last_ = this->mb_last_->release ();
00156 }
00157 }
00158 else
00159 {
00160
00161 int result = this->stream_.get (this->mb_last_, timeout);
00162
00163 if (result == -1)
00164 {
00165 if (errno == EWOULDBLOCK && bytes_read > 0)
00166
00167 return bytes_read;
00168 else
00169 return -1;
00170 }
00171 }
00172
00173 return bytes_read;
00174 }
00175
00176 ssize_t
00177 ACE_UPIPE_Stream::send_n (const char *buf,
00178 size_t n,
00179 ACE_Time_Value *timeout)
00180 {
00181 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
00182
00183 size_t bytes_written;
00184 ssize_t len = 0;
00185
00186 for (bytes_written = 0;
00187 bytes_written < n;
00188 bytes_written += len)
00189 {
00190 len = this->send (buf + bytes_written,
00191 n - bytes_written,
00192 timeout);
00193
00194 if (len == -1)
00195 return -1;
00196 }
00197
00198 return bytes_written;
00199 }
00200
00201 ssize_t
00202 ACE_UPIPE_Stream::recv_n (char *buf,
00203 size_t n,
00204 ACE_Time_Value *timeout)
00205 {
00206 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
00207 size_t bytes_read;
00208 ssize_t len = 0;
00209
00210 for (bytes_read = 0;
00211 bytes_read < n;
00212 bytes_read += len)
00213 {
00214 len = this->recv (buf + bytes_read,
00215 n - bytes_read,
00216 timeout);
00217 if (len == -1)
00218 return -1;
00219 else if (len == 0)
00220 break;
00221 }
00222
00223 return bytes_read;
00224 }
00225
00226
00227 #endif