diff --git a/04-futex/futex.c b/04-futex/futex.c
index 101692b..8692d1b 100644
--- a/04-futex/futex.c
+++ b/04-futex/futex.c
@@ -53,12 +53,40 @@ void sem_init(atomic_int *sem, unsigned initval) {
decrement it. If it is already zero, we sleep until the value
becomes larger than zero and try decrementing it again. */
void sem_down(atomic_int *sem) {
+
+ // We have to loop: after a wake-up, the woken ("old") decrementor
+ // races against newly arriving decrementors, so it may find the
+ // value at zero again and has to retry.
+ while (true) {
+ // Retrieve the current value and try to decrement it if it
+ // is larger than zero.
+ int value = atomic_load(sem);
+ if (value > 0) {
+ // Attempt the decrement with compare-and-exchange: it
+ // stores (value - 1) only if sem still equals value.
+ // If another thread changed sem in between, it fails
+ // and we start over with a fresh load.
+ if (atomic_compare_exchange_strong(sem, &value, value - 1) == true)
+ break; // Compare-and-Exchange succeeded!
+ } else {
+ // We wait, if the value is still zero! Afterwards we retry.
+ futex_wait(sem, 0);
+ }
+ }
}
/* The semaphore increment operation increments the counter and wakes
up one waiting thread, if there is the possibility of waiting
threads. */
void sem_up(atomic_int *sem) {
+    // Add the new unit before waking anybody, so that a woken thread
+    // finds a positive value when it retries in sem_down().
+    atomic_fetch_add(sem, 1);
+
+    // Always wake one sleeper (one extra system call per sem_up). Waking
+    // only when the old value was zero loses wake-ups with two or more
+    // waiters: a second sem_up() can see the first one's increment.
+    futex_wake(sem, 1);
}
////////////////////////////////////////////////////////////////
@@ -86,14 +114,72 @@ struct bounded_buffer {
};
void bb_init(struct bounded_buffer *bb) {
+    // Both cursors start at the first slot, data[0].
+    bb->write_idx = 0;
+    bb->read_idx = 0;
+
+    // Counting semaphores that track the buffer's fill level:
+    // `elements` counts the entries that are ready to be read (none
+    // yet), while `slots` counts the free entries, i.e. initially
+    // the whole capacity of the data array.
+    sem_init(&bb->elements, 0);
+    sem_init(&bb->slots, ARRAY_SIZE(bb->data));
+
+    // Binary semaphore used as a mutex around the index updates. It
+    // starts at 1 because the critical section is initially free;
+    // entering the section decrements it.
+    sem_init(&bb->lock, 1);
}
void * bb_get(struct bounded_buffer *bb) {
void *ret = NULL;
+
+    // Reserve one element first: taking a unit from the `elements`
+    // semaphore blocks for as long as the buffer holds nothing to
+    // read.
+    sem_down(&bb->elements);
+
+    // read_idx is shared by everyone using the buffer, so fetching
+    // the element and advancing the index happens with the binary
+    // semaphore bb->lock held.
+    sem_down(&bb->lock);
+    { // Critical Section (protected by bb->lock)
+        // Take the element at the read position, then step the
+        // position forward, wrapping around at the array's end.
+        ret = bb->data[bb->read_idx];
+        bb->read_idx += 1;
+        bb->read_idx %= ARRAY_SIZE(bb->data);
+    }
+    sem_up(&bb->lock);
+
+    // The slot we just drained is free again. Returning a unit to
+    // the `slots` semaphore may wake up a thread that is waiting
+    // for an empty slot in `bb_put()`.
+    sem_up(&bb->slots);
+
return ret;
}
void bb_put(struct bounded_buffer *bb, void *data) {
+
+    // Mirror image of bb_get():
+    //  - claim an empty slot up front by decrementing `slots`,
+    //  - store through write_idx instead of reading via read_idx,
+    //  - announce the new element by incrementing `elements`.
+    sem_down(&bb->slots);
+
+    sem_down(&bb->lock);
+    { // Critical Section (protected by bb->lock)
+        bb->data[bb->write_idx] = data;
+        bb->write_idx += 1;
+        bb->write_idx %= ARRAY_SIZE(bb->data);
+    }
+    sem_up(&bb->lock);
+
+    // One more element is available now; this may wake up a thread
+    // that is waiting for an element in `bb_get()`.
+    sem_up(&bb->elements);
+
}
@@ -134,7 +220,11 @@ int main() {
printf("Child has initialized the bounded buffer\n");
- // FIXME: Retrieve elements from the bounded buffer
+ char *element;
+ do {
+ element = bb_get(bb);
+ printf("Parent: %p = '%s'\n", element, element);
+ } while(element != NULL);
} else {
////////////////////////////////////////////////////////////////
// Child
@@ -142,9 +232,16 @@ int main() {
"Hello", "World", "!", "How", "are", "you", "?"
};
(void)data;
- // FIXME: Initialize the bounded buffer after sleeping a second
- // FIXME: Wake up the parent who is waiting on semaphore
- // FIXME: Push all char pointers through the bounded buffer to the parent.
+ sleep(1);
+ bb_init(bb);
+ printf("Child: We initialized the bounded buffer\n");
+ sem_up(semaphore);
+ for (unsigned int i = 0; i < 9; i ++) {
+ bb_put(bb, (void*) data[i % ARRAY_SIZE(data)]);
+
+ if (i > 5) sleep(1);
+ }
+ bb_put(bb, NULL);
}
return 0;