Contiki 2.5
convergence_layer.c
Go to the documentation of this file.
1 /**
2  * \addtogroup cl
3  *
4  * @{
5  */
6 
7 /**
8  * \file
9  * \brief IEEE 802.15.4 Convergence Layer Implementation
10  * \author Georg von Zengen <vonzeng@ibr.cs.tu-bs.de>
11  * \author Wolf-Bastian Poettner <poettner@ibr.cs.tu-bs.de>
12  */
13 
14 #include <string.h> // memset
15 
16 #include "contiki.h"
17 #include "packetbuf.h"
18 #include "netstack.h"
19 #include "rimeaddr.h"
20 #include "process.h"
21 #include "list.h"
22 
23 #include "agent.h"
24 #include "logging.h"
25 #include "storage.h"
26 #include "discovery.h"
27 #include "dtn_network.h"
28 #include "dispatching.h"
29 #include "bundleslot.h"
30 #include "statusreport.h"
31 
32 #include "convergence_layer.h"
33 
34 /**
35  * Structure to keep track of neighbours from which we are currently awaiting app-layer ACKs
36  */
37 struct blocked_neighbour_t {
38  struct blocked_neighbour_t * next;
39 
40  /* Address of the neighbour */
41  rimeaddr_t neighbour;
42 
43  /* Since when is he blocked? */
44  clock_time_t timestamp;
45 };
46 
47 /**
48  * List to keep track of outgoing bundles
49  */
50 LIST(transmission_ticket_list);
51 MEMB(transmission_ticket_mem, struct transmit_ticket_t, CONVERGENCE_LAYER_QUEUE);
52 
53 /**
54  * List to keep track of blocked neighbours
55  */
56 LIST(blocked_neighbour_list);
57 MEMB(blocked_neighbour_mem, struct blocked_neighbour_t, CONVERGENCE_LAYER_QUEUE);
58 
59 /**
60  * Internal functions
61  */
62 int convergence_layer_is_blocked(rimeaddr_t * neighbour);
63 int convergence_layer_set_blocked(rimeaddr_t * neighbour);
64 int convergence_layer_set_unblocked(rimeaddr_t * neighbour);
65 
66 /**
67  * CL process
68  */
69 PROCESS(convergence_layer_process, "CL process");
70 
71 /**
72  * MUTEX to avoid flooding the MAC layer with outgoing bundles
73  */
75 
76 /**
77  * Keep track of the allocated tickets
78  */
80 
81 /**
82  * Keep track of the tickets in queue
83  */
85 
86 /**
87  * Use a unique sequence number of each outgoing segment
88  */
90 
91 /**
92  * Indicate when a continue event or poll to ourselves is pending to avoid
93  * exceeding the event queue size
94  */
96 
97 /**
98  * Backoff timer
99  */
101 uint8_t convergence_layer_backoff_pending = 0;
102 
103 int convergence_layer_init(void)
104 {
105  // Start CL process
106  process_start(&convergence_layer_process, NULL);
107 
108  return 1;
109 }
110 
111 struct transmit_ticket_t * convergence_layer_get_transmit_ticket_priority(uint8_t priority)
112 {
113  struct transmit_ticket_t * ticket = NULL;
114 
116  // Cannot assign ticket because too many slots are in use
117  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Cannot allocate ticket with low priority");
118  return NULL;
119  }
120 
121  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Requested ticket with priority %d", priority);
122 
123  /* Allocate memory */
124  ticket = memb_alloc(&transmission_ticket_mem);
125  if( ticket == NULL ) {
126  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Cannot allocate ticket");
127  return NULL;
128  }
129  ticket->timestamp = clock_time();
130 
131  /* Initialize the ticket */
132  memset(ticket, 0, sizeof(struct transmit_ticket_t));
133 
134  /* Add it to our list */
135  if( priority == CONVERGENCE_LAYER_PRIORITY_NORMAL ) {
136  /* Append to list */
137  list_add(transmission_ticket_list, ticket);
138  } else {
139  /* Prepend to list */
140  list_push(transmission_ticket_list, ticket);
141  }
142 
143  /* Count the used slots */
145 
146  /* Hand over the ticket to our app */
147  return ticket;
148 }
149 
150 struct transmit_ticket_t * convergence_layer_get_transmit_ticket()
151 {
152  return convergence_layer_get_transmit_ticket_priority(CONVERGENCE_LAYER_PRIORITY_NORMAL);
153 }
154 
155 int convergence_layer_free_transmit_ticket(struct transmit_ticket_t * ticket)
156 {
157  if( ticket->bundle != NULL ) {
158  bundle_decrement(ticket->bundle);
159  ticket->bundle = NULL;
160  }
161 
162  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Freeing ticket %p", ticket);
163 
164  /* Only dequeue bundles that have been in the queue */
165  if( (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACTIVE) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_DONE) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_FAIL) ) {
167  }
168 
169  /* Count the used slots */
171 
172  /* Remove ticket from list and free memory */
173  list_remove(transmission_ticket_list, ticket);
174  memset(ticket, 0, sizeof(struct transmit_ticket_t));
175  memb_free(&transmission_ticket_mem, ticket);
176 
177  return 1;
178 }
179 
180 int convergence_layer_enqueue_bundle(struct transmit_ticket_t * ticket)
181 {
182  if( ticket == NULL ) {
183  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Cannot enqueue invalid ticket %p", ticket);
184  return -1;
185  }
186 
187  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Enqueuing bundle %lu to %u.%u, queue is at %u entries", ticket->bundle_number, ticket->neighbour.u8[0], ticket->neighbour.u8[1], convergence_layer_queue);
188 
189  /* The ticket is now active a ready for transmission */
190  ticket->flags |= CONVERGENCE_LAYER_QUEUE_ACTIVE;
191 
192  if( convergence_layer_pending == 0 ) {
193  /* Poll the process to initiate transmission */
194  process_poll(&convergence_layer_process);
195  }
196 
198 
199  return 1;
200 }
201 
202 int convergence_layer_send_bundle(struct transmit_ticket_t * ticket)
203 {
204  struct bundle_t *bundle = NULL;
205  uint8_t length = 0;
206  uint8_t * buffer = NULL;
207  uint8_t buffer_length = 0;
208 
209  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Sending bundle %lu to %u.%u with ticket %p", ticket->bundle_number, ticket->neighbour.u8[0], ticket->neighbour.u8[1], ticket);
210 
211  /* Read the bundle from storage, if it is not in memory */
212  if( ticket->bundle == NULL ) {
213  ticket->bundle = BUNDLE_STORAGE.read_bundle(ticket->bundle_number);
214  if( ticket->bundle == NULL ) {
215  LOG(LOGD_DTN, LOG_CL, LOGL_ERR, "Unable to read bundle %lu", ticket->bundle_number);
216  /* FIXME: Notify somebody */
217  return -1;
218  }
219  }
220 
221  /* Get our bundle struct and check the pointer */
222  bundle = (struct bundle_t *) MMEM_PTR(ticket->bundle);
223  if( bundle == NULL ) {
224  LOG(LOGD_DTN, LOG_CL, LOGL_ERR, "Invalid bundle pointer for bundle %lu", ticket->bundle_number);
225  bundle_decrement(ticket->bundle);
226  ticket->bundle = NULL;
227  return -1;
228  }
229 
230  /* Check if bundle has expired */
231  if( bundle->lifetime == 0 ) {
232  LOG(LOGD_DTN, LOG_CL, LOGL_INF, "Bundle %lu has expired, not sending it", ticket->bundle_number);
233 
234  /* Bundle is expired */
235  bundle_decrement(ticket->bundle);
236 
237  /* Tell storage to delete - it will take care of the rest */
238  BUNDLE_STORAGE.del_bundle(ticket->bundle_number, REASON_LIFETIME_EXPIRED);
239 
240  return -1;
241  }
242 
243  /* Get our buffer */
244  buffer = dtn_network_get_buffer();
245  if( buffer == NULL ) {
246  bundle_decrement(ticket->bundle);
247  ticket->bundle = NULL;
248  return -1;
249  }
250 
251  /* Get the buffer length */
252  buffer_length = dtn_network_get_buffer_length();
253 
254  /* Put in the prefix */
255  buffer[0] = 0;
256  buffer[0] |= CONVERGENCE_LAYER_TYPE_DATA & CONVERGENCE_LAYER_MASK_TYPE;
257  buffer[0] |= (CONVERGENCE_LAYER_FLAGS_FIRST | CONVERGENCE_LAYER_FLAGS_LAST) & CONVERGENCE_LAYER_MASK_FLAGS;
258  buffer[0] |= (outgoing_sequence_number << 2) & CONVERGENCE_LAYER_MASK_SEQNO;
259  ticket->sequence_number = outgoing_sequence_number;
261  length = 1;
262 
263  /* Encode the bundle into the buffer */
264  length += bundle_encode_bundle(ticket->bundle, buffer + 1, buffer_length - 1);
265 
266  /* Check if we may violate the maximum length */
267  if( length > CONVERGENCE_LAYER_MAX_LENGTH ) {
268  ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
269 
270  /* Notify routing module */
271  ROUTING.sent(ticket, ROUTING_STATUS_ERROR);
272 
273  return -1;
274  }
275 
276  /* Flag the bundle as being in transit now */
277  ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
278 
279  /* Now we are transmitting */
281 
282  /* This neighbour is blocked, until we have received the App Layer ACK or NACK */
283  convergence_layer_set_blocked(&ticket->neighbour);
284 
285  /* And send it out */
286  dtn_network_send(&ticket->neighbour, length, (void *) ticket);
287 
288  return 1;
289 }
290 
291 int convergence_layer_send_discovery(uint8_t * payload, uint8_t length, rimeaddr_t * neighbour)
292 {
293  uint8_t * buffer = NULL;
294 
295  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Sending discovery to %u.%u", neighbour->u8[0], neighbour->u8[1]);
296 
297  /* If we are currently transmitting or waiting for an ACK, do nothing */
299  return -1;
300  }
301 
302  /* Get our buffer */
303  buffer = dtn_network_get_buffer();
304  if( buffer == NULL ) {
305  return -1;
306  }
307 
308  /* Discovery Prefix */
309  buffer[0] = CONVERGENCE_LAYER_TYPE_DISCOVERY;
310 
311  /* Copy the discovery message and set the length */
312  memcpy(buffer + 1, payload, length);
313 
314  /* Now we are transmitting */
316 
317  /* Send it out via the MAC */
318  dtn_network_send(neighbour, length + 1, NULL);
319 
320  return 1;
321 }
322 
323 int convergence_layer_send_ack(rimeaddr_t * destination, uint8_t sequence_number, uint8_t type, struct transmit_ticket_t * ticket)
324 {
325  uint8_t * buffer = NULL;
326 
327  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Sending ACK or NACK to %u.%u for SeqNo %u with ticket %p", destination->u8[0], destination->u8[1], sequence_number, ticket);
328 
329  /* If we are currently transmitting or waiting for an ACK, do nothing */
331  /* This ticket has to be processed ASAP, so set timestamp to 0 */
332  ticket->timestamp = 0;
333 
334  return -1;
335  }
336 
337  /* Get our buffer */
338  buffer = dtn_network_get_buffer();
339  if( buffer == NULL ) {
340  return -1;
341  }
342 
343  if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
344  // Construct the ACK
345  buffer[0] = 0;
346  buffer[0] |= CONVERGENCE_LAYER_TYPE_ACK & CONVERGENCE_LAYER_MASK_TYPE;
347  buffer[0] |= (sequence_number << 2) & CONVERGENCE_LAYER_MASK_SEQNO;
348  } else if( type == CONVERGENCE_LAYER_TYPE_NACK ) {
349  // Construct the NACK
350  buffer[0] = 0;
351  buffer[0] |= CONVERGENCE_LAYER_TYPE_NACK & CONVERGENCE_LAYER_MASK_TYPE;
352  buffer[0] |= (sequence_number << 2) & CONVERGENCE_LAYER_MASK_SEQNO;
353  }
354 
355  /* Note down our latest attempt */
356  ticket->timestamp = clock_time();
357 
358  /* Now we are transmitting */
360 
361  /* Send it out via the MAC */
362  dtn_network_send(destination, 1, ticket);
363 
364  return 1;
365 }
366 
367 int convergence_layer_create_send_ack(rimeaddr_t * destination, uint8_t sequence_number, uint8_t type)
368 {
369  struct transmit_ticket_t * ticket = NULL;
370 
371  /* We have to keep track of the outgoing packet, because we have to be able to retransmit */
372  ticket = convergence_layer_get_transmit_ticket_priority(CONVERGENCE_LAYER_PRIORITY_HIGH);
373  if( ticket == NULL ) {
374  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Unable to allocate ticket to potentially retransmit ACK/NACK");
375  } else {
376  rimeaddr_copy(&ticket->neighbour, destination);
377  ticket->sequence_number = sequence_number;
378  ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
379 
380  if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
381  ticket->flags |= CONVERGENCE_LAYER_QUEUE_ACK;
382  } else {
383  ticket->flags |= CONVERGENCE_LAYER_QUEUE_NACK;
384  }
385  }
386 
387  return convergence_layer_send_ack(destination, sequence_number, type, ticket);
388 }
389 
390 int convergence_layer_resend_ack(struct transmit_ticket_t * ticket)
391 {
392  /* Check if we really have an ACK/NACK that is currently not beeing transmitted */
393  if( (!(ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) && !(ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK)) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_IN_TRANSIT) ) {
394  return 0;
395  }
396 
397  /* Check for the retransmission timer */
398  if( (clock_time() - ticket->timestamp) < (CONVERGENCE_LAYER_RETRANSMIT_TIMEOUT * CLOCK_SECOND) ) {
399  return 0;
400  }
401 
402  ticket->flags |= CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
403  uint8_t type = 0;
404  if( ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK ) {
405  type = CONVERGENCE_LAYER_TYPE_ACK;
406  } else if( ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK ) {
407  type = CONVERGENCE_LAYER_TYPE_NACK;
408  } else {
409  LOG(LOGD_DTN, LOG_CL, LOGL_ERR, "Unknown control packet type");
410  return 0;
411  }
412 
413  convergence_layer_send_ack(&ticket->neighbour, ticket->sequence_number, type, ticket);
414 
415  return 1;
416 }
417 
418 int convergence_layer_parse_dataframe(rimeaddr_t * source, uint8_t * payload, uint8_t length, uint8_t flags, uint8_t sequence_number, packetbuf_attr_t rssi)
419 {
420  struct mmem * bundlemem = NULL;
421  struct bundle_t * bundle = NULL;
422  int n;
423 
424  if( flags != (CONVERGENCE_LAYER_FLAGS_FIRST | CONVERGENCE_LAYER_FLAGS_LAST ) ) {
425  LOG(LOGD_DTN, LOG_CL, LOGL_ERR, "Bundle received %p from %u.%u with invalid flags %02X", payload, source->u8[0], source->u8[1], flags);
426  }
427 
428  /* Allocate memory, parse the bundle and set reference counter to 1 */
429  bundlemem = bundle_recover_bundle(payload, length);
430  if( !bundlemem ) {
431  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Error recovering bundle");
432  return -1;
433  }
434 
435  bundle = (struct bundle_t *) MMEM_PTR(bundlemem);
436  if( !bundle ) {
437  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Invalid bundle pointer");
438  bundle_decrement(bundlemem);
439  return -1;
440  }
441 
442  /* Mark the bundle as "internal" */
443  bundle->source_process = &agent_process;
444 
445  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Bundle %lu received from %u.%u with SeqNo %u", bundle->bundle_num, source->u8[0], source->u8[1], sequence_number);
446 
447  /* Store the node from which we received the bundle */
448  rimeaddr_copy(&bundle->msrc, source);
449 
450  /* Store the RSSI for this packet */
451  bundle->rssi = rssi;
452 
453  /* Hand over the bundle to dispatching */
454  n = dispatching_dispatch_bundle(bundlemem);
455  bundlemem = NULL;
456 
457  if( n ) {
458  /* Send out the ACK */
459  convergence_layer_create_send_ack(source, sequence_number + 1, CONVERGENCE_LAYER_TYPE_ACK);
460  } else {
461  /* Send out NACK */
462  convergence_layer_create_send_ack(source, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
463  }
464 
465  return 1;
466 }
467 
468 int convergence_layer_parse_ackframe(rimeaddr_t * source, uint8_t * payload, uint8_t length, uint8_t sequence_number, uint8_t type)
469 {
470  struct transmit_ticket_t * ticket = NULL;
471  struct bundle_t * bundle = NULL;
472 
473  /* This neighbour is now unblocked */
474  convergence_layer_set_unblocked(source);
475 
476  if( convergence_layer_pending == 0 ) {
477  /* Poll the process to initiate transmission of the next bundle */
478  process_poll(&convergence_layer_process);
479  }
480 
481  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming ACK from %u.%u for SeqNo %u", source->u8[0], source->u8[1], sequence_number);
482 
483  for(ticket = list_head(transmission_ticket_list);
484  ticket != NULL;
485  ticket = list_item_next(ticket) ) {
486  if( rimeaddr_cmp(source, &ticket->neighbour) && (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK_PEND) ) {
487  break;
488  }
489  }
490 
491  /* Unable to find that bundle */
492  if( ticket == NULL ) {
493  return -1;
494  }
495 
496  /* Does the originator need forward notification? */
497  if( type == CONVERGENCE_LAYER_TYPE_ACK && ticket->bundle != NULL ) {
498  bundle = (struct bundle_t *) MMEM_PTR(ticket->bundle);
499 
500  /* Is the forward report flag set? */
501  if( bundle->flags & BUNDLE_FLAG_REP_FWD ) {
502  STATUSREPORT.send(ticket->bundle, NODE_FORWARDED_BUNDLE, NO_ADDITIONAL_INFORMATION);
503  }
504  }
505 
506  if( type == CONVERGENCE_LAYER_TYPE_ACK ) {
507  /* Bundle has been ACKed and is now done */
508  ticket->flags = CONVERGENCE_LAYER_QUEUE_DONE;
509 
510  /* Notify routing module */
511  ROUTING.sent(ticket, ROUTING_STATUS_OK);
512  } else if( type == CONVERGENCE_LAYER_TYPE_NACK ) {
513  /* Bundle has been NACKed and is now done */
514  ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
515 
516  /* Notify routing module */
517  ROUTING.sent(ticket, ROUTING_STATUS_NACK);
518  }
519 
520  /* We can free the bundle memory */
521  if( ticket->bundle != NULL ) {
522  bundle_decrement(ticket->bundle);
523  ticket->bundle = NULL;
524  }
525 
526  return 1;
527 }
528 
529 int convergence_layer_incoming_frame(rimeaddr_t * source, uint8_t * payload, uint8_t length, packetbuf_attr_t rssi)
530 {
531  uint8_t * data_pointer = NULL;
532  uint8_t data_length = 0;
533  uint8_t header;
534  int ret = 0;
535 
536  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming frame from %u.%u", source->u8[0], source->u8[1]);
537 
538  /* Notify the discovery module, that we have seen a peer */
539  DISCOVERY.alive(source);
540 
541  /* Check the COMPAT information */
543  LOG(LOGD_DTN, LOG_CL, LOGL_INF, "Ignoring incoming frame from %u.%u", source->u8[0], source->u8[1]);
544  return -1;
545  }
546 
547  header = payload[0];
548  data_pointer = payload + 1;
549  data_length = length - 1;
550 
551  if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_DATA ) {
552  /* is data */
553  int flags = 0;
554  int sequence_number = 0;
555 
556  flags = (header & CONVERGENCE_LAYER_MASK_FLAGS) >> 0;
557  sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
558 
559  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming data frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
560 
561  /* Parse the incoming data frame */
562  ret = convergence_layer_parse_dataframe(source, data_pointer, data_length, flags, sequence_number, rssi);
563 
564  /* Send out NACK if parsing fails */
565  if( ret < 0 ) {
566  convergence_layer_create_send_ack(source, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
567  }
568 
569  return 1;
570  }
571 
572  if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_DISCOVERY ) {
573  /* is discovery */
574  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming discovery frame from %u.%u", source->u8[0], source->u8[1]);
575 
576  DISCOVERY.receive(source, data_pointer, data_length);
577 
578  return 1;
579  }
580 
581  if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_ACK ) {
582  /* is ACK */
583  int sequence_number = 0;
584 
585  sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
586 
587  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming Ack frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
588 
589  convergence_layer_parse_ackframe(source, data_pointer, data_length, sequence_number, CONVERGENCE_LAYER_TYPE_ACK);
590 
591  return 1;
592  }
593 
594  if( (header & CONVERGENCE_LAYER_MASK_TYPE) == CONVERGENCE_LAYER_TYPE_NACK ) {
595  /* is NACK */
596  int sequence_number = 0;
597 
598  sequence_number = (header & CONVERGENCE_LAYER_MASK_SEQNO) >> 2;
599 
600  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Incoming Nack frame from %u.%u with SeqNo %u", source->u8[0], source->u8[1], sequence_number);
601 
602  convergence_layer_parse_ackframe(source, data_pointer, data_length, sequence_number, CONVERGENCE_LAYER_TYPE_NACK);
603 
604  return 1;
605  }
606 
607  return 0;
608 }
609 
610 int convergence_layer_status(void * pointer, uint8_t outcome)
611 {
612  struct transmit_ticket_t * ticket = NULL;
613 
614  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "MAC callback for %p with %d", pointer, outcome);
615 
616  /* Something has been sent, so the radio is not blocked anymore */
618 
619  /* Notify the process to commence transmitting outgoing bundles */
620  if( convergence_layer_pending == 0 ) {
621  if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
622  /* Send event to slow the stuff down */
623  etimer_set(&convergence_layer_backoff, 0.5 * CLOCK_SECOND);
624  convergence_layer_backoff_pending = 1;
625  } else {
626  /* Poll to make it faster */
627  process_poll(&convergence_layer_process);
628  }
629 
631  }
632 
633  if( pointer == NULL ) {
634  /* Must be a discovery message */
635  return 0;
636  }
637 
638  ticket = (struct transmit_ticket_t *) pointer;
639 
640  if( (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK) ) {
641  /* Unset IN_TRANSIT flag */
642  ticket->flags &= ~CONVERGENCE_LAYER_QUEUE_IN_TRANSIT;
643 
644  /* Must be a NACK or ACK */
645  if( outcome == CONVERGENCE_LAYER_STATUS_OK ) {
646  /* Great! */
647  convergence_layer_free_transmit_ticket(ticket);
648 
649  return 1;
650  }
651 
652  /* Fatal error, retry is pointless */
653  if( outcome == CONVERGENCE_LAYER_STATUS_FATAL ) {
654  convergence_layer_free_transmit_ticket(ticket);
655  return 1;
656  }
657 
658  if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
659  // Has not been transmitted, so retry right away
660  ticket->timestamp = 0;
661  ticket->failed_tries ++;
662  }
663 
664  /* Increase the retry counter */
665  ticket->tries ++;
666 
667  /* Give up on too many retries */
668  if( ticket->tries >= CONVERGENCE_LAYER_RETRANSMIT_TRIES || ticket->failed_tries >= CONVERGENCE_LAYER_FAILED_RETRIES) {
669  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "CL: Giving up on ticket %p after %d (or %d) tries", ticket, ticket->tries, ticket->failed_tries);
670  convergence_layer_free_transmit_ticket(ticket);
671 
672  return 0;
673  }
674 
675  return 1;
676  }
677 
678  /* Bundle was transmitted successfully */
679  if( outcome == CONVERGENCE_LAYER_STATUS_OK ) {
680  // Bundle is sent, now waiting for the app-layer ACK
681  ticket->flags = CONVERGENCE_LAYER_QUEUE_ACTIVE | CONVERGENCE_LAYER_QUEUE_ACK_PEND;
682 
683  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "LL Ack received, waiting for App-layer ACK with SeqNo %u", ticket->sequence_number);
684 
685  /* It is unlikely that we have to retransmit this bundle, so free up memory */
686  if( ticket->bundle != NULL ) {
687  bundle_decrement(ticket->bundle);
688  ticket->bundle = NULL;
689  }
690 
691  return 1;
692  }
693 
694  /* Fatal error, no retry necessary */
695  if( outcome == CONVERGENCE_LAYER_STATUS_FATAL ) {
696  /* This neighbour is now unblocked */
697  convergence_layer_set_unblocked(&ticket->neighbour);
698 
699  /* Notify routing module */
700  ROUTING.sent(ticket, ROUTING_STATUS_ERROR);
701 
702  return 1;
703  }
704 
705  /* Something went wrong, unblock the neighbour */
706  convergence_layer_set_unblocked(&ticket->neighbour);
707 
708  /* Bundle did not get an ACK, increase try counter */
709  if( outcome == CONVERGENCE_LAYER_STATUS_NOACK ) {
710  ticket->tries ++;
711  } else if( outcome == CONVERGENCE_LAYER_STATUS_NOSEND ) {
712  ticket->failed_tries ++;
713  }
714 
715  if( ticket->tries >= CONVERGENCE_LAYER_RETRIES || ticket->failed_tries >= CONVERGENCE_LAYER_FAILED_RETRIES ) {
716  /* Bundle fails over and over again, notify routing */
717  ticket->flags = CONVERGENCE_LAYER_QUEUE_FAIL;
718 
719  /* Notify routing module */
720  ROUTING.sent(ticket, ROUTING_STATUS_FAIL);
721 
722  /* We can already free the bundle memory */
723  if( ticket->bundle != NULL ) {
724  bundle_decrement(ticket->bundle);
725  ticket->bundle = NULL;
726  }
727 
728  return 1;
729  }
730 
731  /* Transmission did not work, retry it right away */
732  ticket->flags = CONVERGENCE_LAYER_QUEUE_ACTIVE;
733 
734  return 1;
735 }
736 
737 int convergence_layer_delete_bundle(uint32_t bundle_number)
738 {
739  struct transmit_ticket_t * ticket = NULL;
740 
741  LOG(LOGD_DTN, LOG_CL, LOGL_DBG, "Deleting tickets for bundle %lu", bundle_number);
742 
743  for(ticket = list_head(transmission_ticket_list);
744  ticket != NULL;
745  ticket = list_item_next(ticket) ) {
746  if( ticket->bundle_number == bundle_number ) {
747  break;
748  }
749  }
750 
751  /* Unable to find that bundle */
752  if( ticket == NULL ) {
753  return -1;
754  }
755 
756  /* free the ticket's memory */
757  convergence_layer_free_transmit_ticket(ticket);
758 
759  return 1;
760 }
761 
762 int convergence_layer_is_blocked(rimeaddr_t * neighbour)
763 {
764  struct blocked_neighbour_t * n = NULL;
765 
766  for( n = list_head(blocked_neighbour_list);
767  n != NULL;
768  n = list_item_next(n) ) {
769  if( rimeaddr_cmp(neighbour, &n->neighbour) ) {
770  return 1;
771  }
772  }
773 
774  return 0;
775 }
776 
777 int convergence_layer_set_blocked(rimeaddr_t * neighbour)
778 {
779  struct blocked_neighbour_t * n = NULL;
780 
781  n = memb_alloc(&blocked_neighbour_mem);
782  if( n == NULL ) {
783  LOG(LOGD_DTN, LOG_CL, LOGL_ERR, "Cannot allocate neighbour memory");
784  return -1;
785  }
786 
787  /* Fill the struct */
788  rimeaddr_copy(&n->neighbour, neighbour);
789  n->timestamp = clock_time();
790 
791  /* Add it to the list */
792  list_add(blocked_neighbour_list, n);
793 
794  return 1;
795 }
796 
797 int convergence_layer_set_unblocked(rimeaddr_t * neighbour)
798 {
799  struct blocked_neighbour_t * n = NULL;
800 
801  for( n = list_head(blocked_neighbour_list);
802  n != NULL;
803  n = list_item_next(n) ) {
804  if( rimeaddr_cmp(neighbour, &n->neighbour) ) {
805  list_remove(blocked_neighbour_list, n);
806  memb_free(&blocked_neighbour_mem, n);
807  return 1;
808  }
809  }
810 
811  return 0;
812 }
813 
814 void check_blocked_neighbours() {
815  struct blocked_neighbour_t * n = NULL;
816  struct transmit_ticket_t * ticket = NULL;
817 
818  for( n = list_head(blocked_neighbour_list);
819  n != NULL;
820  n = list_item_next(n) ) {
821 
822  if( (clock_time() - n->timestamp) >= (((clock_time_t) CLOCK_SECOND) * ((clock_time_t) CONVERGENCE_LAYER_TIMEOUT) ) ) {
823  /* We have a neighbour that takes quite long to reply apparently -
824  * unblock him and resend the pending bundle
825  */
826  break;
827  }
828  }
829 
830  /* Nothing to do for us */
831  if( n == NULL ) {
832  return;
833  }
834 
835  /* Go and find the currently transmitting ticket to that neighbour */
836  for( ticket = list_head(transmission_ticket_list);
837  ticket != NULL;
838  ticket = list_item_next(ticket) ) {
839  if( rimeaddr_cmp(&ticket->neighbour, &n->neighbour) && (ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK_PEND) ) {
840  break;
841  }
842  }
843 
844  /* Unblock the neighbour */
845  convergence_layer_set_unblocked(&n->neighbour);
846 
847  LOG(LOGD_DTN, LOG_CL, LOGL_WRN, "Neighbour %u.%u stale, removing lock", n->neighbour.u8[0], n->neighbour.u8[1]);
848 
849  /* There seems to be no ticket, nothing to do for us */
850  if( ticket == NULL ) {
851  return;
852  }
853 
854  /* Otherwise: just reactivate the ticket, it will be transmitted again */
855  ticket->flags = CONVERGENCE_LAYER_QUEUE_ACTIVE;
856 
857  if( convergence_layer_pending == 0 ) {
858  /* Tell the process to resend the bundles */
859  process_poll(&convergence_layer_process);
860  }
861 }
862 
863 int convergence_layer_neighbour_down(rimeaddr_t * neighbour) {
864  struct transmit_ticket_t * ticket = NULL;
865  int changed = 1;
866 
867  if( neighbour == NULL ) {
868  return -1;
869  }
870 
871  while( changed ) {
872  changed = 0;
873 
874  /* Go and look for a ticket for this neighbour */
875  for(ticket = list_head(transmission_ticket_list);
876  ticket != NULL;
877  ticket = list_item_next(ticket) ) {
878 
879  if( rimeaddr_cmp(neighbour, &ticket->neighbour) ) {
880  /* Notify routing module */
881  ROUTING.sent(ticket, ROUTING_STATUS_FAIL);
882 
883  /* Mark as changed */
884  changed = 1;
885 
886  /* Stop look and start over again */
887  break;
888  }
889  }
890  }
891 
892  /* Remove potentially stale lock for neighbour */
893  convergence_layer_set_unblocked(neighbour);
894 
895  return 1;
896 }
897 
898 PROCESS_THREAD(convergence_layer_process, ev, data)
899 {
900  struct transmit_ticket_t * ticket = NULL;
901  static struct etimer stale_timer;
902  int n;
903 
904  PROCESS_BEGIN();
905 
906  /* Initialize ticket storage */
907  memb_init(&transmission_ticket_mem);
908  list_init(transmission_ticket_list);
909 
910  /* Initialize neighbour storage */
911  memb_init(&blocked_neighbour_mem);
912  list_init(blocked_neighbour_list);
913 
914  LOG(LOGD_DTN, LOG_CL, LOGL_INF, "CL process is running");
915 
916  /* Initialize state */
921 
922  /* Set timer */
923  etimer_set(&stale_timer, CLOCK_SECOND);
924 
925  while(1) {
926  PROCESS_YIELD_UNTIL(ev == PROCESS_EVENT_POLL || ev == PROCESS_EVENT_CONTINUE || etimer_expired(&stale_timer) || ev == PROCESS_EVENT_TIMER);
927 
928  if( etimer_expired(&stale_timer) ) {
929  check_blocked_neighbours();
930  etimer_restart(&stale_timer);
931  }
932 
933  if( ev == PROCESS_EVENT_POLL || ev == PROCESS_EVENT_CONTINUE || (convergence_layer_backoff_pending && etimer_expired(&convergence_layer_backoff)) ) {
935 
936  /* Stop timer to avoid it firing again */
937  if ( (convergence_layer_backoff_pending && etimer_expired(&convergence_layer_backoff)) ) {
939  convergence_layer_backoff_pending = 0;
940  }
941 
942  /* If we are currently transmitting, we cannot send another bundle */
944  /* We will get polled again when the MAC layers calls us back,
945  * so lean back and relax
946  */
947  continue;
948  }
949 
950 
951  /* If we have been woken up, it must have been a poll to transmit outgoing bundles */
952  for(ticket = list_head(transmission_ticket_list);
953  ticket != NULL;
954  ticket = list_item_next(ticket) ) {
955  if( ((ticket->flags & CONVERGENCE_LAYER_QUEUE_ACK) || (ticket->flags & CONVERGENCE_LAYER_QUEUE_NACK)) && !(ticket->flags & CONVERGENCE_LAYER_QUEUE_IN_TRANSIT) ) {
956  n = convergence_layer_resend_ack(ticket);
957  if( n ) {
958  /* Transmission happened */
959  break;
960  }
961  }
962 
963  /* Tickets that are in any other state than ACTIVE cannot be transmitted */
964  if( ticket->flags != CONVERGENCE_LAYER_QUEUE_ACTIVE ) {
965  continue;
966  }
967 
968  /* Neighbour for which we are currently waiting on app-layer ACKs cannot receive anything now */
969  if( convergence_layer_is_blocked(&ticket->neighbour) ) {
970  continue;
971  }
972 
973  /* Send the bundle just now */
974  convergence_layer_send_bundle(ticket);
975 
976  /* Radio is busy now, defer */
977  break;
978  }
979  }
980  }
981  PROCESS_END();
982 }