|
IBR-DTNSuite 0.6
|
00001 /* 00002 * BinaryStreamClient.cpp 00003 * 00004 * Created on: 19.07.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "api/BinaryStreamClient.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "net/BundleReceivedEvent.h" 00014 #include "core/BundleEvent.h" 00015 #include <ibrdtn/streams/StreamContactHeader.h> 00016 #include <ibrdtn/data/Serializer.h> 00017 #include <iostream> 00018 #include <ibrcommon/Logger.h> 00019 00020 namespace dtn 00021 { 00022 namespace api 00023 { 00024 BinaryStreamClient::BinaryStreamClient(ClientHandler &client, ibrcommon::tcpstream &stream) 00025 : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream) 00026 { 00027 } 00028 00029 BinaryStreamClient::~BinaryStreamClient() 00030 { 00031 _sender.join(); 00032 } 00033 00034 const dtn::data::EID& BinaryStreamClient::getPeer() const 00035 { 00036 return _eid; 00037 } 00038 00039 void BinaryStreamClient::eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases) 00040 { 00041 } 00042 00043 void BinaryStreamClient::eventTimeout() 00044 { 00045 } 00046 00047 void BinaryStreamClient::eventError() 00048 { 00049 } 00050 00051 void BinaryStreamClient::eventConnectionUp(const dtn::streams::StreamContactHeader &header) 00052 { 00053 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME) 00054 { 00055 // This node is working with compressed addresses 00056 // generate a number 00057 _eid = BundleCore::local + "." + header._localeid.getHost(); 00058 } 00059 else 00060 { 00061 // contact received event 00062 _eid = BundleCore::local + "/" + header._localeid.getHost() + header._localeid.getApplication(); 00063 } 00064 00065 IBRCOMMON_LOGGER_DEBUG(20) << "new client connected: " << _eid.getString() << IBRCOMMON_LOGGER_ENDL; 00066 00067 _client.getRegistration().subscribe(_eid); 00068 } 00069 00070 void BinaryStreamClient::eventConnectionDown() 00071 { 00072 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL; 00073 00074 _client.getRegistration().unsubscribe(_eid); 00075 00076 try { 00077 // stop the sender 00078 _sender.stop(); 00079 } catch (const ibrcommon::ThreadException &ex) { 00080 IBRCOMMON_LOGGER_DEBUG(50) << "BinaryStreamClient::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00081 } 00082 } 00083 00084 void BinaryStreamClient::eventBundleRefused() 00085 { 00086 try { 00087 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00088 00089 // set ACK to zero 00090 _lastack = 0; 00091 00092 } catch (const ibrcommon::QueueUnblockedException&) { 00093 // pop on empty queue! 00094 } 00095 } 00096 00097 void BinaryStreamClient::eventBundleForwarded() 00098 { 00099 try { 00100 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00101 00102 // notify bundle as delivered 00103 _client.getRegistration().delivered(bundle); 00104 00105 // set ACK to zero 00106 _lastack = 0; 00107 } catch (const ibrcommon::QueueUnblockedException&) { 00108 // pop on empty queue! 00109 } 00110 } 00111 00112 void BinaryStreamClient::eventBundleAck(size_t ack) 00113 { 00114 _lastack = ack; 00115 } 00116 00117 bool BinaryStreamClient::__cancellation() 00118 { 00119 // shutdown 00120 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00121 00122 // close the stream 00123 try { 00124 _stream.close(); 00125 } catch (const ibrcommon::ConnectionClosedException&) { }; 00126 00127 return true; 00128 } 00129 00130 void BinaryStreamClient::finally() 00131 { 00132 IBRCOMMON_LOGGER_DEBUG(60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL; 00133 00134 // close the stream 00135 try { 00136 _stream.close(); 00137 } catch (const ibrcommon::ConnectionClosedException&) { }; 00138 00139 try { 00140 // shutdown the sender thread 00141 _sender.stop(); 00142 } catch (const std::exception&) { }; 00143 } 00144 00145 void BinaryStreamClient::run() 00146 { 00147 try { 00148 char flags = 0; 00149 00150 // request acknowledgements 00151 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00152 00153 // do the handshake 00154 _connection.handshake(dtn::core::BundleCore::local, 10, flags); 00155 00156 // start the sender thread 00157 _sender.start(); 00158 00159 while (!_connection.eof()) 00160 { 00161 dtn::data::Bundle bundle; 00162 dtn::data::DefaultDeserializer(_connection) >> bundle; 00163 00164 // create a new sequence number 00165 bundle.relabel(); 00166 00167 // process the new bundle 00168 _client.getAPIServer().processIncomingBundle(_eid, bundle); 00169 } 00170 } catch (const ibrcommon::ThreadException &ex) { 00171 IBRCOMMON_LOGGER(error) << "failed to start thread in BinaryStreamClient\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00172 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00173 } catch (const dtn::SerializationFailedException &ex) { 00174 IBRCOMMON_LOGGER(error) << "BinaryStreamClient::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00175 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00176 } catch (const ibrcommon::IOException &ex) { 00177 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00178 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00179 } catch (const dtn::InvalidDataException &ex) { 00180 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00181 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00182 } catch (const std::exception &ex) { 00183 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00184 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00185 } 00186 } 00187 00188 bool BinaryStreamClient::good() const 00189 { 00190 return _stream.good(); 00191 } 00192 00193 BinaryStreamClient::Sender::Sender(BinaryStreamClient &client) 00194 : _client(client) 00195 { 00196 } 00197 00198 BinaryStreamClient::Sender::~Sender() 00199 { 00200 } 00201 00202 bool BinaryStreamClient::Sender::__cancellation() 00203 { 00204 // cancel the main thread in here 00205 this->abort(); 00206 00207 // abort all blocking calls on the registration object 00208 _client._client.getRegistration().abort(); 00209 00210 return true; 00211 } 00212 00213 void BinaryStreamClient::Sender::run() 00214 { 00215 Registration ® = _client._client.getRegistration(); 00216 00217 try { 00218 while (_client.good()) 00219 { 00220 try { 00221 dtn::data::Bundle bundle = reg.receive(); 00222 00223 // process the bundle block (security, compression, ...) 00224 dtn::core::BundleCore::processBlocks(bundle); 00225 00226 // add bundle to the queue 00227 _client._sentqueue.push(bundle); 00228 00229 // transmit the bundle 00230 dtn::data::DefaultSerializer(_client._connection) << bundle; 00231 00232 // mark the end of the bundle 00233 _client._connection << std::flush; 00234 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00235 reg.wait_for_bundle(); 00236 } 00237 00238 // idle a little bit 00239 yield(); 00240 } 00241 } catch (const ibrcommon::QueueUnblockedException &ex) { 00242 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00243 return; 00244 } catch (const ibrcommon::IOException &ex) { 00245 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00246 } catch (const dtn::InvalidDataException &ex) { 00247 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00248 } catch (const std::exception &ex) { 00249 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00250 } 00251 } 00252 00253 void BinaryStreamClient::queue(const dtn::data::Bundle &bundle) 00254 { 00255 _sender.push(bundle); 00256 } 00257 } 00258 }