|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Client.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 00009 00010 00011 #include "ibrdtn/api/Client.h" 00012 #include "ibrdtn/api/Bundle.h" 00013 #include "ibrdtn/data/SDNV.h" 00014 #include "ibrdtn/data/Exceptions.h" 00015 #include "ibrdtn/streams/StreamDataSegment.h" 00016 #include "ibrdtn/streams/StreamContactHeader.h" 00017 00018 #include <ibrcommon/net/tcpstream.h> 00019 #include <ibrcommon/Logger.h> 00020 00021 using namespace dtn::data; 00022 using namespace dtn::streams; 00023 00024 namespace dtn 00025 { 00026 namespace api 00027 { 00028 Client::AsyncReceiver::AsyncReceiver(Client &client) 00029 : _client(client), _running(true) 00030 { 00031 // enable exception throwing on stream events 00032 _client.exceptions(std::ios::badbit | std::ios::eofbit); 00033 } 00034 00035 Client::AsyncReceiver::~AsyncReceiver() 00036 { 00037 } 00038 00039 bool Client::AsyncReceiver::__cancellation() 00040 { 00041 _running = false; 00042 return true; 00043 } 00044 00045 void Client::AsyncReceiver::run() 00046 { 00047 try { 00048 while (!_client.eof() && _running) 00049 { 00050 dtn::api::Bundle b; 00051 _client >> b; 00052 _client.received(b); 00053 yield(); 00054 } 00055 } catch (const dtn::api::ConnectionException &ex) { 00056 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00057 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00058 } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) { 00059 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00060 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00061 } catch (const ibrcommon::IOException &ex) { 00062 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00063 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00064 } catch (const dtn::InvalidDataException &ex) { 00065 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00066 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00067 } catch (const std::exception&) { 00068 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL; 00069 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00070 } 00071 } 00072 00073 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode) 00074 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this) 00075 { 00076 } 00077 00078 Client::~Client() 00079 { 00080 try { 00081 // stop the receiver 00082 _receiver.stop(); 00083 } catch (const ibrcommon::ThreadException &ex) { 00084 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00085 } 00086 00087 // Close the stream. This releases all reading or writing threads. 00088 _stream.close(); 00089 00090 // wait until the async thread has been finished 00091 _receiver.join(); 00092 } 00093 00094 void Client::connect() 00095 { 00096 // do a handshake 00097 EID localeid(EID("api:" + _app)); 00098 00099 // connection flags 00100 char flags = 0; 00101 00102 // request acknowledgements 00103 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00104 00105 // set comm. mode 00106 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY; 00107 00108 // do the handshake (no timeout, no keepalive) 00109 handshake(localeid, 0, flags); 00110 00111 try { 00112 // run the receiver 00113 _receiver.start(); 00114 } catch (const ibrcommon::ThreadException &ex) { 00115 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00116 } 00117 } 00118 00119 void Client::close() 00120 { 00121 // shutdown the bundle stream connection 00122 shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN); 00123 } 00124 00125 void Client::eventConnectionDown() 00126 { 00127 _inqueue.abort(); 00128 00129 try { 00130 _receiver.stop(); 00131 } catch (const ibrcommon::ThreadException &ex) { 00132 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00133 } 00134 } 00135 00136 void Client::eventBundleAck(size_t ack) 00137 { 00138 lastack = ack; 00139 } 00140 00141 void Client::received(const dtn::api::Bundle &b) 00142 { 00143 // if we are in send only mode... 00144 if (_mode != dtn::api::Client::MODE_SENDONLY) 00145 { 00146 // ... then discard the received bundle 00147 _inqueue.push(b); 00148 } 00149 } 00150 00151 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException) 00152 { 00153 try { 00154 return _inqueue.getnpop(true, timeout * 1000); 00155 } catch (const ibrcommon::QueueUnblockedException &ex) { 00156 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT) 00157 { 00158 throw ConnectionTimeoutException(); 00159 } 00160 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT) 00161 { 00162 throw ConnectionAbortedException(ex.what()); 00163 } 00164 00165 throw ConnectionException(ex.what()); 00166 } catch (const std::exception &ex) { 00167 throw ConnectionException(ex.what()); 00168 } 00169 } 00170 } 00171 }