Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

RMCast_Fragment.cpp

Go to the documentation of this file.
00001 // $Id: RMCast_Fragment.cpp,v 1.1.1.2.2.1 2003/05/05 16:04:55 chad Exp $
00002 
00003 #include "RMCast_Fragment.h"
00004 #include "ace/Message_Block.h"
00005 
00006 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00007 # pragma once
00008 #endif /* ACE_LACKS_PRAGMA_ONCE */
00009 
00010 #if !defined (__ACE_INLINE__)
00011 #include "RMCast_Fragment.i"
00012 #endif /* __ACE_INLINE__ */
00013 
00014 ACE_RCSID(ace, RMCast_Fragment, "$Id: RMCast_Fragment.cpp,v 1.1.1.2.2.1 2003/05/05 16:04:55 chad Exp $")
00015 
00016 
00017 ACE_RMCast_Fragment::ACE_RMCast_Fragment (void)
00018   :  ACE_RMCast_Module ()
00019   ,  max_fragment_size_ (ACE_RMCAST_DEFAULT_FRAGMENT_SIZE)
00020 {
00021 }
00022 
00023 ACE_RMCast_Fragment::~ACE_RMCast_Fragment (void)
00024 {
00025 }
00026 
00027 int
00028 ACE_RMCast_Fragment::data (ACE_RMCast::Data &received_data)
00029 {
00030   if (this->next () == 0)
00031     return 0;
00032 
00033   // The Data object sent downstream
00034   ACE_RMCast::Data data = received_data;
00035 
00036   ACE_Message_Block *mb = data.payload;
00037 
00038   // @@ We should keep the total size precomputed
00039   data.total_size = ACE_static_cast (ACE_UINT32, mb->total_length ());
00040 
00041   // We must leave room for the header
00042 #if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
00043   const int ACE_RMCAST_WRITEV_MAX = ACE_IOV_MAX - 2;
00044 #else
00045   const int ACE_RMCAST_WRITEV_MAX = ACE_IOV_MAX - 1;
00046 #endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
00047 
00048   // Assume the header will be included on each fragment, so readuce
00049   // the maximum amount of memory allowed on each fragment....
00050   const size_t fragment_header_size = 1 + 3 * sizeof(ACE_UINT32);
00051 
00052   const size_t max_fragment_payload =
00053     this->max_fragment_size_ - fragment_header_size;
00054 
00055   // Iterate over all the message blocks in the chain.  If there is
00056   // enough data to send an MTU then it is sent immediately.
00057   // The last fragment is sent with whatever data remains.
00058   // A single fragment can expand multiple message blocks, put
00059   // together in an <iovec> array, it is also possible that a single
00060   // message block requires multiple fragments... so the code below is
00061   // as simple as possible, but not any simpler ;-)
00062 
00063 
00064   // The first piece of each fragment is a header that contains:
00065   // - A sequence number for reassembly, this is unrelated to
00066   //   the sequence number for re-transmission.
00067   //   NOTE: yes, this increases the bandwidth requires by 4 bytes on
00068   //   each message, I don't think this is a big deal.
00069   // - A fragment offset for reassembly.
00070   // - The total size of the message, so the reassembly layer knows
00071   //   when a complete message has been received.
00072 
00073   // Complete the initialization of the <data> structure
00074 
00075   data.fragment_offset = 0;
00076 
00077   // The underlying transport layer can only tolerate so many elements
00078   // in a chain, so we must count them and send a fragment if we are
00079   // going over the limit.
00080 
00081   ACE_Message_Block blocks[ACE_RMCAST_WRITEV_MAX];
00082 
00083 
00084   // How many elements of the <blocks> array are in use...
00085   int iovcnt = 0;
00086 
00087   // The size of the current message, adding the size of all its
00088   // message blocks.
00089   size_t fragment_size = 0;
00090 
00091   for (ACE_Message_Block* b = mb;  b != 0; b = b->cont ())
00092     {
00093       ACE_Message_Block *current_block = &blocks[iovcnt];
00094 
00095       // Add the block to the vector...
00096 
00097       current_block->data_block (b->data_block ()->duplicate ());
00098       current_block->rd_ptr (b->rd_ptr ());
00099       current_block->wr_ptr (b->wr_ptr ());
00100       current_block->cont (0);
00101 
00102       // Set the continuation field
00103       if (iovcnt != 0)
00104         blocks[iovcnt-1].cont (current_block);
00105 
00106       size_t current_block_length = current_block->length ();
00107 
00108       // Recompute the state of the fragment
00109       fragment_size += current_block_length;
00110       iovcnt++;
00111 
00112       while (fragment_size >= max_fragment_payload)
00113         {
00114           // We have filled a fragment.  It is possible that we need
00115           // to split the last message block in multiple fragments,
00116           // thus the loop above...
00117 
00118           // First adjust the last message block to exactly fit in the
00119           // fragment:
00120           size_t last_sent_mb_len =
00121             max_fragment_payload - (fragment_size - current_block_length);
00122 
00123           // Send only enough data of the last message block to fill
00124           // the fragment...
00125           current_block->wr_ptr (current_block->rd_ptr ()
00126                               + last_sent_mb_len);
00127 
00128           data.payload = blocks;
00129           if (this->next ()->data (data) == -1)
00130             return -1;
00131 
00132           // adjust the offset
00133           data.fragment_offset += ACE_static_cast (ACE_UINT32,
00134                                                    max_fragment_payload);
00135 
00136           // Now compute how much data is left in the last message
00137           // block, to check if we should continue sending it...
00138           current_block_length -= last_sent_mb_len;
00139           if (current_block_length == 0)
00140             {
00141               // No more data from this message block, just continue
00142               // the outer loop...
00143               iovcnt = 0;
00144               fragment_size = 0;
00145               blocks[0].cont (0);
00146               break; // while
00147             }
00148 
00149           // There is some data left, we try to send it in a single
00150           // fragment, if it is still too big the beginning of this
00151           // loop will adjust things.
00152 
00153           // We must put the data in the right place in the array..
00154           char *rd_ptr = current_block->rd_ptr () + last_sent_mb_len;
00155           char *wr_ptr = rd_ptr + current_block_length;
00156           blocks[0].data_block (current_block->replace_data_block (0));
00157 
00158           // And determine what segment of the data will be sent..
00159           blocks[0].rd_ptr (rd_ptr);
00160           blocks[0].wr_ptr (wr_ptr);
00161           blocks[0].cont (0);
00162 
00163           // Adjust the state of the fragment
00164           fragment_size = current_block_length;
00165           iovcnt = 1;
00166 
00167           // Notice that if <fragment_size> is too big the start of
00168           // this loop will continue the fragmentation.
00169         }
00170 
00171       // It is also possible to fill up the iovec array before the
00172       // fragment is completed, in this case we must send whatever we
00173       // have:
00174       if (iovcnt == ACE_RMCAST_WRITEV_MAX)
00175         {
00176           if (this->next ()->data (data) == -1)
00177             return -1;
00178 
00179           iovcnt = 0;
00180           fragment_size = 0;
00181           blocks[0].cont (0);
00182         }
00183     }
00184 
00185   if (iovcnt == 0)
00186     return 0;
00187 
00188   return this->next ()->data (data);
00189 }

Generated on Mon Jun 16 13:12:35 2003 for ACE_RMCast by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002