IBR-DTN  1.0.0
BinaryStreamClient.cpp
Go to the documentation of this file.
1 /*
2  * BinaryStreamClient.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/BinaryStreamClient.h"
25 #include "core/GlobalEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
29 #include <ibrdtn/data/Serializer.h>
30 #include <iostream>
31 #include <ibrcommon/Logger.h>
32 
33 namespace dtn
34 {
35  namespace api
36  {
37  BinaryStreamClient::BinaryStreamClient(ClientHandler &client, ibrcommon::socketstream &stream)
38  : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream), _lastack(0)
39  {
40  }
41 
43  {
45  _sender.join();
46  }
47 
49  {
50  return _eid;
51  }
52 
54  {
55  }
56 
58  {
59  }
60 
62  {
63  }
64 
66  {
67  Registration &reg = _client.getRegistration();
68 
69  if (header._localeid.isNone())
70  {
71  // create an EID based on the registration handle
72  _eid = reg.getDefaultEID();
73  }
74  else
75  {
76  // contact received event
77  _eid = BundleCore::local;
78  _eid.setApplication( header._localeid.getSSP() );
79  }
80 
81  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 20) << "new client connected, handle: " << reg.getHandle() << "; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL;
82 
83  reg.subscribe(_eid);
84  }
85 
87  {
88  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
89 
91 
92  try {
93  // stop the sender
94  _sender.stop();
95  } catch (const ibrcommon::ThreadException &ex) {
96  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
97  }
98  }
99 
101  {
102  try {
103  const dtn::data::Bundle bundle = _sentqueue.take();
104 
105  // set ACK to zero
106  _lastack = 0;
107 
108  } catch (const ibrcommon::QueueUnblockedException&) {
109  // pop on empty queue!
110  }
111  }
112 
114  {
115  try {
116  const dtn::data::Bundle bundle = _sentqueue.take();
118 
119  // notify bundle as delivered
121 
122  // set ACK to zero
123  _lastack = 0;
124  } catch (const ibrcommon::QueueUnblockedException&) {
125  // pop on empty queue!
126  }
127  }
128 
130  {
131  _lastack = ack;
132  }
133 
135  {
136  // shutdown
138 
139  // close the stream
140  _stream.close();
141  }
142 
144  {
145  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
146 
147  // abort blocking registrations
149 
150  // close the stream
151  _stream.close();
152 
153  try {
154  // shutdown the sender thread
155  _sender.stop();
156  } catch (const std::exception&) { };
157  }
158 
160  {
161  try {
162  char flags = 0;
163 
164  // request acknowledgements
166 
167  // do the handshake
168  _connection.handshake(dtn::core::BundleCore::local, 10, flags);
169 
170  // start the sender thread
171  _sender.start();
172 
173  while (_connection.good())
174  {
175  dtn::data::Bundle bundle;
176  dtn::data::DefaultDeserializer(_connection) >> bundle;
177 
178  // process the new bundle
180  }
181  } catch (const ibrcommon::ThreadException &ex) {
182  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << "failed to start thread: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
184  } catch (const dtn::SerializationFailedException &ex) {
185  IBRCOMMON_LOGGER_TAG("BinaryStreamClient", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
187  } catch (const ibrcommon::IOException &ex) {
188  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
190  } catch (const dtn::InvalidDataException &ex) {
191  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
193  } catch (const std::exception &ex) {
194  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
196  }
197  }
198 
200  {
201  return _stream.good();
202  }
203 
204  BinaryStreamClient::Sender::Sender(BinaryStreamClient &client)
205  : _client(client)
206  {
207  }
208 
209  BinaryStreamClient::Sender::~Sender()
210  {
211  ibrcommon::JoinableThread::join();
212  }
213 
214  void BinaryStreamClient::Sender::__cancellation() throw ()
215  {
216  // cancel the main thread in here
217  this->abort();
218 
219  // abort all blocking calls on the registration object
220  _client._client.getRegistration().abort();
221  }
222 
223  void BinaryStreamClient::Sender::run() throw ()
224  {
225  Registration &reg = _client._client.getRegistration();
226 
227  try {
228  while (_client.good())
229  {
230  try {
231  dtn::data::Bundle bundle = reg.receive();
232 
233  try {
234  // process the bundle block (security, compression, ...)
236  } catch (const ibrcommon::Exception&) {
237  // bundle processing failure
238  continue;
239  }
240 
241  // add bundle to the queue
242  _client._sentqueue.push(bundle);
243 
244  // transmit the bundle
245  dtn::data::DefaultSerializer(_client._connection) << bundle;
246 
247  // mark the end of the bundle
248  _client._connection << std::flush;
249  } catch (const dtn::storage::NoBundleFoundException&) {
250  reg.wait_for_bundle();
251  }
252 
253  // idle a little bit
254  yield();
255  }
256  } catch (const ibrcommon::QueueUnblockedException &ex) {
257  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
258  return;
259  } catch (const ibrcommon::IOException &ex) {
260  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
261  } catch (const dtn::InvalidDataException &ex) {
262  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
263  } catch (const std::exception &ex) {
264  IBRCOMMON_LOGGER_DEBUG_TAG("BinaryStreamClient", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
265  }
266  }
267 
269  {
270  _sender.push(bundle);
271  }
272  }
273 }
void delivered(const dtn::data::MetaBundle &m) const
static dtn::data::EID local
Definition: BundleCore.h:79
void subscribe(const dtn::data::EID &endpoint)
size_t Length
Definition: Number.h:33
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
ClientHandler & _client
Definition: ClientHandler.h:51
const std::string & getHandle() const
const dtn::data::EID & getDefaultEID() const
BinaryStreamClient(ClientHandler &client, ibrcommon::socketstream &stream)
virtual void eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases csc)
dtn::api::Client * _client
Definition: dtnrecv.cpp:50
Registration & getRegistration()
void handshake(const dtn::data::EID &eid, const dtn::data::Timeout &timeout, const dtn::data::Bitset< StreamContactHeader::HEADER_BITS > &flags)
void shutdown(ConnectionShutdownCases csc=CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN)
void unsubscribe(const dtn::data::EID &endpoint)
virtual void eventConnectionUp(const dtn::streams::StreamContactHeader &header)
virtual void eventBundleAck(const dtn::data::Length &ack)
void queue(const dtn::data::Bundle &bundle)
static void processBlocks(dtn::data::Bundle &b)
Definition: BundleCore.cpp:641
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
const dtn::data::EID & getPeer() const
ibrcommon::socketstream & _stream
Definition: ClientHandler.h:52
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)