00001
00002
00003
00004
00005
00006
00007
00008 #include "ClientHandler.h"
00009 #include "core/GlobalEvent.h"
00010 #include "core/BundleCore.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/BundleEvent.h"
00013 #include <ibrdtn/streams/StreamContactHeader.h>
00014 #include <ibrdtn/data/Serializer.h>
00015 #include <iostream>
00016
00017 using namespace dtn::data;
00018 using namespace dtn::streams;
00019 using namespace dtn::core;
00020
00021 namespace dtn
00022 {
00023 namespace daemon
00024 {
00025 ClientHandler::ClientHandler(ibrcommon::tcpstream *stream)
00026 : ibrcommon::JoinableThread(), _free(false), _running(true), _stream(stream), _connection(*this, *_stream)
00027 {
00028 }
00029
00030 ClientHandler::~ClientHandler()
00031 {
00032 shutdown();
00033 join();
00034 }
00035
00036 const dtn::data::EID& ClientHandler::getPeer() const
00037 {
00038 return _eid;
00039 }
00040
00041 void ClientHandler::iamfree()
00042 {
00043 _free = true;
00044 }
00045
00046 bool ClientHandler::free()
00047 {
00048 ibrcommon::MutexLock l(_freemutex);
00049 return _free;
00050 }
00051
00052 void ClientHandler::eventShutdown()
00053 {
00054
00055 (*_stream).done();
00056 (*_stream).close();
00057 }
00058
00059 void ClientHandler::eventTimeout()
00060 {
00061 (*_stream).done();
00062 (*_stream).close();
00063 }
00064
00065 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00066 {
00067 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00068 {
00069
00070
00071 _eid = BundleCore::local + "." + header._localeid.getNode();
00072 }
00073 else
00074 {
00075
00076 _eid = BundleCore::local + "/" + header._localeid.getNode();
00077 }
00078 }
00079
00080 void ClientHandler::eventError()
00081 {
00082 (*_stream).close();
00083 }
00084
00085 void ClientHandler::eventConnectionDown()
00086 {
00087 try {
00088 while (true)
00089 {
00090 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105 _lastack = 0;
00106 }
00107 } catch (ibrcommon::Exception ex) {
00108
00109 }
00110
00111 ibrcommon::MutexLock l(_freemutex);
00112 iamfree();
00113 }
00114
00115 void ClientHandler::eventBundleRefused()
00116 {
00117 try {
00118 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00119
00120
00121 _lastack = 0;
00122
00123 } catch (ibrcommon::Exception ex) {
00124
00125 }
00126 }
00127
00128 void ClientHandler::eventBundleForwarded()
00129 {
00130 try {
00131 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00132
00133
00134 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00135
00136
00137 _lastack = 0;
00138
00139 } catch (ibrcommon::Exception ex) {
00140
00141 }
00142 }
00143
00144 void ClientHandler::eventBundleAck(size_t ack)
00145 {
00146 _lastack = ack;
00147 }
00148
00149 bool ClientHandler::isConnected()
00150 {
00151 return _connection.isConnected();
00152 }
00153
00154 void ClientHandler::shutdown()
00155 {
00156 try {
00157
00158 _connection.wait(500);
00159 } catch (...) {
00160
00161 }
00162
00163
00164 _connection.shutdown();
00165 }
00166
00167 void ClientHandler::run()
00168 {
00169 try {
00170 try {
00171 char flags = 0;
00172
00173
00174 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00175
00176
00177 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00178
00179 BundleStorage &storage = BundleCore::getInstance().getStorage();
00180
00181 while (_running)
00182 {
00183 try {
00184 dtn::data::Bundle b = storage.get( getPeer() );
00185 dtn::data::DefaultSerializer(_connection) << b; _connection << std::flush;
00186 storage.remove(b);
00187 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00188 break;
00189 }
00190 }
00191
00192 while (_running)
00193 {
00194 dtn::data::Bundle bundle;
00195 dtn::data::DefaultDeserializer(_connection) >> bundle;
00196
00197
00198 bundle.relabel();
00199
00200
00201 dtn::data::EID clienteid("api:me");
00202
00203
00204 bundle._source = _eid;
00205
00206 if (bundle._destination == clienteid) bundle._destination = _eid;
00207 if (bundle._reportto == clienteid) bundle._reportto = _eid;
00208 if (bundle._custodian == clienteid) bundle._custodian = _eid;
00209
00210
00211 dtn::net::BundleReceivedEvent::raise(EID(), bundle);
00212
00213 yield();
00214 }
00215
00216 _connection.shutdown();
00217 } catch (ibrcommon::IOException ex) {
00218 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00219 _running = false;
00220 } catch (dtn::InvalidDataException ex) {
00221 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00222 _running = false;
00223 }
00224 } catch (...) {
00225
00226 }
00227 }
00228
00229 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00230 {
00231
00232 dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00233
00234 return conn;
00235 }
00236
00237 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00238 {
00239
00240 conn._sentqueue.push(bundle);
00241
00242
00243 dtn::data::DefaultSerializer(conn._connection) << bundle;
00244
00245
00246 conn._connection << std::flush;
00247
00248 return conn;
00249 }
00250 }
00251 }
00252