Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

RMCast_IO_UDP.cpp

Go to the documentation of this file.
00001 // $Id: RMCast_IO_UDP.cpp,v 1.1.1.2.2.1 2003/05/05 16:04:55 chad Exp $
00002 
00003 #include "RMCast_IO_UDP.h"
00004 #include "RMCast_UDP_Proxy.h"
00005 #include "RMCast_Module_Factory.h"
00006 
00007 #include "ace/Handle_Set.h"
00008 #include "ace/Reactor.h"
00009 #include "ace/Message_Block.h"
00010 
00011 #if !defined (__ACE_INLINE__)
00012 # include "RMCast_IO_UDP.i"
00013 #endif /* ! __ACE_INLINE__ */
00014 
00015 ACE_RCSID(ace, RMCast_IO_UDP, "RMCast_IO_UDP.cpp,v 1.12 2000/12/20 22:00:33 oci Exp")
00016 
00017 ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void)
00018 {
00019 }
00020 
00021 int
00022 ACE_RMCast_IO_UDP::init (const ACE_INET_Addr &mcast_group,
00023                          const ACE_Addr &local,
00024                          int protocol_family,
00025                          int protocol,
00026                          int reuse_addr)
00027 {
00028   this->mcast_group_ = mcast_group;
00029 
00030   ACE_SOCK_Dgram &dgram = this->dgram_;
00031   return dgram.open (local, protocol_family, protocol, reuse_addr);
00032 }
00033 
00034 int
00035 ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr,
00036                               int reuse_addr,
00037                               const ACE_TCHAR *net_if,
00038                               int protocol_family,
00039                               int protocol)
00040 {
00041   this->mcast_group_ = mcast_addr;
00042   return this->dgram_.subscribe (mcast_addr,
00043                                  reuse_addr,
00044                                  net_if,
00045                                  protocol_family,
00046                                  protocol);
00047 }
00048 
00049 int
00050 ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv)
00051 {
00052   ACE_HANDLE h = this->dgram_.get_handle ();
00053   if (h == ACE_INVALID_HANDLE)
00054     return -1;
00055 
00056   ACE_Handle_Set handle_set;
00057   handle_set.set_bit (h);
00058 
00059   ACE_Countdown_Time countdown (tv);
00060 
00061   int r = ACE_OS::select (int(size_t(h)) + 1,
00062                           handle_set, 0, 0,
00063                           tv);
00064   if (r == -1)
00065     {
00066       if (errno == EINTR)
00067         return 0;
00068       else
00069         return -1;
00070     }
00071   else if (r == 0)
00072     {
00073       return 0;
00074     }
00075 
00076   return this->handle_input (h);
00077 }
00078 
00079 int
00080 ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE)
00081 {
00082   // @@ We should use a system constant instead of this literal
00083   const int max_udp_packet_size = 65536;
00084   char buffer[max_udp_packet_size];
00085 
00086   ACE_INET_Addr from_address;
00087   ssize_t r =
00088     this->dgram_.recv (buffer, sizeof(buffer), from_address);
00089 
00090   if (r == -1)
00091     {
00092       // @@ LOG??
00093       ACE_ERROR ((LM_ERROR,
00094                   "RMCast_IO_UDP::handle_input () - error in recv %p\n",
00095                   ACE_TEXT ("")));
00096       return -1;
00097     }
00098 
00099   // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
00100 
00101   // @@ Locking!
00102 
00103   int type = buffer[0];
00104 
00105   if (type < 0 || type >= ACE_RMCast::MT_LAST)
00106     {
00107       // @@ Log: invalid message type!!
00108       // @@ TODO: should we return -1?  The socket is still valid, it
00109       // makes little sense to destroy it just because one remote
00110       // sender is sending invalid messages. Maybe we should
00111       // strategize this too, and report the problem to the
00112       // application, this could indicate a misconfiguration or
00113       // something worse...
00114 
00115       // In any case the proxy should be destroyed, its peer is making
00116       // something really wrong.
00117       ACE_RMCast_UDP_Proxy *proxy;
00118       if (this->map_.unbind (from_address, proxy) == 0)
00119         {
00120           this->factory_->destroy (proxy->next ());
00121           delete proxy;
00122         }
00123       return 0;
00124     }
00125 
00126   ACE_RMCast_UDP_Proxy *proxy;
00127   if (this->map_.find (from_address, proxy) != 0)
00128     {
00129       //ACE_DEBUG ((LM_DEBUG,
00130       //            "IO_UDP::handle_input - new proxy from <%s:%d>\n",
00131       //            from_address.get_host_addr (),
00132       //            from_address.get_port_number ()));
00133 
00134       // @@ We should validate the message *before* creating the
00135       // object, all we need is some sort of validation strategy, a
00136       // different one for the receiver and another one for the
00137       // sender.
00138 
00139 #if 0
00140       if (type == ACE_RMCast::MT_ACK
00141           || type == ACE_RMCast::MT_JOIN
00142           || type == ACE_RMCast::MT_LEAVE
00143           || type == ACE_RMCast::MT_ACK_LEAVE)
00144         {
00145           // All these message types indicate a problem, the should be
00146           // generated by receivers, not received by them.
00147           return 0;
00148         }
00149 #endif /* 0 */
00150       ACE_RMCast_Module *module = this->factory_->create ();
00151       if (module == 0)
00152         {
00153           // @@ LOG??
00154           // Try to continue working, maybe the module can be created
00155           // later.
00156           return 0;
00157         }
00158       // This is necessary to satisfy the xgcc for Lynx on Solaris
00159       // by including the code directly causes :
00160       // RMCast_IO_UDP.cpp:202: error: internal error--unrecognizable insn:
00161       // (insn 1510 1507 524 (set (mem:SI (plus:SI (reg:SI 28 r28)
00162       //                 (const_int 65536)))
00163       //         (reg:SI 0 r0)) -1 (insn_list 528 (insn_list 1507 (nil)))
00164       //     (nil))
00165       // /usr/lynx/home2/jose/98r2/src/gcc/toplev.c:1489: Internal compiler error in function fatal_insn
00166       // to be thrown at the end of the function.
00167       if ((proxy = allocate_and_bind_proxy(module,from_address)) == 0)
00168         return 0;
00169     }
00170 
00171   // Have the proxy process the message and do the right thing.
00172   if (proxy->receive_message (buffer, r) != 0)
00173     {
00174       (void) this->map_.unbind (from_address);
00175       this->factory_->destroy (proxy->next ());
00176       delete proxy;
00177     }
00178 
00179   return 0;
00180 }
00181 
00182 ACE_HANDLE
00183 ACE_RMCast_IO_UDP::get_handle (void) const
00184 {
00185   return this->dgram_.get_handle ();
00186 }
00187 
00188 int
00189 ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data)
00190 {
00191   return this->send_data (data, this->mcast_group_);
00192 }
00193 
00194 int
00195 ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll)
00196 {
00197   return this->send_poll (poll, this->mcast_group_);
00198 }
00199 
00200 int
00201 ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join)
00202 {
00203   return this->send_ack_join (ack_join, this->mcast_group_);
00204 }
00205 
00206 int
00207 ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
00208 {
00209   return this->send_ack_leave (ack_leave, this->mcast_group_);
00210 }
00211 
00212 int
00213 ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack)
00214 {
00215   return this->send_ack (ack, this->mcast_group_);
00216 }
00217 
00218 int
00219 ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join)
00220 {
00221   return this->send_join (join, this->mcast_group_);
00222 }
00223 
00224 int
00225 ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave)
00226 {
00227   return this->send_leave (leave, this->mcast_group_);
00228 }
00229 
00230 int
00231 ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data,
00232                               const ACE_INET_Addr &to)
00233 {
00234   //ACE_DEBUG ((LM_DEBUG,
00235   //            "IO_UDP::send_data - pushing out to <%s:%d>\n",
00236   //            to.get_host_addr (),
00237   //            to.get_port_number ()));
00238 
00239   // The first message block contains the header
00240   // @@ TODO: We could keep the header pre-initialized, and only
00241   // update the portions that do change...
00242   ACE_UINT32 tmp;
00243   char header[1 + 3 * sizeof(ACE_UINT32)];
00244   header[0] = ACE_RMCast::MT_DATA;
00245 
00246   tmp = ACE_HTONL (data.sequence_number);
00247   ACE_OS::memcpy (header + 1,
00248                   &tmp, sizeof(ACE_UINT32));
00249   tmp = ACE_HTONL (data.total_size);
00250   ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
00251                   &tmp, sizeof(ACE_UINT32));
00252   tmp = ACE_HTONL (data.fragment_offset);
00253   ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
00254                   &tmp, sizeof(ACE_UINT32));
00255 
00256   iovec iov[ACE_IOV_MAX];
00257   int iovcnt = 1;
00258 
00259   iov[0].iov_base = header;
00260   iov[0].iov_len = sizeof(header);
00261 
00262   ACE_Message_Block *mb = data.payload;
00263 
00264   for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
00265     {
00266       iov[iovcnt].iov_base = i->rd_ptr ();
00267       iov[iovcnt].iov_len = ACE_static_cast (u_long, i->length ());
00268       iovcnt++;
00269       if (iovcnt >= IOV_MAX)
00270         return -1;
00271     }
00272 
00273   // @@ This pacing stuff here reduced the number of packet lost in
00274   // loopback tests, but it should be taken out for real applications
00275   // (or at least made configurable!)
00276   ACE_Time_Value tv (0, 10000);
00277   ACE_OS::sleep (tv);
00278 
00279   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00280   ACE_SOCK_Dgram &dgram = this->dgram_;
00281 
00282   if (dgram.send (iov, iovcnt, to) == -1)
00283     return -1;
00284 
00285 #if 0
00286   ACE_HEX_DUMP ((LM_DEBUG,
00287                  (char*)iov[0].iov_base,
00288                  iov[0].iov_len,
00289                  "Sending"));
00290 #endif
00291 
00292   return 0;
00293 }
00294 
00295 int
00296 ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &,
00297                               const ACE_INET_Addr &to)
00298 {
00299   //ACE_DEBUG ((LM_DEBUG,
00300   //            "IO_UDP::send_poll - pushing out to <%s:%d>\n",
00301   //            to.get_host_addr (),
00302   //            to.get_port_number ()));
00303 
00304   // @@ TODO: We could keep the header pre-initialized, and only
00305   // update the portions that do change...
00306   char header[16];
00307   header[0] = ACE_RMCast::MT_POLL;
00308 
00309   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00310   ACE_SOCK_Dgram &dgram = this->dgram_;
00311 
00312   if (dgram.send (header, 1, to) == -1)
00313     return -1;
00314 
00315   return 0;
00316 }
00317 
00318 int
00319 ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join,
00320                                   const ACE_INET_Addr &to)
00321 {
00322   //ACE_DEBUG ((LM_DEBUG,
00323   //            "IO_UDP::send_ack_join - pushing out to <%s:%d>\n",
00324   //            to.get_host_addr (),
00325   //            to.get_port_number ()));
00326 
00327   // @@ TODO: We could keep the header pre-initialized, and only
00328   // update the portions that do change...
00329   char header[16];
00330   header[0] = ACE_RMCast::MT_ACK_JOIN;
00331 
00332   ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number);
00333   ACE_OS::memcpy (header + 1,
00334                   &tmp, sizeof(ACE_UINT32));
00335   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00336   ACE_SOCK_Dgram &dgram = this->dgram_;
00337 
00338   if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1)
00339     return -1;
00340 
00341   return 0;
00342 }
00343 
00344 int
00345 ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &,
00346                                    const ACE_INET_Addr &to)
00347 {
00348   //ACE_DEBUG ((LM_DEBUG,
00349   //            "IO_UDP::send_ack_leave - pushing out to <%s:%d>\n",
00350   //            to.get_host_addr (),
00351   //            to.get_port_number ()));
00352 
00353   // @@ TODO: We could keep the header pre-initialized, and only
00354   // update the portions that do change...
00355   char header[16];
00356   header[0] = ACE_RMCast::MT_ACK_LEAVE;
00357 
00358   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00359   ACE_SOCK_Dgram &dgram = this->dgram_;
00360 
00361   if (dgram.send (header, 1, to) == -1)
00362     return -1;
00363 
00364   return 0;
00365 }
00366 
00367 int
00368 ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
00369                               const ACE_INET_Addr &to)
00370 {
00371   //ACE_DEBUG ((LM_DEBUG,
00372   //            "IO_UDP::send_ack - pushing (%d:%d) out to <%s:%d>\n",
00373   //            ack.next_expected,
00374   //            ack.highest_received,
00375   //            to.get_host_addr (),
00376   //            to.get_port_number ()));
00377 
00378   // @@ TODO: We could keep the header pre-initialized, and only
00379   // update the portions that do change...
00380   char header[16];
00381   header[0] = ACE_RMCast::MT_ACK;
00382 
00383   ACE_UINT32 tmp = ACE_HTONL (ack.next_expected);
00384   ACE_OS::memcpy (header + 1,
00385                   &tmp, sizeof(ACE_UINT32));
00386   tmp = ACE_HTONL (ack.highest_received);
00387   ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
00388                   &tmp, sizeof(ACE_UINT32));
00389 
00390   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00391   ACE_SOCK_Dgram &dgram = this->dgram_;
00392 
00393   if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1)
00394     return -1;
00395 
00396   return 0;
00397 }
00398 
00399 int
00400 ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &,
00401                               const ACE_INET_Addr &to)
00402 {
00403   //ACE_DEBUG ((LM_DEBUG,
00404   //            "IO_UDP::send_join - pushing out to <%s:%d>\n",
00405   //            to.get_host_addr (),
00406   //            to.get_port_number ()));
00407 
00408   // @@ TODO: We could keep the header pre-initialized, and only
00409   // update the portions that do change...
00410   char header[16];
00411   header[0] = ACE_RMCast::MT_JOIN;
00412 
00413   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00414   ACE_SOCK_Dgram &dgram = this->dgram_;
00415 
00416   if (dgram.send (header, 1, to) == -1)
00417     return -1;
00418 
00419   return 0;
00420 }
00421 
00422 int
00423 ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &,
00424                                const ACE_INET_Addr &to)
00425 {
00426   //ACE_DEBUG ((LM_DEBUG,
00427   //            "IO_UDP::send_leave - pushing out to <%s:%d>\n",
00428   //            to.get_host_addr (),
00429   //            to.get_port_number ()));
00430 
00431   // @@ TODO: We could keep the header pre-initialized, and only
00432   // update the portions that do change...
00433   char header[16];
00434   header[0] = ACE_RMCast::MT_LEAVE;
00435 
00436   // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
00437   ACE_SOCK_Dgram &dgram = this->dgram_;
00438 
00439   if (dgram.send (header, 1, to) == -1)
00440     return -1;
00441 
00442   return 0;
00443 }
00444 
00445 
00446 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00447 
00448 template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
00449 template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
00450 template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
00451 template class ACE_Hash_Map_Iterator_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
00452 template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
00453 template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
00454 template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>;
00455 template class ACE_Equal_To<ACE_INET_Addr>;
00456 template class ACE_Hash<ACE_INET_Addr>;
00457 
00458 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Mon Jun 16 13:12:35 2003 for ACE_RMCast by doxygen1.2.14 written by Dimitri van Heesch, © 1997-2002