Mental models

Why ?

I get how to start a PcoThread, I know how a PcoMutex or PcoSemaphore can be used, but I'm lost on how to put that all together...

When reading code I don't know how to detect potential issues and I miss a lot of problems.

When I see an non protected critical section, I cannot find a situation where the scheduling of threads is causing incorrect behaviors...

Note: in this document, when refering to T1, T2 or any T + a number, we refer to a thread with a specific number to easily differentiate them.

Thinking with the concurrence mindset

Before, you always thought that "if a function has 3 lines, nothing can happen in between these 3 lines". Now, a lot of other code can run in between, sometimes the same code or another functions on 1 or multiple others threads... Sometimes a single line is actually a lot of CPU instructions under the hood and you need to mentally divide them.

A crucial skill to develop in this course, is the ability to read concurrent code and imagine situations where the unpredictible scheduling is able to cause issues. This is really something new and something that takes time to master.

To be concrete, let's take a concrete custom exercice. This code and fixes presented have the particularity to not include any synchronisation methods (mutex or others) to simplify the mental and enable showing you diverse scenarios on a single code snippet. In practice, you'll just use synchronisation methods (mutex or others) to fix them.

It goes in a certain level of depth, to make sure you can understand in details a lot of ways things can go wrong. This is not necessarily exhaustive but it should help you start to see patterns and imagine your own scenarios. When reading code it's not enough to detect critical sections, it's also important to understand why and how it would fail under certain order chosen by the scheduler.

We also talk about memory issues that are may be sometimes out of scope of the course, but you might encounter them in labs if you need to manage dynamic allocations. In any case, they'll also give you harder cases to think about.

Concurrency bugs can generate various errors types (this list is probably not exhaustive):

  • incoherent results in calculation or incorrect behaviors, the algorithm and logic is getting wrong
  • correct behavior but rules not enforced 100% of the time, i.e. the priority rules between threads could be bypassed
  • or even worse: memory issues that could crash the program or have undefined behavior, such as access freed memory, saving values only partially

The implementation to review

See the code

Here is the implementation of FastAvg, a faster way to calculate the average. There is no main to simplify the thinking and avoid limiting the possibilities of use. A shared FastAvg instance can be called concurrently at any time on any public methods with any value.

// Container to calculate the average of a list of integers faster, by avoiding to compute the whole sum from scratch everytime
class FastAvg {
private:
    vector<int> values;
    long sum = 0;

public:
    // Calculate the average of given values, returns 0 when empty
    double average() {
        if (values.empty()) return 0.;
        return (double) sum / values.size();
    }

    // Push a new value `a` and return the position of the pushed value
    // This position could be used in further calls of `update`
    size_t push(int a) {
        values.push_back(a);
        sum += a;
        return values.size() - 1;
    }

    // Update an existing value `a` at position `i`
    // and returns whether the value has been updated
    bool update(size_t i, int a) {
        if (i >= values.size()) return false;

        sum += -values.at(i) + a;// remove the current value and add the new one
        values.at(i) = a;

        return true;
    }

    // Delete all inserted values
    void reset() {
        values.clear();
        values.shrink_to_fit();
        sum = 0;
    }
};

Make sure to understand the general goal and how it works without concurrency.

Details if this is not clear for you

We just want to have calculate the average of a large numbers of changing values. Instead of computing the sum of all values everytime we push a new one or we change an existing number, we store the previous sum value to adapt it. This also makes the final average calculation in instead of .

Where is the shared state considering threads have all access to a FastAvg instance ? The values and sum attributes is the shared state.

Now, let's analyse the methods one by one to analyse and understand several issues.

Analysing the average method

double average() const {
    if (values.empty()) return 0.;
    return (double) sum / values.size();
}

Division by zero check

The first risk is the division by zero. This risk is mitigated by checking the size via the empty() method before doing the calculation.

As we are first checking the size (code of empty()) and then reloading a second a time (via size()), everything could have happened in the meantime.

If size is 0, the if condition would be true, thus returning zero. No issue.

If size is > 0, the if is not entered and we continue. Is there a way the size could be zero again after this check ? Yes, because we have the reset method, the vector can become empty in between.

How to support concurrent reset calls ? The root issue is that we are loading size twice from the shared memory. Using a local variable to be sure the value cannot be changed can fix this specific issue.

size_t localSize = values.size();
if (localSize == 0) return 0.;
return (double) sum / localSize;

Wrong calculation of average

We are also reading the sum and the size() one after the other, without any warranty that they correspond. What happens if another thread is doing push at the same time we are calculating the average? We accept to return the previous or the new average (both are okay), but we really don't want any other incoherent value !

That's a frequent issue when 2 variables are linked. In this code, when the size is changed, the sum would also need to be updated. This relation can be expressed as size always corresponds to the number of values included in sum.

Just to be clear sum / values.size(); is actually made in 3 steps

  1. Step A: Load sum from memory into a register
  2. Step B: Call the size() method and store its returned value
  3. Step C: Do the division and store the result into a register

The issue is that B doesn't always follows A immediately, here is scenario where the calculation would be incoherent.

  1. The vector contains [1,3], the sum is 4 and size is 2. The average calculation should be 4/2 = 2.
  2. T1 is running average(), it does step A, the register contains a copy of the sum to 4.
  3. T2 is running push, no matter how protected is the push method, it needs to change sum and values in 2 steps.
  4. T2 want to push 6, once done, the new average calculation should be 10/3 = 3.333.
  5. T2 is running values.push_back(6), the sum has not been updated yet but the size is 3.
  6. T2 could continue or not at this point, but this doesn't matter: changing sum will not impact T1 in this scenario, and as long as the size has been changed it will impact T1...
  7. T1 continues with step B which loads the size as 3. It does the division in step C with 4 / 3 instead of the correct calculation (which is 4 / 2 or 10 / 3). This obviously gives us an invalid average that cannot be accepted as valid!

Conclusion: even with just reads, a lot of wrong things can happen, because they are write access somewhere else.

Analysing the push method

size_t push(int a) {
    values.push_back(a);
    sum += a;
    return values.size() - 1;
}

A lot of the same thinking from above can also be applied to this second method, but because we have write accesses, this is a bit more difficult to imagine all scenarios.

Returning the wrong index

The returned size_t needs to be correct. Otherwise, future calls to update would fail or change values at incorrect positions. The returned index could be wrong in case the push_back is not directly followed by the size() access.

Here is problematic scenario

  1. Imagine an empty vector, with size = 0.
  2. T1 is doing the push_back, the size is now 1
  3. Then T2 is also calling push and doing the push_back, the size is now 2.
  4. T1 continues and return size - 1 = 2-1 = 1
  5. T1 has returned the wrong index, it was 1 but it should be 0

There is no way to fix this specific issue by refactoring the existing code. A mutex or another way to make sure the push_back and size read is done at once, is necessary.

Concurrent write access on sum causing incorrect sum

As you probably know the classic issue, the ++ or += or any other calculations is done in multiple steps. Like when incrementing i++, the sum augmentation could be lost if two augmentations are done at the same time. In this situation sum += a; could be broken down into these steps. (Registers names are arbitrary).

  1. Read sum into a register R1
  2. Read a into a register R2
  3. Do the addition of R1 + R2 and saves the result in R3
  4. Store R3 into sum

What happens if these 4 steps are not done at once ? Here is problematic scenario.

  1. The vector is empty
  2. T1 is pushing 4 (push_back done) and does step 1, 2, 3. R3 of T1 now contains 0+4 = 4. The vector contains [4] but the sum variable in RAM is still 0.
  3. T2 is pushes 5. It does step 1 (R1 is 0), step 2, step 3 (R3 is 0 + 5 = 5) and also the step 4. The vector contains [4, 5] and the sum variable in RAM is now 5.
  4. T1 continues with step 4 which set the sum value to 4 (this is losing the previous value 5).
  5. T1 and T2 are done on push
  6. The big issue is that the vector contains [4, 5] and the sum variable in RAM is 4. It should be 4+5 = 9 instead of 4 !

The current and future calculation of the average will be wrong !

Concurrent write access on values causing memory issues

values.push_back(a);

What could happen by mutating values at the same time in 2 threads ? This is asking for a bit more imagination because we don't see directly the code inside push_back and this is related to the internal management the vector class is needing.

As a reminder, the vector allocates memory dynamically to store a contiguous array of values on the heap. Its attributes are the pointer towards the heap, the size and capacity of the allocated array.

  1. The implementation needs to do the following things
    1. If the size is equal to the capacity, allocate twice more memory, move data to the new place and free the previous zone
    2. Then as we know we have some space available, it saves the given value into the correct array index on the heap and then increment the size As this pseudocode above contains a lot of steps that are designed to be done in a single threaded context, a lot of wrong things can occur.

As we don't have an easy to read and self-contained implementation of vector::push_back to discuss every possible issues, we'll write a POC that is calling push_back from 4 threads to experiment and test various hypothesis. The final code of this POC is available here. Go run this and see the Steps to have fun at the very top to play with it. You really should experience the steps documented below yourself and play with the code.

See code of POC
#include <iostream>
#include <pcosynchro/pcothread.h>
#include <string>
#include <vector>

using namespace std;
const long TOTAL = 100000;
const int NB_THREADS = 4;

vector<int> values;

void task(int tid) {
    for (long i = 0; i < TOTAL; i++) {
        values.push_back(i);
        string msg = "tid : " + to_string(tid) + " i: " + to_string(i) + "\n";
        cout << msg;
    }
}

// default main
Result of execution

We run this code, we can see the 4 threads are working all at the same time and don't have the same progression.

> r
tid: 0 i: 52
tid: 3 i: 12
tid: 2 i: 23
tid: 0 i: 53
tid: 1 i: 54
tid: 3 i: 13
...
double free or corruption (out)

We have an almost 100% of chance to have a crash because of the dynamic allocations.

To count the number of lines (approximately one line per push_back) before a crash, we use wc -l (lines counter). This occurs sometimes sooner or later in the execution.

> r | wc -l
double free or corruption (out)
300

> r | wc -l
double free or corruption (out)
double free or corruption (!prev)
300

> r | wc -l
malloc(): corrupted top size
4121

> r | wc -l
malloc(): corrupted top size
free(): unaligned chunk detected in tcache 2
3856

> r | wc -l
malloc(): corrupted top size
4116

> r | wc -l
terminate called after throwing an instance of 'std::bad_alloc'
what():  std::bad_alloc
5

As you can see, it seems that multiple reallocations are taking place at the same time, which are causing double free (calling free() on the same zone twice).

Note: as a reminder, you can easily create little POCs by creating a hello world for PCO via the scr pco command. Don't lose time creating the CMakeLists.txt or creating the boilerplate of threads creation.


All these crashes are caused because we need to reallocate. If there a way to avoid reallocations ? If we reserve the necessary space at the start of the main, these crashes disappear because no reallocation are necessary. Just a reminder: reserve is increasing the capacity by allocating a chunk of heap memory but doesn't initialize it with default elements. This means the size will be left unchanged.

values.reserve(TOTAL * NB_THREADS);

But there is still some big but silent issues. Do you see them ? Does the vector content is correct ?

How would look like the vector if everything was correct ?

  1. The vector size should be equal to the capacity (4*TOTAL)
  2. All values from 0 to TOTAL - 1 must be present 4 times in the final vector (because there are 4 threads)
  3. All values would be initialized (each cell was overwritten)

The vector content has numerous issues. To measure this, we added a small function checkingVectorCoherence to run once all threads are done. To detect non initialized cells in the vector buffer, we had to initialize the vector's buffer with known bytes. This achieve this without accessing the private pointer attribute, after reserve use, we pushed a fake element and used its address to memset the big zone. We filled this zone with the value 3, repeated on each byte. When checking integer (4 bytes) values later, we would find 0x03030303 where the buffer was not set.

What are the results of checkingVectorCoherence ?

Checking vector content
Value 3092 is present 3 times instead of 4
Value 3384 is present 3 times instead of 4
Value 4092 is present 3 times instead of 4
Value 4580 is present 3 times instead of 4
Value 8199 is present 3 times instead of 4
Value 13152 is present 3 times instead of 4
...
(58 lines removed)
...
Found 64 missing values
values.size() is 399945 -> 55 lost increments towards 400000
values.capacity() -> 400000
Found 9 times the initialisation flag 50529027

In this execution, we found

  • 64 values that are missing in the final vector (the first line show some of these missing values). It probably means that 64 values were erased by another value.
  • the final size is 399945 instead of the expected 400000, the size++ increment has been lost 55 times
  • 9 integers were not touched (as we encounter the flag on them)

If we just add a simple mutex around the push_back, all the mentionned issues are fixed.

Checking vector content
Found 0 missing values
values.size() is 400000 -> 0 lost increments towards 400000
values.capacity() -> 400000
Found 0 times the initialisation flag 50529027

We hope you liked this deep dive and demonstrations of all these possible loud or silent errors !

Underflow of returned size_t

return values.size() - 1;

Remember that size_t is an alias to an unsigned integer, so is it possible we have a negative value ? This negative value would underflow and generate a very big integer number. This can only happen with a concurrent reset call that would bring the size back to zero.

Analysing the update method

Note: we still consider the original code without the improvements explained above.

bool update(size_t i, int a) {
    if (i >= values.size()) return false;

    sum += -values.at(i) + a;// remove the current value and add the new one
    values.at(i) = a;

    return true;
}

Wrong sum calculation again

When you read sum += -values.at(i) + a you should be able to understand that these steps are behind (there are several order possible, this one is arbitrary)

  1. Load sum into register R1
  2. Call .at() and store the result into a register R2
  3. Calculate the inverse () of R2 and save into R2
  4. Load a into register R3
  5. Addition R1 and R2 and save in R1
  6. Addition R1 and R3 and save in R1
  7. Store R1 into sum

Again, there is a risk of T1 saving the result (step 7) and erasing another result made by T2 that ran between the steps 1 to 6 of T1. The calculation itself might be wrong as we are using 2 shared values together. If T1 does step 1, T2 is doing steps 1 to 7 updating the same index, T1 will continue with step 2 (that loads the new value instead of the previous one). The final sum will be incorrect.

TODO: is this needed to use the list formatting again and more details on local registers ? I guess this was enough detailed before, this is really the same pattern as before.

The previous list of steps could be simplified by grouping operations that are "local" (not reading or writing any shared state). This grouping in your head, is useful to avoid considering "What happens if another thread is working during these local operations ?". Local operations cannot be impacted by other threads. No matter how long they are, they can be considered a single group.

  1. Load sum into register R1
  2. Call .at() and store the result into a register R2
  3. Do the full calculation
  4. Store R1 into sum

Possible memory issues

Several memory issues could also occurs, because there are potential reallocations at the same time in push via push_back. As the previous deep dive was probably enough, we'll keep on the high level here.

What's really behind at() ? It depends on the exact implementation of the STL, but let's take a look at LLVM's libc++ implementation. This is a simplified version for ease of reading (source).

T& at(size_t n) {
    if (n >= size())
      this->__throw_out_of_range();
    return this->begin[n];
}

To avoid accessing memory out of the allocated range, it makes sure the index is below the size. Is this is really working with 2 concurrent threads ?

Here is yet another problematic scenario:

  1. T1 is checking .at(3), the if is passing because the size is 10.
  2. T1 continues by loading the begin pointer
  3. T2 runs reset which deallocate the buffer on the heap (because of shrink_to_fit)...
  4. T1 continues with accessing the value at address of begin + 3*4, oupsi...
  5. T1 is accessing a freed region, the read value can be invalid or a segfault could crash the process

The second .at() which has use this reference to write a new value, will suffer from the same issue.

Analysing the reset method

// Delete all inserted values
void reset() {
    values.clear();
    values.shrink_to_fit();
    sum = 0;
}

Again, memory issues could happen as before for the dynamic memory management for the various steps inside clear and shrink_to_fit.

This is causing issues mostly for other methods as described above. We don't see some logic bug though as it is mostly write access without further calculations.

Fixing the FastAvg implementation - custom exo

Woof this was a long journey, we're glad you are still here ! Now it's time to fix... It's your turn to code now. We prepared the exo fastavg-safe - Making FastAvg thread-safe to help you train to fix it. This include basic testing to detect some errors in your implementation. Then you can also read a proposed solution.

Thinking the concurrency with synchronisation primitives

Mutex

TODO metaphore + schema

TODO check this explanations https://stackoverflow.com/questions/3528877/can-someone-explain-mutex-and-how-it-is-used

Semaphore

TODO metaphore + schema

Common errors to detect

TODO find patterns, add original examples, create exercices to train the pattern detection.

Memory view

TODO

Proposed steps to read an existing code to detect and fix errors

With a lot of exercices and labs, you'll spend time reading some code and imagining what could go wrong... In case you feel stuck and don't know where to start, here is the proposed steps to follow.

  1. Understand the general goal of the program, what is it trying to achieve
  2. Read the main to know what kind of threads are started
    • How much thread are started ?
    • What are the entrypoint of each thread (which function or method) ?
    • Is there some roles for each thread or are they all doing the same thing ? (readers/writers, buyers/sellers/wholesalers, incrementer, ...)
  3. Understand WHERE is the shared state
    • Look at global variables and understand their needs
    • Look at pointers provided to the thread's functions
    • Look at local static variables (static keyword inside a function)
  4. Understand HOW this the shared state is used
    • Make sure the state is correctly initialized before any PcoThread is started
    • Where is it accessed ? (where are used the variables of the shared state)
    • Is it only in read or also in write ?
    • Identify the critical sections (any place where this state is accessed)
  5. Read the whole code to search for logic errors
    1. What are the rules in the instruction that must be respected ?
    2. Are they implemented correctly, making the hypothesis the running threads are never stopped in the middle of a method ?
  6. Fix the code by adding or fixing the implemented rules
    1. Imagine new scenarios to see if your fixes are improving the situations
    2. Is there still issues around this ? Does your fix is creating other issues somewhere else ?
    3. Did you protect your new shared state ?
  7. Now that the code implement the rules, read the whole code to search for concurrency issues. These issues could make the rules not always respected.
    1. Is there some unproper usage of PcoSemaphore or PcoMutex ? (i.e. calling lock twice from the same thread with a non recursive mutex).
    2. Which lines are touching the shared state and should be done without any external modifications of this state ? (Remember above the sum that must be read with values.size() without any modifications of the vector, because it would create an incoherent average calculation).
    3. When you see conditions to do certains actions only in some situations (remember the Division by zero check), make sure the reverse situations cannot happen after the condition check and before the action. (Remember the case where reset calls after the empty() check, would make the size zero again and where the division action would crash).
    4. Make sure blocking and releasing is correctly managed
      • Make sure semaphore are correctly initialized (do you want to block directly on the first acquire or only after a certain number of acquire ?)
      • Each acquire call must have and associated release somewhere during the execution. This is the same logic for lock that will need a matching unlock. You cannot just count naively in your code, you'll need several unlock before return in conditions sometimes.
  8. Make sure the program is terminated correctly
    1. All threads have a way to stop (by exiting their start function) and are not blocked in an acquire, lock or a infinite while loop. Look at the release and unlock calls.
    2. The main thread is correctly waiting on all threads before returning (calling join for all threads to make sure it doesn't main doesn't return before all started threads are fully done)
    3. All memory has been freed (see new and delete keywords)