Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "core/BundleCore.h"
00009 #include "core/GlobalEvent.h"
00010 #include "core/BundleEvent.h"
00011 #include "core/BundleStorage.h"
00012 #include "Configuration.h"
00013
00014 #include "net/TCPConvergenceLayer.h"
00015 #include "net/BundleReceivedEvent.h"
00016 #include "net/ConnectionEvent.h"
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/TransferAbortedEvent.h"
00019
00020 #include <ibrcommon/TimeMeasurement.h>
00021 #include <ibrcommon/net/NetInterface.h>
00022 #include <ibrcommon/Logger.h>
00023
00024 #include <iostream>
00025 #include <iomanip>
00026
00027 namespace dtn
00028 {
00029 namespace net
00030 {
00031
00032
00033
00034 TCPConvergenceLayer::TCPConnection::TCPConnection(GenericServer<TCPConnection> &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout)
00035 : GenericConnection<TCPConvergenceLayer::TCPConnection>((GenericServer<TCPConvergenceLayer::TCPConnection>&)tcpsrv), ibrcommon::DetachedThread(),
00036 _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00037 _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0)
00038 {
00039 _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00040
00041 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00042 {
00043 stream->enableNoDelay();
00044 }
00045
00046 stream->enableLinger(10);
00047 stream->enableKeepalive();
00048 _node.setProtocol(Node::CONN_TCPIP);
00049 }
00050
00051 TCPConvergenceLayer::TCPConnection::~TCPConnection()
00052 {
00053 }
00054
00055 void TCPConvergenceLayer::TCPConnection::queue(const dtn::data::BundleID &bundle)
00056 {
00057 _sender.push(bundle);
00058 }
00059
00060 const StreamContactHeader TCPConvergenceLayer::TCPConnection::getHeader() const
00061 {
00062 return _peer;
00063 }
00064
00065 const dtn::core::Node& TCPConvergenceLayer::TCPConnection::getNode() const
00066 {
00067 return _node;
00068 }
00069
00070 void TCPConvergenceLayer::TCPConnection::rejectTransmission()
00071 {
00072 _stream.reject();
00073 }
00074
00075 void TCPConvergenceLayer::TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases csc)
00076 {
00077 }
00078
00079 void TCPConvergenceLayer::TCPConnection::eventTimeout()
00080 {
00081
00082 ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00083
00084
00085 this->stop();
00086 }
00087
00088 void TCPConvergenceLayer::TCPConnection::eventError()
00089 {
00090 }
00091
00092 void TCPConvergenceLayer::TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00093 {
00094 _peer = header;
00095 _node.setURI(header._localeid.getString());
00096 _keepalive_timeout = header._keepalive * 1000;
00097
00098
00099 ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00100 }
00101
00102 void TCPConvergenceLayer::TCPConnection::eventConnectionDown()
00103 {
00104 IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00105
00106 try {
00107
00108 _sender.stop();
00109 } catch (const ibrcommon::ThreadException &ex) {
00110 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00111 }
00112
00113 if (_peer._localeid != dtn::data::EID())
00114 {
00115
00116 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00117 }
00118 }
00119
00120 void TCPConvergenceLayer::TCPConnection::eventBundleRefused()
00121 {
00122 try {
00123 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00124
00125
00126 TransferAbortedEvent::raise(EID(_node.getURI()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00127
00128
00129 _lastack = 0;
00130
00131 } catch (ibrcommon::QueueUnblockedException) {
00132
00133 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00134 }
00135 }
00136
00137 void TCPConvergenceLayer::TCPConnection::eventBundleForwarded()
00138 {
00139 try {
00140 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00141
00142
00143 TransferCompletedEvent::raise(EID(_node.getURI()), bundle);
00144
00145
00146 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00147
00148
00149 _lastack = 0;
00150 } catch (ibrcommon::QueueUnblockedException) {
00151
00152 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00153 }
00154 }
00155
00156 void TCPConvergenceLayer::TCPConnection::eventBundleAck(size_t ack)
00157 {
00158 _lastack = ack;
00159 }
00160
00161 void TCPConvergenceLayer::TCPConnection::initialize()
00162 {
00163
00164 try {
00165 start();
00166 } catch (const ibrcommon::ThreadException &ex) {
00167 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00168 }
00169 }
00170
00171 void TCPConvergenceLayer::TCPConnection::shutdown()
00172 {
00173
00174 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00175
00176 try {
00177
00178 this->stop();
00179 } catch (const ibrcommon::ThreadException &ex) {
00180 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00181 }
00182 }
00183
00184 void TCPConvergenceLayer::TCPConnection::finally()
00185 {
00186 IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00187
00188
00189 try {
00190 _tcpstream->close();
00191 } catch (const ibrcommon::ConnectionClosedException&) { };
00192
00193 try {
00194
00195 _sender.stop();
00196 } catch (const std::exception&) { };
00197
00198 try {
00199 ibrcommon::MutexLock l(_server.mutex());
00200 _server.remove(this);
00201 } catch (const ibrcommon::MutexException&) { };
00202
00203
00204 clearQueue();
00205 }
00206
00207 void TCPConvergenceLayer::TCPConnection::run()
00208 {
00209 try {
00210
00211 char flags = 0;
00212
00213
00214 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00215 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00216
00217
00218 _stream.handshake(_name, _timeout, flags);
00219
00220
00221 _sender.start();
00222
00223 while (_stream.good())
00224 {
00225 try {
00226
00227 dtn::data::Bundle bundle;
00228
00229
00230 (*this) >> bundle;
00231
00232
00233 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00234 {
00235
00236 throw dtn::data::Validator::RejectedException("destination or source EID is null");
00237 }
00238
00239
00240 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00241 }
00242 catch (const dtn::data::Validator::RejectedException &ex)
00243 {
00244
00245 rejectTransmission();
00246
00247
00248 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00249 }
00250
00251 yield();
00252 }
00253 } catch (const ibrcommon::ThreadException &ex) {
00254 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00255 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00256 } catch (const std::exception &ex) {
00257 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00258 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00259 }
00260 }
00261
00262
00263 TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle)
00264 {
00265 std::iostream &stream = conn._stream;
00266
00267
00268 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00269
00270 dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00271 return conn;
00272 }
00273
00274 TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle)
00275 {
00276
00277 ibrcommon::TimeMeasurement m;
00278
00279 std::iostream &stream = conn._stream;
00280
00281
00282 dtn::data::DefaultSerializer serializer(stream);
00283
00284
00285 conn._sentqueue.push(bundle);
00286
00287
00288 m.start();
00289
00290 try {
00291
00292 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00293
00294
00295 serializer << bundle;
00296
00297
00298 stream << std::flush;
00299
00300
00301 m.stop();
00302
00303
00304 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00305
00306
00307 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00308 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00309
00310 } catch (const ibrcommon::Exception &ex) {
00311
00312 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00313
00314
00315 throw;
00316 }
00317
00318 return conn;
00319 }
00320
00321 TCPConvergenceLayer::TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00322 : _abort(false), _connection(connection), _keepalive_timeout(keepalive_timeout)
00323 {
00324 }
00325
00326 TCPConvergenceLayer::TCPConnection::Sender::~Sender()
00327 {
00328 join();
00329 }
00330
00331 bool TCPConvergenceLayer::TCPConnection::Sender::__cancellation()
00332 {
00333
00334 _abort = true;
00335 this->abort();
00336
00337
00338 return false;
00339 }
00340
00341 void TCPConvergenceLayer::TCPConnection::Sender::run()
00342 {
00343
00344
00345 int oldstate;
00346 ibrcommon::Thread::disableCancel(oldstate);
00347
00348 try {
00349 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00350
00351 while (!_abort)
00352 {
00353 try {
00354 dtn::data::BundleID _current_transfer = getnpop(true, _keepalive_timeout);
00355
00356 try {
00357
00358 const dtn::data::Bundle bundle = storage.get(_current_transfer);
00359
00360
00361 ibrcommon::Thread::CancelProtector cprotect(true);
00362
00363
00364 _connection << bundle;
00365
00366 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00367
00368 TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00369 }
00370
00371
00372 _current_transfer = dtn::data::BundleID();
00373
00374 } catch (const ibrcommon::QueueUnblockedException &ex) {
00375 switch (ex.reason)
00376 {
00377 case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00378 case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00379 throw;
00380 case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00381 {
00382
00383 _connection.keepalive();
00384 }
00385 }
00386 }
00387
00388
00389 yield();
00390 }
00391 } catch (const ibrcommon::QueueUnblockedException &ex) {
00392 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00393 return;
00394 } catch (const std::exception &ex) {
00395 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00396 }
00397
00398 _connection.stop();
00399 }
00400
00401 void TCPConvergenceLayer::TCPConnection::clearQueue()
00402 {
00403 try {
00404 while (true)
00405 {
00406 const dtn::data::BundleID id = _sender.getnpop();
00407
00408 if (_lastack > 0)
00409 {
00410
00411
00412 TransferAbortedEvent::raise(EID(_node.getURI()), id, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00413 }
00414 else
00415 {
00416
00417 TransferAbortedEvent::raise(EID(_node.getURI()), id, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00418 }
00419
00420
00421 _lastack = 0;
00422 }
00423 } catch (ibrcommon::QueueUnblockedException) {
00424
00425 }
00426 }
00427
00428 void TCPConvergenceLayer::TCPConnection::keepalive()
00429 {
00430 _stream.keepalive();
00431 }
00432
00433 void TCPConvergenceLayer::TCPConnection::Sender::finally()
00434 {
00435
00436 if (_current_transfer != dtn::data::BundleID())
00437 {
00438 TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00439 }
00440 }
00441 }
00442 }