Wiselib
wiselib.testing/radio/fragmenting/volumeradio.h
Go to the documentation of this file.
00001 /* 
00002  * File:   volumeradio.h
00003  * Author: amaxilatis
00004  *
00005  * Created on August 2, 2010, 1:15 PM
00006  */
00007 
00008 #ifndef _VOLUMERADIO_H
00009 #define  _VOLUMERADIO_H
00010 
00011 //wiselib includes
00012 #include "util/delegates/delegate.hpp"
00013 #include "util/pstl/vector_static.h"
00014 #include "util/pstl/pair.h"
00015 
00016 #include "../radio_base.h"
00017 
00018 // volume message type include
00019 #include "volumemsg.h"
00020 
00021 /*
00022  * DEBUG MESSAGES TEMPLATE
00023  * VolumeRadio::<task> [ type= ...]
00024  *
00025  * */
00026 #define DEBUG_VOLUMERADIO
00027 
00028 namespace wiselib {
00029 
00030     /*
00031      * ReliableRadio Template
00032      * Uses OsModel, Radio and Timer
00033      * Degug is only for debugging
00034      * ReliableRadio is used as a layer between normal radio
00035      * and application to make sure that messages will be
00036      * delivered besides any errors that may occur.
00037      * */
00038     template<typename OsModel_P, typename Radio_P, typename Timer_P,
00039     typename Debug_P>
00040     class VolumeRadio
00041     : public RadioBase<OsModel_P,Radio_P>{
00042     public:
00043         // Type definitions
00044         typedef OsModel_P OsModel;
00045 
00046         typedef Radio_P Radio;
00047 
00048         typedef Debug_P Debug;
00049         typedef Timer_P Timer;
00050 
00051         typedef typename Radio::node_id_t node_id_t;
00052         typedef typename Radio::size_t size_t;
00053         typedef typename Radio::block_data_t block_data_t;
00054         typedef typename Radio::message_id_t message_id_t;
00055 
00056 
00057 
00058 
00059 
00060 
00061         typedef VolumeRadio<OsModel_P, Radio_P, Timer_P, Debug_P> self_t;
00062 
00063 
00064         typedef VolumeMsg<OsModel, Radio > VolumeMessage_t;
00065 
00066         /*
00067          * struct that describes a message buffer
00068          * contains the sequence number
00069          * the source of the message
00070          * and data to show if receiving is complete
00071          */
00072         struct information {
00073             uint16_t seq_no;
00074             node_id_t source;
00075             uint8_t fragments_received;
00076             uint8_t total_fragments;
00077         };
00078 
00079         typedef struct information information_t;
00080 
00081 
00082         // radio callbacks delegate
00083         typedef delegate3<void, int, long, unsigned char*> radio_delegate_t;
00084 
00085         // --------------------------------------------------------------------
00086 
00087         enum SpecialNodeIds {
00088             BROADCAST_ADDRESS = Radio::BROADCAST_ADDRESS, 
00089             NULL_NODE_ID = Radio::NULL_NODE_ID
00091         };
00092         // --------------------------------------------------------------------
00093 
00094         enum Restrictions {
00095             FRAGMENT_SIZE = VolumeMessage_t::FRAGMENT_SIZE,
00096             MAX_MESSAGE_LENGTH = VolumeMessage_t::MAX_MESSAGE_LENGTH
00098         };
00099 
00100         // Vector Storing all messages received but not finished yet
00101         typedef wiselib::vector_static<OsModel,
00102         pair<information_t, uint8_t* >, MAX_PENDING> vector_t;
00103 
00104         // --------------------------------------------------------------------
00105 
00106         VolumeRadio() {
00107         }
00108         ;
00109 
00110         ~VolumeRadio() {
00111         }
00112         ;
00113 
00114 
00115         // Enable the Radio
00116 
00117         void enable_radio() {
00118 #ifdef DEBUG_VOLUMERADIO
00119             debug().debug("VolumeRadio::enable\n");
00120 #endif
00121 
00122             //set the local os pointer
00123             
00124             //enable normal radio
00125             radio().enable_radio();
00126             // register receive callback to normal radio
00127             recv_callback_id_ = radio().template reg_recv_callback<self_t,
00128                     &self_t::receive > ( this);
00129 
00130             // initialize the buffers for pending messages
00131             for (int i = 0; i < MAX_PENDING; i++) {
00132                 pending_messages_[i].first.seq_no = 0; // seq_no ==  0 means unused
00133                 pending_messages_[i].second = new uint8_t[MAX_MESSAGE_LENGTH]; // set the buffer size
00134 
00135             }
00136 
00137             // initialize the connections array
00138             for (int i = 0; i < MAX_CONNECTIONS_; i++) {
00139                 open_connections[i].conn_id = -1; // -1 means connection unused
00140                 open_connections[i].sequence_numbers_ = 1; // sequence numbers start from 1 ( 0 means unused see above)
00141             }
00142 
00143             // ??? only for broadcast messages ???
00144             seq_numbers_ = 1;
00145 
00146 
00147 
00148 
00149         }
00150         ;
00151 
00152         // --------------------------------------------------------------------
00153 
00154         
00155         // Send a message the application has requested
00156 
00157         void send( node_id_t id, uint16_t len, block_data_t *data) {
00158 
00159             block_data_t local_data[len];
00160 
00161             
00162 
00163             // check for an open connection to the destination
00164             int connection = check_connection(id);
00165             // get the next available sequence number 
00166             uint16_t seq_no = open_connections[connection].sequence_numbers_++;
00167             // if the next sequence number is 0 then set it as 1
00168             if (open_connections[connection].sequence_numbers_ == 0) open_connections[connection].sequence_numbers_++;
00169 
00170 #ifdef DEBUG_VOLUMERADIO
00171             debug().debug( "VolumeRadio::send [node %d |to %d |len %d |seq_no %d ", radio().id(), id, len, seq_no);
00172 #endif
00173 
00174             // create a new volume message
00175             VolumeMessage_t m = VolumeMessage_t();
00176             // initialize the message
00177 
00178             if (len>VolumeMessage_t::FRAGMENT_SIZE){
00179                 m.set_msg_id(VolumeMessage_t::VOLUME_MESSAGE);
00180             }
00181             else{
00182                 m.set_msg_id(VolumeMessage_t::SINGLE_MESSAGE);
00183             }
00184 
00185             m.set_seq_number(seq_no);
00186             
00187             uint8_t fragments = len/VolumeMessage_t::FRAGMENT_SIZE + 1;
00188 #ifdef DEBUG_VOLUMERADIO
00189             debug().debug( "split_count= %d ]\n", fragments);
00190 #endif
00191             m.set_fragments(fragments);
00192             // Send all fragments of the message
00193             uint8_t i;
00194             for (i = 0 ; i < (fragments-1); i++) {
00195                 // create a temp buffer for the message
00196                 m.set_fragment_id(i);
00197                 m.set_payload(FRAGMENT_SIZE,local_data+(i*FRAGMENT_SIZE));
00198 
00199                 // send the fragment
00200                 radio().send(id, m.buffer_size(), (uint8_t * ) &m);
00201 #ifdef DEBUG_VOLUMERADIO
00202                 debug().debug( "VolumeRadio::send [%d |dest= %d |seq_no %d |fr_no %d |tot_fr %d |size= %d |...]\n",
00203                     m.msg_id(),
00204                     id,
00205                     m.seq_number(),
00206                     m.fragment_id(),
00207                     m.fragments(),
00208                     m.buffer_size()
00209 
00210                         );
00211 #endif
00212                                                       
00213             }
00214             m.set_fragment_id(i);
00215             m.set_payload(len%FRAGMENT_SIZE,data+(i*FRAGMENT_SIZE));
00216             radio().send(id, (size_t)m.buffer_size(), (uint8_t * ) &m);
00217 #ifdef DEBUG_VOLUMERADIO
00218                 debug().debug( "VolumeRadio::send [%d |dest= %d |seq_no %d |fr_no %d |tot_fr %d |size= %d |...]\n",
00219                     m.msg_id(),
00220                     id,
00221                     m.seq_number(),
00222                     m.fragment_id(),
00223                     m.fragments(),
00224                     m.buffer_size()
00225 
00226                         );
00227 #endif
00228 
00229            
00230             
00231         }
00232         ;
00233 
00234 
00235 
00236         // --------------------------------------------------------------------
00237 
00238         /*
00239          * Callback from the Radio module
00240          *
00241          * when a new message is received check its type:
00242          * - Ack_Message : mark as received all containing sequence numbers
00243          * - Broadcast message : send to the application
00244          * - ReliableMessage : send ack , and forward message to the application
00245          *
00246          *
00247          */
00248         void receive(node_id_t from, size_t len, block_data_t* data_t) {
00249             // do not receive own messages
00250             if (radio().id() == from) {
00251                 debug().debug( "message heard\n");
00252                 return;
00253             }
00254             VolumeMessage_t mrecv;
00255             memcpy(&mrecv,data_t,len);
00256             
00257             if (mrecv.msg_id() == VolumeMessage_t::VOLUME_MESSAGE) {
00258 
00259 #ifdef DEBUG_VOLUMERADIO
00260                 debug().debug( "VolumeRadio::receive [%d |from %d |seq_no %d |fr_no %d |tot_fr %d |size %d |...]\n",
00261                         mrecv.msg_id(),
00262                         from,
00263                         mrecv.seq_number(),
00264                         mrecv.fragment_id(),
00265                         mrecv.fragments(),
00266                         len);
00267 #endif
00268 
00269                 // get all the information from the header
00270                 node_id_t source = from;
00271                 uint16_t seq_no = mrecv.seq_number();
00272                 uint8_t fragment_num = mrecv.fragment_id();
00273                 uint8_t total_frags = mrecv.fragments();
00274 
00275 
00276                 // local buffer for payload fragment
00277                 uint8_t t_buff[mrecv.payload_size()];
00278 
00279                 memcpy(t_buff,mrecv.payload(),mrecv.payload_size());
00280                 //t_buff = VolumeMessage_t::strip_message(data);
00281 
00282                 // find the buffer dedicated to this message
00283                 int fd = flow_id(seq_no, source, total_frags);
00284 
00285                 if (fd != -1) {
00286 #ifdef DEBUG_VOLUMERADIO
00287                     debug().debug( "VolumeRadio::receive pending_message %d frag_no %d\n", fd, fragment_num);
00288 #endif
00289 
00290                     // add fragment to the buffer. if finished set the flag
00291                     bool finished = add_fragment_to_message(fd, fragment_num, t_buff);
00292 
00293                     // if finished forward to the application
00294                     if (finished) {
00295 #ifdef DEBUG_VOLUMERADIO
00296                         debug().debug( "VolumeRadio::receive pending_message %d completed %d \n", fd , );
00297 #endif
00298                         notify_receivers(from, (FRAGMENT_SIZE) * total_frags, pending_messages_[fd].second);
00299                     }
00300 
00301 
00302                 } else {
00303 #ifdef DEBUG_VOLUMERADIO
00304                     debug().debug( "VolumeRadio::receive vector is full\n");
00305 #endif
00306                 }
00307 
00308 
00309             } else if (mrecv.msg_id() == VolumeMessage_t::SINGLE_MESSAGE) {
00310                 
00311 #ifdef DEBUG_VOLUMERADIO
00312                 debug().debug( "VolumeRadio::receive [%d |from %d |seq_no %d |fr_no %d |tot_fr %d |...]\n",
00313                         mrecv.msg_id(),
00314                         from,
00315                         mrecv.seq_number(),
00316                         mrecv.fragment_id(),
00317                         mrecv.fragments()
00318                         );
00319 #endif
00320                 // get the payload from the buffer
00321                 uint8_t t_buff[mrecv.payload_size()];
00322 
00323                 memcpy(t_buff,mrecv.payload(),mrecv.payload_size());
00324                 //uint8_t * t_buff = new uint8_t[len - VolumeMessage_t::header_size()];
00325                 //t_buff = VolumeMessage_t::strip_message(data);
00326                 // forward message to the application
00327                 
00328 #ifdef DEBUG_VOLUMERADIO
00329                 debug().debug( "VolumeRadio::message_received_callback\n");
00330 #endif
00331                 notify_receivers(from, mrecv.payload_size() , t_buff);
00332                 
00333 
00334             }
00335              
00336         }
00337         ;
00338 
00339         // checks the vector of messages to see if the first fragment.
00340         // if not returns the id of the vector position
00341         // if yes allocates an available buffer and initializes data
00342         // if yes && buffers full returns -1
00343 
00344         int flow_id(uint16_t seq_no, node_id_t source, uint8_t total_frags) {
00345             int fd = -1;
00346 
00347             // search vector for message and a free slot
00348             for (int i = 0; i < MAX_PENDING; i++) {
00349                 if ((seq_no == pending_messages_[i].first.seq_no) && (source == pending_messages_[i].first.source))
00350                     return i;
00351                 if ((pending_messages_[i].first.seq_no == 0) && (fd == -1))
00352                     fd = i;
00353             }
00354             if (fd != -1) {
00355                 // if does not exist but we have an empty buffer allocate it
00356                 pending_messages_[fd].first.seq_no = seq_no;
00357                 pending_messages_[fd].first.source = source;
00358                 pending_messages_[fd].first.fragments_received = 0;
00359                 pending_messages_[fd].first.total_fragments = total_frags;
00360             }
00361             // if not return -1
00362             return fd;
00363 
00364         };
00365 
00366         // add a fragment to the payload buffer and returns if finished
00367 
00368         bool add_fragment_to_message(int fd, uint8_t fragment_num, uint8_t * data) {
00369             // copy data to local memory
00370             memcpy(pending_messages_[fd].second + FRAGMENT_SIZE*fragment_num, data, FRAGMENT_SIZE);
00371             // inrease fragments received
00372             pending_messages_[fd].first.fragments_received++;
00373             // if the last fragment clear the buffer
00374             if (pending_messages_[fd].first.fragments_received ==
00375                     pending_messages_[fd].first.total_fragments) {
00376                 pending_messages_[fd].first.seq_no = 0;
00377                 // return message is complete
00378                 return true;
00379             } else {
00380                 // return message is incomplete
00381                 return false;
00382             }
00383 
00384         };
00385         // --------------------------------------------------------------------
00386 
00387         // Check if a connection with the sender was established before
00388 
00389         int check_connection(node_id_t node) {
00390             int avail = -1;
00391             for (int i = 0; i < MAX_CONNECTIONS_; i++) {
00392                 if (open_connections[i].conn_id == node) {
00393                     return i;
00394                 } else if (open_connections[i].conn_id == -1) {
00395                     if (avail == -1) {
00396                         avail = i;
00397                     }
00398                 }
00399             }
00400             // if the first connection get a new entry from the table
00401             if (avail != -1) {
00402 #ifdef DEBUG_VOLUMERADIO
00403                 debug().debug( "VolumeRadio::connection Opened Connection %d to node %d\n", radio().id(), node);
00404 #endif
00405                 open_connections[avail].conn_id = node;
00406                 open_connections[avail].sequence_numbers_ = 1;
00407 
00408             } else {
00409 #ifdef DEBUG_VOLUMERADIO
00410                 debug().debug( "VolumeRadio::connection Out of connections node= %d\n", radio().id());
00411 #endif
00412             }
00413             // return the connection id
00414             return avail;
00415         };
00416 
00417         // --------------------------------------------------------------------
00418 
00419 
00420         // returns the node's id
00421 
00422         node_id_t id() {
00423             return radio().id();
00424         }
00425         // --------------------------------------------------------------------
00426 
00427 
00428         /*
00429          initialize the module
00430          */
00431         void init(Radio& radio, Timer& timer, Debug& debug) {
00432             radio_ = &radio;
00433             timer_ = &timer;
00434             debug_ = &debug;
00435         };
00436 
00437 
00438     private:
00439         int recv_callback_id_; // callback for receive function
00440         radio_delegate_t message_received_callback_; // callback for application's receive function
00441 
00442 
00443         vector_t pending_messages_; // contains the buffers for the pending messages
00444 
00445         static const int MAX_CONNECTIONS_ = 40; // maximum available connections
00446         uint16_t seq_numbers_; // sequence number for messages
00447 
00448         struct connections {
00449             node_id_t conn_id;
00450             uint16_t sequence_numbers_;
00451         } open_connections[MAX_CONNECTIONS_];
00452 
00453 
00454 
00455 
00456         Radio * radio_;
00457         Timer * timer_;
00458         Debug * debug_;
00459 
00460         Radio& radio() {
00461             return *radio_;
00462         }
00463 
00464         Timer& timer() {
00465             return *timer_;
00466         }
00467 
00468         Debug& debug() {
00469             return *debug_;
00470         }
00471     };
00472 
00473 }
00474 
00475 
00476 
00477 
00478 #endif   /* _VOLUMERADIO_H */
00479 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines