26 #include <ibrcommon/thread/MutexLock.h>
28 #include <ibrcommon/Logger.h>
38 EventSwitch::EventSwitch()
43 EventSwitch::~EventSwitch()
48 void EventSwitch::componentUp() throw ()
53 _queue = std::queue<Task*>();
54 _prio_queue = std::queue<Task*>();
55 _low_queue = std::queue<Task*>();
65 void EventSwitch::componentDown() throw ()
68 ibrcommon::MutexLock l(_queue_cond);
74 while (!this->empty())
80 }
catch (
const ibrcommon::Conditional::ConditionalAbortException&) {};
83 bool EventSwitch::empty()
const
85 return (_low_queue.empty() && _queue.empty() && _prio_queue.empty());
88 void EventSwitch::process(ibrcommon::TimeMeasurement &tm,
bool &inprogress,
bool profiling)
90 EventSwitch::Task *t = NULL;
94 ibrcommon::MutexLock l(_queue_cond);
102 if (!_prio_queue.empty())
104 t = _prio_queue.front();
107 else if (!_queue.empty())
112 else if (!_low_queue.empty())
114 t = _low_queue.front();
129 _queue_cond.signal(
true);
139 t->processor.process(t->event);
147 if (t->event->isLoggable())
150 IBRCOMMON_LOGGER_TAG(t->event->getName(), notice) << t->event->getMessage() <<
" (" << tm.getMilliseconds() <<
" ms)" << IBRCOMMON_LOGGER_ENDL;
152 IBRCOMMON_LOGGER_TAG(t->event->getName(), notice) << t->event->getMessage() << IBRCOMMON_LOGGER_ENDL;
161 bool EventSwitch::isStalled()
163 if (!_inprogress)
return false;
165 return (_tm.getMilliseconds() > 5000);
168 void EventSwitch::loop(
size_t threads,
bool profiling)
171 IBRCOMMON_LOGGER_TAG(
"EventSwitch", warning) <<
"Profiling and stalled event detection enabled" << IBRCOMMON_LOGGER_ENDL;
174 for (
size_t i = 0; i < threads; ++i)
176 Worker *w =
new Worker(*
this, profiling);
182 if (profiling) _wd.up();
187 process(_tm, _inprogress, profiling);
189 }
catch (
const ibrcommon::Conditional::ConditionalAbortException&) { };
192 if (profiling) _wd.down();
195 for (std::list<Worker*>::iterator iter = _wlist.begin(); iter != _wlist.end(); ++iter)
210 ibrcommon::MutexLock l(s._queue_cond);
219 EventSwitch::Task *t =
new EventSwitch::Task(proc, evt);
223 s._prio_queue.push(t);
225 else if (evt->
prio < 0)
227 s._low_queue.push(t);
233 s._queue_cond.signal();
236 void EventSwitch::shutdown()
239 ibrcommon::MutexLock l(_queue_cond);
245 _queue_cond.signal(
true);
246 }
catch (
const ibrcommon::Conditional::ConditionalAbortException&) {};
255 const std::string EventSwitch::getName()
const
257 return "EventSwitch";
261 : processor(proc), event(evt)
265 EventSwitch::Task::~Task()
273 EventSwitch::Worker::Worker(EventSwitch &sw,
bool profiling)
274 : _switch(sw),
_running(true), _inprogress(false), _profiling(profiling)
277 EventSwitch::Worker::~Worker()
280 bool EventSwitch::Worker::isStalled()
282 if (!_inprogress)
return false;
284 return (_tm.getMilliseconds() > 5000);
287 void EventSwitch::Worker::run() throw ()
291 _switch.process(_tm, _inprogress, _profiling);
292 }
catch (
const ibrcommon::Conditional::ConditionalAbortException&) { };
295 void EventSwitch::Worker::__cancellation() throw ()
300 EventSwitch::WatchDog::WatchDog(EventSwitch &sw, std::list<Worker*> &workers)
301 : _switch(sw), _workers(workers),
_running(true)
304 EventSwitch::WatchDog::~WatchDog()
307 void EventSwitch::WatchDog::up()
310 if (JoinableThread::isFinalized())
312 JoinableThread::reset();
316 JoinableThread::start();
319 void EventSwitch::WatchDog::down()
321 JoinableThread::stop();
324 void EventSwitch::WatchDog::run() throw ()
327 ibrcommon::MutexLock l(_cond);
332 }
catch (
const ibrcommon::Conditional::ConditionalAbortException &ex) {
333 if (ex.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT) {
334 if (_switch.isStalled()) {
335 IBRCOMMON_LOGGER_TAG(
"EventSwitch", critical) <<
"stalled event detected" << IBRCOMMON_LOGGER_ENDL;
339 for (std::list<Worker*>::iterator iter = _workers.begin(); iter != _workers.end(); ++iter)
343 if (w->isStalled()) {
344 IBRCOMMON_LOGGER_TAG(
"EventSwitch", critical) <<
"stalled event detected" << IBRCOMMON_LOGGER_ENDL;
352 }
catch (
const ibrcommon::Conditional::ConditionalAbortException&) { };
355 void EventSwitch::WatchDog::__cancellation() throw ()
357 ibrcommon::MutexLock l(_cond);