Learning a BProgram as a gym environment

This example demonstrates how to learn a BProgram as a gym environment, using the package’s extension to the OpenAI gym. In this extension, we have incorporated a localReward parameter into the yield statement, reflecting the system’s preferences. The BPEnv class implementation requires a b-program generator - a function that creates a new instance of the b-program and the list of program events. The default observation space for the b-program within BPEnv is represented as a Cartesian product of the b-thread’s execution points, classified as multi-discrete. For developers seeking to tailor observation space to specific needs, alternative implementations can be created by extending the abstract class BPObservationSpace, which includes access to both the b-thread’s execution point and its local variables. The Reward computation at each state is determined through a function that receives the reward statements from all b-threads. The default approach calculates the total reward at each yield point by summing the individual rewards from all active b-threads.

import bppy as bp
from bppy.gym import *
import numpy as np

@bp.thread
def add_hot():  # request hot 5 times, and specify a reward
    for i in range(5):
        yield bp.sync(request=bp.BEvent("HOT"), localReward=-0.01)
    yield bp.sync(waitFor=bp.All(), localReward=1)

@bp.thread
def add_cold():  # request cold 5 times
    for i in range(5):
        yield bp.sync(request=bp.BEvent("COLD"))


@bp.thread
def control():  # blocks HOT from occurring twice in a row
    while True:
        yield bp.sync(waitFor=bp.BEvent("HOT"))
        yield bp.sync(block=bp.BEvent("HOT"), waitFor=bp.BEvent("COLD"))


def init_bprogram():  # function to initialize the b-program with the defined b-threads
    return bp.BProgram(bthreads=[add_hot(), add_cold(), control()],
                       event_selection_strategy=bp.SimpleEventSelectionStrategy())


if __name__ == '__main__':
    # define event list
    event_list = [bp.BEvent("HOT"), bp.BEvent("COLD")]

    # initialize environment with the defined b-program generator, observation space, and reward function
    env = BPEnv(bprogram_generator=init_bprogram,
                action_list=event_list,  # all program events are considered as possible actions for the agent
                observation_space=SimpleBPObservationSpace(init_bprogram, event_list),
                reward_function=lambda rewards: sum(filter(None, rewards)))

    # reset environment and print initial state
    state, _ = env.reset()
    print(state)
    terminated = False
    while not terminated:  # loop until the environment (b-program) terminates
        action = env.action_space.sample()  # sample an action
        print(action)
        state, reward, terminated, _, info = env.step(action)  # take a step with the sampled action
        print(state, reward, terminated, info)

    # importing stable_baselines3 and initializing a PPO model
    from stable_baselines3 import PPO
    model = PPO("MlpPolicy", env, verbose=1)
    model.learn(total_timesteps=100000)

    # running the environment again with the trained model
    state, _ = env.reset()
    print(state)
    terminated = False
    while not terminated:
        action, _states = model.predict(state)
        print(action)
        state, reward, terminated, _, info = env.step(action)
        print(state, reward, terminated, info)

Note that not all events are necessarily considered actions. This distinction enables discernment between controllable and uncontrollable program behaviors. For instance, the following b-program implements the frozen lake environment:

import bppy as bp
from bppy.gym import *
import numpy as np

ROWS = 4
COLS = 4

# defining the agent actions
agent_actions = [bp.BEvent("LEFT"), bp.BEvent("RIGHT"), bp.BEvent("UP"), bp.BEvent("DOWN")]

# defining the internal events for the environment
move_event = bp.EventSet(lambda e: e.name.startswith("Move"))


class Move(bp.BEvent):
    def __init__(self, i, j):
        super().__init__("Move", {"i": i, "j": j})


# b-thread for cells in the environment, triggered when the agent moving to this cell. The b-thread than requests to
# move to the intended direction or a perpendicular direction randomly
@bp.thread
def cell(i, j):
    while True:
        yield bp.sync(waitFor=Move(i, j))
        e = yield bp.sync(waitFor=agent_actions)
        actions_and_opposite_moves = {"LEFT": Move(i, j+1),"RIGHT": Move(i, j-1),"UP": Move(i+1, j),"DOWN": Move(i-1, j)}
        # remove the opposite move to the action
        actions_and_opposite_moves.pop(e.name)
        possible_moves = list(actions_and_opposite_moves.values())
        yield bp.sync(request=possible_moves, block=agent_actions)


# b-thread for hole locations in the environment, representing terminal states
@bp.thread
def hole(i, j):
    yield bp.sync(waitFor=Move(i, j))
    yield bp.sync(block=bp.All())  # reached hole terminate the program with a reward of 0


# b-thread representing wall locations, blocking moves to this wall
@bp.thread
def wall(i, j):  # block moves to this wall
    yield bp.sync(block=Move(i, j))


# b-thread for the start of the environment run, triggering the initial location of the agent
@bp.thread
def start():
    yield bp.sync(request=Move(0, 0), block=agent_actions)


# b-thread representing the goal of the environment, providing a terminal state with reward 1
@bp.thread
def goal():
    yield bp.sync(waitFor=Move(ROWS-1, COLS-1), localReward=0)
    yield bp.sync(block=bp.All(), localReward=1)  # reached goal - terminate the program with a reward of 1


# b-thread for the agent, requesting actions based on the current location
@bp.thread
def agent():
    while True:
        e = yield bp.sync(waitFor=move_event)
        current_location = (e.data["i"], e.data["j"])
        yield bp.sync(request=agent_actions)


# function to initialize the b-program with the defined b-threads
def init_bprogram():
    """
    returning an instance for the standard 4x4 frozen lake environment:
        ["SFFF",
         "FHFH",
         "FFFH",
         "HFFG"]
    """
    holes_locations = [(1, 1), (1, 3), (2, 3), (3, 0)]
    return bp.BProgram(bthreads=[start(), agent(), goal()] +
                                [hole(i, j) if (i, j) in holes_locations else cell(i, j) for i in range(ROWS) for j in range(COLS)] +
                                [wall(-1, j) for j in range(COLS)] +
                                [wall(ROWS, j) for j in range(COLS)] +
                                [wall(i, -1) for i in range(ROWS)] +
                                [wall(i, COLS) for i in range(ROWS)],
                       event_selection_strategy=bp.SimpleEventSelectionStrategy(),
                       listener=bp.PrintBProgramRunnerListener())

# listing all possible events in the b-program
all_events = [Move(i, j) for i in range(-1, ROWS+1) for j in range(-1, COLS+1)] + agent_actions + [bp.BEvent("HOLE"), bp.BEvent("GOAL")]


# defining the observation space for the environment based on the current_location variable of the agent b-thread
class FrozenLakeObservationSpace(BPObservationSpace):
    def __init__(self, dim):
        super().__init__([dim], np.int64, None)
    def bp_state_to_gym_space(self, bthreads_states):
        agent_bthread_statement = [x for x in bthreads_states if "current_location" in x.get("locals", {})][0]
        current_location = agent_bthread_statement["locals"]["current_location"]
        return np.asarray([current_location[0]*COLS + current_location[1]], dtype=self.dtype)


# initialize environment with the defined b-program generator, observation space, and reward function
env = BPEnv(bprogram_generator=init_bprogram,
            action_list=agent_actions,  # all program events are considered as possible actions for the agent
            observation_space=FrozenLakeObservationSpace(ROWS*COLS),
            reward_function=lambda rewards: sum(filter(None, rewards)))

# reset environment and print initial state
state, _ = env.reset()
print(state)
terminated = False
while not terminated:  # loop until the environment (b-program) terminates
    action_id = env.action_space.sample()  # sample an action
    state, reward, terminated, _, info = env.step(action_id)  # take a step with the sampled action
    print(agent_actions[action_id].name, state, reward, terminated, info)


# importing stable_baselines3 and initializing a PPO model
from stable_baselines3 import PPO
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100000)

# running the environment again with the trained model
state, _ = env.reset()
print(state)
terminated = False
while not terminated:
    action_id, _states = model.predict(state)
    state, reward, terminated, _, info = env.step(action_id)
    print(agent_actions[action_id].name, state, reward, terminated, info)