Symbolically verify a BProgram using PyNuSMV
This example demonstrates how to verify a behavioral program symbolically using PyNuSMV.
The SymbolicBProgramVerifier class implementation requires a b-program generator - a function that creates a new instance of the b-program and the list of program events.
The verifier can operate in two modes: Binary Decision Diagrams (BDD) and SAT-based Bounded Model Checking (BMC).
The specification to be verified, is written NuSMV LTL specification format.
Note:
1. The class requires the installation of PyNuSMV. More info on its installation can be found at https://github.com/LouvainVerificationLab/pynusmv.
2. The verifier is currently limited to b-programs with
SimpleEventSelectionStrategy.
The verifier does not support events with data.
import bppy as bp
from bppy.analysis.symbolic_bprogram_verifier import SymbolicBProgramVerifier
# define the number of philosophers
PHILOSOPHER_COUNT = 3
# define the event sets
any_take = dict(
[(i, [bp.BEvent(f"T{i}R"), bp.BEvent(f"T{(i + 1) % PHILOSOPHER_COUNT}L")]) for i in range(PHILOSOPHER_COUNT)])
any_put = dict(
[(i, [bp.BEvent(f"P{i}R"), bp.BEvent(f"P{(i + 1) % PHILOSOPHER_COUNT}L")]) for i in range(PHILOSOPHER_COUNT)])
any_take_semaphore = bp.EventSet(lambda e: e.name.startswith("TS"))
any_release_semaphore = bp.EventSet(lambda e: e.name.startswith("RS"))
all_events = [bp.BEvent(f"T{i}R") for i in range(PHILOSOPHER_COUNT)] + \
[bp.BEvent(f"T{i}L") for i in range(PHILOSOPHER_COUNT)] + \
[bp.BEvent(f"P{i}R") for i in range(PHILOSOPHER_COUNT)] + \
[bp.BEvent(f"P{i}L") for i in range(PHILOSOPHER_COUNT)] + \
[bp.BEvent(f"TS{i}") for i in range(PHILOSOPHER_COUNT)] + \
[bp.BEvent(f"RS{i}") for i in range(PHILOSOPHER_COUNT)]
# define the behavior of a philosopher
@bp.thread
def philosopher(i):
while True:
for j in range(2):
yield bp.sync(request=[bp.BEvent(f"T{i}R"), bp.BEvent(f"T{i}L")])
for j in range(2):
yield bp.sync(request=[bp.BEvent(f"P{i}R"), bp.BEvent(f"P{i}L")])
# define the behavior of a fork
@bp.thread
def fork(i):
while True:
e, cannot_put = None, None
e = yield bp.sync(waitFor=any_take[i], block=any_put[i])
cannot_put = bp.BEvent(f"P{(i + 1) % PHILOSOPHER_COUNT}L") if e.name.endswith("R") else bp.BEvent(f"P{i}R")
yield bp.sync(waitFor=any_put[i], block=any_take[i] + [cannot_put])
def init_bprogram(): # function to initialize the b-program with the defined b-threads
bthreads = [philosopher(i) for i in range(PHILOSOPHER_COUNT)] + [fork(i) for i in range(PHILOSOPHER_COUNT)]
return bp.BProgram(bthreads=bthreads,
event_selection_strategy=bp.SimpleEventSelectionStrategy(),
listener=bp.PrintBProgramRunnerListener()) # unnecessary here, as SymbolicBProgramVerifier us SimpleEventSelectionStrategy always.
# Initialize verifier and check that the program does not end using the BPROGRAM_DONE flag.
# The verifier will use BDDs to check the property.
verifier = SymbolicBProgramVerifier(init_bprogram, all_events)
result, explanation_str = verifier.verify(spec="G (!(event = BPROGRAM_DONE))", type="BDD", find_counterexample=True,
print_info=False)
if result:
print("OK")
else:
print("Violation Found")
print("Counterexample:")
print(explanation_str)
# Initialize verifier and check that philosophers 0 takes the left fork infinitely often.
# The verifier will use SAT-based bounded model checking with bound 10 to check the property.
verifier = SymbolicBProgramVerifier(init_bprogram, all_events)
result, explanation_str = verifier.verify(spec="G (F event = T0L)", type="BMC", bound=10, find_counterexample=True,
print_info=False)
if result:
print("OK")
else:
print("Violation Found")
print("Counterexample:")
print(explanation_str)
# introduce semaphores to fix the classic problem
@bp.thread
def semaphore():
while True:
yield bp.sync(waitFor=any_take_semaphore)
yield bp.sync(waitFor=any_release_semaphore, block=any_take_semaphore)
@bp.thread
def take_semaphore(i):
while True:
yield bp.sync(request=bp.BEvent(f"TS{i}"), block=[bp.BEvent(f"T{i}R"), bp.BEvent(f"T{i}L")])
for j in range(2):
yield bp.sync(waitFor=[bp.BEvent(f"T{i}R"), bp.BEvent(f"T{i}L")])
yield bp.sync(request=bp.BEvent(f"RS{i}"), block=[bp.BEvent(f"P{i}R"), bp.BEvent(f"P{i}L")])
for j in range(2):
yield bp.sync(waitFor=[bp.BEvent(f"P{i}R"), bp.BEvent(f"P{i}L")])
def init_fixed_bprogram(): # initialize the fixed b-program with semaphores and the defined b-threads
bthreads = [philosopher(i) for i in range(PHILOSOPHER_COUNT)] + [fork(i) for i in range(PHILOSOPHER_COUNT)] + [
semaphore()] + [take_semaphore(i) for i in range(PHILOSOPHER_COUNT)]
return bp.BProgram(bthreads=bthreads,
event_selection_strategy=bp.SimpleEventSelectionStrategy(),
listener=bp.PrintBProgramRunnerListener())
# Initialize verifier and see that the updated program does not end.
verifier = SymbolicBProgramVerifier(init_fixed_bprogram, all_events)
result, explanation_str = verifier.verify(spec="G (!(event = BPROGRAM_DONE))", type="BDD", find_counterexample=True,
print_info=False)
if result:
print("OK")
else:
print("Violation Found")
print("Counterexample:")
print(explanation_str)
# Initialize verifier and see that philosophers 0 might stay hungry in the updated program :)
result, explanation_str = verifier.verify(spec="G (F event = T0L)", type="BMC", bound=10, find_counterexample=True,
print_info=False)
if result:
print("OK")
else:
print("Violation Found")
print("Counterexample:")
print(explanation_str)