IBR-DTNSuite 0.6

tools/src/BundleStream.h

Go to the documentation of this file.
00001 /*
00002  * BundleStream.h
00003  *
00004  *  Created on: 23.03.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef BUNDLESTREAM_H_
00009 #define BUNDLESTREAM_H_
00010 
00011 #include <ibrdtn/api/Bundle.h>
00012 #include <ibrdtn/api/Client.h>
00013 #include <ibrdtn/data/EID.h>
00014 #include <ibrdtn/data/SDNV.h>
00015 #include <ibrcommon/net/tcpclient.h>
00016 
00017 class StreamBlock : public dtn::data::Block
00018 {
      // Block subclass that carries one sequence number, held as a
      // dtn::data::SDNV in _seq. All member functions are only declared here;
      // their definitions live outside this header.
00019 public:
              // Factory for this block type. Its constructor forwards
              // StreamBlock::BLOCK_TYPE to the ExtensionBlock::Factory base.
00020         class Factory : public dtn::data::ExtensionBlock::Factory
00021         {
00022         public:
00023                 Factory() : dtn::data::ExtensionBlock::Factory(StreamBlock::BLOCK_TYPE) {}
00024                 virtual ~Factory() {}
                      // Returns a dtn::data::Block pointer; presumably a newly
                      // allocated StreamBlock -- confirm in the implementation.
00025                 virtual dtn::data::Block* create();
00026         };
00027 
              // Block type code handed to the factory base class.
              // NOTE(review): 242 is outside the range of a signed 8-bit char, so
              // where plain char is signed the stored value is not 242 (typically
              // -14). Confirm that the base Factory and the block parser use the
              // same representation.
00028         static const char BLOCK_TYPE = 242;
00029 
00030         StreamBlock();
00031         virtual ~StreamBlock();
00032 
              // Virtual length / serialisation hooks (presumably overriding
              // dtn::data::Block -- confirm against that class).
00033         virtual size_t getLength() const;
00034         virtual std::ostream& serialize(std::ostream& stream, size_t& length) const;
00035         virtual std::istream& deserialize(std::istream& stream, const size_t length);
00036 
              // Accessors for the sequence number carried by this block.
00037         void setSequenceNumber(size_t seq);
00038         size_t getSequenceNumber() const;
00039 
00040 private:
              // Sequence number in SDNV form.
00041         dtn::data::SDNV _seq;
00042 };
00043 
00047 static StreamBlock::Factory __StreamBlockFactory__;
      // The declaration above creates a StreamBlock::Factory with internal
      // linkage. Because it sits in a header, every translation unit that
      // includes this file gets its own instance.
      // NOTE(review): presumably constructing the factory is what registers
      // StreamBlock::BLOCK_TYPE with the block parser -- confirm in
      // dtn::data::ExtensionBlock::Factory.
      // NOTE(review): identifiers containing a double underscore are reserved
      // for the C++ implementation.
00048 
00049 class StreamBundle : public dtn::api::Bundle
00050 {
      // dtn::api::Bundle subclass that additionally keeps a BLOB reference
      // (_ref) for the data it collects. Only declarations are visible here;
      // the behaviour notes below are hedged accordingly.
00051 public:
00052         StreamBundle();
              // Builds a StreamBundle from an existing API bundle. The constructor
              // is not marked explicit, so it also serves as an implicit
              // conversion from dtn::api::Bundle.
00053         StreamBundle(const dtn::api::Bundle &b);
00054         virtual ~StreamBundle();
00055 
              // Takes a byte buffer and its length; presumably appends those
              // bytes to the BLOB behind _ref -- confirm in the implementation.
00059         void append(const char* data, size_t length);
00060 
              // Presumably discards the data collected so far -- confirm.
00064         void clear();
00065 
              // Presumably the number of bytes currently stored -- confirm.
              // NOTE(review): not const-qualified, so it cannot be called on a
              // const StreamBundle.
00070         size_t size();
00071 
              // Static helper returning a sequence number for the given bundle;
              // presumably read from its StreamBlock -- confirm.
00072         static size_t getSequenceNumber(const StreamBundle &b);
00073 
00074 private:
00075         // reference to the BLOB where all data is stored until transmission
00076         ibrcommon::BLOB::Reference _ref;
00077 };
00078 
00079 class BundleStreamBuf : public std::streambuf
00080 {
      // Character stream buffer (std::streambuf is the standard typedef for
      // std::basic_streambuf<char, std::char_traits<char> >) that holds
      // references to a dtn::api::Client and a StreamBundle, plus a set of
      // received Chunk objects. Member functions are defined outside this header.
00081 public:
00082         // Size used for the input and output buffers.
00083         static const size_t BUFF_SIZE = 5120;
00084 
              // client / chunk are kept as reference members (_client, _chunk), so
              // both must outlive this object. buffer defaults to 4096 and is
              // presumably the payload threshold per bundle -- confirm.
00085         BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer = 4096);
00086         virtual ~BundleStreamBuf();
00087 
              // Hands a bundle to this buffer; presumably stores it in _chunks and
              // signals _chunks_cond -- confirm in the implementation.
00088         virtual void received(const dtn::api::Bundle &b);
00089 
00090 protected:
              // Standard std::streambuf customisation points.
00091         virtual int sync();
00092         virtual int overflow(int = traits_type::eof());
00093         virtual int underflow();
              // Non-virtual helper next to underflow().
              // NOTE(review): names containing a double underscore are reserved
              // for the C++ implementation.
00094         int __underflow();
00095 
00096 private:
              // A bundle together with a sequence number. Provides operator== and
              // operator< so that it can be stored in the std::set below;
              // presumably both compare _seq -- confirm.
00097         class Chunk
00098         {
00099         public:
00100                 Chunk(const dtn::api::Bundle &b);
00101                 virtual ~Chunk();
00102 
00103                 bool operator==(const Chunk& other) const;
00104                 bool operator<(const Chunk& other) const;
00105 
00106                 dtn::api::Bundle _bundle;
00107                 size_t _seq;
00108         };
00109 
00110         // Buffer for incoming characters (raw pointer)
00111         char *_in_buf;
00112         // Buffer for outgoing characters (raw pointer)
00113         char *_out_buf;
00114 
              // Non-owning references supplied through the constructor.
00115         dtn::api::Client &_client;
00116         StreamBundle &_chunk;
              // Value of the constructor's buffer argument.
00117         size_t _buffer;
00118 
              // Received chunks, the condition object presumably guarding them,
              // and presumably the read position inside the current chunk.
00119         ibrcommon::Conditional _chunks_cond;
00120         std::set<Chunk> _chunks;
00121         size_t _chunk_offset;
00122 
              // Presumably the next sequence number expected on input -- confirm.
00123         size_t _in_seq;
00124 };
00125 
00126 class BundleStream : public dtn::api::Client
00127 {
      // dtn::api::Client subclass that owns a BundleStreamBuf (_buf) and a
      // StreamBundle (_chunk) and keeps a reference to the tcpstream it was
      // constructed with. Member functions are defined outside this header.
00128 public:
              // stream is stored by reference and must outlive this object.
              // chunk_size is presumably forwarded to the BundleStreamBuf, and
              // app (default "stream") presumably names the application endpoint
              // -- confirm both in the implementation.
00129         BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app = "stream");
00130         virtual ~BundleStream();
00131 
              // Accessors returning references; presumably to _buf and _chunk
              // respectively -- confirm in the implementation.
00132         BundleStreamBuf& rdbuf();
00133         dtn::api::Bundle& base();
00134 
00135 protected:
              // Bundle-received hook; presumably overrides dtn::api::Client and
              // passes the bundle on to _buf -- confirm.
00136         virtual void received(const dtn::api::Bundle &b);
00137 
00138 private:
00139         ibrcommon::tcpstream &_stream;
00140 
              // Declaration order matters: BundleStreamBuf's constructor takes a
              // StreamBundle& and the class keeps it as a reference member. C++
              // constructs members in declaration order and destroys them in
              // reverse, so _chunk is declared first. That way the bundle
              // presumably handed to _buf already exists when _buf is built and
              // is still alive while _buf is destroyed.
00141         StreamBundle _chunk;
00142         BundleStreamBuf _buf;
00143 };
00144 
00145 #endif /* BUNDLESTREAM_H_ */