|
IBR-DTNSuite 0.6
|
00001 #include "net/UDPConvergenceLayer.h" 00002 #include "net/BundleReceivedEvent.h" 00003 #include "core/BundleEvent.h" 00004 #include "net/TransferCompletedEvent.h" 00005 #include "net/TransferAbortedEvent.h" 00006 #include "routing/RequeueBundleEvent.h" 00007 #include <ibrcommon/net/UnicastSocket.h> 00008 #include <ibrcommon/net/vaddress.h> 00009 #include <ibrcommon/net/vinterface.h> 00010 #include "core/BundleCore.h" 00011 00012 #include <ibrcommon/data/BLOB.h> 00013 #include <ibrcommon/Logger.h> 00014 #include <ibrcommon/thread/MutexLock.h> 00015 00016 #include <ibrdtn/utils/Utils.h> 00017 #include <ibrdtn/data/Serializer.h> 00018 00019 #include <sys/socket.h> 00020 #include <poll.h> 00021 #include <errno.h> 00022 00023 #include <sys/types.h> 00024 #include <netinet/in.h> 00025 #include <arpa/inet.h> 00026 #include <unistd.h> 00027 #include <stdlib.h> 00028 #include <stdio.h> 00029 #include <string.h> 00030 #include <fcntl.h> 00031 #include <limits.h> 00032 00033 #include <iostream> 00034 #include <list> 00035 00036 00037 using namespace dtn::data; 00038 00039 namespace dtn 00040 { 00041 namespace net 00042 { 00043 const int UDPConvergenceLayer::DEFAULT_PORT = 4556; 00044 00045 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu) 00046 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false) 00047 { 00048 _socket = new ibrcommon::UnicastSocket(); 00049 } 00050 00051 UDPConvergenceLayer::~UDPConvergenceLayer() 00052 { 00053 componentDown(); 00054 delete _socket; 00055 } 00056 00057 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const 00058 { 00059 return dtn::core::Node::CONN_UDPIP; 00060 } 00061 00062 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException) 00063 { 00064 if (iface == _net) 00065 { 00066 name = "udpcl"; 00067 stringstream service; 00068 00069 try { 00070 std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET); 00071 if (!list.empty()) 00072 { 00073 service << "ip=" << list.front().get(false) << ";port=" << _port << ";"; 00074 } 00075 else 00076 { 00077 service << "port=" << _port << ";"; 00078 } 00079 } catch (const ibrcommon::vinterface::interface_not_set&) { 00080 service << "port=" << _port << ";"; 00081 }; 00082 00083 params = service.str(); 00084 } 00085 else 00086 { 00087 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00088 } 00089 } 00090 00091 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00092 { 00093 const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP); 00094 if (uri_list.empty()) 00095 { 00096 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_UNDEFINED); 00097 return; 00098 } 00099 00100 std::stringstream ss; 00101 dtn::data::DefaultSerializer serializer(ss); 00102 00103 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00104 00105 try { 00106 // read the bundle out of the storage 00107 const dtn::data::Bundle bundle = storage.get(job._bundle); 00108 00109 unsigned int size = serializer.getLength(bundle); 00110 00111 if (size > m_maxmsgsize) 00112 { 00113 // TODO: create a fragment of length "size" 00114 throw ConnectionInterruptedException(); 00115 } 00116 00117 serializer << bundle; 00118 string data = ss.str(); 00119 00120 const dtn::core::Node::URI &uri = uri_list.front(); 00121 00122 std::string address = "0.0.0.0"; 00123 unsigned int port = 0; 00124 00125 // read values 00126 uri.decode(address, port); 00127 00128 // get the address of the node 00129 ibrcommon::vaddress addr(address); 00130 00131 // set write lock 00132 ibrcommon::MutexLock l(m_writelock); 00133 00134 // send converted line back to client. 00135 int ret = _socket->send(addr, port, data.c_str(), data.length()); 00136 00137 if (ret == -1) 00138 { 00139 // CL is busy, requeue bundle 00140 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle); 00141 00142 return; 00143 } 00144 00145 // raise bundle event 00146 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00147 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00148 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00149 // send transfer aborted event 00150 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00151 } 00152 00153 } 00154 00155 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle) 00156 { 00157 ibrcommon::MutexLock l(m_readlock); 00158 00159 char data[m_maxmsgsize]; 00160 00161 // data waiting 00162 int len = _socket->receive(data, m_maxmsgsize); 00163 00164 if (len > 0) 00165 { 00166 // read all data into a stream 00167 stringstream ss; 00168 ss.write(data, len); 00169 00170 // get the bundle 00171 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle; 00172 } 00173 00174 return (*this); 00175 } 00176 00177 void UDPConvergenceLayer::componentUp() 00178 { 00179 try { 00180 try { 00181 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket); 00182 sock.bind(_port, _net); 00183 } catch (const std::bad_cast&) { 00184 00185 } 00186 } catch (const ibrcommon::udpsocket::SocketException &ex) { 00187 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL; 00188 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00189 } 00190 } 00191 00192 void UDPConvergenceLayer::componentDown() 00193 { 00194 _running = false; 00195 _socket->shutdown(); 00196 stop(); 00197 join(); 00198 } 00199 00200 void UDPConvergenceLayer::componentRun() 00201 { 00202 _running = true; 00203 00204 while (_running) 00205 { 00206 try { 00207 dtn::data::Bundle bundle; 00208 (*this) >> bundle; 00209 00210 // determine sender 00211 EID sender; 00212 00213 // raise default bundle received event 00214 dtn::net::BundleReceivedEvent::raise(sender, bundle); 00215 00216 } catch (const dtn::InvalidDataException &ex) { 00217 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00218 } catch (const ibrcommon::IOException &ex) { 00219 00220 } 00221 yield(); 00222 } 00223 } 00224 00225 bool UDPConvergenceLayer::__cancellation() 00226 { 00227 // since this is an receiving thread we have to cancel the hard way 00228 return false; 00229 } 00230 00231 const std::string UDPConvergenceLayer::getName() const 00232 { 00233 return "UDPConvergenceLayer"; 00234 } 00235 } 00236 }