00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "core/AbstractWorker.h" 00010 #include "core/BundleCore.h" 00011 #include "routing/QueueBundleEvent.h" 00012 #include "core/BundleGeneratedEvent.h" 00013 #ifdef WITH_BUNDLE_SECURITY 00014 #include "security/SecurityManager.h" 00015 #endif 00016 #include <ibrcommon/thread/MutexLock.h> 00017 #include <ibrcommon/Logger.h> 00018 #include <typeinfo> 00019 00020 namespace dtn 00021 { 00022 namespace core 00023 { 00024 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00025 : _worker(worker), _running(true) 00026 { 00027 bindEvent(dtn::routing::QueueBundleEvent::className); 00028 } 00029 00030 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00031 { 00032 unbindEvent(dtn::routing::QueueBundleEvent::className); 00033 shutdown(); 00034 } 00035 00036 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00037 { 00038 try { 00039 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00040 00041 if (queued.bundle.destination == _worker._eid) 00042 { 00043 _receive_bundles.push(queued.bundle); 00044 } 00045 } catch (const std::bad_cast&) { } 00046 } 00047 00048 void AbstractWorker::AbstractWorkerAsync::shutdown() 00049 { 00050 _running = false; 00051 _receive_bundles.abort(); 00052 00053 join(); 00054 } 00055 00056 void AbstractWorker::AbstractWorkerAsync::run() 00057 { 00058 BundleStorage &storage = BundleCore::getInstance().getStorage(); 00059 00060 try { 00061 while (_running) 00062 { 00063 dtn::data::BundleID id = _receive_bundles.getnpop(true); 00064 00065 try { 00066 dtn::data::Bundle b = storage.get( id ); 00067 prepareBundle(b); 00068 _worker.callbackBundleReceived( b ); 00069 storage.remove( id ); 00070 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00071 00072 yield(); 
00073 } 00074 } catch (const ibrcommon::QueueUnblockedException&) { 00075 // queue was aborted by another call 00076 } 00077 } 00078 00079 bool AbstractWorker::AbstractWorkerAsync::__cancellation() 00080 { 00081 // cancel the main thread in here 00082 _receive_bundles.abort(); 00083 00084 // return true, to signal that no further cancel (the hardway) is needed 00085 return true; 00086 } 00087 00088 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const 00089 { 00090 #ifdef WITH_BUNDLE_SECURITY 00091 // try to decrypt the bundle 00092 try { 00093 dtn::security::SecurityManager::getInstance().decrypt(bundle); 00094 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00095 // decrypt needed, but no key is available 00096 IBRCOMMON_LOGGER(warning) << "No key available for decrypt bundle." << IBRCOMMON_LOGGER_ENDL; 00097 } catch (const dtn::security::SecurityManager::DecryptException &ex) { 00098 // decrypt failed 00099 IBRCOMMON_LOGGER(warning) << "Decryption of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00100 } 00101 #endif 00102 } 00103 00104 AbstractWorker::AbstractWorker() : _thread(*this) 00105 { 00106 }; 00107 00108 void AbstractWorker::initialize(const string uri, bool async) 00109 { 00110 _eid = BundleCore::local + uri; 00111 00112 try { 00113 if (async) _thread.start(); 00114 } catch (const ibrcommon::ThreadException &ex) { 00115 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00116 } 00117 } 00118 00119 AbstractWorker::~AbstractWorker() 00120 { 00121 shutdown(); 00122 }; 00123 00124 void AbstractWorker::shutdown() 00125 { 00126 // wait for the async thread 00127 _thread.shutdown(); 00128 } 00129 00130 const EID AbstractWorker::getWorkerURI() const 00131 { 00132 return _eid; 00133 } 00134 00135 void AbstractWorker::transmit(const Bundle &bundle) 00136 { 00137 dtn::core::BundleGeneratedEvent::raise(bundle); 00138 } 00139 } 00140 }
// 1.7.1