IBR-DTN  1.0.0
DHTNameService.cpp
Go to the documentation of this file.
1 /*
2  * DHTNameService.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Till Lorentzen <lorenzte@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 
24 #include "DHTNameService.h"
25 #include "ibrcommon/net/vsocket.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleCore.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 
31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/thread/MutexLock.h>
33 
34 #include <sys/socket.h>
35 #include <arpa/inet.h>
36 #include <set>
37 #include <cstdlib>
38 #include <sys/time.h>
39 #include <fcntl.h>
40 #include <string.h>
41 #include <unistd.h>
42 
44  _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
45  _bootstrapped(0),
46  _config(daemon::Configuration::getInstance().getDHT()) {
47 }
48 
50  join();
51 }
52 
53 const std::string dtn::dht::DHTNameService::getName() const {
54  return "DHT Naming Service";
55 }
56 
58 }
59 
60 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
61  for (int i = 0; i < 2; ++i) {
62  int opts;
63  opts = fcntl(_interrupt_pipe[i], F_GETFL);
64  if (opts < 0) {
65  return false;
66  }
67  opts |= O_NONBLOCK;
68  if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
69  return false;
70  }
71  }
72  return true;
73 }
74 
76  // register as discovery beacon handler
78 
79  // creating interrupt pipe
80  if (pipe(_interrupt_pipe) < 0) {
81  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno << " creating pipe"
82  << IBRCOMMON_LOGGER_ENDL;
83  return;
84  }
85  // set the pipe to non-blocking
86  if (!this->setNonBlockingInterruptPipe()) {
87  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno
88  << " setting pipe to non-blocking mode"
89  << IBRCOMMON_LOGGER_ENDL;
90  return;
91  }
92  // init DHT
93  int rc = dtn_dht_initstruct(&_context);
94  if (rc < 0) {
95  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Context creation failed: " << rc
96  << IBRCOMMON_LOGGER_ENDL;
97  return;
98  }
99  if (_config.randomPort()) {
100  srand( time(NULL));
101  _context.port = rand() % 64000 + 1024;
102  } else {
103  _context.port = _config.getPort();
104  }
105  if (_config.getIPv4Binding().size() > 0) {
106  _context.bind = _config.getIPv4Binding().c_str();
107  }
108  if (_config.getIPv6Binding().size() > 0) {
109  _context.bind6 = _config.getIPv6Binding().c_str();
110  }
111  if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
112  _context.type = BINDBOTH;
113  } else if (_config.isIPv4Enabled()) {
114  _context.type = IPV4ONLY;
115  } else if (_config.isIPv6Enabled()) {
116  _context.type = IPV6ONLY;
117  } else {
118  _context.type = BINDNONE;
119  }
120  string myid = _config.getID();
121  if (!_config.randomID()) {
122  dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
123  }
124 
125  rc = dtn_dht_init_sockets(&_context);
126  if (rc != 0) {
127  if(_context.type == BINDBOTH) {
128  if(rc == -2){
129  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
130  <<IBRCOMMON_LOGGER_ENDL;
131  } else {
132  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT socket(s) couldn't be initialized"
133  <<IBRCOMMON_LOGGER_ENDL;
134  }
135  dtn_dht_close_sockets(&_context);
136  // Try only IPv4
137  _context.type = IPV4ONLY;
138  rc = dtn_dht_init_sockets(&_context);
139  if(rc != 0){
140  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv4 socket couldn't be initialized"
141  <<IBRCOMMON_LOGGER_ENDL;
142  dtn_dht_close_sockets(&_context);
143  // Try only IPv6
144  _context.type = IPV6ONLY;
145  rc = dtn_dht_init_sockets(&_context);
146  if(rc!=0){
147  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
148  <<IBRCOMMON_LOGGER_ENDL;
149  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
150  <<IBRCOMMON_LOGGER_ENDL;
151  dtn_dht_close_sockets(&_context);
152  return;
153  }
154  }
155  } else {
156  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
157  <<IBRCOMMON_LOGGER_ENDL;
158  dtn_dht_close_sockets(&_context);
159  return;
160  }
161  }
162  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "Sockets for DHT initialized"
163  <<IBRCOMMON_LOGGER_ENDL;
164  if (!_config.isBlacklistEnabled()) {
165  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist disabled"
166  <<IBRCOMMON_LOGGER_ENDL;
167  dtn_dht_blacklist(0);
168  } else {
169  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist enabled"
170  <<IBRCOMMON_LOGGER_ENDL;
171  }
172  {
173  ibrcommon::MutexLock l(this->_libmutex);
174  rc = dtn_dht_init(&_context);
175  }
176  if (rc < 0) {
177  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT initialization failed"
178  << IBRCOMMON_LOGGER_ENDL;
179  return;
180  }
181  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT initialized on Port: " << _context.port
182  << " with ID: " << myid << IBRCOMMON_LOGGER_ENDL;
183  this->_initialized = true;
184  // Warning about possibly wrong configuration
185  if(_config.isNeighbourAnnouncementEnabled() &&
187  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT will not announce neighbor nodes: "
188  << "announcement enabled but routing forwarding is disabled"
189  << IBRCOMMON_LOGGER_ENDL;
190  }
191 }
192 
194  if (!this->_initialized) {
195  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT is not initialized"
196  <<IBRCOMMON_LOGGER_ENDL;
197  return;
198  }
199  std::string cltype_;
200  std::string port_;
201  const std::list<dtn::daemon::Configuration::NetConfig>
202  &nets =
204 
205  // for every available interface, build the correct struct
206  for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
207  nets.begin(); iter != nets.end(); ++iter) {
208  try {
209  cltype_ = getConvergenceLayerName((*iter));
210  std::stringstream ss;
211  ss << (*iter).port;
212  port_ = ss.str();
213  struct dtn_convergence_layer * clstruct =
214  (struct dtn_convergence_layer*) malloc(
215  sizeof(struct dtn_convergence_layer));
216  clstruct->clname = (char*) malloc(cltype_.size());
217  clstruct->clnamelen = cltype_.size();
218  memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
219  struct dtn_convergence_layer_arg * arg =
220  (struct dtn_convergence_layer_arg*) malloc(
221  sizeof(struct dtn_convergence_layer_arg));
222  arg->key = (char*) malloc(5);
223  arg->key[4] = '\n';
224 
225 #ifdef HAVE_VMIME
226  if(cltype_ == "email") {
227  memcpy(arg->key, "email", 5);
228  arg->keylen = 5;
229  const std::string &email =
231  arg->value = (char*) malloc(email.size());
232  memcpy(arg->value, email.c_str(), email.size());
233  arg->valuelen = email.size();
234  }else{
235 #endif
236 
237  memcpy(arg->key, "port", 4);
238  arg->keylen = 4;
239  arg->value = (char*) malloc(port_.size());
240  memcpy(arg->value, port_.c_str(), port_.size());
241  arg->valuelen = port_.size();
242 
243 #ifdef HAVE_VMIME
244  }
245 #endif
246 
247  arg->next = NULL;
248  clstruct->args = arg;
249  clstruct->next = _context.clayer;
250  _context.clayer = clstruct;
251  } catch (const std::exception&) {
252  continue;
253  }
254  }
255 
256  // Bootstrapping the DHT
257  int rc, numberOfRandomRequests = 0;
260 
261  // DHT main loop
262  time_t tosleep = 0;
263  struct sockaddr_storage from;
264  socklen_t fromlen = sizeof(sockaddr_storage);
265  ::memset(&from, 0, fromlen);
266 
267  while (!this->_exiting) {
268  if (this->_foundNodes == 0) {
269  bootstrapping();
270  }
271  //Announce myself
272  if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
273  this->_announced = true;
274  if (!_config.isSelfAnnouncingEnabled())
275  return;
276  announce(dtn::core::BundleCore::local, SINGLETON);
277  // Read all unknown EIDs and lookup them
279  dtn::storage::BundleStorage & storage = core.getStorage();
280  std::set<dtn::data::EID> eids_ = storage.getDistinctDestinations();
281  std::set<dtn::data::EID>::iterator iterator;
282  for (iterator = eids_.begin(); iterator != eids_.end(); ++iterator) {
283  lookup(*iterator);
284  }
285  std::set<dtn::core::Node> neighbours_ = core.getConnectionManager().getNeighbors();
286  std::set<dtn::core::Node>::iterator neighbouriterator;
287  for (neighbouriterator = neighbours_.begin(); neighbouriterator
288  != neighbours_.end(); ++neighbouriterator) {
289  if (isNeighbourAnnouncable(*neighbouriterator))
290  announce(*neighbouriterator, NEIGHBOUR);
291  }
292  while (!this->cachedLookups.empty()) {
293  lookup(this->cachedLookups.front());
294  this->cachedLookups.pop_front();
295  }
296  }
297  // Faster Bootstrapping by searching for random Keys
298  if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
299  &_context) <= 2 && numberOfRandomRequests < 40) {
300  dtn_dht_start_random_lookup(&_context);
301  numberOfRandomRequests++;
302  }
303  struct timeval tv;
304  fd_set readfds;
305  tv.tv_sec = tosleep;
306  tv.tv_usec = random() % 1000000;
307  int high_fd = _interrupt_pipe[0];
308  FD_ZERO(&readfds);
309  FD_SET(_interrupt_pipe[0], &readfds);
310  if (_context.ipv4socket >= 0) {
311  FD_SET(_context.ipv4socket, &readfds);
312  high_fd = max(high_fd, _context.ipv4socket);
313  }
314  if (_context.ipv6socket >= 0) {
315  FD_SET(_context.ipv6socket, &readfds);
316  high_fd = max(high_fd, _context.ipv6socket);
317  }
318  rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
319  if (rc < 0) {
320  if (errno != EINTR) {
321  IBRCOMMON_LOGGER_TAG("DHTNameService", error)
322  << "select of DHT Sockets failed with error: "
323  << errno << IBRCOMMON_LOGGER_ENDL;
324  ibrcommon::Thread::sleep(1000);
325  }
326  }
327  if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
328  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "unblocked by self-pipe-trick"
329  << IBRCOMMON_LOGGER_ENDL;
330  // this was an interrupt with the self-pipe-trick
331  char buf[2];
332  int read = ::read(_interrupt_pipe[0], buf, 2);
333  if (read <= 2 || _exiting)
334  break;
335  }
336  if (rc > 0) {
337  fromlen = sizeof(from);
338  if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
339  &readfds))
340  rc = recvfrom(_context.ipv4socket, _buf, sizeof(_buf) - 1, 0,
341  (struct sockaddr*) &from, &fromlen);
342  else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
343  &readfds))
344  rc = recvfrom(_context.ipv6socket, _buf, sizeof(_buf) - 1, 0,
345  (struct sockaddr*) &from, &fromlen);
346  }
347  if (rc > 0) {
348  _buf[rc] = '\0';
349  {
350  ibrcommon::MutexLock l(this->_libmutex);
351  rc = dtn_dht_periodic(&_context, _buf, rc,
352  (struct sockaddr*) &from, fromlen, &tosleep);
353  }
354  } else {
355  {
356  ibrcommon::MutexLock l(this->_libmutex);
357  rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
358  }
359  }
360  int numberOfHosts = 0;
361  int numberOfGoodHosts = 0;
362  int numberOfGood6Hosts = 0;
363  unsigned int numberOfBlocksHosts = 0;
364  unsigned int numberOfBlocksHostsIPv4 = 0;
365  unsigned int numberOfBlocksHostsIPv6 = 0;
366  {
367  ibrcommon::MutexLock l(this->_libmutex);
368  if (_context.ipv4socket >= 0)
369  numberOfHosts
370  = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
371  if (_context.ipv6socket >= 0)
372  numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
373  NULL);
374  numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
375  &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
376  }
377  if (this->_foundNodes != numberOfHosts) {
378  if (_config.isBlacklistEnabled()) {
379  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
380  << numberOfHosts << "(Good:" << numberOfGoodHosts
381  << "+" << numberOfGood6Hosts << ") Blocked: "
382  << numberOfBlocksHosts << "("
383  << numberOfBlocksHostsIPv4 << "+"
384  << numberOfBlocksHostsIPv6 << ")"
385  << IBRCOMMON_LOGGER_ENDL;
386 
387  } else {
388  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
389  << numberOfHosts << "(Good:" << numberOfGoodHosts
390  << "+" << numberOfGood6Hosts << ")"
391  << IBRCOMMON_LOGGER_ENDL;
392 
393  }
394  this->_foundNodes = numberOfHosts;
395  }
396  if (rc < 0) {
397  if (errno == EINTR) {
398  continue;
399  } else {
400  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "dtn_dht_periodic failed"
401  << IBRCOMMON_LOGGER_ENDL;
402  if (rc == EINVAL || rc == EFAULT) {
403  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT failed -> stopping DHT"
404  << IBRCOMMON_LOGGER_ENDL;
405  break;
406  }
407  tosleep = 1;
408  }
409  }
410  }
413  this->_initialized = false;
414  if (_config.getPathToNodeFiles().size() > 0) {
415  ibrcommon::MutexLock l(this->_libmutex);
416  int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
417  if (save != 0) {
418  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT could not save nodes"
419  << IBRCOMMON_LOGGER_ENDL;
420  } else {
421  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT saved nodes"
422  << IBRCOMMON_LOGGER_ENDL;
423  }
424  }
425  if (this->_announced && _config.isSelfAnnouncingEnabled())
426  deannounce(dtn::core::BundleCore::local);
427  dtn_dht_uninit();
428  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT shut down" << IBRCOMMON_LOGGER_ENDL;
429  // Closes all sockets of the DHT
430  dtn_dht_close_sockets(&_context);
431 
432  dtn_dht_free_convergence_layer_struct(_context.clayer);
433 
434  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT sockets are closed"
435  << IBRCOMMON_LOGGER_ENDL;
436  ::close(_interrupt_pipe[0]);
437  ::close(_interrupt_pipe[1]);
438 }
439 
441  // un-register as discovery beacon handler
443 
444  this->_exiting = true;
445  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT will be shut down"
446  << IBRCOMMON_LOGGER_ENDL;
447  ssize_t written = ::write(_interrupt_pipe[1], "i", 1);
448  if (written < 1) {
449  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT pipeline trick failed"
450  << IBRCOMMON_LOGGER_ENDL;
451  }
452 }
453 
454 void dtn::dht::DHTNameService::lookup(const dtn::data::EID &eid) {
455  if (!dtn::core::BundleCore::local.sameHost(eid)) {
456  if (this->_announced) {
457  std::string eid_ = eid.getNode().getString();
458  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 30) << "DHT Lookup: " << eid_
459  << IBRCOMMON_LOGGER_ENDL;
460  ibrcommon::MutexLock l(this->_libmutex);
461  dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
462  } else {
463  this->cachedLookups.push_front(eid);
464  }
465  }
466 }
467 
471 void dtn::dht::DHTNameService::announce(const dtn::core::Node &n,
472  enum dtn_dht_lookup_type type) {
473  if (this->_announced) {
474  std::string eid_ = n.getEID().getNode().getString();
475  ibrcommon::MutexLock l(this->_libmutex);
476  int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
477  if (rc > 0) {
478  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT Announcing: " << eid_
479  << IBRCOMMON_LOGGER_ENDL;
480  }
481  }
482 }
483 
484 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
485  const dtn::core::Node &node) {
486  if (!_config.isNeighbourAnnouncementEnabled())
487  return false;
488  // If forwarding of bundles is disabled, do not announce the neighbors to
489  // prevent receiving bundles for neighbor EIDs, which would not be forwarded
491  return false;
492  }
493 
494  try {
495  // get the merged node object
496  const dtn::core::Node n =
498 
499  // Ignore all none discovered and none static nodes
500  // They could only be discovered by the DHT,
501  // so they are not really close neighbours and could be found directly in the DHT.
502  // This prevents the node to be announced by all neighbours, how found this node
503  std::set<dtn::core::Node::Type> types = n.getTypes();
504  if (types.find(dtn::core::Node::NODE_DISCOVERED) == types.end()
505  && (types.find(dtn::core::Node::NODE_STATIC_GLOBAL) == types.end())) {
506  return false;
507  }
508 
509  // Proof, if the neighbour has told us, that he don't want to be published on the DHT
510  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
511  if (!services.empty()) {
512  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
513  services.begin(); service != services.end(); ++service) {
514  bool proxy = true;
515  std::vector < string > parameters = dtn::utils::Utils::tokenize(
516  ";", (*service).value);
517  std::vector<string>::const_iterator param_iter = parameters.begin();
518 
519  while (param_iter != parameters.end()) {
520  std::vector < string > p = dtn::utils::Utils::tokenize("=",
521  (*param_iter));
522  if (p[0].compare("proxy") == 0) {
523  std::stringstream proxy_stream;
524  proxy_stream << p[1];
525  proxy_stream >> proxy;
526  }
527  ++param_iter;
528  }
529  if (!proxy)
530  return false;
531  }
532  }
533  return true;
534  } catch (const NodeNotAvailableException&) {
535  // do not announce the node if it is
536  // not recognized as neighbor
537  return false;
538  }
539 }
540 
541 void dtn::dht::DHTNameService::deannounce(const dtn::core::Node &n) {
542  std::string eid_ = n.getEID().getNode().getString();
543  ibrcommon::MutexLock l(this->_libmutex);
544  dtn_dht_deannounce(eid_.c_str(), eid_.size());
545  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT Deannounced: " << eid_
546  << IBRCOMMON_LOGGER_ENDL;
547 }
548 
549 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
551  std::string cltype_ = "";
552  switch (net.type) {
554  cltype_ = "tcp";
555  break;
557  cltype_ = "udp";
558  break;
560  cltype_ = "email";
561  break;
562  // case dtn::daemon::Configuration::NetConfig::NETWORK_HTTP:
563  // cltype_ = "http";
564  // break;
565  default:
566  throw ibrcommon::Exception("type of convergence layer not supported");
567  }
568  return cltype_;
569 }
570 
572 {
573  if (!event.bundle.destination.sameHost(dtn::core::BundleCore::local)
574  && !event.bundle.destination.isNone()) {
575  lookup(event.bundle.destination);
576  }
577 }
578 
580 {
581  const dtn::core::Node &n = nodeevent.getNode();
583  switch (nodeevent.getAction()) {
584  case NODE_AVAILABLE:
585  if (isNeighbourAnnouncable(n))
586  announce(n, NEIGHBOUR);
587  pingNode(n);
588  break;
589  case NODE_UNAVAILABLE:
590  deannounce(n);
591  default:
592  break;
593  }
594  }
595 }
596 
597 void dtn::dht::DHTNameService::pingNode(const dtn::core::Node &n) {
598  int rc;
599  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
600  unsigned int port = 9999;
601  if (!services.empty()) {
602  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
603  services.begin(); service != services.end(); ++service) {
604  std::vector < string > parameters = dtn::utils::Utils::tokenize(
605  ";", (*service).value);
606  std::vector<string>::const_iterator param_iter = parameters.begin();
607  bool portFound = false;
608  while (param_iter != parameters.end()) {
609  std::vector < string > p = dtn::utils::Utils::tokenize("=",
610  (*param_iter));
611  if (p[0].compare("port") == 0) {
612  std::stringstream port_stream;
613  port_stream << p[1];
614  port_stream >> port;
615  portFound = true;
616  }
617  ++param_iter;
618  }
619  // Do not ping the node, if he doesn't tell us, which port he has
620  if(!portFound){
621  continue;
622  }
623  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "trying to ping node "
624  << n.toString() << IBRCOMMON_LOGGER_ENDL;
625  const std::list<std::string> ips = getAllIPAddresses(n);
626  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << ips.size() << " IP Addresses found!"
627  << IBRCOMMON_LOGGER_ENDL;
628  std::string lastip = "";
629  for (std::list<std::string>::const_iterator iter = ips.begin(); iter
630  != ips.end(); ++iter) {
631  const std::string ip = (*iter);
632  // Ignoring double existence of ip's
633  if (ip == lastip) {
634  continue;
635  }
636  // decode address and port
637  struct sockaddr_in sin;
638  memset(&sin, 0, sizeof(sin));
639  sin.sin_family = AF_INET;
640  sin.sin_port = htons(port);
641  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << " --- Using IP address: " << ip
642  <<IBRCOMMON_LOGGER_ENDL;
643  rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
644  if (rc == 1) {
645  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << "Pinging node "
646  << n.toString() << " on " << ip << ":" << port
647  << IBRCOMMON_LOGGER_ENDL;
648  ibrcommon::MutexLock l(this->_libmutex);
649  dtn_dht_ping_node((struct sockaddr*) &sin, sizeof(sin));
650  } else {
651  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << " --- ERROR pton: " << rc
652  <<IBRCOMMON_LOGGER_ENDL;
653  }
654  lastip = ip;
655  }
656  }
657  }
658 }
659 
660 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
661  const dtn::core::Node &n) {
662  std::string address = "0.0.0.0";
663  unsigned int port = 0;
664  std::list < std::string > ret;
665  const std::list<dtn::core::Node::URI> uri_list = n.get(
667  for (std::list<dtn::core::Node::URI>::const_iterator iter =
668  uri_list.begin(); iter != uri_list.end(); ++iter) {
669  const dtn::core::Node::URI &uri = (*iter);
670  uri.decode(address, port);
671  ret.push_front(address);
672  }
673  const std::list<dtn::core::Node::URI> udp_uri_list = n.get(
675  for (std::list<dtn::core::Node::URI>::const_iterator iter =
676  udp_uri_list.begin(); iter != udp_uri_list.end(); ++iter) {
677  const dtn::core::Node::URI &udp_uri = (*iter);
678  udp_uri.decode(address, port);
679  ret.push_front(address);
680  }
681  return ret;
682 }
683 
684 void dtn::dht::DHTNameService::bootstrapping() {
685  if (this->_bootstrapped > time(NULL) + 30) {
686  return;
687  }
688  this->_bootstrapped = time(NULL);
689  if (_config.getPathToNodeFiles().size() > 0) {
690  bootstrappingFile();
691  }
692  if (_config.isDNSBootstrappingEnabled()) {
693  bootstrappingDNS();
694  }
695  if (_config.isIPBootstrappingEnabled()) {
696  bootstrappingIPs();
697  }
698 }
699 
700 void dtn::dht::DHTNameService::bootstrappingFile() {
701  int rc;
702  {
703  ibrcommon::MutexLock l(this->_libmutex);
704  rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
705  }
706  if (rc != 0) {
707  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT loading of saved nodes failed"
708  << IBRCOMMON_LOGGER_ENDL;
709  } else {
710  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT loading of saved nodes done"
711  << IBRCOMMON_LOGGER_ENDL;
712  }
713 }
714 
715 void dtn::dht::DHTNameService::bootstrappingDNS() {
716  int rc;
717  std::vector < string > dns = _config.getDNSBootstrappingNames();
718  if (!dns.empty()) {
719  std::vector<string>::const_iterator dns_iter = dns.begin();
720  while (dns_iter != dns.end()) {
721  const string &dn = (*dns_iter);
722  {
723  ibrcommon::MutexLock l(this->_libmutex);
724  rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
725  }
726  if (rc != 0) {
727  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "bootstrapping from domain " << dn
728  << " failed with error: " << rc
729  << IBRCOMMON_LOGGER_ENDL;
730  } else {
731  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
732  << "DHT Bootstrapping done for domain" << dn
733  << IBRCOMMON_LOGGER_ENDL;
734  }
735  ++dns_iter;
736  }
737  } else {
738  {
739  ibrcommon::MutexLock l(this->_libmutex);
740  rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
741  }
742  if (rc != 0) {
743  IBRCOMMON_LOGGER_TAG("DHTNameService", warning)
744  << "bootstrapping from default domain failed with error: "
745  << rc << IBRCOMMON_LOGGER_ENDL;
746  } else {
747  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
748  << "DHT Bootstrapping done for default domain"
749  << IBRCOMMON_LOGGER_ENDL;
750  }
751  }
752 
753 }
754 
755 void dtn::dht::DHTNameService::bootstrappingIPs() {
756  int rc;
757  std::vector < string > ips = _config.getIPBootstrappingIPs();
758  std::vector<string>::const_iterator ip_iter = ips.begin();
759  while (ip_iter != ips.end()) {
760  std::vector < string > ip
761  = dtn::utils::Utils::tokenize(" ", (*ip_iter));
762  int size, ipversion = AF_INET;
763  struct sockaddr *wellknown_node;
764  struct sockaddr_in sin;
765  struct sockaddr_in6 sin6;
766  memset(&sin, 0, sizeof(sin));
767  int port = 9999;
768 
769  // read port
770  if (ip.size() > 1) {
771  port = atoi(ip[1].c_str());
772  }
773 
774  // read address
775  if (!ip.empty()) {
776  rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
777  if (rc <= 0) {
778  ipversion = AF_INET6;
779  rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
780  if (rc <= 0) {
781  break;
782  } else {
783  sin6.sin6_family = ipversion;
784  sin6.sin6_port = htons(port);
785  size = sizeof(sin6);
786  wellknown_node = (struct sockaddr *) &sin6;
787  }
788  } else {
789  sin.sin_family = ipversion;
790  sin.sin_port = htons(port);
791  size = sizeof(sin);
792  wellknown_node = (struct sockaddr *) &sin;
793  }
794  if (rc == 1) {
795  ibrcommon::MutexLock l(this->_libmutex);
796  dtn_dht_ping_node(wellknown_node, size);
797  }
798  }
799 
800  ++ip_iter;
801  }
802 }
803 
804 // TODO Nur für Interfaces zulassen, auf denen ich gebunden bin!
805 
806 void dtn::dht::DHTNameService::onUpdateBeacon(const ibrcommon::vinterface&, DiscoveryBeacon &beacon)
808  if (this->_initialized) {
809  stringstream service;
810  service << "port=" << this->_context.port << ";";
811  if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
812  service << "proxy=false;";
813  }
814  beacon.addService( DiscoveryService("dhtns", service.str()));
815  } else {
816  if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
817  beacon.addService( DiscoveryService("dhtns", "proxy=false;"));
818  } else {
820  }
821  }
822 }
823 
824 void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result) {
825  if (result == NULL || result->eid == NULL || result->clayer == NULL) {
826  return;
827  }
828 
829  // Extracting the EID of the answering node
830  std::string eid__;
831  std::string clname__;
832  struct dtn_eid * eid = result->eid;
833  unsigned int i;
834  for (i = 0; i < eid->eidlen; ++i) {
835  eid__ += eid->eid[i];
836  }
837 
838  // Extracting the convergence layer of the node
839  stringstream ss;
840  struct dtn_convergence_layer * cl = result->clayer;
841  if (cl == NULL)
842  return;
843  dtn::core::Node node(eid__);
844  ss << "Adding Node " << eid__ << ": ";
845  // Iterating over the list of convergence layer
846  bool hasCL = false;
847  while (cl != NULL) {
848  enum Node::Protocol proto__;
849  stringstream service;
850  clname__ = "";
851  for (i = 0; i < cl->clnamelen; ++i) {
852  clname__ += cl->clname[i];
853  }
854  if (clname__ == "tcp") {
855  proto__ = Node::CONN_TCPIP;
856  } else if (clname__ == "udp") {
857  proto__ = Node::CONN_UDPIP;
858  } else if (clname__ == "email") {
859  proto__ = Node::CONN_EMAIL;
860  } else {
861  proto__ = Node::CONN_UNDEFINED;
862  //TODO find the right string to be added to service string
863  }
864  // Extracting the arguments of the convergence layer
865  struct dtn_convergence_layer_arg * arg = cl->args;
866  std::string argstring__;
867  while (arg != NULL) {
868  if (arg->keylen <= 0 || arg->valuelen <= 0) {
869  arg = arg->next;
870  continue;
871  }
872  argstring__ = "";
873  for (i = 0; i < arg->keylen; ++i) {
874  argstring__ += arg->key[i];
875  }
876  service << argstring__ << "=";
877  argstring__ = "";
878  for (i = 0; i < arg->valuelen && arg->value[i] != '\0'; ++i) {
879  argstring__ += arg->value[i];
880  }
881  service << argstring__ << ";";
882  arg = arg->next;
883  }
884  ss << clname__ << "(" << service.str() << ") ";
885  // Add the found convergence layer to the node object
886  node.add(
887  Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
890  hasCL = true;
891  cl = cl->next;
892  }
893 
894  // Adding the new node to the BundleCore to make it available to the daemon
896  if (hasCL) {
897  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << ss.str() << IBRCOMMON_LOGGER_ENDL;
898  core.getConnectionManager().add(node);
899  }
900 
901  // Extracting all neighbours of the new node if usage is allowed
902  if (!dtn::daemon::Configuration::getInstance().getDHT().ignoreDHTNeighbourInformations()) {
903  struct dtn_eid * neighbour = result->neighbours;
904  std::string neighbour__;
905  while (neighbour) {
906  neighbour__ = "";
907  for (i = 0; i < neighbour->eidlen; ++i) {
908  neighbour__ += neighbour->eid[i];
909  }
910  dtn::data::EID n(neighbour__);
911  dtn::core::Node neighbourNode(n);
912  if (!dtn::core::BundleCore::local.sameHost(n)
913  && !core.getConnectionManager().isNeighbor(n)) {
916  }
917  neighbour = neighbour->next;
918  }
919  }
920  //TODO GROUP HANDLING SHOULD BE IMPLEMENTED HERE!!!
921  struct dtn_eid * group = result->groups;
922  while (group) {
923  group = group->next;
924  }
925  return;
926 }
927 
928 // This function is called, if the DHT has done a search or announcement.
929 // At this moment, it is just ignored
930 void dtn_dht_operation_done(const unsigned char *info_hash){
931  return;
932 }
static Configuration & getInstance(bool reset=false)
void decode(std::string &address, unsigned int &port) const
Definition: Node.cpp:47
const std::set< dtn::core::Node > getNeighbors()
void raiseEvent(const dtn::routing::QueueBundleEvent &evt)
const std::list< NetConfig > & getInterfaces() const
static dtn::data::EID local
Definition: BundleCore.h:79
static void add(EventReceiver< E > *receiver)
bool sameHost(const std::string &other) const
Definition: EID.cpp:322
bool isNeighbor(const dtn::core::Node &)
#define DHT_PATH_EXPIRE_TIMEOUT
#define DHT_DISCOVERED_NODE_PRIORITY
static void remove(const EventReceiver< E > *receiver)
void addRoute(const dtn::data::EID &destination, const dtn::data::EID &nexthop, const dtn::data::Timeout timeout=0)
Definition: BundleCore.cpp:270
dtn::net::ConnectionManager & getConnectionManager()
Definition: BundleCore.cpp:260
const std::string getName() const
dtn::net::DiscoveryAgent & getDiscoveryAgent()
Definition: BundleCore.cpp:265
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
const Configuration::Network & getNetwork() const
void add(const dtn::core::Node &n)
void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result)
virtual const eid_set getDistinctDestinations()=0
#define DHT_RESULTS_EXPIRE_TIMEOUT
EID getNode() const
Definition: EID.cpp:528
void add(const URI &u)
Definition: Node.cpp:280
std::list< URI > get(Node::Protocol proto) const
Definition: Node.cpp:325
static std::string toString(const Node::Type type)
Definition: Node.cpp:152
std::string getString() const
Definition: EID.cpp:374
void dtn_dht_operation_done(const unsigned char *info_hash)
const Configuration::EMail & getEMail() const
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
dtn::data::EID group
Definition: dtntrigger.cpp:46
const dtn::core::Node getNeighbor(const dtn::data::EID &eid)
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
Definition: Utils.cpp:60
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &beacon)
std::set< Node::Type > getTypes() const
Definition: Node.cpp:385
static BundleCore & getInstance()
Definition: BundleCore.cpp:82