|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Client.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 00009 00010 00011 #include "ibrdtn/api/Client.h" 00012 #include "ibrdtn/api/Bundle.h" 00013 #include "ibrdtn/data/SDNV.h" 00014 #include "ibrdtn/data/Exceptions.h" 00015 #include "ibrdtn/streams/StreamDataSegment.h" 00016 #include "ibrdtn/streams/StreamContactHeader.h" 00017 00018 #include <ibrcommon/net/tcpstream.h> 00019 #include <ibrcommon/Logger.h> 00020 00021 #include <iostream> 00022 #include <string> 00023 00024 namespace dtn 00025 { 00026 namespace api 00027 { 00028 Client::AsyncReceiver::AsyncReceiver(Client &client) 00029 : _client(client), _running(true) 00030 { 00031 } 00032 00033 Client::AsyncReceiver::~AsyncReceiver() 00034 { 00035 } 00036 00037 bool Client::AsyncReceiver::__cancellation() 00038 { 00039 _running = false; 00040 return true; 00041 } 00042 00043 void Client::AsyncReceiver::run() 00044 { 00045 try { 00046 while (!_client.eof() && _running) 00047 { 00048 dtn::api::Bundle b; 00049 _client >> b; 00050 _client.received(b); 00051 yield(); 00052 } 00053 } catch (const dtn::api::ConnectionException &ex) { 00054 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00055 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00056 } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) { 00057 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00058 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00059 } catch (const ibrcommon::IOException &ex) { 00060 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00061 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00062 } catch (const dtn::InvalidDataException &ex) { 00063 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00064 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00065 } catch (const std::exception&) { 00066 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL; 00067 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00068 } 00069 } 00070 00071 Client::Client(const std::string &app, const dtn::data::EID &group, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode) 00072 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _group(group), _receiver(*this) 00073 { 00074 } 00075 00076 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode) 00077 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this) 00078 { 00079 } 00080 00081 Client::~Client() 00082 { 00083 try { 00084 // stop the receiver 00085 _receiver.stop(); 00086 } catch (const ibrcommon::ThreadException &ex) { 00087 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00088 } 00089 00090 // Close the stream. This releases all reading or writing threads. 00091 _stream.close(); 00092 00093 // wait until the async thread has been finished 00094 _receiver.join(); 00095 } 00096 00097 void Client::connect() 00098 { 00099 // do a handshake 00100 EID localeid(EID("api:" + _app)); 00101 00102 // connection flags 00103 char flags = 0; 00104 00105 // request acknowledgements 00106 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00107 00108 // set comm. mode 00109 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY; 00110 00111 // receive API banner 00112 std::string buffer; 00113 std::getline(_stream, buffer); 00114 00115 // if requested... 00116 if (_group != dtn::data::EID()) 00117 { 00118 // join the group 00119 _stream << "registration add " << _group.getString() << std::endl; 00120 00121 // read the reply 00122 std::getline(_stream, buffer); 00123 } 00124 00125 // switch to API tcpcl mode 00126 _stream << "protocol tcpcl" << std::endl; 00127 00128 // do the handshake (no timeout, no keepalive) 00129 handshake(localeid, 0, flags); 00130 00131 try { 00132 // run the receiver 00133 _receiver.start(); 00134 } catch (const ibrcommon::ThreadException &ex) { 00135 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00136 } 00137 } 00138 00139 void Client::close() 00140 { 00141 // shutdown the bundle stream connection 00142 shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN); 00143 } 00144 00145 void Client::eventConnectionDown() 00146 { 00147 _inqueue.abort(); 00148 00149 try { 00150 _receiver.stop(); 00151 } catch (const ibrcommon::ThreadException &ex) { 00152 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00153 } 00154 } 00155 00156 void Client::eventBundleAck(size_t ack) 00157 { 00158 lastack = ack; 00159 } 00160 00161 void Client::received(const dtn::api::Bundle &b) 00162 { 00163 // if we are in send only mode... 00164 if (_mode != dtn::api::Client::MODE_SENDONLY) 00165 { 00166 // ... then discard the received bundle 00167 _inqueue.push(b); 00168 } 00169 } 00170 00171 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException) 00172 { 00173 try { 00174 return _inqueue.getnpop(true, timeout * 1000); 00175 } catch (const ibrcommon::QueueUnblockedException &ex) { 00176 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT) 00177 { 00178 throw ConnectionTimeoutException(); 00179 } 00180 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT) 00181 { 00182 throw ConnectionAbortedException(ex.what()); 00183 } 00184 00185 throw ConnectionException(ex.what()); 00186 } catch (const std::exception &ex) { 00187 throw ConnectionException(ex.what()); 00188 } 00189 00190 throw ConnectionException(); 00191 } 00192 } 00193 }