IBR-DTN  1.0.0
UDPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * UDPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
25 #include "core/BundleEvent.h"
26 #include "core/BundleCore.h"
27 #include "Configuration.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
32 #include <ibrcommon/net/socket.h>
33 #include <ibrcommon/net/vaddress.h>
34 #include <ibrcommon/net/vinterface.h>
35 #include <ibrcommon/data/BLOB.h>
36 #include <ibrcommon/Logger.h>
37 #include <ibrcommon/thread/MutexLock.h>
38 
39 #include <sys/types.h>
40 #include <unistd.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <fcntl.h>
45 #include <limits.h>
46 
47 #include <iostream>
48 #include <list>
49 #include <vector>
50 
51 
52 using namespace dtn::data;
53 
54 namespace dtn
55 {
56  namespace net
57  {
58  const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
59 
60  UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, dtn::data::Length mtu)
61  : _net(net), _port(port), m_maxmsgsize(mtu), _running(false), _stats_in(0), _stats_out(0)
62  {
63  }
64 
66  {
67  componentDown();
68  }
69 
71  {
72  _stats_in = 0;
73  _stats_out = 0;
74  }
75 
77  {
78  std::stringstream ss_format;
79 
80  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
81  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
82 
83  ss_format << _stats_in;
84  data[IN_TAG] = ss_format.str();
85  ss_format.str("");
86 
87  ss_format << _stats_out;
88  data[OUT_TAG] = ss_format.str();
89  }
90 
92  {
94  }
95 
97  {
98  // announce port only if we are bound to any interface
99  if (_net.isAny()) {
100  std::stringstream service;
101  // ... set the port only
102  service << "port=" << _port << ";";
103  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
104  return;
105  }
106 
107  // do not announce if this is not our interface
109 
110  // determine if we should enable crosslayer discovery by sending out our own address
112 
113  // this marker should set to true if we added an service description
114  bool announced = false;
115 
116  try {
117  // check if cross layer discovery is disabled
118  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
119 
120  // get all global addresses of the interface
121  std::list<ibrcommon::vaddress> list = _net.getAddresses();
122 
123  // if no address is returned... (goto catch block)
124  if (list.empty()) throw ibrcommon::Exception("no address found");
125 
126  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
127  {
128  const ibrcommon::vaddress &addr = (*addr_it);
129 
130  // only announce global scope addresses
131  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
132 
133  try {
134  // do not announce non-IP addresses
135  sa_family_t f = addr.family();
136  if ((f != AF_INET) && (f != AF_INET6)) continue;
137 
138  std::stringstream service;
139  // fill in the ip address
140  service << "ip=" << addr.address() << ";port=" << _port << ";";
141  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
142 
143  // set the announce mark
144  announced = true;
145  } catch (const ibrcommon::vaddress::address_exception &ex) {
146  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
147  }
148  }
149  } catch (const ibrcommon::Exception&) {
150  // address collection process aborted
151  }
152 
153  // if we still not announced anything...
154  if (!announced) {
155  // announce at least our local port
156  std::stringstream service;
157  service << "port=" << _port << ";";
158  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
159  }
160  }
161 
163  {
164  const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP);
165  if (uri_list.empty())
166  {
167  dtn::net::BundleTransfer local_job = job;
169  return;
170  }
171 
173 
174  const dtn::core::Node::URI &uri = uri_list.front();
175 
176  std::string address = "0.0.0.0";
177  unsigned int port = 0;
178 
179  // read values
180  uri.decode(address, port);
181 
182  // get the address of the node
183  ibrcommon::vaddress addr(address, port);
184 
185  try {
186  // read the bundle out of the storage
187  dtn::data::Bundle bundle = storage.get(job.getBundle());
188 
189  // create a filter context
190  dtn::core::FilterContext context;
191  context.setPeer(node.getEID());
193 
194  // push bundle through the filter routines
195  context.setBundle(bundle);
197 
198  switch (ret) {
199  case BundleFilter::ACCEPT:
200  break;
201  case BundleFilter::REJECT:
202  case BundleFilter::DROP:
203  dtn::net::BundleTransfer local_job = job;
205  return;
206  }
207 
208  // build the dictionary for EID lookup
209  const dtn::data::Dictionary dict(bundle);
210 
211  // create a default serializer
212  dtn::data::DefaultSerializer dummy(std::cout, dict);
213 
214  // get the encoded length of the primary block
215  size_t header = dummy.getLength((const PrimaryBlock&)bundle);
216  header += 20; // two times SDNV through fragmentation
217 
218  dtn::data::Length size = dummy.getLength(bundle);
219 
220  if (size > m_maxmsgsize)
221  {
222  // abort transmission if fragmentation is disabled
223  if (!dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
225  {
227  }
228 
229  const size_t psize = bundle.find<dtn::data::PayloadBlock>().getLength();
230  const size_t fragment_size = m_maxmsgsize - header;
231  const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0);
232 
233  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "MTU of " << m_maxmsgsize << " is too small to carry " << psize << " bytes of payload." << IBRCOMMON_LOGGER_ENDL;
234  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "create " << fragment_count << " fragments with " << fragment_size << " bytes each." << IBRCOMMON_LOGGER_ENDL;
235 
236  for (size_t i = 0; i < fragment_count; ++i)
237  {
238  dtn::data::BundleFragment fragment(bundle, i * fragment_size, fragment_size);
239 
240  std::stringstream ss;
241  dtn::data::DefaultSerializer serializer(ss);
242 
243  serializer << fragment;
244  std::string data = ss.str();
245 
246  // send out the bundle data
247  send(addr, data);
248  }
249  }
250  else
251  {
252  std::stringstream ss;
253  dtn::data::DefaultSerializer serializer(ss);
254 
255  serializer << bundle;
256  std::string data = ss.str();
257 
258  // send out the bundle data
259  send(addr, data);
260  }
261 
262  // success - raise bundle event
263  dtn::net::BundleTransfer local_job = job;
264  local_job.complete();
265  } catch (const dtn::storage::NoBundleFoundException&) {
266  // send transfer aborted event
267  dtn::net::BundleTransfer local_job = job;
269  } catch (const ibrcommon::socket_exception&) {
270  // CL is busy, requeue bundle
271  } catch (const NoAddressFoundException &ex) {
272  // no connection available
273  dtn::net::BundleTransfer local_job = job;
275  }
276  }
277 
278  void UDPConvergenceLayer::send(const ibrcommon::vaddress &addr, const std::string &data) throw (ibrcommon::socket_exception, NoAddressFoundException)
279  {
280  // set write lock
281  ibrcommon::MutexLock l(m_writelock);
282 
283  // get the first global scope socket
284  ibrcommon::socketset socks = _vsocket.getAll();
285  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
286  ibrcommon::udpsocket &sock = dynamic_cast<ibrcommon::udpsocket&>(**iter);
287 
288  // send converted line back to client.
289  sock.sendto(data.c_str(), data.length(), 0, addr);
290 
291  // add statistic data
292  _stats_out += data.length();
293 
294  // success
295  return;
296  }
297 
298  // failure
299  throw NoAddressFoundException("no valid address found");
300  }
301 
302  void UDPConvergenceLayer::receive(dtn::data::Bundle &bundle, dtn::data::EID &sender) throw (ibrcommon::socket_exception, dtn::InvalidDataException)
303  {
304  ibrcommon::MutexLock l(m_readlock);
305 
306  std::vector<char> data(m_maxmsgsize);
307 
308  // data waiting
309  ibrcommon::socketset readfds;
310 
311  // wait for incoming messages
312  _vsocket.select(&readfds, NULL, NULL, NULL);
313 
314  if (readfds.size() > 0) {
315  ibrcommon::datagramsocket *sock = static_cast<ibrcommon::datagramsocket*>(*readfds.begin());
316 
317  ibrcommon::vaddress fromaddr;
318  size_t len = sock->recvfrom(&data[0], m_maxmsgsize, 0, fromaddr);
319 
320  // add statistic data
321  _stats_in += len;
322 
323  std::stringstream ss; ss << "udp://" << fromaddr.toString();
324  sender = dtn::data::EID(ss.str());
325 
326  if (len > 0)
327  {
328  // read all data into a stream
329  stringstream ss;
330  ss.write(&data[0], len);
331 
332  // get the bundle
334  }
335  }
336  }
337 
338  void UDPConvergenceLayer::eventNotify(const ibrcommon::LinkEvent &evt)
339  {
340  if (evt.getInterface() != _net) return;
341 
342  switch (evt.getAction())
343  {
344  case ibrcommon::LinkEvent::ACTION_ADDRESS_ADDED:
345  {
346  ibrcommon::vaddress bindaddr = evt.getAddress();
347  // convert the port into a string
348  std::stringstream ss; ss << _port;
349  bindaddr.setService(ss.str());
350  ibrcommon::udpsocket *sock = new ibrcommon::udpsocket(bindaddr);
351  try {
352  sock->up();
353  _vsocket.add(sock, evt.getInterface());
354  } catch (const ibrcommon::socket_exception&) {
355  delete sock;
356  }
357  break;
358  }
359 
360  case ibrcommon::LinkEvent::ACTION_ADDRESS_REMOVED:
361  {
362  ibrcommon::socketset socks = _vsocket.getAll();
363  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
364  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
365  if (sock->get_address().address() == evt.getAddress().address()) {
366  _vsocket.remove(sock);
367  sock->down();
368  delete sock;
369  break;
370  }
371  }
372  break;
373  }
374 
375  case ibrcommon::LinkEvent::ACTION_LINK_DOWN:
376  {
377  ibrcommon::socketset socks = _vsocket.get(evt.getInterface());
378  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
379  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
380  _vsocket.remove(sock);
381  sock->down();
382  delete sock;
383  }
384  break;
385  }
386 
387  default:
388  break;
389  }
390  }
391 
393  {
394  // routine checked for throw() on 15.02.2013
395  try {
396  // create sockets for all addresses on the interface
397  std::list<ibrcommon::vaddress> addrs = _net.getAddresses();
398 
399  // convert the port into a string
400  std::stringstream ss; ss << _port;
401 
402  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
403  ibrcommon::vaddress &addr = (*iter);
404 
405  try {
406  // handle the addresses according to their family
407  switch (addr.family()) {
408  case AF_INET:
409  case AF_INET6:
410  addr.setService(ss.str());
411  _vsocket.add(new ibrcommon::udpsocket(addr), _net);
412  break;
413  default:
414  break;
415  }
416  } catch (const ibrcommon::vaddress::address_exception &ex) {
417  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
418  }
419  }
420 
421  _vsocket.up();
422 
423  // subscribe to NetLink events on our interfaces
424  ibrcommon::LinkManager::getInstance().addEventListener(_net, this);
425 
426  // register as discovery handler for this interface
428  } catch (const ibrcommon::socket_exception &ex) {
429  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
430  }
431  }
432 
434  {
435  // unsubscribe to NetLink events
436  ibrcommon::LinkManager::getInstance().removeEventListener(this);
437 
438  // unregister as discovery handler for this interface
440 
441  _vsocket.destroy();
442  stop();
443  join();
444  }
445 
447  {
448  _running = true;
449 
450  // create a filter context
451  dtn::core::FilterContext context;
453 
454  while (_running)
455  {
456  try {
457  dtn::data::Bundle bundle;
458  EID sender;
459 
460  receive(bundle, sender);
461 
462  // push bundle through the filter routines
463  context.setBundle(bundle);
465 
466  switch (ret) {
467  case BundleFilter::ACCEPT:
468  // raise default bundle received event
469  dtn::net::BundleReceivedEvent::raise(sender, bundle, false);
470  break;
471 
472  case BundleFilter::REJECT:
473  case BundleFilter::DROP:
474  break;
475  }
476 
477  } catch (const dtn::InvalidDataException &ex) {
478  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 2) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
479  } catch (const std::exception&) {
480  return;
481  }
482  yield();
483  }
484  }
485 
487  {
488  _running = false;
489  _vsocket.down();
490  }
491 
492  const std::string UDPConvergenceLayer::getName() const
493  {
494  return "UDPConvergenceLayer";
495  }
496  }
497 }
static Configuration & getInstance(bool reset=false)
void decode(std::string &address, unsigned int &port) const
Definition: Node.cpp:47
void eventNotify(const ibrcommon::LinkEvent &evt)
const Configuration::Discovery & getDiscovery() const
dtn::core::Node::Protocol getDiscoveryProtocol() const
virtual Length getLength(const dtn::data::Bundle &obj)
Definition: Serializer.cpp:382
size_t Length
Definition: Number.h:33
void setBundle(const dtn::data::Bundle &data)
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
Definition: BundleCore.cpp:265
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
Definition: BundleCore.cpp:598
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &announcement)
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void setProtocol(const dtn::core::Node::Protocol &protocol)
bool get(FLAGS flag) const
void setPeer(const dtn::data::EID &endpoint)
bool _running
Definition: dtninbox.cpp:122
virtual const std::string getName() const
std::map< string, string > stats_data
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
std::list< URI > get(Node::Protocol proto) const
Definition: Node.cpp:325
void abort(const TransferAbortedEvent::AbortReason reason)
const dtn::data::MetaBundle & getBundle() const
std::string toString() const
Definition: Node.cpp:510
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82
virtual void getStats(ConvergenceLayer::stats_data &data) const