• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/UDPConvergenceLayer.cpp

Go to the documentation of this file.
#include "net/UDPConvergenceLayer.h"
#include "net/BundleReceivedEvent.h"
#include "core/BundleEvent.h"
#include "net/TransferCompletedEvent.h"
#include "net/TransferAbortedEvent.h"
#include "routing/RequeueBundleEvent.h"
#include <ibrcommon/net/UnicastSocket.h>
#include <ibrcommon/net/BroadcastSocket.h>
#include "core/BundleCore.h"

#include <ibrcommon/data/BLOB.h>
#include <ibrcommon/Logger.h>
#include <ibrcommon/thread/MutexLock.h>

#include <ibrdtn/utils/Utils.h>
#include <ibrdtn/data/Serializer.h>

#include <sys/socket.h>
#include <poll.h>
#include <errno.h>

#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <limits.h>

#include <iostream>
#include <vector>
00033 
00034 
00035 using namespace dtn::data;
00036 
00037 namespace dtn
00038 {
00039         namespace net
00040         {
00041                 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00042 
00043                 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::NetInterface net, int port, bool broadcast, unsigned int mtu)
00044                         : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00045                 {
00046                         if (broadcast)
00047                         {
00048                                 _socket = new ibrcommon::BroadcastSocket();
00049                         }
00050                         else
00051                         {
00052                                 _socket = new ibrcommon::UnicastSocket();
00053                         }
00054                 }
00055 
00056                 UDPConvergenceLayer::~UDPConvergenceLayer()
00057                 {
00058                         componentDown();
00059                         delete _socket;
00060                 }
00061 
00062                 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00063                 {
00064                         return dtn::core::Node::CONN_UDPIP;
00065                 }
00066 
00067                 void UDPConvergenceLayer::update(std::string &name, std::string &params)
00068                 {
00069                         name = "udpcl";
00070 
00071                         stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00072                         params = service.str();
00073                 }
00074 
00075                 bool UDPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00076                 {
00077                         if (_net == net) return true;
00078                         return false;
00079                 }
00080 
00081                 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00082                 {
00083                         std::stringstream ss;
00084                         dtn::data::DefaultSerializer serializer(ss);
00085 
00086                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00087 
00088                         try {
00089                                 // read the bundle out of the storage
00090                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00091 
00092                                 unsigned int size = serializer.getLength(bundle);
00093 
00094                                 if (size > m_maxmsgsize)
00095                                 {
00096                                         // TODO: create a fragment of length "size"
00097 
00098 //                              // get the payload block
00099 //                              PayloadBlock *payload = utils::Utils::getPayloadBlock( b );
00100 //                              size_t application_length = payload->getBLOB().getSize();
00101 //
00102 //                              // the new size for the payload
00103 //                              size_t payload_size = m_maxmsgsize;
00104 //
00105 //                              // reduce by the overhead
00106 //                              payload_size -= (size - application_length);
00107 //
00108 //                              // possible size of fragment offset and application data length
00109 //                              payload_size -= 20;
00110 //
00111 //                              // split the payload block
00112 //                              pair<PayloadBlock*, PayloadBlock*> frags = payload->split(payload_size);
00113 //
00114 //                              Bundle frag1 = b;
00115 //                              frag1.clearBlocks();
00116 //                              frag1.addBlock(frags.first);
00117 //
00118 //                              Bundle frag2 = b;
00119 //                              frag2.clearBlocks();
00120 //                              frag2.addBlock(frags.second);
00121 //                              frag2._fragmentoffset += payload_size;
00122 //
00123 //                              if (!(b._procflags & Bundle::FRAGMENT))
00124 //                              {
00125 //                                      frag1._procflags += Bundle::FRAGMENT;
00126 //                                      frag1._appdatalength += application_length;
00127 //                                      frag2._procflags += Bundle::FRAGMENT;
00128 //                                      frag2._appdatalength += application_length;
00129 //                              }
00130 
00131                                 // TODO: transfer the fragment
00132 
00133                                         throw ConnectionInterruptedException();
00134                                 }
00135 
00136                                 serializer << bundle;
00137                                 string data = ss.str();
00138 
00139                                 // get a udp peer
00140                                 ibrcommon::udpsocket::peer p = _socket->getPeer(node.getAddress(), node.getPort());
00141 
00142                                 // set write lock
00143                                 ibrcommon::MutexLock l(m_writelock);
00144 
00145                                 // send converted line back to client.
00146                                 int ret = p.send(data.c_str(), data.length());
00147 
00148                                 if (ret == -1)
00149                                 {
00150                                         // CL is busy, requeue bundle
00151                                         dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00152 
00153                                         return;
00154                                 }
00155 
00156                                 // raise bundle event
00157                                 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00158                                 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00159                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00160                                 // send transfer aborted event
00161                                 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00162                         }
00163 
00164                 }
00165 
00166                 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00167                 {
00168                         ibrcommon::MutexLock l(m_readlock);
00169 
00170                         char data[m_maxmsgsize];
00171 
00172                         // data waiting
00173                         int len = _socket->receive(data, m_maxmsgsize);
00174 
00175                         if (len > 0)
00176                         {
00177                                 // read all data into a stream
00178                                 stringstream ss;
00179                                 ss.write(data, len);
00180 
00181                                 // get the bundle
00182                                 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00183                         }
00184 
00185                         return (*this);
00186                 }
00187 
00188                 void UDPConvergenceLayer::componentUp()
00189                 {
00190                         try {
00191                                 try {
00192                                         ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00193                                         sock.bind(_port, _net);
00194                                 } catch (std::bad_cast) {
00195 
00196                                 }
00197 
00198                                 try {
00199                                         ibrcommon::BroadcastSocket &sock = dynamic_cast<ibrcommon::BroadcastSocket&>(*_socket);
00200                                         sock.bind(_port, _net);
00201                                 } catch (std::bad_cast) {
00202 
00203                                 }
00204 
00205                         } catch (ibrcommon::udpsocket::SocketException ex) {
00206                                 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.getAddress() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00207                                 IBRCOMMON_LOGGER(error) << "      Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00208                         }
00209                 }
00210 
00211                 void UDPConvergenceLayer::componentDown()
00212                 {
00213                         _running = false;
00214                         _socket->shutdown();
00215                         join();
00216                 }
00217 
00218                 void UDPConvergenceLayer::componentRun()
00219                 {
00220                         _running = true;
00221 
00222                         while (_running)
00223                         {
00224                                 try {
00225                                         dtn::data::Bundle bundle;
00226                                         (*this) >> bundle;
00227 
00228                                         // determine sender
00229                                         EID sender;
00230 
00231                                         // raise default bundle received event
00232                                         dtn::net::BundleReceivedEvent::raise(sender, bundle);
00233 
00234                                 } catch (dtn::InvalidDataException ex) {
00235                                         IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00236                                 } catch (ibrcommon::IOException ex) {
00237 
00238                                 }
00239                                 yield();
00240                         }
00241                 }
00242 
00243                 bool UDPConvergenceLayer::__cancellation()
00244                 {
00245                         // since this is an receiving thread we have to cancel the hard way
00246                         return false;
00247                 }
00248 
00249                 const std::string UDPConvergenceLayer::getName() const
00250                 {
00251                         return "UDPConvergenceLayer";
00252                 }
00253         }
00254 }

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1