Go to the documentation of this file.00001 #include "net/UDPConvergenceLayer.h"
00002 #include "net/BundleReceivedEvent.h"
00003 #include "core/BundleEvent.h"
00004 #include "net/TransferCompletedEvent.h"
00005 #include "net/TransferAbortedEvent.h"
00006 #include "routing/RequeueBundleEvent.h"
00007 #include <ibrcommon/net/UnicastSocket.h>
00008 #include <ibrcommon/net/BroadcastSocket.h>
00009 #include "core/BundleCore.h"
00010
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <ibrcommon/Logger.h>
00013 #include <ibrcommon/thread/MutexLock.h>
00014
00015 #include <ibrdtn/utils/Utils.h>
00016 #include <ibrdtn/data/Serializer.h>
00017
00018 #include <sys/socket.h>
00019 #include <poll.h>
00020 #include <errno.h>
00021
00022 #include <sys/types.h>
00023 #include <netinet/in.h>
00024 #include <arpa/inet.h>
00025 #include <unistd.h>
00026 #include <stdlib.h>
00027 #include <stdio.h>
00028 #include <string.h>
00029 #include <fcntl.h>
00030 #include <limits.h>
00031
00032 #include <iostream>
00033
00034
00035 using namespace dtn::data;
00036
00037 namespace dtn
00038 {
00039 namespace net
00040 {
00041 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
00042
00043 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::NetInterface net, int port, bool broadcast, unsigned int mtu)
00044 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
00045 {
00046 if (broadcast)
00047 {
00048 _socket = new ibrcommon::BroadcastSocket();
00049 }
00050 else
00051 {
00052 _socket = new ibrcommon::UnicastSocket();
00053 }
00054 }
00055
00056 UDPConvergenceLayer::~UDPConvergenceLayer()
00057 {
00058 componentDown();
00059 delete _socket;
00060 }
00061
00062 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const
00063 {
00064 return dtn::core::Node::CONN_UDPIP;
00065 }
00066
00067 void UDPConvergenceLayer::update(std::string &name, std::string ¶ms)
00068 {
00069 name = "udpcl";
00070
00071 stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00072 params = service.str();
00073 }
00074
00075 bool UDPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00076 {
00077 if (_net == net) return true;
00078 return false;
00079 }
00080
00081 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00082 {
00083 std::stringstream ss;
00084 dtn::data::DefaultSerializer serializer(ss);
00085
00086 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00087
00088 try {
00089
00090 const dtn::data::Bundle bundle = storage.get(job._bundle);
00091
00092 unsigned int size = serializer.getLength(bundle);
00093
00094 if (size > m_maxmsgsize)
00095 {
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133 throw ConnectionInterruptedException();
00134 }
00135
00136 serializer << bundle;
00137 string data = ss.str();
00138
00139
00140 ibrcommon::udpsocket::peer p = _socket->getPeer(node.getAddress(), node.getPort());
00141
00142
00143 ibrcommon::MutexLock l(m_writelock);
00144
00145
00146 int ret = p.send(data.c_str(), data.length());
00147
00148 if (ret == -1)
00149 {
00150
00151 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00152
00153 return;
00154 }
00155
00156
00157 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00158 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00159 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00160
00161 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00162 }
00163
00164 }
00165
00166 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle)
00167 {
00168 ibrcommon::MutexLock l(m_readlock);
00169
00170 char data[m_maxmsgsize];
00171
00172
00173 int len = _socket->receive(data, m_maxmsgsize);
00174
00175 if (len > 0)
00176 {
00177
00178 stringstream ss;
00179 ss.write(data, len);
00180
00181
00182 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle;
00183 }
00184
00185 return (*this);
00186 }
00187
00188 void UDPConvergenceLayer::componentUp()
00189 {
00190 try {
00191 try {
00192 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket);
00193 sock.bind(_port, _net);
00194 } catch (std::bad_cast) {
00195
00196 }
00197
00198 try {
00199 ibrcommon::BroadcastSocket &sock = dynamic_cast<ibrcommon::BroadcastSocket&>(*_socket);
00200 sock.bind(_port, _net);
00201 } catch (std::bad_cast) {
00202
00203 }
00204
00205 } catch (ibrcommon::udpsocket::SocketException ex) {
00206 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.getAddress() << ":" << _port << IBRCOMMON_LOGGER_ENDL;
00207 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00208 }
00209 }
00210
00211 void UDPConvergenceLayer::componentDown()
00212 {
00213 _running = false;
00214 _socket->shutdown();
00215 join();
00216 }
00217
00218 void UDPConvergenceLayer::componentRun()
00219 {
00220 _running = true;
00221
00222 while (_running)
00223 {
00224 try {
00225 dtn::data::Bundle bundle;
00226 (*this) >> bundle;
00227
00228
00229 EID sender;
00230
00231
00232 dtn::net::BundleReceivedEvent::raise(sender, bundle);
00233
00234 } catch (dtn::InvalidDataException ex) {
00235 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00236 } catch (ibrcommon::IOException ex) {
00237
00238 }
00239 yield();
00240 }
00241 }
00242
00243 bool UDPConvergenceLayer::__cancellation()
00244 {
00245
00246 return false;
00247 }
00248
00249 const std::string UDPConvergenceLayer::getName() const
00250 {
00251 return "UDPConvergenceLayer";
00252 }
00253 }
00254 }