00001 /* 00002 * StreamConnection.cpp 00003 * 00004 * Created on: 02.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/streams/StreamConnection.h" 00009 #include "ibrdtn/data/Exceptions.h" 00010 #include "ibrdtn/streams/StreamContactHeader.h" 00011 #include "ibrdtn/streams/StreamDataSegment.h" 00012 #include "ibrdtn/data/Exceptions.h" 00013 #include <ibrcommon/TimeMeasurement.h> 00014 #include <ibrcommon/Logger.h> 00015 00016 using namespace dtn::data; 00017 00018 namespace dtn 00019 { 00020 namespace streams 00021 { 00022 StreamConnection::StreamConnection(StreamConnection::Callback &cb, iostream &stream) 00023 : std::iostream(&_buf), _callback(cb), _in_state(CONNECTION_INITIAL), 00024 _out_state(CONNECTION_INITIAL), _buf(*this, stream), _shutdown_reason(CONNECTION_SHUTDOWN_NOTSET) 00025 { 00026 } 00027 00028 StreamConnection::~StreamConnection() 00029 { 00030 } 00031 00032 void StreamConnection::handshake(const dtn::data::EID &eid, const size_t timeout, const char flags) 00033 { 00034 // create a new header 00035 dtn::streams::StreamContactHeader header(eid); 00036 00037 // set timeout 00038 header._keepalive = timeout; 00039 00040 // set flags 00041 header._flags = flags; 00042 00043 // do the handshake 00044 _peer = _buf.handshake(header); 00045 00046 // set the stream state 00047 { 00048 ibrcommon::MutexLock out_lock(_out_state); 00049 _out_state.setState(CONNECTION_CONNECTED); 00050 00051 ibrcommon::MutexLock in_lock(_in_state); 00052 _in_state.setState(CONNECTION_CONNECTED); 00053 } 00054 00055 // signal the complete handshake 00056 _callback.eventConnectionUp(_peer); 00057 } 00058 00059 void StreamConnection::reject() 00060 { 00061 _buf.reject(); 00062 } 00063 00064 void StreamConnection::close() 00065 { 00066 { 00067 ibrcommon::MutexLock l(_in_state); 00068 _in_state.setState(CONNECTION_CLOSED); 00069 } 00070 00071 { 00072 ibrcommon::MutexLock l(_out_state); 00073 _out_state.setState(CONNECTION_CLOSED); 00074 } 00075 } 00076 00077 void StreamConnection::shutdown(ConnectionShutdownCases csc) 00078 { 00079 // skip if another shutdown is in progress 00080 { 00081 ibrcommon::MutexLock l(_shutdown_reason_lock); 00082 if (_shutdown_reason != CONNECTION_SHUTDOWN_NOTSET) return; 00083 _shutdown_reason = csc; 00084 } 00085 00086 try { 00087 switch (csc) 00088 { 00089 case CONNECTION_SHUTDOWN_IDLE: 00090 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_IDLE_TIMEOUT); 00091 _callback.eventTimeout(); 00092 break; 00093 case CONNECTION_SHUTDOWN_ERROR: 00094 _callback.eventError(); 00095 break; 00096 case CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN: 00097 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_NONE); 00098 _callback.eventShutdown(); 00099 break; 00100 case CONNECTION_SHUTDOWN_NODE_TIMEOUT: 00101 _callback.eventTimeout(); 00102 break; 00103 case CONNECTION_SHUTDOWN_PEER_SHUTDOWN: 00104 _callback.eventShutdown(); 00105 break; 00106 case CONNECTION_SHUTDOWN_NOTSET: 00107 _callback.eventShutdown(); 00108 break; 00109 } 00110 } catch (StreamConnection::StreamErrorException ex) { 00111 _callback.eventError(); 00112 } 00113 00114 _buf.close(); 00115 close(); 00116 _callback.eventConnectionDown(); 00117 } 00118 00119 void StreamConnection::eventShutdown() 00120 { 00121 _in_state.setState(CONNECTION_CLOSED); 00122 _callback.eventShutdown(); 00123 } 00124 00125 void StreamConnection::eventBundleAck(size_t ack) 00126 { 00127 _callback.eventBundleAck(ack); 00128 _in_state.signal(true); 00129 } 00130 00131 void StreamConnection::eventBundleRefused() 00132 { 00133 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been refused" << IBRCOMMON_LOGGER_ENDL; 00134 _callback.eventBundleRefused(); 00135 } 00136 00137 void StreamConnection::eventBundleForwarded() 00138 { 00139 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been forwarded" << IBRCOMMON_LOGGER_ENDL; 00140 _callback.eventBundleForwarded(); 00141 } 00142 00143 bool StreamConnection::isConnected() 00144 { 00145 if ( 00146 (_in_state.getState() > CONNECTION_INITIAL) && 00147 (_in_state.getState() < CONNECTION_CLOSED) 00148 ) 00149 return true; 00150 00151 return false; 00152 } 00153 00154 void StreamConnection::wait(const size_t timeout) 00155 { 00156 ibrcommon::MutexLock l(_in_state); 00157 ibrcommon::TimeMeasurement tm; 00158 00159 if (timeout == 0) tm.start(); 00160 00161 while (!_buf.isCompleted() && (_in_state.getState() == CONNECTION_CONNECTED)) 00162 { 00163 IBRCOMMON_LOGGER_DEBUG(15) << "StreamConnection::wait(): wait for completion of transmission" << IBRCOMMON_LOGGER_ENDL; 00164 00165 if (timeout == 0) 00166 { 00167 _in_state.wait(); 00168 } 00169 else 00170 { 00171 _in_state.wait(timeout); 00172 tm.stop(); 00173 if (tm.getMilliseconds() >= timeout) 00174 { 00175 IBRCOMMON_LOGGER_DEBUG(15) << "StreamConnection::wait(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL; 00176 return; 00177 } 00178 } 00179 } 00180 00181 if (_buf.isCompleted()) 00182 { 00183 IBRCOMMON_LOGGER_DEBUG(15) << "StreamConnection::wait(): transfer completed" << IBRCOMMON_LOGGER_ENDL; 00184 } 00185 else 00186 { 00187 IBRCOMMON_LOGGER_DEBUG(15) << "StreamConnection::wait(): transfer aborted" << IBRCOMMON_LOGGER_ENDL; 00188 } 00189 } 00190 00191 void StreamConnection::connectionTimeout() 00192 { 00193 // connection closed 00194 { 00195 ibrcommon::MutexLock l(_in_state); 00196 _in_state.setState(CONNECTION_CLOSED); 00197 } 00198 00199 { 00200 ibrcommon::MutexLock l(_out_state); 00201 _out_state.setState(CONNECTION_CLOSED); 00202 } 00203 00204 // call superclass 00205 _callback.eventTimeout(); 00206 } 00207 } 00208 }
1.6.3