IBR-DTN  1.0.0
dtnoutbox.cpp
Go to the documentation of this file.
1 /*
2  * dtnoutbox.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * David Goltzsche <goltzsch@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 #include "config.h"
24 #include <ibrdtn/api/Client.h>
25 #include <ibrcommon/net/socket.h>
26 #include <ibrcommon/thread/Mutex.h>
27 #include <ibrcommon/thread/MutexLock.h>
28 #include <ibrcommon/thread/SignalHandler.h>
30 #include <ibrcommon/data/BLOB.h>
31 #include <ibrcommon/data/File.h>
32 #include <ibrcommon/appstreambuf.h>
33 #include <ibrcommon/Logger.h>
34 
35 #include "io/TarUtils.h"
36 #include "io/ObservedFile.h"
37 
38 #ifdef HAVE_LIBTFFS
39 #include "io/FatImageReader.h"
40 #endif
41 
42 #include <stdlib.h>
43 #include <iostream>
44 #include <map>
45 #include <vector>
46 #include <sys/types.h>
47 #include <unistd.h>
48 #include <regex.h>
49 #include <getopt.h>
50 
51 typedef std::list<io::ObservedFile> filelist;
52 typedef std::set<io::ObservedFile> fileset;
53 typedef std::set<io::FileHash> hashlist;
54 
55 // set this variable to false to stop the app
56 bool _running = true;
57 
58 //global wait conditional
59 ibrcommon::Conditional _wait_cond;
60 bool _wait_abort = false;
61 
62 const std::string TAG = "dtnoutbox";
63 
64 class config {
65 public:
66  config()
67  : interval(5000), rounds(3), path("/"), regex_str("^\\."),
68  bundle_group(false), invert(false), quiet(false), verbose(false), fat(false), enabled(true)
69  {}
70 
71  //global conf values
72  string name;
73  string outbox;
74  string destination;
75 
76  //optional paramters
77  std::string workdir;
78  std::size_t interval;
79  std::size_t rounds;
80  std::string path;
81  std::string regex_str;
82  regex_t regex;
83 
84  int bundle_group;
85  int invert;
86  int quiet;
87  int verbose;
88  int fat;
89  int enabled;
90 };
91 typedef struct config config_t;
92 
93 struct option long_options[] =
94 {
95  {"destination", required_argument, 0, 'd'},
96  {"help", no_argument, 0, 'h'},
97  {"group", no_argument, 0, 'g'},
98  {"workdir", required_argument, 0, 'w'},
99  {"interval", required_argument, 0, 'i'},
100  {"rounds", required_argument, 0, 'r'},
101  {"path", required_argument, 0, 'p'},
102  {"regex", required_argument, 0, 'R'},
103  {"quiet", no_argument, 0, 'q'},
104  {"verbose", no_argument, 0, 'v'},
105  {0, 0, 0, 0}
106 };
107 
109 {
110 
111  // logging options
112  const unsigned char logopts = 0;
113 
114  // error filter
115  const unsigned char logerr = ibrcommon::Logger::LOGGER_ERR | ibrcommon::Logger::LOGGER_CRIT;
116 
117  // logging filter, everything but debug, err and crit
118  const unsigned char logstd = ibrcommon::Logger::LOGGER_ALL ^ (ibrcommon::Logger::LOGGER_DEBUG | logerr);
119 
120  // syslog filter, everything but DEBUG and NOTICE
121  const unsigned char logsys = ibrcommon::Logger::LOGGER_ALL ^ (ibrcommon::Logger::LOGGER_DEBUG | ibrcommon::Logger::LOGGER_NOTICE);
122 
123  const unsigned char logall = ibrcommon::Logger::LOGGER_ALL;
124 
125  //log notice messages to cout, if quiet not configured
126  if(!conf.quiet)
127  {
128  ibrcommon::Logger::addStream(std::cout, logsys, logopts);
129  if(conf.verbose)
130  {
131  ibrcommon::Logger::addStream(std::cout, logall, logopts);
132  }
133  }
134 
135  // add logging to the cerr
136  ibrcommon::Logger::addStream(std::cerr, logerr, logopts);
137 }
138 
140 {
141  std::cout << "-- dtnoutbox (IBR-DTN) --" << std::endl;
142  std::cout << "Syntax: dtnoutbox [options] <name> <outbox> <destination>" << std::endl;
143  std::cout << " <name> The application name" << std::endl;
144 #ifdef HAVE_LIBTFFS
145  std::cout << " <outbox> Location of outgoing files, directory or vfat-image" << std::endl;
146 #else
147  std::cout << " <outbox> Directory of outgoing files" << std::endl;
148 #endif
149  std::cout << " <destination> The destination EID for all outgoing files" << std::endl << std::endl;
150  std::cout << "* optional parameters *" << std::endl;
151  std::cout << " -h|--help Display this text" << std::endl;
152  std::cout << " -g|--group Receiver is a destination group" << std::endl;
153  std::cout << " -w|--workdir <dir>" << std::endl;
154  std::cout << " Temporary work directory" << std::endl;
155  std::cout << " -i|--interval <milliseconds>" << std::endl;
156  std::cout << " Interval in milliseconds, in which <outbox> is scanned" << std::endl;
157  std::cout << " for new/changed files. default: 5000" << std::endl;
158  std::cout << " -r|--rounds <n> Number of rounds of intervals, after which a unchanged" << std::endl;
159  std::cout << " file is considered as written. default: 3" << std::endl;
160 #ifdef HAVE_LIBTFFS
161  std::cout << " -p|--path <path> Path of outbox within vfat-image. default: /" << std::endl;
162 #endif
163  std::cout << " -R|--regex <regex>" << std::endl;
164  std::cout << " All files in <outbox> matching this regular expression" << std::endl;
165  std::cout << " will be ignored. default: ^\\." << std::endl;
166  std::cout << " -I|--invert Invert the regular expression defined with -R"<< std::endl;
167  std::cout << " -q|--quiet Only print error messages" << std::endl;
168  std::cout << " -v|--verbose print more verbose info messages, only works without -q" << std::endl;
169 
170  _running = false; //stop this app, after printing help
171 
172 }
173 
174 void read_configuration(int argc, char** argv, config_t &conf)
175 {
176  while(1)
177  {
178  /* getopt_long stores the option index here. */
179  int option_index = 0;
180  int c = getopt_long (argc, argv, "hw:i:r:p:R:qvIg",
181  long_options, &option_index);
182  /* Detect the end of the options. */
183  if (c == -1)
184  break;
185 
186  switch (c)
187  {
188  case 0:
189  /* If this option set a flag, do nothing else now. */
190  if (long_options[option_index].flag != 0)
191  break;
192  printf ("option %s", long_options[option_index].name);
193  if (optarg)
194  printf (" with arg %s", optarg);
195  printf ("\n");
196  break;
197 
198  case 'h':
199  print_help();
200  exit(EXIT_SUCCESS);
201  break;
202  case 'g':
203  conf.bundle_group = true;
204  break;
205  case 'w':
206  conf.workdir = std::string(optarg);
207  break;
208  case 'i':
209  conf.interval = atoi(optarg);
210  break;
211  case 'r':
212  conf.rounds = atoi(optarg);
213  break;
214  case 'p':
215  conf.path = std::string(optarg);
216  break;
217  case 'R':
218  conf.regex_str = std::string(optarg);
219  break;
220  case 'q':
221  conf.quiet = true;
222  break;
223  case 'v':
224  conf.verbose = true;
225  break;
226  case 'I':
227  conf.invert = true;
228  break;
229  case '?':
230  break;
231  default:
232  abort();
233  break;
234  }
235  }
236 
237  // print help if not enough parameters are set
238  if ((argc - optind) < 3)
239  {
240  print_help();
241  exit(EXIT_FAILURE);
242  }
243 
244  conf.name = std::string(argv[optind]);
245  conf.destination = std::string(argv[optind+2]);
246  conf.outbox = std::string(argv[optind+1]);
247 
248  //compile regex, if set
249  if (conf.regex_str.length() > 0 && regcomp(&conf.regex,conf.regex_str.c_str(),0))
250  {
251  IBRCOMMON_LOGGER_TAG(TAG,error) << "ERROR: invalid regex: " << optarg << IBRCOMMON_LOGGER_ENDL;
252  exit(-1);
253  }
254 
255 
256  //check outbox path for trailing slash
257  if (conf.outbox.at(conf.outbox.length()-1) == '/')
258  conf.outbox = conf.outbox.substr(0,conf.outbox.length() -1);
259 }
260 
261 void sighandler_func(int signal)
262 {
263  IBRCOMMON_LOGGER_TAG(TAG,notice) << "got signal " << signal << IBRCOMMON_LOGGER_ENDL;
264  switch (signal)
265  {
266  case SIGTERM:
267  case SIGINT:
268  {
269  //stop waiting and stop running, on SIGINT or SIGTERM
270  ibrcommon::MutexLock l(_wait_cond);
271  _running = false;
272  _wait_cond.signal(true);
273  break;
274  }
275 #ifndef __WIN32__
276  case SIGUSR1:
277  {
278  //stop waiting on SIGUSR1 -> "quickscan"
279  ibrcommon::MutexLock l(_wait_cond);
280  _wait_abort = true;
281  _wait_cond.signal(true);
282  break;
283  }
284 #endif
285  default:
286  break;
287  }
288 }
289 
290 /*
291  * main application method
292  */
293 int main( int argc, char** argv )
294 {
295  // catch process signals
296  ibrcommon::SignalHandler sighandler(sighandler_func);
297  sighandler.handle(SIGINT);
298  sighandler.handle(SIGTERM);
299 #ifndef __WIN32__
300  sighandler.handle(SIGUSR1);
301 #endif
302 
303  // configration object
304  config_t conf;
305 
306  // read the configuration
307  read_configuration(argc, argv, conf);
308 
309  init_logger(conf);
310 
311  // initialize sighandler after possible exit call
312  sighandler.initialize();
313 
314  // init working directory
315  if (conf.workdir.length() > 0)
316  {
317  ibrcommon::File blob_path(conf.workdir);
318 
319  if (blob_path.exists())
320  {
321  ibrcommon::BLOB::changeProvider(new ibrcommon::FileBLOBProvider(blob_path), true);
322  }
323  }
324 
325  // backoff for reconnect
326  unsigned int backoff = 2;
327 
328  ibrcommon::File outbox_file(conf.outbox);
329 
330  // create new file lists
331  fileset new_files, prev_files, deleted_files, files_to_send;
332  filelist observed_files;
333  hashlist sent_hashes;
334 
335  // observed root file
336  io::ObservedFile root(ibrcommon::File("/"));
337 
338 #ifdef HAVE_LIBTFFS
339  io::FatImageReader *imagereader = NULL;
340 #endif
341 
342  if (outbox_file.exists() && !outbox_file.isDirectory())
343  {
344 #ifdef HAVE_LIBTFFS
345  conf.fat = true;
346  imagereader = new io::FatImageReader(conf.outbox);
347  const io::FATFile fat_root(*imagereader, conf.path);
348  root = io::ObservedFile(fat_root);
349 #else
350  IBRCOMMON_LOGGER_TAG(TAG,error) << "ERROR: image-file provided, but this tool has been compiled without libtffs support!" << IBRCOMMON_LOGGER_ENDL;
351  return -1;
352 #endif
353  }
354  else
355  {
356  if (!outbox_file.exists()) {
357  ibrcommon::File::createDirectory(outbox_file);
358  }
359  root = io::ObservedFile(outbox_file);
360  }
361 
362  IBRCOMMON_LOGGER_TAG(TAG,info) << "-- dtnoutbox --" << IBRCOMMON_LOGGER_ENDL;
363 
364  // loop, if no stop if requested
365  while (_running)
366  {
367  try
368  {
369  // Create a stream to the server using TCP.
370  ibrcommon::vaddress addr("localhost", 4550);
371  ibrcommon::socketstream conn(new ibrcommon::tcpsocket(addr));
372 
373  // Initiate a client for synchronous receiving
374  dtn::api::Client client(conf.name, conn, dtn::api::Client::MODE_SENDONLY);
375 
376  // Connect to the server. Actually, this function initiate the
377  // stream protocol by starting the thread and sending the contact header.
378  client.connect();
379 
380  // reset backoff if connected
381  backoff = 2;
382 
383  // check the connection
384  while (_running)
385  {
386  // get all files
387  fileset current_files;
388  root.findFiles(current_files);
389 
390  // determine deleted files
391  fileset deleted_files;
392  std::set_difference(prev_files.begin(), prev_files.end(), current_files.begin(), current_files.end(), std::inserter(deleted_files, deleted_files.begin()));
393 
394  // remove deleted files from observation
395  for (fileset::const_iterator iter = deleted_files.begin(); iter != deleted_files.end(); ++iter)
396  {
397  const io::ObservedFile &deletedFile = (*iter);
398 
399  // remove references in the sent_hashes
400  for (hashlist::iterator hash_it = sent_hashes.begin(); hash_it != sent_hashes.end(); /* blank */) {
401  if ((*hash_it).getPath() == deletedFile.getFile().getPath()) {
402  sent_hashes.erase(hash_it++);
403  } else {
404  ++hash_it;
405  }
406  }
407 
408  // remove from observed files
409  observed_files.remove(deletedFile);
410 
411  // output
412  IBRCOMMON_LOGGER_TAG(TAG,info) << "file removed: " << deletedFile.getFile().getBasename() << IBRCOMMON_LOGGER_ENDL;
413  }
414 
415  // determine new files
416  fileset new_files;
417  std::set_difference(current_files.begin(), current_files.end(), prev_files.begin(), prev_files.end(), std::inserter(new_files, new_files.begin()));
418 
419  // add new files to observation
420  for (fileset::const_iterator iter = new_files.begin(); iter != new_files.end(); ++iter)
421  {
422  const io::ObservedFile &of = (*iter);
423 
424  int reg_ret = regexec(&conf.regex, of.getFile().getBasename().c_str(), 0, NULL, 0);
425  if (!reg_ret && !conf.invert)
426  continue;
427  if (reg_ret && conf.invert)
428  continue;
429 
430  // print error message, if regex error occurs
431  if (reg_ret && reg_ret != REG_NOMATCH)
432  {
433  char msgbuf[100];
434  regerror(reg_ret,&conf.regex,msgbuf,sizeof(msgbuf));
435  IBRCOMMON_LOGGER_TAG(TAG,info) << "ERROR: regex match failed : " << std::string(msgbuf) << IBRCOMMON_LOGGER_ENDL;
436  }
437 
438  // add new file to the observed set
439  observed_files.push_back(of);
440 
441  // log output
442  IBRCOMMON_LOGGER_TAG(TAG, info) << "file found: " << of.getFile().getBasename() << IBRCOMMON_LOGGER_ENDL;
443  }
444 
445  // store current files for the next round
446  prev_files.clear();
447  prev_files.insert(current_files.begin(), current_files.end());
448 
449  IBRCOMMON_LOGGER_TAG(TAG, notice)
450  << "file statistics: "
451  << observed_files.size() << " observed, "
452  << deleted_files.size() << " deleted, "
453  << new_files.size() << " new"
454  << IBRCOMMON_LOGGER_ENDL;
455 
456  // find files to send, create std::list
457  files_to_send.clear();
458 
459 
460  IBRCOMMON_LOGGER_TAG(TAG, notice) << "updating observed files:" << IBRCOMMON_LOGGER_ENDL;
461  for (filelist::iterator iter = observed_files.begin(); iter != observed_files.end(); ++iter)
462  {
463  io::ObservedFile &of = (*iter);
464 
465  // tick and update all files
466  of.update();
467 
468  if (of.getStableCounter() > conf.rounds)
469  {
470  if (sent_hashes.find(of.getHash()) == sent_hashes.end())
471  {
472  sent_hashes.insert(of.getHash());
473  files_to_send.insert(*iter);
474  }
475  }
476 
477  IBRCOMMON_LOGGER_TAG(TAG, notice)
478  << "\t"
479  << of.getFile().getBasename() << ": "
480  << of.getStableCounter()
481  << IBRCOMMON_LOGGER_ENDL;
482  }
483 
484  if (!files_to_send.empty())
485  {
486  std::stringstream ss;
487  for (fileset::const_iterator it = files_to_send.begin(); it != files_to_send.end(); ++it) {
488  ss << (*it).getFile().getBasename() << " ";
489  }
490  IBRCOMMON_LOGGER_TAG("dtnoutbox",info) << "files sent: " << ss.str() << IBRCOMMON_LOGGER_ENDL;
491 
492  try {
493  // create a blob
494  ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create();
495 
496  // write files into BLOB while it is locked
497  {
498  ibrcommon::BLOB::iostream stream = blob.iostream();
499  io::TarUtils::write(*stream, root, files_to_send);
500  }
501 
502  // create a new bundle
503  dtn::data::EID destination = EID(conf.destination);
504 
505  // create a new bundle
507 
508  // set destination
509  b.destination = destination;
510 
511  // add payload block using the blob
512  b.push_back(blob);
513 
514  // set destination address to non-singleton, if configured
515  if (conf.bundle_group)
517 
518  // send the bundle
519  client << b;
520  client.flush();
521  } catch (const ibrcommon::IOException &e) {
522  IBRCOMMON_LOGGER_TAG(TAG,error) << "send failed: " << e.what() << IBRCOMMON_LOGGER_ENDL;
523  }
524  }
525 
526  // wait defined seconds
527  ibrcommon::MutexLock l(_wait_cond);
528  IBRCOMMON_LOGGER_TAG(TAG, notice) << conf.interval <<" ms wait" << IBRCOMMON_LOGGER_ENDL;
529  while (!_wait_abort && _running) {
530  _wait_cond.wait(conf.interval);
531  }
532  _wait_abort = false;
533  }
534 
535  // clean up regex
536  regfree(&conf.regex);
537 
538  // close the client connection
539  client.close();
540 
541  // close the connection
542  conn.close();
543 
544  }
545  catch (const ibrcommon::socket_exception&)
546  {
547  if (_running)
548  {
549  IBRCOMMON_LOGGER_TAG(TAG,error) << "Connection to bundle daemon failed. Retry in " << backoff << " seconds." << IBRCOMMON_LOGGER_ENDL;
550  ibrcommon::Thread::sleep(backoff * 1000);
551 
552  // if backoff < 10 minutes
553  if (backoff < 600)
554  {
555  // set a new backoff
556  backoff = backoff * 2;
557  }
558  }
559  }
560  catch (const ibrcommon::IOException&)
561  {
562  if (_running)
563  {
564  IBRCOMMON_LOGGER_TAG(TAG,error) << "Connection to bundle daemon failed. Retry in " << backoff << " seconds." << IBRCOMMON_LOGGER_ENDL;
565  ibrcommon::Thread::sleep(backoff * 1000);
566 
567  // if backoff < 10 minutes
568  if (backoff < 600)
569  {
570  // set a new backoff
571  backoff = backoff * 2;
572  }
573  }
574  }
575  catch (const std::exception&) { };
576  }
577 
578  // clear observed files
579  observed_files.clear();
580 
581 #ifdef HAVE_LIBTFFS
582  // clean-up
583  if (imagereader != NULL) delete imagereader;
584 #endif
585 
586  return (EXIT_SUCCESS);
587 }
const unsigned char logsys
Definition: Main.cpp:64
struct config config_t
Definition: dtnoutbox.cpp:91
void findFiles(std::set< ObservedFile > &files) const
const unsigned char logerr
Definition: Main.cpp:58
struct option long_options[]
Definition: dtnoutbox.cpp:93
size_t getStableCounter() const
bool _wait_abort
Definition: dtnoutbox.cpp:60
std::set< io::FileHash > hashlist
Definition: dtnoutbox.cpp:53
void init_logger(config_t &conf)
Definition: dtnoutbox.cpp:108
std::set< io::ObservedFile > fileset
Definition: dtnoutbox.cpp:52
const std::string TAG
Definition: dtnoutbox.cpp:62
int main(int argc, char **argv)
Definition: dtnoutbox.cpp:293
ibrcommon::Conditional _wait_cond
Definition: dtnoutbox.cpp:59
void sighandler_func(int signal)
Definition: dtnoutbox.cpp:261
static void write(std::ostream &output, const io::ObservedFile &root, const std::set< ObservedFile > &files_to_send)
Definition: TarUtils.cpp:115
bool _running
Definition: dtnoutbox.cpp:56
unsigned char logopts
Definition: Main.cpp:55
void read_configuration(int argc, char **argv, config_t &conf)
Definition: dtnoutbox.cpp:174
std::list< io::ObservedFile > filelist
Definition: dtnoutbox.cpp:51
ibrcommon::File blob_path("/tmp")
const unsigned char logstd
Definition: Main.cpp:61
void print_help()
Definition: dtnoutbox.cpp:139
unidirectional communication is requested, no reception of bundles
Definition: Client.h:134
T & push_back()
Definition: Bundle.h:180
const io::FileHash & getHash() const
void connect()
Definition: Client.cpp:113
void set(FLAGS flag, bool value)
const ibrcommon::File & getFile() const