IBR-DTNSuite 0.6

tools/src/dtnstream.cpp

Go to the documentation of this file.
00001 /*
00002  * dtnstream.cpp
00003  *
00004  * The application dtnstream can transfer a data stream to another instance of dtnstream.
00005  * It uses an extension block to mark the sequence of the stream.
00006  *
00007  *  Created on: 23.03.2011
00008  *      Author: morgenro
00009  */
00010 
00011 #include "config.h"
00012 #include "BundleStream.h"
00013 #include <ibrdtn/api/Client.h>
00014 #include <ibrdtn/data/EID.h>
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/data/File.h>
00018 #include <ibrcommon/TimeMeasurement.h>
00019 #include <iostream>
00020 
00021 unsigned int __timeout_receive__ = 0;
00022 
00023 dtn::data::Block* StreamBlock::Factory::create()
00024 {
00025         return new StreamBlock();
00026 }
00027 
00028 StreamBlock::StreamBlock()
00029 : dtn::data::Block(StreamBlock::BLOCK_TYPE), _seq(0)
00030 {
00031 }
00032 
00033 StreamBlock::~StreamBlock()
00034 {
00035 
00036 }
00037 
00038 size_t StreamBlock::getLength() const
00039 {
00040         return _seq.getLength();
00041 }
00042 
00043 std::ostream& StreamBlock::serialize(std::ostream &stream, size_t &length) const
00044 {
00045         stream << _seq;
00046         return stream;
00047 }
00048 
00049 std::istream& StreamBlock::deserialize(std::istream &stream, const size_t length)
00050 {
00051         stream >> _seq;
00052         return stream;
00053 }
00054 
00055 void StreamBlock::setSequenceNumber(size_t seq)
00056 {
00057         _seq = seq;
00058 }
00059 
00060 size_t StreamBlock::getSequenceNumber() const
00061 {
00062         return _seq.getValue();
00063 }
00064 
00065 StreamBundle::StreamBundle()
00066  : _ref(ibrcommon::StringBLOB::create())
00067 {
00068         StreamBlock &block = _b.push_front<StreamBlock>();
00069         block.setSequenceNumber(0);
00070 
00071         _b.push_back(_ref);
00072 }
00073 
00074 StreamBundle::StreamBundle(const dtn::api::Bundle &b)
00075  : dtn::api::Bundle(b), _ref(getData())
00076 {
00077 }
00078 
00079 StreamBundle::~StreamBundle()
00080 {
00081 }
00082 
00083 void StreamBundle::append(const char* data, size_t length)
00084 {
00085         ibrcommon::BLOB::iostream stream = _ref.iostream();
00086         (*stream).seekp(0, ios::end);
00087         (*stream).write(data, length);
00088 }
00089 
00090 void StreamBundle::clear()
00091 {
00092         ibrcommon::BLOB::iostream stream = _ref.iostream();
00093         stream.clear();
00094 
00095         // increment the sequence number
00096         try {
00097                 StreamBlock &block = _b.getBlock<StreamBlock>();
00098                 block.setSequenceNumber(block.getSequenceNumber() + 1);
00099         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00100 }
00101 
00102 size_t StreamBundle::size()
00103 {
00104         ibrcommon::BLOB::iostream stream = _ref.iostream();
00105         return stream.size();
00106 }
00107 
00108 size_t StreamBundle::getSequenceNumber(const StreamBundle &b)
00109 {
00110         try {
00111                 const StreamBlock &block = b._b.getBlock<StreamBlock>();
00112                 return block.getSequenceNumber();
00113         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00114                 return 0;
00115         }
00116 }
00117 
00118 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer)
00119  : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk),
00120    _buffer(buffer), _chunk_offset(0), _in_seq(0)
00121 {
00122         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00123         setg(0, 0, 0);
00124         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00125 };
00126 
00127 BundleStreamBuf::~BundleStreamBuf()
00128 {
00129         delete[] _in_buf;
00130         delete[] _out_buf;
00131 };
00132 
00133 int BundleStreamBuf::sync()
00134 {
00135         int ret = std::char_traits<char>::eq_int_type(this->overflow(
00136                         std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00137                         : 0;
00138 
00139         // send the current chunk and clear it
00140         _client << _chunk; _client.flush();
00141         _chunk.clear();
00142 
00143         return ret;
00144 }
00145 
00146 int BundleStreamBuf::overflow(int c)
00147 {
00148         char *ibegin = _in_buf;
00149         char *iend = pptr();
00150 
00151         // mark the buffer as free
00152         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00153 
00154         if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00155         {
00156                 *iend++ = std::char_traits<char>::to_char_type(c);
00157         }
00158 
00159         // if there is nothing to send, just return
00160         if ((iend - ibegin) == 0)
00161         {
00162                 return std::char_traits<char>::not_eof(c);
00163         }
00164 
00165         // copy data into the bundles payload
00166         _chunk.append(_in_buf, iend - ibegin);
00167 
00168         // if size exceeds chunk limit, send it
00169         if (_chunk.size() > _buffer)
00170         {
00171                 _client << _chunk; _client.flush();
00172                 _chunk.clear();
00173         }
00174 
00175         return std::char_traits<char>::not_eof(c);
00176 }
00177 
00178 void BundleStreamBuf::received(const dtn::api::Bundle &b)
00179 {
00180         ibrcommon::MutexLock l(_chunks_cond);
00181 
00182         if (StreamBundle::getSequenceNumber(b) < _in_seq) return;
00183 
00184         _chunks.insert(Chunk(b));
00185         _chunks_cond.signal(true);
00186 
00187         // bundle received
00188 //      std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush;
00189 }
00190 
00191 int BundleStreamBuf::underflow()
00192 {
00193         ibrcommon::MutexLock l(_chunks_cond);
00194 
00195         return __underflow();
00196 }
00197 
00198 int BundleStreamBuf::__underflow()
00199 {
00200         // receive chunks until the next sequence number is received
00201         while (_chunks.empty())
00202         {
00203                 // wait for the next bundle
00204                 _chunks_cond.wait();
00205         }
00206 
00207         ibrcommon::TimeMeasurement tm;
00208         tm.start();
00209 
00210         // while not the right sequence number received -> wait
00211         while (_in_seq != (*_chunks.begin())._seq)
00212         {
00213                 try {
00214                         // wait for the next bundle
00215                         _chunks_cond.wait(1000);
00216                 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
00217 
00218                 tm.stop();
00219                 if ((__timeout_receive__ > 0) && (tm.getSeconds() > __timeout_receive__))
00220                 {
00221                         // skip the missing bundles and proceed with the last received one
00222                         _in_seq = (*_chunks.begin())._seq;
00223                 }
00224         }
00225 
00226         // get the first chunk in the buffer
00227         const Chunk &c = (*_chunks.begin());
00228 
00229         dtn::api::Bundle b = c._bundle;
00230         ibrcommon::BLOB::Reference r = b.getData();
00231 
00232         // get stream lock
00233         ibrcommon::BLOB::iostream stream = r.iostream();
00234 
00235         // jump to the offset position
00236         (*stream).seekg(_chunk_offset, ios::beg);
00237 
00238         // copy the data of the last received bundle into the buffer
00239         (*stream).read(_out_buf, BUFF_SIZE);
00240 
00241         // get the read bytes
00242         size_t bytes = (*stream).gcount();
00243 
00244         if ((*stream).eof())
00245         {
00246                 // bundle consumed
00247 //              std::cerr << std::endl << "# " << c._seq << std::endl << std::flush;
00248 
00249                 // delete the last chunk
00250                 _chunks.erase(c);
00251 
00252                 // reset the chunk offset
00253                 _chunk_offset = 0;
00254 
00255                 // increment sequence number
00256                 _in_seq++;
00257 
00258                 // if no more bytes are read, get the next bundle -> call underflow() recursive
00259                 if (bytes == 0)
00260                 {
00261                         return __underflow();
00262                 }
00263         }
00264         else
00265         {
00266                 // increment the chunk offset
00267                 _chunk_offset += bytes;
00268         }
00269         
00270         // Since the input buffer content is now valid (or is new)
00271         // the get pointer should be initialized (or reset).
00272         setg(_out_buf, _out_buf, _out_buf + bytes);
00273 
00274         return std::char_traits<char>::not_eof(_out_buf[0]);
00275 }
00276 
00277 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b)
00278  : _bundle(b), _seq(StreamBundle::getSequenceNumber(b))
00279 {
00280 }
00281 
00282 BundleStreamBuf::Chunk::~Chunk()
00283 {
00284 }
00285 
00286 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
00287 {
00288         return (_seq == other._seq);
00289 }
00290 
00291 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
00292 {
00293         return (_seq < other._seq);
00294 }
00295 
00296 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app)
00297  : dtn::api::Client(app, stream), _stream(stream), _buf(*this, _chunk, chunk_size)
00298 {};
00299 
00300 BundleStream::~BundleStream() {};
00301 
00302 BundleStreamBuf& BundleStream::rdbuf()
00303 {
00304         return _buf;
00305 }
00306 
00307 dtn::api::Bundle& BundleStream::base()
00308 {
00309         return _chunk;
00310 }
00311 
00312 void BundleStream::received(const dtn::api::Bundle &b)
00313 {
00314         _buf.received(b);
00315 }
00316 
00317 void print_help()
00318 {
00319         std::cout << "-- dtnstream (IBR-DTN) --" << std::endl;
00320         std::cout << "Syntax: dtnstream [options]"  << std::endl;
00321         std::cout << "* optional parameters *" << std::endl;
00322         std::cout << " -h               display this text" << std::endl;
00323         std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl;
00324         std::cout << " -s <identifier>  set the source identifier (e.g. stream)" << std::endl;
00325         std::cout << " -c <bytes>       set the chunk size (max. size of each bundle)" << std::endl;
00326         std::cout << " -t <seconds>     set the timeout of the buffer" << std::endl;
00327         std::cout << " -l <seconds>     set the lifetime of stream chunks default: 30" << std::endl;
00328         std::cout << " -E               request encryption on the bundle layer" << std::endl;
00329         std::cout << " -S               request signature on the bundle layer" << std::endl;
00330         std::cout << " -U <socket>      use UNIX domain sockets" << std::endl;
00331 }
00332 
00333 int main(int argc, char *argv[])
00334 {
00335         int opt = 0;
00336         dtn::data::EID _destination;
00337         std::string _source = "stream";
00338         unsigned int _lifetime = 30;
00339         size_t _chunk_size = 4096;
00340         bool _bundle_encryption = false;
00341         bool _bundle_signed = false;
00342         ibrcommon::File _unixdomain;
00343 
00344         while((opt = getopt(argc, argv, "hd:t:s:c:l:ESU:")) != -1)
00345         {
00346                 switch (opt)
00347                 {
00348                 case 'h':
00349                         print_help();
00350                         return 0;
00351 
00352                 case 'd':
00353                         _destination = std::string(optarg);
00354                         break;
00355 
00356                 case 's':
00357                         _source = optarg;
00358                         break;
00359 
00360                 case 'c':
00361                         _chunk_size = atoi(optarg);
00362                         break;
00363 
00364                 case 't':
00365                         __timeout_receive__ = atoi(optarg);
00366                         break;
00367 
00368                 case 'l':
00369                         _lifetime = atoi(optarg);
00370                         break;
00371 
00372                 case 'E':
00373                         _bundle_encryption = true;
00374                         break;
00375 
00376                 case 'S':
00377                         _bundle_signed = true;
00378                         break;
00379 
00380                 case 'U':
00381                         _unixdomain = ibrcommon::File(optarg);
00382                         break;
00383 
00384                 default:
00385                         std::cout << "unknown command" << std::endl;
00386                         return -1;
00387                 }
00388         }
00389 
00390         try {
00391                 // Create a stream to the server using TCP.
00392                 ibrcommon::tcpclient conn;
00393 
00394                 // check if the unixdomain socket exists
00395                 if (_unixdomain.exists())
00396                 {
00397                         // connect to the unix domain socket
00398                         conn.open(_unixdomain);
00399                 }
00400                 else
00401                 {
00402                         // connect to the standard local api port
00403                         conn.open("127.0.0.1", 4550);
00404 
00405                         // enable nodelay option
00406                         conn.enableNoDelay();
00407                 }
00408 
00409                 // Initiate a derivated client
00410                 BundleStream bs(conn, _chunk_size, _source);
00411 
00412                 // Connect to the server. Actually, this function initiate the
00413                 // stream protocol by starting the thread and sending the contact header.
00414                 bs.connect();
00415 
00416                 // transmitter mode
00417                 if (_destination != dtn::data::EID())
00418                 {
00419                         bs.base().setDestination(_destination);
00420                         bs.base().setLifetime(_lifetime);
00421                         if (_bundle_encryption) bs.base().requestEncryption();
00422                         if (_bundle_signed) bs.base().requestSigned();
00423                         std::ostream stream(&bs.rdbuf());
00424                         stream << std::cin.rdbuf() << std::flush;
00425                 }
00426                 // receiver mode
00427                 else
00428                 {
00429                         std::istream stream(&bs.rdbuf());
00430                         std::cout << stream.rdbuf() << std::flush;
00431                 }
00432 
00433                 // Shutdown the client connection.
00434                 bs.close();
00435                 conn.close();
00436         } catch (const ibrcommon::tcpclient::SocketException&) {
00437                 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl;
00438                 return -1;
00439         } catch (...) {
00440 
00441         }
00442 
00443 }