Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include "ibrdtn/streams/StreamContactHeader.h"
00017
00018 #include <ibrcommon/net/tcpstream.h>
00019 #include <ibrcommon/Logger.h>
00020
00021 using namespace dtn::data;
00022 using namespace dtn::streams;
00023
00024 namespace dtn
00025 {
00026 namespace api
00027 {
00028 Client::AsyncReceiver::AsyncReceiver(Client &client)
00029 : _client(client)
00030 {
00031 _client.exceptions(std::ios::badbit | std::ios::eofbit);
00032 }
00033
00034 Client::AsyncReceiver::~AsyncReceiver()
00035 {
00036 join();
00037 }
00038
00039 void Client::AsyncReceiver::run()
00040 {
00041 try {
00042 while (!_client.eof())
00043 {
00044 dtn::api::Bundle b;
00045 _client >> b;
00046 _client.received(b);
00047 yield();
00048 }
00049 } catch (const dtn::api::ConnectionException &ex) {
00050 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00051 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00052 } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) {
00053 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00054 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00055 } catch (const ibrcommon::IOException &ex) {
00056 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00057 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00058 } catch (const dtn::InvalidDataException &ex) {
00059 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00060 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00061 } catch (const std::exception&) {
00062 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL;
00063 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00064 }
00065 }
00066
00067 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00068 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this)
00069 {
00070 }
00071
00072 Client::~Client()
00073 {
00074 try {
00075 _receiver.stop();
00076 } catch (const ibrcommon::ThreadException &ex) {
00077 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00078 }
00079
00080 _receiver.join();
00081 }
00082
00083 void Client::connect()
00084 {
00085
00086 EID localeid(EID("api:" + _app));
00087
00088
00089 char flags = 0;
00090
00091
00092 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00093
00094
00095 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00096
00097
00098 handshake(localeid, 0, flags);
00099
00100 try {
00101
00102 _receiver.start();
00103 } catch (const ibrcommon::ThreadException &ex) {
00104 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00105 }
00106 }
00107
00108 void Client::close()
00109 {
00110 shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00111 }
00112
00113 void Client::eventConnectionDown()
00114 {
00115 _inqueue.abort();
00116
00117 try {
00118 _receiver.stop();
00119 } catch (const ibrcommon::ThreadException &ex) {
00120 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00121 }
00122 }
00123
00124 void Client::eventBundleAck(size_t ack)
00125 {
00126 lastack = ack;
00127 }
00128
00129 void Client::received(const dtn::api::Bundle &b)
00130 {
00131 if (_mode != dtn::api::Client::MODE_SENDONLY)
00132 {
00133 _inqueue.push(b);
00134 }
00135 }
00136
00137 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException)
00138 {
00139 try {
00140 return _inqueue.getnpop(true, timeout * 1000);
00141 } catch (const ibrcommon::QueueUnblockedException &ex) {
00142 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT)
00143 {
00144 throw ConnectionTimeoutException();
00145 }
00146 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00147 {
00148 throw ConnectionAbortedException(ex.what());
00149 }
00150
00151 throw ConnectionException(ex.what());
00152 } catch (const std::exception &ex) {
00153 throw ConnectionException(ex.what());
00154 }
00155 }
00156 }
00157 }