|
IBR-DTNSuite 0.6
|
00001 /* 00002 * BundleStream.h 00003 * 00004 * Created on: 23.03.2011 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef BUNDLESTREAM_H_ 00009 #define BUNDLESTREAM_H_ 00010 00011 #include <ibrdtn/api/Bundle.h> 00012 #include <ibrdtn/api/Client.h> 00013 #include <ibrdtn/data/EID.h> 00014 #include <ibrdtn/data/SDNV.h> 00015 #include <ibrcommon/net/tcpclient.h> 00016 00017 class StreamBlock : public dtn::data::Block 00018 { 00019 public: 00020 class Factory : public dtn::data::ExtensionBlock::Factory 00021 { 00022 public: 00023 Factory() : dtn::data::ExtensionBlock::Factory(StreamBlock::BLOCK_TYPE) {}; 00024 virtual ~Factory() {}; 00025 virtual dtn::data::Block* create(); 00026 }; 00027 00028 static const char BLOCK_TYPE = 242; 00029 00030 StreamBlock(); 00031 virtual ~StreamBlock(); 00032 00033 virtual size_t getLength() const; 00034 virtual std::ostream &serialize(std::ostream &stream, size_t &length) const; 00035 virtual std::istream &deserialize(std::istream &stream, const size_t length); 00036 00037 void setSequenceNumber(size_t seq); 00038 size_t getSequenceNumber() const; 00039 00040 private: 00041 dtn::data::SDNV _seq; 00042 }; 00043 00047 static StreamBlock::Factory __StreamBlockFactory__; 00048 00049 class StreamBundle : public dtn::api::Bundle 00050 { 00051 public: 00052 StreamBundle(); 00053 StreamBundle(const dtn::api::Bundle &b); 00054 virtual ~StreamBundle(); 00055 00059 void append(const char* data, size_t length); 00060 00064 void clear(); 00065 00070 size_t size(); 00071 00072 static size_t getSequenceNumber(const StreamBundle &b); 00073 00074 private: 00075 // reference to the BLOB where all data is stored until transmission 00076 ibrcommon::BLOB::Reference _ref; 00077 }; 00078 00079 class BundleStreamBuf : public std::basic_streambuf<char, std::char_traits<char> > 00080 { 00081 public: 00082 // The size of the input and output buffers. 00083 static const size_t BUFF_SIZE = 5120; 00084 00085 BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer = 4096); 00086 virtual ~BundleStreamBuf(); 00087 00088 virtual void received(const dtn::api::Bundle &b); 00089 00090 protected: 00091 virtual int sync(); 00092 virtual int overflow(int = std::char_traits<char>::eof()); 00093 virtual int underflow(); 00094 int __underflow(); 00095 00096 private: 00097 class Chunk 00098 { 00099 public: 00100 Chunk(const dtn::api::Bundle &b); 00101 virtual ~Chunk(); 00102 00103 bool operator==(const Chunk& other) const; 00104 bool operator<(const Chunk& other) const; 00105 00106 dtn::api::Bundle _bundle; 00107 size_t _seq; 00108 }; 00109 00110 // Input buffer 00111 char *_in_buf; 00112 // Output buffer 00113 char *_out_buf; 00114 00115 dtn::api::Client &_client; 00116 StreamBundle &_chunk; 00117 size_t _buffer; 00118 00119 ibrcommon::Conditional _chunks_cond; 00120 std::set<Chunk> _chunks; 00121 size_t _chunk_offset; 00122 00123 size_t _in_seq; 00124 }; 00125 00126 class BundleStream : public dtn::api::Client 00127 { 00128 public: 00129 BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app = "stream"); 00130 virtual ~BundleStream(); 00131 00132 BundleStreamBuf& rdbuf(); 00133 dtn::api::Bundle& base(); 00134 00135 protected: 00136 virtual void received(const dtn::api::Bundle &b); 00137 00138 private: 00139 ibrcommon::tcpstream &_stream; 00140 00141 BundleStreamBuf _buf; 00142 StreamBundle _chunk; 00143 }; 00144 00145 #endif /* BUNDLESTREAM_H_ */