tags : Python, Concurrency, Multiprocessing

FAQ

Can different approaches be used together?

  • Yes; eg. asyncio can hand blocking work to a thread or process pool via run_in_executor (see below), and the Janus example later in this page bridges asyncio and threads.

How to run Python code on multiple processors?

  • multiprocessing module
    • This article is largely about using the multiprocessing module to make use of all CPU cores.
    • Since these are full OS processes, the OS can schedule them across multiple cores as needed.
  • threading module
    • There’s some confusion around the threading module only ever using one CPU
    • This is not entirely true
    • It has to do with the GIL
      • When we write Python code, we have no control over the GIL.
      • But a built-in function or an extension (C or another language) interfacing with the Python/C API can “release the GIL”. In that case, an operation that doesn’t need the Python interpreter can fully use whatever parallelism the OS allows.
  • asyncio module
    • The situation is similar to threading
    • As with the threading module, if the operation itself (eg. I/O) doesn’t need the Python interpreter, it may go ahead and use what the OS allows it to
      • Eg. an app that pulls data from about 200 web APIs simultaneously using asyncio: while the Python code is single-threaded, the OS schedules the network driver code across all of the CPUs.

When using threading, do I need to use a worker pool/threading pool?

  • The pool is an object that will start, hold, and manage N worker threads transparently for you, as in the sketch below.
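
A minimal sketch using concurrent.futures.ThreadPoolExecutor (fetch and urls are illustrative stand-ins, not from this page):

from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    # placeholder: pretend this does blocking network I/O
    return len(url)

urls = ["https://a.example", "https://b.example", "https://c.example"]

# The pool starts, holds, and reuses 2 worker threads for us;
# we never call threading.Thread ourselves.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(fetch, urls))
print(results)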

WTF GIL?

The GIL is a lock held by the Python interpreter process whenever bytecode is being executed, unless it is explicitly released. I.e. the design of the CPython interpreter assumes that whatever occurs in the CPython process between bytecodes is dangerous and not thread-safe unless the programmer says otherwise. This means the lock is enabled by default and is periodically released, as opposed to the paradigm often seen in many multi-threaded programs, where locks are generally not held except in so-called “critical sections” (parts of code which are not thread-safe).

  • It can be considered a mutex on the running Python interpreter. It controls access to the Python interpreter.
  • It’s specific to CPython; some other interpreters (eg. Jython/IronPython) don’t have one. It does also exist in the PyPy interpreter.
    • Multi-threading in CPython “time shares” the interpreter

Pros and Cons

  • Pros

    • Provides useful safety guarantees for internal object and interpreter state.
    • Allows only a single OS thread to run the central Python bytecode interpreter loop
      • In languages/runtimes without a GIL, the programmer would need to implement fine-grained locks to prevent one thread from stomping on state set by another thread
      • With the GIL in place, this extra work from the programmer is not needed
        • If T1 is using the interpreter
        • T2 cannot access the interpreter and hence cannot access/mutate any shared state
    • Helps avoid deadlocks and other complications
  • Cons

    • Restricts parallel (CPU-bound) processing, but only for pure-Python code.
    • The GIL is a blocker if you want to do CPU-bound work in the Python interpreter, i.e. until the Python code doing the CPU-bound task finishes, that thread keeps holding the lock (apart from the interpreter’s periodic forced switches).
    • see this thread for more info

GIL FAQ

  • How does the GIL float around?
  • What does “releasing the GIL” mean? OR Why is the threading module even useful?

    • “Releasing and acquiring the GIL” means that an operation which no longer needs the Python interpreter (eg. a syscall) may release the GIL, so that anything else which needs the interpreter may use it.
      • Now the operation itself can use as many cores or whatever else the OS allows it to
      • But the Python interpreter is still a single-core, one-OS-thread-at-a-time affair, and keeps running like that. It’s not as bad as it sounds, though, because we can do a lot of things with this “release & acquire” dance
    • Things that “release the GIL”
      • Every Python standard library function that makes a syscall releases the GIL.
      • This includes all functions that perform disk I/O, network I/O, and time.sleep().
        • Eg. you can spin up a thread that does network I/O; it’ll automatically release the GIL and the main thread can go on doing its thing.
      • CPU-intensive functions in the NumPy/SciPy libraries
      • Compressing/decompressing functions from the zlib and bz2 modules also release the GIL
      • Eg. you could freely write a multithreaded prime number search program in Rust, and use it from Python
        • Check Py_BEGIN_ALLOW_THREADS
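
A small sketch of the effect (timings approximate): time.sleep() makes a syscall and releases the GIL, so three threads each sleeping 1s finish in about 1s total, not 3s:

import threading
import time

def sleepy():
    time.sleep(1)  # syscall: releases the GIL while waiting

start = time.perf_counter()
threads = [threading.Thread(target=sleepy) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# prints ~1s, not ~3s, because the GIL was released during the sleeps
print(f"took {time.perf_counter() - start:.2f}s")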

Approaches

See this thread

| Name               | GIL            | Remark                                                       | Type            |
| ------------------ | -------------- | ------------------------------------------------------------ | --------------- |
| asyncio            | doesn’t matter | More efficient than threads for certain kinds of I/O ops     | coroutine-based |
| threading          | fully matters  | Useful most times when not doing Python-only CPU-bound tasks | thread-based    |
| multiprocessing    | doesn’t matter | Useful if doing Python-only CPU-bound tasks                  | process-based   |
| subprocess         | doesn’t matter | Similar to multiprocessing but different                     | process-based   |
| concurrent.futures | mixed          | Wrapper around threading and multiprocessing                 | mixed           |
| vectorization      | -              |                                                              | specialized     |

Asyncio

  • There’s a lot of criticism around asyncio. Must use with caution.
  • The criticism is specifically around asyncio module not asynchronous programming using coroutines in general
  • Some people suggest alternative libraries like gevent offer a better API, etc. Again, need to do my own research.

How it runs

  • asyncio runs in a single thread (the event loop runs in only a single thread), typically the main thread.
    • Concurrency is achieved through cooperative multitasking by using Python generators
    • See Coroutines (See the blocking and non-blocking section)

AsyncIO and CPU bound tasks

  • asyncio is useful when you’re doing practically zero processing on your data.
    • Designed for when there is some external process triggering events - like a user interacting with a UI, or incoming web requests.
  • AsyncIO helps with I/O bound things, for CPU bound things we still would need to resort to multiprocessing or similar things.
  • If what’s being run by asyncio is pure-Python CPU-bound, and we don’t yield manually, other stuff to be run by the loop will be left waiting. Eg. if a web request takes 2 seconds of CPU time, then even with asyncio it will block, and ultimately the server will be slow, since the entire event loop runs in a single thread. This is not a problem when the operations are not CPU-bound. See the sketch below.
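
A sketch of that failure mode (busy_handler and fast_handler are illustrative names): the CPU-bound coroutine never awaits, so the other coroutine scheduled on the loop has to wait for it to finish:

import asyncio
import time

async def busy_handler():
    # CPU-bound, pure Python, no await: holds the event loop hostage
    sum(i * i for i in range(10_000_000))

async def fast_handler():
    print(f"fast handler finally ran at {time.strftime('%X')}")

async def main():
    # fast_handler cannot run until busy_handler completes,
    # because the whole loop lives in one thread
    await asyncio.gather(busy_handler(), fast_handler())

asyncio.run(main())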

Comparisons

  • AsyncIO vs Threads

    • It eliminates the overhead of a thread per connection and allows considerably more connections to run at the same time (even though a lot of threads is not really a problem, in most cases)
    • When you use threads, you would have 50 threads spawned, each waiting on read() for a socket
      • But with asyncio you can then have an arbitrarily huge number of sockets open while not having to build up an OS-level thread for each
      • This saves us a lot of resources that threads may consume, such as memory, spawn time, context-switching, (GIL release, acquire time) etc.
    • The race between threads and asyncio then becomes who can context-switch more efficiently.
      • I.e. the efficiency of context-switching between different I/O-bound tasks, which are not impacted by the GIL in any case.
  • AsyncIO vs concurrent.futures

    • concurrent.futures pre-dates asyncio
    • asyncio.Future is to asyncio what concurrent.futures.Future is to threads and processes
    • You can hand blocking work from asyncio off to a concurrent.futures.Executor via loop.run_in_executor

Asyncio alternatives

Usage

without asyncio

import time
def sleepy_function():
    print("Before sleeping.")
    time.sleep(1)
    print("After sleeping.")
def main():
    for _ in range(3):
        sleepy_function()
main()

with asyncio

import asyncio
async def sleepy_function():
    print("Before sleeping.")
    await asyncio.sleep(1)
    print("After sleeping.")
async def main():
    await asyncio.gather(*[sleepy_function() for _ in range(3)])
asyncio.run(main())
  • As can be seen in the above example, to use asyncio we had to transform/color almost the entirety of the program
    • When you use the asyncio module, every function that awaits something must be defined as async
    • Any synchronous call within your async functions will block the event loop
    • Code that runs in the asyncio event loop must not block - all blocking calls must be replaced with non-blocking versions that yield control to the event loop. So you have to find alternative libraries in cases if the library at use is blocking.
      • i.e. libraries like psycopg2 and requests will not work directly
      • You can however use loop.run_in_executor to run blocking libraries with asyncio. There’s also asyncio.to_thread (see the sketch after the create_task example below)
    • See Coroutines for more info on this
  • Main ideas

    • Coroutine is defined using the keyword async
    • Just calling the coroutine will not work
      • We need to use asyncio to run it: asyncio.run
      • asyncio.run creates and runs the event loop.
    • await keyword signals asyncio that we want to wait for some event, and that it can execute other stuff
      • We can use await on an Awaitable.
      • Coroutines (functions defined with the async keyword), Tasks, and Futures are examples of Awaitables
    • Tasks are created with asyncio.create_task
      • We can think of a Task as a coroutine scheduled to execute in the background; this helps make things faster.
      • Instead of the create_task + await combo, we can use asyncio.TaskGroup (Python 3.11+) for an easier API.
        • TaskGroup also helps with error handling.
  • Plain coroutines vs coroutine+create_task

    # takes 3s
    import asyncio
    import time
     
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
     
    async def main():
        print(f"started at {time.strftime('%X')}")
     
        await say_after(1, 'hello')
        await say_after(2, 'world')
     
        print(f"finished at {time.strftime('%X')}")
     
    asyncio.run(main())

    vs

    # takes 2s (reuses say_after and the imports from above)
    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
     
        task2 = asyncio.create_task(
            say_after(2, 'world'))
     
        print(f"started at {time.strftime('%X')}")
     
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
     
        print(f"finished at {time.strftime('%X')}")
    • The second example is 1s faster (~2s vs ~3s) because the tasks execute concurrently in the background, while in the first example we await each coroutine to completion before starting the next.
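
As mentioned earlier, blocking libraries can be pushed off the event loop instead of blocking it. A minimal sketch using asyncio.to_thread (Python 3.9+); blocking_fetch is an illustrative stand-in for something blocking like requests.get:

import asyncio
import time

def blocking_fetch(n):
    time.sleep(1)  # stand-in for a blocking call, eg. requests.get
    return n

async def main():
    # Each blocking call runs in a worker thread, so the event loop
    # stays free; total time is ~1s, not ~3s.
    results = await asyncio.gather(
        *[asyncio.to_thread(blocking_fetch, n) for n in range(3)]
    )
    print(results)

asyncio.run(main())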

Example

  • “Writes to sqlite are then sent to that thread via a queue, using an open source library called Janus which provides a queue that can bridge the gap between the asyncio and threaded Python worlds.”
  • https://github.com/aio-libs/janus

Threads

See Threads

Daemon and non-daemon threads

import threading
# `serve` is assumed to be a long-running function, eg. a blocking server loop

# non-daemon, no join.
# prints yoyo, prints server start, waits
# main thread finished, thread running
t = threading.Thread(target=serve, args=[port])
t.start()
print("yoyo")
 
# non-daemon, join.
# prints server start, waits.
# main thread waiting, thread running
t = threading.Thread(target=serve, args=[port])
t.start()
t.join()
print("yoyo")
 
# daemon, no join.
# prints server start, prints yoyo, quits.
# thread runs, main runs and finished, both exit
t = threading.Thread(target=serve, args=[port])
t.daemon = True
t.start()
print("yoyo")
  • Non-daemon threads
    • Expected to complete their execution before the main program exits.
    • This is usually achieved by waiting for the thread using join in the main thread
    • Using join is not mandatory; join is only needed for synchronization, eg. when the correctness of the program depends on the thread completing its execution before the main thread continues. Without join, we spawn the thread, it keeps running, and we continue running the main thread as well.
  • Daemon threads
    • Abruptly terminated when the main program exits, regardless of whether they have completed their tasks or not.
    • It’s just a flag on a normal thread
    • You can actually call .join on daemon threads, but it’s generally considered to be not good practice.

Processes

Processes

  • This is using the multiprocessing library which is literally just using multiple processes.
  • Effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
    • Now each process gets its own GIL
  • Take care of the if __name__ == "__main__" guard when using multiprocessing, as in the sketch below
  • State sharing etc. becomes harder because these are different processes
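
A minimal sketch with the guard in place (square is an illustrative function). The guard matters because on platforms that spawn fresh interpreters (eg. Windows, and macOS by default) each child re-imports this module:

import multiprocessing

def square(n):
    # runs in a separate process, each with its own GIL
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(square, range(10)))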

concurrent.futures

  • This is more like a wrapper around the threading and multiprocessing support in Python.
  • The ergonomics seem better than using threading and multiprocessing directly

components

  • Executor
    • submit
    • shutdown
    • map
      • map is not super flexible with error handling
      • If the use case requires error handling, better to use submit with as_completed (see the official example, and the sketch below)
  • ThreadPoolExecutor (Subclass of Executor)
    • Based on threads
  • ProcessPoolExecutor (Subclass of Executor)
    • Based on multiprocessing
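
A sketch of the submit + as_completed pattern, loosely modelled on the official docs example (might_fail is an illustrative function):

from concurrent.futures import ThreadPoolExecutor, as_completed

def might_fail(n):
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

with ThreadPoolExecutor(max_workers=3) as pool:
    future_to_n = {pool.submit(might_fail, n): n for n in range(4)}
    for future in as_completed(future_to_n):
        n = future_to_n[future]
        try:
            print(n, "->", future.result())
        except ValueError as exc:
            # unlike map, each failure can be handled per-future
            print(n, "raised", exc)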

ThreadPoolExecutor

  • Error handling

    # NOTE: fragment from a larger program; worker_updates and executor
    #       come from the surrounding code
    from concurrent import futures
     
    # TODO: Issue is this is only ever being run after all of the fetches which
    #       is sequential, we need a thread safe variable to indicate failure to the
    #       main thread now
    # NOTE: We want to fail the entire fetch pipeline if we get any exception,
    #       this is done because of the nature of the data, half of the data is
    #       not useful to us
    # TODO: In the ideal case, len(done) will be 1, but if in any case, wait
    #       itself is called after the first exception len(done) will not be 1
    done, _ = futures.wait(worker_updates, return_when=futures.FIRST_EXCEPTION)
    if len(done) == 1:
        maybe_exception = done.pop()
        if maybe_exception.exception():
            # shutdown pool and cancel all futures
            # NOTE: We don't need to separately use cancel() here
            executor.shutdown(wait=True, cancel_futures=True)
            # NOTE: Cancelling non-running jobs is enough for our usecase,
            #       running workers will run till completion or not. we don't
            #       have to use threading.Event to write custom logic to stop
            #       already running jobs when this exception occurs based on how
            #       our jobs tables are structured.
            # NOTE: We could however do it just so that we don't unnecessarily
            #       push to s3 but it'll need our code to handle a threadsafe
            #       event variable now and do not want to do that atm
     
            # re-raise exception so that whatever's executing the process is
            # notified of it. For some reason calling result() was not raising
            # the error as expected
            raise maybe_exception.exception()

queue
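
A minimal producer/consumer sketch with the thread-safe queue.Queue (the Janus example above applies the same idea across the asyncio/thread boundary):

import queue
import threading

q = queue.Queue()

def consumer():
    while True:
        item = q.get()    # blocks until an item is available
        if item is None:  # sentinel: shut down
            break
        print("consumed", item)
        q.task_done()

t = threading.Thread(target=consumer)
t.start()
for i in range(3):
    q.put(i)
q.put(None)  # tell the consumer to stop
t.join()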

Vectorization
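
A minimal sketch of the idea, assuming NumPy: the elementwise work happens in a single C-level call instead of a Python-level loop, and NumPy releases the GIL for many such operations (see the GIL FAQ above):

import numpy as np

a = np.arange(1_000_000, dtype=np.float64)

# Pure-Python loop: one bytecode dispatch per element, GIL held throughout
slow = sum(x * x for x in a)

# Vectorized: one C-level call over the whole array,
# which can also use SIMD/BLAS under the hood
fast = np.dot(a, a)

print(slow, fast)  # same result (up to float rounding)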