IBR-DTN  1.0.0
ExtendedApiHandler.cpp
Go to the documentation of this file.
1 /*
2  * ExtendedApiHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Stephen Roettger <roettger@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 #include "config.h"
24 #include "Configuration.h"
25 #include "api/ExtendedApiHandler.h"
26 #include "core/BundleEvent.h"
27 #include <ibrcommon/Logger.h>
30 #include <ibrdtn/data/AgeBlock.h>
31 #include <ibrdtn/utils/Utils.h>
32 #include <ibrcommon/data/Base64Reader.h>
33 #include <ibrcommon/data/Base64Stream.h>
34 #include "core/BundleCore.h"
35 #include <ibrdtn/utils/Random.h>
36 
37 #include <ibrdtn/ibrdtn.h>
38 #ifdef IBRDTN_SUPPORT_BSP
40 #endif
41 
42 #include <algorithm>
43 
44 namespace dtn
45 {
46  namespace api
47  {
48  ExtendedApiHandler::ExtendedApiHandler(ClientHandler &client, ibrcommon::socketstream &stream)
49  : ProtocolHandler(client, stream), _sender(new Sender(*this)),
50  _endpoint(_client.getRegistration().getDefaultEID()), _encoding(dtn::api::PlainSerializer::BASE64)
51  {
52  _client.getRegistration().subscribe(_endpoint);
53  }
54 
56  {
58  _sender->join();
59  delete _sender;
60  }
61 
63  return _stream.good();
64  }
65 
67  {
68  // close the stream
69  _stream.close();
70  }
71 
73  {
74  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 60) << "ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL;
75 
77 
78  // close the stream
79  _stream.close();
80 
81  try {
82  // shutdown the sender thread
83  _sender->stop();
84  } catch (const std::exception&) { };
85  }
86 
88  {
89  _sender->start();
90 
91  std::string buffer;
92  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO EXTENDED" << std::endl;
93 
94  while (_stream.good())
95  {
96  getline(_stream, buffer);
97 
98  std::string::reverse_iterator iter = buffer.rbegin();
99  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
100 
101  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
102  if (cmd.empty()) continue;
103 
104  try {
105  if (cmd[0] == "set")
106  {
107  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
108 
109  if (cmd[1] == "endpoint")
110  {
111  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
112 
113  ibrcommon::MutexLock l(_write_lock);
114  if (cmd[2].length() <= 0) {
115  // send error notification
116  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
117  } else {
119 
120  // un-subscribe previous registration
121  reg.unsubscribe(_endpoint);
122 
123  // set new application endpoint
124  _endpoint.setApplication(cmd[2]);
125 
126  // subscribe to new endpoint
127  reg.subscribe(_endpoint);
128 
129  // send accepted notification
130  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
131  }
132  }
133  else if (cmd[1] == "encoding")
134  {
135  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
136 
137  // parse encoding
139 
141  // set the new encoding as default
142  _encoding = enc;
143 
144  ibrcommon::MutexLock l(_write_lock);
145  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
146  } else {
147  ibrcommon::MutexLock l(_write_lock);
148  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENCODING" << std::endl;
149  }
150  }
151  else
152  {
153  ibrcommon::MutexLock l(_write_lock);
154  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
155  }
156  }
157  else if (cmd[0] == "endpoint")
158  {
159  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
160 
161  if (cmd[1] == "add")
162  {
163  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
164 
165  ibrcommon::MutexLock l(_write_lock);
166 
167  // error checking
168  if (cmd[2].length() <= 0)
169  {
170  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
171  }
172  else
173  {
175  new_endpoint.setApplication(cmd[2]);
176 
177  _client.getRegistration().subscribe(new_endpoint);
178  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
179  }
180  }
181  else if (cmd[1] == "del")
182  {
183  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
184 
185  ibrcommon::MutexLock l(_write_lock);
186 
187  // error checking
188  if (cmd[2].length() <= 0)
189  {
190  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
191  }
192  else
193  {
195  del_endpoint.setApplication( cmd[2] );
196 
197  _client.getRegistration().unsubscribe(del_endpoint);
198 
199  // restore default endpoint if the standard endpoint has been removed
200  if(_endpoint == del_endpoint)
201  {
202  _endpoint = _client.getRegistration().getDefaultEID();
203  _client.getRegistration().subscribe(_endpoint);
204  }
205 
206  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
207  }
208  }
209  else if (cmd[1] == "get")
210  {
211  ibrcommon::MutexLock l(_write_lock);
212  _stream << ClientHandler::API_STATUS_OK << " ENDPOINT GET " << _endpoint.getString() << std::endl;
213  }
214  else
215  {
216  ibrcommon::MutexLock l(_write_lock);
217  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
218  }
219  }
220  else if (cmd[0] == "registration")
221  {
222  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
223 
224  if (cmd[1] == "add")
225  {
226  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
227 
228  ibrcommon::MutexLock l(_write_lock);
229  dtn::data::EID endpoint(cmd[2]);
230 
231  // error checking
232  if (endpoint == dtn::data::EID())
233  {
234  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
235  }
236  else
237  {
238  _client.getRegistration().subscribe(endpoint);
239  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
240  }
241  }
242  else if (cmd[1] == "del")
243  {
244  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
245 
246  ibrcommon::MutexLock l(_write_lock);
247  dtn::data::EID endpoint(cmd[2]);
248 
249  // error checking
250  if (endpoint == dtn::data::EID())
251  {
252  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
253  }
254  else
255  {
256  _client.getRegistration().unsubscribe(endpoint);
257  if(_endpoint == endpoint)
258  {
259  _endpoint = _client.getRegistration().getDefaultEID();
260  _client.getRegistration().subscribe(_endpoint);
261  }
262 
263  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
264  }
265  }
266  else if (cmd[1] == "list")
267  {
268  ibrcommon::MutexLock l(_write_lock);
269  const std::set<dtn::data::EID> list = _client.getRegistration().getSubscriptions();
270 
271  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LIST" << std::endl;
272  for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
273  {
274  _stream << (*iter).getString() << std::endl;
275  }
276  _stream << std::endl;
277  }
278  else if (cmd[1] == "save")
279  {
280  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
281 
282  ibrcommon::Timer::time_t lifetime = 0;
283  std::stringstream ss(cmd[2]);
284 
285  ss >> lifetime;
286  if(ss.fail()) throw ibrcommon::Exception("malformed command");
287 
288  /* make the registration persistent for a given lifetime */
290 
291  ibrcommon::MutexLock l(_write_lock);
292  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION SAVE " << _client.getRegistration().getHandle() << std::endl;
293  }
294  else if (cmd[1] == "load")
295  {
296  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
297 
298  const std::string handle = cmd[2];
299 
300  try
301  {
303 
304  /* stop the sender */
306  _sender->join();
307 
308  /* switch the registration */
310 
311  /* and switch the sender */
312  Sender *old_sender = _sender;
313  try{
314  _sender = new Sender(*this);
315  }
316  catch (const std::bad_alloc &ex)
317  {
318  _sender = old_sender;
319  throw ex;
320  }
321  delete old_sender;
322  _sender->start();
323 
324  ibrcommon::MutexLock l(_write_lock);
325  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LOAD" << std::endl;
326  }
328  {
329  ibrcommon::MutexLock l(_write_lock);
330  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION BUSY" << std::endl;
331  }
332  catch (const Registration::NotFoundException& ex)
333  {
334  ibrcommon::MutexLock l(_write_lock);
335  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION NOT FOUND" << std::endl;
336  }
337  }
338  else
339  {
340  ibrcommon::MutexLock l(_write_lock);
341  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
342  }
343  }
344  else if (cmd[0] == "neighbor")
345  {
346  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
347 
348  if (cmd[1] == "list")
349  {
350  ibrcommon::MutexLock l(_write_lock);
351  const std::set<dtn::core::Node> nlist = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
352 
353  _stream << ClientHandler::API_STATUS_OK << " NEIGHBOR LIST" << std::endl;
354  for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); ++iter)
355  {
356  _stream << (*iter).getEID().getString() << std::endl;
357  }
358  _stream << std::endl;
359  }
360  else
361  {
362  ibrcommon::MutexLock l(_write_lock);
363  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
364  }
365  }
366  else if (cmd[0] == "bundle")
367  {
368  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
369 
370  if (cmd[1] == "get")
371  {
372  // transfer bundle data
373  ibrcommon::MutexLock l(_write_lock);
374 
375  if (cmd.size() == 2)
376  {
377  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
378  PlainSerializer(_stream, _encoding) << _bundle_reg;
379  }
380  else if (cmd[2] == "binary")
381  {
382  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET BINARY "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
383  dtn::data::DefaultSerializer(_stream) << _bundle_reg; _stream << std::flush;
384  }
385  else if (cmd[2] == "plain")
386  {
387  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET PLAIN "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
388  PlainSerializer(_stream, _encoding) << _bundle_reg;
389  }
390  else if (cmd[2] == "xml")
391  {
392  _stream << ClientHandler::API_STATUS_NOT_IMPLEMENTED << " FORMAT NOT IMPLEMENTED" << std::endl;
393  }
394  else
395  {
396  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN FORMAT" << std::endl;
397  }
398  }
399  else if (cmd[1] == "put")
400  {
401  // lock the stream during reception of bundle data
402  ibrcommon::MutexLock l(_write_lock);
403 
404  if (cmd.size() < 3)
405  {
406  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
407  }
408  else if (cmd[2] == "plain")
409  {
410  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE PLAIN" << std::endl;
411 
412  try {
413  PlainDeserializer(_stream) >> _bundle_reg;
414  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
415  } catch (const std::exception &ex) {
416  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
417  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
418 
419  }
420  }
421  else if (cmd[2] == "binary")
422  {
423  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE BINARY" << std::endl;
424 
425  try {
426  dtn::data::DefaultDeserializer(_stream) >> _bundle_reg;
427  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
428  } catch (const std::exception &ex) {
429  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
430  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
431  }
432  }
433  else
434  {
435  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
436  }
437  }
438  else if (cmd[1] == "load")
439  {
440  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
441 
443 
444  if (cmd[2] == "queue")
445  {
446  id = _bundle_queue.take();
447  }
448  else
449  {
450  // construct bundle id
451  id = readBundleID(cmd, 2);
452  }
453 
454  // load the bundle
455  try {
456  _bundle_reg = dtn::core::BundleCore::getInstance().getStorage().get(id);
457 
458  try {
459  // process the bundle block (security, compression, ...)
461 
462  ibrcommon::MutexLock l(_write_lock);
463  _stream << ClientHandler::API_STATUS_OK << " BUNDLE LOADED "; sayBundleID(_stream, id); _stream << std::endl;
464  } catch (const ibrcommon::Exception &e) {
465  // clear the register
466  _bundle_reg = dtn::data::Bundle();
467 
468  ibrcommon::MutexLock l(_write_lock);
469  _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE NOT LOADED" << std::endl;
470  }
471  } catch (const ibrcommon::Exception&) {
472  ibrcommon::MutexLock l(_write_lock);
473  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
474  }
475  }
476  else if (cmd[1] == "clear")
477  {
478  _bundle_reg = dtn::data::Bundle();
479 
480  ibrcommon::MutexLock l(_write_lock);
481  _stream << ClientHandler::API_STATUS_OK << " BUNDLE CLEARED" << std::endl;
482  }
483  else if (cmd[1] == "free")
484  {
485  try {
487  _bundle_reg = dtn::data::Bundle();
488  ibrcommon::MutexLock l(_write_lock);
489  _stream << ClientHandler::API_STATUS_OK << " BUNDLE FREE SUCCESSFUL" << std::endl;
490  } catch (const ibrcommon::Exception&) {
491  ibrcommon::MutexLock l(_write_lock);
492  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
493  }
494  }
495  else if (cmd[1] == "delivered")
496  {
497  if (cmd.size() < 5) throw ibrcommon::Exception("not enough parameters");
498 
499  try {
500  // construct bundle id
501  dtn::data::BundleID id = readBundleID(cmd, 2);
502 
503  // announce this bundle as delivered
506 
507  ibrcommon::MutexLock l(_write_lock);
508  _stream << ClientHandler::API_STATUS_OK << " BUNDLE DELIVERED ACCEPTED" << std::endl;
509  } catch (const ibrcommon::Exception&) {
510  ibrcommon::MutexLock l(_write_lock);
511  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
512  }
513  }
514  else if (cmd[1] == "store")
515  {
516  // store the bundle in the storage
517  try {
519  ibrcommon::MutexLock l(_write_lock);
520  _stream << ClientHandler::API_STATUS_OK << " BUNDLE STORE SUCCESSFUL" << std::endl;
521  } catch (const ibrcommon::Exception&) {
522  ibrcommon::MutexLock l(_write_lock);
523  _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE STORE FAILED" << std::endl;
524  }
525  }
526  else if (cmd[1] == "send")
527  {
528  // forward the bundle to the storage processing
529  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle_reg);
530 
531  ibrcommon::MutexLock l(_write_lock);
532  _stream << ClientHandler::API_STATUS_OK << " BUNDLE SENT" << std::endl;
533  }
534  else if (cmd[1] == "info")
535  {
536  // transfer bundle data
537  ibrcommon::MutexLock l(_write_lock);
538 
539  _stream << ClientHandler::API_STATUS_OK << " BUNDLE INFO "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
541  }
542  else if (cmd[1] == "block")
543  {
544  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
545 
546  if (cmd[2] == "add")
547  {
549 
550  /* parse an optional offset, where to insert the block */
551  if (cmd.size() > 3)
552  {
553  int offset;
554  istringstream ss(cmd[3]);
555 
556  ss >> offset;
557  if (ss.fail()) throw ibrcommon::Exception("malformed command");
558 
559  if (static_cast<dtn::data::Size>(offset) >= _bundle_reg.size())
560  {
562  }
563  else if(offset == 0)
564  {
566  }
567  else
568  {
569  inserter = dtn::data::BundleBuilder(_bundle_reg, dtn::data::BundleBuilder::MIDDLE, offset);
570  }
571  }
572 
573  ibrcommon::MutexLock l(_write_lock);
574  _stream << ClientHandler::API_STATUS_CONTINUE << " BUNDLE BLOCK ADD" << std::endl;
575 
576  try
577  {
580  {
581  block.set(dtn::data::Block::LAST_BLOCK, true);
582  }
583  else
584  {
585  block.set(dtn::data::Block::LAST_BLOCK, false);
586  }
587  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK ADD SUCCESSFUL" << std::endl;
588  }
589  catch (const BundleBuilder::DiscardBlockException &ex) {
590  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK DISCARDED" << std::endl;
591  }
593  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK ADD FAILED" << std::endl;
594  }
595  }
596  else if (cmd[2] == "del")
597  {
598  if (cmd.size() < 4) throw ibrcommon::Exception("not enough parameters");
599 
600  int offset;
601  istringstream ss(cmd[3]);
602 
603  ss >> offset;
604  if (ss.fail()) throw ibrcommon::Exception("malformed command");
605 
606  dtn::data::Bundle::iterator it = _bundle_reg.begin();
607  std::advance(it, offset);
608  _bundle_reg.erase(it);
609 
610  ibrcommon::MutexLock l(_write_lock);
611  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK DEL SUCCESSFUL" << std::endl;
612  }
613  else
614  {
615  ibrcommon::MutexLock l(_write_lock);
616  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
617  }
618  }
619  else
620  {
621  ibrcommon::MutexLock l(_write_lock);
622  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
623  }
624  }
625  else if (cmd[0] == "payload")
626  {
627  // check if there are more commands/parameters
628  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
629 
630  // check if the command is valid
631  // [block-offset] get [[data-offset] [length]]
632 
633  dtn::data::Bundle::iterator block_it = _bundle_reg.begin();
634 
635  size_t cmd_index = 1;
636 
637  // check if a block offset is present
638  std::stringstream ss(cmd[1]);
639  size_t block_offset;
640  ss >> block_offset;
641 
642  if (!ss.fail()) {
643  // block offset present
644  // move forward to the selected block
645  std::advance(block_it, block_offset);
646 
647  // increment command index
648  ++cmd_index;
649  } else {
650  // search for the payload block
651  block_it = _bundle_reg.find(dtn::data::PayloadBlock::BLOCK_TYPE);
652  }
653 
654  // check if a valid block was selected
655  if (block_it == _bundle_reg.end()) {
656  throw ibrcommon::Exception("invalid offset or no payload block found");
657  }
658 
659  // get the selected block
660  dtn::data::Block &block = dynamic_cast<dtn::data::Block&>(**block_it);
661 
662  size_t cmd_remaining = cmd.size() - (cmd_index + 1);
663  if (cmd[cmd_index] == "get")
664  {
665  // lock the API stream
666  ibrcommon::MutexLock l(_write_lock);
667 
668  try {
669  dtn::api::PlainSerializer ps(_stream, _encoding);
670 
671  if (cmd_remaining > 0)
672  {
673  size_t payload_offset = 0;
674  size_t length = 0;
675 
676  /* read the payload offset */
677  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
678 
679  if (cmd_remaining > 1)
680  {
681  ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length;
682  }
683 
684  // abort here if the stream is no payload block
685  try {
686  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
687 
688  // open the payload BLOB
689  ibrcommon::BLOB::Reference ref = pb.getBLOB();
690  ibrcommon::BLOB::iostream stream = ref.iostream();
691 
692  if (static_cast<std::streamsize>(payload_offset) >= stream.size())
693  throw ibrcommon::Exception("offset out of range");
694 
695  size_t remaining = stream.size() - payload_offset;
696 
697  if ((length > 0) && (remaining > length)) {
698  remaining = length;
699  }
700 
701  // ignore all bytes leading the offset
702  (*stream).ignore(payload_offset);
703 
704  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
705 
706  // write the payload
707  ps.writeData((*stream), remaining);
708 
709  // final line break (mark the end)
710  _stream << std::endl;
711  } catch (const std::bad_cast&) {
712  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED INVALID BLOCK TYPE" << std::endl;
713  }
714  }
715  else
716  {
717  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
718 
719  // write the payload
720  ps.writeData(block);
721 
722  // final line break (mark the end)
723  _stream << std::endl;
724  }
725  } catch (const std::exception &ex) {
726  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED " << ex.what() << std::endl;
727  }
728  }
729  else if (cmd[cmd_index] == "put")
730  {
731  ibrcommon::MutexLock l(_write_lock);
732 
733  // abort there if the stream is no payload block
734  try {
735  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
736 
737  // write continue request to API
738  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD PUT" << std::endl;
739 
740  size_t payload_offset = 0;
741  if (cmd_remaining > 0)
742  {
743  /* read the payload offset */
744  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
745  }
746 
747  // open the payload BLOB
748  ibrcommon::BLOB::Reference ref = pb.getBLOB();
749  ibrcommon::BLOB::iostream stream = ref.iostream();
750 
751  // if the offset is valid
752  if (static_cast<std::streamsize>(payload_offset) < stream.size()) {
753  // move the streams put pointer to the given offset
754  (*stream).seekp(payload_offset, ios_base::beg);
755  } else if (payload_offset > 0) {
756  // move put-pointer to the end
757  (*stream).seekp(0, ios_base::end);
758  }
759 
760  /* write the new data into the blob */
762 
763  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD PUT SUCCESSFUL" << std::endl;
764  } catch (std::bad_cast&) {
765  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED INVALID BLOCK TYPE" << std::endl;
766  } catch (const std::exception&) {
767  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED" << std::endl;
768  }
769  }
770  else if (cmd[cmd_index] == "append")
771  {
772  ibrcommon::MutexLock l(_write_lock);
773 
774  // abort there if the stream is no payload block
775  try {
776  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
777 
778  // write continue request to API
779  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD APPEND" << std::endl;
780 
781  // open the payload BLOB
782  ibrcommon::BLOB::Reference ref = pb.getBLOB();
783  ibrcommon::BLOB::iostream stream = ref.iostream();
784 
785  // move put-pointer to the end
786  (*stream).seekp(0, ios_base::end);
787 
788  /* write the new data into the blob */
790 
791  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD APPEND SUCCESSFUL" << std::endl;
792  } catch (std::bad_cast&) {
793  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED INVALID BLOCK TYPE" << std::endl;
794  } catch (const std::exception&) {
795  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED" << std::endl;
796  }
797  }
798  else if (cmd[cmd_index] == "clear")
799  {
800  ibrcommon::MutexLock l(_write_lock);
801  // abort there if the stream is no payload block
802  try {
803  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
804 
805  // open the payload BLOB
806  ibrcommon::BLOB::Reference ref = pb.getBLOB();
807  ibrcommon::BLOB::iostream stream = ref.iostream();
808 
809  // clear the payload
810  stream.clear();
811 
812  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD CLEAR SUCCESSFUL" << std::endl;
813  } catch (std::bad_cast&) {
814  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD CLEAR FAILED INVALID BLOCK TYPE" << std::endl;
815  }
816  }
817  else if (cmd[cmd_index] == "length")
818  {
819  ibrcommon::MutexLock l(_write_lock);
820  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD LENGTH" << std::endl;
821  _stream << "Length: " << block.getLength() << std::endl;
822  }
823  }
824  else if (cmd[0] == "nodename")
825  {
826  ibrcommon::MutexLock l(_write_lock);
827  _stream << ClientHandler::API_STATUS_OK << " NODENAME " << dtn::core::BundleCore::local.getString() << std::endl;
828  }
829  else
830  {
831  ibrcommon::MutexLock l(_write_lock);
832  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
833  }
834  } catch (const std::exception&) {
835  ibrcommon::MutexLock l(_write_lock);
836  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
837  }
838  }
839  }
840 
841  ExtendedApiHandler::Sender::Sender(ExtendedApiHandler &conn)
842  : _handler(conn)
843  {
844  }
845 
846  ExtendedApiHandler::Sender::~Sender()
847  {
848  ibrcommon::JoinableThread::join();
849  }
850 
851  void ExtendedApiHandler::Sender::__cancellation() throw ()
852  {
853  // abort all blocking calls on the registration object
854  _handler._client.getRegistration().abort();
855  }
856 
857  void ExtendedApiHandler::Sender::finally() throw ()
858  {
859  }
860 
861  void ExtendedApiHandler::Sender::run() throw ()
862  {
863  Registration &reg = _handler._client.getRegistration();
864  try{
865  while(_handler.good()){
866  try{
867  dtn::data::MetaBundle id = reg.receiveMetaBundle();
868 
870  // transform custody signals & status reports into notifies
871  _handler.notifyAdministrativeRecord(id);
872 
873  // announce the delivery of this bundle
874  _handler._client.getRegistration().delivered(id);
875  } else {
876  // notify the client about the new bundle
877  _handler.notifyBundle(id);
878  }
879  } catch (const dtn::storage::NoBundleFoundException&) {
880  reg.wait_for_bundle();
881  }
882 
883  yield();
884  }
885  } catch (const ibrcommon::QueueUnblockedException &ex) {
886  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
887  return;
888  } catch (const ibrcommon::IOException &ex) {
889  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
890  } catch (const dtn::InvalidDataException &ex) {
891  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
892  } catch (const std::exception &ex) {
893  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
894  }
895 
896  try {
897  //FIXME
898 // _handler.stop();
899  } catch (const ibrcommon::ThreadException &ex) {
900  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 50) << ex.what() << IBRCOMMON_LOGGER_ENDL;
901  }
902  }
903 
904  void ExtendedApiHandler::notifyBundle(dtn::data::MetaBundle &bundle)
905  {
906  // put the bundle into the API queue
907  _bundle_queue.push(bundle);
908 
909  // lock the API channel
910  ibrcommon::MutexLock l(_write_lock);
911 
912  // write notification header to API channel
913  _stream << API_STATUS_NOTIFY_BUNDLE << " NOTIFY BUNDLE ";
914 
915  // format the bundle ID and write it to the stream
916  sayBundleID(_stream, bundle);
917 
918  // finalize statement with a line-break
919  _stream << std::endl;
920  }
921 
922  void ExtendedApiHandler::notifyAdministrativeRecord(dtn::data::MetaBundle &bundle)
923  {
924  // load the whole bundle
926 
927  // get the payload block of the bundle
929 
930  try {
931  // try to decode as status report
933  report.read(payload);
934 
935  // lock the API channel
936  ibrcommon::MutexLock l(_write_lock);
937 
938  // write notification header to API channel
939  _stream << API_STATUS_NOTIFY_REPORT << " NOTIFY REPORT ";
940 
941  // write sender EID
942  _stream << b.source.getString() << " ";
943 
944  // format the bundle ID and write it to the stream
945  _stream << report.bundleid.timestamp.toString() << "." << report.bundleid.sequencenumber.toString();
946 
947  if (report.bundleid.isFragment()) {
948  _stream << "." << report.bundleid.fragmentoffset.toString() << ":" << report.bundleid.getPayloadLength() << " ";
949  } else {
950  _stream << " ";
951  }
952 
953  // origin source
954  _stream << report.bundleid.source.getString() << " ";
955 
956  // reason code
957  _stream << (int)report.reasoncode << " ";
958 
960  _stream << "RECEIPT[" << report.timeof_receipt.getTimestamp().toString() << "."
961  << report.timeof_receipt.getNanoseconds().toString() << "] ";
962 
964  _stream << "CUSTODY-ACCEPTANCE[" << report.timeof_custodyaccept.getTimestamp().toString() << "."
965  << report.timeof_custodyaccept.getNanoseconds().toString() << "] ";
966 
968  _stream << "FORWARDING[" << report.timeof_forwarding.getTimestamp().toString() << "."
969  << report.timeof_forwarding.getNanoseconds().toString() << "] ";
970 
972  _stream << "DELIVERY[" << report.timeof_delivery.getTimestamp().toString() << "."
973  << report.timeof_delivery.getNanoseconds().toString() << "] ";
974 
976  _stream << "DELETION[" << report.timeof_deletion.getTimestamp().toString() << "."
977  << report.timeof_deletion.getNanoseconds().toString() << "] ";
978 
979  // finalize statement with a line-break
980  _stream << std::endl;
981 
982  return;
984  // this is not a status report
985  }
986 
987  try {
988  // try to decode as custody signal
990  custody.read(payload);
991 
992  // lock the API channel
993  ibrcommon::MutexLock l(_write_lock);
994 
995  // write notification header to API channel
996  _stream << API_STATUS_NOTIFY_CUSTODY << " NOTIFY CUSTODY ";
997 
998  // write sender EID
999  _stream << b.source.getString() << " ";
1000 
1001  // format the bundle ID and write it to the stream
1002  _stream << custody.bundleid.timestamp.toString() << "." << custody.bundleid.sequencenumber.toString();
1003 
1004  if (custody.bundleid.isFragment()) {
1005  _stream << "." << custody.bundleid.fragmentoffset.toString() << ":" << custody.bundleid.getPayloadLength() << " ";
1006  } else {
1007  _stream << " ";
1008  }
1009 
1010  // origin source
1011  _stream << custody.bundleid.source.getString() << " ";
1012 
1013  if (custody.custody_accepted) {
1014  _stream << "ACCEPTED ";
1015  } else {
1016  _stream << "REJECTED(" << (int)custody.reason << ") ";
1017  }
1018 
1019  // add time of signal to the message
1020  _stream << custody.timeofsignal.getTimestamp().toString() << "." << custody.timeofsignal.getNanoseconds().toString();
1021 
1022  // finalize statement with a line-break
1023  _stream << std::endl;
1024 
1025  return;
1027  // this is not a custody report
1028  }
1029  }
1030 
1031  void ExtendedApiHandler::sayBundleID(ostream &stream, const dtn::data::BundleID &id)
1032  {
1033  stream << id.timestamp.toString() << " " << id.sequencenumber.toString() << " ";
1034 
1035  if (id.isFragment())
1036  {
1037  stream << id.fragmentoffset.toString() << " ";
1038  stream << id.getPayloadLength() << " ";
1039  }
1040 
1041  stream << id.source.getString();
1042  }
1043 
1044  dtn::data::BundleID ExtendedApiHandler::readBundleID(const std::vector<std::string> &data, const size_t start)
1045  {
1046  // load bundle id
1047  std::stringstream ss;
1049 
1050  if ((data.size() - start) < 3)
1051  {
1052  throw ibrcommon::Exception("not enough parameters");
1053  }
1054 
1055  // read timestamp
1056  ss.clear(); ss.str(data[start]);
1057  id.timestamp.read(ss);
1058 
1059  if(ss.fail())
1060  {
1061  throw ibrcommon::Exception("malformed parameters");
1062  }
1063 
1064  // read sequence number
1065  ss.clear(); ss.str(data[start+1]);
1066  id.sequencenumber.read(ss);
1067 
1068  if(ss.fail())
1069  {
1070  throw ibrcommon::Exception("malformed parameters");
1071  }
1072 
1073  // read fragment offset
1074  if ((data.size() - start) > 3)
1075  {
1076  id.setFragment(true);
1077 
1078  // read sequence number
1079  ss.clear(); ss.str(data[start+2]);
1080  id.fragmentoffset.read(ss);
1081 
1082  // read sequence number
1083  ss.clear(); ss.str(data[start+3]);
1084  dtn::data::Length len = 0;
1085  ss >> len;
1086  id.setPayloadLength(len);
1087 
1088  if(ss.fail())
1089  {
1090  throw ibrcommon::Exception("malformed parameters");
1091  }
1092  }
1093 
1094  // read EID
1095  id.source = dtn::data::EID(data[data.size() - 1]);
1096 
1097  // return bundle id
1098  return id;
1099  }
1100  }
1101 }
const std::set< dtn::core::Node > getNeighbors()
void delivered(const dtn::data::MetaBundle &m) const
POSITION getAlignment() const
virtual Registration & getRegistration(const std::string &handle)=0
void setPersistent(ibrcommon::Timer::time_t lifetime)
dtn::data::BundleID bundleid
static dtn::data::EID local
Definition: BundleCore.h:79
const Timestamp & getTimestamp() const
Definition: DTNTime.cpp:58
virtual void read(const dtn::data::PayloadBlock &p)
static Encoding parseEncoding(const std::string &data)
void subscribe(const dtn::data::EID &endpoint)
void readData(std::ostream &stream)
virtual void read(const dtn::data::PayloadBlock &p)
size_t Length
Definition: Number.h:33
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
dtn::data::Timestamp timestamp
Definition: BundleID.h:54
ClientHandler & _client
Definition: ClientHandler.h:51
dtn::data::Block & readBlock(dtn::data::BundleBuilder &builder)
const std::string & getHandle() const
void writeData(const dtn::data::Block &block)
void set(ProcFlags flag, const bool &value)
Definition: Block.cpp:77
void erase(iterator it)
Definition: Bundle.cpp:127
dtn::net::ConnectionManager & getConnectionManager()
Definition: BundleCore.cpp:260
ApiServerInterface & getAPIServer()
iterator begin()
Definition: Bundle.cpp:49
const dtn::data::EID & getDefaultEID() const
virtual void store(const dtn::data::Bundle &bundle)=0
virtual dtn::data::Length getPayloadLength() const
Definition: BundleID.cpp:112
ExtendedApiHandler(ClientHandler &client, ibrcommon::socketstream &stream)
dtn::api::Client * _client
Definition: dtnrecv.cpp:50
Registration & getRegistration()
virtual void remove(const dtn::data::BundleID &id)=0
dtn::data::Number sequencenumber
Definition: BundleID.h:55
std::string toString() const
Definition: SDNV.h:414
void unsubscribe(const dtn::data::EID &endpoint)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
block_list::iterator iterator
Definition: Bundle.h:76
Size size() const
Definition: Bundle.cpp:258
const std::set< dtn::data::EID > getSubscriptions()
static const dtn::data::block_t BLOCK_TYPE
Definition: PayloadBlock.h:38
void switchRegistration(Registration &reg)
virtual Length getLength() const =0
static void processBlocks(dtn::data::Bundle &b)
Definition: BundleCore.cpp:641
std::string getString() const
Definition: EID.cpp:374
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
ibrcommon::BLOB::Reference getBLOB() const
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
ibrcommon::socketstream & _stream
Definition: ClientHandler.h:52
iterator end()
Definition: Bundle.cpp:54
dtn::data::Number fragmentoffset
Definition: BundleID.h:57
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
const Number & getNanoseconds() const
Definition: DTNTime.cpp:63
virtual bool isFragment() const
Definition: BundleID.cpp:122
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
Definition: Utils.cpp:60
dtn::data::EID source
Definition: BundleID.h:53
static BundleCore & getInstance()
Definition: BundleCore.cpp:82