• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/UDPConvergenceLayer.cpp

Go to the documentation of this file.
00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "core/BundleEvent.h"
00004 #include "net/TransferCompletedEvent.h"
00005 #include "net/TransferAbortedEvent.h"
00006 #include "routing/RequeueBundleEvent.h"
00007 #include <ibrcommon/net/UnicastSocket.h>
00008 #include <ibrcommon/net/vaddress.h>
00009 #include <ibrcommon/net/vinterface.h>
00010 #include "core/BundleCore.h"
00011 
00012 #include <ibrcommon/data/BLOB.h>
00013 #include <ibrcommon/Logger.h>
00014 #include <ibrcommon/thread/MutexLock.h>
00015 
00016 #include <ibrdtn/utils/Utils.h>
00017 #include <ibrdtn/data/Serializer.h>
00018 
00019 #include <sys/socket.h>
00020 #include <poll.h>
00021 #include <errno.h>
00022 
00023 #include <sys/types.h>
00024 #include <netinet/in.h>
00025 #include <arpa/inet.h>
00026 #include <unistd.h>
00027 #include <stdlib.h>
00028 #include <stdio.h>
00029 #include <string.h>
00030 #include <fcntl.h>
00031 #include <limits.h>
00032 
#include <iostream>
#include <list>
#include <vector>
00035 
00036 
00037 using namespace dtn::data;
00038 
00039 namespace dtn
00040 {
00041         namespace net
00042         {
00043                 const int UDPConvergenceLayer::DEFAULT_PORT = 4556; // default port constant of this layer; not referenced elsewhere in this file
00044 
00045                 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu)
00046                         : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00047                 {
00048                         _socket = new ibrcommon::UnicastSocket();
00049                 }
00050 
00051                 UDPConvergenceLayer::~UDPConvergenceLayer()
00052                 {
00053                         componentDown();
00054                         delete _socket;
00055                 }
00056 
00057                 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00058                 {
00059                         return dtn::core::Node::CONN_UDPIP;
00060                 }
00061 
00062                 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00063                 {
00064                         if (iface == _net)
00065                         {
00066                                 name = "udpcl";
00067                                 stringstream service;
00068 
00069                                 try {
00070                                         std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00071                                         if (!list.empty())
00072                                         {
00073                                                  service << "ip=" << list.front().get(false) << ";port=" << _port << ";";
00074                                         }
00075                                         else
00076                                         {
00077                                                 service << "port=" << _port << ";";
00078                                         }
00079                                 } catch (const ibrcommon::vinterface::interface_not_set&) {
00080                                         service << "port=" << _port << ";";
00081                                 };
00082 
00083                                 params = service.str();
00084                         }
00085                         else
00086                         {
00087                                  throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00088                         }
00089                 }
00090 
00091                 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00092                 {
00093                         std::stringstream ss;
00094                         dtn::data::DefaultSerializer serializer(ss);
00095 
00096                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00097 
00098                         try {
00099                                 // read the bundle out of the storage
00100                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00101 
00102                                 unsigned int size = serializer.getLength(bundle);
00103 
00104                                 if (size > m_maxmsgsize)
00105                                 {
00106                                         // TODO: create a fragment of length "size"
00107 
00108 //                              // get the payload block
00109 //                              PayloadBlock *payload = utils::Utils::getPayloadBlock( b );
00110 //                              size_t application_length = payload->getBLOB().getSize();
00111 //
00112 //                              // the new size for the payload
00113 //                              size_t payload_size = m_maxmsgsize;
00114 //
00115 //                              // reduce by the overhead
00116 //                              payload_size -= (size - application_length);
00117 //
00118 //                              // possible size of fragment offset and application data length
00119 //                              payload_size -= 20;
00120 //
00121 //                              // split the payload block
00122 //                              pair<PayloadBlock*, PayloadBlock*> frags = payload->split(payload_size);
00123 //
00124 //                              Bundle frag1 = b;
00125 //                              frag1.clearBlocks();
00126 //                              frag1.addBlock(frags.first);
00127 //
00128 //                              Bundle frag2 = b;
00129 //                              frag2.clearBlocks();
00130 //                              frag2.addBlock(frags.second);
00131 //                              frag2._fragmentoffset += payload_size;
00132 //
00133 //                              if (!(b._procflags & Bundle::FRAGMENT))
00134 //                              {
00135 //                                      frag1._procflags += Bundle::FRAGMENT;
00136 //                                      frag1._appdatalength += application_length;
00137 //                                      frag2._procflags += Bundle::FRAGMENT;
00138 //                                      frag2._appdatalength += application_length;
00139 //                              }
00140 
00141                                 // TODO: transfer the fragment
00142 
00143                                         throw ConnectionInterruptedException();
00144                                 }
00145 
00146                                 serializer << bundle;
00147                                 string data = ss.str();
00148 
00149                                 // get the address of the node
00150                                 ibrcommon::vaddress addr(node.getAddress());
00151 
00152                                 // set write lock
00153                                 ibrcommon::MutexLock l(m_writelock);
00154 
00155                                 // send converted line back to client.
00156                                 int ret = _socket->send(addr, node.getPort(), data.c_str(), data.length());
00157 
00158                                 if (ret == -1)
00159                                 {
00160                                         // CL is busy, requeue bundle
00161                                         dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00162 
00163                                         return;
00164                                 }
00165 
00166                                 // raise bundle event
00167                                 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00168                                 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00169                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00170                                 // send transfer aborted event
00171                                 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00172                         }
00173 
00174                 }
00175 
00176                 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00177                 {
00178                         ibrcommon::MutexLock l(m_readlock);
00179 
00180                         char data[m_maxmsgsize];
00181 
00182                         // data waiting
00183                         int len = _socket->receive(data, m_maxmsgsize);
00184 
00185                         if (len > 0)
00186                         {
00187                                 // read all data into a stream
00188                                 stringstream ss;
00189                                 ss.write(data, len);
00190 
00191                                 // get the bundle
00192                                 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00193                         }
00194 
00195                         return (*this);
00196                 }
00197 
00198                 void UDPConvergenceLayer::componentUp()
00199                 {
00200                         try {
00201                                 try {
00202                                         ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00203                                         sock.bind(_port, _net);
00204                                 } catch (const std::bad_cast&) {
00205 
00206                                 }
00207                         } catch (const ibrcommon::udpsocket::SocketException &ex) {
00208                                 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00209                                 IBRCOMMON_LOGGER(error) << "      Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00210                         }
00211                 }
00212 
00213                 void UDPConvergenceLayer::componentDown()
00214                 {
00215                         _running = false;
00216                         _socket->shutdown();
00217                         stop();
00218                         join();
00219                 }
00220 
00221                 void UDPConvergenceLayer::componentRun()
00222                 {
00223                         _running = true;
00224 
00225                         while (_running)
00226                         {
00227                                 try {
00228                                         dtn::data::Bundle bundle;
00229                                         (*this) >> bundle;
00230 
00231                                         // determine sender
00232                                         EID sender;
00233 
00234                                         // raise default bundle received event
00235                                         dtn::net::BundleReceivedEvent::raise(sender, bundle);
00236 
00237                                 } catch (const dtn::InvalidDataException &ex) {
00238                                         IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00239                                 } catch (const ibrcommon::IOException &ex) {
00240 
00241                                 }
00242                                 yield();
00243                         }
00244                 }
00245 
00246                 bool UDPConvergenceLayer::__cancellation()
00247                 {
00248                         // since this is an receiving thread we have to cancel the hard way
00249                         return false;
00250                 }
00251 
00252                 const std::string UDPConvergenceLayer::getName() const
00253                 {
00254                         return "UDPConvergenceLayer";
00255                 }
00256         }
00257 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1