IBR-DTNSuite 0.6

ibrdtn/ibrdtn/api/Client.cpp

Go to the documentation of this file.
00001 /*
00002  * Client.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 
00009 
00010 
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include "ibrdtn/streams/StreamContactHeader.h"
00017 
00018 #include <ibrcommon/net/tcpstream.h>
00019 #include <ibrcommon/Logger.h>
00020 
00021 using namespace dtn::data;
00022 using namespace dtn::streams;
00023 
00024 namespace dtn
00025 {
00026         namespace api
00027         {
00028                 Client::AsyncReceiver::AsyncReceiver(Client &client)
00029                  : _client(client), _running(true)
00030                 {
00031                         // enable exception throwing on stream events
00032                         _client.exceptions(std::ios::badbit | std::ios::eofbit);
00033                 }
00034 
00035                 Client::AsyncReceiver::~AsyncReceiver()
00036                 {
00037                 }
00038 
00039                 bool Client::AsyncReceiver::__cancellation()
00040                 {
00041                         _running = false;
00042                         return true;
00043                 }
00044 
00045                 void Client::AsyncReceiver::run()
00046                 {
00047                         try {
00048                                 while (!_client.eof() && _running)
00049                                 {
00050                                         dtn::api::Bundle b;
00051                                         _client >> b;
00052                                         _client.received(b);
00053                                         yield();
00054                                 }
00055                         } catch (const dtn::api::ConnectionException &ex) {
00056                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00057                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00058                         } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) {
00059                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00060                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00061                         } catch (const ibrcommon::IOException &ex) {
00062                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00063                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00064                         } catch (const dtn::InvalidDataException &ex) {
00065                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00066                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00067                         } catch (const std::exception&) {
00068                                 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL;
00069                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00070                         }
00071                 }
00072 
00073                 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00074                   : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this)
00075                 {
00076                 }
00077 
00078                 Client::~Client()
00079                 {
00080                         try {
00081                                 // stop the receiver
00082                                 _receiver.stop();
00083                         } catch (const ibrcommon::ThreadException &ex) {
00084                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00085                         }
00086                         
00087                         // Close the stream. This releases all reading or writing threads.
00088                         _stream.close();
00089 
00090                         // wait until the async thread has been finished
00091                         _receiver.join();
00092                 }
00093 
00094                 void Client::connect()
00095                 {
00096                         // do a handshake
00097                         EID localeid(EID("api:" + _app));
00098 
00099                         // connection flags
00100                         char flags = 0;
00101 
00102                         // request acknowledgements
00103                         flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00104 
00105                         // set comm. mode
00106                         if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00107 
00108                         // do the handshake (no timeout, no keepalive)
00109                         handshake(localeid, 0, flags);
00110 
00111                         try {
00112                                 // run the receiver
00113                                 _receiver.start();
00114                         } catch (const ibrcommon::ThreadException &ex) {
00115                                 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00116                         }
00117                 }
00118 
00119                 void Client::close()
00120                 {
00121                         // shutdown the bundle stream connection
00122                         shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00123                 }
00124 
00125                 void Client::eventConnectionDown()
00126                 {
00127                         _inqueue.abort();
00128 
00129                         try {
00130                                 _receiver.stop();
00131                         } catch (const ibrcommon::ThreadException &ex) {
00132                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00133                         }
00134                 }
00135 
00136                 void Client::eventBundleAck(size_t ack)
00137                 {
00138                         lastack = ack;
00139                 }
00140 
00141                 void Client::received(const dtn::api::Bundle &b)
00142                 {
00143                         // if we are in send only mode...
00144                         if (_mode != dtn::api::Client::MODE_SENDONLY)
00145                         {
00146                                 // ... then discard the received bundle
00147                                 _inqueue.push(b);
00148                         }
00149                 }
00150 
00151                 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException)
00152                 {
00153                         try {
00154                                 return _inqueue.getnpop(true, timeout * 1000);
00155                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00156                                 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT)
00157                                 {
00158                                         throw ConnectionTimeoutException();
00159                                 }
00160                                 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00161                                 {
00162                                         throw ConnectionAbortedException(ex.what());
00163                                 }
00164 
00165                                 throw ConnectionException(ex.what());
00166                         } catch (const std::exception &ex) {
00167                                 throw ConnectionException(ex.what());
00168                         }
00169                 }
00170         }
00171 }