Using External Events
This example shows how external events can be added to the program.
Each program contains a queue of external events, which can be added using the enqueue_external_event method.
Once no internal events can be selected, the selected event will be the first external event in the queue.
from bppy import *
# define an external event class
class External(BEvent):
pass
# define the set of external events
any_external = EventSet(lambda event: isinstance(event, External))
@b_thread
def add_external(): # adds external events to the bprogram and then continues doing nothing
b_program.enqueue_external_event(External("A"))
b_program.enqueue_external_event(External("B"))
b_program.enqueue_external_event(External("C"))
while True:
yield {waitFor: All()}
@b_thread
def act_on_external():
# waits for external events, while blocking all internal events
# and then requests an internal event with the same name
while True:
# triggers external events if exists, else terminates the bprogram
event = yield {block: All(), waitFor: any_external}
yield {request: BEvent(event.name)}
if __name__ == "__main__":
b_program = BProgram(bthreads=[add_external(), act_on_external()],
event_selection_strategy=SimpleEventSelectionStrategy(),
listener=PrintBProgramRunnerListener())
b_program.run()