00001
00002
00003
00004
00005
00006
00007
00008 #include "ClientHandler.h"
00009 #include "core/GlobalEvent.h"
00010 #include "core/BundleCore.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/BundleEvent.h"
00013 #include <ibrdtn/streams/StreamContactHeader.h>
00014 #include <ibrdtn/data/Serializer.h>
00015 #include <iostream>
00016
00017 using namespace dtn::data;
00018 using namespace dtn::streams;
00019 using namespace dtn::core;
00020
00021 namespace dtn
00022 {
00023 namespace daemon
00024 {
00025 ClientHandler::ClientHandler(ibrcommon::tcpstream *stream)
00026 : ibrcommon::JoinableThread(), _free(false), _running(true), _stream(stream), _connection(*this, *_stream)
00027 {
00028 }
00029
00030 ClientHandler::~ClientHandler()
00031 {
00032 shutdown();
00033 join();
00034 }
00035
00036 const dtn::data::EID& ClientHandler::getPeer() const
00037 {
00038 return _eid;
00039 }
00040
00041 void ClientHandler::iamfree()
00042 {
00043 _free = true;
00044 }
00045
00046 bool ClientHandler::free()
00047 {
00048 ibrcommon::MutexLock l(_freemutex);
00049 return _free;
00050 }
00051
00052 void ClientHandler::eventShutdown()
00053 {
00054
00055 (*_stream).done();
00056 (*_stream).close();
00057 }
00058
00059 void ClientHandler::eventTimeout()
00060 {
00061 (*_stream).done();
00062 (*_stream).close();
00063 }
00064
00065 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00066 {
00067
00068 _eid = BundleCore::local + header._localeid.getApplication();
00069 }
00070
00071 void ClientHandler::eventError()
00072 {
00073 (*_stream).close();
00074 }
00075
00076 void ClientHandler::eventConnectionDown()
00077 {
00078 try {
00079 while (true)
00080 {
00081 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096 _lastack = 0;
00097 }
00098 } catch (ibrcommon::Exception ex) {
00099
00100 }
00101
00102 ibrcommon::MutexLock l(_freemutex);
00103 iamfree();
00104 }
00105
00106 void ClientHandler::eventBundleRefused()
00107 {
00108 try {
00109 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00110
00111
00112 _lastack = 0;
00113
00114 } catch (ibrcommon::Exception ex) {
00115
00116 }
00117 }
00118
00119 void ClientHandler::eventBundleForwarded()
00120 {
00121 try {
00122 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00123
00124
00125 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00126
00127
00128 _lastack = 0;
00129
00130 } catch (ibrcommon::Exception ex) {
00131
00132 }
00133 }
00134
00135 void ClientHandler::eventBundleAck(size_t ack)
00136 {
00137 _lastack = ack;
00138 }
00139
00140 bool ClientHandler::isConnected()
00141 {
00142 return _connection.isConnected();
00143 }
00144
00145 void ClientHandler::shutdown()
00146 {
00147 try {
00148
00149 _connection.wait(500);
00150 } catch (...) {
00151
00152 }
00153
00154
00155 _connection.shutdown();
00156 }
00157
00158 void ClientHandler::run()
00159 {
00160 try {
00161 try {
00162 char flags = 0;
00163
00164
00165 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00166
00167
00168 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00169
00170 BundleStorage &storage = BundleCore::getInstance().getStorage();
00171
00172 while (_running)
00173 {
00174 try {
00175 dtn::data::Bundle b = storage.get( getPeer() );
00176 dtn::data::DefaultSerializer(_connection) << b; _connection << std::flush;
00177 storage.remove(b);
00178 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00179 break;
00180 }
00181 }
00182
00183 while (_running)
00184 {
00185 dtn::data::Bundle bundle;
00186 dtn::data::DefaultDeserializer(_connection) >> bundle;
00187
00188
00189 bundle.relabel();
00190
00191
00192 dtn::data::EID clienteid("dtn:client");
00193
00194 if (bundle._source == EID()) bundle._source = _eid;
00195 else if (bundle._source == clienteid) bundle._source = _eid;
00196
00197 if (bundle._destination == clienteid) bundle._destination = _eid;
00198 if (bundle._reportto == clienteid) bundle._reportto = _eid;
00199 if (bundle._custodian == clienteid) bundle._custodian = _eid;
00200
00201
00202 dtn::net::BundleReceivedEvent::raise(EID(), bundle);
00203
00204 yield();
00205 }
00206
00207 _connection.shutdown();
00208 } catch (ibrcommon::IOException ex) {
00209 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00210 _running = false;
00211 } catch (dtn::InvalidDataException ex) {
00212 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00213 _running = false;
00214 }
00215 } catch (...) {
00216
00217 }
00218 }
00219
00220 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00221 {
00222
00223 dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00224
00225 return conn;
00226 }
00227
00228 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00229 {
00230
00231 conn._sentqueue.push(bundle);
00232
00233
00234 dtn::data::DefaultSerializer(conn._connection) << bundle;
00235
00236
00237 conn._connection << std::flush;
00238
00239 return conn;
00240 }
00241 }
00242 }
00243