Screenplay: Pointless Blinking With Python, asyncio
, and libgpiod
(and a Raspberry Pi of Course)¶
Setup Before Presentation¶
Starting Point¶
Download code/thread-multi.py as blink.py, and chmod +x
Download code/gpio-multi.py as gpio.py, and chmod +x
Download code/off.py as off.py, and chmod +x
Download snippets/* into snippets
Greeting¶
In a subdirectory:
stuff.py from snippets/stuff-complete
Save away blink program with the squares etc
Multiple Background Threads¶
$ strace -f python3 code/thread-multi.py
...
[pid 4677] write(1, "hello left\n", 11hello left
) = 11
[pid 4677] clock_gettime64(CLOCK_MONOTONIC, {tv_sec=164646, tv_nsec=833862215}) = 0
[pid 4677] _newselect(0, NULL, NULL, NULL, {tv_sec=0, tv_usec=500000} <unfinished ...>
[pid 4679] <... _newselect resumed>) = 0 (Timeout)
[pid 4679] write(1, " hello mi"..., 61 hello middle
) = 61
[pid 4679] clock_gettime64(CLOCK_MONOTONIC, {tv_sec=164646, tv_nsec=845864201}) = 0
[pid 4679] _newselect(0, NULL, NULL, NULL, {tv_sec=1, tv_usec=0} <unfinished ...>
[pid 4678] <... _newselect resumed>) = 0 (Timeout)
[pid 4678] write(1, " "..., 61 hello right
) = 61
Three independent PIDs, using select() to implement time.sleep() (NULL fds)
Fourth (main thread) also involved occasionally, likely due to Python’s weird thread management (GIL)
Managed by OS scheduler
⟶ scheduling jitter, heavy (?) OS load
Blink
#!/usr/bin/env python
from threading import Thread
import time
def hello_left():
    """Print 'hello left' ten times, sleeping 0.5 s after each line."""
    for _ in range(10):
        print('hello left')
        time.sleep(0.5)
def hello_right():
    """Print 'hello right', right-justified to 60 columns, ten times (0.4 s apart)."""
    for _ in range(10):
        print('hello right'.rjust(60))
        time.sleep(0.4)
def hello_middle():
    """Print 'hello middle', centered in 60 columns, ten times (0.3 s apart)."""
    for _ in range(10):
        print('hello middle'.center(60))
        time.sleep(0.3)
t1 = Thread(target=hello_left)
t2 = Thread(target=hello_right)
t3 = Thread(target=hello_middle)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
Enter asyncio
¶
Replace threading and time with asyncio
async functions, await asyncio.sleep(...)
async def main()
asyncio.run(main())
$ strace -f code/async-multi.py
...
epoll_wait(3, [], 1, 201) = 0
...
Single thread!
Timeouts apparently multiplexed on the event loop’s timeout parameter
Blink
#!/usr/bin/env python
import asyncio
async def hello_left():
    """Print 'hello left' five times, yielding to the event loop for 0.5 s each."""
    for _ in range(5):
        print('hello left')
        await asyncio.sleep(0.5)
async def hello_right():
    """Print 'hello right', right-justified to 60 columns, five times (0.8 s apart)."""
    for _ in range(5):
        print('hello right'.rjust(60))
        await asyncio.sleep(0.8)
async def hello_middle():
    """Print 'hello middle', centered in 60 columns, four times (1 s apart)."""
    for _ in range(4):
        print('hello middle'.center(60))
        await asyncio.sleep(1)
async def main():
    """Run the three hello_*() coroutines concurrently and wait for all of them."""
    # create_task() schedules each coroutine on the running event loop
    # immediately; the awaits below merely wait for completion.
    t1 = asyncio.create_task(hello_left())
    t2 = asyncio.create_task(hello_middle())
    t3 = asyncio.create_task(hello_right())
    await t1
    await t2
    await t3
asyncio.run(main())
Character Device Based GPIO¶
The way to go for GPIO on Linux
Alternative: sysfs GPIO
Unmaintained
Not immune to hotplug GPIO (e.g. USB GPIO controller) ⟶ fixed number range
Not reset when application crashes (a feature that is not wanted by most people)
No pullup/pulldown configuration
RPi.GPIO
Raspberry specific
Weird (a background thread fires event/interrupts)
Bound to go away (I hope)
Libgpiod V2: New Major Release with a Ton of New Features - Bartosz Golaszewski
GPIO Device¶
/dev/gpiochip0: character device
Opened by processes
Communication via ioctl()
Selectable ⟶ events/interrupts
User space library: libgpiod
$ ls -l /dev/gpiochip0
crw-rw---- 1 root gpio 254, 0 Apr 9 13:30 /dev/gpiochip0
Most Basic Feature: Setting GPIO Values¶
Request GPIO 11, 10, 27
… configuring for output
#!/usr/bin/env python
import gpiod
import time
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='mytest',
config={(11,10,27): gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
REQUEST.set_values({11: gpiod.line.Value(1),
10: gpiod.line.Value(1),
27: gpiod.line.Value(1),
})
time.sleep(1)
REQUEST.set_values({11: gpiod.line.Value(0),
10: gpiod.line.Value(0),
27: gpiod.line.Value(0),
})
Entire Matrix On/Off¶
GPIO
#!/usr/bin/env python
import gpiod
import time
MATRIX = (
(11, 10, 27, 4, 2),
( 0, 9, 22, 17, 3),
( 5, 20, 1, 25, 18),
( 6, 16, 7, 24, 15),
(13, 12, 8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='blink',
config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
REQUEST.set_values({i: gpiod.line.Value(1) for i in ALL_IOS})
time.sleep(0.5)
REQUEST.set_values({i: gpiod.line.Value(0) for i in ALL_IOS})
Bringing All Together¶
Continue blink.py
Morph hello*() into blink(ios, interval, ntimes=None) (None -> itertools.count(), else range())
import gpiod, next to asyncio
Pull in snippet set_values
Pull in snippet blink-raw
| GPIO | Interval |
|---|---|
| 11 | 0.5 |
| 10 | 0.4 |
| 27 | 0.3 |
| 4 | 0.2 |
| 2 | 0.1 |
Blink
#!/usr/bin/env python
import gpiod
import asyncio
import itertools
MATRIX = (
(11, 10, 27, 4, 2),
( 0, 9, 22, 17, 3),
( 5, 20, 1, 25, 18),
( 6, 16, 7, 24, 15),
(13, 12, 8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='blink',
config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
def SET_VALUES(ios, b):
    """Set every GPIO line in *ios* to value *b* with a single request call.

    ios: iterable of line offsets (presumably a subset of the lines
         requested in REQUEST -- verify against the caller)
    b:   handed to gpiod.line.Value(); callers in this script pass 0 or 1
    """
    REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})
async def blink(ios, interval, ntimes=None):
    """Switch the lines in *ios* on, then off, holding each state for *interval* seconds.

    ios:      iterable of GPIO line offsets, passed on to SET_VALUES()
    interval: seconds to stay in each state
    ntimes:   number of on/off cycles; None (the default) blinks forever
    """
    # Conditional expression rather than `cond and a or b`, which silently
    # picks the wrong branch whenever the middle operand is falsy.
    loop = itertools.count() if ntimes is None else range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)
async def main():
    """Blink the five GPIO lines of the first matrix row, each at its own interval."""
    tasks = [
        asyncio.create_task(blink((11,), 0.5)),
        asyncio.create_task(blink((10,), 0.4)),
        asyncio.create_task(blink((27,), 0.3)),
        asyncio.create_task(blink(( 4,), 0.2)),
        asyncio.create_task(blink(( 2,), 0.1)),
    ]
    # blink() is called without ntimes, so no task ever finishes on its own;
    # these awaits keep main() alive until the program is interrupted.
    for t in tasks:
        await t
asyncio.run(main())
Modularize¶
Note this is not about clean coding 🐷
Cram stuff into
stuff.py
All but
main()
goes there
Stuff
import asyncio
import gpiod
import itertools
MATRIX = (
(11, 10, 27, 4, 2),
( 0, 9, 22, 17, 3),
( 5, 20, 1, 25, 18),
( 6, 16, 7, 24, 15),
(13, 12, 8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='blink',
config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
def SET_VALUES(ios, b):
REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})
async def blink(ios, interval, ntimes=None):
    """Toggle the lines in *ios* on and off, *interval* seconds per state.

    ntimes limits the number of on/off cycles; None means "never stop".
    """
    # Explicit conditional expression instead of the `and ... or ...` trick
    loop = itertools.count() if ntimes is None else range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)
Blink
#!/usr/bin/env python
from stuff_raw import * # <-- this is not about "clean code"
import asyncio
async def main():
tasks = [
asyncio.create_task(blink((11,), 0.5)),
asyncio.create_task(blink((10,), 0.4)),
asyncio.create_task(blink((27,), 0.3)),
asyncio.create_task(blink(( 4,), 0.2)),
asyncio.create_task(blink(( 2,), 0.1)),
]
for t in tasks:
await t
asyncio.run(main())
Play A Bit: Blink Entire Rows¶
Five tasks, one for each row
Make list of tasks
Blink
#!/usr/bin/env python
from stuff_raw import *
import asyncio
async def main():
rows = MATRIX
tasks = [
asyncio.create_task(blink(rows[0], 0.5)),
asyncio.create_task(blink(rows[1], 0.4)),
asyncio.create_task(blink(rows[2], 0.3)),
asyncio.create_task(blink(rows[3], 0.2)),
asyncio.create_task(blink(rows[4], 0.1)),
]
for t in tasks:
await t
asyncio.run(main())
Sideway, maybe: blink columns
⟶
numpy.transpose()
I like heavy dependencies 🐷
Blink
#!/usr/bin/env python
from stuff_raw import *
import asyncio
import numpy
async def main():
rows = MATRIX
cols = numpy.transpose(MATRIX).tolist()
tasks = [
asyncio.create_task(blink(cols[0], 0.5)),
asyncio.create_task(blink(cols[1], 0.4)),
asyncio.create_task(blink(cols[2], 0.3)),
asyncio.create_task(blink(cols[3], 0.2)),
asyncio.create_task(blink(cols[4], 0.1)),
]
for t in tasks:
await t
asyncio.run(main())
Coroutines?¶
Hm … too much of asyncio.create_task() here
Instantiate coroutines first, not using create_task() immediately
Coroutines do nothing until awaited, or passed to create_task()
Pass to all() (pull in from snippet all-initial)
await that instead
Blink
#!/usr/bin/env python
from stuff_raw import *
import asyncio
import numpy
async def all(coros):
    """Run all *coros* concurrently and return once every one has finished.

    NOTE: this name shadows the builtin all() inside the script.
    """
    # Turning every coroutine into a task first starts them all before the
    # first await; awaiting the bare coroutines would run them back to back.
    tasks = [asyncio.create_task(c) for c in coros]
    for t in tasks:
        await t
async def main():
cols = numpy.transpose(MATRIX).tolist()
await all([
blink(cols[0], 0.5),
blink(cols[1], 0.4),
blink(cols[2], 0.3),
blink(cols[3], 0.2),
blink(cols[4], 0.1),
])
asyncio.run(main())
Not Enough: sequence()
¶
Not using create_task(): sequential execution
Pass a limit to blink()
⟶ another one: sequence()
#!/usr/bin/env python
from stuff_raw import *
import asyncio
import numpy
async def all(coros):
tasks = [asyncio.create_task(c) for c in coros]
for t in tasks:
await t
async def sequence(coros):
    """Await *coros* strictly one after another, in the order given."""
    for c in coros:
        await c
async def main():
cols = numpy.transpose(MATRIX).tolist()
await sequence([
blink(cols[0], 0.5, 2),
blink(cols[1], 0.4, 2),
blink(cols[2], 0.3, 2),
blink(cols[3], 0.2, 2),
blink(cols[4], 0.1, 2),
])
asyncio.run(main())
Pull all() and sequence() out into stuff.py when done
Looping: forever()
¶
#!/usr/bin/env python
from stuff_raw2 import *
import asyncio
import numpy
async def forever(coro):
    """Await *coro* over and over.

    Deliberately naive: a coroutine object can be awaited only once, so the
    second pass fails with
    "RuntimeError: cannot reuse already awaited coroutine".
    That failure is what this step of the walkthrough demonstrates; it is
    the motivation for passing a coroutine factory instead.
    """
    while True:
        await coro
async def main():
cols = numpy.transpose(MATRIX).tolist()
await forever(
sequence([
blink(cols[0], 0.3, 2),
blink(cols[1], 0.2, 2),
blink(cols[2], 0.1, 2),
blink(cols[3], 0.01, 2),
blink(cols[4], 0.05, 2),
])
)
asyncio.run(main())
$ code/blink-forever.py
...
RuntimeError: cannot reuse already awaited coroutine
...
Need to create new coroutine at each execution
⟶ factory?
A Stripped-Down Program (⟶ Factory)¶
#!/usr/bin/env python
from stuff_raw2 import *
import asyncio
async def forever(coro):
while True:
await coro
async def blink(ios, interval, ntimes=None):
    """Blink the lines in *ios*: on for *interval* seconds, off for *interval* seconds.

    Repeats *ntimes* cycles, or endlessly when ntimes is None.
    """
    # `a if cond else b` in place of the error-prone `cond and a or b`
    loop = itertools.count() if ntimes is None else range(ntimes)
    for _ in loop:
        SET_VALUES(ios, 1)
        await asyncio.sleep(interval)
        SET_VALUES(ios, 0)
        await asyncio.sleep(interval)
async def main():
await forever(
blink((11,), 0.3, 1),
)
asyncio.run(main())
Turn blink()
Into A Factory¶
class blink, with an __init__ just like the original blink() function
Stores ctor parameters as members
One single method: create_coro(self)
Creates nested function _blink(ios, interval, ntimes)
Returns instantiated coroutine from it
forever() gets passed factory, and calls factory.create_coro() in every iteration
⟶ done
⟶ except: clumsy
#!/usr/bin/env python
from stuff_raw2 import *
import asyncio
async def forever(coro_creator):
    """Endlessly run coroutines produced by *coro_creator*, one after another.

    coro_creator: any object whose create_coro() method returns a fresh
                  awaitable on every call
    """
    while True:
        # A new coroutine per pass -- an awaited coroutine cannot be reused
        await coro_creator.create_coro()
class blink:
    """Coroutine factory: remembers blink parameters, creates coroutines on demand."""

    def __init__(self, ios, interval, ntimes=None):
        """Store the parameters; nothing is started here.

        ios:      iterable of GPIO line offsets
        interval: seconds per on/off state
        ntimes:   number of on/off cycles, None for endless blinking
        """
        self.ios = ios
        self.interval = interval
        self.ntimes = ntimes

    def create_coro(self):
        """Return a fresh, not yet started blink coroutine for the stored parameters."""
        async def _blink(ios, interval, ntimes=None):
            # Conditional expression instead of `cond and a or b`
            loop = itertools.count() if ntimes is None else range(ntimes)
            for _ in loop:
                SET_VALUES(ios, 1)
                await asyncio.sleep(interval)
                SET_VALUES(ios, 0)
                await asyncio.sleep(interval)
        return _blink(self.ios, self.interval, self.ntimes)
async def main():
await forever(
blink((11,), 0.3, 1), # <--- create *coro-creator*, passed to forever()
)
asyncio.run(main())
Anti-Clumsy Decorator: blink()
Wrapper¶
def create_factory_for_blink(blinkfunc):
    """Turn *blinkfunc* into a factory of coroutine creators.

    The returned factory accepts blink's parameters (ios, interval, ntimes)
    and hands back a zero-argument callable. Each call of that callable
    invokes blinkfunc(ios, interval, ntimes) anew; the parameters live in
    the closure.
    """
    def factory(ios, interval, ntimes=None):
        def create_coro():
            return blinkfunc(ios, interval, ntimes)
        return create_coro
    return factory
Factory is much writing
Closures are objects
“Members” are in the closure
One method: the function
⟶ inverted view
Start with
create_factory_for_blink(blink_func)
⟶ hardwired parameters:
ios, interval, ntimes
def create_factory_for_blink(blinkfunc):
    """Wrap *blinkfunc*: calling the result yields a coroutine creator.

    Parameters are hardwired to blink's signature (ios, interval, ntimes).
    """
    def factory(ios, interval, ntimes=None):
        def create_coro():
            return blinkfunc(ios, interval, ntimes)
        return create_coro
    return factory
Wrap manually
blink = create_factory_for_blink(blink)
Stuff
import asyncio
import gpiod
import itertools
MATRIX = (
(11, 10, 27, 4, 2),
( 0, 9, 22, 17, 3),
( 5, 20, 1, 25, 18),
( 6, 16, 7, 24, 15),
(13, 12, 8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='blink',
config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
def SET_VALUES(ios, b):
REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})
async def blink(ios, interval, ntimes=None):
loop = (ntimes is None) and itertools.count() or range(ntimes)
for _ in loop:
SET_VALUES(ios, 1)
await asyncio.sleep(interval)
SET_VALUES(ios, 0)
await asyncio.sleep(interval)
async def all(coros):
tasks = [asyncio.create_task(c) for c in coros]
for t in tasks:
await t
async def sequence(coros):
for c in coros:
await c
Blink
#!/usr/bin/env python
from stuff_decorator_stage1 import *
import asyncio
async def forever(factory):
    """Endlessly call *factory* and await what it returns.

    factory: zero-argument callable returning a fresh awaitable per call
    """
    while True:
        await factory()                # <--- call to create_coro()
async def blink(ios, interval, ntimes=None):
loop = (ntimes is None) and itertools.count() or range(ntimes)
for _ in loop:
SET_VALUES(ios, 1)
await asyncio.sleep(interval)
SET_VALUES(ios, 0)
await asyncio.sleep(interval)
def create_factory_for_blink(blinkfunc):
def factory(ios, interval, ntimes=None):
def create_coro():
return blinkfunc(ios, interval, ntimes)
return create_coro
return factory
blink = create_factory_for_blink(blink) # <--- turn blink into blink-factory
async def main():
await forever(
blink((11,), 0.3, 1), # <--- use blink-factory to instantiate coroutine
)
asyncio.run(main())
@program
, Finally¶
Stuff
import asyncio
import gpiod
import itertools
MATRIX = (
(11, 10, 27, 4, 2),
( 0, 9, 22, 17, 3),
( 5, 20, 1, 25, 18),
( 6, 16, 7, 24, 15),
(13, 12, 8, 23, 14),
)
ALL_IOS = sum(MATRIX, start=())
REQUEST = gpiod.request_lines(
'/dev/gpiochip0',
consumer='blink',
config={ALL_IOS: gpiod.LineSettings(direction=gpiod.line.Direction.OUTPUT)})
def SET_VALUES(ios, b):
REQUEST.set_values({i: gpiod.line.Value(b) for i in ios})
async def blink(ios, interval, ntimes=None):
loop = (ntimes is None) and itertools.count() or range(ntimes)
for _ in loop:
SET_VALUES(ios, 1)
await asyncio.sleep(interval)
SET_VALUES(ios, 0)
await asyncio.sleep(interval)
async def all(coros):
tasks = [asyncio.create_task(c) for c in coros]
for t in tasks:
await t
async def sequence(coros):
for c in coros:
await c
Blink
#!/usr/bin/env python
from stuff_decorator_stage1 import *
import asyncio
async def forever(factory):
while True:
await factory() # <--- call to create_coro()
async def blink(ios, interval, ntimes=None):
loop = (ntimes is None) and itertools.count() or range(ntimes)
for _ in loop:
SET_VALUES(ios, 1)
await asyncio.sleep(interval)
SET_VALUES(ios, 0)
await asyncio.sleep(interval)
def create_factory_for_blink(blinkfunc):
def factory(ios, interval, ntimes=None):
def create_coro():
return blinkfunc(ios, interval, ntimes)
return create_coro
return factory
blink = create_factory_for_blink(blink) # <--- turn blink into blink-factory
async def main():
await forever(
blink((11,), 0.3, 1), # <--- use blink-factory to instantiate coroutine
)
asyncio.run(main())
Playground: cycle()
¶
@program
async def cycle(ios, interval):
    """Blink the lines in *ios* one at a time, round-robin, without end.

    NOTE(review): `program` and `blink` come from elsewhere; the p() call
    below implies blink(...) returns a coroutine creator here -- confirm.
    """
    import itertools
    for i in itertools.cycle(ios):
        # One on/off cycle for a single line, then move on to the next one
        p = blink((i,), interval, 1)
        await p()
Cycle
row[0]
⟶ A-HA!!
Fast Forward: any()
, And Cancellation¶
blink.py: insert prog-all-demo ⟶ explain
Show stuff.any() ⟶ opposite of all()
blink.py: insert prog-any-demo
Playground: on()
¶
Show stuff.on()
Explain Future
blink.py: insert prog-any-on-demo
blink.py: insert prog-my-blink
Goodbye¶
blink.py: insert prog-smiley