By Gabor Laszlo Hajba‏ | 9/29/2016 | General |Beginners

Parallel processing in Python

In this article I will introduce you to parallel processing with threads in Python, focusing on Python 3.5.

Why parallelism? (TL;DR)

Many times we need to call an external service (a web server, a database server, a file, etc.) and we block until its result is available. If we split our program into parallel tasks, we can use CPU time more effectively in such cases. Moreover, almost every machine nowadays has a multicore processor, which means the parallelism is available in hardware.

It is important to be familiar with a programming language's parallel features to write more effective code.

Python is often used in web applications, where response time depends on the database and other components. If we write anything beyond a simple script, we can use threads to make things work in parallel. In such cases we also need to be familiar with well-known problems such as race conditions, and use synchronization objects to avoid them.

Some Examples

In this article we will look at two basic examples: image downloading from imgur.com and a factorization calculation.

Both examples will have a basic version where I simply introduce the problem and the code to solve it. After that I will add parallelism to both examples and see what we've achieved: do we get our results faster, or are we slower because of the parallel processing?

Downloading images from imgur.com

For this I have prepared a set of URLs of images to download, because this makes the tests for this article more stable (if we do not count the web router caching the images we download). However, I will show you the source so you can adapt the code yourself.

Now let's see the code:

__author__ = 'GHajba'

from urllib.request import urlopen, Request
import json
import os


def get_image_urls(client_id):
    # The Imgur API requires authentication via a Client-ID header
    headers = {'Authorization': 'Client-ID {0}'.format(client_id)}
    with urlopen(Request('https://api.imgur.com/3/g/memes/', headers=headers)) as response:
        data = json.loads(response.read().decode('utf-8'))
    return map(lambda image: image['link'], data['data'])


def download_images(target_dir, url):
    # Save each image under its original file name in the target directory
    path = os.path.join(target_dir, os.path.basename(url))

    with open(path, 'wb') as file:
        file.write(urlopen(url).read())

This code downloads the latest viral memes, where the links point to .jpg or .png files. I will not go into detail on how you can get your IMGUR_CLIENT_ID, but it has to be in the environment to make things work, because the Imgur API requires authentication and it is provided through the client ID.
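One hedged way to read that client ID from the environment could look like the sketch below; the variable name IMGUR_CLIENT_ID is an assumption taken from the description above, so adjust it to your setup:

```python
import os


def get_client_id():
    # IMGUR_CLIENT_ID is the assumed variable name; set it in your shell
    # (e.g. `export IMGUR_CLIENT_ID=...`) before running the script.
    client_id = os.environ.get('IMGUR_CLIENT_ID')
    if client_id is None:
        raise RuntimeError('Please set the IMGUR_CLIENT_ID environment variable')
    return client_id
```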

Nevertheless, this code downloads images one-by-one.
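To reproduce measurements like the ones below, a minimal timing wrapper is enough. This is a sketch, not the exact code used for the article; `download` stands for any per-URL callable, e.g. a function wrapping download_images above with a fixed target folder:

```python
import time


def timed_serial(urls, download):
    """Call download(url) for every URL one-by-one and report the elapsed time."""
    start = time.time()
    for url in urls:
        download(url)
    elapsed = time.time() - start
    print('Downloaded {0} images in {1} seconds'.format(len(urls), elapsed))
    return elapsed
```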

On my Mac executing this code yields the following results:

Downloaded 52 images in 13.790853023529053 seconds with Python 3
Downloaded 52 images in 15.189190864562988 seconds with Python 3
Downloaded 52 images in 13.965453863143921 seconds with Python 3
Downloaded 52 images in 13.087532997131348 seconds with Python 3
Downloaded 52 images in 14.43852710723877 seconds with Python 3

As you can see, it takes an average of 14 seconds to download 52 images from Imgur while utilizing only one of my 8 available cores.

Prime factors calculation

In this example I will use a slightly unoptimized version of prime factor calculation. This calculation is very CPU-intensive, which means it requires more processing power than the previous example, where we spent an extended period of time waiting for the network to deliver all the data we needed.

The code of the factor calculation looks like this:

def factors(result, n):
    if n <= 1:
        return result
    for i in range(2, n + 1):
        if n % i == 0:
            result.append(i)
            return factors(result, n // i)

As you can see, we get a number n and a list result.

For every number starting from 2 up to n we check whether n is divisible by it. If yes, we add that number to the result list and return the recursive call on the quotient n // i.

It is not optimized because we loop through every number and check the remainder of n, instead of having a list of prime numbers (or a generator to make things more efficient) -- but for this example that is exactly what we want, because the task keeps the CPU busy.
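A quick sanity check of the function above (reproduced here with comments) shows the factors coming out smallest first:

```python
def factors(result, n):
    # Appends the prime factors of n to result, smallest factor first.
    if n <= 1:
        return result
    for i in range(2, n + 1):
        if n % i == 0:  # i is the smallest divisor of n, hence prime
            result.append(i)
            return factors(result, n // i)


print(factors([], 12))  # [2, 2, 3]
print(factors([], 97))  # [97] -- 97 is prime
```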

Now let's run this code with a list of 50,000 numbers and see what happens:

Factorizing 50000 numbers took 25.89749503135681 seconds with serial recursive approach and Python 3.5.0
Factorizing 50000 numbers took 26.277130842208862 seconds with serial recursive approach and Python 3.5.0
Factorizing 50000 numbers took 26.53605008125305 seconds with serial recursive approach and Python 3.5.0
Factorizing 50000 numbers took 25.725329160690308 seconds with serial recursive approach and Python 3.5.0
Factorizing 50000 numbers took 25.732399940490723 seconds with serial recursive approach and Python 3.5.0

Well, this does not seem bad, but if we raise the number of calculations we get longer runtimes:

Factorizing 55000 numbers took 33.56754684448242 seconds with serial recursive approach and Python 3.5.0
Factorizing 60000 numbers took 37.03928780555725 seconds with serial recursive approach and Python 3.5.0
Factorizing 65000 numbers took 41.349984884262085 seconds with serial recursive approach and Python 3.5.0
Factorizing 75000 numbers took 56.48437809944153 seconds with serial recursive approach and Python 3.5.0
Factorizing 100000 numbers took 100.21058487892151 seconds with serial recursive approach and Python 3.5.0

In the end we reach an average of 1 second per 1,000 numbers -- and this is bad if we want to go further.

How to

After looking at the examples above it's time to elaborate on how to achieve parallelization in Python 3.

Parallel processing with threads is achieved using the threading library in Python -- independent of the version. This library has a class called Thread which spawns a new thread to execute code you define.

There are two ways to create threads: one is to instantiate a new Thread object with a lambda expression that tells it what to perform; the other is to subclass Thread and implement the run method, which executes the thread's logic when it is started.

I recommend the latter version because it is more flexible and you have one class (a single point in your codebase) where the logic of your threads lives, instead of it being scattered around.

As you will see in the examples below, I set the threads to be daemon threads. This is because a Python script won't terminate while it has live non-daemon threads running -- even if a worker thread created in the examples finishes its work and the shared data container becomes empty, it keeps running because more work could still arrive. Daemon threads, however, do not prevent the application from exiting: when the main thread (where you start the work / run the script), which is not a daemon, finishes and no other non-daemon threads are running, the script exits.
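A tiny demonstration of that behavior: the worker below never returns, yet the script still exits, because the thread is marked as a daemon before it is started:

```python
import threading
import time


def worker():
    # Simulates a background task that never finishes on its own.
    while True:
        time.sleep(0.1)


t = threading.Thread(target=worker)
t.daemon = True  # must be set before start(); daemon threads don't block exit
t.start()

print(t.daemon, t.is_alive())
# The interpreter can exit here even though the worker is still looping.
```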

Data sharing between threads

This is always an interesting topic. If you work with parallelism, regardless of the programming language, the question arises: how can you share data between threads without getting a wrong result or an exception?

A wrong result could happen, for example, if each thread downloads the same images (or just two threads download the same image in parallel).

We can utilize the Queue class from the queue module of Python.

A Queue object is a FIFO (first-in first-out) implementation, which means that if you put element A into the queue before element B, element A will be taken out before element B.

I mentioned above that the application ends when the main thread finishes. However, if we do all the work in separate threads, the main thread would finish sooner than the workers -- which would look super fast, but we would not achieve our goal.
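The interplay of put(), get(), task_done() and join() can be sketched in a few lines; the worker here only simulates the real processing by recording each item:

```python
from queue import Queue
from threading import Thread

q = Queue()
processed = []


def worker():
    while True:
        item = q.get()           # blocks until an item is available
        processed.append(item)   # stands in for the real work
        q.task_done()            # mark one unit of work as finished


t = Thread(target=worker, daemon=True)
t.start()

for item in ['A', 'B', 'C']:
    q.put(item)

q.join()  # blocks until every put() item got a matching task_done()
print(processed)  # ['A', 'B', 'C'] -- FIFO order with a single worker
```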

Examples parallelized

Now that we know how to parallelize work with Python, it is time to look at the example applications and split their workload between threads.

Image downloader

Let's see, what we can achieve when we add threads to execute the downloading in parallel.

First of all we define a subclass of Thread which we call Downloader. Here is the code:

__author__ = 'GHajba'

from threading import Thread
from queue import Queue


class Downloader(Thread):
    def __init__(self, queue, folder):
        Thread.__init__(self)
        self.queue = queue
        self.folder = folder

    def run(self):
        while True:
            url = self.queue.get()  # blocks until a URL is available
            download_images(self.folder, url)
            self.queue.task_done()  # report this URL as finished

As you can see, I subclass the Thread class and initialize the new object with a reference to the queue where the data is shared and the folder where we save the files. These two should not change during the lifetime of the thread.

The run method has an endless loop because daemon threads usually do not know how much work they have to do. In each iteration we get the next URL from the queue and call the download_images function with that URL and the folder where we want the result to be saved. After the download finishes we tell the queue that we are done with the element we took out, so it knows another chunk of the workload is complete -- and does not wait endlessly in the main thread (where we will call queue.join()).

Now let's see how we initialize the threads and how we fill the queue:

thread_count = 4

queue = Queue()

for i in range(thread_count):
    downloader = Downloader(queue, 'images')
    downloader.daemon = True
    downloader.start()

for url in image_url_list:
    queue.put(url)

queue.join()

The code above starts 4 parallel threads and then fills the queue. As mentioned, we set the new threads to be daemon threads and start them. After all the threads have started, we fill the queue with the URLs of the images to download.

Finally we block our main thread with queue.join() until all elements of the queue have been taken out and processed by the threads.

I used the URLs of the images as input for the queue because there we have a definite list of multiple elements (in my test case, 52 .jpg or .png images). We could parallelize the URL extraction too, but it won't bring much performance gain for a single page. If we were to extract several pages of the memes gallery (not just the first 60 entries of the first page), we could parallelize that as well, provided we know how many pages we are interested in.

When I run this example (and add some time-measurement) on my Mac I get the following result:

Downloaded 52 images in 5.1321189403533936 seconds with 4 threads and Python 3.5.0
Downloaded 52 images in 4.801907062530518 seconds with 4 threads and Python 3.5.0
Downloaded 52 images in 5.145358085632324 seconds with 4 threads and Python 3.5.0
Downloaded 52 images in 4.81237006187439 seconds with 4 threads and Python 3.5.0
Downloaded 52 images in 5.1538519859313965 seconds with 4 threads and Python 3.5.0

Much faster. That's what we hoped for when we added multiple threads to the code. Now let's see what happens when we use 8 threads:

Downloaded 52 images in 3.7929270267486572 seconds with 8 threads and Python 3.5.0
Downloaded 52 images in 3.6049129962921143 seconds with 8 threads and Python 3.5.0
Downloaded 52 images in 3.4051239490509033 seconds with 8 threads and Python 3.5.0
Downloaded 52 images in 3.8651368618011475 seconds with 8 threads and Python 3.5.0
Downloaded 52 images in 3.204490900039673 seconds with 8 threads and Python 3.5.0

A bit faster, but as you can see it does not make sense to raise the number of threads further. Naturally, if the script downloaded larger files or you had a slower network connection, you could add more threads to utilize the I/O wait times.

Prime factor calculation

For this calculation we create again a Thread subclass and call it Factorizer:

__author__ = 'GHajba'

from threading import Thread
from queue import Queue

class Factorizer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            n = self.queue.get()
            result = factors([], n)
            self.queue.task_done()

The workflow is the same: in the constructor we store the queue of shared data, and in the run method we take out one element, compute its factors, and tell the queue that we are done with that element (we do not use the result, but that does not matter here).

To fill the queue and initialize the threads I use the same approach as previously:

thread_count = 4

queue = Queue()

for i in range(thread_count):
    factorizer = Factorizer(queue)
    factorizer.daemon = True
    factorizer.start()

for n in numbers:
    queue.put(n)

queue.join()

The data shared between the threads is the numbers we want to factorize.

If we run the script with 4 threads we get the following result:

Factorizing 50000 numbers took 30.301374912261963 seconds with 4 threads and Python 3.5.0
Factorizing 50000 numbers took 30.535978078842163 seconds with 4 threads and Python 3.5.0
Factorizing 50000 numbers took 30.37640905380249 seconds with 4 threads and Python 3.5.0
Factorizing 50000 numbers took 30.411335945129395 seconds with 4 threads and Python 3.5.0
Factorizing 50000 numbers took 31.054304122924805 seconds with 4 threads and Python 3.5.0

Almost the same runtime as before! But what happened? Let's add more threads and see if we can do better:

Factorizing 50000 numbers took 27.70519995689392 seconds with 8 threads and Python 3.5.0
Factorizing 50000 numbers took 27.566843032836914 seconds with 8 threads and Python 3.5.0
Factorizing 50000 numbers took 27.92934489250183 seconds with 8 threads and Python 3.5.0
Factorizing 50000 numbers took 28.050817012786865 seconds with 8 threads and Python 3.5.0
Factorizing 50000 numbers took 27.751455068588257 seconds with 8 threads and Python 3.5.0

Nope. But why is that? And why does it take more time with threads than with a simple for-loop?

Creating the Queue, filling it with the numbers, and creating the threads makes the process run a bit slower, but that is not the main reason: we end up with the same runtime because the task is CPU-intensive and the CPython implementation of Python (the default you can download from the official Python website) has a so-called Global Interpreter Lock (GIL), which ensures that only one thread can execute Python code at any one time.

When we download images we wait for the network; this is why that workload can run in parallel much faster -- there is hardly any code execution, just network wait time.

Prime factors with lambda expressions

I told you that there are two ways to create threads: by subclassing or with lambda expressions. I showed the first version because I think it is clearer and more flexible -- but now I will show you the second version too. The example is the same factorization you saw previously; instead of the Factorizer class we pass a lambda expression as the thread's target:

queue = Queue()

for i in range(thread_count):
    t = Thread(target=lambda: calculate(queue))
    t.daemon = True
    t.start()

for n in numbers:
    queue.put(n)

queue.join()

As you can see, we provide a lambda expression to the Thread's constructor. This lambda expression calls a function named calculate which takes a queue as its parameter -- and this queue holds the numbers we want to factorize:

def calculate(numbers_queue):
    while True:
        number = numbers_queue.get()
        result = factors([], number)
        numbers_queue.task_done()

As you can see, the function's body is the same as the run method of the subclassed thread. The performance is the same too -- there is no boost in switching between the solutions.

Now it is up to you which version you prefer and use later on.
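As a side note, the lambda is not strictly necessary either: Thread also accepts the target function and its arguments directly via the args parameter. A sketch under the same setup (the real factorization is elided here, since only the queue handling matters):

```python
from queue import Queue
from threading import Thread


def calculate(numbers_queue):
    while True:
        number = numbers_queue.get()
        # ... factorize number here ...
        numbers_queue.task_done()


queue = Queue()
# args holds the positional arguments passed to calculate when the thread starts
t = Thread(target=calculate, args=(queue,), daemon=True)
t.start()

for n in [6, 28, 496]:
    queue.put(n)
queue.join()
print('all numbers processed')
```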

Conclusion

We have seen that parallelizing applications using the threading library makes execution faster -- if the processor spends enough time waiting so that multiple threads can be utilized. If we run CPU-bound tasks it does not matter how many threads we assign: we end up with the same results as running a single thread. This is because of the GIL (the Global Interpreter Lock). However, there is a solution for this too: starting multiple processes to conquer the same task faster -- but I will introduce that topic in another article.
