34 #include <ibrcommon/data/File.h>
35 #include <ibrcommon/Logger.h>
36 #include <ibrcommon/thread/MutexLock.h>
42 FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a,
const dtn::core::Node &n)
47 FileConvergenceLayer::Task::~Task()
56 FileConvergenceLayer::StoreBundleTask::~StoreBundleTask()
92 Task *t = _tasks.poll();
104 case Task::TASK_STORE:
107 StoreBundleTask &sbt =
dynamic_cast<StoreBundleTask&
>(*t);
111 ibrcommon::File path = getPath(sbt.node);
114 std::list<dtn::data::MetaBundle> bundles = scan(path);
118 context.
setPeer(sbt.job.getNeighbor());
137 if (ret != BundleFilter::ACCEPT)
145 ibrcommon::MutexLock l(_blacklist_mutex);
146 if (_blacklist.find(meta) != _blacklist.end())
152 _blacklist.add(meta);
156 replyHandshake(bundle, bundles);
165 for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); ++iter)
167 if ((*iter) == sbt.job.getBundle())
175 ibrcommon::TemporaryFile filename(path,
"bundle");
185 if (ret != BundleFilter::ACCEPT)
191 std::fstream fs(filename.getPath().c_str(), std::fstream::out);
193 IBRCOMMON_LOGGER_TAG(
"FileConvergenceLayer", info) <<
"write bundle " << sbt.job.getBundle().toString() <<
" to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL;
202 }
catch (
const ibrcommon::Exception&) {
209 }
catch (
const ibrcommon::Exception&) {
213 }
catch (
const std::bad_cast&) { }
217 }
catch (
const std::exception &ex) {
218 IBRCOMMON_LOGGER_TAG(
"FileConvergenceLayer", error) <<
"error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
222 }
catch (
const ibrcommon::QueueUnblockedException &ex) { };
232 _tasks.push(
new Task(Task::TASK_LOAD, n));
241 ibrcommon::MutexLock l(_blacklist_mutex);
242 _blacklist.expire(time.getTimestamp());
248 return "FileConvergenceLayer";
262 std::list<dtn::data::MetaBundle> ret;
263 std::list<ibrcommon::File> files;
266 getPath(n).getFiles(files);
276 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
278 const ibrcommon::File &f = (*iter);
281 if (f.isSystem())
continue;
285 std::fstream fs(f.getPath().c_str(), std::fstream::in);
296 if ( ( bundle.destination ==
EID() ) || ( bundle.source ==
EID() ) )
303 if ( router.
isKnown(bundle) )
continue;
304 }
catch (
const std::exception&) {
311 std::fstream fs(f.getPath().c_str(), std::fstream::in);
325 if (ret == BundleFilter::ACCEPT)
334 IBRCOMMON_LOGGER_DEBUG_TAG(
"FileConvergenceLayer", 2) <<
"bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
338 IBRCOMMON_LOGGER_DEBUG_TAG(
"FileConvergenceLayer", 2) <<
"invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
343 ibrcommon::File FileConvergenceLayer::getPath(
const dtn::core::Node &n)
348 if (uris.empty())
throw ibrcommon::Exception(
"path not defined");
351 const std::string &uri = uris.front().value;
353 if (uri.substr(0, 7) !=
"file://")
throw ibrcommon::Exception(
"path invalid");
355 return ibrcommon::File(uri.substr(7, uri.length() - 7));
358 std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(
const ibrcommon::File &path)
360 std::list<dtn::data::MetaBundle> ret;
361 std::list<ibrcommon::File> files;
364 path.getFiles(files);
366 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
368 const ibrcommon::File &f = (*iter);
371 if (f.isSystem())
continue;
375 std::fstream fs(f.getPath().c_str(), std::fstream::in);
388 throw ibrcommon::Exception(
"bundle is expired");
393 }
catch (
const std::exception&) {
394 IBRCOMMON_LOGGER_DEBUG_TAG(
"FileConvergenceLayer", 34) <<
"bundle in file " << f.getPath() <<
" invalid or expired" << IBRCOMMON_LOGGER_ENDL;
397 ibrcommon::File(f).remove();
406 _tasks.push(
new StoreBundleTask(n, job));
409 void FileConvergenceLayer::replyHandshake(
const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl)
413 ibrcommon::BLOB::Reference ref = p.
getBLOB();
418 ibrcommon::BLOB::iostream s = ref.iostream();
434 for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); ++iter)
441 ibrcommon::MutexLock l(_blacklist_mutex);
442 for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); ++iter)
452 response.addItem(item);
473 ibrcommon::BLOB::Reference ref = p.
getBLOB();
477 ibrcommon::BLOB::iostream ios = ref.iostream();
494 if (ret == BundleFilter::ACCEPT)
void open(const dtn::core::Node &)
static void add(EventReceiver< E > *receiver)
dtn::routing::BaseRouter & getRouter() const
void setBundle(const dtn::data::Bundle &data)
dtn::core::Node::Protocol getDiscoveryProtocol() const
static void remove(const EventReceiver< E > *receiver)
void remove(const Block &block)
bool has(Node::Protocol proto) const
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
bool hasRequest(const dtn::data::Number &identifier) const
void setProtocol(const dtn::core::Node::Protocol &protocol)
virtual void add(const dtn::data::MetaBundle &bundle)
void setPeer(const dtn::data::EID &endpoint)
virtual ~FileConvergenceLayer()
MESSAGE_TYPE getType() const
static const dtn::data::Number identifier
static dtn::data::Timestamp getTime()
void setLimit(const Number &hops)
void raiseEvent(const dtn::core::NodeEvent &evt)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
bool isKnown(const dtn::data::BundleID &id)
std::list< URI > get(Node::Protocol proto) const
iterator find(block_t blocktype)
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
ibrcommon::BLOB::Reference getBLOB() const
const dtn::data::EID & getEID() const
dtn::storage::BundleStorage & getStorage()
const std::string getName() const
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
void set(FLAGS flag, bool value)
bool isApplication(const dtn::data::Number &app) const
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()