IBR-DTN  1.0.0
EventSwitch.cpp
Go to the documentation of this file.
1 /*
2  * EventSwitch.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "core/EventSwitch.h"
23 #include "core/EventReceiver.h"
24 #include "core/EventDispatcher.h"
25 
26 #include <ibrcommon/thread/MutexLock.h>
27 #include "core/GlobalEvent.h"
28 #include <ibrcommon/Logger.h>
29 #include <stdexcept>
30 #include <iostream>
31 #include <typeinfo>
32 #include <signal.h>
33 
34 namespace dtn
35 {
36  namespace core
37  {
38  EventSwitch::EventSwitch()
39  : _running(true), _shutdown(false), _wd(*this, _wlist), _inprogress(false)
40  {
41  }
42 
43  EventSwitch::~EventSwitch()
44  {
45  componentDown();
46  }
47 
48  void EventSwitch::componentUp() throw ()
49  {
50  // routine checked for throw() on 15.02.2013
51 
52  // clear all queues
53  _queue = std::queue<Task*>();
54  _prio_queue = std::queue<Task*>();
55  _low_queue = std::queue<Task*>();
56 
57  // reset component state
58  _running = true;
59  _shutdown = false;
60 
61  // reset aborted conditional
62  _queue_cond.reset();
63  }
64 
65  void EventSwitch::componentDown() throw ()
66  {
67  try {
68  ibrcommon::MutexLock l(_queue_cond);
69 
70  // stop receiving events
71  _shutdown = true;
72 
73  // wait until all queues are empty
74  while (!this->empty())
75  {
76  _queue_cond.wait();
77  }
78 
79  _queue_cond.abort();
80  } catch (const ibrcommon::Conditional::ConditionalAbortException&) {};
81  }
82 
83  bool EventSwitch::empty() const
84  {
85  return (_low_queue.empty() && _queue.empty() && _prio_queue.empty());
86  }
87 
88  void EventSwitch::process(ibrcommon::TimeMeasurement &tm, bool &inprogress, bool profiling)
89  {
90  EventSwitch::Task *t = NULL;
91 
92  // just look for an event to process
93  {
94  ibrcommon::MutexLock l(_queue_cond);
95  if (!_running) return;
96 
97  while (this->empty() && !_shutdown)
98  {
99  _queue_cond.wait();
100  }
101 
102  if (!_prio_queue.empty())
103  {
104  t = _prio_queue.front();
105  _prio_queue.pop();
106  }
107  else if (!_queue.empty())
108  {
109  t = _queue.front();
110  _queue.pop();
111  }
112  else if (!_low_queue.empty())
113  {
114  t = _low_queue.front();
115  _low_queue.pop();
116  }
117  else if (_shutdown)
118  {
119  // if all queues are empty and shutdown is requested
120  // set running mode to false
121  _running = false;
122 
123  // abort the conditional to release all blocking threads
124  _queue_cond.abort();
125 
126  return;
127  }
128 
129  _queue_cond.signal(true);
130  }
131 
132  if (t != NULL)
133  {
134  if (profiling) {
135  inprogress = true;
136  tm.start();
137  }
138  // execute the event
139  t->processor.process(t->event);
140 
141  if (profiling) {
142  tm.stop();
143  inprogress = false;
144  }
145 
146  // log the event
147  if (t->event->isLoggable())
148  {
149  if (profiling) {
150  IBRCOMMON_LOGGER_TAG(t->event->getName(), notice) << t->event->getMessage() << " (" << tm.getMilliseconds() << " ms)" << IBRCOMMON_LOGGER_ENDL;
151  } else {
152  IBRCOMMON_LOGGER_TAG(t->event->getName(), notice) << t->event->getMessage() << IBRCOMMON_LOGGER_ENDL;
153  }
154  }
155 
156  // delete the Task
157  delete t;
158  }
159  }
160 
161  bool EventSwitch::isStalled()
162  {
163  if (!_inprogress) return false;
164  _tm.stop();
165  return (_tm.getMilliseconds() > 5000);
166  }
167 
168  void EventSwitch::loop(size_t threads, bool profiling)
169  {
170  if (profiling) {
171  IBRCOMMON_LOGGER_TAG("EventSwitch", warning) << "Profiling and stalled event detection enabled" << IBRCOMMON_LOGGER_ENDL;
172  }
173 
174  for (size_t i = 0; i < threads; ++i)
175  {
176  Worker *w = new Worker(*this, profiling);
177  w->start();
178  _wlist.push_back(w);
179  }
180 
181  // bring up watchdog
182  if (profiling) _wd.up();
183 
184  try {
185  while (_running)
186  {
187  process(_tm, _inprogress, profiling);
188  }
189  } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
190 
191  // shut down watchdog
192  if (profiling) _wd.down();
193  _wd.join();
194 
195  for (std::list<Worker*>::iterator iter = _wlist.begin(); iter != _wlist.end(); ++iter)
196  {
197  Worker *w = (*iter);
198  w->stop();
199  w->join();
200  delete w;
201  }
202 
203  _wlist.clear();
204  }
205 
206  void EventSwitch::queue(EventProcessor &proc, Event *evt)
207  {
208  EventSwitch &s = EventSwitch::getInstance();
209 
210  ibrcommon::MutexLock l(s._queue_cond);
211 
212  // do not process any event if the system is going down
213  if (s._shutdown)
214  {
215  delete evt;
216  return;
217  }
218 
219  EventSwitch::Task *t = new EventSwitch::Task(proc, evt);
220 
221  if (evt->prio > 0)
222  {
223  s._prio_queue.push(t);
224  }
225  else if (evt->prio < 0)
226  {
227  s._low_queue.push(t);
228  }
229  else
230  {
231  s._queue.push(t);
232  }
233  s._queue_cond.signal();
234  }
235 
236  void EventSwitch::shutdown()
237  {
238  try {
239  ibrcommon::MutexLock l(_queue_cond);
240 
241  // stop receiving events
242  _shutdown = true;
243 
244  // signal all blocking thread to check _shutdown variable
245  _queue_cond.signal(true);
246  } catch (const ibrcommon::Conditional::ConditionalAbortException&) {};
247  }
248 
249  EventSwitch& EventSwitch::getInstance()
250  {
251  static EventSwitch instance;
252  return instance;
253  }
254 
255  const std::string EventSwitch::getName() const
256  {
257  return "EventSwitch";
258  }
259 
260  EventSwitch::Task::Task(EventProcessor &proc, dtn::core::Event *evt)
261  : processor(proc), event(evt)
262  {
263  }
264 
265  EventSwitch::Task::~Task()
266  {
267  if (event != NULL)
268  {
269  delete event;
270  }
271  }
272 
273  EventSwitch::Worker::Worker(EventSwitch &sw, bool profiling)
274  : _switch(sw), _running(true), _inprogress(false), _profiling(profiling)
275  {}
276 
277  EventSwitch::Worker::~Worker()
278  {}
279 
280  bool EventSwitch::Worker::isStalled()
281  {
282  if (!_inprogress) return false;
283  _tm.stop();
284  return (_tm.getMilliseconds() > 5000);
285  }
286 
287  void EventSwitch::Worker::run() throw ()
288  {
289  try {
290  while (_running)
291  _switch.process(_tm, _inprogress, _profiling);
292  } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
293  }
294 
295  void EventSwitch::Worker::__cancellation() throw ()
296  {
297  _running = false;
298  }
299 
300  EventSwitch::WatchDog::WatchDog(EventSwitch &sw, std::list<Worker*> &workers)
301  : _switch(sw), _workers(workers), _running(true)
302  {}
303 
304  EventSwitch::WatchDog::~WatchDog()
305  {}
306 
307  void EventSwitch::WatchDog::up()
308  {
309  // reset thread if necessary
310  if (JoinableThread::isFinalized())
311  {
312  JoinableThread::reset();
313  _running = true;
314  }
315 
316  JoinableThread::start();
317  }
318 
319  void EventSwitch::WatchDog::down()
320  {
321  JoinableThread::stop();
322  }
323 
324  void EventSwitch::WatchDog::run() throw ()
325  {
326  try {
327  ibrcommon::MutexLock l(_cond);
328  while (_running) {
329  try {
330  // wait a period
331  _cond.wait(10000);
332  } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
333  if (ex.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT) {
334  if (_switch.isStalled()) {
335  IBRCOMMON_LOGGER_TAG("EventSwitch", critical) << "stalled event detected" << IBRCOMMON_LOGGER_ENDL;
336  raise (SIGABRT);
337  }
338 
339  for (std::list<Worker*>::iterator iter = _workers.begin(); iter != _workers.end(); ++iter)
340  {
341  Worker *w = (*iter);
342 
343  if (w->isStalled()) {
344  IBRCOMMON_LOGGER_TAG("EventSwitch", critical) << "stalled event detected" << IBRCOMMON_LOGGER_ENDL;
345  raise (SIGABRT);
346  }
347  }
348 
349  } else throw;
350  }
351  }
352  } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
353  }
354 
355  void EventSwitch::WatchDog::__cancellation() throw ()
356  {
357  ibrcommon::MutexLock l(_cond);
358  _running = false;
359  _cond.abort();
360  }
361  }
362 }
363 
bool _shutdown
Definition: Main.cpp:71
const int prio
Definition: Event.h:60
bool _running
Definition: dtninbox.cpp:122