Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "config.h"
00012 #include "BundleStream.h"
00013 #include <ibrdtn/api/Client.h>
00014 #include <ibrdtn/data/EID.h>
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/data/File.h>
00018 #include <iostream>
00019
00020 dtn::data::Block* StreamBlock::Factory::create()
00021 {
00022 return new StreamBlock();
00023 }
00024
00025 StreamBlock::StreamBlock()
00026 : dtn::data::Block(StreamBlock::BLOCK_TYPE), _seq(0)
00027 {
00028 }
00029
00030 StreamBlock::~StreamBlock()
00031 {
00032
00033 }
00034
00035 size_t StreamBlock::getLength() const
00036 {
00037 return _seq.getLength();
00038 }
00039
00040 std::ostream& StreamBlock::serialize(std::ostream &stream) const
00041 {
00042 stream << _seq;
00043 return stream;
00044 }
00045
00046 std::istream& StreamBlock::deserialize(std::istream &stream)
00047 {
00048 stream >> _seq;
00049 return stream;
00050 }
00051
00052 void StreamBlock::setSequenceNumber(size_t seq)
00053 {
00054 _seq = seq;
00055 }
00056
00057 size_t StreamBlock::getSequenceNumber() const
00058 {
00059 return _seq.getValue();
00060 }
00061
00062 StreamBundle::StreamBundle()
00063 : _ref(ibrcommon::StringBLOB::create())
00064 {
00065 StreamBlock &block = _b.push_front<StreamBlock>();
00066 block.setSequenceNumber(0);
00067
00068 _b.push_back(_ref);
00069 }
00070
00071 StreamBundle::StreamBundle(const dtn::api::Bundle &b)
00072 : dtn::api::Bundle(b), _ref(getData())
00073 {
00074 }
00075
00076 StreamBundle::~StreamBundle()
00077 {
00078 }
00079
00080 void StreamBundle::append(const char* data, size_t length)
00081 {
00082 ibrcommon::BLOB::iostream stream = _ref.iostream();
00083 (*stream).seekp(0, ios::end);
00084 (*stream).write(data, length);
00085 }
00086
00087 void StreamBundle::clear()
00088 {
00089 ibrcommon::BLOB::iostream stream = _ref.iostream();
00090 stream.clear();
00091
00092
00093 try {
00094 StreamBlock &block = _b.getBlock<StreamBlock>();
00095 block.setSequenceNumber(block.getSequenceNumber() + 1);
00096 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00097 }
00098
00099 size_t StreamBundle::size()
00100 {
00101 ibrcommon::BLOB::iostream stream = _ref.iostream();
00102 return stream.size();
00103 }
00104
00105 size_t StreamBundle::getSequenceNumber(const StreamBundle &b)
00106 {
00107 try {
00108 const StreamBlock &block = b._b.getBlock<StreamBlock>();
00109 return block.getSequenceNumber();
00110 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00111 return 0;
00112 }
00113 }
00114
00115 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer)
00116 : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk),
00117 _buffer(buffer), _chunk_offset(0), _in_seq(0)
00118 {
00119
00120 setg(0, 0, 0);
00121 setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00122 };
00123
00124 BundleStreamBuf::~BundleStreamBuf()
00125 {
00126 delete[] _in_buf;
00127 delete[] _out_buf;
00128 };
00129
00130 int BundleStreamBuf::sync()
00131 {
00132 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00133 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00134 : 0;
00135
00136
00137 _client << _chunk; _client.flush();
00138 _chunk.clear();
00139
00140 return ret;
00141 }
00142
00143 int BundleStreamBuf::overflow(int c)
00144 {
00145 char *ibegin = _in_buf;
00146 char *iend = pptr();
00147
00148
00149 setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00150
00151 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00152 {
00153 *iend++ = std::char_traits<char>::to_char_type(c);
00154 }
00155
00156
00157 if ((iend - ibegin) == 0)
00158 {
00159 return std::char_traits<char>::not_eof(c);
00160 }
00161
00162
00163 _chunk.append(_in_buf, iend - ibegin);
00164
00165
00166 if (_chunk.size() > _buffer)
00167 {
00168 _client << _chunk; _client.flush();
00169 _chunk.clear();
00170 }
00171
00172 return std::char_traits<char>::not_eof(c);
00173 }
00174
00175 void BundleStreamBuf::received(const dtn::api::Bundle &b)
00176 {
00177 ibrcommon::MutexLock l(_chunks_cond);
00178
00179 if (StreamBundle::getSequenceNumber(b) < _in_seq) return;
00180
00181 _chunks.insert(Chunk(b));
00182 _chunks_cond.signal(true);
00183
00184
00185
00186 }
00187
00188 int BundleStreamBuf::underflow()
00189 {
00190 ibrcommon::MutexLock l(_chunks_cond);
00191
00192 return __underflow();
00193 }
00194
00195 int BundleStreamBuf::__underflow()
00196 {
00197
00198 while (_chunks.empty())
00199 {
00200
00201 _chunks_cond.wait();
00202 }
00203
00204
00205 while (_in_seq != (*_chunks.begin())._seq)
00206 {
00207
00208 _chunks_cond.wait();
00209
00210 if (_chunks.size() > 10)
00211 {
00212
00213 _in_seq = (*_chunks.begin())._seq;
00214 }
00215 }
00216
00217
00218 const Chunk &c = (*_chunks.begin());
00219
00220 dtn::api::Bundle b = c._bundle;
00221 ibrcommon::BLOB::Reference r = b.getData();
00222
00223
00224 ibrcommon::BLOB::iostream stream = r.iostream();
00225
00226
00227 (*stream).seekg(_chunk_offset, ios::beg);
00228
00229
00230 (*stream).read(_out_buf, BUFF_SIZE);
00231
00232
00233 size_t bytes = (*stream).gcount();
00234
00235 if ((*stream).eof())
00236 {
00237
00238
00239
00240
00241 _chunks.erase(c);
00242
00243
00244 _chunk_offset = 0;
00245
00246
00247 _in_seq++;
00248
00249
00250 if (bytes == 0)
00251 {
00252 return __underflow();
00253 }
00254 }
00255 else
00256 {
00257
00258 _chunk_offset += bytes;
00259 }
00260
00261
00262
00263 setg(_out_buf, _out_buf, _out_buf + bytes);
00264
00265 return std::char_traits<char>::not_eof(_out_buf[0]);
00266 }
00267
00268 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b)
00269 : _bundle(b), _seq(StreamBundle::getSequenceNumber(b))
00270 {
00271 }
00272
00273 BundleStreamBuf::Chunk::~Chunk()
00274 {
00275 }
00276
00277 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
00278 {
00279 return (_seq == other._seq);
00280 }
00281
00282 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
00283 {
00284 return (_seq < other._seq);
00285 }
00286
00287 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app)
00288 : dtn::api::Client(app, stream), _stream(stream), _buf(*this, _chunk, chunk_size)
00289 {};
00290
00291 BundleStream::~BundleStream() {};
00292
00293 BundleStreamBuf& BundleStream::rdbuf()
00294 {
00295 return _buf;
00296 }
00297
00298 dtn::api::Bundle& BundleStream::base()
00299 {
00300 return _chunk;
00301 }
00302
00303 void BundleStream::received(const dtn::api::Bundle &b)
00304 {
00305 _buf.received(b);
00306 }
00307
00308 void print_help()
00309 {
00310 std::cout << "-- dtnstream (IBR-DTN) --" << std::endl;
00311 std::cout << "Syntax: dtnstream [options]" << std::endl;
00312 std::cout << "* optional parameters *" << std::endl;
00313 std::cout << " -h display this text" << std::endl;
00314 std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl;
00315 std::cout << " -s <identifier> set the source identifier (e.g. stream)" << std::endl;
00316 std::cout << " -c <bytes> set the chunk size (max. size of each bundle)" << std::endl;
00317
00318 std::cout << " -l <seconds> set the lifetime of stream chunks default: 30" << std::endl;
00319 std::cout << " -E request encryption on the bundle layer" << std::endl;
00320 std::cout << " -S request signature on the bundle layer" << std::endl;
00321 std::cout << " -U <socket> use UNIX domain sockets" << std::endl;
00322 }
00323
00324 int main(int argc, char *argv[])
00325 {
00326 int opt = 0;
00327 dtn::data::EID _destination;
00328 std::string _source = "stream";
00329 unsigned int _lifetime = 30;
00330 size_t _chunk_size = 4096;
00331
00332 bool _bundle_encryption = false;
00333 bool _bundle_signed = false;
00334 ibrcommon::File _unixdomain;
00335
00336 while((opt = getopt(argc, argv, "hd:s:c:l:ESU:")) != -1)
00337 {
00338 switch (opt)
00339 {
00340 case 'h':
00341 print_help();
00342 return 0;
00343
00344 case 'd':
00345 _destination = std::string(optarg);
00346 break;
00347
00348 case 's':
00349 _source = optarg;
00350 break;
00351
00352 case 'c':
00353 _chunk_size = atoi(optarg);
00354 break;
00355
00356
00357
00358
00359
00360 case 'l':
00361 _lifetime = atoi(optarg);
00362 break;
00363
00364 case 'E':
00365 _bundle_encryption = true;
00366 break;
00367
00368 case 'S':
00369 _bundle_signed = true;
00370 break;
00371
00372 case 'U':
00373 _unixdomain = ibrcommon::File(optarg);
00374 break;
00375
00376 default:
00377 std::cout << "unknown command" << std::endl;
00378 return -1;
00379 }
00380 }
00381
00382 try {
00383
00384 ibrcommon::tcpclient conn;
00385
00386
00387 if (_unixdomain.exists())
00388 {
00389
00390 conn.open(_unixdomain);
00391 }
00392 else
00393 {
00394
00395 conn.open("127.0.0.1", 4550);
00396
00397
00398 conn.enableNoDelay();
00399 }
00400
00401
00402 BundleStream bs(conn, _chunk_size, _source);
00403
00404
00405
00406 bs.connect();
00407
00408
00409 if (_destination != dtn::data::EID())
00410 {
00411 bs.base().setDestination(_destination);
00412 bs.base().setLifetime(_lifetime);
00413 if (_bundle_encryption) bs.base().requestEncryption();
00414 if (_bundle_signed) bs.base().requestSigned();
00415 std::ostream stream(&bs.rdbuf());
00416 stream << std::cin.rdbuf() << std::flush;
00417 }
00418
00419 else
00420 {
00421 std::istream stream(&bs.rdbuf());
00422 std::cout << stream.rdbuf() << std::flush;
00423 }
00424
00425
00426 bs.close();
00427 conn.close();
00428 } catch (const ibrcommon::tcpclient::SocketException&) {
00429 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl;
00430 return -1;
00431 } catch (...) {
00432
00433 }
00434
00435 }