Compiling workflows

The workflow() decorator rewrites the code of the underlying function into a state machine. The state machine is then compiled into a big switch statement. This lets us resume the execution of the workflow from any await instruction, even on another machine.

Let’s consider the following workflow:

async def head(cnt, src):
    res = []
    while len(res) < cnt:
        line = await src.read_line()
        if line is None:
            break
        res.append(line)
    return res
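To make the example concrete, here is a minimal sketch that runs head() as a plain asyncio coroutine, without the workflow() decorator. FakeSource is a hypothetical stand-in for src: its read_line() returns the next queued line, then None, which is the end-of-input signal head() checks for.

```python
import asyncio


class FakeSource:
    """Hypothetical stand-in for `src`: serves queued lines, then None."""

    def __init__(self, lines):
        self._lines = list(lines)

    async def read_line(self):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return self._lines.pop(0) if self._lines else None


async def head(cnt, src):
    res = []
    while len(res) < cnt:
        line = await src.read_line()
        if line is None:  # source exhausted before cnt lines were read
            break
        res.append(line)
    return res


print(asyncio.run(head(2, FakeSource(["a", "b", "c"]))))  # ['a', 'b']
print(asyncio.run(head(5, FakeSource(["a", "b"]))))       # ['a', 'b']
```

The first call stops because cnt lines were collected; the second stops because read_line() returned None.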

This would be compiled into the following state machine:

digraph head {
    node [shape=record fontname="Sans serif" fontsize="12"];
    edge [style=dashed];
    Start [style=invis];
    inline_140069583001472 [nojustify=true label="{<body> res\ =\ \[\]\l}"];
    private_1 [nojustify=true label="{<condition> if\ len(res)\ \<\ cnt:\l\ \ ...\l}"];
    private_2 [nojustify=true label="{<body> return\ res\l}"];
    inline_140069582998496 [nojustify=true label="{<awaiting> await\ src.read_line()\l}"];
    public_1 [nojustify=true label="{<body> if\ line\ is\ None:\l\ \ \ \ break\lres.append(line)\l}"];
    Start -> inline_140069583001472 [style=bold];
    inline_140069583001472:s -> private_1;
    private_1:s -> inline_140069582998496:n [label=True];
    private_1:s -> private_2:n [label=False];
    inline_140069582998496:s -> public_1 [style=bold];
    public_1:body:e -> private_2:e [label=break];
    public_1:s -> private_1;
}

This state machine, in turn, would be compiled into the following Python cases:

case 0:
    res = []
    _otf_pos = -1
    continue
case -1:
    if len(res) < cnt:
        return _otf_suspend(position=1, variables=locals(), awaiting=src.read_line())
    else:
        _otf_pos = -2
        continue
case -2:
    return res
case 1:
    line = _otf_fut.result()
    if line is None:
        _otf_pos = -2
        continue
    res.append(line)
    _otf_pos = -1
    continue

This is just a peek inside OTF’s internal machinery.

The generated function returns a Suspension whenever it hits an await. This Suspension can be used to resume the workflow once the result we’re awaiting on is available. All the states are identified by numbers. Public states (the ones through which we can re-enter the function) have non-negative ids; private states, such as -1 and -2 above, have negative ids. State 0 corresponds to the start of the function, state 1 to the continuation of the first await that appears in the source function, state 2 to the continuation of the second await, and so on.

All the functions and variables starting with _otf_ are for internal use only. To help you read the code above, here’s a quick explanation of the ones it uses:

  • _otf_pos: target state in the state machine.

  • _otf_fut: the future we were just awaiting on; its result() gives the awaited value.

  • _otf_suspend(): builds a Suspension that captures everything we need to resume the workflow once the value it is awaiting on is available.