Go to the documentation of this file.00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "core/BundleEvent.h"
00004 #include "net/TransferCompletedEvent.h"
00005 #include "net/TransferAbortedEvent.h"
00006 #include "routing/RequeueBundleEvent.h"
00007 #include <ibrcommon/net/UnicastSocket.h>
00008 #include <ibrcommon/net/vaddress.h>
00009 #include <ibrcommon/net/vinterface.h>
00010 #include "core/BundleCore.h"
00011
00012 #include <ibrcommon/data/BLOB.h>
00013 #include <ibrcommon/Logger.h>
00014 #include <ibrcommon/thread/MutexLock.h>
00015
00016 #include <ibrdtn/utils/Utils.h>
00017 #include <ibrdtn/data/Serializer.h>
00018
00019 #include <sys/socket.h>
00020 #include <poll.h>
00021 #include <errno.h>
00022
00023 #include <sys/types.h>
00024 #include <netinet/in.h>
00025 #include <arpa/inet.h>
00026 #include <unistd.h>
00027 #include <stdlib.h>
00028 #include <stdio.h>
00029 #include <string.h>
00030 #include <fcntl.h>
00031 #include <limits.h>
00032
00033 #include <iostream>
00034 #include <list>
00035
00036
00037 using namespace dtn::data;
00038
00039 namespace dtn
00040 {
00041 namespace net
00042 {
00043 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00044
00045 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu)
00046 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00047 {
00048 _socket = new ibrcommon::UnicastSocket();
00049 }
00050
00051 UDPConvergenceLayer::~UDPConvergenceLayer()
00052 {
00053 componentDown();
00054 delete _socket;
00055 }
00056
00057 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00058 {
00059 return dtn::core::Node::CONN_UDPIP;
00060 }
00061
00062 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00063 {
00064 if (iface == _net)
00065 {
00066 name = "udpcl";
00067 stringstream service;
00068
00069 try {
00070 std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00071 if (!list.empty())
00072 {
00073 service << "ip=" << list.front().get(false) << ";port=" << _port << ";";
00074 }
00075 else
00076 {
00077 service << "port=" << _port << ";";
00078 }
00079 } catch (const ibrcommon::vinterface::interface_not_set&) {
00080 service << "port=" << _port << ";";
00081 };
00082
00083 params = service.str();
00084 }
00085 else
00086 {
00087 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00088 }
00089 }
00090
00091 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00092 {
00093 std::stringstream ss;
00094 dtn::data::DefaultSerializer serializer(ss);
00095
00096 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00097
00098 try {
00099
00100 const dtn::data::Bundle bundle = storage.get(job._bundle);
00101
00102 unsigned int size = serializer.getLength(bundle);
00103
00104 if (size > m_maxmsgsize)
00105 {
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143 throw ConnectionInterruptedException();
00144 }
00145
00146 serializer << bundle;
00147 string data = ss.str();
00148
00149
00150 ibrcommon::vaddress addr(node.getAddress());
00151
00152
00153 ibrcommon::MutexLock l(m_writelock);
00154
00155
00156 int ret = _socket->send(addr, node.getPort(), data.c_str(), data.length());
00157
00158 if (ret == -1)
00159 {
00160
00161 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00162
00163 return;
00164 }
00165
00166
00167 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00168 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00169 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00170
00171 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00172 }
00173
00174 }
00175
00176 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00177 {
00178 ibrcommon::MutexLock l(m_readlock);
00179
00180 char data[m_maxmsgsize];
00181
00182
00183 int len = _socket->receive(data, m_maxmsgsize);
00184
00185 if (len > 0)
00186 {
00187
00188 stringstream ss;
00189 ss.write(data, len);
00190
00191
00192 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00193 }
00194
00195 return (*this);
00196 }
00197
00198 void UDPConvergenceLayer::componentUp()
00199 {
00200 try {
00201 try {
00202 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00203 sock.bind(_port, _net);
00204 } catch (const std::bad_cast&) {
00205
00206 }
00207 } catch (const ibrcommon::udpsocket::SocketException &ex) {
00208 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00209 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00210 }
00211 }
00212
00213 void UDPConvergenceLayer::componentDown()
00214 {
00215 _running = false;
00216 _socket->shutdown();
00217 stop();
00218 join();
00219 }
00220
00221 void UDPConvergenceLayer::componentRun()
00222 {
00223 _running = true;
00224
00225 while (_running)
00226 {
00227 try {
00228 dtn::data::Bundle bundle;
00229 (*this) >> bundle;
00230
00231
00232 EID sender;
00233
00234
00235 dtn::net::BundleReceivedEvent::raise(sender, bundle);
00236
00237 } catch (const dtn::InvalidDataException &ex) {
00238 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00239 } catch (const ibrcommon::IOException &ex) {
00240
00241 }
00242 yield();
00243 }
00244 }
00245
00246 bool UDPConvergenceLayer::__cancellation()
00247 {
00248
00249 return false;
00250 }
00251
00252 const std::string UDPConvergenceLayer::getName() const
00253 {
00254 return "UDPConvergenceLayer";
00255 }
00256 }
00257 }