Source code for bppy.analysis.bprogram_converter

from bppy.model.b_event import BEvent
from bppy.model.event_set import EventSet
from bppy.model.sync_statement import sync, choice
from bppy.utils.dfs import DFSBProgram
from bppy.utils.exceptions import BPAssertionError
from collections.abc import Iterable


[docs] class BProgramConverter: """ A class to convert a behavioral program to a PRISM model by exploring the DFS graph. """ def __init__(self, bprogram_generator, event_list, bt_names=None, max_trace_length=1000): """ Initialize a DFSBProgramVerifier. Parameters: ----------- bprogram_generator : function A function that generates a new instance of the BProgram. event_list : list A list of event objects which are used in the model. bt_names : list, optional Names to used for the matching bthreads in the prism model. If none, the bthreads are named bt0, bt1, etc. max_trace_length : int, optional Maximum length of the trace before terminating the search. Defaults to 1000. """ self.bprogram_generator = bprogram_generator self.event_list = event_list if bt_names is not None: self.bt_names = bt_names else: names = [f'bt{i}' for i in range(len(self.bprogram_generator().bthreads))] self.bt_names = names self.max_trace_length = max_trace_length
[docs] def collect_structure(self): """ Generates the tree structure and other necessary details of the behavioral program. """ dfs = DFSBProgram(self.bprogram_generator, self.event_list, max_trace_length=self.max_trace_length) init, mapper = dfs.run(explore_graph=False) names = [x.name for x in dfs.event_list] events = {e: BEvent(e) for e in names} bp_states = {self.bt_names[i]:v for i, v in mapper.items()} return names, events, bp_states
#def
[docs] def to_prism(self, output_file=None): """ Converts the behavioral program to a PRISM model of a markov decision process, with each bthread corresponding to a module. Returns the resulting text content. Parameters ---------- output_file : str, optional Name of the file to write the PRISM model to. Defaults to 'None'. """ event_names, events, bp_states = self.collect_structure() rule_template = "formula {}_{} = {};" bt_condition = "({}_{}_{}={})" header = 'dtmc\n\n' def map_bt_states(event_names, bt_states): node_to_s = {s: i for i, s in enumerate(bt_states)} relevant_events = {e: False for e in event_names} bt_req = {e: [] for e in event_names} bt_block = {e: [] for e in event_names} bt_trans = {} bt_probs = {} for node, n in node_to_s.items(): if isinstance(node.data, sync): bt_trans[n] = {} if ('request' in node.data): if isinstance(node.data['request'], Iterable): for e in node.data['request']: bt_req[e.name].append(n) elif isinstance(node.data['request'], BEvent): bt_req[node.data['request'].name].append(n) else: raise BPAssertionError("Request of an unknown type") if ('block' in node.data): if isinstance(node.data['block'], Iterable): for e in node.data['block']: bt_block[e.name].append(n) elif isinstance(node.data['block'], BEvent): bt_block[node.data['block'].name].append(n) else: raise BPAssertionError("Block of an unknown type") for e in event_names: bt_trans[n][e] = (node_to_s[node.transitions[events[e]]] if events[e] in node.transitions else n) for t in node.transitions: if node.transitions[t] != node: relevant_events[t.name] = True if isinstance(node.data, choice): bt_probs[n] = {choice_outcome: (node_to_s[node.transitions[choice_outcome][0]], choice_prob) for choice_outcome, choice_prob in node.data.options()} for e, n in bt_req.items(): if not relevant_events[e] and len(n) > 0: relevant_events[e] = True for e, n in bt_block.items(): if not relevant_events[e] and len(n) > 0: relevant_events[e] = True return bt_req, bt_block, bt_trans, bt_probs, relevant_events def format_bt_module(name, event_names, bt_states): bt_req, bt_block, bt_trans, bt_probs, relevant_events = map_bt_states(event_names, bt_states) event_names = [e for e in event_names if relevant_events[e]] module_template = '\nmodule {}\n\t{}\n\n\t{}\nendmodule\n' state_template = "formula {}_{}_{} = {};" state_name = f's_{name}' state_init = f'{state_name}: [0..{len(bt_states)-1}] init 0;' event_transition = '[{}] ({}={}) -> 1: ({}\'={});' prob_transition = '[] ({}={}) -> {};' prob_format = '{}: ({}\'={})' req = [state_template.format(name, "req", e, ' | '.join([('({}={})').format(state_name, n) for n in bt_req[e]]) if len(bt_req[e]) > 0 else 'false') for e in event_names] block = [state_template.format(name, "block", e, ' | '.join([('({}={})').format(state_name, n) for n in bt_block[e]]) if len(bt_block[e]) > 0 else 'false') for e in event_names] transitions = [] for n, tr in bt_trans.items(): string_tr = [event_transition.format(e, state_name, n, state_name, s_tag) for e, s_tag in tr.items() if e in event_names] transitions.append('\n\t'.join(string_tr)) probabilities = [] for n, c in bt_probs.items(): string_prob = [prob_format.format(p, state_name, next_state) for next_state, p in c.values()] probabilities.append(prob_transition.format(state_name, n, ' + '.join(string_prob))) module = '' module += '\n\n'.join( ['\n'.join(sec) for sec in [req,block]]) module += module_template.format(name, state_init, '\n\t\n\t'.join(transitions + probabilities)) return module, event_names text, bt_events = zip(*[format_bt_module(bt_name, event_names, bp_states[bt_name]) for bt_name in self.bt_names]) content = '\n\n'.join([t for t in text]) # header regulating which events are enabled + main module req = [rule_template.format(e, "req", ' | '.join([bt_condition.format(bt,'req',e,'true') for i, bt in enumerate(self.bt_names) if e in bt_events[i]])) for e in event_names] block = [rule_template.format(e, "block", ' | '.join([bt_condition.format(bt,'block',e,'true') for i, bt in enumerate(self.bt_names) if e in bt_events[i]])) for e in event_names] enable = [rule_template.format(e, "enabled", ' & '.join([f'({e}_req=true)', f'({e}_block=false)'])) for e in event_names] labels = ["label \"{}\" = ({}_enabled=true);".format(e, e) for e in event_names] event_transition = '[{}] ({}_enabled=true) -> 1: (event\'={});' guards = '\n\t'.join([event_transition.format(e, e, i) for i, e in enumerate(event_names)]) module_main = f'\nmodule main\n\tevent: [-1..{len(event_names)}] init -1;'+\ f'\n\t{guards}\nendmodule\n' header += '\n\n'.join( ['\n'.join(sec) for sec in [req,block,enable, labels]]) header += module_main header += '\n//-----------------------\n\n' if output_file: with open(output_file, "w") as f: f.write(header + content) return header + content