• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/core/AbstractWorker.cpp

Go to the documentation of this file.
00001 /*
00002  * AbstractWorker.cpp
00003  *
00004  *  Created on: 30.10.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "core/AbstractWorker.h"
00009 #include "core/BundleCore.h"
00010 #include "routing/QueueBundleEvent.h"
00011 #include "core/BundleGeneratedEvent.h"
00012 #include <ibrcommon/thread/MutexLock.h>
00013 #include <ibrcommon/Logger.h>
00014 #include <typeinfo>
00015 
00016 namespace dtn
00017 {
00018         namespace core
00019         {
00020                 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
00021                  : _worker(worker), _running(true)
00022                 {
00023                         bindEvent(dtn::routing::QueueBundleEvent::className);
00024                 }
00025 
00026                 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
00027                 {
00028                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00029                         shutdown();
00030                 }
00031 
00032                 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt)
00033                 {
00034                         try {
00035                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00036 
00037                                 if (queued.bundle.destination == _worker._eid)
00038                                 {
00039                                         _receive_bundles.push(queued.bundle);
00040                                 }
00041                         } catch (std::bad_cast ex) {
00042 
00043                         }
00044                 }
00045 
00046                 void AbstractWorker::AbstractWorkerAsync::shutdown()
00047                 {
00048                         _running = false;
00049                         _receive_bundles.abort();
00050 
00051                         join();
00052                 }
00053 
00054                 void AbstractWorker::AbstractWorkerAsync::run()
00055                 {
00056                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00057 
00058                         try {
00059                                 while (_running)
00060                                 {
00061                                         dtn::data::Bundle b;
00062 
00063                                         try {
00064                                                 b = storage.get( _worker._eid );
00065                                                 storage.remove(b);
00066                                                 _worker.callbackBundleReceived(b);
00067                                         }
00068                                         catch (dtn::core::BundleStorage::NoBundleFoundException ex)
00069                                         {
00070                                                 dtn::data::BundleID id = _receive_bundles.getnpop(true);
00071 
00072                                                 try {
00073                                                         _worker.callbackBundleReceived( storage.get( id ) );
00074                                                         storage.remove( id );
00075                                                 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) { };
00076                                         }
00077 
00078                                         yield();
00079                                 }
00080                         } catch (const ibrcommon::QueueUnblockedException&) {
00081                                 // queue was aborted by another call
00082                         }
00083                 }
00084 
00085                 bool AbstractWorker::AbstractWorkerAsync::__cancellation()
00086                 {
00087                         // cancel the main thread in here
00088                         _receive_bundles.abort();
00089 
00090                         // return true, to signal that no further cancel (the hardway) is needed
00091                         return true;
00092                 }
00093 
00094                 AbstractWorker::AbstractWorker() : _thread(*this)
00095                 {
00096                 };
00097 
00098                 void AbstractWorker::initialize(const string uri, bool async)
00099                 {
00100                         _eid = BundleCore::local + uri;
00101 
00102                         try {
00103                                 if (async) _thread.start();
00104                         } catch (const ibrcommon::ThreadException &ex) {
00105                                 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00106                         }
00107                 }
00108 
00109                 AbstractWorker::~AbstractWorker()
00110                 {
00111                         shutdown();
00112                 };
00113 
00114                 void AbstractWorker::shutdown()
00115                 {
00116                         // wait for the async thread
00117                         _thread.shutdown();
00118                 }
00119 
00120                 const EID AbstractWorker::getWorkerURI() const
00121                 {
00122                         return _eid;
00123                 }
00124 
00125                 void AbstractWorker::transmit(const Bundle &bundle)
00126                 {
00127                         dtn::core::BundleGeneratedEvent::raise(bundle);
00128                 }
00129         }
00130 }

Generated on Thu Nov 11 2010 09:49:46 for IBR-DTNSuite by  doxygen 1.7.1