Source code for bppy.model.b_thread
from copy import copy
[docs]def b_thread(func):
"""
A decorator to wrap bthread generator with, in order to handle data transmission and bthread termination.
"""
def wrapper(*args):
while True:
m = None
f = func(*args)
while True:
try:
e = f.send(m)
if isinstance(e, dict):
e["locals"] = copy(f.gi_frame.f_locals)
m = yield e
if m is None:
break
except (KeyError, StopIteration):
m = yield None
break
return wrapper