00001
00002
00003
00004
00005
00006
00007
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/net/tcpstream.h"
00010 #include "ibrcommon/thread/MutexLock.h"
00011 #include <netinet/in.h>
00012 #include <sys/types.h>
00013 #include <sys/socket.h>
00014 #include <arpa/inet.h>
00015 #include <errno.h>
00016
00017 namespace ibrcommon
00018 {
00019 tcpstream::tcpstream(int socket) :
00020 iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(
00021 new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _state(
00022 TCPSTREAM_CONNECTED)
00023 {
00024
00025 struct linger linger;
00026
00027 linger.l_onoff = 1;
00028 linger.l_linger = 0;
00029 setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger));
00030
00031
00032 setg(0, 0, 0);
00033 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00034 }
00035
00036 tcpstream::~tcpstream()
00037 {
00038 delete[] in_buf_;
00039 delete[] out_buf_;
00040
00041
00042 ::close(_socket);
00043 }
00044
00045 string tcpstream::getAddress() const
00046 {
00047 struct ::sockaddr_in sa;
00048 int iLen = sizeof(sa);
00049
00050 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00051 return inet_ntoa(sa.sin_addr);
00052 }
00053
00054 int tcpstream::getPort() const
00055 {
00056 struct ::sockaddr_in sa;
00057 int iLen = sizeof(sa);
00058
00059 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00060 return ntohs(sa.sin_port);
00061 }
00062
00063 void tcpstream::close()
00064 {
00065 if (_state == TCPSTREAM_HALF_CLOSED)
00066 {
00067 ::shutdown(_socket, SHUT_RDWR);
00068 _state = TCPSTREAM_CLOSED;
00069 }
00070 else if (::close(_socket) == -1)
00071 {
00072 _state = TCPSTREAM_CLOSED;
00073 throw ConnectionClosedException();
00074 }
00075
00076 _state = TCPSTREAM_CLOSED;
00077 }
00078
00079 void tcpstream::done()
00080 {
00081 flush();
00082 _state = TCPSTREAM_HALF_CLOSED;
00083 ::shutdown(_socket, SHUT_WR);
00084 }
00085
00086 int tcpstream::sync()
00087 {
00088 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00089 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00090 : 0;
00091
00092 return ret;
00093 }
00094
00095 int tcpstream::overflow(int c)
00096 {
00097 if (_state >= TCPSTREAM_HALF_CLOSED)
00098 {
00099 return std::char_traits<char>::eof();
00100 }
00101
00102 char *ibegin = out_buf_;
00103 char *iend = pptr();
00104
00105
00106 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00107
00108 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00109 {
00110 *iend++ = std::char_traits<char>::to_char_type(c);
00111 }
00112
00113
00114 if ((iend - ibegin) == 0)
00115 {
00116 return std::char_traits<char>::not_eof(c);
00117 }
00118
00119
00120 if (::send(_socket, out_buf_, (iend - ibegin), MSG_NOSIGNAL) < 0)
00121 {
00122 switch (errno)
00123 {
00124 case EPIPE:
00125
00126 errmsg = ERROR_EPIPE;
00127 break;
00128
00129 case 104:
00130
00131 errmsg = ERROR_RESET;
00132 break;
00133
00134 default:
00135 errmsg = ERROR_WRITE;
00136 }
00137
00138
00139 ::shutdown(_socket, SHUT_WR);
00140 _state = TCPSTREAM_HALF_CLOSED;
00141 return std::char_traits<char>::eof();
00142 }
00143
00144 return std::char_traits<char>::not_eof(c);
00145 }
00146
00147 int tcpstream::underflow()
00148 {
00149 if (_state == TCPSTREAM_CLOSED)
00150 {
00151 return std::char_traits<char>::eof();
00152 }
00153
00154 int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0);
00155
00156
00157 if (bytes == 0)
00158 {
00159 errmsg = ERROR_CLOSED;
00160 ::shutdown(_socket, SHUT_RD);
00161 _state = TCPSTREAM_CLOSED;
00162 return std::char_traits<char>::eof();
00163 }
00164 else if (bytes < 0)
00165 {
00166 switch (errno)
00167 {
00168 case EPIPE:
00169
00170 errmsg = ERROR_EPIPE;
00171 break;
00172
00173 default:
00174 errmsg = ERROR_READ;
00175 }
00176
00177 ::shutdown(_socket, SHUT_RD);
00178 _state = TCPSTREAM_CLOSED;
00179 return std::char_traits<char>::eof();
00180 }
00181
00182
00183
00184 setg(in_buf_, in_buf_, in_buf_ + bytes);
00185
00186 return std::char_traits<char>::not_eof(in_buf_[0]);
00187 }
00188
00189 void tcpstream::enableKeepalive()
00190 {
00191
00192 int optval = 1;
00193 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
00194 throw ibrcommon::SocketException("tcpstream: can not activate keepalives");
00195 }
00196 }
00197
00198 }