IBR-DTN  1.0.0
FileConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * FileConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundleCore.h"
29 #include "routing/BaseRouter.h"
30 #include "routing/NodeHandshake.h"
31 #include <ibrdtn/data/BundleSet.h>
33 #include <ibrdtn/utils/Clock.h>
34 #include <ibrcommon/data/File.h>
35 #include <ibrcommon/Logger.h>
36 #include <ibrcommon/thread/MutexLock.h>
37 
38 namespace dtn
39 {
40  namespace net
41  {
42  FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a, const dtn::core::Node &n)
43  : action(a), node(n)
44  {
45  }
46 
47  FileConvergenceLayer::Task::~Task()
48  {
49  }
50 
51  FileConvergenceLayer::StoreBundleTask::StoreBundleTask(const dtn::core::Node &n, const dtn::net::BundleTransfer &j)
52  : FileConvergenceLayer::Task(TASK_STORE, n), job(j)
53  {
54  }
55 
56  FileConvergenceLayer::StoreBundleTask::~StoreBundleTask()
57  {
58  }
59 
61  {
62  }
63 
65  {
66  }
67 
69  {
70  // routine checked for throw() on 15.02.2013
73  }
74 
76  {
77  // routine checked for throw() on 15.02.2013
80  }
81 
83  {
84  _tasks.abort();
85  }
86 
88  {
89  try {
90  while (true)
91  {
92  Task *t = _tasks.poll();
93 
94  try {
95  switch (t->action)
96  {
97  case Task::TASK_LOAD:
98  {
99  // load bundles (receive)
100  load(t->node);
101  break;
102  }
103 
104  case Task::TASK_STORE:
105  {
106  try {
107  StoreBundleTask &sbt = dynamic_cast<StoreBundleTask&>(*t);
109 
110  // get the file path of the node
111  ibrcommon::File path = getPath(sbt.node);
112 
113  // scan for bundles
114  std::list<dtn::data::MetaBundle> bundles = scan(path);
115 
116  // create a filter context
117  dtn::core::FilterContext context;
118  context.setPeer(sbt.job.getNeighbor());
119  context.setProtocol(getDiscoveryProtocol());
120 
121  try {
122  // check if bundle is a routing bundle
123  const dtn::data::EID &source = sbt.job.getBundle().source;
124 
125  if (source.isApplication("routing"))
126  {
127  // read the bundle out of the storage
128  dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
130 
131  if (bundle.destination.isApplication("routing"))
132  {
133  // push bundle through the filter routines
134  context.setBundle(bundle);
136 
137  if (ret != BundleFilter::ACCEPT)
138  {
140  continue;
141  }
142 
143  // add this bundle to the blacklist
144  {
145  ibrcommon::MutexLock l(_blacklist_mutex);
146  if (_blacklist.find(meta) != _blacklist.end())
147  {
148  // send transfer aborted event
150  continue;
151  }
152  _blacklist.add(meta);
153  }
154 
155  // create ECM reply
156  replyHandshake(bundle, bundles);
157 
158  // raise bundle event
159  sbt.job.complete();
160  continue;
161  }
162  }
163 
164  // check if bundle is already in the path
165  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); ++iter)
166  {
167  if ((*iter) == sbt.job.getBundle())
168  {
169  // send transfer aborted event
171  continue;
172  }
173  }
174 
175  ibrcommon::TemporaryFile filename(path, "bundle");
176 
177  try {
178  // read the bundle out of the storage
179  dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
180 
181  // push bundle through the filter routines
182  context.setBundle(bundle);
184 
185  if (ret != BundleFilter::ACCEPT)
186  {
188  continue;
189  }
190 
191  std::fstream fs(filename.getPath().c_str(), std::fstream::out);
192 
193  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", info) << "write bundle " << sbt.job.getBundle().toString() << " to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL;
194 
196 
197  // serialize the bundle
198  s << bundle;
199 
200  // raise bundle event
201  sbt.job.complete();
202  } catch (const ibrcommon::Exception&) {
203  filename.remove();
204  throw;
205  }
206  } catch (const dtn::storage::NoBundleFoundException&) {
207  // send transfer aborted event
209  } catch (const ibrcommon::Exception&) {
210  // something went wrong - requeue transfer for later
211  }
212 
213  } catch (const std::bad_cast&) { }
214  break;
215  }
216  }
217  } catch (const std::exception &ex) {
218  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", error) << "error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
219  }
220  delete t;
221  }
222  } catch (const ibrcommon::QueueUnblockedException &ex) { };
223  }
224 
226  {
227  if (node.getAction() == dtn::core::NODE_AVAILABLE)
228  {
229  const dtn::core::Node &n = node.getNode();
231  {
232  _tasks.push(new Task(Task::TASK_LOAD, n));
233  }
234  }
235  }
236 
238  {
239  if (time.getAction() == dtn::core::TIME_SECOND_TICK)
240  {
241  ibrcommon::MutexLock l(_blacklist_mutex);
242  _blacklist.expire(time.getTimestamp());
243  }
244  }
245 
246  const std::string FileConvergenceLayer::getName() const
247  {
248  return "FileConvergenceLayer";
249  }
250 
252  {
254  }
255 
257  {
258  }
259 
260  void FileConvergenceLayer::load(const dtn::core::Node &n)
261  {
262  std::list<dtn::data::MetaBundle> ret;
263  std::list<ibrcommon::File> files;
264 
265  // list all files in the folder
266  getPath(n).getFiles(files);
267 
268  // get a reference to the router
270 
271  // create a filter context
272  dtn::core::FilterContext context;
273  context.setPeer(n.getEID());
274  context.setProtocol(getDiscoveryProtocol());
275 
276  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
277  {
278  const ibrcommon::File &f = (*iter);
279 
280  // skip system files
281  if (f.isSystem()) continue;
282 
283  try {
284  // open the file
285  std::fstream fs(f.getPath().c_str(), std::fstream::in);
286 
287  // get a deserializer
289 
290  dtn::data::MetaBundle bundle;
291 
292  // load meta data
293  d >> bundle;
294 
295  // check the bundle
296  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
297  {
298  // invalid bundle!
299  throw dtn::data::Validator::RejectedException("destination or source EID is null");
300  }
301 
302  // ask if the bundle is already known
303  if ( router.isKnown(bundle) ) continue;
304  } catch (const std::exception&) {
305  // bundle could not be read
306  continue;
307  }
308 
309  try {
310  // open the file
311  std::fstream fs(f.getPath().c_str(), std::fstream::in);
312 
313  // get a deserializer
315 
316  dtn::data::Bundle bundle;
317 
318  // load meta data
319  d >> bundle;
320 
321  // push bundle through the filter routines
322  context.setBundle(bundle);
324 
325  if (ret == BundleFilter::ACCEPT)
326  {
327  // raise default bundle received event
328  dtn::net::BundleReceivedEvent::raise(n.getEID(), bundle, false);
329  }
330  }
332  {
333  // display the rejection
334  IBRCOMMON_LOGGER_DEBUG_TAG("FileConvergenceLayer", 2) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
335  }
336  catch (const dtn::InvalidDataException &ex) {
337  // display the rejection
338  IBRCOMMON_LOGGER_DEBUG_TAG("FileConvergenceLayer", 2) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
339  }
340  }
341  }
342 
343  ibrcommon::File FileConvergenceLayer::getPath(const dtn::core::Node &n)
344  {
345  std::list<dtn::core::Node::URI> uris = n.get(dtn::core::Node::CONN_FILE);
346 
347  // abort the transfer, if no URI exists
348  if (uris.empty()) throw ibrcommon::Exception("path not defined");
349 
350  // get the URI of the file path
351  const std::string &uri = uris.front().value;
352 
353  if (uri.substr(0, 7) != "file://") throw ibrcommon::Exception("path invalid");
354 
355  return ibrcommon::File(uri.substr(7, uri.length() - 7));
356  }
357 
358  std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(const ibrcommon::File &path)
359  {
360  std::list<dtn::data::MetaBundle> ret;
361  std::list<ibrcommon::File> files;
362 
363  // list all files in the folder
364  path.getFiles(files);
365 
366  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
367  {
368  const ibrcommon::File &f = (*iter);
369 
370  // skip system files
371  if (f.isSystem()) continue;
372 
373  try {
374  // open the file
375  std::fstream fs(f.getPath().c_str(), std::fstream::in);
376 
377  // get a deserializer
379 
381 
382  // load meta data
383  d >> meta;
384 
385  if (meta.expiretime < dtn::utils::Clock::getTime())
386  {
388  throw ibrcommon::Exception("bundle is expired");
389  }
390 
391  // put the meta bundle in the list
392  ret.push_back(meta);
393  } catch (const std::exception&) {
394  IBRCOMMON_LOGGER_DEBUG_TAG("FileConvergenceLayer", 34) << "bundle in file " << f.getPath() << " invalid or expired" << IBRCOMMON_LOGGER_ENDL;
395 
396  // delete the file
397  ibrcommon::File(f).remove();
398  }
399  }
400 
401  return ret;
402  }
403 
405  {
406  _tasks.push(new StoreBundleTask(n, job));
407  }
408 
409  void FileConvergenceLayer::replyHandshake(const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl)
410  {
411  // read the ecm
413  ibrcommon::BLOB::Reference ref = p.getBLOB();
415 
416  // locked within this region
417  {
418  ibrcommon::BLOB::iostream s = ref.iostream();
419  (*s) >> request;
420  }
421 
422  // if this is a request answer with an summary vector
424  {
425  // create a new request for the summary vector of the neighbor
427 
429  {
430  // add own summary vector to the message
432 
433  // add bundles in the path
434  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); ++iter)
435  {
436  vec.add(*iter);
437  }
438 
439  // add bundles from the blacklist
440  {
441  ibrcommon::MutexLock l(_blacklist_mutex);
442  for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); ++iter)
443  {
444  vec.add(*iter);
445  }
446  }
447 
448  // create an item
450 
451  // add it to the handshake
452  response.addItem(item);
453  }
454 
455  // create a new bundle
456  dtn::data::Bundle answer;
457 
458  // set the source of the bundle
459  answer.source = bundle.destination;
460 
461  // set the destination of the bundle
463  answer.destination = bundle.source;
464 
465  // limit the lifetime to 60 seconds
466  answer.lifetime = 60;
467 
468  // set high priority
471 
473  ibrcommon::BLOB::Reference ref = p.getBLOB();
474 
475  // serialize the request into the payload
476  {
477  ibrcommon::BLOB::iostream ios = ref.iostream();
478  (*ios) << response;
479  }
480 
481  // add a schl block
483  schl.setLimit(1);
484 
485  // create a filter context
486  dtn::core::FilterContext context;
487  context.setPeer(answer.source);
488  context.setProtocol(getDiscoveryProtocol());
489 
490  // push bundle through the filter routines
491  context.setBundle(answer);
493 
494  if (ret == BundleFilter::ACCEPT)
495  {
496  // raise default bundle received event
498  }
499  }
500  }
501  } /* namespace net */
502 } /* namespace dtn */
void open(const dtn::core::Node &)
static void add(EventReceiver< E > *receiver)
T & push_front()
Definition: Bundle.h:161
dtn::routing::BaseRouter & getRouter() const
Definition: BundleCore.cpp:227
void setBundle(const dtn::data::Bundle &data)
dtn::core::Node::Protocol getDiscoveryProtocol() const
static void remove(const EventReceiver< E > *receiver)
void remove(const Block &block)
Definition: Bundle.cpp:99
bool has(Node::Protocol proto) const
Definition: Node.cpp:262
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
Definition: BundleCore.cpp:598
bool hasRequest(const dtn::data::Number &identifier) const
void setProtocol(const dtn::core::Node::Protocol &protocol)
virtual void add(const dtn::data::MetaBundle &bundle)
Definition: BundleSet.cpp:95
void setPeer(const dtn::data::EID &endpoint)
MESSAGE_TYPE getType() const
static const dtn::data::Number identifier
Definition: NodeHandshake.h:66
EID getNode() const
Definition: EID.cpp:528
static dtn::data::Timestamp getTime()
Definition: Clock.cpp:167
void raiseEvent(const dtn::core::NodeEvent &evt)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
bool isKnown(const dtn::data::BundleID &id)
Definition: BaseRouter.cpp:570
std::list< URI > get(Node::Protocol proto) const
Definition: Node.cpp:325
T & push_back()
Definition: Bundle.h:180
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
Definition: BundleEvent.cpp:78
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
ibrcommon::BLOB::Reference getBLOB() const
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
const std::string getName() const
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
void set(FLAGS flag, bool value)
dtn::data::EID source
Definition: BundleID.h:53
bool isApplication(const dtn::data::Number &app) const
Definition: EID.cpp:455
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82