IBR-DTN  1.0.0
AbstractWorker.cpp
Go to the documentation of this file.
1 /*
2  * AbstractWorker.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "core/EventDispatcher.h"
24 #include "core/AbstractWorker.h"
25 #include "core/BundleCore.h"
26 #include "core/BundleEvent.h"
27 #include "core/BundlePurgeEvent.h"
28 #include <ibrcommon/thread/MutexLock.h>
29 #include <ibrcommon/Logger.h>
30 #include <typeinfo>
31 
32 namespace dtn
33 {
34  namespace core
35  {
36  AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
37  : _worker(worker), _running(true)
38  {
40  }
41 
42  AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
43  {
45  shutdown();
46  }
47 
48  void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::routing::QueueBundleEvent &queued) throw ()
49  {
50  // ignore fragments - we can not deliver them directly to the client
51  if (queued.bundle.isFragment()) return;
52 
53  // check for bundle destination
54  if (queued.bundle.destination == _worker._eid)
55  {
56  _receive_bundles.push(queued.bundle);
57  return;
58  }
59 
60  // if the bundle is a singleton, stop here
62 
63  // check for subscribed groups
64  if (_worker._groups.find(queued.bundle.destination) != _worker._groups.end())
65  {
66  _receive_bundles.push(queued.bundle);
67  return;
68  }
69  }
70 
71  void AbstractWorker::AbstractWorkerAsync::initialize()
72  {
73  // reset thread if necessary
74  if (JoinableThread::isFinalized())
75  {
76  JoinableThread::reset();
77  _running = true;
78  _receive_bundles.reset();
79  }
80 
81  JoinableThread::start();
82  }
83 
84  void AbstractWorker::AbstractWorkerAsync::shutdown()
85  {
86  _running = false;
87  _receive_bundles.abort();
88 
89  join();
90  }
91 
92  void AbstractWorker::AbstractWorkerAsync::run() throw ()
93  {
95 
96  try {
97  while (_running)
98  {
99  dtn::data::BundleID id = _receive_bundles.poll();
100 
101  try {
102  dtn::data::Bundle b = storage.get( id );
103 
104  try {
105  // process the bundle block (security, compression, ...)
107  } catch (const ibrcommon::Exception &ex) {
108  // delete the bundle from the storage
109  storage.remove(id);
110  continue;
111  }
112 
113  // forward the bundle to the application
114  _worker.callbackBundleReceived( b );
115 
116  // create meta bundle for further processing
118 
119  // raise bundle event
121 
123  {
124  // remove the bundle from the storage
126  }
127  } catch (const ibrcommon::Exception &ex) {
128  IBRCOMMON_LOGGER_DEBUG_TAG("AbstractWorker", 15) << ex.what() << IBRCOMMON_LOGGER_ENDL;
129  };
130 
131  yield();
132  }
133  } catch (const ibrcommon::QueueUnblockedException&) {
134  // queue was aborted by another call
135  }
136  }
137 
138  void AbstractWorker::AbstractWorkerAsync::__cancellation() throw ()
139  {
140  // cancel the main thread in here
141  _receive_bundles.abort();
142  }
143 
144  AbstractWorker::AbstractWorker() : _eid(BundleCore::local), _thread(*this)
145  {
146  }
147 
149  {
150  _groups.insert(endpoint);
151  }
152 
154  {
155  _groups.erase(endpoint);
156  }
157 
158  void AbstractWorker::initialize(const std::string &uri)
159  {
160  _eid.setApplication(uri);
161 
162  try {
163  _thread.initialize();
164  } catch (const ibrcommon::ThreadException &ex) {
165  IBRCOMMON_LOGGER_TAG("AbstractWorker", error) << "initialize failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
166  }
167  }
168 
170  {
171  shutdown();
172  }
173 
175  {
176  // wait for the async thread
177  _thread.shutdown();
178  }
179 
181  {
182  return _eid;
183  }
184 
186  {
188  }
189  }
190 }
static void inject(const dtn::data::EID &source, dtn::data::Bundle &bundle)
Definition: BundleCore.cpp:706
static dtn::data::EID local
Definition: BundleCore.h:79
virtual const EID getWorkerURI() const
static void add(EventReceiver< E > *receiver)
void subscribe(const dtn::data::EID &endpoint)
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
static void remove(const EventReceiver< E > *receiver)
const dtn::data::MetaBundle bundle
void unsubscribe(const dtn::data::EID &endpoint)
virtual void remove(const dtn::data::BundleID &id)=0
bool get(FLAGS flag) const
void transmit(dtn::data::Bundle &bundle)
bool _running
Definition: dtninbox.cpp:122
bool isFragment() const
Definition: MetaBundle.cpp:165
dtn::data::EID destination
Definition: MetaBundle.h:60
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
void initialize(const std::string &uri)
static void processBlocks(dtn::data::Bundle &b)
Definition: BundleCore.cpp:641
Bitset< dtn::data::PrimaryBlock::FLAGS > procflags
Definition: MetaBundle.h:64
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
Definition: BundleEvent.cpp:78
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
static BundleCore & getInstance()
Definition: BundleCore.cpp:82