IBR-DTN  1.0.0
OrderedStreamHandler.cpp
Go to the documentation of this file.
1 /*
2  * OrderedStreamHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/BundleCore.h"
25 
27 #include <ibrdtn/utils/Utils.h>
28 #include <ibrcommon/Logger.h>
29 
30 #include <ibrdtn/ibrdtn.h>
31 #ifdef IBRDTN_SUPPORT_BSP
33 #endif
34 
35 namespace dtn
36 {
37  namespace api
38  {
39  OrderedStreamHandler::OrderedStreamHandler(ClientHandler &client, ibrcommon::socketstream &stream)
40  : ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600)
41  {
42  _endpoint = client.getRegistration().getDefaultEID();
43  }
44 
46  {
47  _sender.stop();
48  _sender.join();
49  }
50 
52  {
54  }
55 
57  {
58  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "put()" << IBRCOMMON_LOGGER_ENDL;
59 
60  // set destination EID
61  b.destination = _peer;
62 
63  // set source
64  b.source = _endpoint;
65 
66  // set lifetime
67  b.lifetime = _lifetime;
68 
69  // set flag if the bundles are addresses to a group
70  if (_group)
71  {
73  }
74  else
75  {
77  }
78 
79  // raise default bundle received event
81  }
82 
84  {
86  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 20) << "get()" << IBRCOMMON_LOGGER_ENDL;
87 
88  dtn::data::MetaBundle bundle;
89 
90  while (true)
91  {
92  try {
93  bundle = reg.receiveMetaBundle();
94 
95  // discard bundle if they are not from the specified peer
96  if ((!_group) && (bundle.source != _peer))
97  {
98  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): bundle source " << bundle.source.getString() << " not expected - discard" << IBRCOMMON_LOGGER_ENDL;
99  continue;
100  }
101 
102  break;
103  } catch (const dtn::storage::NoBundleFoundException&) {
104  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 30) << "get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL;
105  reg.wait_for_bundle(timeout);
106  }
107  }
108 
109  return bundle;
110  }
111 
113  {
114  // close the stream
115  _stream.close();
116  }
117 
119  {
120  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 60) << "OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL;
121 
123 
124  // close the stream
125  _stream.close();
126 
127  try {
128  // shutdown the sender thread
129  _sender.stop();
130  } catch (const std::exception&) { };
131  }
132 
134  {
135  std::string buffer;
136  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO ORDERED STREAM PROTOCOL" << std::endl;
137 
138  while (_stream.good())
139  {
140  getline(_stream, buffer);
141 
142  std::string::reverse_iterator iter = buffer.rbegin();
143  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
144 
145  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
146  if (cmd.empty()) continue;
147 
148  try {
149  if (cmd[0] == "connect")
150  {
151  _stream << ClientHandler::API_STATUS_CONTINUE << " CONNECTION ESTABLISHED" << std::endl;
152 
153  // start sender to transfer received payload to the client
154  _sender.start();
155 
156  // forward data to stream buffer
157  _bundlestream << _stream.rdbuf() << std::flush;
158  }
159  else if (cmd[0] == "set")
160  {
161  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
162 
163  if (cmd[1] == "endpoint")
164  {
165  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
166 
167  // error checking
168  if (cmd[2].length() <= 0)
169  {
170  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
171  }
172  else
173  {
174  // unsubscribe from old endpoint
175  _client.getRegistration().unsubscribe(_endpoint);
176 
177  // set new application endpoint
178  _endpoint.setApplication(cmd[2]);
179 
180  // subscribe to new endpoint
181  _client.getRegistration().subscribe(_endpoint);
182 
183  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
184  }
185  }
186  else if (cmd[1] == "destination")
187  {
188  _peer = cmd[2];
189  _group = false;
190  _stream << ClientHandler::API_STATUS_OK << " DESTINATION CHANGED" << std::endl;
191  }
192  else if (cmd[1] == "group")
193  {
194  _peer = cmd[2];
195  _group = true;
196  _stream << ClientHandler::API_STATUS_OK << " DESTINATION GROUP CHANGED" << std::endl;
197  }
198  else if (cmd[1] == "lifetime")
199  {
200  std::stringstream ss(cmd[2]);
201  _lifetime.read(ss);
202  _stream << ClientHandler::API_STATUS_OK << " LIFETIME CHANGED" << std::endl;
203  }
204  else if (cmd[1] == "chunksize")
205  {
206  size_t size = 0;
207  std::stringstream ss(cmd[2]);
208  ss >> size;
209  _streambuf.setChunkSize(size);
210  _stream << ClientHandler::API_STATUS_OK << " CHUNKSIZE CHANGED" << std::endl;
211  }
212  else if (cmd[1] == "timeout")
213  {
214  size_t timeout = 0;
215  std::stringstream ss(cmd[2]);
216  ss >> timeout;
217  _streambuf.setTimeout(timeout);
218  _stream << ClientHandler::API_STATUS_OK << " TIMEOUT CHANGED" << std::endl;
219  }
220  else
221  {
222  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
223  }
224  }
225  else
226  {
227  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
228  }
229  } catch (const std::exception&) {
230  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
231  }
232  }
233  }
234 
235  OrderedStreamHandler::Sender::Sender(OrderedStreamHandler &conn)
236  : _handler(conn)
237  {
238  }
239 
240  OrderedStreamHandler::Sender::~Sender()
241  {
242  ibrcommon::JoinableThread::join();
243  }
244 
245  void OrderedStreamHandler::Sender::__cancellation() throw ()
246  {
247  // cancel the main thread in here
248  _handler._client.getRegistration().abort();
249  }
250 
251  void OrderedStreamHandler::Sender::finally() throw ()
252  {
253  _handler._client.getRegistration().abort();
254  }
255 
256  void OrderedStreamHandler::Sender::run() throw ()
257  {
258  try {
259  _handler._stream << _handler._bundlestream.rdbuf() << std::flush;
260  } catch (const std::exception &ex) {
261  IBRCOMMON_LOGGER_DEBUG_TAG("OrderedStreamHandler", 10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
262  }
263  }
264  } /* namespace api */
265 } /* namespace dtn */
static void inject(const dtn::data::EID &source, dtn::data::Bundle &bundle)
Definition: BundleCore.cpp:706
void delivered(const dtn::data::MetaBundle &m) const
void setTimeout(const dtn::data::Timeout &timeout)
void subscribe(const dtn::data::EID &endpoint)
virtual void delivered(const dtn::data::MetaBundle &m)
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
ClientHandler & _client
Definition: ClientHandler.h:51
virtual dtn::data::MetaBundle get(const dtn::data::Timeout timeout=0)
void setChunkSize(const dtn::data::Length &size)
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
Registration & getRegistration()
size_t Timeout
Definition: Number.h:35
void unsubscribe(const dtn::data::EID &endpoint)
void read(std::istream &stream)
Definition: SDNV.h:425
std::string getString() const
Definition: EID.cpp:374
void wait_for_bundle(size_t timeout=0)
ibrcommon::socketstream & _stream
Definition: ClientHandler.h:52
OrderedStreamHandler(ClientHandler &client, ibrcommon::socketstream &stream)
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
Definition: Utils.cpp:60
void set(FLAGS flag, bool value)
dtn::data::EID source
Definition: BundleID.h:53
virtual void put(dtn::data::Bundle &b)