31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/thread/MutexLock.h>
35 #ifdef IBRDTN_SUPPORT_BSP
43 const std::string FragmentManager::TAG =
"FragmentManager";
45 ibrcommon::Mutex FragmentManager::_offsets_mutex;
46 std::set<FragmentManager::Transmission> FragmentManager::_offsets;
59 return "FragmentManager";
93 if (storage.
contains(origin))
continue;
99 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) <<
"found " << list.size() <<
" fragments similar to bundle " << meta.
toString() << IBRCOMMON_LOGGER_ENDL;
104 std::set<BundleMerger::Chunk> chunks;
105 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
111 chunks.insert(chunk);
121 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
127 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) <<
"fragment: " << (*iter).
toString() << IBRCOMMON_LOGGER_ENDL;
136 IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) <<
"could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL;
145 IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, notice) <<
"Bundle " << merged.
toString() <<
" merged" << IBRCOMMON_LOGGER_ENDL;
159 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
166 }
catch (
const ibrcommon::QueueUnblockedException&) { }
180 if (!queued.bundle.isFragment())
return;
193 _incoming.push(queued.bundle);
215 if (meta.
source != _similar.source)
return false;
216 if (meta.
timestamp != _similar.timestamp)
return false;
231 storage.
get(filter, list);
240 t.offset = get_payload_offset(b, abs_offset, frag_offset);
242 if (t.offset <= 0)
return;
248 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 4) <<
"Store offset of partial transmitted bundle " <<
id.toString() <<
" to " << peer.getString() <<
249 ", offset: " << t.offset <<
" (" << abs_offset <<
")" << IBRCOMMON_LOGGER_ENDL;
251 ibrcommon::MutexLock l(_offsets_mutex);
259 ibrcommon::MutexLock l(_offsets_mutex);
260 for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); ++iter)
262 const Transmission &t = (*iter);
263 if (t.peer != peer)
continue;
264 if (t.id !=
id)
continue;
279 if (frag_offset > 0) {
301 if (abs_offset < header)
return 0;
302 return frag_offset + (abs_offset - header);
303 }
catch (std::bad_cast&) { };
314 ibrcommon::MutexLock l(_offsets_mutex);
315 for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();)
317 const Transmission &t = (*iter);
318 if (t.expires >= timestamp)
return;
319 _offsets.erase(iter++);
336 if (payloadLength <= maxPayloadLength)
351 ibrcommon::BLOB::Reference ref = payloadBlock.
getBLOB();
352 ibrcommon::BLOB::iostream stream = ref.iostream();
354 bool isFirstFragment =
true;
357 while (!(*stream).eof() && (payloadLength > offset))
368 ibrcommon::BLOB::Reference fragment_ref = ibrcommon::BLOB::create();
371 ibrcommon::BLOB::iostream fragment_stream = fragment_ref.iostream();
373 if ((offset + maxPayloadLength) > payloadLength) {
375 ibrcommon::BLOB::copy(*fragment_stream, *stream, payloadLength - offset, 65535);
378 ibrcommon::BLOB::copy(*fragment_stream, *stream, maxPayloadLength, 65535);
382 offset += fragment_stream.size();
388 addBlocksFromBundleToFragment(bundle, fragment, fragment_payloadBlock, isFirstFragment, payloadLength == offset);
389 }
catch (
const ibrcommon::IOException &ex) {
390 IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) <<
"error while splitting bundle into fragments: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
394 fragments.push_back(fragment);
396 if (isFirstFragment) isFirstFragment =
false;
398 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Fragment created: " << fragment.
toString() << IBRCOMMON_LOGGER_ENDL;
408 bool isAfterPayload =
false;
409 bool isReplicateInEveryBundle =
false;
413 IBRCOMMON_LOGGER_DEBUG_TAG(
"FragmentManager", 5) <<
"Fragment original bundle block count: " << fragment.
toString() <<
" " << bundle.
size() << IBRCOMMON_LOGGER_ENDL;
419 const Block ¤t_block =
dynamic_cast<const Block&
>(**it);
421 block_type = current_block.
getType();
423 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Fragment block found: " << fragment.
toString() <<
" " << (
unsigned int)block_type << IBRCOMMON_LOGGER_ENDL;
428 isAfterPayload =
true;
437 if (!isAfterPayload && (isFirstFragment || isReplicateInEveryBundle))
447 fragment_block = current_block;
449 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Added block before payload: " << fragment.
toString()<<
" " << block_type << IBRCOMMON_LOGGER_ENDL;
451 catch(
const ibrcommon::Exception &ex)
457 fragment_block = current_block;
459 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Added block before payload: " << fragment.
toString()<<
" " << block_type << IBRCOMMON_LOGGER_ENDL;
466 else if (isAfterPayload && (isLastFragment || isReplicateInEveryBundle))
476 fragment_block = current_block;
478 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Added block after payload: " << fragment.
toString()<<
" " << block_type << IBRCOMMON_LOGGER_ENDL;
480 catch (
const ibrcommon::Exception &ex)
486 fragment_block = current_block;
488 IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) <<
"Added block after payload: " << fragment.
toString()<<
" " << block_type << IBRCOMMON_LOGGER_ENDL;
496 FragmentManager::Transmission::Transmission()
497 : offset(0), expires(0)
501 FragmentManager::Transmission::~Transmission()
505 bool FragmentManager::Transmission::operator<(
const Transmission &other)
const
507 if (expires < other.expires)
return true;
508 if (expires != other.expires)
return false;
510 if (peer < other.peer)
return true;
511 if (peer != other.peer)
return false;
513 return (
id < other.id);
516 bool FragmentManager::Transmission::operator==(
const Transmission &other)
const
518 if (expires != other.expires)
return false;
519 if (peer != other.peer)
return false;
520 if (
id != other.id)
return false;
std::string toString() const
static dtn::data::EID local
static void add(EventReceiver< E > *receiver)
bool get(ProcFlags flag) const
static void split(const dtn::data::Bundle &bundle, const dtn::data::Length &maxPayloadLength, std::list< dtn::data::Bundle > &fragments)
void setBundle(const dtn::data::Bundle &data)
dtn::data::Timestamp timestamp
static Container getContainer()
static dtn::data::Timestamp getExpireTime(const dtn::data::Bundle &b)
virtual bool contains(const dtn::data::BundleID &id)=0
static void remove(const EventReceiver< E > *receiver)
static void setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset)
virtual dtn::data::Length getPayloadLength() const
virtual void setFragment(bool val)
T & insert(iterator before)
const std::string getName() const
dtn::data::Number sequencenumber
bool get(FLAGS flag) const
block_list::const_iterator const_iterator
virtual Length getLength() const
void raiseEvent(const dtn::routing::QueueBundleEvent &evt)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
static Factory & get(block_t type)
static const dtn::data::block_t BLOCK_TYPE
virtual Length getLength() const =0
iterator find(block_t blocktype)
ibrcommon::BLOB::Reference getBLOB() const
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
dtn::storage::BundleStorage & getStorage()
const block_t & getType() const
dtn::data::Number fragmentoffset
static dtn::data::Length getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id)
void set(FLAGS flag, bool value)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
virtual ~FragmentManager()
static BundleCore & getInstance()