• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

tools/src/dtnstream.cpp

Go to the documentation of this file.
00001 /*
00002  * dtnstream.cpp
00003  *
00004  * The application dtnstream can transfer a data stream to another instance of dtnstream.
00005  * It uses an extension block to mark the sequence of the stream.
00006  *
00007  *  Created on: 23.03.2011
00008  *      Author: morgenro
00009  */
00010 
00011 #include "config.h"
00012 #include "BundleStream.h"
00013 #include <ibrdtn/api/Client.h>
00014 #include <ibrdtn/data/EID.h>
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/data/File.h>
00018 #include <iostream>
00019 
00020 dtn::data::Block* StreamBlock::Factory::create()
00021 {
00022         return new StreamBlock();
00023 }
00024 
00025 StreamBlock::StreamBlock()
00026 : dtn::data::Block(StreamBlock::BLOCK_TYPE), _seq(0)
00027 {
00028 }
00029 
00030 StreamBlock::~StreamBlock()
00031 {
00032 
00033 }
00034 
00035 size_t StreamBlock::getLength() const
00036 {
00037         return _seq.getLength();
00038 }
00039 
00040 std::ostream& StreamBlock::serialize(std::ostream &stream) const
00041 {
00042         stream << _seq;
00043         return stream;
00044 }
00045 
00046 std::istream& StreamBlock::deserialize(std::istream &stream)
00047 {
00048         stream >> _seq;
00049         return stream;
00050 }
00051 
00052 void StreamBlock::setSequenceNumber(size_t seq)
00053 {
00054         _seq = seq;
00055 }
00056 
00057 size_t StreamBlock::getSequenceNumber() const
00058 {
00059         return _seq.getValue();
00060 }
00061 
00062 StreamBundle::StreamBundle()
00063  : _ref(ibrcommon::StringBLOB::create())
00064 {
00065         StreamBlock &block = _b.push_front<StreamBlock>();
00066         block.setSequenceNumber(0);
00067 
00068         _b.push_back(_ref);
00069 }
00070 
00071 StreamBundle::StreamBundle(const dtn::api::Bundle &b)
00072  : dtn::api::Bundle(b), _ref(getData())
00073 {
00074 }
00075 
00076 StreamBundle::~StreamBundle()
00077 {
00078 }
00079 
00080 void StreamBundle::append(const char* data, size_t length)
00081 {
00082         ibrcommon::BLOB::iostream stream = _ref.iostream();
00083         (*stream).seekp(0, ios::end);
00084         (*stream).write(data, length);
00085 }
00086 
00087 void StreamBundle::clear()
00088 {
00089         ibrcommon::BLOB::iostream stream = _ref.iostream();
00090         stream.clear();
00091 
00092         // increment the sequence number
00093         try {
00094                 StreamBlock &block = _b.getBlock<StreamBlock>();
00095                 block.setSequenceNumber(block.getSequenceNumber() + 1);
00096         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00097 }
00098 
00099 size_t StreamBundle::size()
00100 {
00101         ibrcommon::BLOB::iostream stream = _ref.iostream();
00102         return stream.size();
00103 }
00104 
00105 size_t StreamBundle::getSequenceNumber(const StreamBundle &b)
00106 {
00107         try {
00108                 const StreamBlock &block = b._b.getBlock<StreamBlock>();
00109                 return block.getSequenceNumber();
00110         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00111                 return 0;
00112         }
00113 }
00114 
00115 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer)
00116  : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk),
00117    _buffer(buffer), _chunk_offset(0), _in_seq(0)
00118 {
00119         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00120         setg(0, 0, 0);
00121         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00122 };
00123 
00124 BundleStreamBuf::~BundleStreamBuf()
00125 {
00126         delete[] _in_buf;
00127         delete[] _out_buf;
00128 };
00129 
00130 int BundleStreamBuf::sync()
00131 {
00132         int ret = std::char_traits<char>::eq_int_type(this->overflow(
00133                         std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00134                         : 0;
00135 
00136         // send the current chunk and clear it
00137         _client << _chunk; _client.flush();
00138         _chunk.clear();
00139 
00140         return ret;
00141 }
00142 
00143 int BundleStreamBuf::overflow(int c)
00144 {
00145         char *ibegin = _in_buf;
00146         char *iend = pptr();
00147 
00148         // mark the buffer as free
00149         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00150 
00151         if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00152         {
00153                 *iend++ = std::char_traits<char>::to_char_type(c);
00154         }
00155 
00156         // if there is nothing to send, just return
00157         if ((iend - ibegin) == 0)
00158         {
00159                 return std::char_traits<char>::not_eof(c);
00160         }
00161 
00162         // copy data into the bundles payload
00163         _chunk.append(_in_buf, iend - ibegin);
00164 
00165         // if size exceeds chunk limit, send it
00166         if (_chunk.size() > _buffer)
00167         {
00168                 _client << _chunk; _client.flush();
00169                 _chunk.clear();
00170         }
00171 
00172         return std::char_traits<char>::not_eof(c);
00173 }
00174 
00175 void BundleStreamBuf::received(const dtn::api::Bundle &b)
00176 {
00177         ibrcommon::MutexLock l(_chunks_cond);
00178 
00179         if (StreamBundle::getSequenceNumber(b) < _in_seq) return;
00180 
00181         _chunks.insert(Chunk(b));
00182         _chunks_cond.signal(true);
00183 
00184         // bundle received
00185 //      std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush;
00186 }
00187 
00188 int BundleStreamBuf::underflow()
00189 {
00190         ibrcommon::MutexLock l(_chunks_cond);
00191 
00192         return __underflow();
00193 }
00194 
00195 int BundleStreamBuf::__underflow()
00196 {
00197         // receive chunks until the next sequence number is received
00198         while (_chunks.empty())
00199         {
00200                 // wait for the next bundle
00201                 _chunks_cond.wait();
00202         }
00203 
00204         // while not the right sequence number received -> wait
00205         while (_in_seq != (*_chunks.begin())._seq)
00206         {
00207                 // wait for the next bundle
00208                 _chunks_cond.wait();
00209 
00210                 if (_chunks.size() > 10)
00211                 {
00212                         // skip the missing bundles and proceed with the last received one
00213                         _in_seq = (*_chunks.begin())._seq;
00214                 }
00215         }
00216 
00217         // get the first chunk in the buffer
00218         const Chunk &c = (*_chunks.begin());
00219 
00220         dtn::api::Bundle b = c._bundle;
00221         ibrcommon::BLOB::Reference r = b.getData();
00222 
00223         // get stream lock
00224         ibrcommon::BLOB::iostream stream = r.iostream();
00225 
00226         // jump to the offset position
00227         (*stream).seekg(_chunk_offset, ios::beg);
00228 
00229         // copy the data of the last received bundle into the buffer
00230         (*stream).read(_out_buf, BUFF_SIZE);
00231 
00232         // get the read bytes
00233         size_t bytes = (*stream).gcount();
00234 
00235         if ((*stream).eof())
00236         {
00237                 // bundle consumed
00238 //              std::cerr << std::endl << "# " << c._seq << std::endl << std::flush;
00239 
00240                 // delete the last chunk
00241                 _chunks.erase(c);
00242 
00243                 // reset the chunk offset
00244                 _chunk_offset = 0;
00245 
00246                 // increment sequence number
00247                 _in_seq++;
00248 
00249                 // if no more bytes are read, get the next bundle -> call underflow() recursive
00250                 if (bytes == 0)
00251                 {
00252                         return __underflow();
00253                 }
00254         }
00255         else
00256         {
00257                 // increment the chunk offset
00258                 _chunk_offset += bytes;
00259         }
00260         
00261         // Since the input buffer content is now valid (or is new)
00262         // the get pointer should be initialized (or reset).
00263         setg(_out_buf, _out_buf, _out_buf + bytes);
00264 
00265         return std::char_traits<char>::not_eof(_out_buf[0]);
00266 }
00267 
00268 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b)
00269  : _bundle(b), _seq(StreamBundle::getSequenceNumber(b))
00270 {
00271 }
00272 
00273 BundleStreamBuf::Chunk::~Chunk()
00274 {
00275 }
00276 
00277 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
00278 {
00279         return (_seq == other._seq);
00280 }
00281 
00282 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
00283 {
00284         return (_seq < other._seq);
00285 }
00286 
00287 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app)
00288  : dtn::api::Client(app, stream), _stream(stream), _buf(*this, _chunk, chunk_size)
00289 {};
00290 
00291 BundleStream::~BundleStream() {};
00292 
00293 BundleStreamBuf& BundleStream::rdbuf()
00294 {
00295         return _buf;
00296 }
00297 
00298 dtn::api::Bundle& BundleStream::base()
00299 {
00300         return _chunk;
00301 }
00302 
00303 void BundleStream::received(const dtn::api::Bundle &b)
00304 {
00305         _buf.received(b);
00306 }
00307 
00308 void print_help()
00309 {
00310         std::cout << "-- dtnstream (IBR-DTN) --" << std::endl;
00311         std::cout << "Syntax: dtnstream [options]"  << std::endl;
00312         std::cout << "* optional parameters *" << std::endl;
00313         std::cout << " -h               display this text" << std::endl;
00314         std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl;
00315         std::cout << " -s <identifier>  set the source identifier (e.g. stream)" << std::endl;
00316         std::cout << " -c <bytes>       set the chunk size (max. size of each bundle)" << std::endl;
00317 //      std::cout << " -t <seconds>     set the timeout of the buffer" << std::endl;
00318         std::cout << " -l <seconds>     set the lifetime of stream chunks default: 30" << std::endl;
00319         std::cout << " -E               request encryption on the bundle layer" << std::endl;
00320         std::cout << " -S               request signature on the bundle layer" << std::endl;
00321         std::cout << " -U <socket>      use UNIX domain sockets" << std::endl;
00322 }
00323 
00324 int main(int argc, char *argv[])
00325 {
00326         int opt = 0;
00327         dtn::data::EID _destination;
00328         std::string _source = "stream";
00329         unsigned int _lifetime = 30;
00330         size_t _chunk_size = 4096;
00331 //      size_t _timeout = 0;
00332         bool _bundle_encryption = false;
00333         bool _bundle_signed = false;
00334         ibrcommon::File _unixdomain;
00335 
00336         while((opt = getopt(argc, argv, "hd:s:c:l:ESU:")) != -1)
00337         {
00338                 switch (opt)
00339                 {
00340                 case 'h':
00341                         print_help();
00342                         return 0;
00343 
00344                 case 'd':
00345                         _destination = std::string(optarg);
00346                         break;
00347 
00348                 case 's':
00349                         _source = optarg;
00350                         break;
00351 
00352                 case 'c':
00353                         _chunk_size = atoi(optarg);
00354                         break;
00355 
00356 //              case 't':
00357 //                      _timeout = atoi(optarg);
00358 //                      break;
00359 
00360                 case 'l':
00361                         _lifetime = atoi(optarg);
00362                         break;
00363 
00364                 case 'E':
00365                         _bundle_encryption = true;
00366                         break;
00367 
00368                 case 'S':
00369                         _bundle_signed = true;
00370                         break;
00371 
00372                 case 'U':
00373                         _unixdomain = ibrcommon::File(optarg);
00374                         break;
00375 
00376                 default:
00377                         std::cout << "unknown command" << std::endl;
00378                         return -1;
00379                 }
00380         }
00381 
00382         try {
00383                 // Create a stream to the server using TCP.
00384                 ibrcommon::tcpclient conn;
00385 
00386                 // check if the unixdomain socket exists
00387                 if (_unixdomain.exists())
00388                 {
00389                         // connect to the unix domain socket
00390                         conn.open(_unixdomain);
00391                 }
00392                 else
00393                 {
00394                         // connect to the standard local api port
00395                         conn.open("127.0.0.1", 4550);
00396 
00397                         // enable nodelay option
00398                         conn.enableNoDelay();
00399                 }
00400 
00401                 // Initiate a derivated client
00402                 BundleStream bs(conn, _chunk_size, _source);
00403 
00404                 // Connect to the server. Actually, this function initiate the
00405                 // stream protocol by starting the thread and sending the contact header.
00406                 bs.connect();
00407 
00408                 // transmitter mode
00409                 if (_destination != dtn::data::EID())
00410                 {
00411                         bs.base().setDestination(_destination);
00412                         bs.base().setLifetime(_lifetime);
00413                         if (_bundle_encryption) bs.base().requestEncryption();
00414                         if (_bundle_signed) bs.base().requestSigned();
00415                         std::ostream stream(&bs.rdbuf());
00416                         stream << std::cin.rdbuf() << std::flush;
00417                 }
00418                 // receiver mode
00419                 else
00420                 {
00421                         std::istream stream(&bs.rdbuf());
00422                         std::cout << stream.rdbuf() << std::flush;
00423                 }
00424 
00425                 // Shutdown the client connection.
00426                 bs.close();
00427                 conn.close();
00428         } catch (const ibrcommon::tcpclient::SocketException&) {
00429                 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl;
00430                 return -1;
00431         } catch (...) {
00432 
00433         }
00434 
00435 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1