IBR-DTNSuite 0.6

daemon/src/api/BinaryStreamClient.cpp

Go to the documentation of this file.
00001 /*
00002  * BinaryStreamClient.cpp
00003  *
00004  *  Created on: 19.07.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "api/BinaryStreamClient.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "net/BundleReceivedEvent.h"
00014 #include "core/BundleEvent.h"
00015 #include <ibrdtn/streams/StreamContactHeader.h>
00016 #include <ibrdtn/data/Serializer.h>
00017 #include <iostream>
00018 #include <ibrcommon/Logger.h>
00019 
00020 namespace dtn
00021 {
00022         namespace api
00023         {
00024                 BinaryStreamClient::BinaryStreamClient(ClientHandler &client, ibrcommon::tcpstream &stream)
00025                  : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream)
00026                 {
00027                 }
00028 
00029                 BinaryStreamClient::~BinaryStreamClient()
00030                 {
00031                         _sender.join();
00032                 }
00033 
00034                 const dtn::data::EID& BinaryStreamClient::getPeer() const
00035                 {
00036                         return _eid;
00037                 }
00038 
00039                 void BinaryStreamClient::eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases)
00040                 {
00041                 }
00042 
00043                 void BinaryStreamClient::eventTimeout()
00044                 {
00045                 }
00046 
00047                 void BinaryStreamClient::eventError()
00048                 {
00049                 }
00050 
00051                 void BinaryStreamClient::eventConnectionUp(const dtn::streams::StreamContactHeader &header)
00052                 {
00053                         if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00054                         {
00055                                 // This node is working with compressed addresses
00056                                 // generate a number
00057                                 _eid = BundleCore::local + "." + header._localeid.getHost();
00058                         }
00059                         else
00060                         {
00061                                 // contact received event
00062                                 _eid = BundleCore::local + "/" + header._localeid.getHost() + header._localeid.getApplication();
00063                         }
00064 
00065                         IBRCOMMON_LOGGER_DEBUG(20) << "new client connected: " << _eid.getString() << IBRCOMMON_LOGGER_ENDL;
00066 
00067                         _client.getRegistration().subscribe(_eid);
00068                 }
00069 
00070                 void BinaryStreamClient::eventConnectionDown()
00071                 {
00072                         IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00073 
00074                         _client.getRegistration().unsubscribe(_eid);
00075 
00076                         try {
00077                                 // stop the sender
00078                                 _sender.stop();
00079                         } catch (const ibrcommon::ThreadException &ex) {
00080                                 IBRCOMMON_LOGGER_DEBUG(50) << "BinaryStreamClient::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00081                         }
00082                 }
00083 
00084                 void BinaryStreamClient::eventBundleRefused()
00085                 {
00086                         try {
00087                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00088 
00089                                 // set ACK to zero
00090                                 _lastack = 0;
00091 
00092                         } catch (const ibrcommon::QueueUnblockedException&) {
00093                                 // pop on empty queue!
00094                         }
00095                 }
00096 
00097                 void BinaryStreamClient::eventBundleForwarded()
00098                 {
00099                         try {
00100                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00101 
00102                                 // notify bundle as delivered
00103                                 _client.getRegistration().delivered(bundle);
00104 
00105                                 // set ACK to zero
00106                                 _lastack = 0;
00107                         } catch (const ibrcommon::QueueUnblockedException&) {
00108                                 // pop on empty queue!
00109                         }
00110                 }
00111 
00112                 void BinaryStreamClient::eventBundleAck(size_t ack)
00113                 {
00114                         _lastack = ack;
00115                 }
00116 
00117                 bool BinaryStreamClient::__cancellation()
00118                 {
00119                         // shutdown
00120                         _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00121 
00122                         // close the stream
00123                         try {
00124                                 _stream.close();
00125                         } catch (const ibrcommon::ConnectionClosedException&) { };
00126 
00127                         return true;
00128                 }
00129 
00130                 void BinaryStreamClient::finally()
00131                 {
00132                         IBRCOMMON_LOGGER_DEBUG(60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
00133 
00134                         // close the stream
00135                         try {
00136                                 _stream.close();
00137                         } catch (const ibrcommon::ConnectionClosedException&) { };
00138 
00139                         try {
00140                                 // shutdown the sender thread
00141                                 _sender.stop();
00142                         } catch (const std::exception&) { };
00143                 }
00144 
00145                 void BinaryStreamClient::run()
00146                 {
00147                         try {
00148                                 char flags = 0;
00149 
00150                                 // request acknowledgements
00151                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00152 
00153                                 // do the handshake
00154                                 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00155 
00156                                 // start the sender thread
00157                                 _sender.start();
00158 
00159                                 while (!_connection.eof())
00160                                 {
00161                                         dtn::data::Bundle bundle;
00162                                         dtn::data::DefaultDeserializer(_connection) >> bundle;
00163 
00164                                         // create a new sequence number
00165                                         bundle.relabel();
00166 
00167                                         // process the new bundle
00168                                         _client.getAPIServer().processIncomingBundle(_eid, bundle);
00169                                 }
00170                         } catch (const ibrcommon::ThreadException &ex) {
00171                                 IBRCOMMON_LOGGER(error) << "failed to start thread in BinaryStreamClient\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00172                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00173                         } catch (const dtn::SerializationFailedException &ex) {
00174                                 IBRCOMMON_LOGGER(error) << "BinaryStreamClient::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00175                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00176                         } catch (const ibrcommon::IOException &ex) {
00177                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00178                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00179                         } catch (const dtn::InvalidDataException &ex) {
00180                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00181                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00182                         } catch (const std::exception &ex) {
00183                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00184                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00185                         }
00186                 }
00187 
00188                 bool BinaryStreamClient::good() const
00189                 {
00190                         return _stream.good();
00191                 }
00192 
00193                 BinaryStreamClient::Sender::Sender(BinaryStreamClient &client)
00194                  : _client(client)
00195                 {
00196                 }
00197 
00198                 BinaryStreamClient::Sender::~Sender()
00199                 {
00200                 }
00201 
00202                 bool BinaryStreamClient::Sender::__cancellation()
00203                 {
00204                         // cancel the main thread in here
00205                         this->abort();
00206 
00207                         // abort all blocking calls on the registration object
00208                         _client._client.getRegistration().abort();
00209 
00210                         return true;
00211                 }
00212 
00213                 void BinaryStreamClient::Sender::run()
00214                 {
00215                         Registration &reg = _client._client.getRegistration();
00216 
00217                         try {
00218                                 while (_client.good())
00219                                 {
00220                                         try {
00221                                                 dtn::data::Bundle bundle = reg.receive();
00222 
00223                                                 // process the bundle block (security, compression, ...)
00224                                                 dtn::core::BundleCore::processBlocks(bundle);
00225 
00226                                                 // add bundle to the queue
00227                                                 _client._sentqueue.push(bundle);
00228 
00229                                                 // transmit the bundle
00230                                                 dtn::data::DefaultSerializer(_client._connection) << bundle;
00231 
00232                                                 // mark the end of the bundle
00233                                                 _client._connection << std::flush;
00234                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00235                                                 reg.wait_for_bundle();
00236                                         }
00237 
00238                                         // idle a little bit
00239                                         yield();
00240                                 }
00241                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00242                                 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00243                                 return;
00244                         } catch (const ibrcommon::IOException &ex) {
00245                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00246                         } catch (const dtn::InvalidDataException &ex) {
00247                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00248                         } catch (const std::exception &ex) {
00249                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00250                         }
00251                 }
00252 
00253                 void BinaryStreamClient::queue(const dtn::data::Bundle &bundle)
00254                 {
00255                         _sender.push(bundle);
00256                 }
00257         }
00258 }