Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

RMCast_Reordering.cpp

Go to the documentation of this file.
00001 //
00002 // $Id: RMCast_Reordering.cpp,v 1.1.1.1 2001/12/04 14:33:17 chad Exp $
00003 //
00004 
00005 #include "RMCast_Reordering.h"
00006 #include "RMCast_Proxy.h"
00007 #include "ace/Message_Block.h"
00008 
00009 #if !defined (__ACE_INLINE__)
00010 # include "RMCast_Reordering.i"
00011 #endif /* ! __ACE_INLINE__ */
00012 
00013 ACE_RCSID(ace, RMCast_Reordering, "$Id: RMCast_Reordering.cpp,v 1.1.1.1 2001/12/04 14:33:17 chad Exp $")
00014 
00015 ACE_RMCast_Reordering::~ACE_RMCast_Reordering (void)
00016 {
00017 }
00018 
00019 int
00020 ACE_RMCast_Reordering::close (void)
00021 {
00022   Messages_Iterator i = this->messages_.begin ();
00023   Messages_Iterator end = this->messages_.end ();
00024 
00025   while (i != end)
00026     {
00027       ACE_Message_Block::release ((*i).item ().payload);
00028       this->messages_.unbind ((*i).key ());
00029       i = this->messages_.begin ();
00030     }
00031   return this->ACE_RMCast_Module::close ();
00032 }
00033 
00034 int
00035 ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
00036 {
00037   int must_ack = 0;
00038   int result = 0;
00039   ACE_RMCast::Ack ack;
00040 
00041   //ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number));
00042   {
00043     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
00044 
00045     if (data.sequence_number < this->next_expected_)
00046       {
00047         // Old message.  Ack with the current status (look at the end
00048         // of this block).
00049         must_ack = 1;
00050 
00051         //ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n"));
00052       }
00053 
00054     else if (data.sequence_number == this->next_expected_)
00055       {
00056         //ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n"));
00057 
00058         // Accept the message, the current thread will dispatch it, so
00059         // it is marked as accepted (using the <next_expected> field).
00060         // Any other thread will not push that message because now it
00061         // is "old".
00062 
00063         this->next_expected_++;
00064 
00065         // Right message, process as many messages as possible from
00066         // the queue, then ack the right level...
00067 
00068         // NOTE: we cannot release the mutex while dispatching
00069         // events, otherwise: how do we stop other threads from
00070         // delivering messages out of order?  I.E. what if the
00071         // next thread receives the next message?
00072         if (this->next () != 0)
00073           {
00074             result = this->next ()->data (data);
00075           }
00076 
00077         // After delivering one message there may be more messages
00078         // pending
00079         if (result == 0)
00080           result = this->push_queued_messages ();
00081 
00082         //@@ This should be strategized, for example, only Ack if
00083         //   there is a message out of order or something, otherwise
00084         //   continue with happiness.  That works well for "optimistic
00085         //   models".
00086         must_ack = 1;
00087       }
00088 
00089     else
00090       {
00091         //ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n"));
00092 
00093         // Out of sequence.
00094         if (this->highest_received_ < data.sequence_number)
00095           {
00096             this->highest_received_ = data.sequence_number;
00097           }
00098         ACE_RMCast::Data new_data = data;
00099         new_data.payload = ACE_Message_Block::duplicate (data.payload);
00100         (void) this->messages_.bind (data.sequence_number, new_data);
00101         // re-ack, otherwise save it and ack.
00102       }
00103 
00104     ack.next_expected = this->next_expected_;
00105     ack.highest_received = this->highest_received_;
00106   }
00107 
00108   if (must_ack && data.source != 0)
00109     (void) data.source->reply_ack (ack);
00110 
00111   return result;
00112 }
00113 
00114 int
00115 ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
00116 {
00117   //ACE_DEBUG ((LM_DEBUG, "RMCast_Reordering::ack_join - <%d,%d>\n",
00118   //            this->next_expected_,
00119   //            ack_join.next_sequence_number));
00120 
00121   {
00122     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
00123     if (this->next_expected_ >= ack_join.next_sequence_number)
00124       {
00125         // Nothing to do in this case...
00126         return 0;
00127       }
00128 
00129     Messages_Iterator i = this->messages_.begin ();
00130     Messages_Iterator end = this->messages_.end ();
00131 
00132     while (i != end
00133            && (*i).key () < ack_join.next_sequence_number)
00134       {
00135         ACE_Message_Block::release ((*i).item ().payload);
00136         this->messages_.unbind ((*i).key ());
00137         i = this->messages_.begin ();
00138       }
00139 
00140     this->next_expected_ = ack_join.next_sequence_number;
00141     if (this->highest_received_ < ack_join.next_sequence_number)
00142       this->highest_received_ = ack_join.next_sequence_number;
00143 
00144     this->push_queued_messages ();
00145   }
00146 
00147   return 0;
00148 }
00149 
00150 int
00151 ACE_RMCast_Reordering::push_queued_messages (void)
00152 {
00153   Messages_Iterator i = this->messages_.begin ();
00154   Messages_Iterator end = this->messages_.end ();
00155 
00156   while (i != end
00157          && (*i).key () == this->next_expected_)
00158     {
00159       int r = 0;
00160       if (this->next () != 0)
00161         {
00162           ACE_RMCast::Data data = (*i).item ();
00163           r = this->next ()->data (data);
00164         }
00165 
00166       ACE_Message_Block::release ((*i).item ().payload);
00167       this->messages_.unbind ((*i).key ());
00168       i = this->messages_.begin ();
00169       this->next_expected_++;
00170       if (r != 0)
00171         return r;
00172     }
00173   return 0;
00174 }
00175 
00176 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00177 
00178 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Mon Jun 16 13:12:37 2003 for ACE_RMCast by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002