Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/Logger.h"
00010 #include "ibrcommon/net/tcpstream.h"
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include <netinet/in.h>
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <arpa/inet.h>
00016 #include <netinet/tcp.h>
00017 #include <errno.h>
00018
00019 namespace ibrcommon
00020 {
00021 tcpstream::tcpstream(int socket) :
00022 iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(
00023 new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE])
00024 {
00025
00026 setg(0, 0, 0);
00027 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00028 }
00029
00030 tcpstream::~tcpstream()
00031 {
00032 delete[] in_buf_;
00033 delete[] out_buf_;
00034
00035
00036 close();
00037 }
00038
00039 string tcpstream::getAddress() const
00040 {
00041 struct ::sockaddr_in sa;
00042 int iLen = sizeof(sa);
00043
00044 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00045 return inet_ntoa(sa.sin_addr);
00046 }
00047
00048 int tcpstream::getPort() const
00049 {
00050 struct ::sockaddr_in sa;
00051 int iLen = sizeof(sa);
00052
00053 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00054 return ntohs(sa.sin_port);
00055 }
00056
00057 void tcpstream::close(bool errorcheck)
00058 {
00059 if ((::close(_socket) == -1) && errorcheck)
00060 {
00061 throw ConnectionClosedException();
00062 }
00063 }
00064
00065 int tcpstream::sync()
00066 {
00067 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00068 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00069 : 0;
00070
00071 return ret;
00072 }
00073
00074 int tcpstream::overflow(int c)
00075 {
00076 char *ibegin = out_buf_;
00077 char *iend = pptr();
00078
00079
00080 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00081
00082 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00083 {
00084 *iend++ = std::char_traits<char>::to_char_type(c);
00085 }
00086
00087
00088 if ((iend - ibegin) == 0)
00089 {
00090 IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
00091 return std::char_traits<char>::not_eof(c);
00092 }
00093
00094
00095 if (::send(_socket, out_buf_, (iend - ibegin), MSG_NOSIGNAL) < 0)
00096 {
00097 switch (errno)
00098 {
00099 case EPIPE:
00100
00101 errmsg = ERROR_EPIPE;
00102 break;
00103
00104 case ECONNRESET:
00105
00106 errmsg = ERROR_RESET;
00107 break;
00108
00109 default:
00110 errmsg = ERROR_WRITE;
00111 }
00112
00113
00114 ::close(_socket);
00115 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno;
00116 throw ConnectionClosedException(ss.str());
00117 }
00118
00119 return std::char_traits<char>::not_eof(c);
00120 }
00121
00122 int tcpstream::underflow()
00123 {
00124 int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0);
00125
00126
00127 if (bytes == 0)
00128 {
00129 errmsg = ERROR_CLOSED;
00130 ::close(_socket);
00131 std::stringstream ss; ss << "<tcpstream> recv() returned zero: " << errno;
00132 throw ConnectionClosedException(ss.str());
00133 }
00134 else if (bytes < 0)
00135 {
00136 switch (errno)
00137 {
00138 case EPIPE:
00139
00140 errmsg = ERROR_EPIPE;
00141 break;
00142
00143 default:
00144 errmsg = ERROR_READ;
00145 }
00146
00147 ::close(_socket);
00148 std::stringstream ss; ss << "<tcpstream> recv() failed: " << errno;
00149 throw ConnectionClosedException(ss.str());
00150 }
00151
00152
00153
00154 setg(in_buf_, in_buf_, in_buf_ + bytes);
00155
00156 return std::char_traits<char>::not_eof(in_buf_[0]);
00157 }
00158
00159 void tcpstream::enableKeepalive()
00160 {
00161
00162 int optval = 1;
00163 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
00164 throw ibrcommon::SocketException("<tcpstream> can not activate keepalives");
00165 }
00166 }
00167
00168 void tcpstream::enableLinger(int l)
00169 {
00170
00171 struct linger linger;
00172
00173 linger.l_onoff = 1;
00174 linger.l_linger = l;
00175 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger));
00176 }
00177
00178 void tcpstream::enableNoDelay()
00179 {
00180 int set = 1;
00181 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set));
00182 }
00183
00184 }