Wiselib
wiselib.testing/algorithms/e2ec/e2ec.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  ** This file is part of the generic algorithm library Wiselib.           **
00003  ** Copyright (C) 2008,2009 by the Wisebed (www.wisebed.eu) project.      **
00004  **                                                                       **
00005  ** The Wiselib is free software: you can redistribute it and/or modify   **
00006  ** it under the terms of the GNU Lesser General Public License as        **
00007  ** published by the Free Software Foundation, either version 3 of the    **
00008  ** License, or (at your option) any later version.                       **
00009  **                                                                       **
00010  ** The Wiselib is distributed in the hope that it will be useful,        **
00011  ** but WITHOUT ANY WARRANTY; without even the implied warranty of        **
00012  ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         **
00013  ** GNU Lesser General Public License for more details.                   **
00014  **                                                                       **
00015  ** You should have received a copy of the GNU Lesser General Public      **
00016  ** License along with the Wiselib.                                       **
00017  ** If not, see <http://www.gnu.org/licenses/>.                           **
00018  ***************************************************************************/
00019 #ifndef __ALGORITHMS_END_TO_END_C_END_TO_END_COMMUNICATION_H__H__
00020 #define __ALGORITHMS_END_TO_END_C_END_TO_END_COMMUNICATION_H__H__
00021 
00022 #include "algorithms/e2ec/e2ec_message.h"
00023 #include "algorithms/neighbor_discovery/echo.h"
00024 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h"
00025 
00026 #include "algorithms/cluster/clustering_types.h"
00027 #include "util/pstl/vector_static.h"
00028 #include "util/pstl/priority_queue.h"
00029 #include "util/pstl/pair.h"
00030 #include "util/pstl/map_static_vector.h"
00031 #include "internal_interface/routing_table/routing_table_static_array.h"
00032 #include "util/delegates/delegate.hpp"
00033 
00034 #include "algorithms/cluster/fronts/fronts_core.h"
00035 
00036 #include "util/serialization/serialization.h"
00037 #include "util/base_classes/routing_base.h"
00038 #include "algorithms/cluster_radio/cluster_radio.h"
00039 
00040 
00041 //#define DEBUG_E2EC
00042 
00043 #define TIMEOUT 2000
00044 
00045 namespace wiselib {
00046 
00050 template<typename OsModel_P,
00051      typename Radio_P,
00052      typename Neighbor_Discovery_P = wiselib::Echo<OsModel_P, Radio_P, typename OsModel_P::Timer, typename OsModel_P::Debug>,
00053      typename Cluster_P = wiselib::FrontsCore<OsModel_P, Radio_P, wiselib::AtributeClusterHeadDecision<OsModel_P, Radio_P>, wiselib::BfsJoinDecision<OsModel_P, Radio_P>, wiselib::FrontsIterator<OsModel_P, Radio_P> > >
00054     class EndToEndCommunication : public RoutingBase<OsModel_P, Radio_P> {
00055 public:
00056         
00057      typedef OsModel_P OsModel;
00058 
00059      typedef Radio_P Radio;
00060      typedef typename OsModel_P::Debug Debug;
00061      typedef typename OsModel_P::Clock Clock;
00062      typedef typename OsModel::Timer Timer;
00063      typedef typename OsModel::Rand Rand;
00064      typedef typename Radio::node_id_t node_id_t;
00065      typedef typename Radio::size_t size_t;
00066      typedef typename Radio::block_data_t block_data_t;
00067      typedef typename Radio::message_id_t message_id_t;
00068      typedef typename Timer::millis_t millis_t;
00069      typedef typename Clock::time_t time_t;
00070 
00071      typedef Cluster_P Cluster;
00072      typedef Neighbor_Discovery_P NeighborDiscovery;
00073      typedef Neighbor_Discovery_P nb_t;
00074    
00075 
00076      typedef wiselib::E2ecMessage<OsModel, Radio> CommunicationMsg_t;
00077      typedef EndToEndCommunication<OsModel, Radio, NeighborDiscovery, Cluster> self_type;
00078 
00079      typedef self_type* self_pointer_t;
00080 
00081      typedef E2ecMessage<OsModel, Radio> E2ecMsg_t;
00082 
00083      typedef typename wiselib::ClusterRadio<OsModel, Radio, nb_t, Cluster> cluster_radio_t;
00084 
00085 
00086      struct rrqs {
00087          node_id_t source;
00088          node_id_t dest;
00089          uint8_t seqno;
00090          time_t time;
00091      };
00092 
00093      struct route {
00094          node_id_t dest;
00095          uint8_t seqno;
00096          time_t time;
00097          uint8_t hops;
00098          node_id_t nodes[10];
00099      };
00100 
00101      struct pending_message {
00102          E2ecMsg_t msg;
00103          time_t time;
00104      };
00105 
00106      typedef wiselib::vector_static<OsModel, struct pending_message, 10> pending_messages_t;
00107      typedef typename pending_messages_t::iterator pending_messages_it;
00108 
00109      typedef wiselib::vector_static<OsModel, node_id_t, 3> robots_t;
00110      typedef typename robots_t::iterator robots_it;
00111 
00112      typedef wiselib::vector_static<OsModel, struct rrqs, 10> rrq_t_t;
00113      typedef typename rrq_t_t::iterator rrq_t_it;
00114 
00115      typedef wiselib::vector_static<OsModel, struct route, 5> routes_t;
00116      typedef typename routes_t::iterator routes_it;
00117 
00118      // --------------------------------------------------------------------
00119      enum SpecialNodeIds {
00120           BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 
00121           NULL_NODE_ID      = Radio::NULL_NODE_ID      
00122      };
00123      // --------------------------------------------------------------------
00124      enum Restrictions {
00125           MESSAGE_SIZE = E2ecMsg_t::MAX_PAYLOAD_LENGTH   
00126      };
00127 
00128      enum{
00129           E2EC_MESSAGE = 111,
00130      };
00131 
00132      enum ErrorCodes{
00133           SUCCESS = OsModel::SUCCESS,
00134           ERR_UNSPEC = OsModel::ERR_UNSPEC
00135      };
00136         // --------------------------------------------------------------------
00139         EndToEndCommunication() {}
00140         ~EndToEndCommunication() {}
00142 
00145         void enable_radio();
00146         void disable_radio();
00148 
00151 
00153          void send(node_id_t receiver, size_t len, block_data_t *data );
00154          void send(E2ecMsg_t *msg);
00155          int send_via_robot(E2ecMsg_t *msg);
00156          void radio_receive( node_id_t receiver, size_t len, block_data_t *data );
00157          void arriving_robot(uint8_t event, node_id_t from, uint8_t len, uint8_t* data);
00158          bool is_robot(node_id_t address);
00159          
00160          int init( Radio& , Timer& , Clock& , Debug& , Rand& , Cluster& ,NeighborDiscovery& , cluster_radio_t&);
00161          void destruct() {}
00162 
00163          
00164          void add_robot_id(node_id_t id) {
00165              robots.push_back(id);
00166          }
00167 
00168          char *sprint_payload(E2ecMsg_t *msg) {
00169 
00170             static char str[60];
00171             int bytes_written=0;
00172             for(int i=0; i<msg->payload_size() ;i++) {
00173                 bytes_written+=sprintf(str + bytes_written," %x",*(msg->payload() + i));
00174             }
00175             str[bytes_written]='\0';
00176 
00177             return str;
00178          }
00179 
00180          struct pending_message createMsg(E2ecMsg_t msg) {
00181              struct pending_message pm;
00182              pm.msg = msg;
00183              pm.time = clock().time();
00184 
00185              return pm;
00186          }
00187 
00188          void addRoute(E2ecMsg_t* msg) {
00189 
00190              struct route rt;
00191              rt.dest = msg->dest();
00192              rt.hops = msg->hops();
00193              memcpy(rt.nodes,msg->payload(),(msg->hops()+1)*sizeof(node_id_t));
00194              rt.seqno = msg->seq_no();
00195              rt.time = clock().time();
00196 
00197              for(routes_it
00198                      it = routes.begin();
00199                      it != routes.end();
00200                      it++) {
00201                  if (it->dest == msg->dest()
00202                          ) {
00203                      if (it->seqno > msg->seq_no()) {
00204                          return;
00205                      }
00206 //                     debug().debug("replacing with better route");
00207                      routes.erase(it);
00208                      routes.push_back(rt);
00209                      return;
00210                  }
00211              }
00212              routes.push_back(rt);
00213          }
00214 
00215          routes_it findRoute(node_id_t dest) {
00216              for(routes_it
00217                      it = routes.begin();
00218                      it != routes.end();
00219                      it++) {
00220                  if (it->dest == dest) {
00221                      return it;
00222                  }
00223              }
00224 
00225              return routes.end();
00226          }
00227 
00228          int addRrq(E2ecMsg_t msg) {
00229              struct rrqs rrq;
00230              rrq.dest = msg.dest();
00231              rrq.source = msg.source();
00232              rrq.seqno = msg.seq_no();
00233              rrq.time = clock().time();
00234 
00235              for(rrq_t_it
00236                      it = rrq_messages.begin();
00237                      it != rrq_messages.end();
00238                      it++) {
00239                  if (it->dest == msg.dest()
00240                          && it->source == msg.source()
00241                          ) {
00242                      if (it->seqno == msg.seq_no()) {
00243                          return -1;
00244                      }
00245                      else {
00246                          return 1;
00247                      }
00248                  }
00249              }
00250 
00251              rrq_messages.push_back(rrq);
00252              return 0;
00253          }
00254 
00255          bool is_in_cluster(node_id_t dest) {
00256               //Check if the destination is within the cluster
00257               int cnodesNo = cluster().childs_count();
00258               node_id_t cnodes[cnodesNo];
00259               cluster().childs(cnodes);
00260 
00261               for (int i=0; i < cnodesNo ; i++) {
00262                   if (cnodes[i] ==  dest) {
00263                       return true;
00264                   }
00265               }
00266               return false;
00267          }
00268 
00269          void cluster_radio_receive(node_id_t from, size_t len, block_data_t *data) {
00270             E2ecMsg_t* msg;
00271             msg = (E2ecMsg_t*)data;
00272 
00273              //Only treat CommunicationMessages
00274             if(msg->msg_id() == E2ecMsg_t::E2EC_TYPE) {
00275 #ifdef DEBUG_E2EC
00276                     debug().debug ("E2ec::cluster_radio_receive::%x::%x:: E2EC_TYPE msg(%x,%x,%d) pl[%s]",tx_radio_->id(),get_next_hop(msg), msg->source(), msg->dest(), msg->seq_no(),sprint_payload(msg));
00277 #endif
00278 
00279                     if(radio().id() == msg->dest())
00280                     {
00281                         notify_receivers(msg->source(), 
00282                                 msg->payload_size() - (msg->hops()+1) * sizeof(node_id_t) ,
00283                                 msg->payload()+ (msg->hops()+1) * sizeof(node_id_t));
00284 
00285                     }
00286                     else
00287                         if (radio().id() == get_next_hop(msg)) {
00288                             send(msg);
00289                 }
00290                 else {
00291 #ifdef DEBUG_E2EC
00292                     debug().debug ("E2ec::send::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]",tx_radio_->id(),get_next_hop(msg), msg->source(), msg->dest(), msg->seq_no(),sprint_payload(msg));
00293 #endif
00294                     clusterRadio().send(get_next_hop(msg), msg->buffer_size(),  (unsigned char *)&((*msg)));
00295                 }
00296 
00297             }
00298             else if (msg->msg_id() == E2ecMsg_t::E2EC_RRQ_TYPE) {
00299                 if ( addRrq(*msg) < 0 ) {
00300 #ifdef DEBUG_E2EC
00301 //                  debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RRQ_TYPE INGORE",
00302 //                  tx_radio_->id(),from);
00303 #endif
00304                     return;
00305                 }
00306 #ifdef DEBUG_E2EC
00307                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RRQ_TYPE msg(%x,%x,%d)",
00308                   tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no());
00309 #endif
00310 
00311 //                memcpy(msg->payload(),(void *)(radio().id()),sizeof(node_id_t));
00312                 node_id_t myid = radio().id();
00313                 write<OsModel, block_data_t, node_id_t > (msg->payload() + msg->payload_size(), myid);
00314                 msg->set_payload_size(msg->payload_size() + sizeof(node_id_t));
00315                 msg->set_hops(msg->hops()+1);
00316 
00317                 if(is_in_cluster(msg->dest()) || (msg->dest() == radio().id())) {
00318                     msg->set_msg_id(E2ecMsg_t::E2EC_RPL_TYPE);
00319 #ifdef DEBUG_E2EC
00320                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RPL_TYPE msg(%x,%x,%d)",
00321                   tx_radio_->id(), get_reverse_hop(msg), 
00322                   read<OsModel, block_data_t, node_id_t>(msg->payload()),
00323                   read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t) ), msg->seq_no());
00324 #endif
00325 
00326                     clusterRadio().send(get_reverse_hop(msg), msg->buffer_size(), (unsigned char *)msg);
00327                 }
00328                 else {
00329 #ifdef DEBUG_E2EC
00330                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RRQ_TYPE msg(%x,%x,%d)",
00331                   tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no());
00332 #endif
00333                     clusterRadio().send(clusterRadio().BROADCAST_ADDRESS, msg->buffer_size(), (unsigned char *)msg);
00334                 }
00335             }
00336             else if (msg->msg_id() == E2ecMsg_t::E2EC_RPL_TYPE) {
00337 #ifdef DEBUG_E2EC
00338                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: received E2EC_RPL_TYPE msg(%x,%x,%d)",
00339                   tx_radio_->id(),from ,msg->source(), msg->dest(), msg->seq_no());
00340 #endif
00341                 if (radio().id() == get_reverse_hop(msg)) {
00342                     addRoute(msg);
00343                     while(send_pending_messages(msg) == 1);
00344                 }
00345                 else if (radio().NULL_NODE_ID != get_reverse_hop(msg)){
00346 #ifdef DEBUG_E2EC
00347                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: sending E2EC_RPL_TYPE msg(%x,%x,%d)",
00348                   tx_radio_->id(),from ,get_reverse_hop(msg), msg->dest(), msg->seq_no());
00349 #endif
00350                     clusterRadio().send(get_reverse_hop(msg), msg->buffer_size(), (unsigned char *)msg);
00351                 }
00352                 else {
00353 #ifdef DEBUG_E2EC
00354                   debug().debug ("E2ec::cluster_radio_receive::%x::%x:: INVALID NEXT HOP DROPING E2EC_RPL_TYPE msg(%x,%x,%d)",
00355                   tx_radio_->id(),from ,get_reverse_hop(msg), msg->dest(), msg->seq_no());
00356 #endif
00357                     
00358                 }
00359             }
00360          }
00361 
00362          int send_pending_messages(E2ecMsg_t* msg) {
00363 //clusterRadio().present_neighbors();
00364 //return;
00365               for (pending_messages_it
00366                         it = pending_messages.begin();
00367                         it != pending_messages.end();
00368                         it++) {
00369 
00370                   if ((it->msg).dest() == msg->dest()) {
00371                       E2ecMsg_t data_to_send((it->msg).source(),(it->msg).dest(),(it->msg).seq_no());
00372 
00373                       data_to_send.set_hops(msg->hops());
00374                       data_to_send.set_msg_id(E2ecMsg_t::E2EC_TYPE);
00375 //                      msg->set_msg_id(E2ecMsg_t::E2EC_TYPE);
00376 
00377                       memcpy(data_to_send.payload(),
00378                       msg->payload(), ((msg->hops() + 1) * sizeof(node_id_t)));
00379 //                      data_to_send.set_payload(((msg->hops() + 1) * sizeof(node_id_t)),msg->payload());
00380                       data_to_send.set_payload_size(data_to_send.payload_size() + ((msg->hops() + 1) * sizeof(node_id_t)));
00381 
00382                       memcpy(data_to_send.payload() + data_to_send.payload_size(),
00383                       (it->msg).payload(), (it->msg).payload_size());
00384                       data_to_send.set_payload_size(data_to_send.payload_size() + (it->msg).payload_size());
00385 
00386 //                      memcpy(msg->payload() + ((msg->hops() + 1) * sizeof(node_id_t)), (it->msg).payload(), (it->msg).payload_size());
00387 //                      msg->set_payload_size(msg->payload_size() + (it->msg).payload_size());
00388 
00389 #ifdef DEBUG_E2EC
00390           debug().debug ("E2ec::send::%x::%x:: sending over cluster radio msg(%x,%d,%d) pl[%s]",
00391           tx_radio_->id(),get_next_hop(&data_to_send),data_to_send.dest(),data_to_send.msg_id(),data_to_send.payload_size(),
00392           sprint_payload(&data_to_send));
00393 #endif
00394           
00395 //                    clusterRadio().present_neighbors();
00396 
00397 //clusterRadio().send(get_next_hop(msg), msg->buffer_size(),  (unsigned char *)msg);
00398           clusterRadio().send(get_next_hop(&data_to_send), data_to_send.buffer_size(),  (unsigned char *)&data_to_send);
00399 
00400 //                        msg->set_payload_size(msg->payload_size() - (it->msg).payload_size())
00401                         pending_messages.erase(it);
00402                         return 1;
00403                    }
00404               }
00405 
00406               return 0;
00407 //              debug().debug("sending;;;;;;;;;");
00408          }
00409 
00410          node_id_t get_reverse_hop(E2ecMsg_t* msg) {
00411              for (int i=1; i<=msg->hops(); i++) {
00412                  if (read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*i) == radio().id()) {
00413                      return read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*(i-1));
00414                  }
00415              }
00416 
00417              if (read<OsModel, block_data_t, node_id_t>(msg->payload()) == radio().id()) {
00418                  return radio().id();
00419              }
00420              else {
00421                  return clusterRadio().NULL_NODE_ID;
00422              }
00423          }
00424 
00425          node_id_t get_next_hop(E2ecMsg_t* msg) {
00426              for (int i=0; i<msg->hops(); i++) {
00427                  if (read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*i) == radio().id()) {
00428                      return read<OsModel, block_data_t, node_id_t>(msg->payload() + sizeof(node_id_t)*(i+1));
00429                  }
00430              }
00431 
00432              if (read<OsModel, block_data_t, node_id_t>(msg->payload() + msg->hops()*sizeof(node_id_t)) == radio().id()) {
00433                  return radio().id();
00434              }
00435              else {
00436                  return clusterRadio().NULL_NODE_ID;
00437              }
00438 
00439          }
00440 
00441          void send_rrq(E2ecMsg_t msg) {
00442 
00443              E2ecMsg_t rrq_msg(msg.source(),msg.dest(), msg.seq_no());
00444              rrq_msg.set_msg_id(E2ecMsg_t::E2EC_RRQ_TYPE);
00445              node_id_t myid = radio().id();
00446         //                  memcpy(rrq_msg.payload(), (void *)&(myid), sizeof(node_id_t));
00447              write<OsModel, block_data_t, node_id_t > (rrq_msg.payload(), myid);
00448              rrq_msg.set_payload_size(rrq_msg.payload_size() + sizeof(node_id_t));
00449 
00450         #ifdef DEBUG_E2EC
00451              debug().debug ("E2ec::sendRrq::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]",
00452              tx_radio_->id(), cluster_radio_t::BROADCAST_ADDRESS,
00453              rrq_msg.source(), rrq_msg.dest(), rrq_msg.seq_no()
00454                      ,sprint_payload(&rrq_msg));
00455         //                  clusterRadio().present_neighbors();
00456         #endif
00457              clusterRadio().send(cluster_radio_t::BROADCAST_ADDRESS, rrq_msg.buffer_size(), (unsigned char *)(&rrq_msg));
00458          }
00459 
00460 
00461          void cleanup_pending_messages() {
00462               for (pending_messages_it
00463                         it = pending_messages.begin();
00464                         it != pending_messages.end();
00465                         it++) {
00466                   if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 5) {
00467 #ifdef DEBUG_E2EC
00468                       debug().debug("e2ec::cleanup::%x removing pending data message (%x,%x,%d)", radio().id(), it->msg.source(), it->msg.dest(), it->msg.seq_no());
00469 #endif
00470                       pending_messages.erase(it);
00471                       cleanup_pending_messages();
00472                       return;
00473                   }
00474 //                  else {
00475 //                      if (send_via_robot( &(it->msg) ) == 0) {
00476 //                          pending_messages.erase(it);
00477 //                          cleanup_pending_messages();
00478 //                          return;
00479 //                      }
00480 //                  }
00481               }
00482          }
00483 
00484          void cleanup_stale_rrq() {
00485               for (rrq_t_it
00486                         it = rrq_messages.begin();
00487                         it != rrq_messages.end();
00488                         it++) {
00489                   if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 1) {
00490 #ifdef DEBUG_E2EC
00491                       debug().debug("e2ec::cleanup::%x removing pending rrq message (%x,%x,%d)", radio().id(), it->source, it->dest, it->seqno);
00492 #endif
00493 //                      rrq_messages.erase(it,it);
00494                       rrq_messages.erase(it);
00495                       cleanup_stale_rrq();
00496                       return;
00497                   }
00498               }             
00499          }
00500 
00501 
00502          void update_routes(void *a) {
00503               for (pending_messages_it
00504                         it = pending_messages.begin();
00505                         it != pending_messages.end();
00506                         it++) {
00507                   if (addRrq(it->msg) != 1 && (findRoute((it->msg).dest())==routes.end()) ) {
00508                       send_rrq(it->msg);
00509                   }
00510               }
00511 
00512               for (routes_it
00513                         it = routes.begin();
00514                         it != routes.end();
00515                         it++) {
00516                   send_rrq(E2ecMsg_t(radio().id(), it->dest, (it->seqno)+1));
00517               }
00518 
00519               if (pending_messages.size() == 0 
00520                       && (empty_q == false) ) {
00521                       debug().debug("E2EP;%d", pending_messages.size());
00522                       empty_q = true;
00523               }
00524 
00525               timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::update_routes>( 5000, this, 0 );
00526          }
00527 
00528          void cleanup(void *a) {
00529 
00530 
00531              cleanup_pending_messages();
00532 
00533              cleanup_stale_rrq();
00534 
00535 
00536               if (pending_messages.size() > 0) {
00537                   empty_q = false;
00538                   debug().debug("E2EP;%d", pending_messages.size());
00539               }
00540 
00541               timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::cleanup>( 1000, this, 0 );
00542          }
00543 
00544 protected:
00545      typename Radio::self_pointer_t tx_radio_;
00546      typename Debug::self_pointer_t debug_;
00547      typename Clock::self_pointer_t clock_;
00548      typename Timer::self_pointer_t timer_;
00549      typename Cluster::self_type* cluster_;
00550      typename Rand::self_pointer_t rand_;
00551 
00552      cluster_radio_t *ClusterRadio_;
00553      nb_t *neighbor_;
00554 
00555      pending_messages_t pending_messages;
00556      robots_t robots;
00557      rrq_t_t rrq_messages;
00558      routes_t routes;
00559 
00560      millis_t disconnected_node_timeout_;     
00561 
00562      int radio_recv_callback_id_;
00563 
00564      uint8_t cur_seq_no;
00565 
00566      uint32_t msg_received;
00567      bool empty_q;
00568 
00569      Radio& radio()
00570      {
00571           return *tx_radio_;
00572      }
00573      Timer& timer()
00574      {
00575           return *timer_;
00576      }
00577      Debug& debug()
00578      {
00579 #ifdef SHAWN
00580           debug_->debug("\n");
00581 #endif
00582           return *debug_;
00583      }
00584 
00585      Cluster& cluster()
00586      {
00587           return *cluster_;
00588      }
00589 
00590      Clock& clock()
00591      {
00592      return *clock_;
00593      }
00594 
00595      nb_t& neighbor_discovery(){
00596           return *neighbor_;       
00597      }
00598 
00599      cluster_radio_t& clusterRadio() {
00600          return *ClusterRadio_;
00601      }
00602 
00603 };
00604 
00605     // -----------------------------------------------------------------------
00606 
00607     template<typename OsModel_P,
00608      typename Radio_P,
00609      typename Neighbor_Discovery_P,
00610      typename Cluster_P>
00611     int
00612     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P,Cluster_P>::
00613     init (Radio& tx_radio, Timer& timer, Clock& clock, Debug& debug, Rand& rand, Cluster& cluster, NeighborDiscovery& neighbor, cluster_radio_t& clusterRadio) {
00614      tx_radio_ = &tx_radio;
00615      timer_ = &timer;
00616      clock_ = &clock;
00617      rand_ = &rand;
00618      neighbor_ = &neighbor;
00619      debug_ = &debug;
00620      cluster_= &cluster;
00621      ClusterRadio_ = &clusterRadio;
00622 
00623      cur_seq_no = 0;
00624      msg_received = 0;
00625 
00626 #ifdef DEBUG_E2EC
00627 //     debug().debug("E2Ec Algorithm: initialized");
00628 #endif
00629      return SUCCESS;
00630 }
00631 
00632 
00633 template<typename OsModel_P,
00634      typename Radio_P,
00635      typename Neighbor_Discovery_P,
00636      typename Cluster_P>
00637 void
00638 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00639 enable_radio()
00640 {
00641     empty_q = true;
00642 #ifdef DEBUG_E2EC
00643      debug().debug("E2Ec::enable_radio %x", tx_radio_->id());
00644 #endif
00645      radio().enable_radio();
00646      radio_recv_callback_id_ = radio().template reg_recv_callback<self_type, &self_type::radio_receive >(this);
00647 
00648      clusterRadio().template  reg_recv_callback<self_type, &self_type::cluster_radio_receive >(this);
00649 
00650 //     neighbor_discovery().register_payload_space( MOBILITY );
00651 
00652      uint8_t flags = nb_t::NEW_NB_BIDI|nb_t::DROPPED_NB;
00653      neighbor_discovery().template reg_event_callback<self_type, &self_type::arriving_robot>( MOBILITY, flags, this );
00654 
00655      timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::cleanup>( 5000, this, 0 );
00656      timer().template set_timer<EndToEndCommunication,&EndToEndCommunication::update_routes>( 5000, this, 0 );
00657 }
00658 
00659     // -----------------------------------------------------------------------
00660 
00661 template<typename OsModel_P,
00662      typename Radio_P,
00663      typename Neighbor_Discovery_P,
00664      typename Cluster_P>
00665 void
00666 EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00667 disable_radio(void) {
00668 #ifdef DEBUG_E2EC
00669      debug().debug("E2Ec::disable");
00670 #endif
00671      //Unregister callbacks
00672 //     neighbor_discovery().unregister_payload_space( MOBILITY );
00673 //     neighbor_discovery().unreg_recv_callback( MOBILITY );
00674      radio().unreg_recv_callback( radio_recv_callback_id_ );
00675 }
00676       
00677 // -----------------------------------------------------------------------
00678 
00679     template<typename OsModel_P,
00680      typename Radio_P,
00681      typename Neighbor_Discovery_P,
00682      typename Cluster_P>
00683     void
00684     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00685     send(node_id_t destination, size_t len, block_data_t *data) {
00686 
00687 
00688           cur_seq_no++;
00689           E2ecMsg_t msg(radio().id(), destination, cur_seq_no);
00690           msg.set_payload(len,data);
00691           send(&msg);
00692      }
00693 
00694     template<typename OsModel_P,
00695      typename Radio_P,
00696      typename Neighbor_Discovery_P,
00697      typename Cluster_P>
00698     void
00699     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00700     send(E2ecMsg_t *msg) {
00701 
00702           if (neighbor_discovery().is_neighbor_bidi(msg->dest())) {
00703 
00704 debug().debug ("E2E;%x;%d;%x",
00705         tx_radio_->id(), msg->msg_id(), msg->dest());
00706 
00707 #ifdef DEBUG_E2EC
00708 debug().debug ("E2ec::send::from%x::type%d::to%x:: sending to final destination msg(%x,%x,%d)",
00709         tx_radio_->id(), msg->msg_id(), msg->dest(), msg->source(), msg->dest(), msg->seq_no());
00710 #endif
00711                     radio().send(msg->dest(), msg->buffer_size(),  (unsigned char *)msg);
00712           }
00713           else if (cluster().is_cluster_head()) {
00714               //Check if the destination is within the cluster
00715               int cnodesNo = cluster().childs_count();
00716               node_id_t cnodes[cnodesNo];
00717               cluster().childs(cnodes);
00718 
00719               bool is_in_cluster = false;
00720               for (int i=0; i < cnodesNo ; i++) {
00721                   if (cnodes[i] ==  msg->dest()) {
00722                       is_in_cluster = true;
00723                   }
00724               }
00725 
00726               if (is_in_cluster) {
00727 
00728                     if ( cluster().get_next_node_to_child(msg->dest()) != radio().NULL_NODE_ID ) {
00729 debug().debug ("E2E;%x;%d;%x",
00730         tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest()));
00731 #ifdef DEBUG_E2EC
00732                     debug().debug ("E2ec::send::%x::%x:: sending in to cluster msg(%x,%x,%d) pl[%s]",
00733                     tx_radio_->id(),cluster().get_next_node_to_child(msg->dest()),
00734                     msg->source(),msg->dest(),msg->payload_size(), sprint_payload(msg));
00735 #endif
00736                         radio().send(cluster().get_next_node_to_child(msg->dest()), msg->buffer_size(),  (unsigned char *)msg);
00737                     }
00738                     else
00739                     {
00740                         debug().debug ("E2E droping (not in cluster);%x;%d;%x",
00741                                 tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest()));
00742                     }
00743 
00744 /*                    radio().send(msg->dest(), msg->buffer_size(),  (unsigned char *)msg);*/
00745               }
00746               else{
00747 
00748                   routes_it it = findRoute(msg->dest());
00749                   if ( it == routes.end()) {
00750                       pending_messages.push_back(createMsg(*msg));
00751                       if (addRrq(*msg) == 1 ){
00752                           return;
00753                       }
00754 
00755                       E2ecMsg_t rrq_msg(msg->source(),msg->dest(), msg->seq_no());
00756                       rrq_msg.set_msg_id(E2ecMsg_t::E2EC_RRQ_TYPE);
00757                       node_id_t myid = radio().id();
00758         //                  memcpy(rrq_msg.payload(), (void *)&(myid), sizeof(node_id_t));
00759                       write<OsModel, block_data_t, node_id_t > (rrq_msg.payload(), myid);
00760                       rrq_msg.set_payload_size(rrq_msg.payload_size() + sizeof(node_id_t));
00761 
00762         #ifdef DEBUG_E2EC
00763                       debug().debug ("E2ec::sendRrq::%x::%x:: sending over cluster radio msg(%x,%x,%d) pl[%s]",
00764                       tx_radio_->id(), cluster_radio_t::BROADCAST_ADDRESS,
00765                       rrq_msg.source(), rrq_msg.dest(), rrq_msg.seq_no()
00766                               ,sprint_payload(&rrq_msg));
00767         //                  clusterRadio().present_neighbors();
00768         #endif
00769                       clusterRadio().send(cluster_radio_t::BROADCAST_ADDRESS, rrq_msg.buffer_size(), (unsigned char *)(&rrq_msg));
00770         //                  send_via_robot(msg);
00771         //                  send_to_neighbor_clusters();                      
00772                   }
00773                   else
00774                   {
00775                       E2ecMsg_t data_to_send(msg->source(),msg->dest(), msg->seq_no());
00776 
00777                       data_to_send.set_hops(it->hops);
00778                       data_to_send.set_msg_id(E2ecMsg_t::E2EC_TYPE);
00779 
00780                       memcpy(data_to_send.payload(),
00781                       it->nodes, ((it->hops + 1) * sizeof(node_id_t)));
00782                       data_to_send.set_payload_size(data_to_send.payload_size() + ((it->hops + 1) * sizeof(node_id_t)));
00783 
00784                       memcpy(data_to_send.payload() + data_to_send.payload_size(),
00785                       msg->payload(), msg->payload_size());
00786                       data_to_send.set_payload_size(data_to_send.payload_size() + msg->payload_size());
00787 
00788 #ifdef DEBUG_E2EC
00789           debug().debug ("E2ec::send::%x::%x:: sending over cluster radio Route Cache Hit msg(%x,%d,%d) pl[%s]",
00790           tx_radio_->id(),get_next_hop(&data_to_send),data_to_send.dest(),data_to_send.msg_id(),data_to_send.payload_size(),
00791           sprint_payload(&data_to_send));
00792 #endif
00793 
00794           clusterRadio().send(get_next_hop(&data_to_send), data_to_send.buffer_size(),  (unsigned char *)&data_to_send);
00795                       
00796                       
00797                   }
00798 
00799               }
00800 
00801               //Send the message to the neighboring clusters
00802 //               radio().send(cluster().parent(), msg.buffer_size(), &msg);
00803           }
00804           else {
00805 
00806                     if ( cluster().get_next_node_to_child(msg->dest()) != radio().NULL_NODE_ID ) {
00807 debug().debug ("E2E;%x;%d;%x",
00808         tx_radio_->id(), msg->msg_id(), cluster().get_next_node_to_child(msg->dest()));
00809 
00810 #ifdef DEBUG_E2EC
00811                         debug().debug ("E2ec::send::%x::%x:: sending to child  msg(%x,%x,%d)",tx_radio_->id(),cluster().get_next_node_to_child(msg->dest()),msg->source(),msg->dest(),msg->seq_no());
00812 #endif
00813                         radio().send(cluster().get_next_node_to_child(msg->dest()), msg->buffer_size(),  (unsigned char *)msg);
00814                     }
00815                     else if ( (cluster().parent() != radio().NULL_NODE_ID)
00816                             && (cluster().parent() != 0xffff)) {
00817 debug().debug ("E2E;%x;%d;%x",
00818         tx_radio_->id(), msg->msg_id(), cluster().parent());
00819 
00820 #ifdef DEBUG_E2EC
00821                         debug().debug ("E2ec::send::%x::%x:: sending to cluster leader msg(%x,%x,%d)",tx_radio_->id(),cluster().parent(),msg->source(),msg->dest(),msg->seq_no());
00822 #endif
00823                         radio().send(cluster().parent(), msg->buffer_size(),  (unsigned char *)msg);
00824                     }
00825                     else {
00826 #ifdef DEBUG_E2EC
00827                         debug().debug ("E2ec::send::%x:: INGORING (possible cluster is not formed) msg(%x,%x,%d)",tx_radio_->id(),msg->source(),msg->dest(),msg->seq_no());
00828 #endif
00829                         
00830                     }
00831           }        
00832     }
00833 
00834     template<typename OsModel_P,
00835      typename Radio_P,
00836      typename Neighbor_Discovery_P,
00837      typename Cluster_P>
00838     void
00839     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00840     radio_receive( node_id_t from, size_t len, block_data_t *data ) {
00841 
00842 //        message_id_t msg_id = read<OsModel, block_data_t, message_id_t>( data );
00843 
00844         E2ecMsg_t* msg = (E2ecMsg_t*)data;
00845 
00846          //Only treat CommunicationMessages
00847         if( msg->msg_id() == E2ecMsg_t::E2EC_TYPE )
00848         {
00849 
00850             if (radio().id() == msg->dest()) {
00851                 msg_received++;
00852 #ifdef DEBUG_E2EC
00853                 debug().debug ("E2ec::radio_receive::%x::%x:: received message msg(%x,%x,%d) (total received: %d) payload [%s]",tx_radio_->id(),msg->source(),msg->dest(),msg->dest(),msg->seq_no(),msg_received,sprint_payload(msg));
00854 #endif
00855 
00856                 notify_receivers(msg->source(), msg->payload_size() - (msg->hops()+1) * sizeof(node_id_t) , msg->payload()+ (msg->hops()+1) * sizeof(node_id_t));
00857                 if (is_robot(from)) {
00858                 }
00859                 return;
00860             }
00861 
00862             send(msg);
00863 //            send(msg->dest(), msg->buffer_size(),  (unsigned char *)msg);
00864         }
00865 
00866     }
00867 
00868     template<typename OsModel_P,
00869      typename Radio_P,
00870      typename Neighbor_Discovery_P,
00871      typename Cluster_P>
00872     int
00873     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00874     send_via_robot(E2ecMsg_t *msg) {
00875         for (robots_it
00876                     it = robots.begin();
00877                     it != robots.end();
00878                     it++)
00879         {
00880             if (neighbor_discovery().is_neighbor(*it))
00881             {
00882 
00883                 radio().send(*it, msg->buffer_size(),  (unsigned char *)msg);
00884                 return 0;
00885             }
00886         }
00887         return 1;
00888     }
00889 
00890     template<typename OsModel_P,
00891      typename Radio_P,
00892      typename Neighbor_Discovery_P,
00893      typename Cluster_P>
00894     bool
00895     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00896     is_robot(node_id_t id) {
00897 
00898         for (robots_it
00899                     it = robots.begin();
00900                     it != robots.end();
00901                     it++) {
00902             if (id == *it) {
00903                 return true;
00904             }
00905         }
00906         return false;
00907     }
00908 
00909     template<typename OsModel_P,
00910      typename Radio_P,
00911      typename Neighbor_Discovery_P,
00912      typename Cluster_P>
00913     void
00914     EndToEndCommunication<OsModel_P, Radio_P, Neighbor_Discovery_P, Cluster_P>::
00915     arriving_robot(uint8_t event, node_id_t from, uint8_t len, uint8_t* data)
00916     {
00917 
00918           if ( nb_t::NEW_PAYLOAD == event ) {
00919               
00920           }
00921           else if ( nb_t::NEW_PAYLOAD_BIDI == event ) {
00922           }
00923           /*
00924            * +====+====+====+====++====+====++====+====++====+====+
00925            *  CMD  NODE TIME TIME  NODE NODE  SNBH SNBH
00926            */
00927           else if ( nb_t::NEW_NB == event ) {
00928           }
00929           else if ( nb_t::NEW_NB_BIDI == event ) {
00930               if (is_robot(from)) {
00931                   for (pending_messages_it
00932                             it = pending_messages.begin();
00933                             it != pending_messages.end();
00934                             it++) {
00935                       if ( (clock().seconds(clock().time()) - clock().seconds(it->time)) > 1) {
00936                           radio().send(from, it->msg.buffer_size(),  (unsigned char *)&((*it).msg));
00937 //                          debug().debug ("arriving_robot::%x::%x:: sending to robot msg(%x,%x,%d)",tx_radio_->id(),it->msg.dest(),it->msg.source(),it->msg.dest(),it->msg.seq_no());
00938 #ifdef DEBUG_E2EC
00939                           debug().debug ("E2ec::arriving_robot::%x::%x:: sending to robot msg(%x,%x,%d)",tx_radio_->id(),it->msg.dest(),it->msg.source(),it->msg.dest(),it->msg.seq_no());
00940 #endif
00941                       }
00942                   }
00943                   pending_messages.clear();
00944               }
00945           }
00946           else if ( nb_t::DROPPED_NB == event ) {
00947           }
00948           else if ( nb_t::LOST_NB_BIDI == event ) {
00949           }
00950 
00951       }
00952 
00953 }
00954 
00955 
00956 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines