IBR-DTNSuite 0.6

ibrcommon/ibrcommon/net/tcpstream.cpp

Go to the documentation of this file.
00001 /*
00002  * tcpstream.cpp
00003  *
00004  *  Created on: 29.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/Logger.h"
00010 #include "ibrcommon/net/tcpstream.h"
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include <netinet/in.h>
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <arpa/inet.h>
00016 #include <netinet/tcp.h>
00017 #include <errno.h>
00018 #include <signal.h>
00019 #include <fcntl.h>
00020 #include <string.h>
00021 
00022 namespace ibrcommon
00023 {
00024         tcpstream::tcpstream(int socket) :
00025                 std::iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0)
00026         {
00027                 // create a pipe for interruption
00028                 if (pipe(_interrupt_pipe_read) < 0)
00029                 {
00030                         IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
00031                         throw ibrcommon::Exception("failed to create pipe");
00032                 }
00033 
00034                 // create a pipe for interruption
00035                 if (pipe(_interrupt_pipe_write) < 0)
00036                 {
00037 			::close(_interrupt_pipe_read[0]);
00038 			::close(_interrupt_pipe_read[1]);
00039                         IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
00040                         throw ibrcommon::Exception("failed to create pipe");
00041                 }
00042 
00043                 // prevent SIGPIPE from being thrown
00044                 int set = 1;
00045 
00046 #ifdef HAVE_FEATURES_H
00047                 ::setsockopt(_socket, SOL_SOCKET, MSG_NOSIGNAL, (void *)&set, sizeof(int));
00048 #else
00049                 ::setsockopt(_socket, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
00050 #endif
00051 
00052                 // set the pipe to non-blocking
00053                 vsocket::set_non_blocking(_interrupt_pipe_read[0]);
00054                 vsocket::set_non_blocking(_interrupt_pipe_read[1]);
00055                 vsocket::set_non_blocking(_interrupt_pipe_write[0]);
00056                 vsocket::set_non_blocking(_interrupt_pipe_write[1]);
00057 
00058                 // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00059                 setg(0, 0, 0);
00060                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00061         }
00062 
00063         tcpstream::~tcpstream()
00064         {
00065                 delete[] in_buf_;
00066                 delete[] out_buf_;
00067 
00068                 // finally, close the socket
00069                 close();
00070 
00071                 // close all used pipes
00072 		::close(_interrupt_pipe_read[0]);
00073 		::close(_interrupt_pipe_read[1]);
00074 		::close(_interrupt_pipe_write[0]);
00075 		::close(_interrupt_pipe_write[1]);
00076         }
00077 
00078         string tcpstream::getAddress() const
00079         {
00080                 struct ::sockaddr_in sa;
00081                 int iLen = sizeof(sa);
00082 
00083                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00084                 return inet_ntoa(sa.sin_addr);
00085         }
00086 
00087         int tcpstream::getPort() const
00088         {
00089                 struct ::sockaddr_in sa;
00090                 int iLen = sizeof(sa);
00091 
00092                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00093                 return ntohs(sa.sin_port);
00094         }
00095 
00096         void tcpstream::interrupt()
00097         {
00098                 ::write(_interrupt_pipe_read[1], "i", 1);
00099                 ::write(_interrupt_pipe_write[1], "i", 1);
00100         }
00101 
00102         void tcpstream::close(bool errorcheck)
00103         {
00104                 static ibrcommon::Mutex close_lock;
00105                 ibrcommon::MutexLock l(close_lock);
00106 
00107                 if (_socket == -1)
00108                 {
00109                         if (errorcheck) throw ConnectionClosedException();
00110                         return;
00111                 }
00112 
00113                 int sock = _socket;
00114                 _socket = -1;
00115 
00116                 // unblock all socket operations
00117                 interrupt();
00118 
00119                 if ((::close(sock) == -1) && errorcheck)
00120                 {
00121                         throw ConnectionClosedException();
00122                 }
00123         }
00124 
00125         int tcpstream::sync()
00126         {
00127                 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00128                                 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00129                                 : 0;
00130 
00131                 return ret;
00132         }
00133 
00134         int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception)
00135         {
00136                 // return if the stream was closed
00137                 if (_socket == -1)
00138                 {
00139                         throw select_exception(select_exception::SELECT_CLOSED);
00140                 }
00141 
00142                 if (!_nonblocking)
00143                 {
00144                         // set the tcp socket to non-blocking
00145                         vsocket::set_non_blocking(_socket);
00146                         
00147                         // do not set this mode again
00148                         _nonblocking = true;
00149                 }
00150 
00151                 int high_fd = _socket;
00152 
00153                 fd_set fds_read, fds_write, fds_error;
00154                 fd_set *fdsp_write = NULL, *fdsp_error = NULL;
00155 
00156                 FD_ZERO(&fds_read);
00157                 FD_ZERO(&fds_write);
00158                 FD_ZERO(&fds_error);
00159 
00160                 if (read)
00161                 {
00162                         FD_SET(_socket, &fds_read);
00163                 }
00164 
00165                 if (write)
00166                 {
00167                         FD_SET(_socket, &fds_write);
00168                         fdsp_write = &fds_write;
00169                 }
00170 
00171                 if (error)
00172                 {
00173                         FD_SET(_socket, &fds_error);
00174                         fdsp_error = &fds_error;
00175                 }
00176 
00177                 read = false;
00178                 write = false;
00179                 error = false;
00180 
00181                 // socket for the self-pipe trick
00182                 FD_SET(int_pipe, &fds_read);
00183                 if (high_fd < int_pipe) high_fd = int_pipe;
00184 
00185                 int res = 0;
00186 
00187                 // set timeout
00188                 struct timeval tv;
00189                 tv.tv_sec = timeout;
00190                 tv.tv_usec = 0;
00191 
00192                 bool _continue = true;
00193                 while (_continue)
00194                 {
00195                         if (timeout > 0)
00196                         {
00197 #ifdef HAVE_FEATURES_H
00198                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv);
00199 #else
00200                                 res = __nonlinux_select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv);
00201 #endif
00202                         }
00203                         else
00204                         {
00205                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL);
00206                         }
00207 
00208                         // check for select error
00209                         if (res < 0)
00210                         {
00211                                 throw select_exception(select_exception::SELECT_ERROR);
00212                         }
00213 
00214                         // check for timeout
00215                         if (res == 0)
00216                         {
00217                                 throw select_exception(select_exception::SELECT_TIMEOUT);
00218                         }
00219 
00220                         if (FD_ISSET(int_pipe, &fds_read))
00221                         {
00222                                 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL;
00223 
00224                                 // this was an interrupt with the self-pipe-trick
00225                                 char buf[2];
00226                                 ::read(int_pipe, buf, 2);
00227                         }
00228 
00229                         // return if the stream was closed
00230                         if (_socket == -1)
00231                         {
00232                                 throw select_exception(select_exception::SELECT_CLOSED);
00233                         }
00234 
00235                         if (FD_ISSET(_socket, &fds_read))
00236                         {
00237                                 read = true;
00238                                 _continue = false;
00239                         }
00240 
00241                         if (FD_ISSET(_socket, &fds_write))
00242                         {
00243                                 write = true;
00244                                 _continue = false;
00245                         }
00246 
00247                         if (FD_ISSET(_socket, &fds_error))
00248                         {
00249                                 error = true;
00250                                 _continue = false;
00251                         }
00252                 }
00253 
00254                 return res;
00255         }
00256 
00257         int tcpstream::overflow(int c)
00258         {
00259                 char *ibegin = out_buf_;
00260                 char *iend = pptr();
00261 
00262                 // mark the buffer as free
00263                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00264 
00265                 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00266                 {
00267                         *iend++ = std::char_traits<char>::to_char_type(c);
00268                 }
00269 
00270                 // if there is nothing to send, just return
00271                 if ((iend - ibegin) == 0)
00272                 {
00273                         IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
00274                         return std::char_traits<char>::not_eof(c);
00275                 }
00276 
00277                 try {
00278                         bool read = false, write = true, error = false;
00279                         select(_interrupt_pipe_write[0], read, write, error, _timeout);
00280 
00281                         // bytes to send
00282                         size_t bytes = (iend - ibegin);
00283 
00284                         // send the data
00285                         ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), 0);
00286 
00287                         if (ret < 0)
00288                         {
00289                                 switch (errno)
00290                                 {
00291                                 case EPIPE:
00292                                         // connection has been reset
00293                                         errmsg = ERROR_EPIPE;
00294                                         break;
00295 
00296                                 case ECONNRESET:
00297                                         // Connection reset by peer
00298                                         errmsg = ERROR_RESET;
00299                                         break;
00300 
00301                                 case EAGAIN:
00302                                         // sent failed but we should retry again
00303                                         return overflow(c);
00304 
00305                                 default:
00306                                         errmsg = ERROR_WRITE;
00307                                 }
00308 
00309                                 // failure
00310                                 close();
00311                                 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno;
00312                                 throw ConnectionClosedException(ss.str());
00313                         }
00314                         else
00315                         {
00316                                 // check how many bytes are sent
00317                                 if ((size_t)ret < bytes)
00318                                 {
00319                                         // we did not sent all bytes
00320                                         char *resched_begin = ibegin + ret;
00321                                         char *resched_end = iend;
00322 
00323                                         // bytes left to send
00324                                         size_t bytes_left = resched_end - resched_begin;
00325 
00326                                         // move the data to the begin of the buffer
00327                                         ::memcpy(ibegin, resched_begin, bytes_left);
00328 
00329                                         // new free buffer
00330                                         char *buffer_begin = ibegin + bytes_left;
00331 
00332                                         // mark the buffer as free
00333                                         setp(buffer_begin, out_buf_ + BUFF_SIZE - 1);
00334                                 }
00335                         }
00336                 } catch (const select_exception &ex) {
00337                         // send timeout
00338                         errmsg = ERROR_WRITE;
00339                         close();
00340                         throw ConnectionClosedException("<tcpstream> send() timed out");
00341                 }
00342 
00343                 return std::char_traits<char>::not_eof(c);
00344         }
00345 
00346         int tcpstream::underflow()
00347         {
00348                 try {
00349                         bool read = true;
00350                         bool write = false;
00351                         bool error = false;
00352 
00353                         select(_interrupt_pipe_read[0], read, write, error, _timeout);
00354 
00355                         // read some bytes
00356                         int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0);
00357 
00358                         // end of stream
00359                         if (bytes == 0)
00360                         {
00361                                 errmsg = ERROR_CLOSED;
00362                                 close();
00363                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL;
00364                                 return std::char_traits<char>::eof();
00365                         }
00366                         else if (bytes < 0)
00367                         {
00368                                 switch (errno)
00369                                 {
00370                                 case EPIPE:
00371                                         // connection has been reset
00372                                         errmsg = ERROR_EPIPE;
00373                                         break;
00374 
00375                                 default:
00376                                         errmsg = ERROR_READ;
00377                                 }
00378 
00379                                 close();
00380                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL;
00381                                 return std::char_traits<char>::eof();
00382                         }
00383 
00384                         // Since the input buffer content is now valid (or is new)
00385                         // the get pointer should be initialized (or reset).
00386                         setg(in_buf_, in_buf_, in_buf_ + bytes);
00387 
00388                         return std::char_traits<char>::not_eof(in_buf_[0]);
00389                 } catch (const select_exception &ex) {
00390                         return std::char_traits<char>::eof();
00391                 }
00392         }
00393 
00394         void tcpstream::setTimeout(unsigned int value)
00395         {
00396                 _timeout = value;
00397         }
00398 
00399         void tcpstream::enableKeepalive()
00400         {
00401                 /* Set the option active */
00402                 int optval = 1;
00403                 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
00404                         throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives");
00405                 }
00406         }
00407 
00408         void tcpstream::enableLinger(int l)
00409         {
00410                 // set linger option to the socket
00411                 struct linger linger;
00412 
00413                 linger.l_onoff = 1;
00414                 linger.l_linger = l;
00415                 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger));
00416         }
00417 
00418         void tcpstream::enableNoDelay()
00419         {
00420                 int set = 1;
00421                 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set));
00422         }
00423 
00424 }