29 #include <ibrcommon/Logger.h>
30 #include <ibrcommon/thread/MutexLock.h>
31 #include <ibrcommon/thread/RWLock.h>
40 const std::string DatagramConvergenceLayer::TAG =
"DatagramConvergenceLayer";
43 : _service(ds), _receiver(*this),
_running(false),
44 _stats_in(0), _stats_out(0), _stats_rtt(0.0), _stats_retries(0), _stats_failure(0)
55 ibrcommon::MutexLock l(_cond_connections);
56 while (_connections.size() > 0) _cond_connections.wait();
67 NodeGone *gone =
new NodeGone();
68 gone->eid =
event.getNode().getEID();
69 _action_queue.push(gone);
84 std::stringstream ss_format;
93 ss_format << _stats_in;
94 data[IN_TAG] = ss_format.str();
97 ss_format << _stats_out;
98 data[OUT_TAG] = ss_format.str();
101 ss_format << _stats_rtt;
102 data[RTT_TAG] = ss_format.str();
105 ss_format << _stats_retries;
106 data[RETRIES_TAG] = ss_format.str();
109 ss_format << _stats_failure;
110 data[FAIL_TAG] = ss_format.str();
121 ibrcommon::MutexLock l(_send_lock);
124 _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
133 ibrcommon::MutexLock l(_send_lock);
136 _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
142 ibrcommon::MutexLock l(_send_lock);
145 _service->send(HEADER_NACK, 0, seqno, destination, NULL, 0);
151 if (!_running)
return;
153 const std::list<dtn::core::Node::URI> uri_list = node.
get(_service->
getProtocol());
154 if (uri_list.empty())
return;
160 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"job queued for " << node.
getEID().
getString() << IBRCOMMON_LOGGER_ENDL;
162 QueueBundle *
queue =
new QueueBundle(job);
163 queue->uri = uri.
value;
165 _action_queue.push( queue );
173 for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
175 if ((*i)->getIdentifier() == identifier)
187 ibrcommon::MutexLock l(_cond_connections);
190 _connections.push_back(connection);
193 _cond_connections.signal(
true);
196 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"Selected identifier: " << connection->
getIdentifier() << IBRCOMMON_LOGGER_ENDL;
204 _stats_retries += retries;
214 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"Up: " << conn->
getIdentifier() << IBRCOMMON_LOGGER_ENDL;
219 ConnectionDown *cd =
new ConnectionDown();
221 _action_queue.push(cd);
229 }
catch (
const std::exception &e) {
230 IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) <<
"bind to " << _service->
getInterface().toString() <<
" failed (" << e.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
251 _action_queue.push(
new Shutdown());
257 if (iface != _service->getInterface())
return;
263 std::streamsize len = ss.str().size();
267 ibrcommon::MutexLock l(_send_lock);
270 _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(),
static_cast<dtn::data::Length>(len));
280 unsigned int seqno = 0;
283 std::vector<char> data(maxlen);
293 len = _service->
recvfrom(&data[0], maxlen, type, flags, seqno, address);
299 IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) <<
"recvfrom() failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
305 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"receive() Address: " << address << IBRCOMMON_LOGGER_ENDL;
311 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"receive() Announcement received" << IBRCOMMON_LOGGER_ENDL;
316 ss.write(&data[0], len);
322 if (beacon.isShort())
331 BeaconReceived *bc =
new BeaconReceived();
332 bc->address = address;
334 _action_queue.push(bc);
335 }
catch (
const ibrcommon::Exception&) {
343 SegmentReceived *seg =
new SegmentReceived(maxlen);
344 seg->address = address;
349 _action_queue.push(seg);
353 AckReceived *ack =
new AckReceived();
354 ack->address = address;
356 _action_queue.push(ack);
360 NackReceived *nack =
new NackReceived();
361 nack->address = address;
364 _action_queue.push(nack);
378 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"componentRun() entered" << IBRCOMMON_LOGGER_ENDL;
381 while (_running || (_connections.size() > 0))
383 Action *action = _action_queue.poll();
385 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"processing task" << IBRCOMMON_LOGGER_ENDL;
388 AckReceived &ack =
dynamic_cast<AckReceived&
>(*action);
394 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) <<
"ack received for seqno " << ack.seqno << IBRCOMMON_LOGGER_ENDL;
397 connection.
ack(ack.seqno);
398 }
catch (
const ConnectionNotAvailableException &ex) {
401 }
catch (
const std::bad_cast&) { };
404 NackReceived &nack =
dynamic_cast<NackReceived&
>(*action);
411 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 20) <<
"nack received for seqno " << nack.seqno << IBRCOMMON_LOGGER_ENDL;
414 connection.
nack(nack.seqno, nack.temporary);
415 }
catch (
const ConnectionNotAvailableException &ex) {
418 }
catch (
const std::bad_cast&) { };
421 SegmentReceived &segment =
dynamic_cast<SegmentReceived&
>(*action);
428 connection.
queue(segment.flags, segment.seqno, &segment.data[0], segment.len);
429 }
catch (
const ibrcommon::Exception &ex) {
430 IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
433 }
catch (
const std::bad_cast&) { };
436 BeaconReceived &beacon =
dynamic_cast<BeaconReceived&
>(*action);
444 }
catch (
const std::bad_cast&) { };
447 ConnectionDown &cd =
dynamic_cast<ConnectionDown&
>(*action);
449 ibrcommon::MutexLock l(_cond_connections);
450 for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
452 if ((*i)->getIdentifier() == cd.id)
454 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) <<
"Down: " << cd.id << IBRCOMMON_LOGGER_ENDL;
455 _connections.erase(i);
461 _cond_connections.signal(
true);
465 }
catch (
const std::bad_cast&) { };
468 NodeGone &gone =
dynamic_cast<NodeGone&
>(*action);
470 for (connection_list::iterator i = _connections.begin(); i != _connections.end(); ++i)
472 if ((*i)->getPeerEID() == gone.eid)
479 }
catch (
const std::bad_cast&) { };
482 QueueBundle &
queue =
dynamic_cast<QueueBundle&
>(*action);
488 conn.
queue(queue.job);
489 }
catch (
const std::bad_cast&) { };
492 dynamic_cast<Shutdown&
>(*action);
495 for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
501 }
catch (
const std::bad_cast&) { };
508 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
522 return DatagramConvergenceLayer::TAG;
530 DatagramConvergenceLayer::Receiver::~Receiver()
537 if (JoinableThread::isFinalized()) JoinableThread::reset();
540 void DatagramConvergenceLayer::Receiver::run() throw ()
545 void DatagramConvergenceLayer::Receiver::__cancellation() throw ()
DiscoveryBeacon obtainBeacon() const
static dtn::data::EID local
virtual const ibrcommon::vinterface & getInterface() const =0
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
static void add(EventReceiver< E > *receiver)
void nack(const unsigned int &seqno, const bool temporary)
void callback_nack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)
virtual void componentRun()
void onAdvertiseBeacon(const ibrcommon::vinterface &iface, const DiscoveryBeacon &beacon)
static void remove(const EventReceiver< E > *receiver)
virtual const Parameter & getParameter() const =0
void connectionUp(const DatagramConnection *conn)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
virtual const std::string getName() const
void queue(const dtn::net::BundleTransfer &job)
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void raiseEvent(const dtn::core::NodeEvent &evt)
virtual ~DatagramConvergenceLayer()
void reportSuccess(size_t retries, double rtt)
virtual void getStats(ConvergenceLayer::stats_data &data) const
virtual void shutdown()=0
std::map< string, string > stats_data
int init(int argc, char **argv)
std::list< URI > get(Node::Protocol proto) const
void onBeaconReceived(const DiscoveryBeacon &beacon)
virtual void resetStats()
std::string getString() const
std::string toString() const
virtual void componentUp()
const dtn::data::EID & getEID() const
dtn::core::Node::Protocol getDiscoveryProtocol() const
void connectionDown(const DatagramConnection *conn)
DatagramConvergenceLayer(DatagramService *ds)
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len)
const std::string & getIdentifier() const
virtual size_t recvfrom(char *buf, size_t length, char &type, char &flags, unsigned int &seqno, std::string &address)=0
void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)
void setPeerEID(const dtn::data::EID &peer)
virtual dtn::core::Node::Protocol getProtocol() const =0
static BundleCore & getInstance()
void addService(const DiscoveryService &service)
virtual void componentDown()
void ack(const unsigned int &seqno)