IBR-DTNSuite 0.6

ibrcommon/ibrcommon/net/tcpstream.cpp

Go to the documentation of this file.
00001 /*
00002  * tcpstream.cpp
00003  *
00004  *  Created on: 29.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/Logger.h"
00010 #include "ibrcommon/net/tcpstream.h"
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include <netinet/in.h>
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <arpa/inet.h>
00016 #include <netinet/tcp.h>
00017 #include <errno.h>
00018 #include <signal.h>
00019 #include <fcntl.h>
00020 #include <string.h>
00021 
00022 namespace ibrcommon
00023 {
00024         tcpstream::tcpstream(int socket) :
00025                 iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0)
00026         {
00027                 // create a pipe for interruption
00028                 if (pipe(_interrupt_pipe_read) < 0)
00029                 {
00030                         printf ("Error %d creating pipe\n", errno);
00031                         exit (-1);
00032                 }
00033 
00034                 // create a pipe for interruption
00035                 if (pipe(_interrupt_pipe_write) < 0)
00036                 {
00037                         printf ("Error %d creating pipe\n", errno);
00038                         exit (-1);
00039                 }
00040 
00041                 // set the pipe to non-blocking
00042                 vsocket::set_non_blocking(_interrupt_pipe_read[0]);
00043                 vsocket::set_non_blocking(_interrupt_pipe_read[1]);
00044                 vsocket::set_non_blocking(_interrupt_pipe_write[0]);
00045                 vsocket::set_non_blocking(_interrupt_pipe_write[1]);
00046 
00047                 // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00048                 setg(0, 0, 0);
00049                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00050         }
00051 
00052         tcpstream::~tcpstream()
00053         {
00054                 delete[] in_buf_;
00055                 delete[] out_buf_;
00056 
00057                 // finally, close the socket
00058                 close();
00059         }
00060 
00061         string tcpstream::getAddress() const
00062         {
00063                 struct ::sockaddr_in sa;
00064                 int iLen = sizeof(sa);
00065 
00066                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00067                 return inet_ntoa(sa.sin_addr);
00068         }
00069 
00070         int tcpstream::getPort() const
00071         {
00072                 struct ::sockaddr_in sa;
00073                 int iLen = sizeof(sa);
00074 
00075                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00076                 return ntohs(sa.sin_port);
00077         }
00078 
00079         void tcpstream::interrupt()
00080         {
00081                 ::write(_interrupt_pipe_read[1], "i", 1);
00082                 ::write(_interrupt_pipe_write[1], "i", 1);
00083         }
00084 
00085         void tcpstream::close(bool errorcheck)
00086         {
00087                 static ibrcommon::Mutex close_lock;
00088                 ibrcommon::MutexLock l(close_lock);
00089 
00090                 if (_socket == -1)
00091                 {
00092                         if (errorcheck) throw ConnectionClosedException();
00093                         return;
00094                 }
00095 
00096                 int sock = _socket;
00097                 _socket = -1;
00098 
00099                 // unblock all socket operations
00100                 interrupt();
00101 
00102                 if ((::close(sock) == -1) && errorcheck)
00103                 {
00104                         throw ConnectionClosedException();
00105                 }
00106         }
00107 
00108         int tcpstream::sync()
00109         {
00110                 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00111                                 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00112                                 : 0;
00113 
00114                 return ret;
00115         }
00116 
00117         int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception)
00118         {
00119                 // return if the stream was closed
00120                 if (_socket == -1)
00121                 {
00122                         throw select_exception(select_exception::SELECT_CLOSED);
00123                 }
00124 
00125                 if (!_nonblocking)
00126                 {
00127                         // set the tcp socket to non-blocking
00128                         vsocket::set_non_blocking(_socket);
00129                         
00130                         // do not set this mode again
00131                         _nonblocking = true;
00132                 }
00133 
00134                 int high_fd = _socket;
00135 
00136                 fd_set fds_read, fds_write, fds_error;
00137                 fd_set *fdsp_write = NULL, *fdsp_error = NULL;
00138 
00139                 FD_ZERO(&fds_read);
00140                 FD_ZERO(&fds_write);
00141                 FD_ZERO(&fds_error);
00142 
00143                 if (read)
00144                 {
00145                         FD_SET(_socket, &fds_read);
00146                 }
00147 
00148                 if (write)
00149                 {
00150                         FD_SET(_socket, &fds_write);
00151                         fdsp_write = &fds_write;
00152                 }
00153 
00154                 if (error)
00155                 {
00156                         FD_SET(_socket, &fds_error);
00157                         fdsp_error = &fds_error;
00158                 }
00159 
00160                 read = false;
00161                 write = false;
00162                 error = false;
00163 
00164                 // socket for the self-pipe trick
00165                 FD_SET(int_pipe, &fds_read);
00166                 if (high_fd < int_pipe) high_fd = int_pipe;
00167 
00168                 int res = 0;
00169 
00170                 // set timeout
00171                 struct timeval tv;
00172                 tv.tv_sec = timeout;
00173                 tv.tv_usec = 0;
00174 
00175                 bool _continue = true;
00176                 while (_continue)
00177                 {
00178                         if (timeout > 0)
00179                         {
00180                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv);
00181                         }
00182                         else
00183                         {
00184                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL);
00185                         }
00186 
00187                         // check for select error
00188                         if (res < 0)
00189                         {
00190                                 throw select_exception(select_exception::SELECT_ERROR);
00191                         }
00192 
00193                         // check for timeout
00194                         if (res == 0)
00195                         {
00196                                 throw select_exception(select_exception::SELECT_TIMEOUT);
00197                         }
00198 
00199                         if (FD_ISSET(int_pipe, &fds_read))
00200                         {
00201                                 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL;
00202 
00203                                 // this was an interrupt with the self-pipe-trick
00204                                 char buf[2];
00205                                 ::read(int_pipe, buf, 2);
00206                         }
00207 
00208                         // return if the stream was closed
00209                         if (_socket == -1)
00210                         {
00211                                 throw select_exception(select_exception::SELECT_CLOSED);
00212                         }
00213 
00214                         if (FD_ISSET(_socket, &fds_read))
00215                         {
00216                                 read = true;
00217                                 _continue = false;
00218                         }
00219 
00220                         if (FD_ISSET(_socket, &fds_write))
00221                         {
00222                                 write = true;
00223                                 _continue = false;
00224                         }
00225 
00226                         if (FD_ISSET(_socket, &fds_error))
00227                         {
00228                                 error = true;
00229                                 _continue = false;
00230                         }
00231                 }
00232 
00233                 return res;
00234         }
00235 
00236         int tcpstream::overflow(int c)
00237         {
00238                 char *ibegin = out_buf_;
00239                 char *iend = pptr();
00240 
00241                 // mark the buffer as free
00242                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00243 
00244                 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00245                 {
00246                         *iend++ = std::char_traits<char>::to_char_type(c);
00247                 }
00248 
00249                 // if there is nothing to send, just return
00250                 if ((iend - ibegin) == 0)
00251                 {
00252                         IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
00253                         return std::char_traits<char>::not_eof(c);
00254                 }
00255 
00256                 bool read = false, write = true, error = false;
00257                 select(_interrupt_pipe_write[0], read, write, error);
00258 
00259                 // bytes to send
00260                 size_t bytes = (iend - ibegin);
00261 
00262                 // send the data
00263                 ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), MSG_NOSIGNAL);
00264 
00265                 if (ret < 0)
00266                 {
00267                         switch (errno)
00268                         {
00269                         case EPIPE:
00270                                 // connection has been reset
00271                                 errmsg = ERROR_EPIPE;
00272                                 break;
00273 
00274                         case ECONNRESET:
00275                                 // Connection reset by peer
00276                                 errmsg = ERROR_RESET;
00277                                 break;
00278 
00279                         case EAGAIN:
00280                                 // sent failed but we should retry again
00281                                 return overflow(c);
00282 
00283                         default:
00284                                 errmsg = ERROR_WRITE;
00285                         }
00286 
00287                         // failure
00288                         close();
00289                         std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno;
00290                         throw ConnectionClosedException(ss.str());
00291                 }
00292                 else
00293                 {
00294                         // check how many bytes are sent
00295                         if ((size_t)ret < bytes)
00296                         {
00297                                 // we did not sent all bytes
00298                                 char *resched_begin = ibegin + ret;
00299                                 char *resched_end = iend;
00300 
00301                                 // bytes left to send
00302                                 size_t bytes_left = resched_end - resched_begin;
00303 
00304                                 // move the data to the begin of the buffer
00305                                 ::memcpy(ibegin, resched_begin, bytes_left);
00306 
00307                                 // new free buffer
00308                                 char *buffer_begin = ibegin + bytes_left;
00309 
00310                                 // mark the buffer as free
00311                                 setp(buffer_begin, out_buf_ + BUFF_SIZE - 1);
00312                         }
00313                 }
00314 
00315                 return std::char_traits<char>::not_eof(c);
00316         }
00317 
00318         int tcpstream::underflow()
00319         {
00320                 try {
00321                         bool read = true;
00322                         bool write = false;
00323                         bool error = false;
00324 
00325                         select(_interrupt_pipe_read[0], read, write, error, _timeout);
00326 
00327                         // read some bytes
00328                         int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0);
00329 
00330                         // end of stream
00331                         if (bytes == 0)
00332                         {
00333                                 errmsg = ERROR_CLOSED;
00334                                 close();
00335                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL;
00336                                 return std::char_traits<char>::eof();
00337                         }
00338                         else if (bytes < 0)
00339                         {
00340                                 switch (errno)
00341                                 {
00342                                 case EPIPE:
00343                                         // connection has been reset
00344                                         errmsg = ERROR_EPIPE;
00345                                         break;
00346 
00347                                 default:
00348                                         errmsg = ERROR_READ;
00349                                 }
00350 
00351                                 close();
00352                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL;
00353                                 return std::char_traits<char>::eof();
00354                         }
00355 
00356                         // Since the input buffer content is now valid (or is new)
00357                         // the get pointer should be initialized (or reset).
00358                         setg(in_buf_, in_buf_, in_buf_ + bytes);
00359 
00360                         return std::char_traits<char>::not_eof(in_buf_[0]);
00361                 } catch (const select_exception &ex) {
00362                         return std::char_traits<char>::eof();
00363                 }
00364         }
00365 
00366         void tcpstream::setTimeout(unsigned int value)
00367         {
00368                 _timeout = value;
00369         }
00370 
00371         void tcpstream::enableKeepalive()
00372         {
00373                 /* Set the option active */
00374                 int optval = 1;
00375                 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
00376                         throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives");
00377                 }
00378         }
00379 
00380         void tcpstream::enableLinger(int l)
00381         {
00382                 // set linger option to the socket
00383                 struct linger linger;
00384 
00385                 linger.l_onoff = 1;
00386                 linger.l_linger = l;
00387                 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger));
00388         }
00389 
00390         void tcpstream::enableNoDelay()
00391         {
00392                 int set = 1;
00393                 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set));
00394         }
00395 
00396 }