Wiselib
|
00001 /*************************************************************************** 00002 ** This file is part of the generic algorithm library Wiselib. ** 00003 ** Copyright (C) 2008,2009 by the Wisebed (www.wisebed.eu) project. ** 00004 ** ** 00005 ** The Wiselib is free software: you can redistribute it and/or modify ** 00006 ** it under the terms of the GNU Lesser General Public License as ** 00007 ** published by the Free Software Foundation, either version 3 of the ** 00008 ** License, or (at your option) any later version. ** 00009 ** ** 00010 ** The Wiselib is distributed in the hope that it will be useful, ** 00011 ** but WITHOUT ANY WARRANTY; without even the implied warranty of ** 00012 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ** 00013 ** GNU Lesser General Public License for more details. ** 00014 ** ** 00015 ** You should have received a copy of the GNU Lesser General Public ** 00016 ** License along with the Wiselib. ** 00017 ** If not, see <http://www.gnu.org/licenses/>. ** 00018 ***************************************************************************/ 00019 #ifndef __ALGORITHMS_END_TO_END_COMMUNICATION_END_TO_END_COMMUNICATION_H__H__ 00020 #define __ALGORITHMS_END_TO_END_COMMUNICATION_END_TO_END_COMMUNICATION_H__H__ 00021 00022 #include "algorithms/e2e_no_highways/end_to_end_communication_msg.h" 00023 #include "algorithms/neighbor_discovery/echo.h" 00024 #include "algorithms/neighbor_discovery/pgb_payloads_ids.h" 00025 00026 #include "algorithms/cluster/highway/highway_stabilizing.h" 00027 #include "algorithms/cluster/clustering_types.h" 00028 #include "util/pstl/vector_static.h" 00029 #include "util/pstl/priority_queue.h" 00030 #include "util/pstl/pair.h" 00031 #include "util/pstl/map_static_vector.h" 00032 #include "internal_interface/routing_table/routing_table_static_array.h" 00033 #include "util/delegates/delegate.hpp" 00034 00035 #include "algorithms/cluster/fronts/fronts_core.h" 00036 #include "algorithms/cluster/modules/chd/attr_chd.h" 00037 #include "algorithms/cluster/modules/it/fronts_it.h" 00038 #include "algorithms/cluster/modules/jd/fronts_jd.h" 00039 00040 #include "util/serialization/serialization.h" 00041 #include "util/base_classes/routing_base.h" 00042 00043 00044 //#define USE_ROBOT 00045 00046 #define DISCONNECTED_NODE_TIMEOUT 1000 // In milliseconds 00047 00048 namespace wiselib { 00049 00053 template<typename OsModel_P, 00054 typename Radio_P, 00055 typename Timer_P = typename OsModel_P::Timer, 00056 typename Debug_P = typename OsModel_P::Debug, 00057 typename Neighbor_Discovery_P = wiselib::Echo<OsModel_P, Radio_P , typename OsModel_P::Timer_P, typename OsModel_P::Debug_P>, 00058 typename Cluster_P = wiselib::FrontsCore<OsModel_P, Radio_P, wiselib::AtributeClusterHeadDecision<OsModel_P, Radio_P>, wiselib::FrontsJoinDecision<OsModel_P, Radio_P>, wiselib::FrontsIterator<OsModel_P, Radio_P> > > 00059 class EndToEndCommunication : public RoutingBase<OsModel_P, Radio_P> { 00060 public: 00061 typedef OsModel_P OsModel; 00062 typedef Radio_P Radio; 00063 //typedef typename OsModel::Radio Radio; 00064 typedef typename OsModel::Timer Timer; 00065 typedef typename OsModel::Clock Clock; 00066 typedef typename OsModel::Debug Debug; 00067 typedef typename OsModel::Rand Rand; 00068 typedef Cluster_P Cluster; 00069 //typedef Neighbor_Discovery_P NeighborDiscovery; 00070 00071 00072 typedef wiselib::AtributeClusterHeadDecision<OsModel, Radio> CHD_t; 00073 typedef wiselib::FrontsJoinDecision<OsModel, Radio> JD_t; 00074 typedef wiselib::FrontsIterator<OsModel, Radio> IT_t; 00075 typedef wiselib::FrontsCore<OsModel, Radio, CHD_t, JD_t, IT_t> clustering_algo_t; 00076 typedef wiselib::StaticArrayRoutingTable<OsModel, Radio, 1000> RoutingTable; 00077 typedef wiselib::Echo<OsModel, Radio, Timer, Debug> nb_t; 00078 00079 typedef wiselib::CommunicationMessage<OsModel, Radio> CommunicationMsg_t; 00080 typedef EndToEndCommunication<OsModel, Radio, Timer, Debug, nb_t> self_type; 00081 00082 typedef self_type* self_pointer_t; 00083 typedef typename Radio::node_id_t node_id_t; 00084 typedef typename Radio::size_t size_t; 00085 typedef typename Radio::block_data_t block_data_t; 00086 typedef typename Radio::message_id_t message_id_t; 00087 typedef delegate3<void, node_id_t, size_t, block_data_t*> endToEnd_delegate_t; 00088 00089 typedef CommunicationMessage<OsModel, Radio> Communicationmsg_t; 00090 00091 // -------------------------------------------------------------------- 00092 enum SpecialNodeIds { 00093 BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 00094 NULL_NODE_ID = Radio::NULL_NODE_ID 00095 }; 00096 // -------------------------------------------------------------------- 00097 enum Restrictions { 00098 MESSAGE_SIZE = Communicationmsg_t::MAX_PAYLOAD_LENGTH 00099 }; 00100 enum{ 00101 END_TO_END_MESSAGE = 245, 00102 }; 00103 00104 enum ErrorCodes{ 00105 SUCCESS = OsModel::SUCCESS, 00106 ERR_UNSPEC = OsModel::ERR_UNSPEC 00107 }; 00108 // -------------------------------------------------------------------- 00111 EndToEndCommunication() {} 00112 ~EndToEndCommunication() {} 00114 00117 void enable_radio(); 00118 void disable_radio(); 00120 00123 00125 void send( node_id_t receiver, size_t len, block_data_t *data ); 00126 void on_receive (node_id_t from, size_t len, block_data_t *data ); 00127 bool is_in_cluster ( node_id_t nodeID ); 00128 void print_statistics(); 00129 void print_cluster_childs(); 00130 #ifdef USE_ROBOT 00131 void arriving_robot(uint8_t event, node_id_t from, uint8_t len, uint8_t* data); 00132 #endif 00133 00134 int init( Radio& tx_radio, Timer& timer, Clock& clock, Debug& debug, Rand& rand, Cluster& cluster ); 00137 template<class T, void (T::*TMethod)(node_id_t, size_t, block_data_t*)> 00138 uint8_t endToEnd_reg_recv_callback(T *obj_pnt) { 00139 endToEnd_recv_callback_ = endToEnd_delegate_t::template from_method<T, TMethod > ( obj_pnt ); 00140 return 0; 00141 } 00142 00143 void unreg_endToEnd_recv_callback() { 00144 endToEnd_recv_callback_ = endToEnd_delegate_t(); 00145 } 00146 00147 00148 00149 typename Radio::node_id_t id() 00150 { 00151 return tx_radio_->id(); 00152 } 00154 00155 void destruct() {} 00156 00157 00158 protected: 00159 typename Radio::self_pointer_t tx_radio_; 00160 typename Timer::self_pointer_t timer_; 00161 typename Debug::self_pointer_t debug_; 00162 typename Clock::self_pointer_t clock_; 00163 typename Rand::self_pointer_t rand_; 00164 typename Cluster::self_type* cluster_; 00165 00166 nb_t neighbor_discovery; 00167 //clustering_algo_t cluster_; 00168 00169 00170 CommunicationMsg_t comm_message; 00171 endToEnd_delegate_t endToEnd_recv_callback_; 00172 CHD_t CHD_; 00173 JD_t JD_; 00174 IT_t IT_; 00175 00176 // METRICS FOR END_TO_END COMMUNICATION 00177 00178 00179 00180 int RX_neigh ; //Messages received from neighbor nodes 00181 //Metrics for cluster heads 00182 00183 int RX_childs; //Messages received from nodes in the cluster 00184 int RX_highways; //Messages recevied from highways 00185 int RX_from_cluster_head; 00186 00187 int FW_childs; //Messages coming from nodes in the cluster and delivered to the final destination INSIDE THE SAME CLUSTER 00188 int FW_highways;//Messages coming from nodes in the cluster and delivered to the final destination IN ANOTHER CLUSTER 00189 00190 00191 int TX_neigh; //Messages generated and transmitted directly to the final destination (Cluster heads uses the same metric for messages generated by itself) 00192 int TX_cluster_head; //Messages generated and transmitted to the cluster head because the node is not a direct neighbor 00193 int TX_highways; //Messages generated by the cluster head and transmitted directly on the highways because the destination is not in the cluster 00194 int total_latency; 00195 int total_latency_from_cluster_head; 00196 00197 int radio_recv_callback_id_; 00198 00199 Radio& radio() 00200 { 00201 return *tx_radio_; 00202 } 00203 Timer& timer() 00204 { 00205 return *timer_; 00206 } 00207 Debug& debug() 00208 { 00209 return *debug_; 00210 } 00211 Clock& clock() 00212 { 00213 return *clock_; 00214 } 00215 00216 Cluster& cluster() 00217 { 00218 return *cluster_; 00219 } 00220 00221 }; 00222 00223 00224 00225 template<typename OsModel_P, 00226 typename Radio_P, 00227 typename Timer_P, 00228 typename Debug_P, 00229 typename Neighbor_Discovery_P, 00230 typename Cluster_P> 00231 int 00232 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00233 init (Radio& tx_radio, Timer& timer, Clock& clock, Debug& debug, Rand& rand, Cluster& cluster) { 00234 tx_radio_ = &tx_radio; 00235 timer_ = &timer; 00236 clock_ = &clock; 00237 rand_ = &rand; 00238 neighbor_discovery.init( *tx_radio_, *clock_, *timer_, *debug_ ); 00239 debug_ = &debug; 00240 cluster_= &cluster; 00241 RX_neigh = 0 ; //Messages received from neighbor nodes 00242 RX_childs = 0 ; 00243 RX_highways = 0 ; 00244 RX_from_cluster_head = 0; 00245 FW_childs = 0; 00246 FW_highways = 0; 00247 total_latency = 0; 00248 total_latency_from_cluster_head = 0; 00249 00250 TX_neigh = 0 ; 00251 TX_cluster_head = 0 ; 00252 TX_highways = 0 ; 00253 cluster_->set_cluster_head_decision( CHD_ ); 00254 // set the JoinDecision Module 00255 cluster_->set_join_decision( JD_ ); 00256 // set the Iterator Module 00257 cluster_->set_iterator( IT_ ); 00258 cluster_->init( *tx_radio_, *timer_, *debug_, *rand_, neighbor_discovery ); 00259 00260 //IMPROVE: Take the value upper as soon as more hops clustering is tested. 00261 cluster_->set_maxhops( 1 ); 00262 00263 debug_->debug("EndToEnd Algorithm: Successfully initialized module\n"); 00264 //debug().debug("Debug().debug: Node %x: TX_neigh value is %i\n" , radio().id(), TX_neigh); 00265 debug_->debug("Debug_->debug: Node %x: TX_neigh value is %i\n" , radio().id(), TX_neigh); 00266 00267 return SUCCESS; 00268 } 00269 00270 template<typename OsModel_P, 00271 typename Radio_P, 00272 typename Timer_P, 00273 typename Debug_P, 00274 typename Neighbor_Discovery_P, 00275 typename Cluster_P> 00276 void 00277 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00278 enable_radio() { 00279 #ifdef DEBUG 00280 debug_->debug("EndToEndCommunication boots on %x\n", tx_radio_->id()); 00281 #endif 00282 00283 radio().enable_radio(); 00284 radio_recv_callback_id_ = radio().template reg_recv_callback<self_type, &self_type::on_receive >(this); 00285 neighbor_discovery.enable(); 00286 } 00287 00288 // ----------------------------------------------------------------------- 00289 00290 template<typename OsModel_P, 00291 typename Radio_P, 00292 typename Timer_P, 00293 typename Debug_P, 00294 typename Neighbor_Discovery_P, 00295 typename Cluster_P> 00296 void 00297 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00298 disable_radio(void) { 00299 #ifdef DEBUG 00300 debug_->debug("Called EndToEndCommunication::disable\n"); 00301 #endif 00302 //Unregister callbacks 00303 neighbor_discovery->unregister_payload_space( MOBILITY ); 00304 neighbor_discovery->unreg_recv_callback( MOBILITY ); 00305 radio().unregister_recv_callback( radio_recv_callback_id_ ); 00306 00307 neighbor_discovery->disable(); 00308 radio().disable_radio(); 00309 } 00310 00311 // ----------------------------------------------------------------------- 00312 00313 template<typename OsModel_P, 00314 typename Radio_P, 00315 typename Timer_P, 00316 typename Debug_P, 00317 typename Neighbor_Discovery_P, 00318 typename Cluster_P> 00319 void 00320 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00321 send( node_id_t destination, size_t len, block_data_t *data ) { 00322 00323 CommunicationMsg_t* msg = (CommunicationMsg_t*)data; 00324 00325 // The send method must take into account just two cases: if the destination node is a direct neighbor, then the message must be sent immediately; otherwise the message must be sent toward the cluster leader. 00326 // IMPORTANT NOTICE: WE MUST ALSO CONSIDER THE CASE IN WHICH THE NODE WHICH IS SENDING THE MESSAGE IS A LEADER ITSELF! 00327 00328 // debug_->debug("Node %x: Trying to send a message to node %x\n", radio().id(), msg->dest()); 00329 00330 if (cluster().is_cluster_head()){ 00331 debug().debug("Node %x is a cluster leader\n", radio().id()); 00332 } 00333 else 00334 debug().debug("Node %x is not a cluster leader\n", radio().id()); 00335 00336 if (neighbor_discovery.is_neighbor_bidi(msg->dest())){ 00337 TX_neigh++; 00338 debug_->debug ("Node %x is a bidi neighbor of %x; sending message; sent a total of %i messages\n", msg->dest(), radio().id(), TX_neigh ); 00339 //debug_->debug("Node %x: TX_neigh value is %i\n" , radio().id(), TX_neigh); 00340 msg->set_timestamp(clock().milliseconds(clock().time())); 00341 radio().send(msg->dest(), len , data); 00342 } 00343 00344 else { 00345 TX_cluster_head++; 00346 debug_->debug ("Node %x: sending message to cluster leader %x\n", tx_radio_->id(), cluster().parent() ); 00347 //debug().debug("Node %x: TX_cluster_head = %i\n", radio().id(), TX_cluster_head); 00348 //comm_message.set_dest(cluster().parent()); 00349 msg->set_timestamp(clock().milliseconds(clock().time())); 00350 radio().send( cluster().parent(), len, data ); 00351 } 00352 } 00353 00354 00355 00356 template<typename OsModel_P, 00357 typename Radio_P, 00358 typename Timer_P, 00359 typename Debug_P, 00360 typename Neighbor_Discovery_P, 00361 typename Cluster_P> 00362 00363 void 00364 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00365 on_receive( node_id_t from, size_t len, block_data_t *data ) { 00366 message_id_t msg_id = read<OsModel, block_data_t, message_id_t>( data ); 00367 00368 00369 //Only treat CommunicationMessages 00370 if( msg_id == END_TO_END_MESSAGE ) 00371 { 00372 CommunicationMsg_t* msg = (CommunicationMsg_t*)data; 00373 00374 if( msg->dest() == tx_radio_->id() ) { 00375 // The message reached its destination => Notify registered receivers. 00376 notify_receivers( msg->source(), msg->payload_size(), msg->payload() ); 00377 debug_->debug("Node %x received an EndToEnd message from %x for itself with latency %d\n", radio().id(), from, (clock().milliseconds( clock().time()) - msg->timestamp())); 00378 if ((from == cluster().parent()) || (from == cluster().parent())){ 00379 RX_from_cluster_head++; 00380 total_latency_from_cluster_head += (clock().milliseconds( clock().time()) - msg->timestamp()); 00381 } 00382 else { 00383 RX_neigh++; 00384 total_latency += (clock().milliseconds( clock().time()) - msg->timestamp()); 00385 debug().debug("Node %x: Neigh RX messages = %i\n", radio().id(), RX_neigh); 00386 } 00387 } 00388 00389 else if (cluster().is_cluster_head()){ 00390 debug().debug("Cluster leader %x received a message from %x destined to %x\n", radio().id(), from, msg->dest()); 00391 RX_childs++; 00392 if (is_in_cluster( msg->dest()) || (neighbor_discovery.is_neighbor_bidi(msg->dest()))) { 00393 radio().send( msg->dest(), len, data ); 00394 FW_childs++; 00395 debug().debug("Node %x: Cluster leader forwarded a message for the destination %x \n",radio().id(), msg->dest());//, cluster_.parent); 00396 } 00397 00398 else 00399 debug().debug("Node %x: destination node is neither in my cluster nor a neighbor_bidi\n", radio().id()); 00400 TX_highways++; 00401 } 00402 00403 else 00404 debug().debug("Node %x: not interested in this message\n", radio().id()); 00405 00406 //ELSE WE MUST USE HIGHWAYS WHEN READY 00407 //All the other nodes, when receiving a message not for themselves must simply ignore the message 00408 00409 //TODO: Find the next hop of the message. 00410 00411 //tx_radio_->send( next_hop, msg->buffer_size(), (block_data_t*)&msg ); 00412 } 00413 } 00414 00415 00416 template<typename OsModel_P, 00417 typename Radio_P, 00418 typename Timer_P, 00419 typename Debug_P, 00420 typename Neighbor_Discovery_P, 00421 typename Cluster_P> 00422 bool 00423 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00424 is_in_cluster ( node_id_t nodeID ){ 00425 node_id_t cluster_members[cluster().childs_count()]; 00426 bool result = false; 00427 cluster().childs(cluster_members); 00428 for (uint8_t i = 0; i < cluster().childs_count(); i++){ 00429 // debug().debug("Node %x: node %x belongs to my cluster\n", radio().id(), cluster_members[i]); 00430 if ( cluster_members[i] == nodeID ){ 00431 result = true; 00432 debug().debug("Node %x: node %x is in my cluster!\n", radio().id(), nodeID); 00433 return result; 00434 } 00435 } 00436 debug().debug("Node %x: node %x is not in my cluster!\n", radio().id(), nodeID); 00437 return result; 00438 } 00439 00440 00441 template<typename OsModel_P, 00442 typename Radio_P, 00443 typename Timer_P, 00444 typename Debug_P, 00445 typename Neighbor_Discovery_P, 00446 typename Cluster_P> 00447 void 00448 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00449 print_cluster_childs(){ 00450 node_id_t cluster_members[cluster().childs_count()]; 00451 cluster().childs(cluster_members); 00452 for (uint8_t i = 0; i < cluster().childs_count(); i++){ 00453 debug().debug("Node %x: node %x belongs to my cluster\n", radio().id(), cluster_members[i]); 00454 } 00455 } 00456 00457 00458 template<typename OsModel_P, 00459 typename Radio_P, 00460 typename Timer_P, 00461 typename Debug_P, 00462 typename Neighbor_Discovery_P, 00463 typename Cluster_P> 00464 void 00465 EndToEndCommunication<OsModel_P, Radio_P, Timer_P, Debug_P, Neighbor_Discovery_P, Cluster_P>:: 00466 print_statistics (){ 00467 debug().debug("PRINTING METRICS OF NODE %x\n", radio().id()); 00468 debug().debug("\n ID = %x; \nRX_neigh = %i\n; RX_FromClusterHead = %i\n; RX_childs = %i\n; RX_highways = %i\n; FW_childs = %i\n; FW_highways = %i\n; TX_neigh = %i\n; TX_cluster_head = %i\n; TX_highways = %i\n; AvgLatency = %i\n, AvgLatencyClusterHead = %i\n", 00469 radio().id(),RX_neigh, RX_from_cluster_head, RX_childs, RX_highways, FW_childs, FW_highways, TX_neigh, TX_cluster_head, TX_highways, total_latency/RX_neigh, total_latency_from_cluster_head/RX_from_cluster_head); 00470 00471 } 00472 00473 00474 00475 } 00476 #endif 00477 00478 00479 00480 00481 00482 00483 00484 00485 00486 00487 00488 00489 00490 00491 00492 00493 00494 00495 00496 00497 00498 00499 00500