IBR-DTN  1.0.0
NativeSession.cpp
Go to the documentation of this file.
1 /*
2  * NativeSession.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "api/NativeSession.h"
23 #include "api/NativeSerializer.h"
24 #include "core/BundleCore.h"
25 #include "core/EventDispatcher.h"
26 
28 #include <ibrcommon/data/BLOB.h>
29 #include <ibrcommon/Logger.h>
30 #include <ibrcommon/thread/RWLock.h>
31 #include <ibrcommon/thread/MutexLock.h>
32 
33 namespace dtn
34 {
35  namespace api
36  {
38  {
39  }
40 
41  const std::string NativeSession::TAG = "NativeSession";
42 
44  : _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
45  {
46  // set the local endpoint to the default
47  _endpoint = _registration.getDefaultEID();
48 
49  // listen to QueueBundleEvents
51 
52  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session created" << IBRCOMMON_LOGGER_ENDL;
53  }
54 
55  NativeSession::NativeSession(NativeSessionCallback *session_cb, NativeSerializerCallback *serializer_cb, const std::string &handle)
56  : _registration(handle), _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
57  {
58  // set the local endpoint to the default
59  _endpoint = _registration.getDefaultEID();
60 
61  // listen to QueueBundleEvents
63 
64  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session created" << IBRCOMMON_LOGGER_ENDL;
65  }
66 
68  {
69  // invalidate the callback pointer
70  {
71  ibrcommon::RWLock l(_cb_mutex);
72  _session_cb = NULL;
73  _serializer_cb = NULL;
74  }
75 
76  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session destroyed" << IBRCOMMON_LOGGER_ENDL;
77  }
78 
79  void NativeSession::destroy() throw ()
80  {
81  // un-listen from QueueBundleEvents
83 
84  _registration.abort();
85  }
86 
87  const dtn::data::EID& NativeSession::getNodeEID() const throw ()
88  {
90  }
91 
92  void NativeSession::fireNotificationBundle(const dtn::data::BundleID &id) throw ()
93  {
94  ibrcommon::MutexLock l(_cb_mutex);
95  if (_session_cb == NULL) return;
96  _session_cb->notifyBundle(id);
97  }
98 
99  void NativeSession::fireNotificationStatusReport(const dtn::data::EID &source, const dtn::data::StatusReportBlock &report) throw ()
100  {
101  ibrcommon::MutexLock l(_cb_mutex);
102  if (_session_cb == NULL) return;
103  _session_cb->notifyStatusReport(source, report);
104  }
105 
106  void NativeSession::fireNotificationCustodySignal(const dtn::data::EID &source, const dtn::data::CustodySignalBlock &custody) throw ()
107  {
108  ibrcommon::MutexLock l(_cb_mutex);
109  if (_session_cb == NULL) return;
110  _session_cb->notifyCustodySignal(source, custody);
111  }
112 
113  void NativeSession::setEndpoint(const std::string &suffix) throw (NativeSessionException)
114  {
115  // error checking
116  if (suffix.length() <= 0)
117  {
118  throw NativeSessionException("given endpoint is not acceptable");
119  }
120  else
121  {
122  /* unsubscribe from the old endpoint and subscribe to the new one */
123  _registration.unsubscribe(_endpoint);
124 
125  // set new application part
126  _endpoint.setApplication(suffix);
127 
128  // subscribe to new endpoint
129  _registration.subscribe(_endpoint);
130  }
131 
132  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
133  }
134 
136  {
137  _registration.unsubscribe(_endpoint);
138  _endpoint = _registration.getDefaultEID();
139  _registration.subscribe(_endpoint);
140 
141  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
142  }
143 
144  void NativeSession::addEndpoint(const std::string &suffix) throw (NativeSessionException)
145  {
146  // error checking
147  if (suffix.length() <= 0)
148  {
149  throw NativeSessionException("given endpoint is not acceptable");
150  }
151  else
152  {
154  new_endpoint.setApplication( suffix );
155  _registration.subscribe(new_endpoint);
156  }
157 
158  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " added" << IBRCOMMON_LOGGER_ENDL;
159  }
160 
161  void NativeSession::removeEndpoint(const std::string &suffix) throw (NativeSessionException)
162  {
163  // error checking
164  if (suffix.length() <= 0)
165  {
166  throw NativeSessionException("given endpoint is not acceptable");
167  }
168  else
169  {
171  old_endpoint.setApplication( suffix );
172  _registration.unsubscribe(old_endpoint);
173  }
174 
175  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " removed" << IBRCOMMON_LOGGER_ENDL;
176  }
177 
179  {
180  // error checking
181  if (eid == dtn::data::EID())
182  {
183  throw NativeSessionException("given endpoint is not acceptable");
184  }
185  else
186  {
187  _registration.subscribe(eid);
188  }
189 
190  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " added" << IBRCOMMON_LOGGER_ENDL;
191  }
192 
194  {
195  // error checking
196  if (eid == dtn::data::EID())
197  {
198  throw NativeSessionException("given endpoint is not acceptable");
199  }
200  else
201  {
202  _registration.unsubscribe(eid);
203  }
204 
205  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " removed" << IBRCOMMON_LOGGER_ENDL;
206  }
207 
209  {
210  resetEndpoint();
211 
212  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
213  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
214  const dtn::data::EID &e = (*it);
215  if (e != _endpoint) {
216  _registration.unsubscribe(e);
217  }
218  }
219 
220  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registrations cleared" << IBRCOMMON_LOGGER_ENDL;
221  }
222 
223  std::vector<std::string> NativeSession::getSubscriptions() throw ()
224  {
225  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
226  std::vector<std::string> ret;
227  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
228  ret.push_back((*it).getString());
229  }
230  return ret;
231  }
232 
234  {
235  try {
236  const dtn::data::BundleID id = _bundle_queue.take();
237 
238  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Next bundle in queue is " << id.toString() << IBRCOMMON_LOGGER_ENDL;
239 
240  load(ri, id);
241  } catch (const ibrcommon::QueueUnblockedException &ex) {
242  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "No next bundle available" << IBRCOMMON_LOGGER_ENDL;
243  throw BundleNotFoundException();
244  }
245  }
246 
248  {
249  // load the bundle
250  try {
251  _bundle[ri] = dtn::core::BundleCore::getInstance().getStorage().get(id);
252 
253  try {
254  // process the bundle block (security, compression, ...)
256 
257  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " loaded" << IBRCOMMON_LOGGER_ENDL;
258  } catch (const ibrcommon::Exception &e) {
259  // clear the register
260  _bundle[ri] = dtn::data::Bundle();
261  IBRCOMMON_LOGGER_TAG(NativeSession::TAG, warning) << "Failed to process bundle " << id.toString() << ": " << e.what() << IBRCOMMON_LOGGER_ENDL;
262  }
263  } catch (const ibrcommon::Exception &ex) {
264  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Failed to load bundle " << id.toString() << ", Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
265  throw BundleNotFoundException();
266  }
267  }
268 
270  {
271  ibrcommon::MutexLock l(_cb_mutex);
272  if (_serializer_cb == NULL) return;
273 
274  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_FULL);
275  try {
276  serializer << _bundle[ri];
277  } catch (const ibrcommon::Exception &ex) {
278  IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) << "Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
279  }
280  }
281 
283  {
284  ibrcommon::MutexLock l(_cb_mutex);
285  if (_serializer_cb == NULL) return;
286 
287  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_INFO);
288  try {
289  serializer << _bundle[ri];
290  } catch (const ibrcommon::Exception &ex) {
291  IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) << "Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
292  }
293  }
294 
296  {
297  try {
299  _bundle[ri] = dtn::data::Bundle();
300  } catch (const ibrcommon::Exception&) {
301  throw BundleNotFoundException();
302  }
303  }
304 
306  {
307  _bundle[ri] = dtn::data::Bundle();
308  }
309 
311  {
312  try {
313  // announce this bundle as delivered
315  _registration.delivered(meta);
316 
317  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " marked as delivered" << IBRCOMMON_LOGGER_ENDL;
318  } catch (const ibrcommon::Exception&) {
319  throw BundleNotFoundException();
320  }
321  }
322 
324  {
325  // forward the bundle to the storage processing
326  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle[ri]);
327 
328  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << _bundle[ri].toString() << " sent" << IBRCOMMON_LOGGER_ENDL;
329 
330  return _bundle[ri];
331  }
332 
334  {
335  // Copy the given bundle into the local register
336  _bundle[ri] = b;
337  }
338 
340  {
341  // clear all blocks in the register
342  _bundle[ri].clear();
343 
344  // Copy the given primary block into the local register
345  ((dtn::data::PrimaryBlock&)_bundle[ri]) = p;
346  }
347 
348  void NativeSession::write(RegisterIndex ri, const char *buf, const size_t len, const size_t offset) throw ()
349  {
350  try {
351  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
352 
353  ibrcommon::BLOB::Reference ref = payload.getBLOB();
354  ibrcommon::BLOB::iostream stream = ref.iostream();
355 
356  std::streamsize stream_size = stream.size();
357 
358  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
359  (*stream).seekp(0, std::ios_base::end);
360  } else {
361  (*stream).seekp(offset);
362  }
363 
364  (*stream).write(buf, len);
365  (*stream) << std::flush;
367  dtn::data::PayloadBlock &payload = _bundle[ri].push_back<dtn::data::PayloadBlock>();
368 
369  ibrcommon::BLOB::Reference ref = payload.getBLOB();
370  ibrcommon::BLOB::iostream stream = ref.iostream();
371 
372  std::streamsize stream_size = stream.size();
373 
374  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
375  (*stream).seekp(0, std::ios_base::end);
376  } else {
377  (*stream).seekp(offset);
378  }
379  (*stream).write(buf, len);
380  (*stream) << std::flush;
381  }
382 
383  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << len << " bytes added to the payload" << IBRCOMMON_LOGGER_ENDL;
384  }
385 
386  void NativeSession::read(RegisterIndex ri, char *buf, size_t &len, const size_t offset) throw ()
387  {
388  try {
389  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
390 
391  ibrcommon::BLOB::Reference ref = payload.getBLOB();
392  ibrcommon::BLOB::iostream stream = ref.iostream();
393 
394  (*stream).seekg(offset);
395  (*stream).read(buf, len);
396 
397  len = (*stream).gcount();
399  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no payload block available" << IBRCOMMON_LOGGER_ENDL;
400  len = 0;
401  }
402  }
403 
404  NativeSession::BundleReceiver::BundleReceiver(NativeSession &session)
405  : _session(session)
406  {
407  }
408 
409  NativeSession::BundleReceiver::~BundleReceiver()
410  {
411  }
412 
413  void NativeSession::BundleReceiver::raiseEvent(const dtn::routing::QueueBundleEvent &queued) throw ()
414  {
415  // ignore fragments - we can not deliver them directly to the client
416  if (queued.bundle.isFragment()) return;
417 
418  if (_session._registration.hasSubscribed(queued.bundle.destination))
419  {
420  _session._registration.notify(Registration::NOTIFY_BUNDLE_AVAILABLE);
421  }
422  }
423 
425  {
426  Registration &reg = _registration;
427  try {
428  try {
429  const dtn::data::MetaBundle id = reg.receiveMetaBundle();
430 
432  // transform custody signals & status reports into notifies
433  fireNotificationAdministrativeRecord(id);
434 
435  // announce the delivery of this bundle
436  reg.delivered(id);
437  } else {
438  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for new bundle " << id.toString() << IBRCOMMON_LOGGER_ENDL;
439 
440  // put the bundle into the API queue
441  _bundle_queue.push(id);
442 
443  // notify the client about the new bundle
444  fireNotificationBundle(id);
445  }
446  } catch (const dtn::storage::NoBundleFoundException&) {
447  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no more bundles found - wait until we are notified" << IBRCOMMON_LOGGER_ENDL;
448  reg.wait_for_bundle();
449  }
450  } catch (const ibrcommon::QueueUnblockedException &ex) {
451  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
452  } catch (const std::exception &ex) {
453  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
454  }
455  }
456 
457  void NativeSession::fireNotificationAdministrativeRecord(const dtn::data::MetaBundle &bundle)
458  {
459  // load the whole bundle
461 
462  // get the payload block of the bundle
464 
465  try {
466  // try to decode as status report
468  report.read(payload);
469 
470  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for status report" << IBRCOMMON_LOGGER_ENDL;
471 
472  // fire the status report notification
473  fireNotificationStatusReport(b.source, report);
475  // this is not a status report
476  }
477 
478  try {
479  // try to decode as custody signal
481  custody.read(payload);
482 
483  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for custody signal" << IBRCOMMON_LOGGER_ENDL;
484 
485  // fire the custody signal notification
486  fireNotificationCustodySignal(b.source, custody);
488  // this is not a custody report
489  }
490  }
491 
492  const std::string& NativeSession::getHandle() const
493  {
494  return _registration.getHandle();
495  }
496  } /* namespace net */
497 } /* namespace dtn */
std::string toString() const
Definition: BundleID.cpp:190
void delivered(const dtn::data::MetaBundle &m) const
void read(RegisterIndex ri, char *buf, size_t &len, const size_t offset=0)
void setEndpoint(const std::string &suffix)
static dtn::data::EID local
Definition: BundleCore.h:79
static void add(EventReceiver< E > *receiver)
virtual void read(const dtn::data::PayloadBlock &p)
void subscribe(const dtn::data::EID &endpoint)
virtual void read(const dtn::data::PayloadBlock &p)
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
const std::string & getHandle() const
static void remove(const EventReceiver< E > *receiver)
void removeEndpoint(const std::string &suffix)
std::vector< std::string > getSubscriptions()
void write(RegisterIndex ri, const char *buf, const size_t len, const size_t offset=std::string::npos)
const dtn::data::EID & getNodeEID() const
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
NativeSession(NativeSessionCallback *session_cb, NativeSerializerCallback *serializer_cb)
void load(RegisterIndex ri, const dtn::data::BundleID &id)
void removeRegistration(const dtn::data::EID &eid)
virtual void remove(const dtn::data::BundleID &id)=0
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)=0
void unsubscribe(const dtn::data::EID &endpoint)
void put(RegisterIndex ri, const dtn::data::Bundle &b)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
void next(RegisterIndex ri)
void free(RegisterIndex ri)
const std::set< dtn::data::EID > getSubscriptions()
void clear(RegisterIndex ri)
static void processBlocks(dtn::data::Bundle &b)
Definition: BundleCore.cpp:641
std::string getString() const
Definition: EID.cpp:374
void wait_for_bundle(size_t timeout=0)
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
ibrcommon::BLOB::Reference getBLOB() const
void delivered(const dtn::data::BundleID &id) const
void addEndpoint(const std::string &suffix)
const std::string & getHandle() const
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
void getInfo(RegisterIndex ri)
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
void get(RegisterIndex ri)
void addRegistration(const dtn::data::EID &eid)
dtn::data::EID source
Definition: BundleID.h:53
dtn::data::BundleID send(RegisterIndex ri)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82