|
IBR-DTNSuite 0.6
|
00001 /* 00002 * TCPConnection.cpp 00003 * 00004 * Created on: 26.04.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "core/BundleCore.h" 00011 #include "core/BundleEvent.h" 00012 #include "core/BundleStorage.h" 00013 00014 #include "net/TCPConvergenceLayer.h" 00015 #include "net/BundleReceivedEvent.h" 00016 #include "net/ConnectionEvent.h" 00017 #include "net/TransferCompletedEvent.h" 00018 #include "net/TransferAbortedEvent.h" 00019 #include "routing/RequeueBundleEvent.h" 00020 00021 #include <ibrcommon/net/tcpclient.h> 00022 #include <ibrcommon/TimeMeasurement.h> 00023 #include <ibrcommon/net/vinterface.h> 00024 #include <ibrcommon/Logger.h> 00025 00026 #include <iostream> 00027 #include <iomanip> 00028 00029 #ifdef WITH_BUNDLE_SECURITY 00030 #include "security/SecurityManager.h" 00031 #endif 00032 00033 namespace dtn 00034 { 00035 namespace net 00036 { 00037 /* 00038 * class TCPConnection 00039 */ 00040 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout) 00041 : _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout), 00042 _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv) 00043 { 00044 _stream.exceptions(std::ios::badbit | std::ios::eofbit); 00045 00046 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() ) 00047 { 00048 stream->enableNoDelay(); 00049 } 00050 00051 stream->enableLinger(10); 00052 stream->enableKeepalive(); 00053 00054 // add default TCP connection 00055 _node.add(dtn::core::Node::URI("0.0.0.0", Node::CONN_TCPIP)); 00056 } 00057 00058 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout) 00059 : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout), 00060 _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv) 00061 { 00062 _stream.exceptions(std::ios::badbit | std::ios::eofbit); 00063 } 00064 00065 TCPConnection::~TCPConnection() 00066 { 00067 // wait until the sender thread is finished 00068 _sender.join(); 00069 } 00070 00071 void TCPConnection::queue(const dtn::data::BundleID &bundle) 00072 { 00073 _sender.push(bundle); 00074 } 00075 00076 const StreamContactHeader& TCPConnection::getHeader() const 00077 { 00078 return _peer; 00079 } 00080 00081 const dtn::core::Node& TCPConnection::getNode() const 00082 { 00083 return _node; 00084 } 00085 00086 void TCPConnection::rejectTransmission() 00087 { 00088 _stream.reject(); 00089 } 00090 00091 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases) 00092 { 00093 } 00094 00095 void TCPConnection::eventTimeout() 00096 { 00097 // event 00098 ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node); 00099 00100 // stop the receiver thread 00101 this->stop(); 00102 } 00103 00104 void TCPConnection::eventError() 00105 { 00106 } 00107 00108 void TCPConnection::eventConnectionUp(const StreamContactHeader &header) 00109 { 00110 _peer = header; 00111 _node.setEID(header._localeid); 00112 _keepalive_timeout = header._keepalive * 1000; 00113 00114 // set the incoming timer if set (> 0) 00115 if (_peer._keepalive > 0) 00116 { 00117 // set the timer 00118 _tcpstream->setTimeout(header._keepalive * 2); 00119 } 00120 00121 // raise up event 00122 ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node); 00123 } 00124 00125 void TCPConnection::eventConnectionDown() 00126 { 00127 IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL; 00128 00129 try { 00130 // stop the sender 00131 _sender.stop(); 00132 } catch (const ibrcommon::ThreadException &ex) { 00133 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00134 } 00135 00136 if (_peer._localeid != dtn::data::EID()) 00137 { 00138 // event 00139 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node); 00140 } 00141 } 00142 00143 void TCPConnection::eventBundleRefused() 00144 { 00145 try { 00146 const dtn::data::BundleID bundle = _sentqueue.getnpop(); 00147 00148 // requeue the bundle 00149 TransferAbortedEvent::raise(EID(_node.getEID()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED); 00150 00151 // set ACK to zero 00152 _lastack = 0; 00153 00154 } catch (const ibrcommon::QueueUnblockedException&) { 00155 // pop on empty queue! 00156 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL; 00157 } 00158 } 00159 00160 void TCPConnection::eventBundleForwarded() 00161 { 00162 try { 00163 const dtn::data::MetaBundle bundle = _sentqueue.getnpop(); 00164 00165 // signal completion of the transfer 00166 TransferCompletedEvent::raise(_node.getEID(), bundle); 00167 00168 // raise bundle event 00169 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED); 00170 00171 // set ACK to zero 00172 _lastack = 0; 00173 } catch (const ibrcommon::QueueUnblockedException&) { 00174 // pop on empty queue! 00175 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL; 00176 } 00177 } 00178 00179 void TCPConnection::eventBundleAck(size_t ack) 00180 { 00181 _lastack = ack; 00182 } 00183 00184 void TCPConnection::initialize() 00185 { 00186 // start the receiver for incoming bundles + handshake 00187 try { 00188 start(); 00189 } catch (const ibrcommon::ThreadException &ex) { 00190 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00191 } 00192 } 00193 00194 void TCPConnection::shutdown() 00195 { 00196 // shutdown 00197 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00198 00199 try { 00200 // abort the connection thread 00201 ibrcommon::DetachedThread::stop(); 00202 } catch (const ibrcommon::ThreadException &ex) { 00203 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00204 } 00205 } 00206 00207 bool TCPConnection::__cancellation() 00208 { 00209 // close the stream 00210 try { 00211 (*_tcpstream).close(); 00212 } catch (const ibrcommon::ConnectionClosedException&) { }; 00213 00214 return true; 00215 } 00216 00217 void TCPConnection::finally() 00218 { 00219 IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL; 00220 00221 try { 00222 // shutdown the sender thread 00223 _sender.stop(); 00224 } catch (const std::exception&) { }; 00225 00226 // close the tcpstream 00227 try { 00228 _tcpstream->close(); 00229 } catch (const ibrcommon::ConnectionClosedException&) { }; 00230 00231 try { 00232 _callback.connectionDown(this); 00233 } catch (const ibrcommon::MutexException&) { }; 00234 00235 // clear the queue 00236 clearQueue(); 00237 } 00238 00239 void TCPConnection::setup() 00240 { 00241 // variables for address and port 00242 std::string address = "0.0.0.0"; 00243 unsigned int port = 0; 00244 00245 // try to connect to the other side 00246 try { 00247 const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP); 00248 if (uri_list.empty()) throw ibrcommon::tcpclient::SocketException("no address available to connect"); 00249 00250 // decode address and port 00251 const dtn::core::Node::URI &uri = uri_list.front(); 00252 uri.decode(address, port); 00253 00254 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream); 00255 client.open(address, port, _timeout); 00256 00257 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() ) 00258 { 00259 _tcpstream->enableNoDelay(); 00260 } 00261 00262 _tcpstream->enableLinger(10); 00263 _tcpstream->enableKeepalive(); 00264 } catch (const ibrcommon::tcpclient::SocketException&) { 00265 // error on open, requeue all bundles in the queue 00266 IBRCOMMON_LOGGER(warning) << "connection to " << _node.toString() << " (" << address << ":" << port << ") failed" << IBRCOMMON_LOGGER_ENDL; 00267 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00268 throw; 00269 } catch (const bad_cast&) { }; 00270 } 00271 00272 void TCPConnection::run() 00273 { 00274 try { 00275 // do the handshake 00276 char flags = 0; 00277 00278 // enable ACKs and NACKs 00279 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00280 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS; 00281 00282 // do the handshake 00283 _stream.handshake(_name, _timeout, flags); 00284 00285 // start the sender 00286 _sender.start(); 00287 00288 while (!_stream.eof()) 00289 { 00290 try { 00291 // create a new empty bundle 00292 dtn::data::Bundle bundle; 00293 00294 // deserialize the bundle 00295 (*this) >> bundle; 00296 00297 // check the bundle 00298 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) ) 00299 { 00300 // invalid bundle! 00301 throw dtn::data::Validator::RejectedException("destination or source EID is null"); 00302 } 00303 00304 // raise default bundle received event 00305 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle); 00306 } 00307 catch (const dtn::data::Validator::RejectedException &ex) 00308 { 00309 // bundle rejected 00310 rejectTransmission(); 00311 00312 // display the rejection 00313 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00314 } 00315 catch (const dtn::InvalidDataException &ex) { 00316 // bundle rejected 00317 rejectTransmission(); 00318 00319 // display the rejection 00320 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00321 } 00322 00323 yield(); 00324 } 00325 } catch (const ibrcommon::ThreadException &ex) { 00326 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00327 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00328 } catch (const std::exception &ex) { 00329 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00330 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00331 } 00332 } 00333 00334 00335 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle) 00336 { 00337 std::iostream &stream = conn._stream; 00338 00339 // check if the stream is still good 00340 if (!stream.good()) throw ibrcommon::IOException("stream went bad"); 00341 00342 dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle; 00343 return conn; 00344 } 00345 00346 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle) 00347 { 00348 // prepare a measurement 00349 ibrcommon::TimeMeasurement m; 00350 00351 std::iostream &stream = conn._stream; 00352 00353 // create a serializer 00354 dtn::data::DefaultSerializer serializer(stream); 00355 00356 // put the bundle into the sentqueue 00357 conn._sentqueue.push(bundle); 00358 00359 // start the measurement 00360 m.start(); 00361 00362 try { 00363 // activate exceptions for this method 00364 if (!stream.good()) throw ibrcommon::IOException("stream went bad"); 00365 00366 // transmit the bundle 00367 serializer << bundle; 00368 00369 // flush the stream 00370 stream << std::flush; 00371 00372 // stop the time measurement 00373 m.stop(); 00374 00375 // get throughput 00376 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024; 00377 00378 // print out throughput 00379 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with " 00380 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL; 00381 00382 } catch (const ibrcommon::Exception &ex) { 00383 // the connection not available 00384 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00385 00386 // forward exception 00387 throw; 00388 } 00389 00390 return conn; 00391 } 00392 00393 TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout) 00394 : _connection(connection), _keepalive_timeout(keepalive_timeout) 00395 { 00396 } 00397 00398 TCPConnection::Sender::~Sender() 00399 { 00400 } 00401 00402 bool TCPConnection::Sender::__cancellation() 00403 { 00404 // cancel the main thread in here 00405 ibrcommon::Queue<dtn::data::BundleID>::abort(); 00406 00407 return true; 00408 } 00409 00410 void TCPConnection::Sender::run() 00411 { 00412 try { 00413 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00414 00415 while (_connection.good()) 00416 { 00417 try { 00418 _current_transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true, _keepalive_timeout); 00419 00420 try { 00421 // read the bundle out of the storage 00422 dtn::data::Bundle bundle = storage.get(_current_transfer); 00423 00424 #ifdef WITH_BUNDLE_SECURITY 00425 const dtn::daemon::Configuration::Security::Level seclevel = 00426 dtn::daemon::Configuration::getInstance().getSecurity().getLevel(); 00427 00428 if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED) 00429 { 00430 try { 00431 dtn::security::SecurityManager::getInstance().auth(bundle); 00432 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00433 // sign requested, but no key is available 00434 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL; 00435 } 00436 } 00437 #endif 00438 // send bundle 00439 _connection << bundle; 00440 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00441 // send transfer aborted event 00442 TransferAbortedEvent::raise(_connection._node.getEID(), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00443 } 00444 00445 // unset the current transfer 00446 _current_transfer = dtn::data::BundleID(); 00447 00448 } catch (const ibrcommon::QueueUnblockedException &ex) { 00449 switch (ex.reason) 00450 { 00451 case ibrcommon::QueueUnblockedException::QUEUE_ERROR: 00452 case ibrcommon::QueueUnblockedException::QUEUE_ABORT: 00453 throw; 00454 case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT: 00455 { 00456 // send a keepalive 00457 _connection.keepalive(); 00458 } 00459 } 00460 } 00461 00462 // idle a little bit 00463 yield(); 00464 } 00465 } catch (const ibrcommon::QueueUnblockedException &ex) { 00466 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00467 return; 00468 } catch (const std::exception &ex) { 00469 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00470 } 00471 00472 _connection.stop(); 00473 } 00474 00475 void TCPConnection::clearQueue() 00476 { 00477 // requeue all bundles still queued 00478 try { 00479 while (true) 00480 { 00481 const dtn::data::BundleID id = _sender.getnpop(); 00482 00483 // raise transfer abort event for all bundles without an ACK 00484 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00485 } 00486 } catch (const ibrcommon::QueueUnblockedException&) { 00487 // queue emtpy 00488 } 00489 00490 // requeue all bundles still in transit 00491 try { 00492 while (true) 00493 { 00494 const dtn::data::BundleID id = _sentqueue.getnpop(); 00495 00496 if (_lastack > 0) 00497 { 00498 // some data are already acknowledged, make a fragment? 00499 //TODO: make a fragment 00500 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00501 } 00502 else 00503 { 00504 // raise transfer abort event for all bundles without an ACK 00505 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00506 } 00507 00508 // set last ack to zero 00509 _lastack = 0; 00510 } 00511 } catch (const ibrcommon::QueueUnblockedException&) { 00512 // queue emtpy 00513 } 00514 } 00515 00516 void TCPConnection::keepalive() 00517 { 00518 _stream.keepalive(); 00519 } 00520 00521 bool TCPConnection::good() const 00522 { 00523 return _tcpstream->good(); 00524 } 00525 00526 void TCPConnection::Sender::finally() 00527 { 00528 // notify the aborted transfer of the last bundle 00529 if (_current_transfer != dtn::data::BundleID()) 00530 { 00531 dtn::routing::RequeueBundleEvent::raise(_connection._node.getEID(), _current_transfer); 00532 } 00533 } 00534 00535 bool TCPConnection::match(const dtn::core::Node &n) const 00536 { 00537 return (_node == n); 00538 } 00539 00540 bool TCPConnection::match(const dtn::data::EID &destination) const 00541 { 00542 return (_node.getEID() == destination.getNode()); 00543 } 00544 00545 bool TCPConnection::match(const NodeEvent &evt) const 00546 { 00547 const dtn::core::Node &n = evt.getNode(); 00548 return match(n); 00549 } 00550 } 00551 }