Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

ACE_RMCast_Reordering Class Reference

Pass messages up in sent order. More...

#include <RMCast_Reordering.h>

Inheritance diagram for ACE_RMCast_Reordering:

Inheritance graph
[legend]
Collaboration diagram for ACE_RMCast_Reordering:

Collaboration graph
[legend]
List of all members.

Public Types

typedef ACE_RB_Tree< ACE_UINT32,
ACE_RMCast::Data, ACE_Less_Than<
ACE_UINT32 >, ACE_Null_Mutex
Messages
typedef ACE_RB_Tree_Iterator<
ACE_UINT32, ACE_RMCast::Data,
ACE_Less_Than< ACE_UINT32 >,
ACE_Null_Mutex
Messages_Iterator

Public Methods

 ACE_RMCast_Reordering (void)
 Constructor. More...

virtual ~ACE_RMCast_Reordering (void)
 Destructor. More...

virtual int close (void)
 Remove messages still pending. More...

virtual int data (ACE_RMCast::Data &)
 Process a Data message. More...

virtual int ack_join (ACE_RMCast::Ack_Join &)
 During the join process the server informs us of the next expected message. More...


Protected Attributes

Messages messages_
 The reordering buffer. More...

ACE_UINT32 next_expected_
 The smallest value of
Parameters:
next_expected  for all the proxies.
More...


ACE_UINT32 highest_received_
 The highest value of
Parameters:
highest_received  for all the proxies.
More...


ACE_SYNCH_MUTEX mutex_
 Synchronization. More...


Private Methods

int push_queued_messages (void)
 Push any messages that are pending in the queue. More...


Detailed Description

Pass messages up in sent order.

Some applications require receivers to process messages in the same order that messages are sent. This module buffers out of order messages and only delivers a message if:

The module also sends the Ack feedback to the sender.

NOTE: This is not the same as causal or total ordering, that could be implemented someday, but requires a lot more than what we have right now.

Definition at line 44 of file RMCast_Reordering.h.


Member Typedef Documentation

typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> ACE_RMCast_Reordering::Messages
 

Definition at line 55 of file RMCast_Reordering.h.

typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> ACE_RMCast_Reordering::Messages_Iterator
 

Definition at line 56 of file RMCast_Reordering.h.


Constructor & Destructor Documentation

ACE_INLINE ACE_RMCast_Reordering::ACE_RMCast_Reordering void   
 

Constructor.

Definition at line 4 of file RMCast_Reordering.i.

00005   :  next_expected_ (0)
00006   ,  highest_received_ (0)
00007 {
00008 }

ACE_RMCast_Reordering::~ACE_RMCast_Reordering void    [virtual]
 

Destructor.

Definition at line 15 of file RMCast_Reordering.cpp.

00016 {
00017 }


Member Function Documentation

int ACE_RMCast_Reordering::ack_join ACE_RMCast::Ack_Join   [virtual]
 

During the join process the server informs us of the next expected message.

Reimplemented from ACE_RMCast_Module.

Definition at line 115 of file RMCast_Reordering.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::begin, ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::end, highest_received_, ACE_RB_Tree_Iterator< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::key, messages_, next_expected_, ACE_RMCast::Ack_Join::next_sequence_number, push_queued_messages, ACE_Message_Block::release, and ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::unbind.

00116 {
00117   //ACE_DEBUG ((LM_DEBUG, "RMCast_Reordering::ack_join - <%d,%d>\n",
00118   //            this->next_expected_,
00119   //            ack_join.next_sequence_number));
00120 
00121   {
00122     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
00123     if (this->next_expected_ >= ack_join.next_sequence_number)
00124       {
00125         // Nothing to do in this case...
00126         return 0;
00127       }
00128 
00129     Messages_Iterator i = this->messages_.begin ();
00130     Messages_Iterator end = this->messages_.end ();
00131 
00132     while (i != end
00133            && (*i).key () < ack_join.next_sequence_number)
00134       {
00135         ACE_Message_Block::release ((*i).item ().payload);
00136         this->messages_.unbind ((*i).key ());
00137         i = this->messages_.begin ();
00138       }
00139 
00140     this->next_expected_ = ack_join.next_sequence_number;
00141     if (this->highest_received_ < ack_join.next_sequence_number)
00142       this->highest_received_ = ack_join.next_sequence_number;
00143 
00144     this->push_queued_messages ();
00145   }
00146 
00147   return 0;
00148 }

int ACE_RMCast_Reordering::close void    [virtual]
 

Remove messages still pending.

Reimplemented from ACE_RMCast_Module.

Definition at line 20 of file RMCast_Reordering.cpp.

References ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::begin, ACE_RMCast_Module::close, ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::end, messages_, ACE_Message_Block::release, and ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::unbind.

00021 {
00022   Messages_Iterator i = this->messages_.begin ();
00023   Messages_Iterator end = this->messages_.end ();
00024 
00025   while (i != end)
00026     {
00027       ACE_Message_Block::release ((*i).item ().payload);
00028       this->messages_.unbind ((*i).key ());
00029       i = this->messages_.begin ();
00030     }
00031   return this->ACE_RMCast_Module::close ();
00032 }

int ACE_RMCast_Reordering::data ACE_RMCast::Data   [virtual]
 

Process a Data message.

Process a Data message, sending the right Ack message back. The message is passed up only if it is in order.

Reimplemented from ACE_RMCast_Module.

Definition at line 35 of file RMCast_Reordering.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ACE_RMCast_Module::ack, ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::bind, ACE_RMCast_Module::data, ACE_Message_Block::duplicate, ACE_RMCast::Ack::highest_received, highest_received_, messages_, ACE_RMCast_Module::next, ACE_RMCast::Ack::next_expected, next_expected_, ACE_RMCast::Data::payload, push_queued_messages, ACE_RMCast_Proxy::reply_ack, ACE_RMCast::Data::sequence_number, and ACE_RMCast::Data::source.

Referenced by push_queued_messages.

00036 {
00037   int must_ack = 0;
00038   int result = 0;
00039   ACE_RMCast::Ack ack;
00040 
00041   //ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number));
00042   {
00043     ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
00044 
00045     if (data.sequence_number < this->next_expected_)
00046       {
00047         // Old message.  Ack with the current status (look at the end
00048         // of this block).
00049         must_ack = 1;
00050 
00051         //ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n"));
00052       }
00053 
00054     else if (data.sequence_number == this->next_expected_)
00055       {
00056         //ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n"));
00057 
00058         // Accept the message, the current thread will dispatch it, so
00059         // it is marked as accepted (using the <next_expected> field).
00060         // Any other thread will not push that message because now it
00061         // is "old".
00062 
00063         this->next_expected_++;
00064 
00065         // Right message, process as many messages as possible from
00066         // the queue, then ack the right level...
00067 
00068         // NOTE: we cannot release the mutex while dispatching
00069         // events, otherwise: how do we stop other threads from
00070         // delivering messages out of order?  I.E. what if the
00071         // next thread receives the next message?
00072         if (this->next () != 0)
00073           {
00074             result = this->next ()->data (data);
00075           }
00076 
00077         // After delivering one message there may be more messages
00078         // pending
00079         if (result == 0)
00080           result = this->push_queued_messages ();
00081 
00082         //@@ This should be strategized, for example, only Ack if
00083         //   there is a message out of order or something, otherwise
00084         //   continue with happiness.  That works well for "optimistic
00085         //   models".
00086         must_ack = 1;
00087       }
00088 
00089     else
00090       {
00091         //ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n"));
00092 
00093         // Out of sequence.
00094         if (this->highest_received_ < data.sequence_number)
00095           {
00096             this->highest_received_ = data.sequence_number;
00097           }
00098         ACE_RMCast::Data new_data = data;
00099         new_data.payload = ACE_Message_Block::duplicate (data.payload);
00100         (void) this->messages_.bind (data.sequence_number, new_data);
00101         // re-ack, otherwise save it and ack.
00102       }
00103 
00104     ack.next_expected = this->next_expected_;
00105     ack.highest_received = this->highest_received_;
00106   }
00107 
00108   if (must_ack && data.source != 0)
00109     (void) data.source->reply_ack (ack);
00110 
00111   return result;
00112 }

int ACE_RMCast_Reordering::push_queued_messages void    [private]
 

Push any messages that are pending in the queue.

Definition at line 151 of file RMCast_Reordering.cpp.

References ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::begin, ACE_RMCast_Module::data, data, ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::end, ACE_RB_Tree_Iterator< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::key, messages_, ACE_RMCast_Module::next, next_expected_, ACE_Message_Block::release, and ACE_RB_Tree< ACE_UINT32, ACE_RMCast::Data, ACE_Less_Than< ACE_UINT32 >, ACE_Null_Mutex >::unbind.

Referenced by ack_join, and data.

00152 {
00153   Messages_Iterator i = this->messages_.begin ();
00154   Messages_Iterator end = this->messages_.end ();
00155 
00156   while (i != end
00157          && (*i).key () == this->next_expected_)
00158     {
00159       int r = 0;
00160       if (this->next () != 0)
00161         {
00162           ACE_RMCast::Data data = (*i).item ();
00163           r = this->next ()->data (data);
00164         }
00165 
00166       ACE_Message_Block::release ((*i).item ().payload);
00167       this->messages_.unbind ((*i).key ());
00168       i = this->messages_.begin ();
00169       this->next_expected_++;
00170       if (r != 0)
00171         return r;
00172     }
00173   return 0;
00174 }


Member Data Documentation

ACE_UINT32 ACE_RMCast_Reordering::highest_received_ [protected]
 

The highest value of

Parameters:
highest_received  for all the proxies.

Definition at line 85 of file RMCast_Reordering.h.

Referenced by ack_join, and data.

Messages ACE_RMCast_Reordering::messages_ [protected]
 

The reordering buffer.

Definition at line 79 of file RMCast_Reordering.h.

Referenced by ack_join, close, data, and push_queued_messages.

ACE_SYNCH_MUTEX ACE_RMCast_Reordering::mutex_ [protected]
 

Synchronization.

Definition at line 88 of file RMCast_Reordering.h.

ACE_UINT32 ACE_RMCast_Reordering::next_expected_ [protected]
 

The smallest value of

Parameters:
next_expected  for all the proxies.

Definition at line 82 of file RMCast_Reordering.h.

Referenced by ack_join, data, and push_queued_messages.


The documentation for this class was generated from the following files:
Generated on Mon Jun 16 13:14:14 2003 for ACE_RMCast by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002