IBR-DTN  1.0.0
FragmentManager.cpp
Go to the documentation of this file.
1 /*
2  * FragmentManager.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "core/EventDispatcher.h"
24 #include "core/FragmentManager.h"
25 #include "core/BundleCore.h"
26 #include "core/BundlePurgeEvent.h"
28 
30 #include <ibrdtn/utils/Clock.h>
31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/thread/MutexLock.h>
33 
34 #include <ibrdtn/ibrdtn.h>
35 #ifdef IBRDTN_SUPPORT_BSP
37 #endif
38 
39 namespace dtn
40 {
41  namespace core
42  {
// Tag passed to the IBRCOMMON_LOGGER_* macros throughout this file.
43  const std::string FragmentManager::TAG = "FragmentManager";
44 
    // Guards every access to _offsets (locked in setOffset, the offset lookup and expire_offsets).
45  ibrcommon::Mutex FragmentManager::_offsets_mutex;
    // Offsets recorded for partially transmitted bundles; ordered by
    // Transmission::operator<, i.e. by expiration time first.
46  std::set<FragmentManager::Transmission> FragmentManager::_offsets;
47 
// NOTE(review): the signature line that precedes this initializer list
// (listing line 48) is missing from this listing.
// The run flag starts out cleared.
49  : _running(false)
50  {
51  }
52 
// NOTE(review): the signature line for this empty body (listing line 53)
// is missing from this listing.
54  {
55  }
56 
57  const std::string FragmentManager::getName() const
58  {
59  return "FragmentManager";
60  }
61 
// NOTE(review): the signature line for this body (listing line 62) is
// missing from this listing.
63  {
    // clear the flag tested by the `while (_running)` task loop
64  _running = false;
    // abort the incoming queue; presumably this releases a blocked poll()
    // via QueueUnblockedException, which the task loop catches - confirm
65  _incoming.abort();
66  }
67 
// NOTE(review): the signature line (listing line 68) and listing line 71
// are missing from this listing.
69  {
70  // routine checked for throw() on 15.02.2013
    // set the flag tested by the `while (_running)` task loop
72  _running = true;
73  }
74 
// Task loop: takes fragment meta data from _incoming, collects all stored
// fragments of the same bundle, and merges them once the payload is covered.
//
// NOTE(review): the signature line (listing line 75) and listing lines 78,
// 82, 110, 119, 131, 153, 157 and 161 are missing from this listing. The
// names `storage`, `list`, `chunk`, `c` and `bundle` are used below but their
// declarations are not visible here.
76  {
77  // get reference to the storage
79 
80  // TODO: scan storage for fragments to reassemble on startup
81 
83 
84  // create a task loop to reassemble fragments asynchronously
85  try {
86  while (_running)
87  {
    // take the next queued fragment; the QueueUnblockedException caught at
    // the end of this try block ends the loop
88  dtn::data::MetaBundle meta = _incoming.poll();
89 
90  // skip merge if complete bundle is already in the storage
    // `origin` is the id of `meta` with the fragment flag cleared
91  dtn::data::BundleID origin(meta);
92  origin.setFragment(false);
93  if (storage.contains(origin)) continue;
94 
95  // search for matching bundles
96  list.clear();
97  search(meta, list);
98 
99  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "found " << list.size() << " fragments similar to bundle " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
100 
101  // TODO: drop fragments if other fragments available containing the same payload or larger payload
102 
103  // check first if all fragment are available
104  std::set<BundleMerger::Chunk> chunks;
105  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
106  {
107  const dtn::data::MetaBundle &m = (*iter);
    // NOTE(review): this tests `meta` (the polled fragment), not the loop
    // element `m`; `m` is not used in the visible lines - confirm which one
    // was intended (the construction of `chunk` on listing line 110 is missing)
108  if (meta.getPayloadLength() > 0)
109  {
111  chunks.insert(chunk);
112  }
113  }
114 
115  // wait for the next bundle if the fragment is not complete
116  if (!BundleMerger::Chunk::isComplete(meta.appdatalength.get<dtn::data::Length>(), chunks)) continue;
117 
118  // create a new bundle merger container
120 
121  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
122  {
    // note: this `meta` shadows the polled `meta` of the enclosing loop
123  const dtn::data::MetaBundle &meta = (*iter);
124 
    // only fragments with a non-empty payload are merged
125  if (meta.getPayloadLength() > 0)
126  {
127  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "fragment: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
128 
129  try {
130  // load bundle from storage
132 
133  // merge the bundle
134  c << bundle;
135  } catch (const dtn::storage::NoBundleFoundException&) {
    // a fragment that cannot be loaded is logged and skipped
136  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL;
137  };
138  }
139  }
140 
    // continue only if the merger reports a complete bundle
141  if (c.isComplete())
142  {
143  dtn::data::Bundle &merged = c.getBundle();
144 
145  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, notice) << "Bundle " << merged.toString() << " merged" << IBRCOMMON_LOGGER_ENDL;
146 
147  // pass merged bundle through the filter
148  FilterContext context;
149  context.setBundle(merged);
150  if (BundleCore::getInstance().filter(BundleFilter::INPUT, context, merged) == BundleFilter::ACCEPT)
151  {
152  // raise default bundle received event
154  }
155 
156  // delete all fragments of the merged bundle
    // NOTE(review): the statement opening this scope (listing line 157) and
    // the loop body (listing line 161) are missing from this listing
158  {
159  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
160  {
162  }
163  }
164  }
165  }
    // leaves the task loop once the queue throws QueueUnblockedException
166  } catch (const ibrcommon::QueueUnblockedException&) { }
167  }
168 
// NOTE(review): the signature line (listing line 169) and listing line 171
// are missing from this listing.
170  {
172 
    // stop the task, then wait for it with join()
173  stop();
174  join();
175  }
176 
// Queues the bundle carried by `queued` for reassembly if it is a fragment.
//
// NOTE(review): the signature line (listing line 177) and the condition on
// listing line 184 are missing from this listing; `queued` is not declared
// in the visible lines.
178  {
179  // process fragments
    // anything that is not a fragment is ignored
180  if (!queued.bundle.isFragment()) return;
181 
182  // do not merge a bundle if it is non-local and singleton
183  // we only touch local and group bundles which might be delivered locally
185  {
    // destination is on another host: do nothing
186  if (!queued.bundle.destination.sameHost(dtn::core::BundleCore::local))
187  {
188  return;
189  }
190  }
191 
192  // push the meta bundle into the incoming queue
193  _incoming.push(queued.bundle);
194  }
195 
// Collects from the storage all fragments that share source, timestamp and
// sequence number with `meta`.
//
// @param meta  bundle whose sibling fragments are searched
// @param list  result container handed to storage.get()
//
// NOTE(review): listing lines 198 (head of the local class), 201 (its
// constructor signature) and 228 (declaration of `storage`) are missing
// from this listing.
196  void FragmentManager::search(const dtn::data::MetaBundle &meta, dtn::storage::BundleResult &list)
197  {
199  {
200  public:
    // keeps a reference to the bundle the candidates are compared with
202  : _similar(meta)
203  {};
204 
205  virtual ~BundleFilter() {};
206 
    // NOTE(review): presumably 0 means "no limit" - confirm against the selector interface
207  virtual dtn::data::Size limit() const throw () { return 0; };
208 
    // accepts a candidate only if it is a fragment and matches _similar in
    // source, timestamp and sequence number
209  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
210  {
211  // fragments only
212  if (!meta.isFragment()) return false;
213 
214  // with the same unique bundle id
215  if (meta.source != _similar.source) return false;
216  if (meta.timestamp != _similar.timestamp) return false;
217  if (meta.sequencenumber != _similar.sequencenumber) return false;
218 
219  return true;
220  };
221 
222  private:
    // held by reference: the referenced bundle must outlive this filter
223  const dtn::data::MetaBundle &_similar;
224  };
225 
226  // create a bundle filter
227  BundleFilter filter(meta);
229 
230  try {
231  storage.get(filter, list);
    // "nothing found" is not an error here; the exception is swallowed
232  } catch (const dtn::storage::NoBundleFoundException&) { }
233  }
234 
// Records how much payload of bundle `id` has been transmitted to `peer`.
//
// @param peer         peer the partial transmission belongs to
// @param id           id of the partially transmitted bundle
// @param abs_offset   passed on to get_payload_offset() together with the bundle
// @param frag_offset  passed on to get_payload_offset()
//
// Nothing is stored if the computed payload offset is 0 or if a
// NoBundleFoundException is thrown.
//
// NOTE(review): listing line 239 is missing from this listing; `b` is used
// below but its declaration is not visible (the catch clause suggests it is
// loaded from the storage - confirm).
235  void FragmentManager::setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset) throw ()
236  {
237  try {
238  Transmission t;
240  t.offset = get_payload_offset(b, abs_offset, frag_offset);
241 
    // an offset of 0 carries no information, so it is not stored
242  if (t.offset <= 0) return;
243 
244  t.id = id;
245  t.peer = peer;
246  t.expires = dtn::utils::Clock::getExpireTime( b );
247 
248  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 4) << "Store offset of partial transmitted bundle " << id.toString() << " to " << peer.getString() <<
249  ", offset: " << t.offset << " (" << abs_offset << ")" << IBRCOMMON_LOGGER_ENDL;
250 
251  ibrcommon::MutexLock l(_offsets_mutex);
    // std::set::insert does not replace an equivalent element and the
    // ordering ignores `offset`, so an existing entry is erased first
252  _offsets.erase(t);
253  _offsets.insert(t);
254  } catch (const dtn::storage::NoBundleFoundException&) { };
255  }
256 
// Looks up the stored offset for the pair (`peer`, `id`); returns 0 if no
// entry matches.
//
// NOTE(review): the signature line (listing line 257) is missing from this
// listing; `peer` and `id` are not declared in the visible lines.
258  {
259  ibrcommon::MutexLock l(_offsets_mutex);
    // linear scan: the set is ordered by expiration time first, so it
    // cannot be searched by (peer, id) alone
260  for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); ++iter)
261  {
262  const Transmission &t = (*iter);
263  if (t.peer != peer) continue;
264  if (t.id != id) continue;
265  return t.offset;
266  }
267 
268  return 0;
269  }
270 
// Converts an offset counted over the serialized bundle (`abs_offset`) into
// an offset relative to the payload.
//
// @param bundle       bundle whose serialized block lengths are summed up
// @param abs_offset   offset compared against the accumulated header length
// @param frag_offset  added to the primary block's fragment offset and to the result
// @return frag_offset + (abs_offset - header), or 0 if abs_offset is smaller
//         than the header length, if no payload block is found, or if an
//         InvalidDataException is thrown
//
// NOTE(review): listing lines 281 and 283 are missing from this listing, so
// the braces around the `frag_offset > 0` branch do not balance as shown.
271  dtn::data::Length FragmentManager::get_payload_offset(const dtn::data::Bundle &bundle, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset) throw ()
272  {
273  try {
274  // build the dictionary for EID lookup
275  const dtn::data::Dictionary dict(bundle);
276 
    // work on a copy of the primary block so `bundle` stays untouched
277  PrimaryBlock prim = bundle;
278 
279  if (frag_offset > 0) {
280  prim.fragmentoffset += frag_offset;
282  prim.appdatalength = bundle.find<dtn::data::PayloadBlock>().getLength();
284  }
285  }
286 
287  // create a default serializer
    // only getLength() is called on the serializer in this function
288  dtn::data::DefaultSerializer serializer(std::cout, dict);
289 
290  // get the encoded length of the primary block
291  dtn::data::Length header = serializer.getLength(prim);
292 
    // add up the serialized length of each block until the payload block is reached
293  for (dtn::data::Bundle::const_iterator iter = bundle.begin(); iter != bundle.end(); ++iter)
294  {
295  const dtn::data::Block &b = (**iter);
296  header += serializer.getLength(b);
297 
298  try {
    // the cast throws std::bad_cast for every block that is not a payload block
299  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(b);
    // take the payload's own length out again, leaving `header` as
    // everything that precedes the payload data
300  header -= payload.getLength();
301  if (abs_offset < header) return 0;
302  return frag_offset + (abs_offset - header);
303  } catch (std::bad_cast&) { };
304  }
305  } catch (const dtn::InvalidDataException&) {
306  // failure while calculating the bundle length
307  }
308 
309  return 0;
310  }
311 
312  void FragmentManager::expire_offsets(const dtn::data::Timestamp &timestamp)
313  {
314  ibrcommon::MutexLock l(_offsets_mutex);
315  for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();)
316  {
317  const Transmission &t = (*iter);
318  if (t.expires >= timestamp) return;
319  _offsets.erase(iter++);
320  }
321  }
322 
// Splits the payload of `bundle` into fragments of at most `maxPayloadLength`
// and appends them to `fragments`.
//
// @param bundle            bundle to split; copied as template for each fragment
// @param maxPayloadLength  upper bound for the amount of payload copied into one fragment
// @param fragments         output list; every created fragment is appended
// @throws FragmentationProhibitedException, FragmentationNotNecessaryException,
//         FragmentationAbortedException (the exception specification names only
//         FragmentationAbortedException - presumably the other two derive from it; confirm)
//
// NOTE(review): listing lines 326 (the condition guarding the first throw)
// and 400 (the catch clause that precedes the last throw) are missing from
// this listing.
323  void FragmentManager::split(const dtn::data::Bundle &bundle, const dtn::data::Length &maxPayloadLength, std::list<dtn::data::Bundle> &fragments) throw (FragmentationAbortedException)
324  {
325  // get bundle DONT_FRAGMENT Flag
327  throw FragmentationProhibitedException("Bundle fragmentation is restricted by do-not-fragment bit.");
328 
329  try {
330  const dtn::data::PayloadBlock &payloadBlock = bundle.find<dtn::data::PayloadBlock>();
331 
332  // get bundle payload length
333  dtn::data::Length payloadLength = payloadBlock.getLength();
334 
335  // check if fragmentation needed
336  if (payloadLength <= maxPayloadLength)
337  throw FragmentationNotNecessaryException("Fragmentation not necessary. The payload block is smaller than the max. payload length.");
338 
339  // copy the origin bundle as template for the new fragment
340  Bundle fragment = bundle;
341 
342  // clear all the blocks
343  fragment.clear();
344 
345  // set bundle is fragment flag
346  fragment.set(dtn::data::PrimaryBlock::FRAGMENT, true);
347 
348  // set application data length
    // every fragment carries the length of the complete payload
349  fragment.appdatalength = payloadLength;
350 
351  ibrcommon::BLOB::Reference ref = payloadBlock.getBLOB();
352  ibrcommon::BLOB::iostream stream = ref.iostream();
353 
354  bool isFirstFragment = true;
355  dtn::data::Length offset = 0;
356 
    // one iteration per fragment; `fragment` is reused and reset each time
357  while (!(*stream).eof() && (payloadLength > offset))
358  {
359  // clear all the blocks
360  fragment.clear();
361 
362  // set fragment offset
363  fragment.fragmentoffset = offset;
364 
365  // copy partial payload to the payload of the fragment
366  try {
367  // create new blob for fragment payload
368  ibrcommon::BLOB::Reference fragment_ref = ibrcommon::BLOB::create();
369 
370  // get the iostream object
371  ibrcommon::BLOB::iostream fragment_stream = fragment_ref.iostream();
372 
    // the last fragment takes only the remaining payload; all others take
    // maxPayloadLength (the 65535 argument is presumably a buffer size - confirm)
373  if ((offset + maxPayloadLength) > payloadLength) {
374  // copy payload to the fragment
375  ibrcommon::BLOB::copy(*fragment_stream, *stream, payloadLength - offset, 65535);
376  } else {
377  // copy payload to the fragment
378  ibrcommon::BLOB::copy(*fragment_stream, *stream, maxPayloadLength, 65535);
379  }
380 
381  // set new offset position
    // advance by the size of what was actually written into the fragment blob
382  offset += fragment_stream.size();
383 
384  // create fragment payload block
385  dtn::data::PayloadBlock &fragment_payloadBlock = fragment.push_back(fragment_ref);
386 
387  // add all necessary blocks from the bundle to the fragment
    // `payloadLength == offset` marks the last fragment
388  addBlocksFromBundleToFragment(bundle, fragment, fragment_payloadBlock, isFirstFragment, payloadLength == offset);
389  } catch (const ibrcommon::IOException &ex) {
    // NOTE(review): the error is only logged; execution continues below, so
    // the fragment is still appended, and `offset` may not have advanced if
    // the exception came from the copy - check that the loop still terminates
390  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "error while splitting bundle into fragments: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
391  }
392 
393  // add current fragment to fragments list
394  fragments.push_back(fragment);
395 
396  if (isFirstFragment) isFirstFragment = false;
397 
398  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment created: " << fragment.toString() << IBRCOMMON_LOGGER_ENDL;
399  }
401  // bundle has no payload
402  throw FragmentationAbortedException("Fragmentation aborted. No payload block found.");
403  }
404  }
405 
// Copies the non-payload blocks of `bundle` that belong into `fragment`.
//
// A block located before the payload block is copied if this is the first
// fragment or the block has REPLICATE_IN_EVERY_FRAGMENT set; a block located
// after the payload block is copied if this is the last fragment or the
// flag is set.
//
// @param bundle                 original bundle whose blocks are inspected
// @param fragment               fragment that receives the copies
// @param fragment_payloadBlock  payload block of `fragment`; blocks from before
//                               the payload are inserted in front of it
// @param isFirstFragment        true for the first fragment
// @param isLastFragment         true for the last fragment
//
// NOTE(review): listing lines 441 and 470 are missing from this listing;
// `f` is used in both try blocks but its declaration is not visible.
406  void FragmentManager::addBlocksFromBundleToFragment(const dtn::data::Bundle &bundle, dtn::data::Bundle &fragment, dtn::data::PayloadBlock &fragment_payloadBlock, bool isFirstFragment, bool isLastFragment)
407  {
    // becomes true once the payload block of `bundle` has been passed
408  bool isAfterPayload = false;
409  bool isReplicateInEveryBundle = false;
410 
411  char block_type = 0;
412 
413  IBRCOMMON_LOGGER_DEBUG_TAG("FragmentManager", 5) << "Fragment original bundle block count: " << fragment.toString() << " " << bundle.size() << IBRCOMMON_LOGGER_ENDL;
414 
415  //check for each block if it has to be added to the fragment
416  for (dtn::data::Bundle::const_iterator it = bundle.begin(); it != bundle.end(); ++it)
417  {
418  //get the current block
419  const Block &current_block = dynamic_cast<const Block&>(**it);
420 
421  block_type = current_block.getType();
422 
423  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment block found: " << fragment.toString() << " " << (unsigned int)block_type << IBRCOMMON_LOGGER_ENDL;
424 
425 
    // the payload block itself is not copied; it only marks the switch
    // from "before payload" to "after payload"
426  if (block_type == dtn::data::PayloadBlock::BLOCK_TYPE)
427  {
428  isAfterPayload = true;
429  }
430  else
431  {
432  isReplicateInEveryBundle = current_block.get(dtn::data::Block::REPLICATE_IN_EVERY_FRAGMENT);
433 
434  //if block is before payload
435  //add if fragment is the first one
436  //or if ReplicateInEveryBundle Flag is set
437  if (!isAfterPayload && (isFirstFragment || isReplicateInEveryBundle))
438  {
439  try
440  { //get factory
442 
443  //insert new Block before payload block
444  dtn::data::Block &fragment_block = fragment.insert(fragment.find(fragment_payloadBlock), f);
445 
446  //copy block
447  fragment_block = current_block;
448 
449  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block before payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
450  }
451  catch(const ibrcommon::Exception &ex)
452  {
    // fallback: insert a generic ExtensionBlock instead
453  //insert new Block before payload block
454  dtn::data::Block &fragment_block = fragment.insert<dtn::data::ExtensionBlock>(fragment.find(fragment_payloadBlock));
455 
456  //copy block
457  fragment_block = current_block;
458 
459  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block before payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
460  }
461 
462  }
463  //if block is after payload
464  //add if fragment is the last one
465  //or if ReplicateInEveryBundle Flag is set
466  else if (isAfterPayload && (isLastFragment || isReplicateInEveryBundle))
467  {
468  try
469  { //get factory
471 
472  //push back new Block after payload block
473  dtn::data::Block &fragment_block = fragment.push_back(f);
474 
475  //copy block
476  fragment_block = current_block;
477 
478  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block after payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
479  }
480  catch (const ibrcommon::Exception &ex)
481  {
    // fallback: append a generic ExtensionBlock instead
482  //push back new Block after payload block
483  dtn::data::Block &fragment_block = fragment.push_back<dtn::data::ExtensionBlock>();
484 
485  //copy block
486  fragment_block = current_block;
487 
488  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block after payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
489  }
490  }
491  }
492  }
493 
494  }
495 
// Creates an entry with offset 0 and expiration time 0; `peer` and `id`
// are left default-constructed.
496  FragmentManager::Transmission::Transmission()
497  : offset(0), expires(0)
498  {
499  }
500 
// Empty destructor body; no explicit cleanup is done here.
501  FragmentManager::Transmission::~Transmission()
502  {
503  }
504 
505  bool FragmentManager::Transmission::operator<(const Transmission &other) const
506  {
507  if (expires < other.expires) return true;
508  if (expires != other.expires) return false;
509 
510  if (peer < other.peer) return true;
511  if (peer != other.peer) return false;
512 
513  return (id < other.id);
514  }
515 
516  bool FragmentManager::Transmission::operator==(const Transmission &other) const
517  {
518  if (expires != other.expires) return false;
519  if (peer != other.peer) return false;
520  if (id != other.id) return false;
521 
522  return true;
523  }
524  } /* namespace core */
525 } /* namespace dtn */
std::string toString() const
Definition: BundleID.cpp:190
static dtn::data::EID local
Definition: BundleCore.h:79
static void add(EventReceiver< E > *receiver)
bool get(ProcFlags flag) const
Definition: Block.cpp:82
static void split(const dtn::data::Bundle &bundle, const dtn::data::Length &maxPayloadLength, std::list< dtn::data::Bundle > &fragments)
size_t Length
Definition: Number.h:33
void setBundle(const dtn::data::Bundle &data)
dtn::data::Timestamp timestamp
Definition: BundleID.h:54
static Container getContainer()
static dtn::data::Timestamp getExpireTime(const dtn::data::Bundle &b)
Definition: Clock.cpp:91
virtual bool contains(const dtn::data::BundleID &id)=0
static void remove(const EventReceiver< E > *receiver)
static void setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset)
iterator begin()
Definition: Bundle.cpp:49
virtual dtn::data::Length getPayloadLength() const
Definition: BundleID.cpp:112
virtual void setFragment(bool val)
Definition: BundleID.cpp:127
T & insert(iterator before)
Definition: Bundle.h:200
const std::string getName() const
dtn::data::Number sequencenumber
Definition: BundleID.h:55
bool get(FLAGS flag) const
bool _running
Definition: dtninbox.cpp:122
block_list::const_iterator const_iterator
Definition: Bundle.h:77
T get() const
Definition: SDNV.h:113
virtual Length getLength() const
bool isFragment() const
Definition: MetaBundle.cpp:165
void raiseEvent(const dtn::routing::QueueBundleEvent &evt)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
Size size() const
Definition: Bundle.cpp:258
static Factory & get(block_t type)
static const dtn::data::block_t BLOCK_TYPE
Definition: PayloadBlock.h:38
virtual Length getLength() const =0
size_t Size
Definition: Number.h:34
T & push_back()
Definition: Bundle.h:180
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
ibrcommon::BLOB::Reference getBLOB() const
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
const block_t & getType() const
Definition: Block.h:73
iterator end()
Definition: Bundle.cpp:54
dtn::data::Number fragmentoffset
Definition: BundleID.h:57
static dtn::data::Length getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id)
void set(FLAGS flag, bool value)
dtn::data::EID source
Definition: BundleID.h:53
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82