28 #include <ibrcommon/Logger.h>
31 #ifdef IBRDTN_SUPPORT_BSP
40 :
ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600)
58 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 20) <<
"put()" << IBRCOMMON_LOGGER_ENDL;
86 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 20) <<
"get()" << IBRCOMMON_LOGGER_ENDL;
96 if ((!_group) && (bundle.
source != _peer))
98 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 30) <<
"get(): bundle source " << bundle.
source.
getString() <<
" not expected - discard" << IBRCOMMON_LOGGER_ENDL;
104 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 30) <<
"get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL;
120 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 60) <<
"OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL;
130 }
catch (
const std::exception&) { };
142 std::string::reverse_iterator iter = buffer.rbegin();
143 if ( (*iter) ==
'\r' ) buffer = buffer.substr(0, buffer.length() - 1);
146 if (cmd.empty())
continue;
149 if (cmd[0] ==
"connect")
157 _bundlestream <<
_stream.rdbuf() << std::flush;
159 else if (cmd[0] ==
"set")
161 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
163 if (cmd[1] ==
"endpoint")
165 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
168 if (cmd[2].length() <= 0)
186 else if (cmd[1] ==
"destination")
192 else if (cmd[1] ==
"group")
198 else if (cmd[1] ==
"lifetime")
200 std::stringstream ss(cmd[2]);
204 else if (cmd[1] ==
"chunksize")
207 std::stringstream ss(cmd[2]);
212 else if (cmd[1] ==
"timeout")
215 std::stringstream ss(cmd[2]);
229 }
catch (
const std::exception&) {
240 OrderedStreamHandler::Sender::~Sender()
242 ibrcommon::JoinableThread::join();
245 void OrderedStreamHandler::Sender::__cancellation() throw ()
248 _handler._client.getRegistration().abort();
251 void OrderedStreamHandler::Sender::finally() throw ()
253 _handler._client.getRegistration().abort();
256 void OrderedStreamHandler::Sender::run() throw ()
259 _handler._stream << _handler._bundlestream.rdbuf() << std::flush;
260 }
catch (
const std::exception &ex) {
261 IBRCOMMON_LOGGER_DEBUG_TAG(
"OrderedStreamHandler", 10) <<
"unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
static void inject(const dtn::data::EID &source, dtn::data::Bundle &bundle)
void delivered(const dtn::data::MetaBundle &m) const
virtual void __cancellation()
void setTimeout(const dtn::data::Timeout &timeout)
void subscribe(const dtn::data::EID &endpoint)
virtual void delivered(const dtn::data::MetaBundle &m)
void setApplication(const dtn::data::Number &app)
virtual dtn::data::MetaBundle get(const dtn::data::Timeout timeout=0)
void setChunkSize(const dtn::data::Length &size)
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
Registration & getRegistration()
virtual ~OrderedStreamHandler()
void unsubscribe(const dtn::data::EID &endpoint)
void read(std::istream &stream)
std::string getString() const
void wait_for_bundle(size_t timeout=0)
ibrcommon::socketstream & _stream
OrderedStreamHandler(ClientHandler &client, ibrcommon::socketstream &stream)
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
void set(FLAGS flag, bool value)
virtual void put(dtn::data::Bundle &b)