|
IBR-DTNSuite 0.6
|
00001 /* 00002 * tcpstream.cpp 00003 * 00004 * Created on: 29.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/Logger.h" 00010 #include "ibrcommon/net/tcpstream.h" 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include <netinet/in.h> 00013 #include <sys/types.h> 00014 #include <sys/socket.h> 00015 #include <arpa/inet.h> 00016 #include <netinet/tcp.h> 00017 #include <errno.h> 00018 #include <signal.h> 00019 #include <fcntl.h> 00020 #include <string.h> 00021 00022 namespace ibrcommon 00023 { 00024 tcpstream::tcpstream(int socket) : 00025 iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0) 00026 { 00027 // create a pipe for interruption 00028 if (pipe(_interrupt_pipe_read) < 0) 00029 { 00030 printf ("Error %d creating pipe\n", errno); 00031 exit (-1); 00032 } 00033 00034 // create a pipe for interruption 00035 if (pipe(_interrupt_pipe_write) < 0) 00036 { 00037 printf ("Error %d creating pipe\n", errno); 00038 exit (-1); 00039 } 00040 00041 // set the pipe to non-blocking 00042 vsocket::set_non_blocking(_interrupt_pipe_read[0]); 00043 vsocket::set_non_blocking(_interrupt_pipe_read[1]); 00044 vsocket::set_non_blocking(_interrupt_pipe_write[0]); 00045 vsocket::set_non_blocking(_interrupt_pipe_write[1]); 00046 00047 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00048 setg(0, 0, 0); 00049 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00050 } 00051 00052 tcpstream::~tcpstream() 00053 { 00054 delete[] in_buf_; 00055 delete[] out_buf_; 00056 00057 // finally, close the socket 00058 close(); 00059 } 00060 00061 string tcpstream::getAddress() const 00062 { 00063 struct ::sockaddr_in sa; 00064 int iLen = sizeof(sa); 00065 00066 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00067 return inet_ntoa(sa.sin_addr); 00068 } 00069 00070 int tcpstream::getPort() const 00071 { 00072 struct ::sockaddr_in sa; 00073 int iLen = sizeof(sa); 00074 00075 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00076 return ntohs(sa.sin_port); 00077 } 00078 00079 void tcpstream::interrupt() 00080 { 00081 ::write(_interrupt_pipe_read[1], "i", 1); 00082 ::write(_interrupt_pipe_write[1], "i", 1); 00083 } 00084 00085 void tcpstream::close(bool errorcheck) 00086 { 00087 static ibrcommon::Mutex close_lock; 00088 ibrcommon::MutexLock l(close_lock); 00089 00090 if (_socket == -1) 00091 { 00092 if (errorcheck) throw ConnectionClosedException(); 00093 return; 00094 } 00095 00096 int sock = _socket; 00097 _socket = -1; 00098 00099 // unblock all socket operations 00100 interrupt(); 00101 00102 if ((::close(sock) == -1) && errorcheck) 00103 { 00104 throw ConnectionClosedException(); 00105 } 00106 } 00107 00108 int tcpstream::sync() 00109 { 00110 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00111 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00112 : 0; 00113 00114 return ret; 00115 } 00116 00117 int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception) 00118 { 00119 // return if the stream was closed 00120 if (_socket == -1) 00121 { 00122 throw select_exception(select_exception::SELECT_CLOSED); 00123 } 00124 00125 if (!_nonblocking) 00126 { 00127 // set the tcp socket to non-blocking 00128 vsocket::set_non_blocking(_socket); 00129 00130 // do not set this mode again 00131 _nonblocking = true; 00132 } 00133 00134 int high_fd = _socket; 00135 00136 fd_set fds_read, fds_write, fds_error; 00137 fd_set *fdsp_write = NULL, *fdsp_error = NULL; 00138 00139 FD_ZERO(&fds_read); 00140 FD_ZERO(&fds_write); 00141 FD_ZERO(&fds_error); 00142 00143 if (read) 00144 { 00145 FD_SET(_socket, &fds_read); 00146 } 00147 00148 if (write) 00149 { 00150 FD_SET(_socket, &fds_write); 00151 fdsp_write = &fds_write; 00152 } 00153 00154 if (error) 00155 { 00156 FD_SET(_socket, &fds_error); 00157 fdsp_error = &fds_error; 00158 } 00159 00160 read = false; 00161 write = false; 00162 error = false; 00163 00164 // socket for the self-pipe trick 00165 FD_SET(int_pipe, &fds_read); 00166 if (high_fd < int_pipe) high_fd = int_pipe; 00167 00168 int res = 0; 00169 00170 // set timeout 00171 struct timeval tv; 00172 tv.tv_sec = timeout; 00173 tv.tv_usec = 0; 00174 00175 bool _continue = true; 00176 while (_continue) 00177 { 00178 if (timeout > 0) 00179 { 00180 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv); 00181 } 00182 else 00183 { 00184 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL); 00185 } 00186 00187 // check for select error 00188 if (res < 0) 00189 { 00190 throw select_exception(select_exception::SELECT_ERROR); 00191 } 00192 00193 // check for timeout 00194 if (res == 0) 00195 { 00196 throw select_exception(select_exception::SELECT_TIMEOUT); 00197 } 00198 00199 if (FD_ISSET(int_pipe, &fds_read)) 00200 { 00201 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL; 00202 00203 // this was an interrupt with the self-pipe-trick 00204 char buf[2]; 00205 ::read(int_pipe, buf, 2); 00206 } 00207 00208 // return if the stream was closed 00209 if (_socket == -1) 00210 { 00211 throw select_exception(select_exception::SELECT_CLOSED); 00212 } 00213 00214 if (FD_ISSET(_socket, &fds_read)) 00215 { 00216 read = true; 00217 _continue = false; 00218 } 00219 00220 if (FD_ISSET(_socket, &fds_write)) 00221 { 00222 write = true; 00223 _continue = false; 00224 } 00225 00226 if (FD_ISSET(_socket, &fds_error)) 00227 { 00228 error = true; 00229 _continue = false; 00230 } 00231 } 00232 00233 return res; 00234 } 00235 00236 int tcpstream::overflow(int c) 00237 { 00238 char *ibegin = out_buf_; 00239 char *iend = pptr(); 00240 00241 // mark the buffer as free 00242 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00243 00244 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00245 { 00246 *iend++ = std::char_traits<char>::to_char_type(c); 00247 } 00248 00249 // if there is nothing to send, just return 00250 if ((iend - ibegin) == 0) 00251 { 00252 IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL; 00253 return std::char_traits<char>::not_eof(c); 00254 } 00255 00256 bool read = false, write = true, error = false; 00257 select(_interrupt_pipe_write[0], read, write, error); 00258 00259 // bytes to send 00260 size_t bytes = (iend - ibegin); 00261 00262 // send the data 00263 ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), MSG_NOSIGNAL); 00264 00265 if (ret < 0) 00266 { 00267 switch (errno) 00268 { 00269 case EPIPE: 00270 // connection has been reset 00271 errmsg = ERROR_EPIPE; 00272 break; 00273 00274 case ECONNRESET: 00275 // Connection reset by peer 00276 errmsg = ERROR_RESET; 00277 break; 00278 00279 case EAGAIN: 00280 // sent failed but we should retry again 00281 return overflow(c); 00282 00283 default: 00284 errmsg = ERROR_WRITE; 00285 } 00286 00287 // failure 00288 close(); 00289 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno; 00290 throw ConnectionClosedException(ss.str()); 00291 } 00292 else 00293 { 00294 // check how many bytes are sent 00295 if ((size_t)ret < bytes) 00296 { 00297 // we did not sent all bytes 00298 char *resched_begin = ibegin + ret; 00299 char *resched_end = iend; 00300 00301 // bytes left to send 00302 size_t bytes_left = resched_end - resched_begin; 00303 00304 // move the data to the begin of the buffer 00305 ::memcpy(ibegin, resched_begin, bytes_left); 00306 00307 // new free buffer 00308 char *buffer_begin = ibegin + bytes_left; 00309 00310 // mark the buffer as free 00311 setp(buffer_begin, out_buf_ + BUFF_SIZE - 1); 00312 } 00313 } 00314 00315 return std::char_traits<char>::not_eof(c); 00316 } 00317 00318 int tcpstream::underflow() 00319 { 00320 try { 00321 bool read = true; 00322 bool write = false; 00323 bool error = false; 00324 00325 select(_interrupt_pipe_read[0], read, write, error, _timeout); 00326 00327 // read some bytes 00328 int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0); 00329 00330 // end of stream 00331 if (bytes == 0) 00332 { 00333 errmsg = ERROR_CLOSED; 00334 close(); 00335 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL; 00336 return std::char_traits<char>::eof(); 00337 } 00338 else if (bytes < 0) 00339 { 00340 switch (errno) 00341 { 00342 case EPIPE: 00343 // connection has been reset 00344 errmsg = ERROR_EPIPE; 00345 break; 00346 00347 default: 00348 errmsg = ERROR_READ; 00349 } 00350 00351 close(); 00352 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL; 00353 return std::char_traits<char>::eof(); 00354 } 00355 00356 // Since the input buffer content is now valid (or is new) 00357 // the get pointer should be initialized (or reset). 00358 setg(in_buf_, in_buf_, in_buf_ + bytes); 00359 00360 return std::char_traits<char>::not_eof(in_buf_[0]); 00361 } catch (const select_exception &ex) { 00362 return std::char_traits<char>::eof(); 00363 } 00364 } 00365 00366 void tcpstream::setTimeout(unsigned int value) 00367 { 00368 _timeout = value; 00369 } 00370 00371 void tcpstream::enableKeepalive() 00372 { 00373 /* Set the option active */ 00374 int optval = 1; 00375 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) { 00376 throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives"); 00377 } 00378 } 00379 00380 void tcpstream::enableLinger(int l) 00381 { 00382 // set linger option to the socket 00383 struct linger linger; 00384 00385 linger.l_onoff = 1; 00386 linger.l_linger = l; 00387 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)); 00388 } 00389 00390 void tcpstream::enableNoDelay() 00391 { 00392 int set = 1; 00393 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set)); 00394 } 00395 00396 }