00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/AbstractWorker.h" 00009 #include "core/BundleCore.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/BundleGeneratedEvent.h" 00012 #include "ibrcommon/thread/MutexLock.h" 00013 #include <typeinfo> 00014 00015 namespace dtn 00016 { 00017 namespace core 00018 { 00019 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00020 : _worker(worker), _running(true) 00021 { 00022 bindEvent(dtn::routing::QueueBundleEvent::className); 00023 } 00024 00025 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00026 { 00027 unbindEvent(dtn::routing::QueueBundleEvent::className); 00028 shutdown(); 00029 } 00030 00031 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00032 { 00033 try { 00034 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00035 00036 if (queued.bundle.destination == _worker._eid) 00037 { 00038 ibrcommon::MutexLock l(_receive_cond); 00039 _receive_bundles.push(queued.bundle); 00040 _receive_cond.signal(true); 00041 } 00042 } catch (std::bad_cast ex) { 00043 00044 } 00045 } 00046 00047 void AbstractWorker::AbstractWorkerAsync::shutdown() 00048 { 00049 { 00050 ibrcommon::MutexLock l(_receive_cond); 00051 _running = false; 00052 _receive_cond.signal(true); 00053 } 00054 00055 join(); 00056 } 00057 00058 void AbstractWorker::AbstractWorkerAsync::run() 00059 { 00060 BundleStorage &storage = BundleCore::getInstance().getStorage(); 00061 00062 while (_running) 00063 { 00064 dtn::data::Bundle b; 00065 00066 try { 00067 b = storage.get( _worker._eid ); 00068 storage.remove(b); 00069 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { 00070 { 00071 ibrcommon::MutexLock l(_receive_cond); 00072 while (_receive_bundles.empty()) 00073 { 00074 if (!_running) return; 00075 _receive_cond.wait(); 00076 } 00077 00078 try { 00079 b = storage.get( _receive_bundles.front() ); 00080 storage.remove(b); 00081 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { 00082 00083 } 00084 00085 _receive_bundles.pop(); 00086 } 00087 } 00088 _worker.callbackBundleReceived(b); 00089 } 00090 } 00091 00092 AbstractWorker::AbstractWorker() : _thread(*this) 00093 { 00094 }; 00095 00096 void AbstractWorker::initialize(const string uri, bool async) 00097 { 00098 _eid = BundleCore::local + uri; 00099 if (async) _thread.start(); 00100 } 00101 00102 AbstractWorker::~AbstractWorker() 00103 { 00104 shutdown(); 00105 }; 00106 00107 void AbstractWorker::shutdown() 00108 { 00109 // wait for the async thread 00110 _thread.shutdown(); 00111 } 00112 00113 const EID AbstractWorker::getWorkerURI() const 00114 { 00115 return _eid; 00116 } 00117 00118 void AbstractWorker::transmit(const Bundle &bundle) 00119 { 00120 dtn::core::BundleGeneratedEvent::raise(bundle); 00121 } 00122 } 00123 }
1.6.3