Wiselib
wiselib.testing/algorithms/cluster/maxmind/maxmind.h
Go to the documentation of this file.
00001 #ifndef __MAXMIND_CLUSTER_FORMATION_H_
00002 #define __MAXMIND_CLUSTER_FORMATION_H_
00003 
00004 #include "util/delegates/delegate.hpp"
00005 #include "algorithms/cluster/clustering_types.h"
00006 #include "util/base_classes/clustering_base.h"
00007 
00008 #undef DEBUG
00009 // Uncomment to enable Debug
00010 #define DEBUG
00011 
00012 namespace wiselib {
00020     template<typename OsModel_P,
00021     typename HeadDecision_P,
00022     typename JoinDecision_P,
00023     typename Iterator_P>
00024     class MaxmindCore
00025     : public ClusteringBase <OsModel_P> {
00026     public:
00027 
00028         //TYPEDEFS
00029         typedef OsModel_P OsModel;
00030         typedef HeadDecision_P HeadDecision_t;
00031         typedef JoinDecision_P JoinDecision_t;
00032         typedef Iterator_P Iterator_t;
00033         typedef typename OsModel::Radio Radio;
00034         typedef typename OsModel::Timer Timer;
00035         typedef typename OsModel::Debug Debug;
00036         typedef MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P, Iterator_P> self_t;
00037 
00038         //DATA TYPES
00039         typedef int cluster_id_t;
00040         typedef int cluster_level_t; //quite useless within current scheme, supported for compatibility issues
00041         typedef typename Radio::node_id_t node_id_t;
00042         typedef typename Radio::size_t size_t;
00043         typedef typename Radio::block_data_t block_data_t;
00044 
00045         //delegates
00046         typedef delegate1<void, int> cluster_delegate_t;
00047 
00048         /*
00049          * Constructor
00050          * */
00051         MaxmindCore() :
00052         maxhops_(5),
00053         round_(0) {
00054         }
00055         ;
00056 
00057         /*
00058          * Destructor
00059          * */
00060         ~MaxmindCore() {
00061         }
00062         ;
00064 
00065         /*
00066          * INIT
00067          * initializes the values of radio timer and debug
00068          */
00069         void init(Radio& radio, Timer& timer, Debug& debug) {
00070             radio_ = &radio;
00071             timer_ = &timer;
00072             debug_ = &debug;
00073         };
00074 
00075         bool is_cluster_head(){
00076             return chd().is_cluster_head();
00077         }
00078 
00079         /* SET functions */
00080 
00081         // Set the iterator Module
00082 
00083         void set_iterator(Iterator_t &it) {
00084             it_ = &it;
00085         }
00086 
00087         // Set the join_decision Module
00088 
00089         void set_join_decision(JoinDecision_t &jd) {
00090             jd_ = &jd;
00091         }
00092 
00093         // Set the cluster_head_decision Module
00094 
00095         void set_cluster_head_decision(HeadDecision_t &chd) {
00096             chd_ = &chd;
00097         }
00098 
00099         // Set the theta value
00100 
00101         void set_maxhops(int maxhops) {
00102             maxhops_ = maxhops;
00103         }
00104 
00105         /* GET functions */
00106 
00107         // Get the Node Parent
00108 
00109         node_id_t parent(void) {
00110             return it().parent();
00111         }
00112 
00113         // Get the cluster_id
00114 
00115         cluster_id_t cluster_id(void) {
00116             return it().cluster_id();
00117         }
00118 
00119         // Get the node_type
00120 
00121         cluster_id_t node_type(void) {
00122             return it().node_type();
00123         }
00124 
00125         int hops(){
00126             return it().hops();
00127         }
00128 
00129         //MAXMIND ONLY CALLBACKS
00130 
00131         /*
00132          * callback from the cluster_head_decision_
00133          * When a head is to be decided, chd_
00134          * needs the winner vector from jd_
00135          *
00136          * argument: pointer to the winner array
00137          */
00138         void winner(node_id_t * mem_pos) {
00139             // get the winner from join_decision_
00140             jd().get_winner(mem_pos);
00141 
00142         }
00143 
00144         /* callback from the cluster_head_decision_
00145          * When a head is to be decided, chd_
00146          * needs the sender vector from jd_
00147          *
00148          * argument: pointer to the sender array
00149          */
00150         void sender(node_id_t * mem_pos) {
00151             // get the sender from join_decision_
00152             jd().get_sender(mem_pos);
00153 
00154         }
00155 
00156 
00157         /* SHOW all the known nodes */
00158         void present() {
00159             it().present_neibhors();
00160 
00161         }
00162 
00163         /*
00164          * Enable
00165          * enables the bfsclustering module
00166          * initializes values
00167          * registers callbacks         
00168          * */
00169         void enable(void);
00170         /*
00171          * Disable
00172          * disables the bfsclustering module
00173          * unregisters callbacks
00174          * */
00175         void disable(void);
00176 
00177         void timer_expired(void * data);
00178 
00179         /*
00180          * Find_head
00181          * starts clustering procedure
00182          * */
00183         void find_head();
00184 
00185         /*
00186          * Convergecast
00187          * Start a procedure where
00188          * outer nodes inform inner nodes for their status
00189          * */
00190         void convergecast(void * data);
00191         //useless now
00192         void receive_next(int num);
00193 
00194 
00195 #ifdef DEBUG
00196 
00197         int mess_flood() {
00198             return mess_flood_;
00199         }
00200 
00201         int mess_inform() {
00202             return mess_inform_;
00203 
00204         }
00205 
00206         int mess_convergecast() {
00207             return mess_convergecast_;
00208         }
00209 
00210         int mess_rejoin() {
00211             return mess_rejoin_;
00212         }
00213 #endif
00214 
00215 
00216     protected:
00217         /*
00218          * RECEIVE
00219          * respond to the new messages received
00220          * callback from the radio
00221          * */
00222         void receive(node_id_t receiver, size_t len, block_data_t *data);
00223 
00224     private:
00225 
00226 
00227         int callback_id_; //receive callbalck
00228         int next_callback_id_; //
00229         int winner_callback_id_; // get the winner list callback chd_<>jd_
00230         int sender_callback_id_; // get the sender list callback chd_<>jd_
00231         int maxhops_; // clustering parameter
00232         int round_; // the "synchronous" round of the algorithm
00233 
00234 #ifdef DEBUG
00235         int mess_flood_;
00236         int mess_inform_;
00237         int mess_convergecast_;
00238         int mess_rejoin_;
00239 
00240         void inc_mess_flood() {
00241             mess_flood_++;
00242         }
00243 
00244         void inc_mess_inform() {
00245             mess_inform_++;
00246 
00247         }
00248 
00249         void inc_mess_convergecast() {
00250             mess_convergecast_++;
00251         }
00252 
00253         void inc_mess_rejoin() {
00254             mess_rejoin_++;
00255         }
00256 
00257         void reset_mess_counters() {
00258             mess_flood_ = 0;
00259             mess_inform_ = 0;
00260             mess_convergecast_ = 0;
00261             mess_rejoin_ = 0;
00262         }
00263 #endif
00264 
00265 
00266         HeadDecision_t *chd_; // cluster_head_decision_ module
00267         JoinDecision_t *jd_; // join_decision_ module
00268         Iterator_t *it_; // iterator_ module
00269 
00270         HeadDecision_t& chd() {
00271             return *chd_;
00272         }
00273 
00274         JoinDecision_t& jd() {
00275             return *jd_;
00276         }
00277 
00278         Iterator_t& it() {
00279             return *it_;
00280         }
00281 
00282 
00283         static const int time_slice_ = 1000; // timeslice for checking once per 1500 msec (simulates an once per round)
00284 
00285         Radio * radio_; // radio module
00286         Timer * timer_; // timer module
00287         Debug * debug_; // debug module
00288 
00289         Radio& radio() {
00290             return *radio_;
00291         }
00292 
00293         Timer& timer() {
00294             return *timer_;
00295         }
00296 
00297         Debug& debug() {
00298             return *debug_;
00299         }
00300 
00301     };
00302 
00303     template<typename OsModel_P,
00304     typename HeadDecision_P,
00305     typename JoinDecision_P,
00306     typename Iterator_P>
00307     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00308     Iterator_P>::enable(void) {
00309         // initialize
00310         chd().init(radio(), debug());
00311         jd().init(radio(), debug());
00312         it().init(radio(), timer(), debug());
00313 #ifdef DEBUG
00314         //reset values
00315         reset_mess_counters();
00316 #endif
00317         round_ = 0;
00318         it().enable();
00319         chd().enable();
00320         jd().enable();
00321 
00322         // Enable the Radio
00323 
00324         radio().enable_radio();
00325 
00326 
00327 
00328 
00329 
00330 
00331 
00332         // Register receive callback
00333         // to enable receiving new messages
00334         callback_id_
00335                 = radio().template reg_recv_callback<self_t, &self_t::receive > (
00336                 this);
00337         // Set os pointer for iterator
00338 
00339 
00340         // register next callback
00341         next_callback_id_ = it().template reg_next_callback<self_t,
00342                 &self_t::receive_next > (this);
00343 
00344         //MAXMIND
00345         //register the callback from the cluster_head_decision_ to the iterator
00346         winner_callback_id_ = chd().template reg_winner_callback<self_t,
00347                 &self_t::winner > (this);
00348         sender_callback_id_ = chd().template reg_sender_callback<self_t,
00349                 &self_t::sender > (this);
00350 
00351         //MAXMIND
00352         chd().set_theta(maxhops_);
00353         jd().set_id(radio().id());
00354         chd().set_id(radio().id());
00355         it().set_id(radio().id());
00356         jd().set_theta(maxhops_);
00357         
00358 
00359         /*
00360          * All needed structures and modules
00361          * are now initialized and we can start
00362          * clustering
00363          *
00364          * Call find_head() to find the new clusterhead
00365          *
00366          * */
00367         find_head();
00368     }
00369 
00370     template<typename OsModel_P,
00371     typename HeadDecision_P,
00372     typename JoinDecision_P,
00373     typename Iterator_P>
00374     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,    Iterator_P>::disable(void) {
00375         // Unregister the callbacks
00376         radio().unreg_recv_callback(callback_id_);
00377         it().unreg_next_callback(next_callback_id_);
00378         chd().unreg_winner_callback(winner_callback_id_);
00379         chd().unreg_sender_callback(sender_callback_id_);
00380         // Disable the Radio
00381         //radio().disable();
00382 
00383     }
00384 
00385     template<typename OsModel_P,
00386     typename HeadDecision_P,
00387     typename JoinDecision_P,
00388     typename Iterator_P>
00389     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00390     Iterator_P>::receive(node_id_t from, size_t len, block_data_t* data) {
00391         // if message is from myself Ignore it
00392         if (radio().id() == from) return;
00393 
00394         // data[0] shows the type of the message
00395         int type = data[0];
00396 
00397         // Check all supported Message Types and act differently
00398         if (type == FLOOD) {
00399 #ifdef DEBUG
00400             debug().debug("RECEIVED FLOOD %x <- %x\n", radio().id(), from);
00401 #endif
00402             /*
00403              * If a flooding message then pass it to jd_
00404              * First or Second Stage of clustering so
00405              * it is needed to build the sender and winner lists
00406              *
00407              * */
00408             jd().join(data, len);
00409         }
00410         if (type == INFORM) {
00411 
00412 #ifdef DEBUG
00413             debug().debug("RECEIVED INFORM %x <- %x\n", radio().id(), from);
00414 #endif
00415             /*
00416              * If an inform message then pas it to it_
00417              * 4th stage of clustering
00418              * get to know the neiborhod
00419              * */
00420             it().inform(data, len);
00421         }
00422         if (type == CONVERGECAST) {
00423 
00424             /*
00425              * If a convergecast message check if the
00426              * destination else ignore
00427              *
00428              * cluster_heads End Convergecast Messages
00429              * simple_nodes Forward Convergecast Messages
00430              * gateway_nodes Start Convergecast Messages
00431              *
00432              * */
00433 
00434 #ifdef DEBUG
00435                 debug().debug("RECEIVED CONVERGECAST %x <- %x\n", radio().id(), from);
00436 #endif
00437 
00438                 // if cluster head finish the convergecast
00439                 if (is_cluster_head()) {
00440 #ifdef DEBUG
00441                     debug().debug("Node_type= HEAD\n");
00442 #endif
00443                     cluster_id_t child_cluster;
00444                     memcpy(&child_cluster,data+1+2*sizeof(node_id_t),sizeof(cluster_id_t));
00445                     // Check if Child Node needs to correct its cluster_id
00446                     if (cluster_id() != child_cluster) {
00447 #ifdef DEBUG
00448                         debug().debug(
00449                                 "status= WRONG message_cluster= %x my_cluster= %x\n",
00450                                 child_cluster,
00451                                 cluster_id()
00452                                 );
00453 #endif
00454                         // RULE 4 of CLustering
00455                         /*
00456                          * To correct a nodes cluster_head
00457                          * send a REJOIN message to the node in question
00458                          * */
00459                         // rejoin messages have size 6
00460                         size_t mess_size = it().get_payload_length(REJOIN);
00461                         // create the rejoin message
00462                         block_data_t m_sid[mess_size];
00463                         node_id_t child_id;
00464                         memcpy(&child_id,data+1+sizeof(node_id_t),sizeof(node_id_t));                                
00465 #ifdef DEBUG
00466                         debug().debug("SEND REJOIN %x -> %x ",
00467                                 radio().id(),
00468                                 from
00469                                 );
00470                         inc_mess_rejoin();
00471 #endif
00472                         it().get_rejoin_payload(m_sid, child_id);
00473 
00474                         // do send the message
00475                         radio().send(from, mess_size,
00476                                 m_sid);
00477 
00478 
00479                         it().remove_from_non_cluster(child_id);
00480                         it().add_to_cluster(child_id);
00481                     }// Same cluster , just inform my structs
00482                     else {
00483 #ifdef DEBUG
00484                         debug().debug("status= CORRECT\n");
00485 #endif
00486                     }
00487 
00488                     it().eat_convergecast(data, len);
00489 
00490                     
00491 
00492                 }// if the node is a simple node forward the message to the cluster_head
00493             else {
00494 
00495                 //Get the data from the message
00496                 it().eat_convergecast(data, len);
00497 
00498 #ifdef DEBUG
00499                 debug().debug("Node_type= SIMPLE\n");
00500 #endif
00501 
00502 
00503                 radio().send(it().parent(), len, data);
00504 
00505 #ifdef DEBUG
00506                 debug().debug("SEND CONVERGECAST %x -> %x \n",
00507                         radio().id(),
00508                         it().parent()
00509                         );
00510                 inc_mess_convergecast();
00511 #endif
00512 
00513 
00514 
00515             }
00516 
00517 
00518         }
00519 
00520         /*
00521          * If a REJOIN message check for
00522          * cluster id problems else forward or ignore
00523          *
00524          *  nodes forward and check rejoin messages that were
00525          *  sent only from their selected parents
00526          * */
00527 
00528         if (type == REJOIN) {
00529 
00530             // local message copy
00531             block_data_t payload[len];
00532             memcpy(payload, data, len);
00533 #ifdef DEBUG
00534             debug().debug("RECEIVED REJOIN %x <- %x\n", radio().id(), from);
00535 #endif
00536             // get message destination node
00537             node_id_t destination_node ;
00538             memcpy(&destination_node, payload+1,sizeof(node_id_t));
00539             // get message ttl
00540             size_t ttl;
00541             memcpy(&ttl,payload+1+sizeof(node_id_t)+sizeof(cluster_id_t),sizeof(size_t));
00542 
00543             /*
00544              * if the destination node
00545              * check and correct the cluster head
00546              * stop the message from moving forward
00547              * */
00548             if (destination_node == radio().id()) {
00549 
00550 
00551 
00552                 // get the new cluster_id from the message
00553                 cluster_id_t mess_cluster_id ;
00554                 memcpy(&mess_cluster_id,payload+1+sizeof(node_id_t),sizeof(cluster_id_t));
00555 
00556                 // change my cluster_id if there is a difference
00557                 if (mess_cluster_id!= cluster_id()) {
00558 #ifdef DEBUG
00559                     debug().debug("action= REJOIN old= %x new= %x\n", cluster_id(), mess_cluster_id);
00560 #endif
00561                     // do change the cluster id
00562                     it().set_cluster_id(mess_cluster_id);
00563 
00564                     this->state_changed(CLUSTER_HEAD_CHANGED); // callback to wiselib.processor
00565                 }
00566             }/*
00567               * if not the destination node
00568               * check to see if it was sent by my parent
00569               * if it was use it to check my cluster_id
00570               * and forward to my children
00571               * */
00572             else {
00573 
00574 
00575                 if (!is_cluster_head()) {
00576                     // check the sender of the message
00577                     if (it().parent() == from) {
00578                         // get the new cluster_id from the message
00579                         cluster_id_t mess_cluster_id;
00580                         memcpy(&mess_cluster_id, payload + 1 + sizeof (node_id_t), sizeof (cluster_id_t));
00581                         // change my cluster_id if there is a difference
00582                         if (mess_cluster_id != cluster_id()) {
00583                             // do change the cluster id
00584                             it().set_cluster_id(mess_cluster_id);
00585 
00586                             this->state_changed(CLUSTER_HEAD_CHANGED); // callback to wiselib.processor
00587                         }
00588 
00589                         /*
00590                          * if the message ttl has not
00591                          * expired then forward the message
00592                          * */
00593                         if (--ttl >= 0) {
00594                             // get the destination node from the original message
00595 
00596                             // set the new ttl value
00597                             memcpy(payload + 1 + sizeof (node_id_t) + sizeof (cluster_id_t), &ttl, sizeof (size_t));
00598 #ifdef DEBUG
00599                             debug().debug("action= FORWARD\n");
00600                             debug().debug("SEND REJOIN %x -> %x [%x |%x |%d ]\n",
00601                                     radio().id(),
00602                                     Radio::BROADCAST_ADDRESS,
00603                                     destination_node,
00604                                     mess_cluster_id,
00605                                     ttl
00606                                     );
00607 
00608                             inc_mess_rejoin();
00609 #endif
00610 
00611                             // do forward the message
00612                             radio().send(Radio::BROADCAST_ADDRESS, len, payload);
00613                         } else {
00614                         }
00615                     } else {
00616                     }
00617                 }
00618             }
00619 
00620         }
00621 
00622 
00623     }
00624 
00625     template<typename OsModel_P,
00626     typename HeadDecision_P,
00627     typename JoinDecision_P,
00628     typename Iterator_P>
00629     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00630     Iterator_P>::find_head() {
00631 #ifdef DEBUG
00632         if (round_ == 0) {
00633             debug().debug("1st stage - flooding\n");
00634         }
00635 #endif
00636         if (round_ < 2 * maxhops_) {
00637 
00638 
00639             // get the message size
00640             size_t mess_size = jd().get_payload_length(FLOOD);
00641             // check if a flood_payload is supported
00642             block_data_t m_sid[mess_size];
00643 
00644 #ifdef DEBUG
00645             debug().debug("SEND FLOOD %x -> %x ",
00646                     radio().id(),
00647                     Radio::BROADCAST_ADDRESS
00648                     );
00649             inc_mess_flood();
00650 #endif
00651             // get the payload
00652             jd().get_flood_payload(m_sid);
00653 
00654             // send the FLOOD message
00655             radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid);
00656 
00657 
00658 
00659 
00660             round_++; //move to next round
00661 
00662             // Reset the timer
00663             timer().template set_timer<self_t, &self_t::timer_expired > (
00664                     time_slice_, this, (void*) 0);
00665         } else {
00666 #ifdef DEBUG
00667             debug().debug("stage 2 - calculate head\n");
00668 #endif
00669 
00670             if (chd().calculate_head()) {
00671 
00672                 // i am cluster head
00673                 cluster_id_t cluster_id = chd().cluster_id();
00674                 node_id_t parent = chd().parent();
00675                 it().set_cluster_id(cluster_id);
00676                 it().set_node_type(HEAD); // set my node_type as head
00677                 it().set_parent(parent); // set myself as my parent (USED for visualization)
00678                 it().set_hops(0); // set hop distance from head
00679 
00680 
00681                 this->state_changed(CLUSTER_HEAD_CHANGED);
00682 
00683             } else {
00684                 // i am not cluster_head
00685 
00686                 cluster_id_t cluster_id = chd().cluster_id();
00687                 node_id_t parent = chd().parent();
00688                 it().set_node_type(UNCLUSTERED); // set my node type as simple node
00689                 it().set_parent(parent); // set my parent
00690                 it().set_cluster_id(cluster_id);
00691 
00692 
00693             }
00694 
00695             // notify neibhors for my cluster_id
00696 #ifdef DEBUG
00697             debug().debug(" head= DECIDED\n");
00698             debug().debug("stage 3 - send inform\n");
00699 #endif
00700 
00701 
00702             // get the message size
00703             int mess_size = it().get_payload_length(INFORM);
00704             // check if the inform_payload exists
00705 
00706             // get the payload
00707             block_data_t m_sid[mess_size];
00708 #ifdef DEBUG
00709             debug().debug("SEND INFORM %x -> %x ",
00710                     radio().id(),
00711                     Radio::BROADCAST_ADDRESS
00712                     );
00713 
00714             inc_mess_inform();
00715 #endif
00716             it().get_inform_payload(m_sid);
00717 
00718             // send the payload
00719             radio().send(Radio::BROADCAST_ADDRESS, mess_size, m_sid);
00720 
00721             round_++; // move to next round
00722 
00723 
00724             //set timer and after expire if gateway start convergecast
00725             timer().template set_timer<self_t, &self_t::convergecast > (
00726                     time_slice_, this, (void*) 0);
00727 
00728 
00729         }
00730 
00731     }
00732 
00733     template<typename OsModel_P,
00734     typename HeadDecision_P,
00735     typename JoinDecision_P,
00736     typename Iterator_P>
00737     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00738     Iterator_P>::receive_next(int num) {
00739     }
00740 
00741     template<typename OsModel_P,
00742     typename HeadDecision_P,
00743     typename JoinDecision_P,
00744     typename Iterator_P>
00745     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00746     Iterator_P>::timer_expired(void * data) {
00747         // if timer expired check to find head
00748         find_head();
00749 
00750     }
00751 
00752     // timer expired2
00753 
00754     template<typename OsModel_P,
00755     typename HeadDecision_P,
00756     typename JoinDecision_P,
00757     typename Iterator_P>
00758     void MaxmindCore<OsModel_P, HeadDecision_P, JoinDecision_P,
00759     Iterator_P>::convergecast(void * data) {
00760 #ifdef DEBUG
00761         debug().debug("stage 4 - convergecast\n");
00762 #endif
00763 
00764 
00765         /*
00766          * After the inform stage all gateway nodes
00767          * start reporting to their cluster heads
00768          *
00769          * Only gateway nodes report their status
00770          *
00771          * */
00772         // check i a gateway node
00773         if (node_type() == GATEWAY) {
00774 #ifdef DEBUG
00775             debug().debug("Convergecast %x\n", radio().id());
00776 #endif
00777             // create the convergecast message
00778             int mess_size = it().get_payload_length(CONVERGECAST);
00779 
00780             block_data_t m_sid[mess_size];
00781 #ifdef DEBUG
00782 
00783             debug().debug("SEND CONVERGECAST %x -> %x ",
00784                     radio().id(),
00785                     it().parent()
00786                     );
00787             inc_mess_convergecast();
00788 #endif
00789             it().get_convergecast_payload(m_sid);
00790             // do send the convergecast messages
00791             radio().send(it().parent(), mess_size, m_sid);
00792 
00793         }
00794 #ifdef DEBUG
00795         debug().debug("waiting...");
00796 #endif
00797 
00798 
00799     }
00800 
00801 
00802 }
00803 
00804 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines