IBR-DTNSuite 0.6

daemon/src/core/AbstractWorker.cpp

Go to the documentation of this file.
00001 /*
00002  * AbstractWorker.cpp
00003  *
00004  *  Created on: 30.10.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "core/AbstractWorker.h"
00010 #include "core/BundleCore.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "core/BundleGeneratedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #ifdef WITH_BUNDLE_SECURITY
00015 #include "security/SecurityManager.h"
00016 #endif
00017 #ifdef WITH_COMPRESSION
00018 #include <ibrdtn/data/CompressedPayloadBlock.h>
00019 #endif
00020 #include <ibrcommon/thread/MutexLock.h>
00021 #include <ibrcommon/Logger.h>
00022 #include <typeinfo>
00023 
00024 namespace dtn
00025 {
00026         namespace core
00027         {
00028                 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
00029                  : _worker(worker), _running(true)
00030                 {
00031                         bindEvent(dtn::routing::QueueBundleEvent::className);
00032                 }
00033 
00034                 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
00035                 {
00036                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00037                         shutdown();
00038                 }
00039 
00040                 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt)
00041                 {
00042                         try {
00043                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00044 
00045                                 if (queued.bundle.destination == _worker._eid)
00046                                 {
00047                                         _receive_bundles.push(queued.bundle);
00048                                 }
00049                         } catch (const std::bad_cast&) { }
00050                 }
00051 
00052                 void AbstractWorker::AbstractWorkerAsync::shutdown()
00053                 {
00054                         _running = false;
00055                         _receive_bundles.abort();
00056 
00057                         join();
00058                 }
00059 
00060                 void AbstractWorker::AbstractWorkerAsync::run()
00061                 {
00062                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00063 
00064                         try {
00065                                 while (_running)
00066                                 {
00067                                         dtn::data::BundleID id = _receive_bundles.getnpop(true);
00068 
00069                                         try {
00070                                                 dtn::data::Bundle b = storage.get( id );
00071                                                 prepareBundle(b);
00072                                                 _worker.callbackBundleReceived( b );
00073 
00074                                                 // raise bundle event
00075                                                 dtn::core::BundleEvent::raise(b, BUNDLE_DELIVERED);
00076 
00077                                                 // remove the bundle from the storage
00078                                                 storage.remove( id );
00079                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00080 
00081                                         yield();
00082                                 }
00083                         } catch (const ibrcommon::QueueUnblockedException&) {
00084                                 // queue was aborted by another call
00085                         }
00086                 }
00087 
00088                 bool AbstractWorker::AbstractWorkerAsync::__cancellation()
00089                 {
00090                         // cancel the main thread in here
00091                         _receive_bundles.abort();
00092 
00093                         // return true, to signal that no further cancel (the hardway) is needed
00094                         return true;
00095                 }
00096 
00097                 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const
00098                 {
00099                         // process the bundle block (security, compression, ...)
00100                         dtn::core::BundleCore::processBlocks(bundle);
00101                 }
00102 
00103                 AbstractWorker::AbstractWorker() : _thread(*this)
00104                 {
00105                 };
00106 
00107                 void AbstractWorker::initialize(const string uri, bool async)
00108                 {
00109                         _eid = BundleCore::local + uri;
00110 
00111                         try {
00112                                 if (async) _thread.start();
00113                         } catch (const ibrcommon::ThreadException &ex) {
00114                                 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00115                         }
00116                 }
00117 
00118                 AbstractWorker::~AbstractWorker()
00119                 {
00120                         shutdown();
00121                 };
00122 
00123                 void AbstractWorker::shutdown()
00124                 {
00125                         // wait for the async thread
00126                         _thread.shutdown();
00127                 }
00128 
00129                 const EID AbstractWorker::getWorkerURI() const
00130                 {
00131                         return _eid;
00132                 }
00133 
00134                 void AbstractWorker::transmit(const Bundle &bundle)
00135                 {
00136                         dtn::core::BundleGeneratedEvent::raise(bundle);
00137                 }
00138         }
00139 }