00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015
00016 #include "ibrcommon/net/tcpstream.h"
00017 #include "ibrdtn/streams/StreamDataSegment.h"
00018 #include "ibrdtn/streams/StreamContactHeader.h"
00019
00020 #include <ibrcommon/Logger.h>
00021
00022 using namespace dtn::data;
00023 using namespace dtn::streams;
00024
00025 namespace dtn
00026 {
00027 namespace api
00028 {
00029 Client::AsyncReceiver::AsyncReceiver(Client &client)
00030 : _client(client), _shutdown(false)
00031 {
00032 }
00033
00034 Client::AsyncReceiver::~AsyncReceiver()
00035 {
00036 _shutdown = true;
00037 join();
00038 }
00039
00040 void Client::AsyncReceiver::run()
00041 {
00042 try {
00043 while (!_shutdown)
00044 {
00045 dtn::api::Bundle b;
00046 _client >> b;
00047 _client.received(b);
00048 yield();
00049 }
00050 } catch (dtn::api::ConnectionException ex) {
00051 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver: ConnectionException" << IBRCOMMON_LOGGER_ENDL;
00052 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00053 } catch (ibrcommon::IOException ex) {
00054 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00056 } catch (dtn::InvalidDataException ex) {
00057 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver: InvalidDataException" << IBRCOMMON_LOGGER_ENDL;
00058 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00059 } catch (...) {
00060 IBRCOMMON_LOGGER(error) << "unknown error" << IBRCOMMON_LOGGER_ENDL;
00061 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00062 }
00063 }
00064
00065 Client::Client(COMMUNICATION_MODE mode, string app, ibrcommon::tcpstream &stream)
00066 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _connected(false), _receiver(*this)
00067 {
00068 }
00069
00070 Client::Client(string app, ibrcommon::tcpstream &stream)
00071 : StreamConnection(*this, stream), _stream(stream), _mode(MODE_BIDIRECTIONAL), _app(app), _connected(false), _receiver(*this)
00072 {
00073 }
00074
00075 Client::~Client()
00076 {
00077 close();
00078 }
00079
00080 void Client::connect()
00081 {
00082
00083 EID localeid(EID("api:" + _app));
00084
00085
00086 char flags = 0;
00087
00088
00089 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00090
00091
00092 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00093
00094
00095 handshake(localeid, 10, flags);
00096
00097
00098 _receiver.start();
00099 }
00100
00101 bool Client::isConnected()
00102 {
00103 return _connected;
00104 }
00105
00106 void Client::close()
00107 {
00108
00109 wait(30000);
00110
00111 StreamConnection::shutdown();
00112
00113 ibrcommon::MutexLock l(_inqueue);
00114 _inqueue.signal(true);
00115 }
00116
00117 void Client::received(const StreamContactHeader&)
00118 {
00119 _connected = true;
00120 }
00121
00122 void Client::eventTimeout()
00123 {
00124 _stream.done();
00125 _stream.close();
00126
00127 ibrcommon::MutexLock l(_inqueue);
00128 _inqueue.signal(true);
00129 }
00130
00131 void Client::eventShutdown()
00132 {
00133 _stream.done();
00134 _stream.close();
00135
00136 ibrcommon::MutexLock l(_inqueue);
00137 _inqueue.signal(true);
00138 }
00139
00140 void Client::eventConnectionUp(const StreamContactHeader &header)
00141 {
00142
00143 received(header);
00144
00145
00146 _connected = true;
00147
00148 ibrcommon::MutexLock l(_inqueue);
00149 _inqueue.signal(true);
00150 }
00151
00152 void Client::eventError()
00153 {
00154 _stream.close();
00155
00156 ibrcommon::MutexLock l(_inqueue);
00157 _inqueue.signal(true);
00158 }
00159
00160 void Client::eventConnectionDown()
00161 {
00162 _connected = false;
00163
00164 ibrcommon::MutexLock l(_inqueue);
00165 _inqueue.signal(true);
00166 }
00167
00168 void Client::eventBundleRefused()
00169 {
00170
00171 }
00172
00173 void Client::eventBundleForwarded()
00174 {
00175
00176 }
00177
00178 void Client::eventBundleAck(size_t)
00179 {
00180 }
00181
00182 void Client::received(const dtn::api::Bundle &b)
00183 {
00184 if (_mode != dtn::api::Client::MODE_SENDONLY)
00185 {
00186 _inqueue.push(b);
00187 }
00188 }
00189
00190 dtn::api::Bundle Client::getBundle(size_t timeout)
00191 {
00192 try {
00193 return _inqueue.blockingpop(timeout * 1000);
00194 } catch (ibrcommon::Exception ex) {
00195 throw ibrcommon::ConnectionClosedException();
00196 }
00197 }
00198 }
00199 }