00001 /* 00002 * AbstractWorker.cpp 00003 * 00004 * Created on: 30.10.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/AbstractWorker.h" 00009 #include "core/BundleCore.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/BundleGeneratedEvent.h" 00012 #include <ibrcommon/thread/MutexLock.h> 00013 #include <ibrcommon/Logger.h> 00014 #include <typeinfo> 00015 00016 namespace dtn 00017 { 00018 namespace core 00019 { 00020 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker) 00021 : _worker(worker), _running(true) 00022 { 00023 bindEvent(dtn::routing::QueueBundleEvent::className); 00024 } 00025 00026 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync() 00027 { 00028 unbindEvent(dtn::routing::QueueBundleEvent::className); 00029 shutdown(); 00030 } 00031 00032 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) 00033 { 00034 try { 00035 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00036 00037 if (queued.bundle.destination == _worker._eid) 00038 { 00039 _receive_bundles.push(queued.bundle); 00040 } 00041 } catch (std::bad_cast ex) { 00042 00043 } 00044 } 00045 00046 void AbstractWorker::AbstractWorkerAsync::shutdown() 00047 { 00048 _running = false; 00049 _receive_bundles.abort(); 00050 00051 join(); 00052 } 00053 00054 void AbstractWorker::AbstractWorkerAsync::run() 00055 { 00056 BundleStorage &storage = BundleCore::getInstance().getStorage(); 00057 00058 try { 00059 while (_running) 00060 { 00061 dtn::data::Bundle b; 00062 00063 try { 00064 b = storage.get( _worker._eid ); 00065 storage.remove(b); 00066 _worker.callbackBundleReceived(b); 00067 } 00068 catch (dtn::core::BundleStorage::NoBundleFoundException ex) 00069 { 00070 dtn::data::BundleID id = _receive_bundles.getnpop(true); 00071 00072 try { 00073 _worker.callbackBundleReceived( storage.get( id ) ); 00074 storage.remove( id ); 00075 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { }; 00076 } 00077 00078 yield(); 00079 } 00080 } catch (const ibrcommon::QueueUnblockedException&) { 00081 // queue was aborted by another call 00082 } 00083 } 00084 00085 bool AbstractWorker::AbstractWorkerAsync::__cancellation() 00086 { 00087 // cancel the main thread in here 00088 _receive_bundles.abort(); 00089 00090 // return true, to signal that no further cancel (the hardway) is needed 00091 return true; 00092 } 00093 00094 AbstractWorker::AbstractWorker() : _thread(*this) 00095 { 00096 }; 00097 00098 void AbstractWorker::initialize(const string uri, bool async) 00099 { 00100 _eid = BundleCore::local + uri; 00101 00102 try { 00103 if (async) _thread.start(); 00104 } catch (const ibrcommon::ThreadException &ex) { 00105 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00106 } 00107 } 00108 00109 AbstractWorker::~AbstractWorker() 00110 { 00111 shutdown(); 00112 }; 00113 00114 void AbstractWorker::shutdown() 00115 { 00116 // wait for the async thread 00117 _thread.shutdown(); 00118 } 00119 00120 const EID AbstractWorker::getWorkerURI() const 00121 { 00122 return _eid; 00123 } 00124 00125 void AbstractWorker::transmit(const Bundle &bundle) 00126 { 00127 dtn::core::BundleGeneratedEvent::raise(bundle); 00128 } 00129 } 00130 }
1.7.1