25 #include <ibrcommon/Logger.h>
26 #include <ibrcommon/TimeMeasurement.h>
34 _chunk_size(chunk_size), _chunk_payload(ibrcommon::BLOB::create()), _chunk_offset(0), _in_seq(0),
35 _out_seq(0), _streaming(wait_seq_zero), _first_chunk(true), _last_chunk_received(false), _timeout_receive(0)
39 setp(&_in_buf[0], &_in_buf[
BUFF_SIZE - 1]);
48 int ret = std::char_traits<char>::eq_int_type(this->
overflow(
49 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
60 char *ibegin = &_in_buf[0];
64 setp(&_in_buf[0], &_in_buf[0] +
BUFF_SIZE - 1);
66 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
68 *iend++ = std::char_traits<char>::to_char_type(c);
72 if ((iend - ibegin) == 0)
74 return std::char_traits<char>::not_eof(c);
78 BundleStreamBuf::append(_chunk_payload, &_in_buf[0], iend - ibegin);
81 if (_chunk_payload.size() >
static_cast<std::streamsize
>(_chunk_size))
86 return std::char_traits<char>::not_eof(c);
89 void BundleStreamBuf::flushPayload(
bool final)
93 if ((_first_chunk) && (_chunk_payload.size() == 0))
105 if (_first_chunk) _first_chunk =
false;
114 _chunk_payload = ibrcommon::BLOB::create();
127 _timeout_receive = timeout;
130 void BundleStreamBuf::append(ibrcommon::BLOB::Reference &ref,
const char* data,
const dtn::data::Length &length)
132 ibrcommon::BLOB::iostream stream = ref.iostream();
133 (*stream).seekp(0, ios::end);
134 (*stream).write(data, length);
140 if (_last_chunk_received)
142 return std::char_traits<char>::eof();
146 while (_chunks.empty())
151 IBRCOMMON_LOGGER_DEBUG_TAG(
"BundleStreamBuf", 40) <<
"bundle received" << IBRCOMMON_LOGGER_ENDL;
156 if (c._seq >= _in_seq)
158 IBRCOMMON_LOGGER_DEBUG_TAG(
"BundleStreamBuf", 40) <<
"bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
163 ibrcommon::TimeMeasurement tm;
167 while (_in_seq != (*_chunks.begin())._seq)
172 IBRCOMMON_LOGGER_DEBUG_TAG(
"BundleStreamBuf", 40) <<
"bundle received" << IBRCOMMON_LOGGER_ENDL;
177 if (c._seq >= _in_seq)
179 IBRCOMMON_LOGGER_DEBUG_TAG(
"BundleStreamBuf", 40) <<
"bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
182 }
catch (std::exception&) {
187 if (((_timeout_receive > 0) && (tm.getSeconds() > _timeout_receive)) || !_streaming)
190 _in_seq = (*_chunks.begin())._seq;
197 IBRCOMMON_LOGGER_DEBUG_TAG(
"BundleStreamBuf", 40) <<
"read the payload" << IBRCOMMON_LOGGER_ENDL;
200 const Chunk &c = (*_chunks.begin());
202 if (c._meta != _current_bundle)
206 _current_bundle = storage.
get(c._meta);
213 ibrcommon::BLOB::Reference r = payload.
getBLOB();
215 bool end_of_stream =
false;
216 std::streamsize bytes = 0;
221 ibrcommon::BLOB::iostream stream = r.iostream();
224 (*stream).seekg(_chunk_offset, ios::beg);
230 bytes = (*stream).gcount();
233 end_of_stream = (*stream).eof();
244 _last_chunk_received =
true;
268 _chunk_offset += bytes;
273 setg(&_out_buf[0], &_out_buf[0], &_out_buf[0] + bytes);
275 return std::char_traits<char>::not_eof(_out_buf[0]);
279 : _meta(m), _seq(0), _first(false), _last(false)
292 BundleStreamBuf::Chunk::~Chunk()
296 bool BundleStreamBuf::Chunk::operator==(
const Chunk& other)
const
298 return (_seq == other._seq);
301 bool BundleStreamBuf::Chunk::operator<(
const Chunk& other)
const
303 return (_seq < other._seq);
void setTimeout(const dtn::data::Timeout &timeout)
virtual ~BundleStreamBuf()
BundleStreamBuf(BundleStreamBufCallback &callback, const dtn::data::Length chunk_size=4096, bool wait_seq_zero=false)
bool get(STREAM_FLAGS flag) const
virtual void delivered(const dtn::data::MetaBundle &b)=0
virtual std::char_traits< char >::int_type underflow()
void setChunkSize(const dtn::data::Length &size)
static const dtn::data::Length BUFF_SIZE
virtual dtn::data::MetaBundle get(const dtn::data::Timeout timeout=0)=0
void set(STREAM_FLAGS flag, const bool &value)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
static void processBlocks(dtn::data::Bundle &b)
iterator find(block_t blocktype)
const Number & getSequenceNumber() const
ibrcommon::BLOB::Reference getBLOB() const
virtual void put(dtn::data::Bundle &b)=0
virtual std::char_traits< char >::int_type overflow(std::char_traits< char >::int_type=std::char_traits< char >::eof())
dtn::storage::BundleStorage & getStorage()
void setSequenceNumber(Number seq)
static BundleCore & getInstance()