|
IBR-DTNSuite 0.6
|
00001 /* 00002 * dtnstream.cpp 00003 * 00004 * The application dtnstream can transfer a data stream to another instance of dtnstream. 00005 * It uses an extension block to mark the sequence of the stream. 00006 * 00007 * Created on: 23.03.2011 00008 * Author: morgenro 00009 */ 00010 00011 #include "config.h" 00012 #include "BundleStream.h" 00013 #include <ibrdtn/api/Client.h> 00014 #include <ibrdtn/data/EID.h> 00015 #include <ibrdtn/data/PayloadBlock.h> 00016 #include <ibrcommon/net/tcpclient.h> 00017 #include <ibrcommon/data/File.h> 00018 #include <ibrcommon/TimeMeasurement.h> 00019 #include <iostream> 00020 00021 unsigned int __timeout_receive__ = 0; 00022 00023 dtn::data::Block* StreamBlock::Factory::create() 00024 { 00025 return new StreamBlock(); 00026 } 00027 00028 StreamBlock::StreamBlock() 00029 : dtn::data::Block(StreamBlock::BLOCK_TYPE), _seq(0) 00030 { 00031 } 00032 00033 StreamBlock::~StreamBlock() 00034 { 00035 00036 } 00037 00038 size_t StreamBlock::getLength() const 00039 { 00040 return _seq.getLength(); 00041 } 00042 00043 std::ostream& StreamBlock::serialize(std::ostream &stream, size_t &length) const 00044 { 00045 stream << _seq; 00046 return stream; 00047 } 00048 00049 std::istream& StreamBlock::deserialize(std::istream &stream, const size_t length) 00050 { 00051 stream >> _seq; 00052 return stream; 00053 } 00054 00055 void StreamBlock::setSequenceNumber(size_t seq) 00056 { 00057 _seq = seq; 00058 } 00059 00060 size_t StreamBlock::getSequenceNumber() const 00061 { 00062 return _seq.getValue(); 00063 } 00064 00065 StreamBundle::StreamBundle() 00066 : _ref(ibrcommon::StringBLOB::create()) 00067 { 00068 StreamBlock &block = _b.push_front<StreamBlock>(); 00069 block.setSequenceNumber(0); 00070 00071 _b.push_back(_ref); 00072 } 00073 00074 StreamBundle::StreamBundle(const dtn::api::Bundle &b) 00075 : dtn::api::Bundle(b), _ref(getData()) 00076 { 00077 } 00078 00079 StreamBundle::~StreamBundle() 00080 { 00081 } 00082 00083 void StreamBundle::append(const char* data, size_t length) 00084 { 00085 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00086 (*stream).seekp(0, ios::end); 00087 (*stream).write(data, length); 00088 } 00089 00090 void StreamBundle::clear() 00091 { 00092 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00093 stream.clear(); 00094 00095 // increment the sequence number 00096 try { 00097 StreamBlock &block = _b.getBlock<StreamBlock>(); 00098 block.setSequenceNumber(block.getSequenceNumber() + 1); 00099 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00100 } 00101 00102 size_t StreamBundle::size() 00103 { 00104 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00105 return stream.size(); 00106 } 00107 00108 size_t StreamBundle::getSequenceNumber(const StreamBundle &b) 00109 { 00110 try { 00111 const StreamBlock &block = b._b.getBlock<StreamBlock>(); 00112 return block.getSequenceNumber(); 00113 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00114 return 0; 00115 } 00116 } 00117 00118 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer) 00119 : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk), 00120 _buffer(buffer), _chunk_offset(0), _in_seq(0) 00121 { 00122 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00123 setg(0, 0, 0); 00124 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00125 }; 00126 00127 BundleStreamBuf::~BundleStreamBuf() 00128 { 00129 delete[] _in_buf; 00130 delete[] _out_buf; 00131 }; 00132 00133 int BundleStreamBuf::sync() 00134 { 00135 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00136 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00137 : 0; 00138 00139 // send the current chunk and clear it 00140 _client << _chunk; _client.flush(); 00141 _chunk.clear(); 00142 00143 return ret; 00144 } 00145 00146 int BundleStreamBuf::overflow(int c) 00147 { 00148 char *ibegin = _in_buf; 00149 char *iend = pptr(); 00150 00151 // mark the buffer as free 00152 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00153 00154 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00155 { 00156 *iend++ = std::char_traits<char>::to_char_type(c); 00157 } 00158 00159 // if there is nothing to send, just return 00160 if ((iend - ibegin) == 0) 00161 { 00162 return std::char_traits<char>::not_eof(c); 00163 } 00164 00165 // copy data into the bundles payload 00166 _chunk.append(_in_buf, iend - ibegin); 00167 00168 // if size exceeds chunk limit, send it 00169 if (_chunk.size() > _buffer) 00170 { 00171 _client << _chunk; _client.flush(); 00172 _chunk.clear(); 00173 } 00174 00175 return std::char_traits<char>::not_eof(c); 00176 } 00177 00178 void BundleStreamBuf::received(const dtn::api::Bundle &b) 00179 { 00180 ibrcommon::MutexLock l(_chunks_cond); 00181 00182 if (StreamBundle::getSequenceNumber(b) < _in_seq) return; 00183 00184 _chunks.insert(Chunk(b)); 00185 _chunks_cond.signal(true); 00186 00187 // bundle received 00188 // std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush; 00189 } 00190 00191 int BundleStreamBuf::underflow() 00192 { 00193 ibrcommon::MutexLock l(_chunks_cond); 00194 00195 return __underflow(); 00196 } 00197 00198 int BundleStreamBuf::__underflow() 00199 { 00200 // receive chunks until the next sequence number is received 00201 while (_chunks.empty()) 00202 { 00203 // wait for the next bundle 00204 _chunks_cond.wait(); 00205 } 00206 00207 ibrcommon::TimeMeasurement tm; 00208 tm.start(); 00209 00210 // while not the right sequence number received -> wait 00211 while (_in_seq != (*_chunks.begin())._seq) 00212 { 00213 try { 00214 // wait for the next bundle 00215 _chunks_cond.wait(1000); 00216 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { }; 00217 00218 tm.stop(); 00219 if ((__timeout_receive__ > 0) && (tm.getSeconds() > __timeout_receive__)) 00220 { 00221 // skip the missing bundles and proceed with the last received one 00222 _in_seq = (*_chunks.begin())._seq; 00223 } 00224 } 00225 00226 // get the first chunk in the buffer 00227 const Chunk &c = (*_chunks.begin()); 00228 00229 dtn::api::Bundle b = c._bundle; 00230 ibrcommon::BLOB::Reference r = b.getData(); 00231 00232 // get stream lock 00233 ibrcommon::BLOB::iostream stream = r.iostream(); 00234 00235 // jump to the offset position 00236 (*stream).seekg(_chunk_offset, ios::beg); 00237 00238 // copy the data of the last received bundle into the buffer 00239 (*stream).read(_out_buf, BUFF_SIZE); 00240 00241 // get the read bytes 00242 size_t bytes = (*stream).gcount(); 00243 00244 if ((*stream).eof()) 00245 { 00246 // bundle consumed 00247 // std::cerr << std::endl << "# " << c._seq << std::endl << std::flush; 00248 00249 // delete the last chunk 00250 _chunks.erase(c); 00251 00252 // reset the chunk offset 00253 _chunk_offset = 0; 00254 00255 // increment sequence number 00256 _in_seq++; 00257 00258 // if no more bytes are read, get the next bundle -> call underflow() recursive 00259 if (bytes == 0) 00260 { 00261 return __underflow(); 00262 } 00263 } 00264 else 00265 { 00266 // increment the chunk offset 00267 _chunk_offset += bytes; 00268 } 00269 00270 // Since the input buffer content is now valid (or is new) 00271 // the get pointer should be initialized (or reset). 00272 setg(_out_buf, _out_buf, _out_buf + bytes); 00273 00274 return std::char_traits<char>::not_eof(_out_buf[0]); 00275 } 00276 00277 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b) 00278 : _bundle(b), _seq(StreamBundle::getSequenceNumber(b)) 00279 { 00280 } 00281 00282 BundleStreamBuf::Chunk::~Chunk() 00283 { 00284 } 00285 00286 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const 00287 { 00288 return (_seq == other._seq); 00289 } 00290 00291 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const 00292 { 00293 return (_seq < other._seq); 00294 } 00295 00296 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app) 00297 : dtn::api::Client(app, stream), _stream(stream), _buf(*this, _chunk, chunk_size) 00298 {}; 00299 00300 BundleStream::~BundleStream() {}; 00301 00302 BundleStreamBuf& BundleStream::rdbuf() 00303 { 00304 return _buf; 00305 } 00306 00307 dtn::api::Bundle& BundleStream::base() 00308 { 00309 return _chunk; 00310 } 00311 00312 void BundleStream::received(const dtn::api::Bundle &b) 00313 { 00314 _buf.received(b); 00315 } 00316 00317 void print_help() 00318 { 00319 std::cout << "-- dtnstream (IBR-DTN) --" << std::endl; 00320 std::cout << "Syntax: dtnstream [options]" << std::endl; 00321 std::cout << "* optional parameters *" << std::endl; 00322 std::cout << " -h display this text" << std::endl; 00323 std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl; 00324 std::cout << " -s <identifier> set the source identifier (e.g. stream)" << std::endl; 00325 std::cout << " -c <bytes> set the chunk size (max. size of each bundle)" << std::endl; 00326 std::cout << " -t <seconds> set the timeout of the buffer" << std::endl; 00327 std::cout << " -l <seconds> set the lifetime of stream chunks default: 30" << std::endl; 00328 std::cout << " -E request encryption on the bundle layer" << std::endl; 00329 std::cout << " -S request signature on the bundle layer" << std::endl; 00330 std::cout << " -U <socket> use UNIX domain sockets" << std::endl; 00331 } 00332 00333 int main(int argc, char *argv[]) 00334 { 00335 int opt = 0; 00336 dtn::data::EID _destination; 00337 std::string _source = "stream"; 00338 unsigned int _lifetime = 30; 00339 size_t _chunk_size = 4096; 00340 bool _bundle_encryption = false; 00341 bool _bundle_signed = false; 00342 ibrcommon::File _unixdomain; 00343 00344 while((opt = getopt(argc, argv, "hd:t:s:c:l:ESU:")) != -1) 00345 { 00346 switch (opt) 00347 { 00348 case 'h': 00349 print_help(); 00350 return 0; 00351 00352 case 'd': 00353 _destination = std::string(optarg); 00354 break; 00355 00356 case 's': 00357 _source = optarg; 00358 break; 00359 00360 case 'c': 00361 _chunk_size = atoi(optarg); 00362 break; 00363 00364 case 't': 00365 __timeout_receive__ = atoi(optarg); 00366 break; 00367 00368 case 'l': 00369 _lifetime = atoi(optarg); 00370 break; 00371 00372 case 'E': 00373 _bundle_encryption = true; 00374 break; 00375 00376 case 'S': 00377 _bundle_signed = true; 00378 break; 00379 00380 case 'U': 00381 _unixdomain = ibrcommon::File(optarg); 00382 break; 00383 00384 default: 00385 std::cout << "unknown command" << std::endl; 00386 return -1; 00387 } 00388 } 00389 00390 try { 00391 // Create a stream to the server using TCP. 00392 ibrcommon::tcpclient conn; 00393 00394 // check if the unixdomain socket exists 00395 if (_unixdomain.exists()) 00396 { 00397 // connect to the unix domain socket 00398 conn.open(_unixdomain); 00399 } 00400 else 00401 { 00402 // connect to the standard local api port 00403 conn.open("127.0.0.1", 4550); 00404 00405 // enable nodelay option 00406 conn.enableNoDelay(); 00407 } 00408 00409 // Initiate a derivated client 00410 BundleStream bs(conn, _chunk_size, _source); 00411 00412 // Connect to the server. Actually, this function initiate the 00413 // stream protocol by starting the thread and sending the contact header. 00414 bs.connect(); 00415 00416 // transmitter mode 00417 if (_destination != dtn::data::EID()) 00418 { 00419 bs.base().setDestination(_destination); 00420 bs.base().setLifetime(_lifetime); 00421 if (_bundle_encryption) bs.base().requestEncryption(); 00422 if (_bundle_signed) bs.base().requestSigned(); 00423 std::ostream stream(&bs.rdbuf()); 00424 stream << std::cin.rdbuf() << std::flush; 00425 } 00426 // receiver mode 00427 else 00428 { 00429 std::istream stream(&bs.rdbuf()); 00430 std::cout << stream.rdbuf() << std::flush; 00431 } 00432 00433 // Shutdown the client connection. 00434 bs.close(); 00435 conn.close(); 00436 } catch (const ibrcommon::tcpclient::SocketException&) { 00437 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl; 00438 return -1; 00439 } catch (...) { 00440 00441 } 00442 00443 }