27 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/data/Base64Reader.h>
33 #include <ibrcommon/data/Base64Stream.h>
38 #ifdef IBRDTN_SUPPORT_BSP
74 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 60) <<
"ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL;
84 }
catch (
const std::exception&) { };
98 std::string::reverse_iterator iter = buffer.rbegin();
99 if ( (*iter) ==
'\r' ) buffer = buffer.substr(0, buffer.length() - 1);
102 if (cmd.empty())
continue;
107 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
109 if (cmd[1] ==
"endpoint")
111 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
113 ibrcommon::MutexLock l(_write_lock);
114 if (cmd[2].length() <= 0) {
133 else if (cmd[1] ==
"encoding")
135 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
144 ibrcommon::MutexLock l(_write_lock);
147 ibrcommon::MutexLock l(_write_lock);
153 ibrcommon::MutexLock l(_write_lock);
157 else if (cmd[0] ==
"endpoint")
159 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
163 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
165 ibrcommon::MutexLock l(_write_lock);
168 if (cmd[2].length() <= 0)
181 else if (cmd[1] ==
"del")
183 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
185 ibrcommon::MutexLock l(_write_lock);
188 if (cmd[2].length() <= 0)
200 if(_endpoint == del_endpoint)
209 else if (cmd[1] ==
"get")
211 ibrcommon::MutexLock l(_write_lock);
216 ibrcommon::MutexLock l(_write_lock);
220 else if (cmd[0] ==
"registration")
222 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
226 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
228 ibrcommon::MutexLock l(_write_lock);
242 else if (cmd[1] ==
"del")
244 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
246 ibrcommon::MutexLock l(_write_lock);
257 if(_endpoint == endpoint)
266 else if (cmd[1] ==
"list")
268 ibrcommon::MutexLock l(_write_lock);
272 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
274 _stream << (*iter).getString() << std::endl;
278 else if (cmd[1] ==
"save")
280 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
282 ibrcommon::Timer::time_t lifetime = 0;
283 std::stringstream ss(cmd[2]);
286 if(ss.fail())
throw ibrcommon::Exception(
"malformed command");
291 ibrcommon::MutexLock l(_write_lock);
294 else if (cmd[1] ==
"load")
296 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
298 const std::string handle = cmd[2];
312 Sender *old_sender = _sender;
314 _sender =
new Sender(*
this);
316 catch (
const std::bad_alloc &ex)
318 _sender = old_sender;
324 ibrcommon::MutexLock l(_write_lock);
329 ibrcommon::MutexLock l(_write_lock);
334 ibrcommon::MutexLock l(_write_lock);
340 ibrcommon::MutexLock l(_write_lock);
344 else if (cmd[0] ==
"neighbor")
346 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
348 if (cmd[1] ==
"list")
350 ibrcommon::MutexLock l(_write_lock);
354 for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); ++iter)
356 _stream << (*iter).getEID().getString() << std::endl;
362 ibrcommon::MutexLock l(_write_lock);
366 else if (cmd[0] ==
"bundle")
368 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
373 ibrcommon::MutexLock l(_write_lock);
380 else if (cmd[2] ==
"binary")
385 else if (cmd[2] ==
"plain")
390 else if (cmd[2] ==
"xml")
399 else if (cmd[1] ==
"put")
402 ibrcommon::MutexLock l(_write_lock);
408 else if (cmd[2] ==
"plain")
415 }
catch (
const std::exception &ex) {
416 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 20) <<
"put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
421 else if (cmd[2] ==
"binary")
428 }
catch (
const std::exception &ex) {
429 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 20) <<
"put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
438 else if (cmd[1] ==
"load")
440 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
444 if (cmd[2] ==
"queue")
446 id = _bundle_queue.take();
451 id = readBundleID(cmd, 2);
462 ibrcommon::MutexLock l(_write_lock);
464 }
catch (
const ibrcommon::Exception &e) {
468 ibrcommon::MutexLock l(_write_lock);
471 }
catch (
const ibrcommon::Exception&) {
472 ibrcommon::MutexLock l(_write_lock);
476 else if (cmd[1] ==
"clear")
480 ibrcommon::MutexLock l(_write_lock);
483 else if (cmd[1] ==
"free")
488 ibrcommon::MutexLock l(_write_lock);
490 }
catch (
const ibrcommon::Exception&) {
491 ibrcommon::MutexLock l(_write_lock);
495 else if (cmd[1] ==
"delivered")
497 if (cmd.size() < 5)
throw ibrcommon::Exception(
"not enough parameters");
507 ibrcommon::MutexLock l(_write_lock);
509 }
catch (
const ibrcommon::Exception&) {
510 ibrcommon::MutexLock l(_write_lock);
514 else if (cmd[1] ==
"store")
519 ibrcommon::MutexLock l(_write_lock);
521 }
catch (
const ibrcommon::Exception&) {
522 ibrcommon::MutexLock l(_write_lock);
526 else if (cmd[1] ==
"send")
531 ibrcommon::MutexLock l(_write_lock);
534 else if (cmd[1] ==
"info")
537 ibrcommon::MutexLock l(_write_lock);
542 else if (cmd[1] ==
"block")
544 if (cmd.size() < 3)
throw ibrcommon::Exception(
"not enough parameters");
554 istringstream ss(cmd[3]);
557 if (ss.fail())
throw ibrcommon::Exception(
"malformed command");
559 if (static_cast<dtn::data::Size>(offset) >= _bundle_reg.
size())
573 ibrcommon::MutexLock l(_write_lock);
596 else if (cmd[2] ==
"del")
598 if (cmd.size() < 4)
throw ibrcommon::Exception(
"not enough parameters");
601 istringstream ss(cmd[3]);
604 if (ss.fail())
throw ibrcommon::Exception(
"malformed command");
607 std::advance(it, offset);
608 _bundle_reg.
erase(it);
610 ibrcommon::MutexLock l(_write_lock);
615 ibrcommon::MutexLock l(_write_lock);
621 ibrcommon::MutexLock l(_write_lock);
625 else if (cmd[0] ==
"payload")
628 if (cmd.size() < 2)
throw ibrcommon::Exception(
"not enough parameters");
635 size_t cmd_index = 1;
638 std::stringstream ss(cmd[1]);
645 std::advance(block_it, block_offset);
655 if (block_it == _bundle_reg.
end()) {
656 throw ibrcommon::Exception(
"invalid offset or no payload block found");
662 size_t cmd_remaining = cmd.size() - (cmd_index + 1);
663 if (cmd[cmd_index] ==
"get")
666 ibrcommon::MutexLock l(_write_lock);
671 if (cmd_remaining > 0)
673 size_t payload_offset = 0;
677 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
679 if (cmd_remaining > 1)
681 ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length;
689 ibrcommon::BLOB::Reference ref = pb.
getBLOB();
690 ibrcommon::BLOB::iostream stream = ref.iostream();
692 if (static_cast<std::streamsize>(payload_offset) >= stream.size())
693 throw ibrcommon::Exception(
"offset out of range");
695 size_t remaining = stream.size() - payload_offset;
697 if ((length > 0) && (remaining > length)) {
702 (*stream).ignore(payload_offset);
711 }
catch (
const std::bad_cast&) {
725 }
catch (
const std::exception &ex) {
729 else if (cmd[cmd_index] ==
"put")
731 ibrcommon::MutexLock l(_write_lock);
740 size_t payload_offset = 0;
741 if (cmd_remaining > 0)
744 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
748 ibrcommon::BLOB::Reference ref = pb.
getBLOB();
749 ibrcommon::BLOB::iostream stream = ref.iostream();
752 if (static_cast<std::streamsize>(payload_offset) < stream.size()) {
754 (*stream).seekp(payload_offset, ios_base::beg);
755 }
else if (payload_offset > 0) {
757 (*stream).seekp(0, ios_base::end);
764 }
catch (std::bad_cast&) {
766 }
catch (
const std::exception&) {
770 else if (cmd[cmd_index] ==
"append")
772 ibrcommon::MutexLock l(_write_lock);
782 ibrcommon::BLOB::Reference ref = pb.
getBLOB();
783 ibrcommon::BLOB::iostream stream = ref.iostream();
786 (*stream).seekp(0, ios_base::end);
792 }
catch (std::bad_cast&) {
794 }
catch (
const std::exception&) {
798 else if (cmd[cmd_index] ==
"clear")
800 ibrcommon::MutexLock l(_write_lock);
806 ibrcommon::BLOB::Reference ref = pb.
getBLOB();
807 ibrcommon::BLOB::iostream stream = ref.iostream();
813 }
catch (std::bad_cast&) {
817 else if (cmd[cmd_index] ==
"length")
819 ibrcommon::MutexLock l(_write_lock);
824 else if (cmd[0] ==
"nodename")
826 ibrcommon::MutexLock l(_write_lock);
831 ibrcommon::MutexLock l(_write_lock);
834 }
catch (
const std::exception&) {
835 ibrcommon::MutexLock l(_write_lock);
846 ExtendedApiHandler::Sender::~Sender()
848 ibrcommon::JoinableThread::join();
851 void ExtendedApiHandler::Sender::__cancellation() throw ()
854 _handler._client.getRegistration().abort();
857 void ExtendedApiHandler::Sender::finally() throw ()
861 void ExtendedApiHandler::Sender::run() throw ()
863 Registration ® = _handler._client.getRegistration();
865 while(_handler.good()){
871 _handler.notifyAdministrativeRecord(
id);
874 _handler._client.getRegistration().delivered(
id);
877 _handler.notifyBundle(
id);
880 reg.wait_for_bundle();
885 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
886 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
888 }
catch (
const ibrcommon::IOException &ex) {
889 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
891 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
892 }
catch (
const std::exception &ex) {
893 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
899 }
catch (
const ibrcommon::ThreadException &ex) {
900 IBRCOMMON_LOGGER_DEBUG_TAG(
"ExtendedApiHandler", 50) << ex.what() << IBRCOMMON_LOGGER_ENDL;
907 _bundle_queue.push(bundle);
910 ibrcommon::MutexLock l(_write_lock);
933 report.
read(payload);
936 ibrcommon::MutexLock l(_write_lock);
990 custody.
read(payload);
993 ibrcommon::MutexLock l(_write_lock);
1033 stream <<
id.timestamp.toString() <<
" " <<
id.sequencenumber.toString() <<
" ";
1035 if (
id.isFragment())
1037 stream <<
id.fragmentoffset.toString() <<
" ";
1038 stream <<
id.getPayloadLength() <<
" ";
1041 stream <<
id.source.getString();
1044 dtn::data::BundleID ExtendedApiHandler::readBundleID(
const std::vector<std::string> &data,
const size_t start)
1047 std::stringstream ss;
1050 if ((data.size() - start) < 3)
1052 throw ibrcommon::Exception(
"not enough parameters");
1056 ss.clear(); ss.str(data[start]);
1057 id.timestamp.read(ss);
1061 throw ibrcommon::Exception(
"malformed parameters");
1065 ss.clear(); ss.str(data[start+1]);
1066 id.sequencenumber.read(ss);
1070 throw ibrcommon::Exception(
"malformed parameters");
1074 if ((data.size() - start) > 3)
1076 id.setFragment(
true);
1079 ss.clear(); ss.str(data[start+2]);
1080 id.fragmentoffset.read(ss);
1083 ss.clear(); ss.str(data[start+3]);
1086 id.setPayloadLength(len);
1090 throw ibrcommon::Exception(
"malformed parameters");
const std::set< dtn::core::Node > getNeighbors()
void delivered(const dtn::data::MetaBundle &m) const
POSITION getAlignment() const
virtual Registration & getRegistration(const std::string &handle)=0
void setPersistent(ibrcommon::Timer::time_t lifetime)
dtn::data::BundleID bundleid
static dtn::data::EID local
const Timestamp & getTimestamp() const
virtual void read(const dtn::data::PayloadBlock &p)
static Encoding parseEncoding(const std::string &data)
void subscribe(const dtn::data::EID &endpoint)
void readData(std::ostream &stream)
virtual void read(const dtn::data::PayloadBlock &p)
void setApplication(const dtn::data::Number &app)
dtn::data::Timestamp timestamp
dtn::data::Block & readBlock(dtn::data::BundleBuilder &builder)
const std::string & getHandle() const
void writeData(const dtn::data::Block &block)
void set(ProcFlags flag, const bool &value)
dtn::net::ConnectionManager & getConnectionManager()
ApiServerInterface & getAPIServer()
const dtn::data::EID & getDefaultEID() const
virtual void store(const dtn::data::Bundle &bundle)=0
virtual dtn::data::Length getPayloadLength() const
ExtendedApiHandler(ClientHandler &client, ibrcommon::socketstream &stream)
dtn::api::Client * _client
Registration & getRegistration()
virtual void remove(const dtn::data::BundleID &id)=0
dtn::data::Number sequencenumber
virtual ~ExtendedApiHandler()
std::string toString() const
DTNTime timeof_forwarding
virtual void __cancellation()
void unsubscribe(const dtn::data::EID &endpoint)
dtn::data::BundleID bundleid
DTNTime timeof_custodyaccept
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
block_list::iterator iterator
const std::set< dtn::data::EID > getSubscriptions()
static const dtn::data::block_t BLOCK_TYPE
void switchRegistration(Registration ®)
virtual Length getLength() const =0
static void processBlocks(dtn::data::Bundle &b)
std::string getString() const
iterator find(block_t blocktype)
ibrcommon::BLOB::Reference getBLOB() const
dtn::storage::BundleStorage & getStorage()
ibrcommon::socketstream & _stream
dtn::data::Number fragmentoffset
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
const Number & getNanoseconds() const
virtual bool isFragment() const
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
static BundleCore & getInstance()