Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include "ibrdtn/streams/StreamContactHeader.h"
00017
00018 #include <ibrcommon/net/tcpstream.h>
00019 #include <ibrcommon/Logger.h>
00020
00021 using namespace dtn::data;
00022 using namespace dtn::streams;
00023
00024 namespace dtn
00025 {
00026 namespace api
00027 {
00028 Client::AsyncReceiver::AsyncReceiver(Client &client)
00029 : _client(client)
00030 {
00031 _client.exceptions(std::ios::badbit | std::ios::eofbit);
00032 }
00033
00034 Client::AsyncReceiver::~AsyncReceiver()
00035 {
00036 join();
00037 }
00038
00039 bool Client::AsyncReceiver::__cancellation()
00040 {
00041
00042 return false;
00043 }
00044
00045 void Client::AsyncReceiver::run()
00046 {
00047 try {
00048 while (true)
00049 {
00050 dtn::api::Bundle b;
00051 _client >> b;
00052 _client.received(b);
00053 yield();
00054 }
00055 } catch (const dtn::api::ConnectionException &ex) {
00056 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00057 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00058 } catch (dtn::streams::StreamConnection::StreamErrorException &ex) {
00059 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00060 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00061 } catch (ibrcommon::IOException ex) {
00062 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00063 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00064 } catch (dtn::InvalidDataException ex) {
00065 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00066 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00067 } catch (std::exception) {
00068 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL;
00069 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00070 }
00071 }
00072
00073 Client::Client(COMMUNICATION_MODE mode, string app, ibrcommon::tcpstream &stream)
00074 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _connected(false), _receiver(*this)
00075 {
00076 }
00077
00078 Client::Client(string app, ibrcommon::tcpstream &stream)
00079 : StreamConnection(*this, stream), _stream(stream), _mode(MODE_BIDIRECTIONAL), _app(app), _connected(false), _receiver(*this)
00080 {
00081 }
00082
00083 Client::~Client()
00084 {
00085 try {
00086 _receiver.stop();
00087 } catch (const ibrcommon::ThreadException &ex) {
00088 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00089 }
00090
00091 _receiver.join();
00092 }
00093
00094 void Client::connect()
00095 {
00096
00097 EID localeid(EID("api:" + _app));
00098
00099
00100 char flags = 0;
00101
00102
00103 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00104
00105
00106 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00107
00108
00109 handshake(localeid, 0, flags);
00110
00111 try {
00112
00113 _receiver.start();
00114 } catch (const ibrcommon::ThreadException &ex) {
00115 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00116 }
00117 }
00118
00119 bool Client::isConnected()
00120 {
00121 return _connected;
00122 }
00123
00124 void Client::close()
00125 {
00126 shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00127 }
00128
00129 void Client::received(const StreamContactHeader&)
00130 {
00131 _connected = true;
00132 }
00133
00134 void Client::eventTimeout()
00135 {
00136 }
00137
00138 void Client::eventShutdown(StreamConnection::ConnectionShutdownCases csc)
00139 {
00140 }
00141
00142 void Client::eventConnectionUp(const StreamContactHeader&)
00143 {
00144 }
00145
00146 void Client::eventError()
00147 {
00148 }
00149
00150 void Client::eventConnectionDown()
00151 {
00152 _connected = false;
00153 _inqueue.abort();
00154
00155 try {
00156 _receiver.stop();
00157 } catch (const ibrcommon::ThreadException &ex) {
00158 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00159 }
00160 }
00161
00162 void Client::eventBundleRefused()
00163 {
00164
00165 }
00166
00167 void Client::eventBundleForwarded()
00168 {
00169
00170 }
00171
00172 void Client::eventBundleAck(size_t ack)
00173 {
00174 lastack = ack;
00175 }
00176
00177 void Client::received(const dtn::api::Bundle &b)
00178 {
00179 if (_mode != dtn::api::Client::MODE_SENDONLY)
00180 {
00181 _inqueue.push(b);
00182 }
00183 }
00184
00185 dtn::api::Bundle Client::getBundle(size_t timeout)
00186 {
00187 try {
00188 return _inqueue.getnpop(true, timeout * 1000);
00189 } catch (const ibrcommon::QueueUnblockedException &ex) {
00190 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT)
00191 {
00192 throw ConnectionTimeoutException();
00193 }
00194 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00195 {
00196 throw ConnectionAbortedException(ex.what());
00197 }
00198
00199 throw ConnectionException(ex.what());
00200 } catch (const std::exception &ex) {
00201 throw ConnectionException(ex.what());
00202 }
00203 }
00204 }
00205 }