|
IBR-DTNSuite 0.6
|
00001 /* 00002 * dtnstream.cpp 00003 * 00004 * The application dtnstream can transfer a data stream to another instance of dtnstream. 00005 * It uses an extension block to mark the sequence of the stream. 00006 * 00007 * Created on: 23.03.2011 00008 * Author: morgenro 00009 */ 00010 00011 #include "config.h" 00012 #include "BundleStream.h" 00013 #include <ibrdtn/api/Client.h> 00014 #include <ibrdtn/data/EID.h> 00015 #include <ibrdtn/data/PayloadBlock.h> 00016 #include <ibrcommon/net/tcpclient.h> 00017 #include <ibrcommon/data/File.h> 00018 #include <ibrcommon/TimeMeasurement.h> 00019 #include <iostream> 00020 00021 unsigned int __timeout_receive__ = 0; 00022 00023 dtn::data::Block* StreamBlock::Factory::create() 00024 { 00025 return new StreamBlock(); 00026 } 00027 00028 StreamBlock::StreamBlock() 00029 : dtn::data::Block(StreamBlock::BLOCK_TYPE), _seq(0) 00030 { 00031 } 00032 00033 StreamBlock::~StreamBlock() 00034 { 00035 00036 } 00037 00038 size_t StreamBlock::getLength() const 00039 { 00040 return _seq.getLength(); 00041 } 00042 00043 std::ostream& StreamBlock::serialize(std::ostream &stream, size_t &length) const 00044 { 00045 stream << _seq; 00046 return stream; 00047 } 00048 00049 std::istream& StreamBlock::deserialize(std::istream &stream, const size_t length) 00050 { 00051 stream >> _seq; 00052 return stream; 00053 } 00054 00055 void StreamBlock::setSequenceNumber(size_t seq) 00056 { 00057 _seq = seq; 00058 } 00059 00060 size_t StreamBlock::getSequenceNumber() const 00061 { 00062 return _seq.getValue(); 00063 } 00064 00065 StreamBundle::StreamBundle() 00066 : _ref(ibrcommon::BLOB::create()) 00067 { 00068 StreamBlock &block = _b.push_front<StreamBlock>(); 00069 block.setSequenceNumber(0); 00070 00071 _b.push_back(_ref); 00072 } 00073 00074 StreamBundle::StreamBundle(const dtn::api::Bundle &b) 00075 : dtn::api::Bundle(b), _ref(getData()) 00076 { 00077 } 00078 00079 StreamBundle::~StreamBundle() 00080 { 00081 } 00082 00083 void StreamBundle::append(const char* data, size_t length) 00084 { 00085 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00086 (*stream).seekp(0, ios::end); 00087 (*stream).write(data, length); 00088 } 00089 00090 void StreamBundle::clear() 00091 { 00092 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00093 stream.clear(); 00094 00095 // increment the sequence number 00096 try { 00097 StreamBlock &block = _b.getBlock<StreamBlock>(); 00098 block.setSequenceNumber(block.getSequenceNumber() + 1); 00099 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00100 } 00101 00102 size_t StreamBundle::size() 00103 { 00104 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00105 return stream.size(); 00106 } 00107 00108 size_t StreamBundle::getSequenceNumber(const StreamBundle &b) 00109 { 00110 try { 00111 const StreamBlock &block = b._b.getBlock<StreamBlock>(); 00112 return block.getSequenceNumber(); 00113 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00114 return 0; 00115 } 00116 } 00117 00118 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer, bool wait_seq_zero) 00119 : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk), 00120 _buffer(buffer), _chunk_offset(0), _in_seq(0), _streaming(wait_seq_zero) 00121 { 00122 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00123 setg(0, 0, 0); 00124 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00125 }; 00126 00127 BundleStreamBuf::~BundleStreamBuf() 00128 { 00129 delete[] _in_buf; 00130 delete[] _out_buf; 00131 }; 00132 00133 int BundleStreamBuf::sync() 00134 { 00135 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00136 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00137 : 0; 00138 00139 // send the current chunk and clear it 00140 _client << _chunk; _client.flush(); 00141 _chunk.clear(); 00142 00143 return ret; 00144 } 00145 00146 int BundleStreamBuf::overflow(int c) 00147 { 00148 char *ibegin = _in_buf; 00149 char *iend = pptr(); 00150 00151 // mark the buffer as free 00152 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00153 00154 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00155 { 00156 *iend++ = std::char_traits<char>::to_char_type(c); 00157 } 00158 00159 // if there is nothing to send, just return 00160 if ((iend - ibegin) == 0) 00161 { 00162 return std::char_traits<char>::not_eof(c); 00163 } 00164 00165 // copy data into the bundles payload 00166 _chunk.append(_in_buf, iend - ibegin); 00167 00168 // if size exceeds chunk limit, send it 00169 if (_chunk.size() > _buffer) 00170 { 00171 _client << _chunk; _client.flush(); 00172 _chunk.clear(); 00173 } 00174 00175 return std::char_traits<char>::not_eof(c); 00176 } 00177 00178 void BundleStreamBuf::received(const dtn::api::Bundle &b) 00179 { 00180 ibrcommon::MutexLock l(_chunks_cond); 00181 00182 if (StreamBundle::getSequenceNumber(b) < _in_seq) return; 00183 00184 _chunks.insert(Chunk(b)); 00185 _chunks_cond.signal(true); 00186 00187 // bundle received 00188 // std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush; 00189 } 00190 00191 int BundleStreamBuf::underflow() 00192 { 00193 ibrcommon::MutexLock l(_chunks_cond); 00194 00195 return __underflow(); 00196 } 00197 00198 int BundleStreamBuf::__underflow() 00199 { 00200 // receive chunks until the next sequence number is received 00201 while (_chunks.empty()) 00202 { 00203 // wait for the next bundle 00204 _chunks_cond.wait(); 00205 } 00206 00207 ibrcommon::TimeMeasurement tm; 00208 tm.start(); 00209 00210 // while not the right sequence number received -> wait 00211 while ((_in_seq != (*_chunks.begin())._seq)) 00212 { 00213 try { 00214 // wait for the next bundle 00215 _chunks_cond.wait(1000); 00216 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { }; 00217 00218 tm.stop(); 00219 if (((__timeout_receive__ > 0) && (tm.getSeconds() > __timeout_receive__)) || !_streaming) 00220 { 00221 // skip the missing bundles and proceed with the last received one 00222 _in_seq = (*_chunks.begin())._seq; 00223 00224 // set streaming to active 00225 _streaming = true; 00226 } 00227 } 00228 00229 // get the first chunk in the buffer 00230 const Chunk &c = (*_chunks.begin()); 00231 00232 dtn::api::Bundle b = c._bundle; 00233 ibrcommon::BLOB::Reference r = b.getData(); 00234 00235 // get stream lock 00236 ibrcommon::BLOB::iostream stream = r.iostream(); 00237 00238 // jump to the offset position 00239 (*stream).seekg(_chunk_offset, ios::beg); 00240 00241 // copy the data of the last received bundle into the buffer 00242 (*stream).read(_out_buf, BUFF_SIZE); 00243 00244 // get the read bytes 00245 size_t bytes = (*stream).gcount(); 00246 00247 if ((*stream).eof()) 00248 { 00249 // bundle consumed 00250 // std::cerr << std::endl << "# " << c._seq << std::endl << std::flush; 00251 00252 // delete the last chunk 00253 _chunks.erase(c); 00254 00255 // reset the chunk offset 00256 _chunk_offset = 0; 00257 00258 // increment sequence number 00259 _in_seq++; 00260 00261 // if no more bytes are read, get the next bundle -> call underflow() recursive 00262 if (bytes == 0) 00263 { 00264 return __underflow(); 00265 } 00266 } 00267 else 00268 { 00269 // increment the chunk offset 00270 _chunk_offset += bytes; 00271 } 00272 00273 // Since the input buffer content is now valid (or is new) 00274 // the get pointer should be initialized (or reset). 00275 setg(_out_buf, _out_buf, _out_buf + bytes); 00276 00277 return std::char_traits<char>::not_eof(_out_buf[0]); 00278 } 00279 00280 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b) 00281 : _bundle(b), _seq(StreamBundle::getSequenceNumber(b)) 00282 { 00283 } 00284 00285 BundleStreamBuf::Chunk::~Chunk() 00286 { 00287 } 00288 00289 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const 00290 { 00291 return (_seq == other._seq); 00292 } 00293 00294 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const 00295 { 00296 return (_seq < other._seq); 00297 } 00298 00299 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app, const dtn::data::EID &group, bool wait_seq_zero) 00300 : dtn::api::Client(app, group, stream), _stream(stream), _buf(*this, _chunk, chunk_size, wait_seq_zero) 00301 {}; 00302 00303 BundleStream::~BundleStream() {}; 00304 00305 BundleStreamBuf& BundleStream::rdbuf() 00306 { 00307 return _buf; 00308 } 00309 00310 dtn::api::Bundle& BundleStream::base() 00311 { 00312 return _chunk; 00313 } 00314 00315 void BundleStream::received(const dtn::api::Bundle &b) 00316 { 00317 _buf.received(b); 00318 } 00319 00320 void print_help() 00321 { 00322 std::cout << "-- dtnstream (IBR-DTN) --" << std::endl; 00323 std::cout << "Syntax: dtnstream [options]" << std::endl; 00324 std::cout << "" << std::endl; 00325 std::cout << "* common options *" << std::endl; 00326 std::cout << " -h display this text" << std::endl; 00327 std::cout << " -U <socket> use UNIX domain sockets" << std::endl; 00328 std::cout << " -s <identifier> set the source identifier (e.g. stream)" << std::endl; 00329 std::cout << "" << std::endl; 00330 std::cout << "* send options *" << std::endl; 00331 std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl; 00332 std::cout << " -G destination is a group" << std::endl; 00333 std::cout << " -c <bytes> set the chunk size (max. size of each bundle)" << std::endl; 00334 std::cout << " -l <seconds> set the lifetime of stream chunks default: 30" << std::endl; 00335 std::cout << " -E request encryption on the bundle layer" << std::endl; 00336 std::cout << " -S request signature on the bundle layer" << std::endl; 00337 std::cout << "" << std::endl; 00338 std::cout << "* receive options *" << std::endl; 00339 std::cout << " -g <group> join a destination group" << std::endl; 00340 std::cout << " -t <seconds> set the timeout of the buffer" << std::endl; 00341 std::cout << " -w wait for the bundle with seq zero" << std::endl; 00342 std::cout << "" << std::endl; 00343 } 00344 00345 int main(int argc, char *argv[]) 00346 { 00347 int opt = 0; 00348 dtn::data::EID _destination; 00349 std::string _source = "stream"; 00350 unsigned int _lifetime = 30; 00351 size_t _chunk_size = 4096; 00352 dtn::data::EID _group; 00353 bool _bundle_encryption = false; 00354 bool _bundle_signed = false; 00355 bool _bundle_group = false; 00356 bool _wait_seq_zero = false; 00357 ibrcommon::File _unixdomain; 00358 00359 while((opt = getopt(argc, argv, "hg:Gd:t:s:c:l:ESU:w")) != -1) 00360 { 00361 switch (opt) 00362 { 00363 case 'h': 00364 print_help(); 00365 return 0; 00366 00367 case 'd': 00368 _destination = std::string(optarg); 00369 break; 00370 00371 case 'g': 00372 _group = std::string(optarg); 00373 break; 00374 00375 case 'G': 00376 _bundle_group = true; 00377 break; 00378 00379 case 's': 00380 _source = optarg; 00381 break; 00382 00383 case 'c': 00384 _chunk_size = atoi(optarg); 00385 break; 00386 00387 case 't': 00388 __timeout_receive__ = atoi(optarg); 00389 break; 00390 00391 case 'l': 00392 _lifetime = atoi(optarg); 00393 break; 00394 00395 case 'E': 00396 _bundle_encryption = true; 00397 break; 00398 00399 case 'S': 00400 _bundle_signed = true; 00401 break; 00402 00403 case 'U': 00404 _unixdomain = ibrcommon::File(optarg); 00405 break; 00406 00407 case 'w': 00408 _wait_seq_zero = true; 00409 break; 00410 00411 default: 00412 std::cout << "unknown command" << std::endl; 00413 return -1; 00414 } 00415 } 00416 00417 try { 00418 // Create a stream to the server using TCP. 00419 ibrcommon::tcpclient conn; 00420 00421 // check if the unixdomain socket exists 00422 if (_unixdomain.exists()) 00423 { 00424 // connect to the unix domain socket 00425 conn.open(_unixdomain); 00426 } 00427 else 00428 { 00429 // connect to the standard local api port 00430 conn.open("127.0.0.1", 4550); 00431 00432 // enable nodelay option 00433 conn.enableNoDelay(); 00434 } 00435 00436 // Initiate a derivated client 00437 BundleStream bs(conn, _chunk_size, _source, _group, _wait_seq_zero); 00438 00439 // Connect to the server. Actually, this function initiate the 00440 // stream protocol by starting the thread and sending the contact header. 00441 bs.connect(); 00442 00443 // transmitter mode 00444 if (_destination != dtn::data::EID()) 00445 { 00446 bs.base().setDestination(_destination); 00447 bs.base().setLifetime(_lifetime); 00448 if (_bundle_encryption) bs.base().requestEncryption(); 00449 if (_bundle_signed) bs.base().requestSigned(); 00450 if (_bundle_group) bs.base().setSingleton(false); 00451 std::ostream stream(&bs.rdbuf()); 00452 stream << std::cin.rdbuf() << std::flush; 00453 } 00454 // receiver mode 00455 else 00456 { 00457 std::istream stream(&bs.rdbuf()); 00458 std::cout << stream.rdbuf() << std::flush; 00459 } 00460 00461 // Shutdown the client connection. 00462 bs.close(); 00463 conn.close(); 00464 } catch (const ibrcommon::tcpclient::SocketException&) { 00465 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl; 00466 return -1; 00467 } catch (...) { 00468 00469 } 00470 00471 }