00001
00002
00003
00004
00005
00006
00007
00008 #include "config.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleEvent.h"
00012 #include "core/BundleStorage.h"
00013 #include "Configuration.h"
00014
00015 #include "net/TCPConvergenceLayer.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include "net/ConnectionEvent.h"
00018 #include "net/TransferCompletedEvent.h"
00019 #include "net/TransferAbortedEvent.h"
00020 #include "routing/RequeueBundleEvent.h"
00021
00022 #include <ibrcommon/net/tcpclient.h>
00023 #include <ibrcommon/TimeMeasurement.h>
00024 #include <ibrcommon/net/vinterface.h>
00025 #include <ibrcommon/Logger.h>
00026
00027 #include <iostream>
00028 #include <iomanip>
00029
00030 #ifdef WITH_BUNDLE_SECURITY
00031 #include "security/SecurityManager.h"
00032 #endif
00033
00034 namespace dtn
00035 {
00036 namespace net
00037 {
00038
00039
00040
00041 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout)
00042 : _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00043 _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00044 {
00045 _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00046
00047 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00048 {
00049 stream->enableNoDelay();
00050 }
00051
00052 stream->enableLinger(10);
00053 stream->enableKeepalive();
00054 _node.setProtocol(Node::CONN_TCPIP);
00055 }
00056
00057 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout)
00058 : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00059 _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00060 {
00061 _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00062 }
00063
00064 TCPConnection::~TCPConnection()
00065 {
00066 }
00067
00068 void TCPConnection::queue(const dtn::data::BundleID &bundle)
00069 {
00070 _sender.push(bundle);
00071 }
00072
00073 const StreamContactHeader& TCPConnection::getHeader() const
00074 {
00075 return _peer;
00076 }
00077
00078 const dtn::core::Node& TCPConnection::getNode() const
00079 {
00080 return _node;
00081 }
00082
00083 void TCPConnection::rejectTransmission()
00084 {
00085 _stream.reject();
00086 }
00087
00088 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases)
00089 {
00090 }
00091
00092 void TCPConnection::eventTimeout()
00093 {
00094
00095 ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00096
00097
00098 this->stop();
00099 }
00100
00101 void TCPConnection::eventError()
00102 {
00103 }
00104
00105 void TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00106 {
00107 _peer = header;
00108 _node.setEID(header._localeid);
00109 _keepalive_timeout = header._keepalive * 1000;
00110
00111
00112 ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00113 }
00114
00115 void TCPConnection::eventConnectionDown()
00116 {
00117 IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00118
00119 try {
00120
00121 _sender.stop();
00122 } catch (const ibrcommon::ThreadException &ex) {
00123 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00124 }
00125
00126 if (_peer._localeid != dtn::data::EID())
00127 {
00128
00129 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00130 }
00131 }
00132
00133 void TCPConnection::eventBundleRefused()
00134 {
00135 try {
00136 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00137
00138
00139 TransferAbortedEvent::raise(EID(_node.getURI()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00140
00141
00142 _lastack = 0;
00143
00144 } catch (const ibrcommon::QueueUnblockedException&) {
00145
00146 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00147 }
00148 }
00149
00150 void TCPConnection::eventBundleForwarded()
00151 {
00152 try {
00153 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00154
00155
00156 TransferCompletedEvent::raise(EID(_node.getURI()), bundle);
00157
00158
00159 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00160
00161
00162 _lastack = 0;
00163 } catch (const ibrcommon::QueueUnblockedException&) {
00164
00165 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00166 }
00167 }
00168
00169 void TCPConnection::eventBundleAck(size_t ack)
00170 {
00171 _lastack = ack;
00172 }
00173
00174 void TCPConnection::initialize()
00175 {
00176
00177 try {
00178 start();
00179 } catch (const ibrcommon::ThreadException &ex) {
00180 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00181 }
00182 }
00183
00184 void TCPConnection::shutdown()
00185 {
00186
00187 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00188
00189 try {
00190
00191 this->stop();
00192 } catch (const ibrcommon::ThreadException &ex) {
00193 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00194 }
00195 }
00196
00197 void TCPConnection::finally()
00198 {
00199 IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00200
00201
00202 try {
00203 _tcpstream->close();
00204 } catch (const ibrcommon::ConnectionClosedException&) { };
00205
00206 try {
00207
00208 _sender.stop();
00209 } catch (const std::exception&) { };
00210
00211 try {
00212 _callback.connectionDown(this);
00213 } catch (const ibrcommon::MutexException&) { };
00214
00215
00216 clearQueue();
00217 }
00218
00219 void TCPConnection::setup()
00220 {
00221
00222 try {
00223 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream);
00224 client.open(_node.getAddress(), _node.getPort());
00225
00226 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00227 {
00228 _tcpstream->enableNoDelay();
00229 }
00230
00231 _tcpstream->enableLinger(10);
00232 _tcpstream->enableKeepalive();
00233 _node.setProtocol(Node::CONN_TCPIP);
00234 } catch (const ibrcommon::tcpclient::SocketException&) {
00235
00236 IBRCOMMON_LOGGER(warning) << "connection to " << _node.getURI() << " (" << _node.getAddress() << ":" << _node.getPort() << ") failed" << IBRCOMMON_LOGGER_ENDL;
00237 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00238 throw;
00239 } catch (const bad_cast&) { };
00240 }
00241
00242 void TCPConnection::run()
00243 {
00244 try {
00245
00246 char flags = 0;
00247
00248
00249 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00250 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00251
00252
00253 _stream.handshake(_name, _timeout, flags);
00254
00255
00256 _sender.start();
00257
00258 while (!_stream.eof())
00259 {
00260 try {
00261
00262 dtn::data::Bundle bundle;
00263
00264
00265 (*this) >> bundle;
00266
00267
00268 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00269 {
00270
00271 throw dtn::data::Validator::RejectedException("destination or source EID is null");
00272 }
00273
00274
00275 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00276 }
00277 catch (const dtn::data::Validator::RejectedException &ex)
00278 {
00279
00280 rejectTransmission();
00281
00282
00283 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00284 }
00285 catch (const dtn::InvalidDataException &ex) {
00286
00287 rejectTransmission();
00288
00289
00290 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00291 }
00292
00293 yield();
00294 }
00295 } catch (const ibrcommon::ThreadException &ex) {
00296 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00297 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00298 } catch (const std::exception &ex) {
00299 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00300 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00301 }
00302 }
00303
00304
00305 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle)
00306 {
00307 std::iostream &stream = conn._stream;
00308
00309
00310 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00311
00312 dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00313 return conn;
00314 }
00315
00316 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle)
00317 {
00318
00319 ibrcommon::TimeMeasurement m;
00320
00321 std::iostream &stream = conn._stream;
00322
00323
00324 dtn::data::DefaultSerializer serializer(stream);
00325
00326
00327 conn._sentqueue.push(bundle);
00328
00329
00330 m.start();
00331
00332 try {
00333
00334 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00335
00336
00337 serializer << bundle;
00338
00339
00340 stream << std::flush;
00341
00342
00343 m.stop();
00344
00345
00346 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00347
00348
00349 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00350 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00351
00352 } catch (const ibrcommon::Exception &ex) {
00353
00354 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00355
00356
00357 throw;
00358 }
00359
00360 return conn;
00361 }
00362
00363 TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00364 : _connection(connection), _keepalive_timeout(keepalive_timeout)
00365 {
00366 }
00367
00368 TCPConnection::Sender::~Sender()
00369 {
00370 ibrcommon::JoinableThread::join();
00371 }
00372
00373 bool TCPConnection::Sender::__cancellation()
00374 {
00375
00376 ibrcommon::Queue<dtn::data::BundleID>::abort();
00377
00378
00379 return false;
00380 }
00381
00382 void TCPConnection::Sender::run()
00383 {
00384
00385
00386 ibrcommon::Thread::CancelProtector __disable_cancellation__(false);
00387
00388 try {
00389 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00390
00391 while (true)
00392 {
00393 try {
00394 _current_transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true, _keepalive_timeout);
00395
00396 try {
00397
00398 dtn::data::Bundle bundle = storage.get(_current_transfer);
00399
00400 #ifdef WITH_BUNDLE_SECURITY
00401 const dtn::daemon::Configuration::Security::Level seclevel =
00402 dtn::daemon::Configuration::getInstance().getSecurity().getLevel();
00403
00404 if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED)
00405 {
00406 try {
00407 dtn::security::SecurityManager::getInstance().auth(bundle);
00408 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00409
00410 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00411 }
00412 }
00413 #endif
00414
00415
00416 ibrcommon::Thread::CancelProtector __enable_cancellation__(true);
00417
00418
00419 _connection << bundle;
00420
00421 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00422
00423 TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00424 }
00425
00426
00427 _current_transfer = dtn::data::BundleID();
00428
00429 } catch (const ibrcommon::QueueUnblockedException &ex) {
00430 switch (ex.reason)
00431 {
00432 case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00433 case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00434 throw;
00435 case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00436 {
00437
00438 _connection.keepalive();
00439 }
00440 }
00441 }
00442
00443
00444 yield();
00445 }
00446 } catch (const ibrcommon::QueueUnblockedException &ex) {
00447 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00448 return;
00449 } catch (const std::exception &ex) {
00450 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00451 }
00452
00453 _connection.stop();
00454 }
00455
00456 void TCPConnection::clearQueue()
00457 {
00458
00459 try {
00460 while (true)
00461 {
00462 const dtn::data::BundleID id = _sender.getnpop();
00463
00464
00465 dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00466 }
00467 } catch (const ibrcommon::QueueUnblockedException&) {
00468
00469 }
00470
00471
00472 try {
00473 while (true)
00474 {
00475 const dtn::data::BundleID id = _sentqueue.getnpop();
00476
00477 if (_lastack > 0)
00478 {
00479
00480
00481 dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00482 }
00483 else
00484 {
00485
00486 dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00487 }
00488
00489
00490 _lastack = 0;
00491 }
00492 } catch (const ibrcommon::QueueUnblockedException&) {
00493
00494 }
00495 }
00496
00497 void TCPConnection::keepalive()
00498 {
00499 _stream.keepalive();
00500 }
00501
00502 void TCPConnection::Sender::finally()
00503 {
00504
00505 if (_current_transfer != dtn::data::BundleID())
00506 {
00507 dtn::routing::RequeueBundleEvent::raise(_connection._node.getURI(), _current_transfer);
00508 }
00509 }
00510
00511 bool TCPConnection::match(const dtn::core::Node &n) const
00512 {
00513 return (_node == n);
00514 }
00515
00516 bool TCPConnection::match(const dtn::data::EID &destination) const
00517 {
00518 return (_node.getURI() == destination.getNodeEID());
00519 }
00520
00521 bool TCPConnection::match(const NodeEvent &evt) const
00522 {
00523 const dtn::core::Node &n = evt.getNode();
00524 return match(n);
00525 }
00526 }
00527 }