IBR-DTNSuite 0.6

ibrdtn/ibrdtn/api/Client.cpp

Go to the documentation of this file.
00001 /*
00002  * Client.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 
00009 
00010 
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include "ibrdtn/streams/StreamContactHeader.h"
00017 
00018 #include <ibrcommon/net/tcpstream.h>
00019 #include <ibrcommon/Logger.h>
00020 
00021 #include <iostream>
00022 #include <string>
00023 
00024 namespace dtn
00025 {
00026         namespace api
00027         {
00028                 Client::AsyncReceiver::AsyncReceiver(Client &client)
00029                  : _client(client), _running(true)
00030                 {
00031                 }
00032 
00033                 Client::AsyncReceiver::~AsyncReceiver()
00034                 {
00035                 }
00036 
00037                 bool Client::AsyncReceiver::__cancellation()
00038                 {
00039                         _running = false;
00040                         return true;
00041                 }
00042 
00043                 void Client::AsyncReceiver::run()
00044                 {
00045                         try {
00046                                 while (!_client.eof() && _running)
00047                                 {
00048                                         dtn::api::Bundle b;
00049                                         _client >> b;
00050                                         _client.received(b);
00051                                         yield();
00052                                 }
00053                         } catch (const dtn::api::ConnectionException &ex) {
00054                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00056                         } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) {
00057                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00058                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00059                         } catch (const ibrcommon::IOException &ex) {
00060                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00061                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00062                         } catch (const dtn::InvalidDataException &ex) {
00063                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00064                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00065                         } catch (const std::exception&) {
00066                                 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL;
00067                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00068                         }
00069                 }
00070 
00071                 Client::Client(const std::string &app, const dtn::data::EID &group, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00072                   : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _group(group), _receiver(*this)
00073                 {
00074                 }
00075 
00076                 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00077                   : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this)
00078                 {
00079                 }
00080 
00081                 Client::~Client()
00082                 {
00083                         try {
00084                                 // stop the receiver
00085                                 _receiver.stop();
00086                         } catch (const ibrcommon::ThreadException &ex) {
00087                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00088                         }
00089                         
00090                         // Close the stream. This releases all reading or writing threads.
00091                         _stream.close();
00092 
00093                         // wait until the async thread has been finished
00094                         _receiver.join();
00095                 }
00096 
00097                 void Client::connect()
00098                 {
00099                         // do a handshake
00100                         EID localeid(EID("api:" + _app));
00101 
00102                         // connection flags
00103                         char flags = 0;
00104 
00105                         // request acknowledgements
00106                         flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00107 
00108                         // set comm. mode
00109                         if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00110 
00111                         // receive API banner
00112                         std::string buffer;
00113                         std::getline(_stream, buffer);
00114 
00115                         // if requested...
00116                         if (_group != dtn::data::EID())
00117                         {
00118                                 // join the group
00119                                 _stream << "registration add " << _group.getString() << std::endl;
00120 
00121                                 // read the reply
00122                                 std::getline(_stream, buffer);
00123                         }
00124 
00125                         // switch to API tcpcl mode
00126                         _stream << "protocol tcpcl" << std::endl;
00127 
00128                         // do the handshake (no timeout, no keepalive)
00129                         handshake(localeid, 0, flags);
00130 
00131                         try {
00132                                 // run the receiver
00133                                 _receiver.start();
00134                         } catch (const ibrcommon::ThreadException &ex) {
00135                                 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00136                         }
00137                 }
00138 
00139                 void Client::close()
00140                 {
00141                         // shutdown the bundle stream connection
00142                         shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00143                 }
00144 
00145                 void Client::eventConnectionDown()
00146                 {
00147                         _inqueue.abort();
00148 
00149                         try {
00150                                 _receiver.stop();
00151                         } catch (const ibrcommon::ThreadException &ex) {
00152                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00153                         }
00154                 }
00155 
00156                 void Client::eventBundleAck(size_t ack)
00157                 {
00158                         lastack = ack;
00159                 }
00160 
00161                 void Client::received(const dtn::api::Bundle &b)
00162                 {
00163                         // if we are in send only mode...
00164                         if (_mode != dtn::api::Client::MODE_SENDONLY)
00165                         {
00166                                 // ... then discard the received bundle
00167                                 _inqueue.push(b);
00168                         }
00169                 }
00170 
00171                 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException)
00172                 {
00173                         try {
00174                                 return _inqueue.getnpop(true, timeout * 1000);
00175                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00176                                 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT)
00177                                 {
00178                                         throw ConnectionTimeoutException();
00179                                 }
00180                                 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00181                                 {
00182                                         throw ConnectionAbortedException(ex.what());
00183                                 }
00184 
00185                                 throw ConnectionException(ex.what());
00186                         } catch (const std::exception &ex) {
00187                                 throw ConnectionException(ex.what());
00188                         }
00189 
00190                         throw ConnectionException();
00191                 }
00192         }
00193 }