IBR-DTNSuite 0.6

daemon/src/net/UDPConvergenceLayer.cpp

Go to the documentation of this file.
00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "net/TransferCompletedEvent.h"
00004 #include "net/TransferAbortedEvent.h"
00005 #include "core/BundleEvent.h"
00006 #include "core/BundleCore.h"
00007 #include "routing/RequeueBundleEvent.h"
00008 
00009 #include <ibrdtn/utils/Utils.h>
00010 #include <ibrdtn/data/Serializer.h>
00011 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00012 
00013 #include <ibrcommon/net/UnicastSocket.h>
00014 #include <ibrcommon/net/vaddress.h>
00015 #include <ibrcommon/net/vinterface.h>
00016 #include <ibrcommon/data/BLOB.h>
00017 #include <ibrcommon/Logger.h>
00018 #include <ibrcommon/thread/MutexLock.h>
00019 
00020 #include <sys/socket.h>
00021 #include <poll.h>
00022 #include <errno.h>
00023 
00024 #include <sys/types.h>
00025 #include <netinet/in.h>
00026 #include <arpa/inet.h>
00027 #include <unistd.h>
00028 #include <stdlib.h>
00029 #include <stdio.h>
00030 #include <string.h>
00031 #include <fcntl.h>
00032 #include <limits.h>
00033 
00034 #include <iostream>
00035 #include <list>
00036 
00037 
00038 using namespace dtn::data;
00039 
00040 namespace dtn
00041 {
00042         namespace net
00043         {
00044                 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00045 
00046                 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu)
00047                         : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00048                 {
00049                         _socket = new ibrcommon::UnicastSocket();
00050                 }
00051 
00052                 UDPConvergenceLayer::~UDPConvergenceLayer()
00053                 {
00054                         componentDown();
00055                         delete _socket;
00056                 }
00057 
00058                 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00059                 {
00060                         return dtn::core::Node::CONN_UDPIP;
00061                 }
00062 
00063                 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00064                 {
00065                         if (iface == _net)
00066                         {
00067                                 name = "udpcl";
00068                                 stringstream service;
00069 
00070                                 try {
00071                                         std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00072                                         if (!list.empty())
00073                                         {
00074                                                  service << "ip=" << list.front().get(false) << ";port=" << _port << ";";
00075                                         }
00076                                         else
00077                                         {
00078                                                 service << "port=" << _port << ";";
00079                                         }
00080                                 } catch (const ibrcommon::vinterface::interface_not_set&) {
00081                                         service << "port=" << _port << ";";
00082                                 };
00083 
00084                                 params = service.str();
00085                         }
00086                         else
00087                         {
00088                                  throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00089                         }
00090                 }
00091 
00092                 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00093                 {
00094                         const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP);
00095                         if (uri_list.empty())
00096                         {
00097                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_UNDEFINED);
00098                                 return;
00099                         }
00100 
00101                         std::stringstream ss;
00102                         dtn::data::DefaultSerializer serializer(ss);
00103 
00104                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00105 
00106                         try {
00107                                 // read the bundle out of the storage
00108                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00109 
00110                                 unsigned int size = serializer.getLength(bundle);
00111 
00112                                 if (size > m_maxmsgsize)
00113                                 {
00114                                         // TODO: create a fragment of length "size"
00115                                         throw ConnectionInterruptedException();
00116                                 }
00117 
00118                                 serializer << bundle;
00119                                 string data = ss.str();
00120 
00121                                 const dtn::core::Node::URI &uri = uri_list.front();
00122 
00123                                 std::string address = "0.0.0.0";
00124                                 unsigned int port = 0;
00125 
00126                                 // read values
00127                                 uri.decode(address, port);
00128 
00129                                 // get the address of the node
00130                                 ibrcommon::vaddress addr(address);
00131 
00132                                 // set write lock
00133                                 ibrcommon::MutexLock l(m_writelock);
00134 
00135                                 // send converted line back to client.
00136                                 int ret = _socket->send(addr, port, data.c_str(), data.length());
00137 
00138                                 if (ret == -1)
00139                                 {
00140                                         // CL is busy, requeue bundle
00141                                         dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00142 
00143                                         return;
00144                                 }
00145 
00146                                 // raise bundle event
00147                                 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00148                                 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00149                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00150                                 // send transfer aborted event
00151                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00152                         }
00153 
00154                 }
00155 
00156                 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00157                 {
00158                         ibrcommon::MutexLock l(m_readlock);
00159 
00160                         char data[m_maxmsgsize];
00161 
00162                         // data waiting
00163                         int len = _socket->receive(data, m_maxmsgsize);
00164 
00165                         if (len > 0)
00166                         {
00167                                 // read all data into a stream
00168                                 stringstream ss;
00169                                 ss.write(data, len);
00170 
00171                                 // get the bundle
00172                                 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00173                         }
00174 
00175                         return (*this);
00176                 }
00177 
00178                 void UDPConvergenceLayer::componentUp()
00179                 {
00180                         try {
00181                                 try {
00182                                         ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00183                                         sock.bind(_port, _net);
00184                                 } catch (const std::bad_cast&) {
00185 
00186                                 }
00187                         } catch (const ibrcommon::udpsocket::SocketException &ex) {
00188                                 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00189                                 IBRCOMMON_LOGGER(error) << "      Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00190                         }
00191                 }
00192 
00193                 void UDPConvergenceLayer::componentDown()
00194                 {
00195                         _running = false;
00196                         _socket->shutdown();
00197                         stop();
00198                         join();
00199                 }
00200 
00201                 void UDPConvergenceLayer::componentRun()
00202                 {
00203                         _running = true;
00204 
00205                         while (_running)
00206                         {
00207                                 try {
00208                                         dtn::data::Bundle bundle;
00209                                         (*this) >> bundle;
00210 
00211                                         // determine sender
00212                                         EID sender;
00213 
00214                                         // increment value in the scope control hop limit block
00215                                         try {
00216                                                 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00217                                                 schl.increment();
00218                                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00219 
00220                                         // raise default bundle received event
00221                                         dtn::net::BundleReceivedEvent::raise(sender, bundle);
00222 
00223                                 } catch (const dtn::InvalidDataException &ex) {
00224                                         IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00225                                 } catch (const ibrcommon::IOException &ex) {
00226 
00227                                 }
00228                                 yield();
00229                         }
00230                 }
00231 
00232                 bool UDPConvergenceLayer::__cancellation()
00233                 {
00234                         // since this is an receiving thread we have to cancel the hard way
00235                         return false;
00236                 }
00237 
00238                 const std::string UDPConvergenceLayer::getName() const
00239                 {
00240                         return "UDPConvergenceLayer";
00241                 }
00242         }
00243 }