A little example of how asyncio works

This is a simple example to show how Asyncio works without using Asyncio itself, relying instead on a basic and poorly written event loop. It is only meant to give a flavor of what Asyncio does behind the scenes. I’m avoiding most details of the library’s design, like callbacks, to keep this simple. Since this is written as an illustration rather than real code, I’m not going to try to keep it Python 2.7 compatible.

import time

This loop will be based on async functions that ‘guess’ how long they need before they can complete. That would be an inefficient way to wait for unpredictable events, like input, but it works well for timers (which I’ll be using). In Asyncio, I would return a Future, and the loop would wait until its .done() was true. The real loop would also use callbacks and select to wait for things like file descriptors to become ready. That machinery only makes the wait time variable, so that waiting ends exactly when there is work to do; it maximizes speed and efficiency, which this example does not need.
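
To give a rough feel for the select-style waiting mentioned above, here is a minimal sketch using the standard library selectors module. The socket pair and the timeout values are my own illustrative choices; nothing in the toy loop in this post uses them.

```python
import selectors
import socket

# Illustrative only: a connected socket pair stands in for "some file descriptor".
reader, writer = socket.socketpair()

sel = selectors.DefaultSelector()
sel.register(reader, selectors.EVENT_READ)

# Nothing has been written yet, so a zero-timeout poll reports nothing ready.
nothing_ready = sel.select(timeout=0)

# Once data arrives, select() returns as soon as the descriptor is readable,
# rather than sleeping for a guessed amount of time.
writer.send(b"ping")
ready = sel.select(timeout=1)
ready_files = [key.fileobj for key, _events in ready]
received = reader.recv(4)

sel.unregister(reader)
sel.close()
reader.close()
writer.close()
```

The timeout passed to select() plays the same role as the guessed sleep time in my loop, except that the call can return early when something becomes ready.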

I’m going to make a new version of float that I can specifically test for using isinstance, but that otherwise behaves as a float. Its value is a rough guess of how long to sleep.

class NotReady(float):
    pass
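
As a quick sanity check of that idea, this small sketch (the variable names are just for illustration) shows that the subclass can be picked out with isinstance while still comparing like an ordinary float:

```python
class NotReady(float):
    """A float that can be singled out with isinstance."""

wait = NotReady(0.5)

is_marker = isinstance(wait, NotReady)        # the subclass is detectable
plain_is_marker = isinstance(0.5, NotReady)   # an ordinary float is not
shortest = min([NotReady(2.0), wait])         # comparisons work as for floats
sum_type = type(wait + 1.0)                   # arithmetic falls back to plain float
```

The last line is worth knowing: arithmetic on the subclass gives back a plain float, so the marker only survives if the value is passed along untouched.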

Now, here’s an event loop that runs through a list of tasks and then sleeps if no task is ready (for the shortest time requested by any incomplete task):

class EventLoop:
    def run_until_complete(self, tasks):
        results = []
        while tasks: # Stops when all tasks are done, unlike asyncio run_forever
            waits = []
            for task in list(tasks): # Iterate over a copy, since finished tasks are removed below
                try:
                    res = task.send(None) # async function runs here
                    if isinstance(res, NotReady):
                        waits.append(res) # Build up all the requested waits
                    else:
                        waits.append(0) # Don't wait if a task completed
                        results.append(res) # Gather results
                except StopIteration:
                    tasks.remove(task) # Task done, remove from tasks
            if waits:
                time.sleep(min(waits)) # Wait for the shortest requested wait
        
        return results

It is then easy to write a sleep function for this event loop:

def sleep(t):
    endtime = time.time() + t
    while time.time() < endtime:
        yield NotReady(endtime - time.time())
    yield 'Sleep {} over'.format(t)
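
To see what the loop sees, it can help to step through this generator by hand. The sketch below repeats the definitions so that it runs on its own; the short durations are arbitrary values picked to keep it quick.

```python
import time

class NotReady(float):
    pass

def sleep(t):
    endtime = time.time() + t
    while time.time() < endtime:
        yield NotReady(endtime - time.time())
    yield 'Sleep {} over'.format(t)

gen = sleep(0.05)
first = gen.send(None)    # a NotReady holding roughly the remaining time
time.sleep(0.2)           # pretend the event loop slept long enough
second = gen.send(None)   # the deadline has passed, so the message comes out

try:
    gen.send(None)        # nothing left to yield
    finished = False
except StopIteration:
    finished = True       # this is how the loop learns the task is done
```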

Now we can print the results, checking that the whole run takes only 4 seconds and that the tasks complete in the correct order:

Note: For this purpose, I order the output by completion order rather than input order, unlike asyncio. Later you will also notice that I gather every yielded output, rather than just the final one.

%%time
myloop = EventLoop()
print(myloop.run_until_complete([sleep(3),
                           sleep(2),
                           sleep(1),
                           sleep(4)]))
['Sleep 1 over', 'Sleep 2 over', 'Sleep 3 over', 'Sleep 4 over']
CPU times: user 567 µs, sys: 523 µs, total: 1.09 ms
Wall time: 4 s

Now the pieces are in place to see why yield from is needed:

def print_every_period(msg, period):
    for i in range(int(3/period)):
        yield from sleep(period)
        print(msg, flush=True)
    return 'Print {} over'.format(period)
myloop = EventLoop()
print(myloop.run_until_complete([print_every_period("First message!",1),
    print_every_period("Second message!",1),
    print_every_period("...",.25)]))
...
...
...
First message!
Second message!
...
...
...
...
First message!
Second message!
...
...
...
...
First message!
Second message!
...
['Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over']
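
To isolate what yield from contributes, here is a small sketch with made-up generators (inner, outer and broken_outer are illustrative names only). Everything the inner generator yields passes straight through to whoever is driving the outer one, and the inner return value becomes the value of the yield from expression. A plain call, by contrast, only creates a generator object that never runs.

```python
def inner():
    yield 'inner step 1'
    yield 'inner step 2'
    return 'inner result'

def outer():
    # Yields from inner() are forwarded to the caller of outer();
    # the return value of inner() ends up in `result`.
    result = yield from inner()
    yield 'outer saw: ' + result

def broken_outer():
    inner()               # creates a generator object but never runs it
    yield 'outer only'

steps = list(outer())
broken_steps = list(broken_outer())
```

This is why the event loop never needs to know about the nested sleep: the NotReady values bubble up through print_every_period on their own.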

We have created a working async system, with an event loop and a real async sleep function, using only generators and a normal sleep function. This is incredibly simplified, but it should give a hint as to the real system Python uses to implement generator-based Asyncio. I’ve avoided using an OS-level select loop, callbacks, etc.

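Just to show this is similar to asyncio (using the generator-based coroutine style, which was deprecated in Python 3.8 and removed in 3.11):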

import asyncio
@asyncio.coroutine
def as_print_every_period(msg, period):
    for i in range(int(3/period)):
        yield from asyncio.sleep(period)
        print(msg, flush=True)
    return 'Print {} over'.format(period)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
        as_print_every_period("First message!",1),
        as_print_every_period("Second message!",1),
        as_print_every_period("...",.25)))
...
...
...
Second message!
First message!
...
...
...
...
Second message!
First message!
...
...
...
...
Second message!
First message!
...
['Print 1 over', 'Print 1 over', 'Print 0.25 over']

Apart from using asyncio.sleep and the coroutine decorator, the only change was the addition of gather, since a coroutine must be turned into a Task, which is a feature my system did not need.
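
The asyncio output above shows the return values ('Print 1 over' and so on), while my loop only collected what the tasks yielded. A loop could recover the return value too, because Python attaches it to the StopIteration exception. This is a minimal sketch with a hypothetical run helper, not a change to the loop in this post:

```python
def task():
    yield 'intermediate'
    return 'final'

def run(gen):
    """Drive a generator to completion; return (yielded values, return value)."""
    yielded = []
    while True:
        try:
            yielded.append(gen.send(None))
        except StopIteration as stop:
            # The generator's `return` value travels on the exception.
            return yielded, stop.value

yielded, final = run(task())
```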

Bonus: awaitable

Let’s modify the previous example to do the unthinkable: use the await/async syntax without using asyncio. First, here is a simple wrapper that turns our existing plain generator into an awaitable:

class asleep:
    def __init__(self, t):
        self.t = t
        
    def __await__(self):
        return sleep(self.t)

We can then use the new syntax with asleep:

async def print_every_period(msg, period):
    for i in range(int(3/period)):
        await asleep(period)
        print(msg, flush=True)
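
Before putting this in a loop, it may help to drive a coroutine by hand. The sketch below uses a made-up awaitable (Ping and main are illustrative names only) to show that a coroutine object accepts send(None) just like a generator: values yielded inside __await__ come out of send, and the coroutine's return value arrives on StopIteration.

```python
class Ping:
    def __await__(self):
        yield 'waiting'    # handed to whoever drives the coroutine
        return 'pong'      # becomes the value of the await expression

async def main():
    reply = await Ping()
    return reply.upper()

coro = main()
first = coro.send(None)    # runs until the yield inside __await__

final = None
try:
    coro.send(None)        # resumes; main() finishes and raises StopIteration
except StopIteration as stop:
    final = stop.value
```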

The event loop needs only tiny changes; there is no need to call iter, and we will need to explicitly close the awaitable before letting it be garbage collected:

class EventLoop:
    def __init__(self):
        self.tasks = []
        
    def create_task(self, task):
        self.tasks.append(task)
    
    def run_forever(self):
        results = []
        while self.tasks:
            waits = []
            for task in list(self.tasks): # Iterate over a copy, since finished tasks are removed below
                try:
                    res = task.send(None)
                    if isinstance(res, NotReady):
                        waits.append(res)
                    else:
                        waits.append(0)
                        results.append(res)
                except StopIteration:
                    task.close() # Needed to avoid "never awaited"
                    self.tasks.remove(task)
            if waits:
                time.sleep(min(waits))
                
        return results
myloop = EventLoop()
myloop.create_task(print_every_period("First message!",1))
myloop.create_task(print_every_period("Second message!",1))
myloop.create_task(print_every_period("...",.25))
print(myloop.run_forever())
...
...
...
First message!
Second message!
...
...
...
...
First message!
Second message!
...
...
...
...
First message!
Second message!
...
['Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 0.25 over', 'Sleep 1 over', 'Sleep 1 over', 'Sleep 0.25 over']

This is a working usage of the new syntax using only standard library tools. Note that I had to call .close, so that the task would not display a warning that it was never awaited on when it was garbage collected.
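
For reference, the warning in question can be reproduced in isolation. This is a rough sketch that assumes CPython, where dropping the last reference finalizes the object immediately; noop and count_never_awaited are hypothetical helper names. It discards a coroutine that was never started, once as-is and once after calling close first.

```python
import gc
import warnings

async def noop():
    return 1

def count_never_awaited(close_first):
    """Count 'never awaited' warnings raised when discarding an unstarted coroutine."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = noop()          # created, but never started or awaited
        if close_first:
            coro.close()       # marks the coroutine as finished
        del coro               # drop the last reference
        gc.collect()           # assumption: helps on non-refcounting interpreters
    return sum("never awaited" in str(w.message) for w in caught)

unclosed = count_never_awaited(close_first=False)
closed = count_never_awaited(close_first=True)
```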
