Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

Stream.cpp

Go to the documentation of this file.
00001 #include "ace_pch.h"
00002 // Stream.cpp
00003 // $Id: Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $
00004 
00005 #ifndef ACE_STREAM_C
00006 #define ACE_STREAM_C
00007 
00008 //#include "ace/Module.h"
00009 #include "ace/Stream.h"
00010 
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 # pragma once
00013 #endif /* ACE_LACKS_PRAGMA_ONCE */
00014 
00015 #include "ace/Stream_Modules.h"
00016 
00017 #if !defined (__ACE_INLINE__)
00018 #include "ace/Stream.i"
00019 #endif /* __ACE_INLINE__ */
00020 
00021 ACE_RCSID(ace, Stream, "$Id: Stream.cpp,v 1.1.1.4.2.1 2003/03/13 19:44:22 chad Exp $")
00022 
00023 ACE_ALLOC_HOOK_DEFINE(ACE_Stream)
00024 
00025 // Give some idea of what the heck is going on in a stream!
00026 
00027 template <ACE_SYNCH_DECL> void
00028 ACE_Stream<ACE_SYNCH_USE>::dump (void) const
00029 {
00030   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::dump");
00031   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- module links --------\n")));
00032 
00033   for (ACE_Module<ACE_SYNCH_USE> *mp = this->stream_head_;
00034        ;
00035        mp = mp->next ())
00036     {
00037       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("module name = %s\n"), mp->name ()));
00038       if (mp == this->stream_tail_)
00039         break;
00040     }
00041 
00042   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- writer links --------\n")));
00043 
00044   ACE_Task<ACE_SYNCH_USE> *tp;
00045 
00046   for (tp = this->stream_head_->writer ();
00047        ;
00048        tp = tp->next ())
00049     {
00050       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("writer queue name = %s\n"), tp->name ()));
00051       tp->dump ();
00052       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00053       if (tp == this->stream_tail_->writer ()
00054           || (this->linked_us_
00055               && tp == this->linked_us_->stream_head_->reader ()))
00056         break;
00057     }
00058 
00059   ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------- reader links --------\n")));
00060   for (tp = this->stream_tail_->reader (); ; tp = tp->next ())
00061     {
00062       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("reader queue name = %s\n"), tp->name ()));
00063       tp->dump ();
00064       ACE_DEBUG ((LM_DEBUG,  ACE_LIB_TEXT ("-------\n")));
00065       if (tp == this->stream_head_->reader ()
00066           || (this->linked_us_
00067               && tp == this->linked_us_->stream_head_->writer ()))
00068         break;
00069     }
00070 }
00071 
00072 template <ACE_SYNCH_DECL> int
00073 ACE_Stream<ACE_SYNCH_USE>::push (ACE_Module<ACE_SYNCH_USE> *new_top)
00074 {
00075   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push");
00076   if (this->push_module  (new_top,
00077                           this->stream_head_->next (),
00078                           this->stream_head_) == -1)
00079     return -1;
00080   else
00081     return 0;
00082 }
00083 
00084 template <ACE_SYNCH_DECL> int
00085 ACE_Stream<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
00086 {
00087   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::put");
00088   return this->stream_head_->writer ()->put (mb, tv);
00089 }
00090 
00091 template <ACE_SYNCH_DECL> int
00092 ACE_Stream<ACE_SYNCH_USE>::get (ACE_Message_Block *&mb, ACE_Time_Value *tv)
00093 {
00094   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::get");
00095   return this->stream_head_->reader ()->getq (mb, tv);
00096 }
00097 
00098 // Return the "top" ACE_Module in a ACE_Stream, skipping over the
00099 // stream_head.
00100 
00101 template <ACE_SYNCH_DECL> int
00102 ACE_Stream<ACE_SYNCH_USE>::top (ACE_Module<ACE_SYNCH_USE> *&m)
00103 {
00104   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::top");
00105   if (this->stream_head_->next () == this->stream_tail_)
00106     return -1;
00107   else
00108     {
00109       m = this->stream_head_->next ();
00110       return 0;
00111     }
00112 }
00113 
00114 template <ACE_SYNCH_DECL> int
00115 ACE_Stream<ACE_SYNCH_USE>::insert (const ACE_TCHAR *prev_name,
00116                                    ACE_Module<ACE_SYNCH_USE> *mod)
00117 {
00118   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::insert");
00119 
00120   for (ACE_Module<ACE_SYNCH_USE> *prev_mod = this->stream_head_;
00121        prev_mod != 0;
00122        prev_mod = prev_mod->next ())
00123     if (ACE_OS::strcmp (prev_mod->name (), prev_name) == 0)
00124       {
00125         ACE_Module<ACE_SYNCH_USE> *next_mod = prev_mod->next ();
00126 
00127         // We can't insert a module below <stream_tail_>.
00128         if (next_mod == 0)
00129           return -1;
00130 
00131         mod->link (next_mod);
00132         prev_mod->link (mod);
00133 
00134         if (mod->reader ()->open (mod->arg ()) == -1)
00135           return -1;
00136 
00137         if (mod->writer ()->open (mod->arg ()) == -1)
00138           return -1;
00139 
00140         return 0;
00141       }
00142 
00143   return -1;
00144 }
00145 
00146 template <ACE_SYNCH_DECL> int
00147 ACE_Stream<ACE_SYNCH_USE>::replace (const ACE_TCHAR *replace_name,
00148                                     ACE_Module<ACE_SYNCH_USE> *mod,
00149                                     int flags)
00150 {
00151   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::replace");
00152   ACE_Module<ACE_SYNCH_USE> *prev_mod = 0;
00153 
00154   for (ACE_Module<ACE_SYNCH_USE> *rep_mod = this->stream_head_;
00155        rep_mod != 0;
00156        rep_mod = rep_mod->next ())
00157     if (ACE_OS::strcmp (rep_mod->name (), replace_name) == 0)
00158       {
00159         ACE_Module<ACE_SYNCH_USE> *next_mod = rep_mod->next ();
00160 
00161         if (next_mod)
00162           mod->link (next_mod);
00163         else // In case the <next_mod> is <stream_tail_>.
00164           {
00165             mod->writer ()->next (0);
00166             mod->next (0);
00167             this->stream_tail_ = mod;
00168           }
00169 
00170         if (prev_mod)
00171           prev_mod->link (mod);
00172         else // In case the <rep_mod> is <stream_head_>.
00173           {
00174             mod->reader ()->next (0);
00175             this->stream_head_ = mod;
00176           }
00177 
00178         if (mod->reader ()->open (mod->arg ()) == -1)
00179           return -1;
00180 
00181         if (mod->writer ()->open (mod->arg ()) == -1)
00182           return -1;
00183 
00184         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00185           {
00186             rep_mod->close (flags);
00187             delete rep_mod;
00188           }
00189 
00190         return 0;
00191       }
00192     else
00193       prev_mod = rep_mod;
00194 
00195   return -1;
00196 }
00197 
00198 // Remove the "top" ACE_Module in a ACE_Stream, skipping over the
00199 // stream_head.
00200 
00201 template <ACE_SYNCH_DECL> int
00202 ACE_Stream<ACE_SYNCH_USE>::pop (int flags)
00203 {
00204   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::pop");
00205   if (this->stream_head_->next () == this->stream_tail_)
00206     return -1;
00207   else
00208     {
00209       // Skip over the ACE_Stream head.
00210       ACE_Module<ACE_SYNCH_USE> *top_mod = this->stream_head_->next ();
00211       ACE_Module<ACE_SYNCH_USE> *new_top = top_mod->next ();
00212 
00213       this->stream_head_->next (new_top);
00214 
00215       // Close the top ACE_Module.
00216 
00217       top_mod->close (flags);
00218 
00219       // Don't delete the Module unless the flags request this.
00220       if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00221         delete top_mod;
00222 
00223       this->stream_head_->writer ()->next (new_top->writer ());
00224       new_top->reader ()->next (this->stream_head_->reader ());
00225       return 0;
00226     }
00227 }
00228 
00229 // Remove a named ACE_Module from an arbitrary place in the
00230 // ACE_Stream.
00231 
00232 template <ACE_SYNCH_DECL> int
00233 ACE_Stream<ACE_SYNCH_USE>::remove (const ACE_TCHAR *name,
00234                                  int flags)
00235 {
00236   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::remove");
00237   ACE_Module<ACE_SYNCH_USE> *prev = 0;
00238 
00239   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00240        mod != 0;
00241        mod = mod->next ())
00242     if (ACE_OS::strcmp (mod->name (), name) == 0)
00243       {
00244         if (prev == 0) // Deleting ACE_Stream Head
00245           this->stream_head_->link (mod->next ());
00246         else
00247           prev->link (mod->next ());
00248 
00249         // Don't delete the Module unless the flags request this.
00250         if (flags != ACE_Module<ACE_SYNCH_USE>::M_DELETE_NONE)
00251           {
00252             // Close down the module and release the memory.
00253             mod->close (flags);
00254             delete mod;
00255           }
00256 
00257         return 0;
00258       }
00259     else
00260       prev = mod;
00261 
00262   return -1;
00263 }
00264 
00265 template <ACE_SYNCH_DECL> ACE_Module<ACE_SYNCH_USE> *
00266 ACE_Stream<ACE_SYNCH_USE>::find (const ACE_TCHAR *name)
00267 {
00268   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::find");
00269   for (ACE_Module<ACE_SYNCH_USE> *mod = this->stream_head_;
00270        mod != 0;
00271        mod = mod->next ())
00272     if (ACE_OS::strcmp (mod->name (), name) == 0)
00273         return mod;
00274 
00275   return 0;
00276 }
00277 
00278 // Actually push a module onto the stack...
00279 
00280 template <ACE_SYNCH_DECL> int
00281 ACE_Stream<ACE_SYNCH_USE>::push_module (ACE_Module<ACE_SYNCH_USE> *new_top,
00282                                       ACE_Module<ACE_SYNCH_USE> *current_top,
00283                                       ACE_Module<ACE_SYNCH_USE> *head)
00284 {
00285   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::push_module");
00286   ACE_Task<ACE_SYNCH_USE> *nt_reader = new_top->reader ();
00287   ACE_Task<ACE_SYNCH_USE> *nt_writer = new_top->writer ();
00288   ACE_Task<ACE_SYNCH_USE> *ct_reader = 0;
00289   ACE_Task<ACE_SYNCH_USE> *ct_writer = 0;
00290 
00291   if (current_top)
00292     {
00293       ct_reader = current_top->reader ();
00294       ct_writer = current_top->writer ();
00295       ct_reader->next (nt_reader);
00296     }
00297 
00298   nt_writer->next (ct_writer);
00299 
00300   if (head)
00301     {
00302       if (head != new_top)
00303         head->link (new_top);
00304     }
00305   else
00306     nt_reader->next (0);
00307 
00308   new_top->next (current_top);
00309 
00310   if (nt_reader->open (new_top->arg ()) == -1)
00311     return -1;
00312 
00313   if (nt_writer->open (new_top->arg ()) == -1)
00314     return -1;
00315   return 0;
00316 }
00317 
00318 template <ACE_SYNCH_DECL> int
00319 ACE_Stream<ACE_SYNCH_USE>::open (void *a,
00320                                  ACE_Module<ACE_SYNCH_USE> *head,
00321                                  ACE_Module<ACE_SYNCH_USE> *tail)
00322 {
00323   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::open");
00324   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00325 
00326   ACE_Task<ACE_SYNCH_USE> *h1 = 0, *h2 = 0;
00327   ACE_Task<ACE_SYNCH_USE> *t1 = 0, *t2 = 0;
00328 
00329   if (head == 0)
00330     {
00331       ACE_NEW_RETURN (h1,
00332                       ACE_Stream_Head<ACE_SYNCH_USE>,
00333                       -1);
00334       ACE_NEW_RETURN (h2,
00335                       ACE_Stream_Head<ACE_SYNCH_USE>,
00336                       -1);
00337       ACE_NEW_RETURN (head,
00338                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Head"),
00339                                                  h1, h2,
00340                                                  a,
00341                                                  M_DELETE),
00342                       -1);
00343     }
00344 
00345   if (tail == 0)
00346     {
00347       ACE_NEW_RETURN (t1,
00348                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00349                       -1);
00350       ACE_NEW_RETURN (t2,
00351                       ACE_Stream_Tail<ACE_SYNCH_USE>,
00352                       -1);
00353       ACE_NEW_RETURN (tail,
00354                       ACE_Module<ACE_SYNCH_USE> (ACE_LIB_TEXT ("ACE_Stream_Tail"),
00355                                                  t1, t2,
00356                                                  a,
00357                                                  M_DELETE),
00358                       -1);
00359     }
00360 
00361   // Make sure *all* the allocation succeeded!
00362   if (head == 0 && (h1 == 0 || h2 == 0)
00363       || tail == 0 && (t1 == 0 || t2 == 0))
00364     {
00365       delete h1;
00366       delete h2;
00367       delete t1;
00368       delete t2;
00369       delete head;
00370       delete tail;
00371       errno = ENOMEM;
00372       return -1;
00373     }
00374 
00375   this->stream_head_ = head;
00376   this->stream_tail_ = tail;
00377 
00378   if (this->push_module (this->stream_tail_) == -1)
00379     return -1;
00380   else if (this->push_module (this->stream_head_,
00381                               this->stream_tail_,
00382                               this->stream_head_) == -1)
00383     return -1;
00384 
00385   return 0;
00386 }
00387 
00388 template <ACE_SYNCH_DECL> int
00389 ACE_Stream<ACE_SYNCH_USE>::close (int flags)
00390 {
00391   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::close");
00392   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00393 
00394   if (this->stream_head_ != 0
00395       && this->stream_tail_ != 0)
00396     {
00397       // Don't bother checking return value here.
00398       this->unlink_i ();
00399 
00400       int result = 0;
00401 
00402       // Remove and cleanup all the intermediate modules.
00403 
00404       while (this->stream_head_->next () != this->stream_tail_)
00405         if (this->pop (flags) == -1)
00406           result = -1;
00407 
00408       // Clean up the head and tail of the stream.
00409       if (this->stream_head_->close (flags) == -1)
00410         result = -1;
00411       if (this->stream_tail_->close (flags) == -1)
00412         result = -1;
00413 
00414       // Cleanup the memory.
00415       delete this->stream_head_;
00416       delete this->stream_tail_;
00417 
00418       this->stream_head_ = 0;
00419       this->stream_tail_ = 0;
00420 
00421       // Tell all threads waiting on the close that we are done.
00422       this->final_close_.broadcast ();
00423       return result;
00424     }
00425   return 0;
00426 }
00427 
00428 template <ACE_SYNCH_DECL> int
00429 ACE_Stream<ACE_SYNCH_USE>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
00430                                     void *a)
00431 {
00432   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::control");
00433   ACE_IO_Cntl_Msg ioc (cmd);
00434 
00435   ACE_Message_Block *db;
00436 
00437   // Try to create a data block that contains the user-supplied data.
00438   ACE_NEW_RETURN (db,
00439                   ACE_Message_Block (sizeof (int),
00440                                      ACE_Message_Block::MB_IOCTL,
00441                                      0,
00442                                      (char *) a),
00443                   -1);
00444   // Try to create a control block <cb> that contains the control
00445   // field and a pointer to the data block <db> in <cb>'s continuation
00446   // field.
00447   ACE_Message_Block *cb = 0;
00448 
00449   ACE_NEW_RETURN (cb,
00450                   ACE_Message_Block (sizeof ioc,
00451                                      ACE_Message_Block::MB_IOCTL,
00452                                      db,
00453                                      (char *) &ioc),
00454                   -1);
00455   // @@ Michael: The old semantic assumed that cb returns == 0
00456   //             if no memory was available. We will now return immediately
00457   //             without release (errno is set to ENOMEM by the macro).
00458 
00459   // If we can't allocate <cb> then we need to delete db and return
00460   // -1.
00461   if (cb == 0)
00462     {
00463       db->release ();
00464       errno = ENOMEM;
00465       return -1;
00466     }
00467 
00468   int result;
00469 
00470   if (this->stream_head_->writer ()->put (cb) == -1)
00471     result = -1;
00472   else if (this->stream_head_->reader ()->getq (cb) == -1)
00473     result = -1;
00474   else
00475     result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
00476 
00477   // This will also release db if it's reference count == 0.
00478   cb->release ();
00479 
00480   return result;
00481 }
00482 
00483 // Link two streams together at their bottom-most Modules (i.e., the
00484 // one just above the Stream tail).  Note that all of this is premised
00485 // on the fact that the Stream head and Stream tail are non-NULL...
00486 // This must be called with locks held.
00487 
00488 template <ACE_SYNCH_DECL> int
00489 ACE_Stream<ACE_SYNCH_USE>::link_i (ACE_Stream<ACE_SYNCH_USE> &us)
00490 {
00491   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link_i");
00492   this->linked_us_ = &us;
00493   // Make sure the other side is also linked to us!
00494   us.linked_us_ = this;
00495 
00496   ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00497 
00498   if (my_tail == 0)
00499     return -1;
00500 
00501   // Locate the module just above our Stream tail.
00502   while (my_tail->next () != this->stream_tail_)
00503     my_tail = my_tail->next ();
00504 
00505   ACE_Module<ACE_SYNCH_USE> *other_tail = us.stream_head_;
00506 
00507   if (other_tail == 0)
00508     return -1;
00509 
00510   // Locate the module just above the other Stream's tail.
00511   while (other_tail->next () != us.stream_tail_)
00512     other_tail = other_tail->next ();
00513 
00514   // Reattach the pointers so that the two streams are linked!
00515   my_tail->writer ()->next (other_tail->reader ());
00516   other_tail->writer ()->next (my_tail->reader ());
00517   return 0;
00518 }
00519 
00520 template <ACE_SYNCH_DECL> int
00521 ACE_Stream<ACE_SYNCH_USE>::link (ACE_Stream<ACE_SYNCH_USE> &us)
00522 {
00523   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::link");
00524 
00525   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00526 
00527   return this->link_i (us);
00528 }
00529 
00530 // Must be called with locks held...
00531 
00532 template <ACE_SYNCH_DECL> int
00533 ACE_Stream<ACE_SYNCH_USE>::unlink_i (void)
00534 {
00535   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink_i");
00536 
00537   // Only try to unlink if we are in fact still linked!
00538 
00539   if (this->linked_us_ != 0)
00540     {
00541       ACE_Module<ACE_SYNCH_USE> *my_tail = this->stream_head_;
00542 
00543       // Only relink if we still exist!
00544       if (my_tail)
00545         {
00546           // Find the module that's just before our stream tail.
00547           while (my_tail->next () != this->stream_tail_)
00548             my_tail = my_tail->next ();
00549 
00550           // Restore the writer's next() link to our tail.
00551           my_tail->writer ()->next (this->stream_tail_->writer ());
00552         }
00553 
00554       ACE_Module<ACE_SYNCH_USE> *other_tail =
00555         this->linked_us_->stream_head_;
00556 
00557       // Only fiddle with the other side if it in fact still remains.
00558       if (other_tail != 0)
00559         {
00560           while (other_tail->next () != this->linked_us_->stream_tail_)
00561             other_tail = other_tail->next ();
00562 
00563           other_tail->writer ()->next (this->linked_us_->stream_tail_->writer ());
00564 
00565         }
00566 
00567       // Make sure the other side is also aware that it's been unlinked!
00568       this->linked_us_->linked_us_ = 0;
00569 
00570       this->linked_us_ = 0;
00571       return 0;
00572     }
00573   else
00574     return -1;
00575 }
00576 
00577 template <ACE_SYNCH_DECL> int
00578 ACE_Stream<ACE_SYNCH_USE>::unlink (void)
00579 {
00580   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::unlink");
00581   ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
00582   return this->unlink_i ();
00583 }
00584 
00585 template <ACE_SYNCH_DECL>
00586 ACE_Stream<ACE_SYNCH_USE>::ACE_Stream (void * a,
00587                                        ACE_Module<ACE_SYNCH_USE> *head,
00588                                        ACE_Module<ACE_SYNCH_USE> *tail)
00589   : linked_us_ (0),
00590     final_close_ (this->lock_)
00591 {
00592   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::ACE_Stream");
00593   if (this->open (a, head, tail) == -1)
00594     ACE_ERROR ((LM_ERROR,
00595                 ACE_LIB_TEXT ("ACE_Stream<ACE_SYNCH_USE>::open (%s, %s)\n"),
00596                head->name (), tail->name ()));
00597 }
00598 
00599 template <ACE_SYNCH_DECL>
00600 ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream (void)
00601 {
00602   ACE_TRACE ("ACE_Stream<ACE_SYNCH_USE>::~ACE_Stream");
00603 
00604   if (this->stream_head_ != 0)
00605     this->close ();
00606 }
00607 
00608 template <ACE_SYNCH_DECL>
00609 ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr)
00610   : next_ (sr.stream_head_)
00611 {
00612   ACE_TRACE ("ACE_Stream_Iterator<ACE_SYNCH_USE>::ACE_Stream_Iterator");
00613 }
00614 
00615 #endif /* ACE_STREAM_C */

Generated on Mon Jun 16 11:21:29 2003 for ACE by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002