
asyncio

Python ships with a built-in asyncio package that supports the async and await keywords (similar to JavaScript). The examples here use Python 3.11, the latest version at the time of writing. Some features may not be available in older Python versions.
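As a minimal sketch of the async/await syntax, the snippet below defines a coroutine and runs it with asyncio.run(). The add() function and its 0.1 second delay are made up for illustration and do not come from the examples on this page.

```python
import asyncio


async def add(a: int, b: int) -> int:
    # "await" hands control back to the event loop while this coroutine waits.
    await asyncio.sleep(0.1)
    return a + b


async def main() -> int:
    # Awaiting a coroutine runs it to completion and yields its return value.
    return await add(1, 2)


# asyncio.run() starts an event loop, runs main(), and returns its result.
result = asyncio.run(main())
print(result)
```

This is close to awaiting a Promise inside an async function in JavaScript, except that the event loop has to be started explicitly with asyncio.run().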

Task Group

Tasks created inside a task group context run concurrently. In the example below, both tasks sleep for 2 seconds, so they should finish at around the same time, roughly 2 seconds after the start.

"""asyncio task group (python 3.11 only)"""

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(say_after(2, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())
print("finished all")

asyncio.gather()

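asyncio.gather() runs several awaitables concurrently. Awaiting it suspends the calling coroutine until all of them have finished, so the code after it does not run until then.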

import asyncio


async def say(msg: str):
    await asyncio.sleep(1)
    print(f"say: {msg}")


async def main():
    # Awaiting a single coroutine runs it to completion first.
    await say("a")

    # gather() runs "b" and "c" concurrently and waits for both.
    await asyncio.gather(
        say("b"),
        say("c")
    )
    print("finished")


asyncio.run(main())
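gather() also collects return values. As a small sketch (square() and its delays are invented for illustration), the results come back as a list in the order the awaitables were passed in, not in the order they finished:

```python
import asyncio


async def square(n: int, delay: float) -> int:
    # Hypothetical helper: wait for "delay" seconds, then return n squared.
    await asyncio.sleep(delay)
    return n * n


async def main() -> list[int]:
    # The first call sleeps the longest, yet its result still comes first.
    return await asyncio.gather(
        square(1, 0.3),
        square(2, 0.2),
        square(3, 0.1),
    )


results = asyncio.run(main())
print(results)  # [1, 4, 9]
```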

Queue

asyncio.Queue is not thread-safe (see "Queues" in the Python 3.11.2 documentation). Its task_done() and join() methods work much like a WaitGroup in Go: work items are added to the queue and processed concurrently by worker tasks. Each worker calls task_done() after it finishes an item, and queue.join() waits until every item in the queue has been processed.

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
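Because asyncio.Queue is not thread-safe, code running in another thread should not call its methods directly. One common approach, sketched below under that assumption, is to have the thread ask the event loop to do the put via loop.call_soon_threadsafe(). The producer() function and the None sentinel are illustrative choices, not part of the example above.

```python
import asyncio
import threading


def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    # Runs in a plain thread, so it never touches the queue directly.
    # Each put is scheduled to run inside the event loop's own thread.
    for item in range(3):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    # Hypothetical sentinel that tells the consumer to stop.
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(loop, queue))
    thread.start()

    received = []
    # Consume items until the sentinel arrives.
    while (item := await queue.get()) is not None:
        received.append(item)

    # The producer has already scheduled everything, so this returns quickly.
    thread.join()
    return received


received = asyncio.run(main())
print(received)
```

For purely thread-to-thread communication, the standard library's queue.Queue is the thread-safe alternative.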