00001
00002
00003
00004
00005
00006
00007
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "net/BundleReceivedEvent.h"
00010 #include "core/BundleCore.h"
00011 #include "core/NodeEvent.h"
00012 #include "core/GlobalEvent.h"
00013 #include "net/ConnectionEvent.h"
00014 #include "core/BundleEvent.h"
00015 #include "ibrcommon/TimeMeasurement.h"
00016
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/TransferAbortedEvent.h"
00019
00020 #include <ibrcommon/net/NetInterface.h>
00021 #include <ibrcommon/Logger.h>
00022
00023 #include <iostream>
00024 #include <iomanip>
00025
00026 namespace dtn
00027 {
00028 namespace net
00029 {
00030
00031
00032
00033 TCPConvergenceLayer::TCPConnection::TCPConnection(ibrcommon::tcpstream *stream)
00034 : _free(false), _peer(), _node(Node::NODE_FLOATING), _tcpstream(stream), _stream(*this, *stream), _sender(*this), _receiver(*this), _name(), _timeout(0), _lastack(0)
00035 {
00036 stream->enableKeepalive();
00037 _node.setProtocol(Node::CONN_TCPIP);
00038 }
00039
00040 TCPConvergenceLayer::TCPConnection::~TCPConnection()
00041 {
00042 shutdown();
00043 }
00044
00045 void TCPConvergenceLayer::TCPConnection::iamfree()
00046 {
00047 _free = true;
00048 }
00049
00050 bool TCPConvergenceLayer::TCPConnection::free()
00051 {
00052 ibrcommon::MutexLock l(_freemutex);
00053 return _free;
00054 }
00055
00056 void TCPConvergenceLayer::TCPConnection::queue(const dtn::data::Bundle &bundle)
00057 {
00058 _sender.push(bundle);
00059 }
00060
00061 const StreamContactHeader TCPConvergenceLayer::TCPConnection::getHeader() const
00062 {
00063 return _peer;
00064 }
00065
00066 void TCPConvergenceLayer::TCPConnection::handshake()
00067 {
00068 char flags = 0;
00069
00070
00071 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00072 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00073
00074
00075 _stream.handshake(_name, _timeout, flags);
00076
00077
00078 _sender.start();
00079 }
00080
00081 void TCPConvergenceLayer::TCPConnection::initialize(const dtn::data::EID &name, const size_t timeout)
00082 {
00083 _name = name;
00084 _timeout = timeout;
00085
00086
00087 _receiver.start();
00088 }
00089
00090 void TCPConvergenceLayer::TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00091 {
00092 _peer = header;
00093 _node.setURI(header._localeid.getString());
00094
00095
00096 ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00097 }
00098
00099 void TCPConvergenceLayer::TCPConnection::eventConnectionDown()
00100 {
00101
00102 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00103
00104 try {
00105 while (true)
00106 {
00107 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00108
00109 if (_lastack > 0)
00110 {
00111
00112
00113 TransferAbortedEvent::raise(EID(_node.getURI()), bundle);
00114 }
00115 else
00116 {
00117
00118 TransferAbortedEvent::raise(EID(_node.getURI()), bundle);
00119 }
00120
00121
00122 _lastack = 0;
00123 }
00124 } catch (ibrcommon::Exception ex) {
00125
00126 }
00127
00128 ibrcommon::MutexLock l(_freemutex);
00129 iamfree();
00130 }
00131
00132 void TCPConvergenceLayer::TCPConnection::eventBundleRefused()
00133 {
00134 try {
00135 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00136
00137
00138 TransferAbortedEvent::raise(EID(_node.getURI()), bundle);
00139
00140
00141 _lastack = 0;
00142
00143 } catch (ibrcommon::Exception ex) {
00144
00145 }
00146 }
00147
00148 void TCPConvergenceLayer::TCPConnection::eventBundleForwarded()
00149 {
00150 try {
00151 const dtn::data::Bundle bundle = _sentqueue.frontpop();
00152
00153
00154 TransferCompletedEvent::raise(EID(_node.getURI()), bundle);
00155
00156
00157 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00158
00159
00160 _lastack = 0;
00161 } catch (ibrcommon::Exception ex) {
00162
00163 }
00164 }
00165
00166 void TCPConvergenceLayer::TCPConnection::eventBundleAck(size_t ack)
00167 {
00168 _lastack = ack;
00169 }
00170
00171 void TCPConvergenceLayer::TCPConnection::eventShutdown()
00172 {
00173
00174 _sender.shutdown();
00175
00176
00177 _receiver.shutdown();
00178
00179
00180 try {
00181 _tcpstream->done();
00182 _tcpstream->close();
00183 } catch (ibrcommon::ConnectionClosedException ex) {
00184
00185 }
00186 }
00187
00188 void TCPConvergenceLayer::TCPConnection::eventTimeout()
00189 {
00190
00191 _sender.shutdown();
00192
00193
00194 ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00195
00196
00197 try {
00198 _tcpstream->done();
00199 _tcpstream->close();
00200 } catch (ibrcommon::ConnectionClosedException ex) {
00201
00202 }
00203 }
00204
00205 void TCPConvergenceLayer::TCPConnection::eventError()
00206 {
00207
00208 _sender.shutdown();
00209
00210
00211 try {
00212 _tcpstream->close();
00213 } catch (ibrcommon::ConnectionClosedException ex) {
00214
00215 }
00216 }
00217
00218 void TCPConvergenceLayer::TCPConnection::shutdown()
00219 {
00220 _stream.shutdown();
00221 }
00222
00223 const dtn::core::Node& TCPConvergenceLayer::TCPConnection::getNode() const
00224 {
00225 return _node;
00226 }
00227
00228 void TCPConvergenceLayer::TCPConnection::rejectTransmission()
00229 {
00230 _stream.reject();
00231 }
00232
00233 TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle)
00234 {
00235 try {
00236 dtn::data::DefaultDeserializer(conn._stream, dtn::core::BundleCore::getInstance()) >> bundle;
00237 } catch (dtn::data::Validator::RejectedException ex) {
00238
00239 conn.rejectTransmission();
00240 }
00241
00242 return conn;
00243 }
00244
00245 TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle)
00246 {
00247
00248 ibrcommon::TimeMeasurement m;
00249
00250
00251 dtn::data::DefaultSerializer serializer(conn._stream);
00252
00253
00254 conn._sentqueue.push(bundle);
00255
00256
00257 m.start();
00258
00259 try {
00260
00261 serializer << bundle;
00262
00263
00264 conn._stream << std::flush;
00265
00266
00267 m.stop();
00268
00269
00270 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00271
00272
00273 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00274 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00275
00276 } catch (ibrcommon::Exception ex) {
00277
00278 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00279
00280
00281 throw ex;
00282 }
00283
00284 return conn;
00285 }
00286
00287 TCPConvergenceLayer::TCPConnection::Receiver::Receiver(TCPConnection &connection)
00288 : _running(true), _connection(connection)
00289 {
00290 }
00291
00292 TCPConvergenceLayer::TCPConnection::Receiver::~Receiver()
00293 {
00294 shutdown();
00295 join();
00296 }
00297
00298 void TCPConvergenceLayer::TCPConnection::Receiver::run()
00299 {
00300 try {
00301
00302 _connection.handshake();
00303
00304 while (_running)
00305 {
00306 dtn::data::Bundle bundle;
00307
00308 try {
00309 _connection >> bundle;
00310
00311
00312 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00313 {
00314
00315 }
00316 else
00317 {
00318
00319 dtn::net::BundleReceivedEvent::raise(EID(), bundle);
00320 }
00321
00322 } catch (dtn::data::Validator::RejectedException ex) {
00323
00324 _connection.rejectTransmission();
00325 } catch (dtn::InvalidDataException ex) {
00326
00327 _connection.rejectTransmission();
00328 }
00329
00330 yield();
00331 }
00332 } catch (...) {
00333 _running = false;
00334 }
00335 }
00336
00337 void TCPConvergenceLayer::TCPConnection::Receiver::shutdown()
00338 {
00339 _running = false;
00340 }
00341
00342 TCPConvergenceLayer::TCPConnection::Sender::Sender(TCPConnection &connection)
00343 : _running(true), _connection(connection)
00344 {
00345 }
00346
00347 TCPConvergenceLayer::TCPConnection::Sender::~Sender()
00348 {
00349 shutdown();
00350 join();
00351 }
00352
00353 void TCPConvergenceLayer::TCPConnection::Sender::run()
00354 {
00355 try {
00356 while (_running)
00357 {
00358 dtn::data::Bundle bundle = blockingpop();
00359
00360
00361 _connection << bundle;
00362
00363
00364 yield();
00365 }
00366 } catch (...) {
00367 _running = false;
00368 }
00369 }
00370
00371 void TCPConvergenceLayer::TCPConnection::Sender::shutdown()
00372 {
00373 {
00374 ibrcommon::MutexLock l(*this);
00375 _running = false;
00376 signal();
00377 }
00378 }
00379 }
00380 }