|
IBR-DTNSuite 0.6
|
00001 /* 00002 * tcpstream.cpp 00003 * 00004 * Created on: 29.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/Logger.h" 00010 #include "ibrcommon/net/tcpstream.h" 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include <netinet/in.h> 00013 #include <sys/types.h> 00014 #include <sys/socket.h> 00015 #include <arpa/inet.h> 00016 #include <netinet/tcp.h> 00017 #include <errno.h> 00018 #include <signal.h> 00019 #include <fcntl.h> 00020 #include <string.h> 00021 00022 namespace ibrcommon 00023 { 00024 tcpstream::tcpstream(int socket) : 00025 std::iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0) 00026 { 00027 // create a pipe for interruption 00028 if (pipe(_interrupt_pipe_read) < 0) 00029 { 00030 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL; 00031 throw ibrcommon::Exception("failed to create pipe"); 00032 } 00033 00034 // create a pipe for interruption 00035 if (pipe(_interrupt_pipe_write) < 0) 00036 { 00037 ::close(_interrupt_pipe_read[0]); 00038 ::close(_interrupt_pipe_read[1]); 00039 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL; 00040 throw ibrcommon::Exception("failed to create pipe"); 00041 } 00042 00043 // prevent SIGPIPE from being thrown 00044 int set = 1; 00045 00046 #ifdef HAVE_FEATURES_H 00047 ::setsockopt(_socket, SOL_SOCKET, MSG_NOSIGNAL, (void *)&set, sizeof(int)); 00048 #else 00049 ::setsockopt(_socket, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); 00050 #endif 00051 00052 // set the pipe to non-blocking 00053 vsocket::set_non_blocking(_interrupt_pipe_read[0]); 00054 vsocket::set_non_blocking(_interrupt_pipe_read[1]); 00055 vsocket::set_non_blocking(_interrupt_pipe_write[0]); 00056 vsocket::set_non_blocking(_interrupt_pipe_write[1]); 00057 00058 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00059 setg(0, 0, 0); 00060 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00061 } 00062 00063 tcpstream::~tcpstream() 00064 { 00065 delete[] in_buf_; 00066 delete[] out_buf_; 00067 00068 // finally, close the socket 00069 close(); 00070 00071 // close all used pipes 00072 ::close(_interrupt_pipe_read[0]); 00073 ::close(_interrupt_pipe_read[1]); 00074 ::close(_interrupt_pipe_write[0]); 00075 ::close(_interrupt_pipe_write[1]); 00076 } 00077 00078 string tcpstream::getAddress() const 00079 { 00080 struct ::sockaddr_in sa; 00081 int iLen = sizeof(sa); 00082 00083 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00084 return inet_ntoa(sa.sin_addr); 00085 } 00086 00087 int tcpstream::getPort() const 00088 { 00089 struct ::sockaddr_in sa; 00090 int iLen = sizeof(sa); 00091 00092 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00093 return ntohs(sa.sin_port); 00094 } 00095 00096 void tcpstream::interrupt() 00097 { 00098 ::write(_interrupt_pipe_read[1], "i", 1); 00099 ::write(_interrupt_pipe_write[1], "i", 1); 00100 } 00101 00102 void tcpstream::close(bool errorcheck) 00103 { 00104 static ibrcommon::Mutex close_lock; 00105 ibrcommon::MutexLock l(close_lock); 00106 00107 if (_socket == -1) 00108 { 00109 if (errorcheck) throw ConnectionClosedException(); 00110 return; 00111 } 00112 00113 int sock = _socket; 00114 _socket = -1; 00115 00116 // unblock all socket operations 00117 interrupt(); 00118 00119 if ((::close(sock) == -1) && errorcheck) 00120 { 00121 throw ConnectionClosedException(); 00122 } 00123 } 00124 00125 int tcpstream::sync() 00126 { 00127 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00128 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00129 : 0; 00130 00131 return ret; 00132 } 00133 00134 int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception) 00135 { 00136 // return if the stream was closed 00137 if (_socket == -1) 00138 { 00139 throw select_exception(select_exception::SELECT_CLOSED); 00140 } 00141 00142 if (!_nonblocking) 00143 { 00144 // set the tcp socket to non-blocking 00145 vsocket::set_non_blocking(_socket); 00146 00147 // do not set this mode again 00148 _nonblocking = true; 00149 } 00150 00151 int high_fd = _socket; 00152 00153 fd_set fds_read, fds_write, fds_error; 00154 fd_set *fdsp_write = NULL, *fdsp_error = NULL; 00155 00156 FD_ZERO(&fds_read); 00157 FD_ZERO(&fds_write); 00158 FD_ZERO(&fds_error); 00159 00160 if (read) 00161 { 00162 FD_SET(_socket, &fds_read); 00163 } 00164 00165 if (write) 00166 { 00167 FD_SET(_socket, &fds_write); 00168 fdsp_write = &fds_write; 00169 } 00170 00171 if (error) 00172 { 00173 FD_SET(_socket, &fds_error); 00174 fdsp_error = &fds_error; 00175 } 00176 00177 read = false; 00178 write = false; 00179 error = false; 00180 00181 // socket for the self-pipe trick 00182 FD_SET(int_pipe, &fds_read); 00183 if (high_fd < int_pipe) high_fd = int_pipe; 00184 00185 int res = 0; 00186 00187 // set timeout 00188 struct timeval tv; 00189 tv.tv_sec = timeout; 00190 tv.tv_usec = 0; 00191 00192 bool _continue = true; 00193 while (_continue) 00194 { 00195 if (timeout > 0) 00196 { 00197 #ifdef HAVE_FEATURES_H 00198 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv); 00199 #else 00200 res = __nonlinux_select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv); 00201 #endif 00202 } 00203 else 00204 { 00205 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL); 00206 } 00207 00208 // check for select error 00209 if (res < 0) 00210 { 00211 throw select_exception(select_exception::SELECT_ERROR); 00212 } 00213 00214 // check for timeout 00215 if (res == 0) 00216 { 00217 throw select_exception(select_exception::SELECT_TIMEOUT); 00218 } 00219 00220 if (FD_ISSET(int_pipe, &fds_read)) 00221 { 00222 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL; 00223 00224 // this was an interrupt with the self-pipe-trick 00225 char buf[2]; 00226 ::read(int_pipe, buf, 2); 00227 } 00228 00229 // return if the stream was closed 00230 if (_socket == -1) 00231 { 00232 throw select_exception(select_exception::SELECT_CLOSED); 00233 } 00234 00235 if (FD_ISSET(_socket, &fds_read)) 00236 { 00237 read = true; 00238 _continue = false; 00239 } 00240 00241 if (FD_ISSET(_socket, &fds_write)) 00242 { 00243 write = true; 00244 _continue = false; 00245 } 00246 00247 if (FD_ISSET(_socket, &fds_error)) 00248 { 00249 error = true; 00250 _continue = false; 00251 } 00252 } 00253 00254 return res; 00255 } 00256 00257 int tcpstream::overflow(int c) 00258 { 00259 char *ibegin = out_buf_; 00260 char *iend = pptr(); 00261 00262 // mark the buffer as free 00263 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00264 00265 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00266 { 00267 *iend++ = std::char_traits<char>::to_char_type(c); 00268 } 00269 00270 // if there is nothing to send, just return 00271 if ((iend - ibegin) == 0) 00272 { 00273 IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL; 00274 return std::char_traits<char>::not_eof(c); 00275 } 00276 00277 try { 00278 bool read = false, write = true, error = false; 00279 select(_interrupt_pipe_write[0], read, write, error, _timeout); 00280 00281 // bytes to send 00282 size_t bytes = (iend - ibegin); 00283 00284 // send the data 00285 ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), 0); 00286 00287 if (ret < 0) 00288 { 00289 switch (errno) 00290 { 00291 case EPIPE: 00292 // connection has been reset 00293 errmsg = ERROR_EPIPE; 00294 break; 00295 00296 case ECONNRESET: 00297 // Connection reset by peer 00298 errmsg = ERROR_RESET; 00299 break; 00300 00301 case EAGAIN: 00302 // sent failed but we should retry again 00303 return overflow(c); 00304 00305 default: 00306 errmsg = ERROR_WRITE; 00307 } 00308 00309 // failure 00310 close(); 00311 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno; 00312 throw ConnectionClosedException(ss.str()); 00313 } 00314 else 00315 { 00316 // check how many bytes are sent 00317 if ((size_t)ret < bytes) 00318 { 00319 // we did not sent all bytes 00320 char *resched_begin = ibegin + ret; 00321 char *resched_end = iend; 00322 00323 // bytes left to send 00324 size_t bytes_left = resched_end - resched_begin; 00325 00326 // move the data to the begin of the buffer 00327 ::memcpy(ibegin, resched_begin, bytes_left); 00328 00329 // new free buffer 00330 char *buffer_begin = ibegin + bytes_left; 00331 00332 // mark the buffer as free 00333 setp(buffer_begin, out_buf_ + BUFF_SIZE - 1); 00334 } 00335 } 00336 } catch (const select_exception &ex) { 00337 // send timeout 00338 errmsg = ERROR_WRITE; 00339 close(); 00340 throw ConnectionClosedException("<tcpstream> send() timed out"); 00341 } 00342 00343 return std::char_traits<char>::not_eof(c); 00344 } 00345 00346 int tcpstream::underflow() 00347 { 00348 try { 00349 bool read = true; 00350 bool write = false; 00351 bool error = false; 00352 00353 select(_interrupt_pipe_read[0], read, write, error, _timeout); 00354 00355 // read some bytes 00356 int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0); 00357 00358 // end of stream 00359 if (bytes == 0) 00360 { 00361 errmsg = ERROR_CLOSED; 00362 close(); 00363 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL; 00364 return std::char_traits<char>::eof(); 00365 } 00366 else if (bytes < 0) 00367 { 00368 switch (errno) 00369 { 00370 case EPIPE: 00371 // connection has been reset 00372 errmsg = ERROR_EPIPE; 00373 break; 00374 00375 default: 00376 errmsg = ERROR_READ; 00377 } 00378 00379 close(); 00380 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL; 00381 return std::char_traits<char>::eof(); 00382 } 00383 00384 // Since the input buffer content is now valid (or is new) 00385 // the get pointer should be initialized (or reset). 00386 setg(in_buf_, in_buf_, in_buf_ + bytes); 00387 00388 return std::char_traits<char>::not_eof(in_buf_[0]); 00389 } catch (const select_exception &ex) { 00390 return std::char_traits<char>::eof(); 00391 } 00392 } 00393 00394 void tcpstream::setTimeout(unsigned int value) 00395 { 00396 _timeout = value; 00397 } 00398 00399 void tcpstream::enableKeepalive() 00400 { 00401 /* Set the option active */ 00402 int optval = 1; 00403 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) { 00404 throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives"); 00405 } 00406 } 00407 00408 void tcpstream::enableLinger(int l) 00409 { 00410 // set linger option to the socket 00411 struct linger linger; 00412 00413 linger.l_onoff = 1; 00414 linger.l_linger = l; 00415 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)); 00416 } 00417 00418 void tcpstream::enableNoDelay() 00419 { 00420 int set = 1; 00421 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set)); 00422 } 00423 00424 }