33 #include <ibrcommon/TimeMeasurement.h>
34 #include <ibrcommon/Logger.h>
39 #define suseconds_t long
46 const unsigned int DTNTPWorker::PROTO_VERSION = 1;
47 const std::string DTNTPWorker::TAG =
"DTNTPWorker";
48 DTNTPWorker::TimeSyncState DTNTPWorker::_sync_state;
51 : sync_threshold(0.15f), base_rating(0.0), psi(0.99), sigma(1.0), last_sync_set(false)
63 return static_cast<double>(val.tv_sec) + (static_cast<double>(val.tv_nsec) / 1000000000.0);
67 : _announce_rating(false), _sync(false)
69 AbstractWorker::initialize(
"dtntp");
84 IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) <<
"Expiration limited due to wrong local clock." << IBRCOMMON_LOGGER_ENDL;
101 if (_sync || _announce_rating) {
102 IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, info) <<
"Time-Synchronization enabled: " << (conf.
hasReference() ?
"master mode" :
"slave mode") << IBRCOMMON_LOGGER_ENDL;
120 : type(TIMESYNC_REQUEST), origin_rating(dtn::utils::Clock::getRating()), peer_rating(0.0)
134 std::stringstream ss;
136 stream << (char)obj.
type;
156 std::stringstream ss;
165 ss.str((
const std::string&)bs);
176 ss.str((
const std::string&)bs);
192 ibrcommon::MutexLock l(_sync_lock);
199 ibrcommon::MutexLock l(_peer_lock);
200 for (peer_map::iterator iter = _peers.begin(); iter != _peers.end();)
202 const SyncPeer &peer = (*iter).second;
205 if (peer.isExpired()) {
206 _peers.erase(iter++);
222 if (t.getTimestamp() > 0)
225 IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) <<
"The local clock seems to be okay again." << IBRCOMMON_LOGGER_ENDL;
235 struct timespec ts_now, ts_diff;
236 ibrcommon::MonotonicClock::gettime(ts_now);
237 ibrcommon::MonotonicClock::diff(_sync_state.
last_sync_time, ts_now, ts_diff);
253 float best_rating = 0.0;
260 for (std::set<dtn::core::Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter)
262 float rating = getPeerRating(*iter);
264 if (rating > best_rating) {
265 best_peer = (*iter).getEID();
266 best_rating = rating;
271 if (!best_peer.
isNone()) syncWith(best_peer);
278 if (!node.
has(
"dtntp"))
return 0.0;
281 const std::list<dtn::core::Node::Attribute> attrs = node.
get(
"dtntp");
283 if (attrs.empty())
return 0.0;
286 unsigned int version = 0;
289 decode(attrs.front(), version, timestamp, rating);
292 if (version != 1)
return 0.0;
304 ibrcommon::MutexLock l(_peer_lock);
305 const peer_map::const_iterator it = _peers.find(peer);
306 if (it != _peers.end())
308 const SyncPeer &peer = (*it).second;
311 if (peer.state != SyncPeer::STATE_IDLE)
318 SyncPeer &p = _peers[peer];
320 p.state = SyncPeer::STATE_PREPARE;
327 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
331 ibrcommon::BLOB::iostream stream = ref.iostream();
336 ibrcommon::MutexLock l(_peer_lock);
339 SyncPeer &p = _peers[peer];
340 p.request_timestamp = msg.origin_timestamp;
341 ibrcommon::MonotonicClock::gettime(p.request_monotonic_time);
342 p.state = SyncPeer::STATE_REQUEST;
358 b.destination = peer;
361 b.destination.setApplication(
"dtntp");
378 }
catch (
const ibrcommon::IOException &ex) {
379 IBRCOMMON_LOGGER_TAG(
DTNTPWorker::TAG, error) <<
"error while synchronizing, Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
387 std::stringstream ss;
396 std::vector<std::string>::const_iterator param_iter = parameters.begin();
398 while (param_iter != parameters.end())
402 if (p[0].compare(
"version") == 0)
404 std::stringstream ss(p[1]);
408 if (p[0].compare(
"timestamp") == 0)
413 if (p[0].compare(
"quality") == 0)
415 std::stringstream ss(p[1]);
423 bool DTNTPWorker::hasReference()
const {
424 return (_sync_state.
sigma == 1.0);
427 void DTNTPWorker::sync(
const TimeSyncMessage &msg,
const struct timeval &tv_offset,
const struct timeval &tv_local,
const struct timeval &tv_remote)
430 if (hasReference())
return;
432 ibrcommon::MutexLock l(_sync_lock);
443 struct timespec ts_now, ts_diff;
444 ibrcommon::MonotonicClock::gettime(ts_now);
445 ibrcommon::MonotonicClock::diff(_sync_state.
last_sync_time, ts_now, ts_diff);
450 if (t_stable > 0.0) {
451 double sigma_base = (1 / ::pow(_sync_state.
psi, 1/t_stable));
452 double sigma_adjustment = ::fabs(remote_time - local_time) / t_stable * msg.peer_rating;
453 _sync_state.
sigma = sigma_base + sigma_adjustment;
455 IBRCOMMON_LOGGER_DEBUG_TAG(
DTNTPWorker::TAG, 25) <<
"new sigma: " << _sync_state.
sigma << IBRCOMMON_LOGGER_ENDL;
459 if (local_time > remote_time) {
461 _sync_state.
base_rating = msg.peer_rating * (remote_time / local_time);
464 _sync_state.
base_rating = msg.peer_rating * (local_time / remote_time);
491 char type = 0; (*p.
getBLOB().iostream()).
get(type);
518 ibrcommon::BLOB::Reference ref = p.
getBLOB();
519 ibrcommon::BLOB::iostream stream = ref.iostream();
559 if (!age_it.next(b.
end()))
throw ibrcommon::Exception(
"first ageblock missing");
562 if (!age_it.next(b.
end()))
throw ibrcommon::Exception(
"second ageblock missing");
565 timeval tv_rtt_measured, tv_local_timestamp, tv_rtt, tv_prop_delay, tv_sync_delay, tv_peer_timestamp, tv_offset;
567 timerclear(&tv_rtt_measured);
568 tv_rtt_measured.tv_sec = origin_age.
getSeconds().
get<time_t>();
571 ibrcommon::BLOB::Reference ref = p.
getBLOB();
572 ibrcommon::BLOB::iostream stream = ref.iostream();
579 ibrcommon::MutexLock l(_peer_lock);
582 const peer_map::const_iterator it = _peers.find(b.
source.
getNode());
583 if (it == _peers.end())
break;
585 const SyncPeer &p = (*it).second;
588 if (p.state != SyncPeer::STATE_REQUEST)
592 if ((p.request_timestamp.tv_sec != msg.origin_timestamp.tv_sec) ||
593 (p.request_timestamp.tv_usec != msg.origin_timestamp.tv_usec))
597 struct timespec diff, now;
598 ibrcommon::MonotonicClock::gettime(now);
599 ibrcommon::MonotonicClock::diff(p.request_monotonic_time, now, diff);
601 tv_rtt.tv_sec = diff.tv_sec;
602 tv_rtt.tv_usec = diff.tv_nsec / 1000;
613 IBRCOMMON_LOGGER_TAG(
DTNTPWorker::TAG, warning) <<
"RTT " << rtt <<
" is too small" << IBRCOMMON_LOGGER_ENDL;
617 double prop_delay = 0.0;
622 if (rtt <= rtt_measured) {
623 timerclear(&tv_prop_delay);
624 IBRCOMMON_LOGGER_TAG(
DTNTPWorker::TAG, warning) <<
"Prop. delay " << prop_delay <<
" is smaller than the tracked time (" << rtt_measured <<
")" << IBRCOMMON_LOGGER_ENDL;
626 timersub(&tv_rtt, &tv_rtt_measured, &tv_prop_delay);
631 tv_prop_delay.tv_sec /= 2;
632 tv_prop_delay.tv_usec /= 2;
635 timerclear(&tv_sync_delay);
640 timeradd(&msg.peer_timestamp, &tv_sync_delay, &tv_peer_timestamp);
643 timeradd(&msg.peer_timestamp, &tv_prop_delay, &tv_peer_timestamp);
646 timersub(&tv_local_timestamp, &tv_peer_timestamp, &tv_offset);
652 sync(msg, tv_offset, tv_local_timestamp, tv_peer_timestamp);
655 ibrcommon::MutexLock l(_peer_lock);
657 p.state = SyncPeer::STATE_SYNC;
663 }
catch (
const ibrcommon::Exception&) { };
668 return DTNTPWorker::_sync_state;
671 DTNTPWorker::SyncPeer::SyncPeer()
674 request_monotonic_time.tv_sec = 0;
675 request_monotonic_time.tv_nsec = 0;
677 request_timestamp.tv_sec = 0;
678 request_timestamp.tv_usec = 0;
683 DTNTPWorker::SyncPeer::~SyncPeer()
687 void DTNTPWorker::SyncPeer::touch()
692 bool DTNTPWorker::SyncPeer::isExpired()
const
static Configuration & getInstance(bool reset=false)
const std::set< dtn::core::Node > getNeighbors()
const Configuration::TimeSync & getTimeSync() const
static dtn::data::EID local
static void add(EventReceiver< E > *receiver)
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &announcement)
bool sameHost(const std::string &other) const
void setApplication(const dtn::data::Number &app)
dtn::data::Number getSeconds() const
static void gettimeofday(struct timeval *tv)
static void raise(const timeval &offset, const double &rating)
static void remove(const EventReceiver< E > *receiver)
static const dtn::data::block_t BLOCK_TYPE
dtn::net::ConnectionManager & getConnectionManager()
static void setOffset(const struct timeval &tv)
bool has(Node::Protocol proto) const
static double toDouble(const timespec &val)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void callbackBundleReceived(const Bundle &b)
void transmit(dtn::data::Bundle &bundle)
std::istream & operator>>(std::istream &stream, DTNTPWorker::TimeSyncMessage &obj)
bool hasReference() const
std::string toString() const
static double toDouble(const timeval &val)
float getSyncLevel() const
static dtn::data::Timestamp getTime()
void setLimit(const Number &hops)
std::list< URI > get(Node::Protocol proto) const
void fromString(const std::string &data)
static double getRating()
std::string getString() const
void relabel(bool zero_timestamp=false)
static void setRating(double val)
iterator find(block_t blocktype)
ibrcommon::BLOB::Reference getBLOB() const
static const TimeSyncState & getState()
std::ostream & operator<<(std::ostream &stream, const DTNTPWorker::TimeSyncMessage &obj)
bool sendDiscoveryBeacons() const
static dtn::data::Timestamp getMonotonicTimestamp()
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
dtn::data::SDNV< Size > Number
void raiseEvent(const dtn::core::TimeEvent &evt)
dtn::data::Number getMicroseconds() const
static std::vector< std::string > tokenize(const std::string &token, const std::string &data, const std::string::size_type max=std::string::npos)
void set(FLAGS flag, bool value)
struct timespec last_sync_time
static BundleCore & getInstance()
ibrcommon::find_iterator< const_iterator, block_t > const_find_iterator