#include "net/UDPConvergenceLayer.h"
#include "net/BundleReceivedEvent.h"
#include "net/TransferCompletedEvent.h"
#include "net/TransferAbortedEvent.h"
#include "core/BundleCore.h"
#include "core/BundleEvent.h"
#include "routing/RequeueBundleEvent.h"

#include <ibrcommon/data/BLOB.h>
#include <ibrcommon/Logger.h>
#include <ibrcommon/net/UnicastSocket.h>
#include <ibrcommon/net/vaddress.h>
#include <ibrcommon/net/vinterface.h>
#include <ibrcommon/thread/MutexLock.h>

#include <ibrdtn/data/Serializer.h>
#include <ibrdtn/utils/Utils.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <unistd.h>
#include <fcntl.h>

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <list>
#include <sstream>
#include <string>
#include <typeinfo>
#include <vector>

00036
00037 using namespace dtn::data;
00038
00039 namespace dtn
00040 {
00041 namespace net
00042 {
00043 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00044
00045 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu)
00046 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00047 {
00048 _socket = new ibrcommon::UnicastSocket();
00049 }
00050
00051 UDPConvergenceLayer::~UDPConvergenceLayer()
00052 {
00053 componentDown();
00054 delete _socket;
00055 }
00056
00057 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00058 {
00059 return dtn::core::Node::CONN_UDPIP;
00060 }
00061
00062 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00063 {
00064 if (iface == _net) throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00065
00066 name = "udpcl";
00067 stringstream service;
00068
00069 try {
00070 std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00071 if (!list.empty())
00072 {
00073 service << "ip=" << list.front().get(false) << ";port=" << _port << ";";
00074 }
00075 else
00076 {
00077 service << "port=" << _port << ";";
00078 }
00079 } catch (const ibrcommon::vinterface::interface_not_set&) {
00080 service << "port=" << _port << ";";
00081 };
00082
00083 params = service.str();
00084 }
00085
00086 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00087 {
00088 std::stringstream ss;
00089 dtn::data::DefaultSerializer serializer(ss);
00090
00091 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00092
00093 try {
00094
00095 const dtn::data::Bundle bundle = storage.get(job._bundle);
00096
00097 unsigned int size = serializer.getLength(bundle);
00098
00099 if (size > m_maxmsgsize)
00100 {
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138 throw ConnectionInterruptedException();
00139 }
00140
00141 serializer << bundle;
00142 string data = ss.str();
00143
00144
00145 ibrcommon::vaddress addr(node.getAddress());
00146
00147
00148 ibrcommon::MutexLock l(m_writelock);
00149
00150
00151 int ret = _socket->send(addr, node.getPort(), data.c_str(), data.length());
00152
00153 if (ret == -1)
00154 {
00155
00156 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00157
00158 return;
00159 }
00160
00161
00162 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00163 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00164 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00165
00166 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00167 }
00168
00169 }
00170
00171 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00172 {
00173 ibrcommon::MutexLock l(m_readlock);
00174
00175 char data[m_maxmsgsize];
00176
00177
00178 int len = _socket->receive(data, m_maxmsgsize);
00179
00180 if (len > 0)
00181 {
00182
00183 stringstream ss;
00184 ss.write(data, len);
00185
00186
00187 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00188 }
00189
00190 return (*this);
00191 }
00192
00193 void UDPConvergenceLayer::componentUp()
00194 {
00195 try {
00196 try {
00197 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00198 sock.bind(_port, _net);
00199 } catch (const std::bad_cast&) {
00200
00201 }
00202 } catch (const ibrcommon::udpsocket::SocketException &ex) {
00203 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00204 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00205 }
00206 }
00207
00208 void UDPConvergenceLayer::componentDown()
00209 {
00210 _running = false;
00211 _socket->shutdown();
00212 join();
00213 }
00214
00215 void UDPConvergenceLayer::componentRun()
00216 {
00217 _running = true;
00218
00219 while (_running)
00220 {
00221 try {
00222 dtn::data::Bundle bundle;
00223 (*this) >> bundle;
00224
00225
00226 EID sender;
00227
00228
00229 dtn::net::BundleReceivedEvent::raise(sender, bundle);
00230
00231 } catch (const dtn::InvalidDataException &ex) {
00232 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00233 } catch (const ibrcommon::IOException &ex) {
00234
00235 }
00236 yield();
00237 }
00238 }
00239
00240 bool UDPConvergenceLayer::__cancellation()
00241 {
00242
00243 return false;
00244 }
00245
00246 const std::string UDPConvergenceLayer::getName() const
00247 {
00248 return "UDPConvergenceLayer";
00249 }
00250 }
00251 }