|
IBR-DTNSuite 0.6
|
00001 #include "net/UDPConvergenceLayer.h" 00002 #include "net/BundleReceivedEvent.h" 00003 #include "net/TransferCompletedEvent.h" 00004 #include "net/TransferAbortedEvent.h" 00005 #include "core/BundleEvent.h" 00006 #include "core/BundleCore.h" 00007 #include "routing/RequeueBundleEvent.h" 00008 00009 #include <ibrdtn/utils/Utils.h> 00010 #include <ibrdtn/data/Serializer.h> 00011 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00012 00013 #include <ibrcommon/net/UnicastSocket.h> 00014 #include <ibrcommon/net/vaddress.h> 00015 #include <ibrcommon/net/vinterface.h> 00016 #include <ibrcommon/data/BLOB.h> 00017 #include <ibrcommon/Logger.h> 00018 #include <ibrcommon/thread/MutexLock.h> 00019 00020 #include <sys/socket.h> 00021 #include <poll.h> 00022 #include <errno.h> 00023 00024 #include <sys/types.h> 00025 #include <netinet/in.h> 00026 #include <arpa/inet.h> 00027 #include <unistd.h> 00028 #include <stdlib.h> 00029 #include <stdio.h> 00030 #include <string.h> 00031 #include <fcntl.h> 00032 #include <limits.h> 00033 00034 #include <iostream> 00035 #include <list> 00036 00037 00038 using namespace dtn::data; 00039 00040 namespace dtn 00041 { 00042 namespace net 00043 { 00044 const int UDPConvergenceLayer::DEFAULT_PORT = 4556; 00045 00046 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, unsigned int mtu) 00047 : _socket(NULL), _net(net), _port(port), m_maxmsgsize(mtu), _running(false) 00048 { 00049 _socket = new ibrcommon::UnicastSocket(); 00050 } 00051 00052 UDPConvergenceLayer::~UDPConvergenceLayer() 00053 { 00054 componentDown(); 00055 delete _socket; 00056 } 00057 00058 dtn::core::Node::Protocol UDPConvergenceLayer::getDiscoveryProtocol() const 00059 { 00060 return dtn::core::Node::CONN_UDPIP; 00061 } 00062 00063 void UDPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException) 00064 { 00065 if (iface == _net) 00066 { 00067 name = "udpcl"; 00068 stringstream service; 00069 00070 try { 00071 std::list<ibrcommon::vaddress> list = _net.getAddresses(ibrcommon::vaddress::VADDRESS_INET); 00072 if (!list.empty()) 00073 { 00074 service << "ip=" << list.front().get(false) << ";port=" << _port << ";"; 00075 } 00076 else 00077 { 00078 service << "port=" << _port << ";"; 00079 } 00080 } catch (const ibrcommon::vinterface::interface_not_set&) { 00081 service << "port=" << _port << ";"; 00082 }; 00083 00084 params = service.str(); 00085 } 00086 else 00087 { 00088 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00089 } 00090 } 00091 00092 void UDPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00093 { 00094 const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP); 00095 if (uri_list.empty()) 00096 { 00097 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_UNDEFINED); 00098 return; 00099 } 00100 00101 std::stringstream ss; 00102 dtn::data::DefaultSerializer serializer(ss); 00103 00104 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00105 00106 try { 00107 // read the bundle out of the storage 00108 const dtn::data::Bundle bundle = storage.get(job._bundle); 00109 00110 unsigned int size = serializer.getLength(bundle); 00111 00112 if (size > m_maxmsgsize) 00113 { 00114 // TODO: create a fragment of length "size" 00115 throw ConnectionInterruptedException(); 00116 } 00117 00118 serializer << bundle; 00119 string data = ss.str(); 00120 00121 const dtn::core::Node::URI &uri = uri_list.front(); 00122 00123 std::string address = "0.0.0.0"; 00124 unsigned int port = 0; 00125 00126 // read values 00127 uri.decode(address, port); 00128 00129 // get the address of the node 00130 ibrcommon::vaddress addr(address); 00131 00132 // set write lock 00133 ibrcommon::MutexLock l(m_writelock); 00134 00135 // send converted line back to client. 00136 int ret = _socket->send(addr, port, data.c_str(), data.length()); 00137 00138 if (ret == -1) 00139 { 00140 // CL is busy, requeue bundle 00141 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle); 00142 00143 return; 00144 } 00145 00146 // raise bundle event 00147 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00148 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00149 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00150 // send transfer aborted event 00151 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00152 } 00153 00154 } 00155 00156 UDPConvergenceLayer& UDPConvergenceLayer::operator>>(dtn::data::Bundle &bundle) 00157 { 00158 ibrcommon::MutexLock l(m_readlock); 00159 00160 char data[m_maxmsgsize]; 00161 00162 // data waiting 00163 int len = _socket->receive(data, m_maxmsgsize); 00164 00165 if (len > 0) 00166 { 00167 // read all data into a stream 00168 stringstream ss; 00169 ss.write(data, len); 00170 00171 // get the bundle 00172 dtn::data::DefaultDeserializer(ss, dtn::core::BundleCore::getInstance()) >> bundle; 00173 } 00174 00175 return (*this); 00176 } 00177 00178 void UDPConvergenceLayer::componentUp() 00179 { 00180 try { 00181 try { 00182 ibrcommon::UnicastSocket &sock = dynamic_cast<ibrcommon::UnicastSocket&>(*_socket); 00183 sock.bind(_port, _net); 00184 } catch (const std::bad_cast&) { 00185 00186 } 00187 } catch (const ibrcommon::udpsocket::SocketException &ex) { 00188 IBRCOMMON_LOGGER(error) << "Failed to add UDP ConvergenceLayer on " << _net.toString() << ":" << _port << IBRCOMMON_LOGGER_ENDL; 00189 IBRCOMMON_LOGGER(error) << " Error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00190 } 00191 } 00192 00193 void UDPConvergenceLayer::componentDown() 00194 { 00195 _running = false; 00196 _socket->shutdown(); 00197 stop(); 00198 join(); 00199 } 00200 00201 void UDPConvergenceLayer::componentRun() 00202 { 00203 _running = true; 00204 00205 while (_running) 00206 { 00207 try { 00208 dtn::data::Bundle bundle; 00209 (*this) >> bundle; 00210 00211 // determine sender 00212 EID sender; 00213 00214 // increment value in the scope control hop limit block 00215 try { 00216 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00217 schl.increment(); 00218 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00219 00220 // raise default bundle received event 00221 dtn::net::BundleReceivedEvent::raise(sender, bundle); 00222 00223 } catch (const dtn::InvalidDataException &ex) { 00224 IBRCOMMON_LOGGER(warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00225 } catch (const ibrcommon::IOException &ex) { 00226 00227 } 00228 yield(); 00229 } 00230 } 00231 00232 bool UDPConvergenceLayer::__cancellation() 00233 { 00234 // since this is an receiving thread we have to cancel the hard way 00235 return false; 00236 } 00237 00238 const std::string UDPConvergenceLayer::getName() const 00239 { 00240 return "UDPConvergenceLayer"; 00241 } 00242 } 00243 }