00001 #include "ace_pch.h"
00002
00003
00004
00005 #ifndef ACE_STREAM_C
00006 #define ACE_STREAM_C
00007
00008
00009 #include "ace/Stream.h"
00010
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 # pragma once
00013 #endif
00014
00015 #include "ace/Stream_Modules.h"
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Stream.i"
00019 #endif
00020
00021 ACE_RCSID(ace, Stream, "$Id: Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00022
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Stream)
00024
00025
00026
00027 template <ACE_SYNCH_DECL> void
00028 ACE_Stream<ACE_SYNCH_USE>::dump (void) const
00029 {
00030 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00031 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- module links --------\n")));
00032
00033 for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00034 ;
00035 mp = mp->next ())
00036 {
00037 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("module name = %s\n"), mp->name ()));
00038 if (mp == this->stream_tail_)
00039 break;
00040 }
00041
00042 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- writer links --------\n")));
00043
00044 ACE_Task<ACE_SYNCH_USE> *tp;
00045
00046 for (tp = this->stream_head_->writer ();
00047 ;
00048 tp = tp->next ())
00049 {
00050 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("writer queue name = %s\n"), tp->name ()));
00051 tp->dump ();
00052 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------\n")));
00053 if (tp == this->stream_tail_->writer ()
00054 || (this->linked_us_
00055 && tp == this->linked_us_->stream_head_->reader ()))
00056 break;
00057 }
00058
00059 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------- reader links --------\n")));
00060 for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00061 {
00062 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("reader queue name = %s\n"), tp->name ()));
00063 tp->dump ();
00064 ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("-------\n")));
00065 if (tp == this->stream_head_->reader ()
00066 || (this->linked_us_
00067 && tp == this->linked_us_->stream_head_->writer ()))
00068 break;
00069 }
00070 }
00071
00072 template <ACE_SYNCH_DECL> int
00073 ACE_Stream<ACE_SYNCH_USE>::push (ACE_Module<ACE_SYNCH_USE> *new_top)
00074 {
00075 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00076 if (this->push_module (new_top,
00077 this->stream_head_->next (),
00078 this->stream_head_) == -1)
00079 return -1;
00080 else
00081 return 0;
00082 }
00083
00084 template <ACE_SYNCH_DECL> int
00085 ACE_Stream<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
00086 {
00087 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00088 return this->stream_head_->writer ()->put (mb, tv);
00089 }
00090
00091 template <ACE_SYNCH_DECL> int
00092 ACE_Stream<ACE_SYNCH_USE>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
00093 {
00094 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00095 return this->stream_head_->reader ()->getq (mb, tv);
00096 }
00097
00098
00099
00100
00101 template <ACE_SYNCH_DECL> int
00102 ACE_Stream<ACE_SYNCH_USE>::top (ACE_Module<ACE_SYNCH_USE> *&m)
00103 {
00104 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00105 if (this->stream_head_->next () == this->stream_tail_)
00106 return -1;
00107 else
00108 {
00109 m = this->stream_head_->next ();
00110 return 0;
00111 }
00112 }
00113
00114 template <ACE_SYNCH_DECL> int
00115 ACE_Stream<ACE_SYNCH_USE>::insert (const ACE_TCHAR *prev_name,
00116 ACE_Module<ACE_SYNCH_USE> *mod)
00117 {
00118 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00119
00120 for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00121 prev_mod != 0;
00122 prev_mod = prev_mod->next ())
00123 if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00124 {
00125 ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00126
00127
00128 if (next_mod == 0)
00129 return -1;
00130
00131 mod->link (next_mod);
00132 prev_mod->link (mod);
00133
00134 if (mod->reader ()->open (mod->arg ()) == -1)
00135 return -1;
00136
00137 if (mod->writer ()->open (mod->arg ()) == -1)
00138 return -1;
00139
00140 return 0;
00141 }
00142
00143 return -1;
00144 }
00145
00146 template <ACE_SYNCH_DECL> int
00147 ACE_Stream<ACE_SYNCH_USE>::replace (const ACE_TCHAR *replace_name,
00148 ACE_Module<ACE_SYNCH_USE> *mod,
00149 int flags)
00150 {
00151 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00152 ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00153
00154 for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00155 rep_mod != 0;
00156 rep_mod = rep_mod->next ())
00157 if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00158 {
00159 ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00160
00161 if (next_mod)
00162 mod->link (next_mod);
00163 else
00164 {
00165 mod->writer ()->next (0);
00166 mod->next (0);
00167 this->stream_tail_ = mod;
00168 }
00169
00170 if (prev_mod)
00171 prev_mod->link (mod);
00172 else
00173 {
00174 mod->reader ()->next (0);
00175 this->stream_head_ = mod;
00176 }
00177
00178 if (mod->reader ()->open (mod->arg ()) == -1)
00179 return -1;
00180
00181 if (mod->writer ()->open (mod->arg ()) == -1)
00182 return -1;
00183
00184 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00185 {
00186 rep_mod->close (flags);
00187 delete rep_mod;
00188 }
00189
00190 return 0;
00191 }
00192 else
00193 prev_mod = rep_mod;
00194
00195 return -1;
00196 }
00197
00198
00199
00200
00201 template <ACE_SYNCH_DECL> int
00202 ACE_Stream<ACE_SYNCH_USE>::pop (int flags)
00203 {
00204 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00205 if (this->stream_head_->next () == this->stream_tail_)
00206 return -1;
00207 else
00208 {
00209
00210 ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00211 ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00212
00213 this->stream_head_->next (new_top);
00214
00215
00216
00217 top_mod->close (flags);
00218
00219
00220 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00221 delete top_mod;
00222
00223 this->stream_head_->writer ()->next (new_top->writer ());
00224 new_top->reader ()->next (this->stream_head_->reader ());
00225 return 0;
00226 }
00227 }
00228
00229
00230
00231
00232 template <ACE_SYNCH_DECL> int
00233 ACE_Stream<ACE_SYNCH_USE>::remove (const ACE_TCHAR *name,
00234 int flags)
00235 {
00236 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00237 ACE_Module<ACE_SYNCH_USE> *prev = 0;
00238
00239 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00240 mod != 0;
00241 mod = mod->next ())
00242 if (ACE_OS::strcmp (mod->name (), name) == 0)
00243 {
00244 if (prev == 0)
00245 this->stream_head_->link (mod->next ());
00246 else
00247 prev->link (mod->next ());
00248
00249
00250 if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00251 {
00252
00253 mod->close (flags);
00254 delete mod;
00255 }
00256
00257 return 0;
00258 }
00259 else
00260 prev = mod;
00261
00262 return -1;
00263 }
00264
00265 template <ACE_SYNCH_DECL> ACE_Module<ACE_SYNCH_USE> *
00266 ACE_Stream<ACE_SYNCH_USE>::find (const ACE_TCHAR *name)
00267 {
00268 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00269 for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00270 mod != 0;
00271 mod = mod->next ())
00272 if (ACE_OS::strcmp (mod->name (), name) == 0)
00273 return mod;
00274
00275 return 0;
00276 }
00277
00278
00279
00280 template <ACE_SYNCH_DECL> int
00281 ACE_Stream<ACE_SYNCH_USE>::push_module (ACE_Module<ACE_SYNCH_USE> *new_top,
00282 ACE_Module<ACE_SYNCH_USE> *current_top,
00283 ACE_Module<ACE_SYNCH_USE> *head)
00284 {
00285 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00286 ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00287 ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00288 ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00289 ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00290
00291 if (current_top)
00292 {
00293 ct_reader = current_top->reader ();
00294 ct_writer = current_top->writer ();
00295 ct_reader->next (nt_reader);
00296 }
00297
00298 nt_writer->next (ct_writer);
00299
00300 if (head)
00301 {
00302 if (head != new_top)
00303 head->link (new_top);
00304 }
00305 else
00306 nt_reader->next (0);
00307
00308 new_top->next (current_top);
00309
00310 if (nt_reader->open (new_top->arg ()) == -1)
00311 return -1;
00312
00313 if (nt_writer->open (new_top->arg ()) == -1)
00314 return -1;
00315 return 0;
00316 }
00317
00318 template <ACE_SYNCH_DECL> int
00319 ACE_Stream<ACE_SYNCH_USE>::open (void *a,
00320 ACE_Module<ACE_SYNCH_USE> *head,
00321 ACE_Module<ACE_SYNCH_USE> *tail)
00322 {
00323 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00324 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00325
00326 ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00327 ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00328
00329 if (head == 0)
00330 {
00331 ACE_NEW_RETURN (h1,
00332 ACE_Stream_Head<ACE_SYNCH_USE>,
00333 -1);
00334 ACE_NEW_RETURN (h2,
00335 ACE_Stream_Head<ACE_SYNCH_USE>,
00336 -1);
00337 ACE_NEW_RETURN (head,
00338 ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Head"),
00339 h1, h2,
00340 a,
00341 M_DELETE),
00342 -1);
00343 }
00344
00345 if (tail == 0)
00346 {
00347 ACE_NEW_RETURN (t1,
00348 ACE_Stream_Tail<ACE_SYNCH_USE>,
00349 -1);
00350 ACE_NEW_RETURN (t2,
00351 ACE_Stream_Tail<ACE_SYNCH_USE>,
00352 -1);
00353 ACE_NEW_RETURN (tail,
00354 ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Tail"),
00355 t1, t2,
00356 a,
00357 M_DELETE),
00358 -1);
00359 }
00360
00361
00362 if (head == 0 && (h1 == 0 || h2 == 0)
00363 || tail == 0 && (t1 == 0 || t2 == 0))
00364 {
00365 delete h1;
00366 delete h2;
00367 delete t1;
00368 delete t2;
00369 delete head;
00370 delete tail;
00371 errno = ENOMEM;
00372 return -1;
00373 }
00374
00375 this->stream_head_ = head;
00376 this->stream_tail_ = tail;
00377
00378 if (this->push_module (this->stream_tail_) == -1)
00379 return -1;
00380 else if (this->push_module (this->stream_head_,
00381 this->stream_tail_,
00382 this->stream_head_) == -1)
00383 return -1;
00384
00385 return 0;
00386 }
00387
00388 template <ACE_SYNCH_DECL> int
00389 ACE_Stream<ACE_SYNCH_USE>::close (int flags)
00390 {
00391 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00392 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00393
00394 if (this->stream_head_ != 0
00395 && this->stream_tail_ != 0)
00396 {
00397
00398 this->unlink_i ();
00399
00400 int result = 0;
00401
00402
00403
00404 while (this->stream_head_->next () != this->stream_tail_)
00405 if (this->pop (flags) == -1)
00406 result = -1;
00407
00408
00409 if (this->stream_head_->close (flags) == -1)
00410 result = -1;
00411 if (this->stream_tail_->close (flags) == -1)
00412 result = -1;
00413
00414
00415 delete this->stream_head_;
00416 delete this->stream_tail_;
00417
00418 this->stream_head_ = 0;
00419 this->stream_tail_ = 0;
00420
00421
00422 this->final_close_.broadcast ();
00423 return result;
00424 }
00425 return 0;
00426 }
00427
00428 template <ACE_SYNCH_DECL> int
00429 ACE_Stream<ACE_SYNCH_USE>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
00430 void *a)
00431 {
00432 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00433 ACE_IO_Cntl_Msg ioc (cmd);
00434
00435 ACE_Message_Block *db;
00436
00437
00438 ACE_NEW_RETURN (db,
00439 ACE_Message_Block (sizeof (int),
00440 ACE_Message_Block::MB_IOCTL,
00441 0,
00442 (char *) a),
00443 -1);
00444
00445
00446
00447 ACE_Message_Block *cb = 0;
00448
00449 ACE_NEW_RETURN (cb,
00450 ACE_Message_Block (sizeof ioc,
00451 ACE_Message_Block::MB_IOCTL,
00452 db,
00453 (char *) &ioc),
00454 -1);
00455
00456
00457
00458
00459
00460
00461 if (cb == 0)
00462 {
00463 db->release ();
00464 errno = ENOMEM;
00465 return -1;
00466 }
00467
00468 int result;
00469
00470 if (this->stream_head_->writer ()->put (cb) == -1)
00471 result = -1;
00472 else if (this->stream_head_->reader ()->getq (cb) == -1)
00473 result = -1;
00474 else
00475 result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00476
00477
00478 cb->release ();
00479
00480 return result;
00481 }
00482
00483
00484
00485
00486
00487
00488 template <ACE_SYNCH_DECL> int
00489 ACE_Stream<ACE_SYNCH_USE>::link_i (ACE_Stream<ACE_SYNCH_USE> &us)
00490 {
00491 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00492 this->linked_us_ = &us;
00493
00494 us.linked_us_ = this;
00495
00496 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00497
00498 if (my_tail == 0)
00499 return -1;
00500
00501
00502 while (my_tail->next () != this->stream_tail_)
00503 my_tail = my_tail->next ();
00504
00505 ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00506
00507 if (other_tail == 0)
00508 return -1;
00509
00510
00511 while (other_tail->next () != us.stream_tail_)
00512 other_tail = other_tail->next ();
00513
00514
00515 my_tail->writer ()->next (other_tail->reader ());
00516 other_tail->writer ()->next (my_tail->reader ());
00517 return 0;
00518 }
00519
00520 template <ACE_SYNCH_DECL> int
00521 ACE_Stream<ACE_SYNCH_USE>::link (ACE_Stream<ACE_SYNCH_USE> &us)
00522 {
00523 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00524
00525 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00526
00527 return this->link_i (us);
00528 }
00529
00530
00531
00532 template <ACE_SYNCH_DECL> int
00533 ACE_Stream<ACE_SYNCH_USE>::unlink_i (void)
00534 {
00535 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00536
00537
00538
00539 if (this->linked_us_ != 0)
00540 {
00541 ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00542
00543
00544 if (my_tail)
00545 {
00546
00547 while (my_tail->next () != this->stream_tail_)
00548 my_tail = my_tail->next ();
00549
00550
00551 my_tail->writer ()->next (this->stream_tail_->writer ());
00552 }
00553
00554 ACE_Module<ACE_SYNCH_USE> *other_tail =
00555 this->linked_us_->stream_head_;
00556
00557
00558 if (other_tail != 0)
00559 {
00560 while (other_tail->next () != this->linked_us_->stream_tail_)
00561 other_tail = other_tail->next ();
00562
00563 other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00564
00565 }
00566
00567
00568 this->linked_us_->linked_us_ = 0;
00569
00570 this->linked_us_ = 0;
00571 return 0;
00572 }
00573 else
00574 return -1;
00575 }
00576
00577 template <ACE_SYNCH_DECL> int
00578 ACE_Stream<ACE_SYNCH_USE>::unlink (void)
00579 {
00580 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00581 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00582 return this->unlink_i ();
00583 }
00584
00585 template <ACE_SYNCH_DECL>
00586 ACE_Stream<ACE_SYNCH_USE>::ACE_Stream (void * a,
00587 ACE_Module<ACE_SYNCH_USE> *head,
00588 ACE_Module<ACE_SYNCH_USE> *tail)
00589 : linked_us_ (0),
00590 final_close_ (this->lock_)
00591 {
00592 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00593 if (this->open (a, head, tail) == -1)
00594 ACE_ERROR ((LM_ERROR,
00595 ACE_LIB_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00596 head->name (), tail->name ()));
00597 }
00598
00599 template <ACE_SYNCH_DECL>
00600 ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream (void)
00601 {
00602 ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00603
00604 if (this->stream_head_ != 0)
00605 this->close ();
00606 }
00607
00608 template <ACE_SYNCH_DECL>
00609 ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr)
00610 : next_ (sr.stream_head_)
00611 {
00612 ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator");
00613 }
00614
00615 #endif