IBR-DTN  1.0.0
DatagramConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "net/DatagramConnection.h"
24 
25 #include "core/BundleCore.h"
26 #include "core/NodeEvent.h"
27 #include "core/EventDispatcher.h"
28 
29 #include <ibrcommon/Logger.h>
30 #include <ibrcommon/thread/MutexLock.h>
31 #include <ibrcommon/thread/RWLock.h>
32 
33 #include <string.h>
34 #include <vector>
35 
36 namespace dtn
37 {
38  namespace net
39  {
40  const std::string DatagramConvergenceLayer::TAG = "DatagramConvergenceLayer";
41 
43  : _service(ds), _receiver(*this), _running(false),
44  _stats_in(0), _stats_out(0), _stats_rtt(0.0), _stats_retries(0), _stats_failure(0)
45  {
46  }
47 
49  {
50  // wait until the component thread is terminated
51  join();
52 
53  // wait until all connections are down
54  {
55  ibrcommon::MutexLock l(_cond_connections);
56  while (_connections.size() > 0) _cond_connections.wait();
57  }
58 
59  // delete the associated service
60  delete _service;
61  }
62 
64  {
65  if (event.getAction() == NODE_UNAVAILABLE)
66  {
67  NodeGone *gone = new NodeGone();
68  gone->eid = event.getNode().getEID();
69  _action_queue.push(gone);
70  }
71  }
72 
74  {
75  _stats_in = 0;
76  _stats_out = 0;
77  _stats_rtt = 0.0;
78  _stats_retries = 0;
79  _stats_failure = 0;
80  }
81 
83  {
84  std::stringstream ss_format;
85 
86  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
87  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
88 
89  static const std::string RTT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|rtt";
90  static const std::string RETRIES_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|retries";
91  static const std::string FAIL_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|fail";
92 
93  ss_format << _stats_in;
94  data[IN_TAG] = ss_format.str();
95  ss_format.str("");
96 
97  ss_format << _stats_out;
98  data[OUT_TAG] = ss_format.str();
99  ss_format.str("");
100 
101  ss_format << _stats_rtt;
102  data[RTT_TAG] = ss_format.str();
103  ss_format.str("");
104 
105  ss_format << _stats_retries;
106  data[RETRIES_TAG] = ss_format.str();
107  ss_format.str("");
108 
109  ss_format << _stats_failure;
110  data[FAIL_TAG] = ss_format.str();
111  }
112 
114  {
115  return _service->getProtocol();
116  }
117 
118  void DatagramConvergenceLayer::callback_send(DatagramConnection&, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len) throw (DatagramException)
119  {
120  // only on sender at once
121  ibrcommon::MutexLock l(_send_lock);
122 
123  // forward the send request to DatagramService
124  _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
125 
126  // traffic monitoring
127  _stats_out += len;
128  }
129 
130  void DatagramConvergenceLayer::callback_ack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
131  {
132  // only on sender at once
133  ibrcommon::MutexLock l(_send_lock);
134 
135  // forward the send request to DatagramService
136  _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
137  }
138 
139  void DatagramConvergenceLayer::callback_nack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
140  {
141  // only on sender at once
142  ibrcommon::MutexLock l(_send_lock);
143 
144  // forward the send request to DatagramService
145  _service->send(HEADER_NACK, 0, seqno, destination, NULL, 0);
146  }
147 
149  {
150  // do not queue any new jobs if the convergence layer goes down
151  if (!_running) return;
152 
153  const std::list<dtn::core::Node::URI> uri_list = node.get(_service->getProtocol());
154  if (uri_list.empty()) return;
155 
156  // get the first element of the result
157  const dtn::core::Node::URI &uri = uri_list.front();
158 
159  // some debugging
160  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "job queued for " << node.getEID().getString() << IBRCOMMON_LOGGER_ENDL;
161 
162  QueueBundle *queue = new QueueBundle(job);
163  queue->uri = uri.value;
164 
165  _action_queue.push( queue );
166  }
167 
168  DatagramConnection& DatagramConvergenceLayer::getConnection(const std::string &identifier, bool create) throw (ConnectionNotAvailableException)
169  {
170  DatagramConnection *connection = NULL;
171 
172  // Test if connection for this address already exist
173  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
174  {
175  if ((*i)->getIdentifier() == identifier)
176  return *(*i);
177  }
178 
179  // throw exception if we should not create new connections
180  if (!create) throw ConnectionNotAvailableException();
181 
182  // Connection does not exist, create one and put it into the list
183  connection = new DatagramConnection(identifier, _service->getParameter(), (*this));
184 
185  // increment the number of active connections
186  {
187  ibrcommon::MutexLock l(_cond_connections);
188 
189  // add a new connection to the list of connections
190  _connections.push_back(connection);
191 
192  // signal the modified connection list
193  _cond_connections.signal(true);
194  }
195 
196  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Selected identifier: " << connection->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
197  connection->start();
198  return *connection;
199  }
200 
201  void DatagramConvergenceLayer::reportSuccess(size_t retries, double rtt)
202  {
203  _stats_rtt = rtt;
204  _stats_retries += retries;
205  }
206 
208  {
209  _stats_failure++;
210  }
211 
213  {
214  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Up: " << conn->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
215  }
216 
218  {
219  ConnectionDown *cd = new ConnectionDown();
220  cd->id = conn->getIdentifier();
221  _action_queue.push(cd);
222  }
223 
225  {
226  // routine checked for throw() on 15.02.2013
227  try {
228  _service->bind();
229  } catch (const std::exception &e) {
230  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "bind to " << _service->getInterface().toString() << " failed (" << e.what() << ")" << IBRCOMMON_LOGGER_ENDL;
231  }
232 
233  // register for NodeEvent objects
235 
236  // register for discovery beacon handling
238 
239  // set the running variable
240  _running = true;
241  }
242 
244  {
245  // un-register for discovery beacon handling
247 
248  // un-register for NodeEvent objects
250 
251  _action_queue.push(new Shutdown());
252  }
253 
254  void DatagramConvergenceLayer::onAdvertiseBeacon(const ibrcommon::vinterface &iface, const DiscoveryBeacon &beacon) throw ()
255  {
256  // only handler beacons for this interface
257  if (iface != _service->getInterface()) return;
258 
259  // serialize announcement
260  stringstream ss;
261  ss << beacon;
262 
263  std::streamsize len = ss.str().size();
264 
265  try {
266  // only on sender at once
267  ibrcommon::MutexLock l(_send_lock);
268 
269  // forward the send request to DatagramService
270  _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(), static_cast<dtn::data::Length>(len));
271  } catch (const DatagramException&) {
272  // ignore any send failure
273  };
274  }
275 
277  {
278  size_t maxlen = _service->getParameter().max_msg_length;
279  std::string address;
280  unsigned int seqno = 0;
281  char flags = 0;
282  char type = 0;
283  std::vector<char> data(maxlen);
284  size_t len = 0;
285 
286  // get the reference to the discovery agent
288 
289  while (_running)
290  {
291  try {
292  // Receive full frame from socket
293  len = _service->recvfrom(&data[0], maxlen, type, flags, seqno, address);
294 
295  // traffic monitoring
296  _stats_in += len;
297  } catch (const DatagramException &ex) {
298  if (_running) {
299  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "recvfrom() failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
300  }
301  _running = false;
302  break;
303  }
304 
305  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "receive() Address: " << address << IBRCOMMON_LOGGER_ENDL;
306 
307  // Check for extended header and retrieve if available
308  if (type == HEADER_BROADCAST)
309  {
310  try {
311  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "receive() Announcement received" << IBRCOMMON_LOGGER_ENDL;
312 
313  DiscoveryBeacon beacon = agent.obtainBeacon();
314 
315  stringstream ss;
316  ss.write(&data[0], len);
317  ss >> beacon;
318 
319  // ignore own beacons
320  if (beacon.getEID() == dtn::core::BundleCore::local) continue;
321 
322  if (beacon.isShort())
323  {
324  // can not generate node name from short beacons
325  continue;
326  }
327 
328  // add discovered service entry
329  beacon.addService(dtn::net::DiscoveryService(_service->getProtocol(), address));
330 
331  BeaconReceived *bc = new BeaconReceived();
332  bc->address = address;
333  bc->data = beacon;
334  _action_queue.push(bc);
335  } catch (const ibrcommon::Exception&) {
336  // catch wrong formats
337  }
338 
339  continue;
340  }
341  else if ( type == HEADER_SEGMENT )
342  {
343  SegmentReceived *seg = new SegmentReceived(maxlen);
344  seg->address = address;
345  seg->seqno = seqno;
346  seg->flags = flags;
347  seg->data = data;
348  seg->len = len;
349  _action_queue.push(seg);
350  }
351  else if ( type == HEADER_ACK )
352  {
353  AckReceived *ack = new AckReceived();
354  ack->address = address;
355  ack->seqno = seqno;
356  _action_queue.push(ack);
357  }
358  else if ( type == HEADER_NACK )
359  {
360  NackReceived *nack = new NackReceived();
361  nack->address = address;
362  nack->seqno = seqno;
363  nack->temporary = flags & DatagramService::NACK_TEMPORARY;
364  _action_queue.push(nack);
365  }
366  }
367  }
368 
370  {
371  // start receiver
372  _receiver.init();
373  _receiver.start();
374 
375  // get the reference to the discovery agent
377 
378  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "componentRun() entered" << IBRCOMMON_LOGGER_ENDL;
379 
380  try {
381  while (_running || (_connections.size() > 0))
382  {
383  Action *action = _action_queue.poll();
384 
385  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "processing task" << IBRCOMMON_LOGGER_ENDL;
386 
387  try {
388  AckReceived &ack = dynamic_cast<AckReceived&>(*action);
389 
390  try {
391  // Connection instance for this address
392  DatagramConnection& connection = getConnection(ack.address, false);
393 
394  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) << "ack received for seqno " << ack.seqno << IBRCOMMON_LOGGER_ENDL;
395 
396  // Decide in which queue to write based on the src address
397  connection.ack(ack.seqno);
398  } catch (const ConnectionNotAvailableException &ex) {
399  // connection does not exists - ignore the ACK
400  }
401  } catch (const std::bad_cast&) { };
402 
403  try {
404  NackReceived &nack = dynamic_cast<NackReceived&>(*action);
405 
406  // the peer refused the current bundle
407  try {
408  // Connection instance for this address
409  DatagramConnection& connection = getConnection(nack.address, false);
410 
411  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) << "nack received for seqno " << nack.seqno << IBRCOMMON_LOGGER_ENDL;
412 
413  // Decide in which queue to write based on the src address
414  connection.nack(nack.seqno, nack.temporary);
415  } catch (const ConnectionNotAvailableException &ex) {
416  // connection does not exists - ignore the NACK
417  }
418  } catch (const std::bad_cast&) { };
419 
420  try {
421  SegmentReceived &segment = dynamic_cast<SegmentReceived&>(*action);
422 
423  // Connection instance for this address
424  DatagramConnection& connection = getConnection(segment.address, true);
425 
426  try {
427  // Decide in which queue to write based on the src address
428  connection.queue(segment.flags, segment.seqno, &segment.data[0], segment.len);
429  } catch (const ibrcommon::Exception &ex) {
430  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
431  connection.shutdown();
432  };
433  } catch (const std::bad_cast&) { };
434 
435  try {
436  BeaconReceived &beacon = dynamic_cast<BeaconReceived&>(*action);
437 
438  // Connection instance for this address
439  DatagramConnection& connection = getConnection(beacon.address, true);
440  connection.setPeerEID(beacon.data.getEID());
441 
442  // announce the received beacon
443  agent.onBeaconReceived(beacon.data);
444  } catch (const std::bad_cast&) { };
445 
446  try {
447  ConnectionDown &cd = dynamic_cast<ConnectionDown&>(*action);
448 
449  ibrcommon::MutexLock l(_cond_connections);
450  for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
451  {
452  if ((*i)->getIdentifier() == cd.id)
453  {
454  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Down: " << cd.id << IBRCOMMON_LOGGER_ENDL;
455  _connections.erase(i);
456 
457  // delete the connection
458  delete (*i);
459 
460  // signal the modified connection list
461  _cond_connections.signal(true);
462  break;
463  }
464  }
465  } catch (const std::bad_cast&) { };
466 
467  try {
468  NodeGone &gone = dynamic_cast<NodeGone&>(*action);
469 
470  for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
471  {
472  if ((*i)->getPeerEID() == gone.eid)
473  {
474  // shutdown the connection
475  (*i)->shutdown();
476  break;
477  }
478  }
479  } catch (const std::bad_cast&) { };
480 
481  try {
482  QueueBundle &queue = dynamic_cast<QueueBundle&>(*action);
483 
484  // get a new or the existing connection for this address
485  DatagramConnection &conn = getConnection( queue.uri, true );
486 
487  // queue the job to the connection
488  conn.queue(queue.job);
489  } catch (const std::bad_cast&) { };
490 
491  try {
492  dynamic_cast<Shutdown&>(*action);
493 
494  // shutdown all connections
495  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
496  {
497  (*i)->shutdown();
498  }
499 
500  _running = false;
501  } catch (const std::bad_cast&) { };
502 
503  // delete task object
504  delete action;
505 
506  yield();
507  }
508  } catch (const ibrcommon::QueueUnblockedException &ex) {
509  // unblocked
510  }
511 
512  _receiver.join();
513  }
514 
516  {
517  _service->shutdown();
518  }
519 
520  const std::string DatagramConvergenceLayer::getName() const
521  {
522  return DatagramConvergenceLayer::TAG;
523  }
524 
525  DatagramConvergenceLayer::Receiver::Receiver(DatagramConvergenceLayer &cl)
526  : _cl(cl)
527  {
528  }
529 
530  DatagramConvergenceLayer::Receiver::~Receiver()
531  {
532  }
533 
535  {
536  // reset receiver if necessary
537  if (JoinableThread::isFinalized()) JoinableThread::reset();
538  }
539 
540  void DatagramConvergenceLayer::Receiver::run() throw ()
541  {
542  _cl.receive();
543  }
544 
545  void DatagramConvergenceLayer::Receiver::__cancellation() throw ()
546  {
547  }
548  } /* namespace net */
549 } /* namespace dtn */
DiscoveryBeacon obtainBeacon() const
static dtn::data::EID local
Definition: BundleCore.h:79
virtual const ibrcommon::vinterface & getInterface() const =0
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
static void add(EventReceiver< E > *receiver)
virtual void bind()=0
void nack(const unsigned int &seqno, const bool temporary)
void callback_nack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)
size_t Length
Definition: Number.h:33
void onAdvertiseBeacon(const ibrcommon::vinterface &iface, const DiscoveryBeacon &beacon)
static void remove(const EventReceiver< E > *receiver)
virtual const Parameter & getParameter() const =0
void connectionUp(const DatagramConnection *conn)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
Definition: BundleCore.cpp:265
virtual const std::string getName() const
void queue(const dtn::net::BundleTransfer &job)
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void raiseEvent(const dtn::core::NodeEvent &evt)
bool _running
Definition: dtninbox.cpp:122
std::string value
Definition: Node.h:89
void reportSuccess(size_t retries, double rtt)
virtual void getStats(ConvergenceLayer::stats_data &data) const
virtual void shutdown()=0
std::map< string, string > stats_data
int init(int argc, char **argv)
Definition: dtntrigger.cpp:64
std::list< URI > get(Node::Protocol proto) const
Definition: Node.cpp:325
void onBeaconReceived(const DiscoveryBeacon &beacon)
std::string getString() const
Definition: EID.cpp:374
std::string toString() const
Definition: Node.cpp:510
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
dtn::core::Node::Protocol getDiscoveryProtocol() const
void connectionDown(const DatagramConnection *conn)
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len)
const std::string & getIdentifier() const
virtual size_t recvfrom(char *buf, size_t length, char &type, char &flags, unsigned int &seqno, std::string &address)=0
void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)
void setPeerEID(const dtn::data::EID &peer)
virtual dtn::core::Node::Protocol getProtocol() const =0
static BundleCore & getInstance()
Definition: BundleCore.cpp:82
void addService(const DiscoveryService &service)
void ack(const unsigned int &seqno)