31 #include <ibrcommon/Logger.h>
38 :
ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream), _lastack(0)
69 if (header._localeid.isNone())
77 _eid = BundleCore::local;
81 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 20) <<
"new client connected, handle: " << reg.
getHandle() <<
"; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL;
88 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 40) <<
"BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
95 }
catch (
const ibrcommon::ThreadException &ex) {
96 IBRCOMMON_LOGGER_TAG(
"BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
108 }
catch (
const ibrcommon::QueueUnblockedException&) {
124 }
catch (
const ibrcommon::QueueUnblockedException&) {
145 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 60) <<
"BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
156 }
catch (
const std::exception&) { };
173 while (_connection.good())
181 }
catch (
const ibrcommon::ThreadException &ex) {
182 IBRCOMMON_LOGGER_TAG(
"BinaryStreamClient", error) <<
"failed to start thread: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
185 IBRCOMMON_LOGGER_TAG(
"BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
187 }
catch (
const ibrcommon::IOException &ex) {
188 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
191 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
193 }
catch (
const std::exception &ex) {
194 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
209 BinaryStreamClient::Sender::~Sender()
211 ibrcommon::JoinableThread::join();
214 void BinaryStreamClient::Sender::__cancellation() throw ()
223 void BinaryStreamClient::Sender::run() throw ()
225 Registration ® =
_client._client.getRegistration();
236 }
catch (
const ibrcommon::Exception&) {
242 _client._sentqueue.push(bundle);
248 _client._connection << std::flush;
250 reg.wait_for_bundle();
256 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
257 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
259 }
catch (
const ibrcommon::IOException &ex) {
260 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
262 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
263 }
catch (
const std::exception &ex) {
264 IBRCOMMON_LOGGER_DEBUG_TAG(
"BinaryStreamClient", 10) <<
"unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
270 _sender.push(bundle);
void delivered(const dtn::data::MetaBundle &m) const
static dtn::data::EID local
virtual void eventConnectionDown()
void subscribe(const dtn::data::EID &endpoint)
void setApplication(const dtn::data::Number &app)
const std::string & getHandle() const
virtual ~BinaryStreamClient()
const dtn::data::EID & getDefaultEID() const
BinaryStreamClient(ClientHandler &client, ibrcommon::socketstream &stream)
virtual void eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases csc)
dtn::api::Client * _client
Registration & getRegistration()
void handshake(const dtn::data::EID &eid, const dtn::data::Timeout &timeout, const dtn::data::Bitset< StreamContactHeader::HEADER_BITS > &flags)
virtual void eventTimeout()
void shutdown(ConnectionShutdownCases csc=CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN)
virtual void eventBundleRefused()
void unsubscribe(const dtn::data::EID &endpoint)
virtual void eventConnectionUp(const dtn::streams::StreamContactHeader &header)
virtual void eventBundleAck(const dtn::data::Length &ack)
void queue(const dtn::data::Bundle &bundle)
static void processBlocks(dtn::data::Bundle &b)
const dtn::data::EID & getPeer() const
ibrcommon::socketstream & _stream
virtual void eventError()
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
virtual void eventBundleForwarded()