00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "core/BundleEvent.h"
00004 #include "net/TransferCompletedEvent.h"
00005 #include "routing/RequeueBundleEvent.h"
00006 #include <ibrcommon/net/UnicastSocket.h>
00007 #include <ibrcommon/net/BroadcastSocket.h>
00008 #include "core/BundleCore.h"
00009
00010 #include <ibrcommon/data/BLOB.h>
00011 #include <ibrcommon/Logger.h>
00012 #include <ibrcommon/thread/MutexLock.h>
00013
00014 #include <ibrdtn/utils/Utils.h>
00015 #include <ibrdtn/data/Serializer.h>
00016
00017 #include <sys/socket.h>
00018 #include <poll.h>
00019 #include <errno.h>
00020
00021 #include <sys/types.h>
00022 #include <netinet/in.h>
00023 #include <arpa/inet.h>
00024 #include <unistd.h>
00025 #include <stdlib.h>
00026 #include <stdio.h>
00027 #include <string.h>
00028 #include <fcntl.h>
00029 #include <limits.h>
00030
00031 #include <iostream>
00032
00033
00034 using namespace dtn::data;
00035
00036 namespace dtn
00037 {
00038 namespace net
00039 {
00040 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00041
00042 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::NetInterface net, int port, bool broadcast, unsigned int mtu)
00043 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00044 {
00045 if (broadcast)
00046 {
00047 _socket = new ibrcommon::BroadcastSocket();
00048 }
00049 else
00050 {
00051 _socket = new ibrcommon::UnicastSocket();
00052 }
00053 }
00054
00055 UDPConvergenceLayer::~UDPConvergenceLayer()
00056 {
00057 if (isRunning())
00058 {
00059 componentDown();
00060 }
00061
00062 delete _socket;
00063 }
00064
00065 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00066 {
00067 return dtn::core::Node::CONN_UDPIP;
00068 }
00069
00070 void UDPConvergenceLayer::update(std::string &name, std::string ¶ms)
00071 {
00072 name = "udpcl";
00073
00074 stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00075 params = service.str();
00076 }
00077
00078 bool UDPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00079 {
00080 if (_net.getInterface() == net.getInterface()) return true;
00081 return false;
00082 }
00083
00084 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00085 {
00086 std::stringstream ss;
00087 dtn::data::DefaultSerializer serializer(ss);
00088
00089 unsigned int size = serializer.getLength(job._bundle);
00090
00091 if (size > m_maxmsgsize)
00092 {
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130 throw ConnectionInterruptedException();
00131 }
00132
00133 serializer << job._bundle;
00134 string data = ss.str();
00135
00136
00137 ibrcommon::udpsocket::peer p = _socket->getPeer(node.getAddress(), node.getPort());
00138
00139
00140 ibrcommon::MutexLock l(m_writelock);
00141
00142
00143 int ret = p.send(data.c_str(), data.length());
00144
00145 if (ret == -1)
00146 {
00147
00148 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00149
00150 return;
00151 }
00152
00153
00154 dtn::net::TransferCompletedEvent::raise(job._destination, job._bundle);
00155 dtn::core::BundleEvent::raise(job._bundle, dtn::core::BUNDLE_FORWARDED);
00156 }
00157
00158 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00159 {
00160 ibrcommon::MutexLock l(m_readlock);
00161
00162 char data[m_maxmsgsize];
00163
00164
00165 int len = _socket->receive(data, m_maxmsgsize);
00166
00167 if (len > 0)
00168 {
00169
00170 stringstream ss;
00171 ss.write(data, len);
00172
00173
00174 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00175 }
00176
00177 return (*this);
00178 }
00179
00180 void UDPConvergenceLayer::componentUp()
00181 {
00182 try {
00183 try {
00184 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00185 sock.bind(_port, _net);
00186 } catch (std::bad_cast) {
00187
00188 }
00189
00190 try {
00191 ibrcommon::BroadcastSocket &sock = dynamic_cast<ibrcommon::BroadcastSocket&>(*_socket);
00192 sock.bind(_port, _net);
00193 } catch (std::bad_cast) {
00194
00195 }
00196
00197 } catch (ibrcommon::udpsocket::SocketException ex) {
00198 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.getAddress() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00199 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00200 }
00201 }
00202
00203 void UDPConvergenceLayer::componentDown()
00204 {
00205 _running = false;
00206 _socket->shutdown();
00207 join();
00208 }
00209
00210 void UDPConvergenceLayer::componentRun()
00211 {
00212 _running = true;
00213
00214 while (_running)
00215 {
00216 try {
00217 dtn::data::Bundle bundle;
00218 (*this) >> bundle;
00219
00220
00221 EID sender;
00222
00223
00224 dtn::net::BundleReceivedEvent::raise(sender, bundle);
00225
00226 } catch (dtn::InvalidDataException ex) {
00227 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00228 } catch (ibrcommon::IOException ex) {
00229
00230 }
00231 yield();
00232 }
00233 }
00234 }
00235 }