00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/AbstractWorker.h" 00009 #include "core/BundleCore.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/BundleGeneratedEvent.h" 00012 #include "ibrcommon/thread/MutexLock.h" 00013 #include <typeinfo> 00014 00015 namespace dtn 00016 { 00017 namespace core 00018 { 00019 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00020 : _worker(worker), _running(true) 00021 { 00022 bindEvent(dtn::routing::QueueBundleEvent::className); 00023 } 00024 00025 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00026 { 00027 unbindEvent(dtn::routing::QueueBundleEvent::className); 00028 shutdown(); 00029 } 00030 00031 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00032 { 00033 try { 00034 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00035 00036 if (queued.bundle.destination == _worker._eid) 00037 { 00038 ibrcommon::MutexLock l(_receive_cond); 00039 _receive_bundles.push(queued.bundle); 00040 _receive_cond.signal(true); 00041 } 00042 } catch (std::bad_cast ex) { 00043 00044 } 00045 } 00046 00047 void AbstractWorker::AbstractWorkerAsync::shutdown() 00048 { 00049 { 00050 ibrcommon::MutexLock l(_receive_cond); 00051 _running = false; 00052 _receive_cond.signal(true); 00053 } 00054 00055 join(); 00056 } 00057 00058 void AbstractWorker::AbstractWorkerAsync::run() 00059 { 00060 BundleStorage &storage = BundleCore::getInstance().getStorage(); 00061 dtn::data::Bundle b; 00062 00063 while (_running) 00064 { 00065 try { 00066 b = storage.get( _worker._eid ); 00067 storage.remove(b); 00068 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { 00069 { 00070 ibrcommon::MutexLock l(_receive_cond); 00071 while (_receive_bundles.empty()) 00072 { 00073 if (!_running) return; 00074 _receive_cond.wait(); 00075 } 00076 00077 try { 00078 b = storage.get( _receive_bundles.front() ); 00079 storage.remove(b); 00080 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { 00081 00082 } 00083 00084 _receive_bundles.pop(); 00085 } 00086 } 00087 _worker.callbackBundleReceived(b); 00088 } 00089 } 00090 00091 AbstractWorker::AbstractWorker() : _thread(*this) 00092 { 00093 }; 00094 00095 void AbstractWorker::initialize(const string uri, bool async) 00096 { 00097 _eid = BundleCore::local + uri; 00098 if (async) _thread.start(); 00099 } 00100 00101 AbstractWorker::~AbstractWorker() 00102 { 00103 shutdown(); 00104 }; 00105 00106 void AbstractWorker::shutdown() 00107 { 00108 // wait for the async thread 00109 _thread.shutdown(); 00110 } 00111 00112 const EID AbstractWorker::getWorkerURI() const 00113 { 00114 return _eid; 00115 } 00116 00117 void AbstractWorker::transmit(const Bundle &bundle) 00118 { 00119 dtn::core::BundleGeneratedEvent::raise(bundle); 00120 } 00121 } 00122 }
1.5.6