|
IBR-DTNSuite 0.6
|
00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "core/AbstractWorker.h" 00010 #include "core/BundleCore.h" 00011 #include "routing/QueueBundleEvent.h" 00012 #include "core/BundleGeneratedEvent.h" 00013 #include "core/BundleEvent.h" 00014 #ifdef WITH_BUNDLE_SECURITY 00015 #include "security/SecurityManager.h" 00016 #endif 00017 #ifdef WITH_COMPRESSION 00018 #include <ibrdtn/data/CompressedPayloadBlock.h> 00019 #endif 00020 #include <ibrcommon/thread/MutexLock.h> 00021 #include <ibrcommon/Logger.h> 00022 #include <typeinfo> 00023 00024 namespace dtn 00025 { 00026 namespace core 00027 { 00028 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00029 : _worker(worker), _running(true) 00030 { 00031 bindEvent(dtn::routing::QueueBundleEvent::className); 00032 } 00033 00034 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00035 { 00036 unbindEvent(dtn::routing::QueueBundleEvent::className); 00037 shutdown(); 00038 } 00039 00040 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00041 { 00042 try { 00043 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00044 00045 if (queued.bundle.destination == _worker._eid) 00046 { 00047 _receive_bundles.push(queued.bundle); 00048 } 00049 } catch (const std::bad_cast&) { } 00050 } 00051 00052 void AbstractWorker::AbstractWorkerAsync::shutdown() 00053 { 00054 _running = false; 00055 _receive_bundles.abort(); 00056 00057 join(); 00058 } 00059 00060 void AbstractWorker::AbstractWorkerAsync::run() 00061 { 00062 BundleStorage &storage = BundleCore::getInstance().getStorage(); 00063 00064 try { 00065 while (_running) 00066 { 00067 dtn::data::BundleID id = _receive_bundles.getnpop(true); 00068 00069 try { 00070 dtn::data::Bundle b = storage.get( id ); 00071 prepareBundle(b); 00072 _worker.callbackBundleReceived( b ); 00073 00074 // raise bundle event 00075 dtn::core::BundleEvent::raise(b, BUNDLE_DELIVERED); 00076 00077 // remove the bundle from the storage 00078 storage.remove( id ); 00079 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00080 00081 yield(); 00082 } 00083 } catch (const ibrcommon::QueueUnblockedException&) { 00084 // queue was aborted by another call 00085 } 00086 } 00087 00088 bool AbstractWorker::AbstractWorkerAsync::__cancellation() 00089 { 00090 // cancel the main thread in here 00091 _receive_bundles.abort(); 00092 00093 // return true, to signal that no further cancel (the hardway) is needed 00094 return true; 00095 } 00096 00097 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const 00098 { 00099 // process the bundle block (security, compression, ...) 00100 dtn::core::BundleCore::processBlocks(bundle); 00101 } 00102 00103 AbstractWorker::AbstractWorker() : _thread(*this) 00104 { 00105 }; 00106 00107 void AbstractWorker::initialize(const string uri, bool async) 00108 { 00109 _eid = BundleCore::local + uri; 00110 00111 try { 00112 if (async) _thread.start(); 00113 } catch (const ibrcommon::ThreadException &ex) { 00114 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00115 } 00116 } 00117 00118 AbstractWorker::~AbstractWorker() 00119 { 00120 shutdown(); 00121 }; 00122 00123 void AbstractWorker::shutdown() 00124 { 00125 // wait for the async thread 00126 _thread.shutdown(); 00127 } 00128 00129 const EID AbstractWorker::getWorkerURI() const 00130 { 00131 return _eid; 00132 } 00133 00134 void AbstractWorker::transmit(const Bundle &bundle) 00135 { 00136 dtn::core::BundleGeneratedEvent::raise(bundle); 00137 } 00138 } 00139 }