Wiselib
wiselib.testing/algorithms/cluster/moca/moca.h
Go to the documentation of this file.
00001 #ifndef _MOCA_MocaCore_H
00002 #define  _MOCA_MocaCore_H
00003 
00004 #include "algorithms/cluster/clustering_types.h"
00005 #include "util/base_classes/clustering_base.h"
00006 
00007 #undef DEBUG
00008 //Uncomment to enable debug
00009 #define DEBUG
00010 
00011 namespace wiselib {
00012 
00020     template<typename OsModel_P,
00021     typename HeadDecision_P,
00022     typename JoinDecision_P,
00023     typename Iterator_P>
00024     class MocaCore
00025     : public ClusteringBase <OsModel_P> {
00026     public:
00027         //TYPEDEFS
00028         typedef OsModel_P OsModel;
00029         // os modules
00030         typedef typename OsModel::Radio Radio;
00031         typedef typename OsModel::Timer Timer;
00032         typedef typename OsModel::Debug Debug;
00033         typedef typename OsModel::Rand Rand;
00034         //algorithm modules
00035         typedef HeadDecision_P HeadDecision_t;
00036         typedef JoinDecision_P JoinDecision_t;
00037         typedef Iterator_P Iterator_t;
00038         // self t
00039         typedef MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P > self_t;
00040 
00041         // data types
00042         typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues
00043         typedef typename Radio::node_id_t node_id_t;
00044         typedef typename Radio::size_t size_t;
00045         typedef typename Radio::block_data_t block_data_t;
00046         typedef node_id_t cluster_id_t;
00047 
00048         // delegate
00049         typedef delegate1<void, int> cluster_delegate_t;
00050 
00051         /*
00052          * Constructor
00053          * */
00054         MocaCore() :
00055         probability_(30),
00056         maxhops_(3) {
00057         }
00058 
00059         /*
00060          * Destructor
00061          * */
00062         ~MocaCore() {
00063         }
00064 
00065         /*
00066          * INIT
00067          * initializes the values of radio timer and debug
00068          */
00069         void init(Radio& radio, Timer& timer, Debug& debug) {
00070             radio_ = &radio;
00071             timer_ = &timer;
00072             debug_ = &debug;
00073         }
00074 
00075 
00076         /* set functions */
00077 
00078         /* SET the clustering modules */
00079 
00080         // Set the iterator Module
00081 
00082         void set_iterator(Iterator_t &it) {
00083             it_ = &it;
00084         }
00085 
00086         // Set the join_decision Module
00087 
00088         void set_join_decision(JoinDecision_t &jd) {
00089             jd_ = &jd;
00090         }
00091 
00092         // Set the cluster_head_decision Module
00093 
00094         void set_cluster_head_decision(HeadDecision_t &chd) {
00095             chd_ = &chd;
00096         }
00097 
00098         // Set the theta value
00099 
00100         void set_probability(int prob) {
00101             probability_ = prob;
00102         }
00103 
00104         // Set the maxhops value
00105 
00106         void set_maxhops(int maxhops) {
00107             maxhops_ = maxhops;
00108         }
00109 
00110         /* GET functions
00111 
00112         //get the parent value
00113          */
00114         node_id_t parent(cluster_id_t cluster_id = 0) {
00115             return it().parent(cluster_id);
00116         }
00117 
00118         //get the cluster id
00119 
00120         cluster_id_t cluster_id(size_t cluster_no = 0) {
00121             return it().cluster_id(cluster_no);
00122         }
00123 
00124         //get the hops from the cluster head
00125 
00126         size_t hops(cluster_id_t cluster_id = 0) {
00127             return it().hops(cluster_id);
00128         }
00129 
00130         /* SHOW all the known nodes */
00131 
00132         void present_neighbors() {
00133             it().present_neighbors();
00134         }
00135 
00136         int node_type() {
00137             return it().node_type();
00138         }
00139 
00140         /* CALLBACKS */
00141 
00142         /*template<class T, void(T::*TMethod)(int) >
00143         inline int reg_state_changed_callback(T *obj_pnt) {
00144             state_changed_callback_ = cluster_delegate_t::from_method<T, TMethod > (obj_pnt);
00145             return state_changed_callback_;
00146         }
00147 
00148         void unreg_changed_callback(int idx) {
00149             state_changed_callback_ = cluster_delegate_t();
00150         }
00151          */
00152 
00153 
00154         /*
00155          * Enable
00156          * enables the mbfsclustering module
00157          * enable chd it and jd modules
00158          * initializes values
00159          * registers callbacks
00160          * calls find head to start clustering
00161          * */
00162         void enable();
00163 
00164         /*
00165          * Disable
00166          * disables the bfsclustering module
00167          * unregisters callbacks
00168          * */
00169         void disable();
00170 
00171         /*
00172          * FIND_HEAD
00173          * starts clustering
00174          * decides a head and then start clustering
00175          * from the head of each cluster
00176          * */
00177         void find_head();
00178 
00179         /*
00180          * RECEIVE_NEXT
00181          *
00182          * */
00183         void receive_next(int num);
00184 
00185         /*
00186          * TIMER_EXPIRED
00187          * if timer_expired is called
00188          * clustering procedure ends
00189          * */
00190         void timer_expired(void *);
00191 
00192         void reply_to_head(void *);
00193 
00194 #ifdef DEBUG
00195 
00196         int mess_join() {
00197             return mess_join_;
00198         }
00199 
00200         int mess_join_request() {
00201             return mess_join_request_;
00202         }
00203 #endif
00204 
00205     protected:
00206         /*
00207          * RECEIVE
00208          * respond to the new messages received
00209          * callback from the radio
00210          * */
00211         void receive(node_id_t receiver, size_t len, block_data_t *data);
00212 
00213     private:
00214 
00215         int callback_id_; // receive message callback
00216         //cluster_delegate_t state_changed_callback_; // state changed callback to processor
00217         int next_callback_id_; // unused
00218 
00219         int probability_; // clustering parameter
00220         int maxhops_;
00221         static const int time_slice_ = 1000; // time to wait for cluster accept replies
00222 
00223 
00224         /* CLustering algorithm modules */
00225         HeadDecision_t * chd_;
00226 
00227         HeadDecision_t& chd() {
00228             return *chd_;
00229         }
00230         JoinDecision_t * jd_;
00231 
00232         JoinDecision_t& jd() {
00233             return *jd_;
00234         }
00235         Iterator_t * it_;
00236 
00237         Iterator_t& it() {
00238             return *it_;
00239         }
00240 
00241 #ifdef DEBUG
00242         int mess_join_;
00243         int mess_join_request_;
00244 #endif
00245 
00246 
00247         Radio * radio_; // radio module
00248         Timer * timer_; // timer module
00249         Debug * debug_; // debug module
00250         Rand * rand_;
00251 
00252         Radio& radio() {
00253             return *radio_;
00254         }
00255 
00256         Timer& timer() {
00257             return *timer_;
00258         }
00259 
00260         Debug& debug() {
00261             return *debug_;
00262         }
00263 
00264         Rand& rand() {
00265             return *rand_;
00266         }
00267 
00268 
00269     };
00270 
00271     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00272     void
00273     MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>::
00274     enable(void) {
00275 
00276         //enable the radio
00277         radio().enable_radio();
00278 
00279         //initialize the clustering modules
00280         chd().init(radio(), debug(), rand());
00281         jd().init(radio(), debug());
00282         it().init(radio(), timer(), debug());
00283 
00284         //reset_mess_counters();
00285 
00286 #ifdef DEBUG
00287         debug().debug("Enable node %x \n", radio().id());
00288 #endif
00289 
00290         // receive receive callback
00291         callback_id_
00292                 = radio().template reg_recv_callback<self_t, &self_t::receive > (this);
00293 
00294 
00295 
00296         // register next callback
00297         //next_callback_id_ = it().template reg_next_callback<self_t,
00298         //        &self_t::receive_next > (this);
00299 
00300         //enabling
00301         chd().enable();
00302         jd().enable();
00303         it().enable();
00304 
00305         // set variables of other modules
00306         chd().set_probability(probability_);
00307         jd().set_maxhops(maxhops_);
00308 
00309         chd().set_id(radio().id());
00310         jd().set_id(radio().id());
00311         it().set_id(radio().id());
00312 
00313 
00314         // start the procedure to find new head
00315         find_head();
00316     }
00317     ;
00318 
00319     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00320     void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>
00321     ::find_head(void) {
00322 #ifdef DEBUG
00323         debug().debug("stage 1 - Cluster head decision\n");
00324 #endif
00325         // if Cluster Head
00326         if (chd().calculate_head() == true) {
00327 
00328             // set values for iterator and join_decision
00329             it().set_node_type(HEAD);
00330             //it().set_cluster_id(radio().id());
00331             //jd().set_cluster_id(radio().id());
00332 
00333             jd().set_id(radio().id());
00334 
00335 #ifdef DEBUG
00336             debug().debug("stage 2 - Join\n");
00337 #endif
00338 
00339             //jd(). get join payload
00340             int mess_size = jd().get_payload_length(JOIN);
00341 
00342             block_data_t m_sid[mess_size];
00343 
00344             jd().get_join_request_payload(m_sid);
00345 
00346             // send JOIN
00347             radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid);
00348 #ifdef DEBUG
00349             cluster_id_t cluster;
00350             uint8_t hops;
00351             memcpy(&cluster, m_sid + 1, sizeof (cluster_id_t));
00352             memcpy(&hops, m_sid + 1 + sizeof (cluster_id_t), sizeof (uint8_t));
00353             debug().debug("SEND JOIN Node %x, [%x,%d]\n", radio().id(), cluster, hops);
00354             mess_join_++;
00355 #endif
00356 
00357             // Cluster is head now
00358             //this->state_changed(CLUSTER_HEAD_CHANGED);
00359 
00360 
00361 
00362             //set the time to check for finished clustering
00363             timer().template set_timer<self_t,
00364                     &self_t::timer_expired > (2 * maxhops_ * time_slice_, this, (void*) 0);
00365 
00366         } else {
00367             timer().template set_timer<self_t,
00368                     &self_t::reply_to_head > (maxhops_ * time_slice_, this, (void*) 0);
00369 
00370         }
00371 
00372     }
00373     ;
00374 
00375     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00376     void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>
00377     ::disable(void) {
00378         // Unregister the callback
00379         radio().unreg_recv_callback(callback_id_);
00380         it().unreg_next_callback(next_callback_id_);
00381     }
00382 
00383     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00384     void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>
00385     ::receive(node_id_t from, size_t len, block_data_t* data) {
00386         if (from == radio().id()) return;
00387 
00388         // get Type of Message
00389         int type = data[0];
00390 
00391         if (type == JOIN) {
00392             if (it().node_type() != HEAD) {
00393                 if (jd().join(data, len)) {
00394                     cluster_id_t cluster;
00395                     memcpy(&cluster, data + 1, sizeof (cluster_id_t));
00396                     uint8_t hops;
00397                     memcpy(&hops, data + 1 + sizeof (cluster_id_t), 1);
00398                     if (it().add_cluster(cluster, hops, from)) {
00399 #ifdef DEBUG
00400                         debug().debug("Node %x Joined Cluster %x, %d hops away\n", radio().id(), cluster, hops);
00401 #endif
00402 
00403                         hops++;
00404                         uint8_t newjoin[len];
00405                         memcpy(newjoin, data, len);
00406                         memcpy(newjoin + 1 + sizeof (cluster_id_t), (void *) & hops, 1);
00407 
00408                         if (hops <= maxhops_) {
00409                             radio().send(Radio::BROADCAST_ADDRESS, len, newjoin);
00410 #ifdef DEBUG
00411                             debug().debug("SEND JOIN Node %x [%x,%d]\n", radio().id(), cluster, hops);
00412                             mess_join_++;
00413 #endif
00414                         }
00415                     }
00416                 }
00417             } else {
00418                 cluster_id_t cluster;
00419                 memcpy(&cluster, data + 1, sizeof (cluster_id_t));
00420                 uint8_t hops;
00421                 memcpy(&hops, data + 1 + sizeof (cluster_id_t), 1);
00422                 if (it().add_cluster(cluster, hops, from)) {
00423 #ifdef DEBUG
00424                     debug().debug("Node %x knows Cluster %x, %d hops away\n", radio().id(), cluster, hops);
00425 #endif
00426                     hops++;
00427                     uint8_t newjoin[len];
00428                     memcpy(newjoin, data, len);
00429                     memcpy(newjoin + 1 + sizeof (cluster_id_t), (void *) & hops, 1);
00430 
00431                     if (hops <= maxhops_) {
00432 
00433                         radio().send(Radio::BROADCAST_ADDRESS, len, newjoin);
00434 #ifdef DEBUG
00435                         debug().debug("SEND JOIN Node %x [%x,%d]\n", radio().id(), cluster, hops);
00436                         mess_join_++;
00437 #endif
00438                     }
00439                 }
00440 
00441 
00442             }
00443         } else if (type == JOIN_REQUEST) {
00444             debug().debug("Join req\n");
00445             if (!it().eat_request(len, data)) {
00446                 node_id_t original_sender;
00447                 memcpy(&original_sender, data + 1, sizeof (node_id_t));
00448                 cluster_id_t mess_cluster;
00449                 memcpy(&mess_cluster, data + 1 + sizeof (node_id_t), sizeof (cluster_id_t));
00450                 node_id_t dest = it().parent(mess_cluster);
00451 #ifdef DEBUG
00452                 debug().debug("To resend %x , cluster %x from %x sender %x \n", radio().id(), mess_cluster, from, original_sender);
00453 #endif
00454                 radio().send(dest, len, data);
00455 #ifdef DEBUG
00456                 debug().debug("SEND JOIN_REQUEST %x -> %x\n", radio().id(), dest);
00457                 mess_join_request_++;
00458 #endif
00459             }
00460         }
00461     }
00462 
00463     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00464     void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>
00465     ::timer_expired(void * d) {
00466 #ifdef DEBUG
00467         debug().debug("Clustering Finished %x \n", radio().id());
00468 #endif
00469         //it().present_neighbors();
00470     }
00471 
00472     template<typename OsModel_P, typename HeadDecision_P, typename JoinDecision_P, typename Iterator_P>
00473     void MocaCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P>
00474     ::reply_to_head(void * d) {
00475 #ifdef DEBUG
00476         debug().debug("stage 3 - Reply\n");
00477         //it().present_neighbors();
00478 #endif
00479 
00480         if (it().clusters_joined() < 1) {
00481 #ifdef DEBUG
00482             debug().debug("Node Joined no cluster, change to cluster_head\n");
00483             debug().debug("NODE UNCOVERED\n");
00484 
00485 #endif
00486             it().set_node_type(HEAD);
00487             timer().template set_timer<self_t,
00488                     &self_t::timer_expired > (maxhops_ * time_slice_, this, (void*) 0);
00489 
00490         } else {
00491 #ifdef DEBUG
00492             debug().debug("Reply_to_head Node %x, %d clusters\n", radio().id(), it().clusters_joined());
00493 #endif
00494             for (size_t i = 0; i < it().clusters_joined(); i++) {
00495 
00496                 cluster_id_t cluster_id = it().cluster_id(i);
00497                 node_id_t dest = it().parent(cluster_id);
00498                 size_t mess_size = it().get_payload_length(JOIN_REQUEST, cluster_id);
00499                 block_data_t mess[mess_size];
00500 #ifdef DEBUG
00501                 debug().debug("SEND JOIN_REQUEST %x -> %x ", radio().id(), dest);
00502                 mess_join_request_++;
00503 #endif
00504                 it().get_join_request_payload(mess, cluster_id);
00505 
00506                 radio().send(dest, mess_size, mess);
00507             }
00508 
00509         }
00510 
00511     }
00512 }
00513 #endif   /* _MOCA_MocaCore_H */
00514 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines