00001 /* 00002 * StreamConnection.cpp 00003 * 00004 * Created on: 02.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/streams/StreamConnection.h" 00009 #include "ibrdtn/data/Exceptions.h" 00010 #include "ibrdtn/streams/StreamContactHeader.h" 00011 #include "ibrdtn/streams/StreamDataSegment.h" 00012 #include "ibrdtn/data/Exceptions.h" 00013 #include <ibrcommon/TimeMeasurement.h> 00014 #include <ibrcommon/Logger.h> 00015 00016 using namespace dtn::data; 00017 00018 namespace dtn 00019 { 00020 namespace streams 00021 { 00022 StreamConnection::StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size) 00023 : std::iostream(&_buf), _callback(cb), _buf(*this, stream, buffer_size), _shutdown_reason(CONNECTION_SHUTDOWN_NOTSET) 00024 { 00025 } 00026 00027 StreamConnection::~StreamConnection() 00028 { 00029 _buf.shutdowntimers(); 00030 } 00031 00032 void StreamConnection::handshake(const dtn::data::EID &eid, const size_t timeout, const char flags) 00033 { 00034 // create a new header 00035 dtn::streams::StreamContactHeader header(eid); 00036 00037 // set timeout 00038 header._keepalive = timeout; 00039 00040 // set flags 00041 header._flags = flags; 00042 00043 // do the handshake 00044 _peer = _buf.handshake(header); 00045 00046 // signal the complete handshake 00047 _callback.eventConnectionUp(_peer); 00048 } 00049 00050 void StreamConnection::reject() 00051 { 00052 _buf.reject(); 00053 } 00054 00055 void StreamConnection::keepalive() 00056 { 00057 _buf.keepalive(); 00058 } 00059 00060 void StreamConnection::close() 00061 { 00062 } 00063 00064 void StreamConnection::shutdown(ConnectionShutdownCases csc) 00065 { 00066 if (csc == CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN) 00067 { 00068 // wait for the last ACKs 00069 _buf.wait(); 00070 } 00071 00072 // skip if another shutdown is in progress 00073 { 00074 ibrcommon::MutexLock l(_shutdown_reason_lock); 00075 if (_shutdown_reason != CONNECTION_SHUTDOWN_NOTSET) 00076 { 00077 _buf.abort(); 00078 return; 00079 } 00080 _shutdown_reason = csc; 00081 } 00082 00083 try { 00084 switch (csc) 00085 { 00086 case CONNECTION_SHUTDOWN_IDLE: 00087 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_IDLE_TIMEOUT); 00088 _buf.shutdowntimers(); 00089 _buf.abort(); 00090 _callback.eventTimeout(); 00091 break; 00092 case CONNECTION_SHUTDOWN_ERROR: 00093 _buf.shutdowntimers(); 00094 _buf.abort(); 00095 _callback.eventError(); 00096 break; 00097 case CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN: 00098 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_NONE); 00099 _buf.shutdowntimers(); 00100 _callback.eventShutdown(csc); 00101 break; 00102 case CONNECTION_SHUTDOWN_NODE_TIMEOUT: 00103 _buf.abort(); 00104 _callback.eventTimeout(); 00105 break; 00106 case CONNECTION_SHUTDOWN_PEER_SHUTDOWN: 00107 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_NONE); 00108 case CONNECTION_SHUTDOWN_NOTSET: 00109 _buf.shutdowntimers(); 00110 _buf.abort(); 00111 _callback.eventShutdown(csc); 00112 break; 00113 } 00114 } catch (const StreamConnection::StreamErrorException&) { 00115 _buf.shutdowntimers(); 00116 _callback.eventError(); 00117 } 00118 00119 _buf.close(); 00120 close(); 00121 _callback.eventConnectionDown(); 00122 } 00123 00124 void StreamConnection::eventShutdown(StreamConnection::ConnectionShutdownCases csc) 00125 { 00126 _callback.eventShutdown(csc); 00127 } 00128 00129 void StreamConnection::eventBundleAck(size_t ack) 00130 { 00131 _callback.eventBundleAck(ack); 00132 } 00133 00134 void StreamConnection::eventBundleRefused() 00135 { 00136 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been refused" << IBRCOMMON_LOGGER_ENDL; 00137 _callback.eventBundleRefused(); 00138 } 00139 00140 void StreamConnection::eventBundleForwarded() 00141 { 00142 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been forwarded" << IBRCOMMON_LOGGER_ENDL; 00143 _callback.eventBundleForwarded(); 00144 } 00145 00146 void StreamConnection::connectionTimeout() 00147 { 00148 // call superclass 00149 _callback.eventTimeout(); 00150 } 00151 } 00152 }
1.7.1