28 #include <ibrcommon/data/BLOB.h>
29 #include <ibrcommon/Logger.h>
30 #include <ibrcommon/thread/RWLock.h>
31 #include <ibrcommon/thread/MutexLock.h>
41 const std::string NativeSession::TAG =
"NativeSession";
44 : _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
52 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) <<
"Session created" << IBRCOMMON_LOGGER_ENDL;
56 : _registration(handle), _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
64 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) <<
"Session created" << IBRCOMMON_LOGGER_ENDL;
71 ibrcommon::RWLock l(_cb_mutex);
73 _serializer_cb = NULL;
76 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) <<
"Session destroyed" << IBRCOMMON_LOGGER_ENDL;
84 _registration.
abort();
94 ibrcommon::MutexLock l(_cb_mutex);
95 if (_session_cb == NULL)
return;
96 _session_cb->notifyBundle(
id);
101 ibrcommon::MutexLock l(_cb_mutex);
102 if (_session_cb == NULL)
return;
103 _session_cb->notifyStatusReport(source, report);
108 ibrcommon::MutexLock l(_cb_mutex);
109 if (_session_cb == NULL)
return;
110 _session_cb->notifyCustodySignal(source, custody);
116 if (suffix.length() <= 0)
123 _registration.unsubscribe(_endpoint);
126 _endpoint.setApplication(suffix);
129 _registration.subscribe(_endpoint);
132 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
141 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Endpoint set to " << _endpoint.
getString() << IBRCOMMON_LOGGER_ENDL;
147 if (suffix.length() <= 0)
155 _registration.subscribe(new_endpoint);
158 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Endpoint " << suffix <<
" added" << IBRCOMMON_LOGGER_ENDL;
164 if (suffix.length() <= 0)
172 _registration.unsubscribe(old_endpoint);
175 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Endpoint " << suffix <<
" removed" << IBRCOMMON_LOGGER_ENDL;
187 _registration.subscribe(eid);
190 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Registration " << eid.getString() <<
" added" << IBRCOMMON_LOGGER_ENDL;
202 _registration.unsubscribe(eid);
205 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Registration " << eid.getString() <<
" removed" << IBRCOMMON_LOGGER_ENDL;
213 for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
215 if (e != _endpoint) {
220 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Registrations cleared" << IBRCOMMON_LOGGER_ENDL;
226 std::vector<std::string> ret;
227 for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
228 ret.push_back((*it).getString());
238 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Next bundle in queue is " <<
id.
toString() << IBRCOMMON_LOGGER_ENDL;
241 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
242 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) <<
"No next bundle available" << IBRCOMMON_LOGGER_ENDL;
257 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Bundle " <<
id.toString() <<
" loaded" << IBRCOMMON_LOGGER_ENDL;
258 }
catch (
const ibrcommon::Exception &e) {
261 IBRCOMMON_LOGGER_TAG(NativeSession::TAG, warning) <<
"Failed to process bundle " <<
id.toString() <<
": " << e.what() << IBRCOMMON_LOGGER_ENDL;
263 }
catch (
const ibrcommon::Exception &ex) {
264 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) <<
"Failed to load bundle " <<
id.toString() <<
", Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
271 ibrcommon::MutexLock l(_cb_mutex);
272 if (_serializer_cb == NULL)
return;
276 serializer << _bundle[ri];
277 }
catch (
const ibrcommon::Exception &ex) {
278 IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) <<
"Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
284 ibrcommon::MutexLock l(_cb_mutex);
285 if (_serializer_cb == NULL)
return;
289 serializer << _bundle[ri];
290 }
catch (
const ibrcommon::Exception &ex) {
291 IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) <<
"Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
300 }
catch (
const ibrcommon::Exception&) {
315 _registration.delivered(meta);
317 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Bundle " <<
id.
toString() <<
" marked as delivered" << IBRCOMMON_LOGGER_ENDL;
318 }
catch (
const ibrcommon::Exception&) {
328 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"Bundle " << _bundle[ri].toString() <<
" sent" << IBRCOMMON_LOGGER_ENDL;
353 ibrcommon::BLOB::Reference ref = payload.
getBLOB();
354 ibrcommon::BLOB::iostream stream = ref.iostream();
356 std::streamsize stream_size = stream.size();
358 if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
359 (*stream).seekp(0, std::ios_base::end);
361 (*stream).seekp(offset);
364 (*stream).write(buf, len);
365 (*stream) << std::flush;
369 ibrcommon::BLOB::Reference ref = payload.
getBLOB();
370 ibrcommon::BLOB::iostream stream = ref.iostream();
372 std::streamsize stream_size = stream.size();
374 if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
375 (*stream).seekp(0, std::ios_base::end);
377 (*stream).seekp(offset);
379 (*stream).write(buf, len);
380 (*stream) << std::flush;
383 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << len <<
" bytes added to the payload" << IBRCOMMON_LOGGER_ENDL;
391 ibrcommon::BLOB::Reference ref = payload.
getBLOB();
392 ibrcommon::BLOB::iostream stream = ref.iostream();
394 (*stream).seekg(offset);
395 (*stream).read(buf, len);
397 len = (*stream).gcount();
399 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) <<
"no payload block available" << IBRCOMMON_LOGGER_ENDL;
404 NativeSession::BundleReceiver::BundleReceiver(
NativeSession &session)
409 NativeSession::BundleReceiver::~BundleReceiver()
416 if (queued.bundle.isFragment())
return;
418 if (_session._registration.hasSubscribed(queued.bundle.destination))
433 fireNotificationAdministrativeRecord(
id);
438 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"fire notification for new bundle " <<
id.toString() << IBRCOMMON_LOGGER_ENDL;
441 _bundle_queue.push(
id);
444 fireNotificationBundle(
id);
447 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) <<
"no more bundles found - wait until we are notified" << IBRCOMMON_LOGGER_ENDL;
450 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
451 throw NativeSessionException(std::string(
"loop aborted - ") + ex.what());
452 }
catch (
const std::exception &ex) {
453 throw NativeSessionException(std::string(
"loop aborted - ") + ex.what());
468 report.
read(payload);
470 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"fire notification for status report" << IBRCOMMON_LOGGER_ENDL;
473 fireNotificationStatusReport(b.
source, report);
481 custody.
read(payload);
483 IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) <<
"fire notification for custody signal" << IBRCOMMON_LOGGER_ENDL;
486 fireNotificationCustodySignal(b.
source, custody);
std::string toString() const
void delivered(const dtn::data::MetaBundle &m) const
void read(RegisterIndex ri, char *buf, size_t &len, const size_t offset=0)
void setEndpoint(const std::string &suffix)
static dtn::data::EID local
static void add(EventReceiver< E > *receiver)
virtual void read(const dtn::data::PayloadBlock &p)
void subscribe(const dtn::data::EID &endpoint)
virtual void read(const dtn::data::PayloadBlock &p)
void setApplication(const dtn::data::Number &app)
const std::string & getHandle() const
static void remove(const EventReceiver< E > *receiver)
void removeEndpoint(const std::string &suffix)
std::vector< std::string > getSubscriptions()
void write(RegisterIndex ri, const char *buf, const size_t len, const size_t offset=std::string::npos)
const dtn::data::EID & getNodeEID() const
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
NativeSession(NativeSessionCallback *session_cb, NativeSerializerCallback *serializer_cb)
void load(RegisterIndex ri, const dtn::data::BundleID &id)
void removeRegistration(const dtn::data::EID &eid)
virtual void remove(const dtn::data::BundleID &id)=0
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)=0
void unsubscribe(const dtn::data::EID &endpoint)
void put(RegisterIndex ri, const dtn::data::Bundle &b)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
virtual ~NativeSessionCallback()=0
void next(RegisterIndex ri)
void free(RegisterIndex ri)
const std::set< dtn::data::EID > getSubscriptions()
void clear(RegisterIndex ri)
static void processBlocks(dtn::data::Bundle &b)
std::string getString() const
void wait_for_bundle(size_t timeout=0)
iterator find(block_t blocktype)
ibrcommon::BLOB::Reference getBLOB() const
void delivered(const dtn::data::BundleID &id) const
void addEndpoint(const std::string &suffix)
const std::string & getHandle() const
dtn::storage::BundleStorage & getStorage()
void getInfo(RegisterIndex ri)
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
void get(RegisterIndex ri)
void addRegistration(const dtn::data::EID &eid)
dtn::data::BundleID send(RegisterIndex ri)
static BundleCore & getInstance()