Wiselib
wiselib.testing/algorithms/aggregation/aggregation.h
Go to the documentation of this file.
00001 /* 
00002  * File:   aggregation.h
00003  * Author: Koninis
00004  *
00005  * Created on January 22, 2011, 1:16 PM
00006  */
00007 
00008 #ifndef AGGREGATION_H
00009 #define  AGGREGATION_H
00010 
00011 //wiselib includes
00012 #include "algorithms/routing/tree/tree_routing.h"
00013 #include "util/delegates/delegate.hpp"
00014 #include "util/pstl/vector_static.h"
00015 #include "util/pstl/pair.h"
00016 #include "aggregate.h"
00017 #include "aggregationmsg.h"
00018 
00019 #define DEBUG_AGGREGATION
00020 
00021 namespace wiselib {
00022 
00023     template<typename OsModel_P, typename Radio_P,typename Debug_P,
00024             typename Cluster_P, typename Aggregate_P, typename Routing_P>
00025     class Aggregation {
00026     public:
00027         // Type definitions
00028         typedef OsModel_P OsModel;
00029         typedef Radio_P Radio;
00030         typedef Debug_P Debug;
00031         typedef Cluster_P Cluster;
00032 
00033         typedef typename OsModel_P::Clock Clock;
00034         typedef typename OsModel_P::Timer Timer;
00035         typedef Aggregate_P Aggregate_t;
00036         typedef typename Aggregate_t::value_t Aggregate_value_t;
00037 
00038         typedef Routing_P tree_routing_t;
00039 
00040         typedef AggregateMsg<OsModel,Radio> msg_t;
00041 
00042         typedef typename Radio::node_id_t node_id_t;
00043         typedef typename Radio::size_t size_t;
00044         typedef typename Radio::block_data_t block_data_t;
00045         typedef typename Radio::message_id_t message_id_t;
00046         typedef typename Clock::time_t time_t;
00047 
00048         typedef typename Radio::ExtendedData ExData;
00049         typedef typename Radio::TxPower TxPower;
00050 
00051 //        typedef EchoMsg<OsModel, Radio> EchoMsg_t;
00052         typedef Aggregation<OsModel_P, Radio_P, Debug_P, Cluster_P, Aggregate_P, Routing_P> self_t;
00053         TxPower power;
00054 
00055         // Vector containing the aggregates that the node is going to combine.
00056         typedef wiselib::vector_static<OsModel, Aggregate_t, 10> aggregates_vector_t;
00057 
00058         // Iterators for the aggregates_vector_t
00059         typedef typename aggregates_vector_t::iterator iterator_t;
00060 
00064         aggregates_vector_t aggregates_vector;
00065 
00066         typedef delegate4<void, uint8_t, node_id_t, uint8_t, uint8_t*>
00067             event_notifier_delegate_t;
00068 //        typedef status_delegate_t radio_delegate_t;
00069 
00070         
00071         enum status_codes {
00072          RUNNING = 0,
00073          WAITING = 1,
00074          RECEIVING_VALUES = 2
00075         };
00076 
00077         // --------------------------------------------------------------------
00078         enum node_roles {
00079             LEAF_NODE = 0, 
00080             NORMAL_NODE = 1, 
00081             INTERMEDIATE_NODE = 2, 
00082             SINK = 3 
00083         };
00084 
00088         Aggregation() {
00089             set_role(NORMAL_NODE);
00090             set_status(RECEIVING_VALUES);
00091         };
00092 
00093         /*
00094          * Destructor
00095          */
00096         ~Aggregation() {
00097         };
00098 
00102         void init (Radio& radio, Timer& timer, Clock& clock, Debug& debug, Cluster& cluster, tree_routing_t& tree) {
00103             radio_ = &radio;
00104             timer_ = &timer;
00105             clock_ = &clock;
00106             debug_ = &debug;
00107             cluster_ = &cluster;
00108             tree_routing_ = &tree;
00109         };
00110 
00111         /*
00112          * Enable the Aggregation system
00113          * enable radio and register receive callback
00114          * initialize vectors
00115          * change status to SEARCHING
00116          * start sending hello messages
00117          * */
00118         void enable() {
00119 
00120             //enable normal radio
00121             radio().enable_radio();
00122             recv_callback_id_ =radio().template reg_recv_callback<self_t,
00123                     &self_t::receive > ( this);
00124 
00125             // initialize vectors and variables
00126             init_aggregation();
00127         };
00128 
00129         // --------------------------------------------------------------------
00130 
00131         /*
00132          * Disable the Aggregation system
00133          * */
00134         void disable() {
00135 //            radio().disable_radio();
00136         };
00137 
00141         void init_aggregation() {
00142             //blah blah
00143          //Klatu barada nikto
00144         };
00145 
00151         Aggregate_t combine_aggregates(Aggregate_t agg1, Aggregate_t agg2) {
00152             return agg1.combine(agg2);
00153         };
00154 
00155         Aggregate_t combine_all_aggregates() {
00156             iterator_t next_aggregate = aggregates_vector.begin();
00157             Aggregate_t result;
00158 
00159          if (aggregates_vector.size() == 1) return *next_aggregate;
00160 
00161             for (; next_aggregate != aggregates_vector.end(); next_aggregate++) {
00162 //             debug().debug("agg::combine_all_aggregates::%d %d %d %d",radio().id(),result.get(),(*next_aggregate).get(),aggregates_vector.size());
00163                result = combine_aggregates(result, *next_aggregate);
00164   //           debug().debug("%d\n",result.get());
00165             }
00166             return result;
00167         };
00168 
00173         void extract_aggregates() {
00174 
00175         };
00176 
00177         uint16_t msgs_count() {
00178             return msgs_stats.aggregation_msg_count;
00179         };
00180 
00181         uint32_t msgs_size() {
00182             return msgs_stats.aggregation_msg_size;
00183         };
00184 
00185         void send(Aggregate_t aggregate) {
00186 
00187             if (node_role == SINK) {
00188                 aggregates_vector.push_back(aggregate);
00189 //                extract_aggregates();
00190             } else if (get_next_node() != radio().id()) {
00191             if ((cluster_->childs_count() == 1) && aggregates_vector.size() == 0) {
00192                             msg_t aggMsg;
00193                             aggregate.writeTo(aggMsg.payload());
00194                             aggMsg.set_payload_size(aggregate.size());
00195 
00196 //                            debug().debug("aggregation::send::from%x::type%d::to%x::%d:: sending value to parent",radio().id(), msg_t::AGG_MESSAGE_TYPE, get_next_node(), aggregate.get());
00197 #ifdef DEBUG_AGGREGATION
00198 //                            debug().debug("AGGS;%x;%d;%x;%d",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get());
00199 //                            debug().debug("aggregation::send::from%x::type%d::to%x::%d:: sending value to parent",
00200 //radio().id(), msg_t::AGG_MESSAGE_TYPE, get_next_node(), aggregate.get());
00201 #endif
00202                     radio().send(get_next_node(),
00203                             aggMsg.buffer_size(),
00204                             (uint8_t *) &aggMsg);
00205                     aggregates_vector.clear();
00206             }
00207             else {
00208 #ifdef DEBUG_AGGREGATION2
00209 //                            debug().debug("aggregation::send::%x waiting from children values",radio().id());
00210 #endif
00211                             aggregates_vector.push_back(aggregate);
00212             }
00213 
00214             } else if (get_next_node() == radio().id()) {//node_role == INTERMEDIATE_NODE) {
00215                 if ( cluster_->childs_count() == (aggregates_vector.size()-1)
00216                         || (cluster_->childs_count()==0)) {
00217 
00218                     msg_t aggMsg;
00219                     aggregate.writeTo(aggMsg.payload());
00220                     aggMsg.set_payload_size(aggregate.size());
00221                     aggMsg.set_level(tree_routing_->hops());
00222 
00223                     radio().send(radio().BROADCAST_ADDRESS,
00224                             aggMsg.buffer_size(),
00225                             (uint8_t *) &aggMsg);
00226 //                    debug().debug("AGGS;%x;%d;%x;%d",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get());
00227 //                    tree_routing_->send(0, aggMsg.buffer_size(), (uint8_t *) &aggMsg);
00228                     aggregates_vector.clear();
00229                     aggregates_vector.push_back(aggregate);
00230 
00231                 }
00232                 else {
00233                     timer().template set_timer<self_t, &self_t::wait_for_chlds> (500, this, 0);
00234                     aggregates_vector.push_back(aggregate);
00235                 }
00236                 //send_to_sink();
00237             }
00238 
00239         };
00240 
00241         void wait_for_chlds(void *data) {
00242 //            debug().debug("aggregation::wait_for_chlds timeout ad %x",radio().id());
00243             if (aggregates_vector.size()>1 && tree_routing_->hops()!=0) {
00244 //                    debug().debug("aggregation::send::%x::%x::",radio().id(),radio().BROADCAST_ADDRESS);
00245                 Aggregate_t aggregate = combine_all_aggregates();
00246                 msg_t aggMsg;
00247                 aggregate.writeTo(aggMsg.payload());
00248                 aggMsg.set_payload_size(aggregate.size());
00249                 aggMsg.set_level(tree_routing_->hops());
00250 
00251                 radio().send(radio().BROADCAST_ADDRESS,
00252                         aggMsg.buffer_size(),
00253                         (uint8_t *) &aggMsg);
00254 
00255     //                    tree_routing_->send(0, aggMsg.buffer_size(), (uint8_t *) &aggMsg);
00256                 aggregates_vector.clear();
00257                 aggregates_vector.push_back(aggregate);
00258             }
00259 
00260         };
00261 
00265         void set_role(uint8_t role) {
00266              node_role = role;
00267         };
00268 
00269 
00270     private:
00278         void receive(node_id_t from, size_t len, block_data_t *msg, ExData const &ex) {
00279 
00280          if (*msg==msg_t::AGG_MESSAGE_TYPE) {
00281                     msg_t *amsg = (msg_t *)msg;
00282                     block_data_t *payload = amsg->payload();
00283 
00284 #ifdef DEBUG_AGGREGATION
00285 //                debug().debug("aggregation::receive::%d received from %d next_node %d level %d my level %d",
00286 //                               radio().id(),from, get_next_node(),amsg->level(),tree_routing_->hops());
00287 #endif
00288                     if (amsg->level() == msg_t::IN_CLUSTER) {
00289                             debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00290 
00291                         if (node_role == SINK) {
00292                             aggregates_vector.push_back(Aggregate_t(payload));
00293 
00294 //                            debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00295 //                            debug().debug("AGGR;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() );
00296                                 //TODO
00297                         } else if (get_next_node() != radio().id()) {//node_role == NORMAL_NODE) {
00298 
00299 //                            debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00300 //                                debug().debug("AGGR;%x;%d;%x;%d",
00301 //                                radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() );
00302                                 if (cluster_->childs_count() == (aggregates_vector.size())) {
00303                                         send(combine_all_aggregates());
00304                                 }
00305                                 else {
00306                                         aggregates_vector.push_back(Aggregate_t(payload));
00307 //                                        timer().template set_timer<self_t, &self_t::wait_for_chlds> (3000, this, 0);
00308                                 }
00309 
00310                         } else if (get_next_node() == radio().id()) {//node_role == INTERMEDIATE_NODE) {
00311 
00312                                 aggregates_vector.push_back(Aggregate_t(payload));
00313  //                           debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00314 //                                debug().debug("AGGR;%x;%d;%x;%d", radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() );
00315 //                                debug().debug("aggregation::receive::to%x::type%d::from%x::%d:: from cluster", radio_->id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() );
00316                                 if ( cluster_->childs_count() == (aggregates_vector.size()-1) ) {
00317 //                                    debug().debug("aggregation::receive::%x send to sink value: %x", radio_->id(), combine_all_aggregates().get() );
00318                                     send(combine_all_aggregates());
00319                                 }
00320                                 //TODO
00321                         }
00322                     } else if (amsg->level() == (tree_routing_->hops() + 1)) {
00323                             debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00324 
00325                         if (node_role == SINK) {
00326                             aggregates_vector.push_back(Aggregate_t(payload));
00327 //                            debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00328 //                            debug().debug("AGGR;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get() );
00329 //                            debug().debug("aggregation::receive::to%x::type%d::from%x::%d:: at SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, from, combine_all_aggregates().get());
00330                         } else {
00331                             Aggregate_t aggregate(payload);
00332                             aggregates_vector.push_back(Aggregate_t(payload));
00333                             aggregate = combine_all_aggregates();
00334  //                           debug_->debug("AGGAV;%x;%d;", radio_->id(), combine_all_aggregates().get());//greedy_partition_w_.get());
00335  //                           debug().debug("AGGS;%x;%d;%x;%d;SINK",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get());
00336  //                           debug().debug("aggregation::send::from%x::type%d::to%x::%d::",radio().id(), msg_t::AGG_MESSAGE_TYPE, radio().BROADCAST_ADDRESS, aggregate.get());
00337                             aggregates_vector.clear();
00338                             aggregates_vector.push_back(aggregate);
00339 
00340                             msg_t aggMsg;
00341                             aggregate.writeTo(aggMsg.payload());
00342                             aggMsg.set_payload_size(aggregate.size());
00343                             aggMsg.set_level(tree_routing_->hops());
00344 
00345                             radio().send(radio().BROADCAST_ADDRESS,
00346                                     aggMsg.buffer_size(),
00347                                     (uint8_t *) &aggMsg);
00348                         }
00349                     }
00350          }
00351         };
00352 
00353         node_id_t get_next_node() {
00354             return cluster_->parent();
00355         };
00356 
00362         void set_status(uint8_t status) {
00363              status_ = status;
00364         };
00365 
00369         int status() {
00370             return status_;
00371         };
00372 
00373         int recv_callback_id_; // callback for receive function
00374         uint8_t status_; // status of the module
00375 
00376         tree_routing_t * tree_routing_;
00377         Cluster * cluster_;
00378 
00384         uint8_t node_role;
00385 
00386         struct messages_statistics {
00387             uint16_t aggregation_msg_count;    
00388             uint32_t aggregation_msg_size;     
00389         }msgs_stats;
00390 
00391         Radio * radio_;
00392         Clock * clock_;
00393         Timer * timer_;
00394         Debug * debug_;
00395 
00396         Radio& radio() {
00397             return *radio_;
00398         }
00399 
00400         Clock& clock() {
00401             return *clock_;
00402         }
00403 
00404         Timer& timer() {
00405             return *timer_;
00406         }
00407 
00408         Debug& debug() {
00409 #ifdef SHAWN
00410          debug_->debug("\n");
00411 #endif
00412 
00413             return *debug_;
00414         }
00415 
00416     };
00417 }
00418 
00419 #endif   /* AGGREGATION_H */
00420 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines