• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/core/AbstractWorker.cpp

Go to the documentation of this file.
00001 /*
00002  * AbstractWorker.cpp
00003  *
00004  *  Created on: 30.10.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "core/AbstractWorker.h"
00010 #include "core/BundleCore.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "core/BundleGeneratedEvent.h"
00013 #ifdef WITH_BUNDLE_SECURITY
00014 #include "security/SecurityManager.h"
00015 #endif
00016 #include <ibrcommon/thread/MutexLock.h>
00017 #include <ibrcommon/Logger.h>
00018 #include <typeinfo>
00019 
00020 namespace dtn
00021 {
00022         namespace core
00023         {
00024                 AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
00025                  : _worker(worker), _running(true)
00026                 {
00027                         bindEvent(dtn::routing::QueueBundleEvent::className);
00028                 }
00029 
00030                 AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
00031                 {
00032                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00033                         shutdown();
00034                 }
00035 
00036                 void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt)
00037                 {
00038                         try {
00039                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00040 
00041                                 if (queued.bundle.destination == _worker._eid)
00042                                 {
00043                                         _receive_bundles.push(queued.bundle);
00044                                 }
00045                         } catch (const std::bad_cast&) { }
00046                 }
00047 
00048                 void AbstractWorker::AbstractWorkerAsync::shutdown()
00049                 {
00050                         _running = false;
00051                         _receive_bundles.abort();
00052 
00053                         join();
00054                 }
00055 
00056                 void AbstractWorker::AbstractWorkerAsync::run()
00057                 {
00058                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00059 
00060                         try {
00061                                 while (_running)
00062                                 {
00063                                         dtn::data::BundleID id = _receive_bundles.getnpop(true);
00064 
00065                                         try {
00066                                                 dtn::data::Bundle b = storage.get( id );
00067                                                 prepareBundle(b);
00068                                                 _worker.callbackBundleReceived( b );
00069                                                 storage.remove( id );
00070                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00071 
00072                                         yield();
00073                                 }
00074                         } catch (const ibrcommon::QueueUnblockedException&) {
00075                                 // queue was aborted by another call
00076                         }
00077                 }
00078 
00079                 bool AbstractWorker::AbstractWorkerAsync::__cancellation()
00080                 {
00081                         // cancel the main thread in here
00082                         _receive_bundles.abort();
00083 
00084                         // return true, to signal that no further cancel (the hardway) is needed
00085                         return true;
00086                 }
00087 
00088                 void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const
00089                 {
00090 #ifdef WITH_BUNDLE_SECURITY
00091                         // try to decrypt the bundle
00092                         try {
00093                                 dtn::security::SecurityManager::getInstance().decrypt(bundle);
00094                         } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00095                                 // decrypt needed, but no key is available
00096                                 IBRCOMMON_LOGGER(warning) << "No key available for decrypt bundle." << IBRCOMMON_LOGGER_ENDL;
00097                         } catch (const dtn::security::SecurityManager::DecryptException &ex) {
00098                                 // decrypt failed
00099                                 IBRCOMMON_LOGGER(warning) << "Decryption of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00100                         }
00101 #endif
00102                 }
00103 
00104                 AbstractWorker::AbstractWorker() : _thread(*this)
00105                 {
00106                 };
00107 
00108                 void AbstractWorker::initialize(const string uri, bool async)
00109                 {
00110                         _eid = BundleCore::local + uri;
00111 
00112                         try {
00113                                 if (async) _thread.start();
00114                         } catch (const ibrcommon::ThreadException &ex) {
00115                                 IBRCOMMON_LOGGER(error) << "failed to start thread in AbstractWorker\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00116                         }
00117                 }
00118 
00119                 AbstractWorker::~AbstractWorker()
00120                 {
00121                         shutdown();
00122                 };
00123 
00124                 void AbstractWorker::shutdown()
00125                 {
00126                         // wait for the async thread
00127                         _thread.shutdown();
00128                 }
00129 
00130                 const EID AbstractWorker::getWorkerURI() const
00131                 {
00132                         return _eid;
00133                 }
00134 
00135                 void AbstractWorker::transmit(const Bundle &bundle)
00136                 {
00137                         dtn::core::BundleGeneratedEvent::raise(bundle);
00138                 }
00139         }
00140 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1