Most code is serial. This means it runs from top to bottom, and it runs one line at a time. There are various different ways that we can break this paradigm and run our code “multiple lines” at a time. There are 4 main ways this is discussed:

  1. Asynchronous programming
  2. Parallel programming
  3. Multi-processing
  4. Multi-Threading

Unfortunately these terms can mean something different in different contexts, so instead I will talk about 2 more general categories asynchronous vs parallel.

Async/Await

Asynchronous execution (which I’ll shorten to async), is a way of running a program where you can switch back and forth between tasks that are executing (concurrent programming). For example if you have a function that has to access a database, but other operations can run while you’re waiting for that to finish, then you could asynchronously call the database, and run the other tasks in the meantime. Once the database call is finished, you can switch back to the method running the database call. With the following pseudocode:

async function getData(){
	return callToDatabase() // Takes a while to run
}

async function doBackgroundTasks(){
	// Do stuff in the background
}

function main(){
	a = getData()
	doBackgroundTasks()
	// Wait for a to finish if it's still not done after background tasks
	await a 
}

In this case we have 2 keywords:

  1. async: Used to declare a function as asynchronous
  2. await: Used to make the program stop and wait for an asynchronous function to finish

*Please note I represented main() as a synchronous function, some runtimes will still require you to declare it as an async function, and await it somehow

If we were to look at a diagram of how this would execute it would be something like this:

However it’s also possible to have:

In this case getData() finished early, so when await is called it’s already done and execution continues. The most complicated parts of asynchronous programming are that things can run out-of-order. This is a benefit in terms of performance, but can often lead to bugs (such as those discussed in Dining Philosophers).

Here is an example in python:

import random
import asyncio # Used to make managing async functions easier
 
async def get_data():
    print("get_data() running")
    await asyncio.sleep(random.randint(1,15))
    print("get_data() done")
    return "data"
 
async def do_background_tasks():
    print("do_background_tasks() running")
    ... # Do stuff in the background
    print("do_background_tasks() done")
 
async def main():
    # A TaskGroup is an easy way to start multiple async functions
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(get_data())
        tg.create_task(do_background_tasks())
    
    print(a.result())
 
if __name__ == "__main__":
    asyncio.run(main())

Which prints:

get_data() running
do_background_tasks() running
do_background_tasks() done
get_data() done
data

Multiprocessing

When your operating system starts an application it will setup a process. This process is the running version of the application. As such, one method of parallelism that exists is to run multiple processes. This is often a good choice if you want the tasks not just to run more efficiently, but if there are external time factors that mean you must run them simultaneously.

If you imagine for example you write an application that is an automated scraper to purchase tickets to a concert. Let’s say you want to buy 5 tickets, and you have 10 seconds before they sell out. With threads you would switch back and forth between each thread, so if each thread takes a while to startup, you may miss your window. Instead if you launch 5 separate processes, they all can run on separate cores on a machine, and run simultaneously. Please keep in mind that not all things in computing that are called multiprocessing work this way. Sometimes multiple processes can end up running on the same core, and at that point it’s no better than threading (arguably worse since it uses more resources than threads).

Here is an example of multiprocessing in python (please note that since each process is a separate python instance it is possible for this to run in parallel or concurrently depending on architecture):

from multiprocessing import Process
import time
import random
 
def task(identifier: int):
    print(f"Started #{identifier} at {time.ctime()}")
    time.sleep(random.randint(0,10))
    print(f"Ended #{identifier} at {time.ctime()}")
    
 
if __name__ == '__main__':
    processes = []
    for index in range(10):
        process = Process(target=task, args=(index,))
        process.start()
        processes.append(process)
 
    for process in processes:
        process.join()

Since they all run at the same time they can finish in any order. For example here is what I got:

Started #0 at Tue Dec 10 03:28:47 2024
Started #1 at Tue Dec 10 03:28:47 2024
Started #2 at Tue Dec 10 03:28:47 2024
Started #3 at Tue Dec 10 03:28:47 2024
Started #4 at Tue Dec 10 03:28:47 2024
Started #5 at Tue Dec 10 03:28:47 2024
Started #6 at Tue Dec 10 03:28:47 2024
Started #7 at Tue Dec 10 03:28:47 2024
Started #8 at Tue Dec 10 03:28:47 2024
Started #9 at Tue Dec 10 03:28:47 2024
Ended #4 at Tue Dec 10 03:28:48 2024
Ended #3 at Tue Dec 10 03:28:49 2024
Ended #0 at Tue Dec 10 03:28:50 2024
Ended #2 at Tue Dec 10 03:28:51 2024
Ended #5 at Tue Dec 10 03:28:52 2024
Ended #6 at Tue Dec 10 03:28:53 2024
Ended #7 at Tue Dec 10 03:28:53 2024
Ended #8 at Tue Dec 10 03:28:54 2024
Ended #9 at Tue Dec 10 03:28:55 2024
Ended #1 at Tue Dec 10 03:28:57 2024

Threading

Multi-threading is a technique in which you create multiple “threads”. A “thread” is a separated area inside a process that lets you run code. There are two types of threads:

  1. OS threads: This is a heavy weight type of thread that is created in the operating system Kernel, and managed by it
  2. Green/virtual Threads: These are “threads” that are much more lightweight and are managed in user space, typically by the language they are implemented in

For the most part in modern programming green/virtual threads are used, and so that’s what I’ll discuss here. In many implementations threads are essentially a second copy of a program that will run until some sort of “yield” point, at which point they will let another thread run. These yield points can be implemented various ways, such as with hardcoded time-slots, or more controllable implementations like “interrupts” (i.e. switch when doing I/O like opening a file). There are a few different approaches to creating threads, and thread management. For creating threads there are 2 main common approaches.

start() and join()

The first is to manage threads individually. For example you pass the thread a callback function (something to run), with it’s arguments, and then start() the thread. From there you let it run, and join() the thread to wait for it to end. For example lets say you have an app where you select a bunch of files to download, then you have to digitally “sign” with your information to confirm you downloaded the files. Let’s say you need to get the URL’s for each file before you can download them. You could do something like:

function getFileURLs(filenames: list[str]){
	result = []
	for file in filenames{
		fileURL = getURLbyFilename(file)
		result.add(fileURL)
	}
}

function downloadFiles(fileURLs:list[str]){
	for url in fileURLs{
		download(url)
	}
}

function sign(name:str, token:str) -> bool{
	isValid = signForDocment(name, token)
	return isValid
}

function main(){
	filenames = ["file1.png", "file2.jpg", "file3.docx"]
	thread = Thread(callback=getFileURLs, arguments = [filenames])

	thread.start()
	// Can ask the user for their name and token while getting the URL's
	name = askUserforName()
	token = getTokenByName(name)
	 
	valid = sign(name, token)
	
	if valid{
		result = thread.join()
		downloadFiles(result)
	} else{
		error("Unable to sign for files")
	}
}

Here is an example of this code in python:

import threading
import random
import time
from typing import List
import uuid
 
def download(filename:str) -> bool:
    print("inside download()")
    time.sleep(random.randint(0,1))
    print("Finishing download()")
    return random.choice([True, False])
 
def sign_for_docment(name:str, token:str) -> bool:
    print("inside sign_for_docment()")
    time.sleep(random.randint(0,1))
    print("Finishing sign_for_docment()")
    return random.choice([True, False])
 
def get_file_urls(filenames:List[str], result:List[str]) -> List[str]:
    print("inside get_file_urls()")
    time.sleep(random.randint(4,15))
    for file in filenames:
        result.append(f"/path/to/{file}")
    print("Finishing get_file_urls()")
    return result
 
def download_files(fileURLs:list[str]):
    print("inside download_files()")
    for url in fileURLs:
        download(url)
    print("Finishing download_files()")
 
def sign(name:str, token:str) -> bool:
    print("inside sign()")
    isValid = sign_for_docment(name, token)
    print("Finishing sign()")
    return isValid
 
def ask_user_for_name() ->str:
    print("inside ask_user_for_name()")
    print("Finishing ask_user_for_name()")
    return "name"
    
def get_token_by_name(name:str) -> str:
    print("inside get_token_by_name()")
    print("Finishing get_token_by_name()")
    return str(uuid.uuid4())
    
if __name__ == "__main__":
    filenames = ["file1.png", "file2.jpg", "file3.docx"]
    result = []
    thread = threading.Thread(target=get_file_urls, args=(filenames,result))
    thread.start()
    print("thread is running!")
    name = ask_user_for_name()
    token = get_token_by_name(name)
    valid = sign(name, token)
    if valid:
        thread.join()
        print("thread is done!")
        download_files(result)
    else:
        raise Exception("Unable to sign for files")
 

Thread Manager

In every language there are various thread managers. Thread managers (often called executors or pools) are systems that help you run threads and manage them more easily. In python there are several thread managers, a common one is ThreadPoolExecutor. This can be used to run a ton of tasks much more simply without having to explicitly start() and join() each thread:

import time
import random
import concurrent.futures
 
def task(identifier:int):
    print(f"inside task() #{identifier}")
    time.sleep(random.randint(0,10))
    print(f"done task() #{identifier}")
 
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    for index in range(20):
        pool.submit(task, index)

Which prints something like:

inside task() #0
inside task() #1
inside task() #2
done task() #2
inside task() #3
inside task() #4
done task() #1
inside task() #5
done task() #0
inside task() #6
done task() #5
inside task() #7
done task() #4
inside task() #8
done task() #7
inside task() #9
done task() #3
inside task() #10
done task() #6
inside task() #11
done task() #11
inside task() #12
done task() #12
inside task() #13
done task() #10
inside task() #14
done task() #9
inside task() #15
done task() #15
inside task() #16
done task() #8
inside task() #17
done task() #16
inside task() #18
done task() #13
inside task() #19
done task() #17
done task() #14
done task() #18
done task() #19

Concurrency vs Parallelism

Concurrent execution is just switching back and forth between threads/processes, whereas parallel execution is when multiple threads/processes are running at the same time. In order for something to be parallel there needs to be support at a hardware level. Additionally parallelism tends to take more resources, and is harder to synchronize. For example a system with multiple processor cores for separate processes to run in.

Synchronization

Accessing variables can become complicated in concurrent and parallel contexts. There a major issue can come up, data races. A data race is when multiple processes try to access and modify shared data. This can lead to the data being corrupted, or just incorrect. As a simple example consider a shopping system that processes payments:

current_id

function increment_id(){
	global current_id
	for 0..1000{
		current_id += 1
	}
}

function main(){
	t1 = thread(increment_id)
	t2 = thread(increment_id)
	t3 = thread(increment_id)
	t4 = thread(increment_id)
	
	t1.start()
	t2.start()
	t3.start()
	t4.start()

	t1.join()
	t2.join()
	t3.join()
	t4.join()
	
	print(f"Final counter value: {counter}")
}

In python it’s much harder than other languages to cause a race condition. Python has a mechanism called the GIL that makes it hard to cause one. That being said the below example still often races:

import threading
 
counter = 0
 
def increment():
    global counter
    for _ in range(1000000):
        counter += 1
    print(f"Closed with count = {counter}")
    
threads = []
    
for _ in range(15):
    t = threading.Thread(target=increment)
    t.start()
    threads.append(t)
 
for t in threads:
    t.join()

With the output:

Closed with count = 1000000
Closed with count = 2000000
Closed with count = 3573189
Closed with count = 4000000
Closed with count = 5000000
Closed with count = 6000000
Closed with count = 7000000
Closed with count = 8000000
Closed with count = 9602335
Closed with count = 10000000
Closed with count = 11000000
Closed with count = 12585059
Closed with count = 13000000
Closed with count = 14000000
Closed with count = 15000000

You will notice in this example the final result is the expected result. This behavior makes identifying race conditions very difficult. If I modify the example to copy counter first, then modify, then writeback you can cause a race condition constantly:

import threading
 
counter = 0
 
def increment():
    global counter
    c = counter
    for _ in range(1000000):
        c += 1
    print(f"Closed with count = {counter}")
    counter = c
    
threads = []
    
for _ in range(15):
    t = threading.Thread(target=increment)
    t.start()
    threads.append(t)
 
for t in threads:
    t.join()
    
print(f"Final count is {counter}")

For example:

Closed with count = 0
Closed with count = 1000000
Closed with count = 1000000
Closed with count = 1000000
Closed with count = 1000000
Closed with count = 2000000
Closed with count = 2000000
Closed with count = 3000000
Closed with count = 3000000
Closed with count = 3000000
Closed with count = 4000000
Closed with count = 4000000
Closed with count = 3000000
Closed with count = 5000000
Closed with count = 3000000
Final count is 6000000

Synchronization Primitives

When programming there are many synchronization primitives, which are tools used to make synchronization possible in concurrent and parallel contexts. This helps avoid data races.

Terminology

When looking at synchronization it’s worthwhile to know a few terms to be able to understand some of the content:

  • Mutual Exclusion: This is a fancy term to say that only 1 thing can happen at a time. If you have code that is mutually exclusive, then two threads cannot access it simultaneously.
  • Deadlocks/Livelocks: When you manage to make your code get stuck, usually because every thread is waiting on a resource that has been permanently locked by accident. A deadlock means there are no operations running in the threads, a livelock is when there are operations running, but the thread can’t continue
  • Sections: When talking about code in concurrent/parallel contexts we often break it up into discrete sections
    • Entry Section: The code that happens before the critical section, essentially the code that runs to setup the critical section, typically this is where any locks are acquired
    • Critical section: This refers to a part of the code that accesses a resource that is shared. Essentially this part of the code that does work, and should have a limited number of threads accessing it
    • Exit section: The code that cleans up after the critical section, this typically is where locks are released

Dining Philosophers

Avoiding data races is important, but another problem comes up depending on your solution, Deadlocks/Livelocks. This is when you implement a synchronization system that makes it possible to get “stuck” (locked). As an example imagine table where there are philosophers sitting at a table. The philosophers can either think() or eat(). When philosophers want to eat() they must pick up a chopstick on the right, and left hand side of them:

function think(){
	... // think 
}

function eat(){
	while not both_copsticks:
		if is_available(right_chopstick) and not_in_hand(right_chopstick):
			pickup_chopstick(left_chopstick)
		if is_available(left_chopstick) and not_in_hand(left_chopstick):
			pickup_chopstick(left_chopstick)
	eat_food()
	put_down_chopsticks(left_chopstick, right_chopstick)
}

Lets say you have a table of 6 philosophers, and 6 chopsticks. It is possible to have a situation where every chopstick is picked up, but no one can eat(), meaning they all get stuck in a loop waiting for another chopstick. This stuck state is called a deadlock: If each philosopher enters eat() at the same time, they will pick up the chopstick to their right, and be endlessly stuck waiting for the left chopstick, which will never be available. This analogy can apply to any shared resource.

Mutex

A mutex (or spinlock) is a synchronization primitive that allows threads/processes to acquire() (or lock()) and release() (or unlock()) it. You can imagine this like being given a key to enter. When you acquire() the key, you can enter, and then you release() the key back to the next person who needs it. The most important thing to keep in mind is that for a mutex to work properly, the acquire() must be atomic. This is typically done in hardware, and ensures that it is not possible for two threads to simultaneously acquire(). For example:

counter = 0 // Shared resource

function increment(){
	// Entry section
	mutex.acquire() // Wait on mutex
	 
	// Critical section
	counter += 1
	
	// Exit section
	mutex.release()
}

threads = []

for 0..5{
	t = Thread(increment)
	t.start()
	threads.add(t)
}

for thread in threads{
	thread.join()
}

Here is that same example in python:

import threading
 
# Shared resource
counter = 0
 
# Create a lock
mutex = threading.Lock()
 
def increment_counter():
    global counter
    for _ in range(1000):
        # Entry Section
        mutex.acquire() # Acquire the lock before modifying the shared resource
        try:
            # Critical Section
            counter += 1
        finally:
            # Exit section
            mutex.release() # Release the lock
 
# Create multiple threads
threads = []
for _ in range(5):  # 5 threads
    t = threading.Thread(target=increment_counter)
    threads.append(t)
    t.start()
 
# Wait for all threads to complete
for t in threads:
    t.join()
 
print(f"Final counter value: {counter}")

Keep in mind you can have multiple separate mutexes if you need them.

Condition Variable

A condition variable is a system to signal to threads and wake them up. This comes up typically in the producer-consumer (or readers-writers) problem, where you have producers that produce data that consumers need to consume (i.e. email clients producing emails that the email server needs to send). In these cases the threads for the consumer spend a lot of time waiting if there is nothing being produced.

There are typically two solutions to this:

  1. Long polling: Waiting by making the consumer threads sleep() for a set amount of time, and then checking if they have anything to do
  2. Condition variables: An efficient system that is used to notify() threads, and wake them up only when they have something to do

A condition variable will have 2 functions typically notify() and wait(). wait() will make the thread sit idle until notify() is called from somewhere else in the code. This is much more efficient than sleeping for a set interval (long polling) for a few reasons:

  1. Even a sleep like 100ms can massively slowdown systems of thousands of threads
  2. Consumer threads run exactly when they need to, without any wasted time

Here is a long polling example:

import threading
import time
 
# Shared data
data = []
mutex = threading.Lock()  # Lock to protect shared data
 
# Producer function
def producer():
    global data
    for i in range(5):  # Produce 5 items
        with mutex:
            print(f"Producer: Adding item {i}")
            data.append(i)  # Produce an item
        time.sleep(1)  # Simulate time taken to produce
 
# Consumer function
def consumer():
    global data
    for _ in range(5):  # Consume 5 items
        while True:
            with mutex:
                if data:  # Check if there's data to consume
                    item = data.pop(0)
                    print(f"Consumer: Consumed item {item}")
                    break
            time.sleep(0.1)  # Wait before checking again (long-polling delay)
 
# Create and start the threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
 
producer_thread.start()
consumer_thread.start()
 
# Wait for both threads to complete
producer_thread.join()
consumer_thread.join()
 
print("All items have been produced and consumed.")

You can see in this example the consumer is waiting a set amount of time between checking if data is available. Compare this to the condition variable version:

import threading
import time
 
# Shared data
data = []
condition = threading.Condition()
 
# Producer function
def producer():
    global data
    for i in range(5):  # Produce 5 items
        with condition:
            print(f"Producer: Adding item {i}")
            data.append(i)  # Produce an item
            condition.notify()  # Notify the consumer
        time.sleep(1)  # Simulate time taken to produce
 
# Consumer function
def consumer():
    global data
    for _ in range(5):  # Consume 5 items
        with condition:
            while not data:  # Wait until there is data
                condition.wait()
            item = data.pop(0)  # Consume the item
            print(f"Consumer: Consumed item {item}")
 
# Create and start the threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
 
producer_thread.start()
consumer_thread.start()
 
# Wait for both threads to complete
producer_thread.join()
consumer_thread.join()
 
print("All items have been produced and consumed.")

This is much more efficient because the consumer context manager (with condition inside consumer()), only runs when condition.notify() is called in producer(), otherwise the consumer() thread is doing nothing and wasting no CPU time sleeping.

Semaphore

A semaphore is similar to a mutex, but it allows for more than 1 thing to access the critical section/shared resource at a time. For example let’s say you can have up to 3 threads simultaneously accessing a server, you can setup a semaphore with an initial value of 3, and at any given time at most 3 threads will access it.

Semaphores have various implementations, but in general they have some sort of a value (sometimes called count), which is an integer that indicates how many things can currently access the critical section/shared resource. In these examples I also used a condition variable to wake up threads, but you can also just sleep after the operations (though this can slow down your program a lot, and cause many out-of-order executions). Semaphores have 2 methods, wait() and signal() (sometimes called other things like notify() instead of signal()).

Wait will:

class Semaphore{
	function wait(){
		// Wait for resource to be available
		while self.value <= 0{
			self.condition.wait()
		}
		self.value -= 1 // Decrement now you're allowed access to the resource
	}
}

Meaning you have to sit and wait, until Semaphore.value goes above 0, at which point you’re allowed in to the critical section/allowed to accessed shared resource, and need to decrement the value as you enter. Now that you’re done waiting, once finished with the critical section/access you will then signal() to indicate you’re done:

class Semaphore{
	function signal(){
		self.value += 1
		self.condition.notify() // Tell another thread to wake up
	}
}

For example if you initialize a semaphore with a count of 3, then have 1 thread currently accessing the critical section/resource, you would end up with a count of 2 on the semaphore, until the thread is done.

import threading
 
class Semaphore:
    """
    Semaphore represents a counting semaphore that controls access to a shared resource."""
    def __init__(self, initial_value:int):
        """Initialize the semaphore with the given initial value
 
        Parameters
        ----------
        initial_value : int
            The initial value of the semaphore (essentially the capacity)
        """
        self.value = initial_value
        self.mutex = threading.Lock()
        self.condition = threading.Condition(self.mutex)
 
    def wait(self):
        """Decrement the semaphore value, blocking if it would go below zero"""
        with self.condition:
            while self.value <= 0:
                self.condition.wait()
            self.value -= 1  # Decrement semaphore value under mutex protection
 
    def signal(self):
        """Increment the semaphore value and wake one waiting thread"""
        with self.condition:
            self.value += 1  # Increment semaphore value under mutex protection
            self.condition.notify()  # Wake up one waiting thread

*Please note that this semaphore is “generally” safe, but I would recommend using [threading.Semaphore()](threading — Thread-based parallelism — Python 3.13.1 documentation) for any serious code

You can then test this out with:

import time
 
def worker(semaphore, id):
    print(f"Worker {id} waiting...")
    semaphore.wait()
    print(f"Worker {id} working...")
    time.sleep(1)
    print(f"Worker {id} done.")
    semaphore.signal()
 
sem = Semaphore(2)  # Semaphore with an initial value of 2
 
threads = [threading.Thread(target=worker, args=(sem, index)) for index in range(50)]
 
for thread in threads: # Start threads
    thread.start()
 
for thread in threads: # Wait for them all to finish
    thread.join()

You will watch the threads run 2 at a time:

Worker 0 waiting...
Worker 0 working...
Worker 1 waiting...
Worker 1 working...
... // rest of workers waiting
Worker 48 waiting...
Worker 49 waiting...
Worker 0 done.
Worker 1 done.
Worker 2 working...
Worker 3 working...
Worker 2 done.
Worker 3 done.
Worker 4 working...
Worker 5 working...
... // Rest of workers taking turns
Worker 46 done.
Worker 47 done.
Worker 48 working...
Worker 49 working...
Worker 48 done.
Worker 49 done.

Software-based solutions

There are many good solutions to Synchronization issues with Synchronization Primitives. Unfortunately all of them rely on some sort of hardware support for atomic operations (operations that are guaranteed mutual exclusion). What about solutions that are instead algorithmic, and don’t rely on these Synchronization Primitives?

Dekker’s algorithm

Theodorus Dekker is as far as we know the first known software solution to Synchronization. Unfortunately his algorithm was only spoken/written about by Edsger Dijkstra, and never formally published. As such we don’t have a “reference” version of the algorithm. So instead I will just use the one that is popularized by the wikipedia article:

int turn;
boolean wants_to_enter[2];


wants_to_enter[0] = false;
wants_to_enter[1] = false;
turn = 0;   // or 1

function dekkers(int process_id, int other_id){
	wants_to_enter[process_id]= true;
	while wants_to_enter[other_id] {
	  if turn != process_id {
		 wants_to_enter[process_id] = false;
		 while turn != process_id {
		   // busy wait
		 }
		 wants_to_enter[process_id] = true
	  }
	}
	
	// critical section
	...
	turn = other_id
	wants_to_enter[process_id] = false
	// remainder section

}

The algorithm allows for 2 processes to have mutual exclusion for the critical section. Here is an example in python:

from concurrent.futures import ThreadPoolExecutor
import random
import time
turn = 0
wants_to_enter = [False, False]
 
def dekkers(process_id:int, other_id:int):
    global wants_to_enter, turn
    print(f"Entering Dekkers() with thread {process_id}")
    time.sleep(random.randint(0,2)) # adding randomness to pontentially mess up ordering
    wants_to_enter[process_id] = True
 
    while wants_to_enter[other_id]:
        if turn != process_id:
            wants_to_enter[process_id] = False
            while turn != process_id:
                ... # busy wait
        wants_to_enter[process_id] = True
 
	# critical section
    ...
 
    turn = other_id
    wants_to_enter[process_id] = False
	# remainder section
    print(f"Exiting Dekkers() with thread {process_id}")
 
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(dekkers, 0,1)
        executor.submit(dekkers, 1,0)

Peterson’s algorithm

Peterson’s algorithm is one of the original software based solutions to Synchronization. The simplest version of the algorithm will allow for 2 processes to have mutual exclusion to the critical section. The pseudocode looks like this:

int turn;        // Specifies which process can enter
boolean flag[2]; // Specifies which process intends to enter

int i; // The index representing the current process
int j; // The index representing the other process

while (true) {
	 flag[i] = true; // Indicate your intent to enter
	 turn = j;
	 
	 while (flag[j] && turn == j);
	 
	 /* critical section goes here*/
	 

	 /*remainder section */
	 flag[i] = false;
}

Let’s break this down:

  • turn; an integer that indicates whose turn it is to enter the critical section
  • flag[]; An array of booleans that indicates if a process wants to enter the critical section (true if it wants to enter).
    • The indices represent each process. So flag[0] represents process 1, and flag[1] is process 2
  • i; An integer indicating the current process that’s running (i.e. if the code is running in process 0, then i == 0)
  • j; An integer indicating the other process that’s running (i.e. if the code is running in process 0, then j == 1 )

*This is how the code is traditionally represented, but for the future examples I’ve replaced i with current_process_id and j with other_process_id to make it easier to read.

Each process would then run it’s own version of the Peterson’s algorithm (with flag[] and turn as shared variables), and replace current_process_id with it’s own ID and other_process_id with the other processes’ id. Here’s a working example in python:

from concurrent.futures import ThreadPoolExecutor
 
turn = 0
counter = 0
flag = [False, False]
 
def critical_section(process_id:int):
    global counter
    print(f"In the critical section for {process_id}, and counter is {counter}")
    counter += 100
    print(f"Exiting the critical section for {process_id}, and counter is {counter}")
 
 
def petersons(current_thread_id:int = 0, other_thread_id:int=1):
    global turn, flag, counter
    print(f"Entering petersons() in thread {current_thread_id}, and counter is {counter}")
    
    completed = 0
    TIMES_TO_RUN_CRITICAL_SECTION = 10
    
    while completed < TIMES_TO_RUN_CRITICAL_SECTION:
        print(f"In petersons(), in thread {current_thread_id}, loop #{completed+1} and counter is {counter}")
        flag[current_thread_id] = True
        turn = other_thread_id
        while (flag[other_thread_id] and turn == other_thread_id):
            ... # Busy wait
 
        # Critical Section
        critical_section(current_thread_id)
        
        flag[current_thread_id] = False
        # Remainder section
        completed += 1
 
    print(f"Exiting petersons() in thread {current_thread_id}, and counter is {counter}")
 
def run_thread(process_id:int):
    global counter
    print(f"Starting {process_id}, and counter is {counter}")
    if process_id == 0:
        petersons(0, 1)
    else:
        petersons(1, 0)
    
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(run_thread, 0)
        executor.submit(run_thread, 1)
 

In this example I made a few modifications:

  1. I added a shared resource called counter. Since we’re running in multiple threads you can tell if the implementation is correct, because the value will be TIMES_TO_RUN_CRITICAL_SECTION*100*2
  2. I ran this example with Threads, not processes. But it’s the exact same principle (threads are just quicker in python)

The output will look something like:

Starting 0, and counter is 0
Entering petersons() in thread 0, and counter is 0
Starting 1, and counter is 0
In petersons(), in thread 0, loop #1 and counter is 0
Entering petersons() in thread 1, and counter is 0
In petersons(), in thread 1, loop #1 and counter is 0
In the critical section for 0, and counter is 0
Exiting the critical section for 0, and counter is 100
In petersons(), in thread 0, loop #2 and counter is 100
In the critical section for 1, and counter is 100
Exiting the critical section for 1, and counter is 200
In petersons(), in thread 1, loop #2 and counter is 200
In the critical section for 0, and counter is 200
Exiting the critical section for 0, and counter is 300
In petersons(), in thread 0, loop #3 and counter is 300
In the critical section for 1, and counter is 300
Exiting the critical section for 1, and counter is 400
In petersons(), in thread 1, loop #3 and counter is 400
In the critical section for 0, and counter is 400
Exiting the critical section for 0, and counter is 500
In petersons(), in thread 0, loop #4 and counter is 500
In the critical section for 1, and counter is 500
Exiting the critical section for 1, and counter is 600
In petersons(), in thread 1, loop #4 and counter is 600
In the critical section for 0, and counter is 600
Exiting the critical section for 0, and counter is 700
In petersons(), in thread 0, loop #5 and counter is 700
In the critical section for 1, and counter is 700
Exiting the critical section for 1, and counter is 800
In petersons(), in thread 1, loop #5 and counter is 800
In the critical section for 0, and counter is 800
Exiting the critical section for 0, and counter is 900
In petersons(), in thread 0, loop #6 and counter is 900
In the critical section for 1, and counter is 900
Exiting the critical section for 1, and counter is 1000
In petersons(), in thread 1, loop #6 and counter is 1000
In the critical section for 0, and counter is 1000
Exiting the critical section for 0, and counter is 1100
In petersons(), in thread 0, loop #7 and counter is 1100
In the critical section for 1, and counter is 1100
Exiting the critical section for 1, and counter is 1200
In petersons(), in thread 1, loop #7 and counter is 1200
In the critical section for 0, and counter is 1200
Exiting the critical section for 0, and counter is 1300
In petersons(), in thread 0, loop #8 and counter is 1300
In the critical section for 1, and counter is 1300
Exiting the critical section for 1, and counter is 1400
In petersons(), in thread 1, loop #8 and counter is 1400
In the critical section for 0, and counter is 1400
Exiting the critical section for 0, and counter is 1500
In petersons(), in thread 0, loop #9 and counter is 1500
In the critical section for 1, and counter is 1500
Exiting the critical section for 1, and counter is 1600
In petersons(), in thread 1, loop #9 and counter is 1600
In the critical section for 0, and counter is 1600
Exiting the critical section for 0, and counter is 1700
In petersons(), in thread 0, loop #10 and counter is 1700
In the critical section for 1, and counter is 1700
Exiting the critical section for 1, and counter is 1800
In petersons(), in thread 1, loop #10 and counter is 1800
In the critical section for 0, and counter is 1800
Exiting the critical section for 0, and counter is 1900
Exiting petersons() in thread 0, and counter is 1900
In the critical section for 1, and counter is 1900
Exiting the critical section for 1, and counter is 2000
Exiting petersons() in thread 1, and counter is 2000

This solution does assume a few things. Most relevant is that the caches are aligned. In systems with multiple processors, it’s possible to have values on separate caches that can create issues. Details here. As a comparison here is what the equivalent code would look like if you were to use a Mutex instead:

# This is an alternative version of the same code, but using a mutex instead
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
 
turn = Lock()
counter = 0
 
def critical_section(process_id:int):
    global counter
    print(f"In the critical section for {process_id}, and counter is {counter}")
    counter += 100
    print(f"Exiting the critical section for {process_id}, and counter is {counter}")
 
 
def run_thread(process_id:int):
    global counter
    print(f"Starting {process_id}, and counter is {counter}")
    completed = 0
    while completed < 100:
        with turn: # Lock until it's the current processes' turn
            critical_section(process_id)
        completed+=1
    
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(run_thread, 0)
        executor.submit(run_thread, 1)

Peterson’s n-process algorithm (TODO)

Peterson developed a solution that worked for n processes, using a tournament style selection system. The algorithm itself is quite complex, and often ends up with many mistakes.

flag[n-1] // All start as -1 initially
turn[n-2] // 0

The main

Here is an example of peterson’s n-process algorithm in python:

import time
from concurrent.futures import ThreadPoolExecutor
 
# Number of processes
number_of_processes = 12
TIMES_TO_RUN_CRITICAL_SECTION = 10
 
# Shared variables
flag = [-1] * number_of_processes # Flags to indicate a process's current level
turn = [0] * (number_of_processes - 1)  # Turn array for each level
counter = 0  # Shared resource
 
def critical_section(process_id: int):
    """Critical section logic."""
    global counter
    c = counter
    print(f"Process {process_id}, Counter: {counter}")
    c += 1
    counter = c
    print(f"Process {process_id} exiting critical section, Counter: {counter}")
 
def peterson_n_process(process_id:int):
    """Implementation of Peterson's n-process algorithm."""
    global flag, turn
 
    # Entry Section
    for level in range(number_of_processes - 1):  # Compete at each level
        flag[process_id] = level  # Declare intent to enter level k
        turn[level] = process_id  # Assume it's my turn at level k
        for process_index in range(number_of_processes):
            if process_index != process_id:
                while flag[process_index] >= level and turn[level] == process_id:
                    pass  # Busy-wait
 
    # Critical Section
    critical_section(process_id)
 
    # Exit Section
    flag[process_id] = -1  # Indicate that the process is no longer competing
 
def run_task(process_id:int):
    """Simulates a single process executing multiple critical sections."""
    for _ in range(TIMES_TO_RUN_CRITICAL_SECTION):
        peterson_n_process(process_id)
 
if __name__ == "__main__":
    t1 = time.time()
    with ThreadPoolExecutor(max_workers=number_of_processes) as executor:
        for i in range(number_of_processes):
            executor.submit(run_task, i)
    t2 = time.time()
    
    print(f"Took {t2-t1} seconds")
    print(f"Which is {(t2-t1)/60} mins")

You will notice when you run the algorithm that it is incredibly slow. Consider the following mutex-based solution:

from threading import Lock
from concurrent.futures import ThreadPoolExecutor
 
n = 12  # Number of processes (threads)
flag = [False] * n  # Flags for each process
turn = [-1] * (n - 1)  # Turn array for Peterson's n-process hierarchy
counter = 0  # Shared counter
mutex = Lock()
TIMES_TO_RUN_CRITICAL_SECTION = 10
 
def critical_section(process_id: int):
    """Critical section logic."""
    global counter
    print(f"In the critical section for {process_id}, and counter is {counter}")
    c = counter
    c += 100
    counter = c
    print(f"Exiting the critical section for {process_id}, and counter is {counter}")
 
def run_thread(process_id: int):
    """Thread function to execute the Peterson's algorithm."""
    print(f"Starting thread {process_id}, and counter is {counter}")
    completed = 0
    
 
    while completed < TIMES_TO_RUN_CRITICAL_SECTION:
        with mutex:
            critical_section(process_id)
        completed += 1
    print(f"Exiting petersons() in thread {process_id}, and counter is {counter}")
    
 
if __name__ == "__main__":
    import time
    t1 = time.time()
    with ThreadPoolExecutor(max_workers=n) as executor:
        for i in range(n):
            executor.submit(run_thread, i)
    t2 = time.time()
    
    print(f"Took {t2-t1} seconds")
    print(f"Which is {(t2-t1)/60} mins")

Comparing the n-Peterson’s to the mutex version, the mutex version took ~7.5ms, and Testing on my machine n-Peterson’s took ~66 seconds (~8800x slower). There are several possible reasons for this, but the main 3 contributors in order of severity are:

  1. Scalability; As you increase the number of processes n-petersons inherently slows. Looking at the code, each process runs for n-1 iterations, over n values. Meaning the complexity is this kind of a scaling factor is horrible, and in practical situations today this would negate most benefits made by doing multiprocessing. For example upping the number of processes to 30, the mutex solution takes ~18ms, and n-Petersons took ~476.98 seconds (7mins 57 seconds), which is ~26,499x slower.
  2. Busy Waiting; Each process is busy waiting, this means it’s using cycles on the CPU to do nothing. This can waste time, since the CPU is still “working” while this is happening. You could instead choose to sleep() the process for a set amount of time, but you run the risk of slowing down the overall runtime if this sleep value is too high (trading wasted CPU for wasted time). This is where something like a Condition Variable would help tremendously.
  3. Cache invalidation; I’m not sure the intricate details of ThreadPoolExecutor in python, but if it distributes the threads across CPU cores you can often run into an issue of wasting time where the data from the program has to be shared between the cores, and has to be invalidated every time turn or flag changes. Wasting time on the CPU memory.

Additional Resources