Select Git revision
level_abc.py
level_abc.py 29.82 KiB
"""Abstract level implementation."""
from abc import ABC, abstractmethod
from typing import Optional, Any
import subprocess
import sys
import os
from os import chdir
from os.path import join
import logging
import time
from logs import lvl
from utils import show_threads
from easytracker import init_tracker, PauseReasonType
from level.map import Map
from level.objects import Player, Object, Wop, Coordinates
from progress.progress_manager import progress
from tracker_helper import TrackerHelp
from verif_lib import LevelChecker
from code_window import ServerWindow
from config import Config
PROMPT = "(gdb) "
# pylint: disable=too-many-instance-attributes fixme
class AbstractLevel(ABC, TrackerHelp):
"""
Abstract level class that allows to use different configurations.
param: level_input_queue: a queue to receive info from whover has started us, e.g., a GUI or a command line
param: level_output_queue: a queue to send info with whoever has started us, e.g., a GUI
"""
def __init__(
self,
metadata: dict,
level_input_queue,
level_output_queue,
code_window,
):
"""
Initialize the level
:param metadata: the dictionnary that describes the level
"""
# Communication queues
# queue for output communication, e.g., with GUI
self.out_queue = level_output_queue
# queue for getting GDB command or inferior input
self.in_queue = level_input_queue
self.code_window = code_window
self.metadata = metadata
# Level infos
self.level_number = metadata["level_number"]
self.level_title = metadata["level_title"]
self.level_name = metadata["level_name"]
self.level_path = metadata["level_path"]
self.source_level_dir = metadata["source_level_dir"] or os.path.join(Config.ROOT_DIR, Config.LEVEL_DIR, self.level_name)
self.exec_name = metadata["exec_name"]
# Available commands
self.available_commands = metadata["available_commands"]
# Bugs/hints description
self.bug = metadata["bug"]
self.hints = metadata["hints"]
# number of times we have started or restarted the level
self.start_number = 0
# compile the level sources if required (files newer than executable)
# TODO(flo): should not be here. Either the tests will compile, or
# arcade must handle it so it can catch errors even when starting up.
# self.compile_source()
# Tracker
# Contains internal breakpoints used by level (e.g., pause on 'message' function)
TrackerHelp.__init__(
self,
self.source_level_dir,
self.exec_name,
)
# Maps and connection to the inferior's variable
self.current_map = "main"
self.maps = {}
for map_name in metadata["arcade_maps"]:
self.maps[map_name] = Map()
if 'map_width' in self.metadata and 'map_height' in self.metadata:
self.maps['main'].set_size(self.metadata['map_width'],self.metadata['map_height'])
# Add the player to the map
player_mode = metadata["player_mode"] or "global_variables"
if "player_y" in metadata:
player_y = int(metadata["player_y"])
player_mode = "x_only"
else:
player_y = None # can be None if not in x_only mode
self.player = Player(player_mode, player_y)
for map in self.maps.values():
map.add_player(self.player)
if "exit_x" in metadata:
init_x = int(metadata["exit_x"])
var_x = None
else:
init_x = None
var_x = "exit_x"
if "exit_y" in metadata:
init_y = int(metadata["exit_y"])
var_y = None
else:
init_y = None
var_y = "exit_y"
# Adding the exit to the map
exit_obj = Object(
init_x=init_x,
init_y=init_y,
var_x=var_x,
var_y=var_y,
char_rep="@",
name="exit",
)
lvl.debug(f"Init {exit_obj.name}: {init_x} and {init_y}, watching {var_x} and {var_y}")
self.maps["main"].add_exit(exit_obj)
lvl.debug(f"Init {exit_obj.name}: var_x in object {exit_obj.var_x}")
lvl.debug(f"Init {exit_obj.name}: var_y in object {exit_obj.var_y}")
# raise RuntimeError("Make this thread fail in level ABC!!")
# Create a WOP
for wop_dict in metadata["wops"].values():
lvl.debug(f"Adding wop {wop_dict}")
self.maps["main"].add_object(Wop(wop_dict))
self.map = self.maps["main"]
# everything that also need to be reset for a restart
self.set_reset()
def set_reset(self):
"""
Common code that must be called when we start/restart a level
"""
# Internal infos
self.last_cmdline = "next" # TODO: change to None?
self._internal_function_name = (
None # used for internal breakpoints, to know where to return to
)
self._internal_line_number = None
self.recording = False
self.record = []
# checker: initially none, gets created on the fly.
# kept in the current objects so it can be used in custom level.py
# files to know whether a check has already been performed (and failed)
self.checker = None
self.inferior_started = False
self.is_handling_internal_brwatch = False # True if we are handling internal brwatch
if self.magic_bp: # if a magic breakpoint was placed, remove it
self.delete_magic_breakpoint()
self.interactive_bp = None
self.interactive_mode = False
@property
def is_running(self):
"""
When we are handling an internal breakpoint, we consider the level is
still running as from the user perspective, this should be the case
(internal breaks and watches should be transparents).
"""
r = self.tracker and (self.tracker.is_running() or self.is_handling_internal_brwatch)
return r
@property
def is_validating(self):
return self.checker and self.checker.tracker and self.checker.tracker.is_running()
def check_map_change(self, memory):
"""
Map change in the inferior depends on the engine version
"""
# TODO(flo): do this for all subsequent engines
match self.metadata['engine_name']:
case None | "none":
return
case "simple_map" | "level_end" | "input_command" | "levers" | "config_file" | "read_map_str" | "command_args":
lvl.info("Map change check not implemented for {self.metadata['engine_name']}")
return
case "map_stack":
pass
case _ :
lvl.error(f"TODO: think of map changes for engines > map_stack ({self.metadata['engine_name']})")
raise NotImplementedError
# Here, get the game instance and check current map
game = self.tracker.get_variable_value_as_str("game")
def update_state(self) -> None:
"""Run events for updated variables"""
if (not self.inferior_started
or self.tracker._pause_reason.type == PauseReasonType.SIGNAL and 'SIGINT' in self.tracker._pause_reason.args):
return
try:
memory = self.tracker.get_program_memory(as_raw_python_objects=True)
if memory is None:
lvl.error(f"Cannot update state of level, memory is None")
return
except ValueError as e:
lvl.warning(f"Cannot update state of level: {e}")
return
# verify if there was a map change since last time
self.check_map_change(memory)
# lvl.debug(f"Globals variables: {globs}")
# lvl.debug(f"Frames: {frames}")
# update all objects on the current map
self.maps[self.current_map].update_objects(memory)
def update_all(self):
lvl.debug("Running update_all for level")
# Fetch and update variables if required
self.update_state()
# Update the code view
self.show_next_line()
def compile_source(self, force=False, target='all', capture_output=False):
lvl.info(f"Compiling program in {self.source_level_dir}")
cmd = ["make", "-C", self.source_level_dir, target]
if lvl.getEffectiveLevel() > logging.DEBUG:
cmd.append("--quiet")
if force:
cmd.append("--always-make")
result = subprocess.run(
cmd,
capture_output=capture_output
)
lvl.debug(f"Compilation results: {result}")
return result
def unlock_next_levels(self):
lvl.info(f"Success on level {self.level_name}, unlocking levels...")
# mark the level as success
progress.set_done(self.level_name)
def pre_validation(self):
"""
Redefine for a particular level
"""
pass
def post_validation(self):
"""
Redefine for a particular level
"""
pass
def validate_level(self, timeout: float|None = None) -> (bool, str | None):
"""
Do some general and specific checkings for current level.
"""
lvl.info(f"Validating level")
# Freezing temporarily current tracker to avoid using it by error during validation
tracker_freeze = self.tracker
assert tracker_freeze
self.tracker = None
valid = False
reasons = None
if timeout:
self.checker = LevelChecker(self, timeout=timeout)
else:
self.checker = LevelChecker(self)
if self.checker.result():
lvl.info("Level validated")
valid = True
else:
valid = False
lvl.info("Level failed validation for the following reasons:")
reasons = self.checker.get_reasons()
lvl.info(reasons)
lvl.debug(f"Check statistics: {self.checker.get_stats()}")
# Restoring tracker, if we want to start again
self.tracker = tracker_freeze
assert self.tracker
return valid, reasons
def pre_first_start(self):
"""
Called just before the first 'start' is sent to gdb.
"""
pass
def pre_restart(self):
"""
Called just before the subsequent 'start' are sent to gdb.
Synonym: 'reset'
"""
self.player.reset()
pass
def post_first_start(self):
"""
Called just after the first 'start' is sent to gdb.
"""
pass
def post_restart(self):
"""
Called just after the subsequent 'start' are sent to gdb.
"""
pass
def do_start(self):
"""
Start the level, by starting the inferior.
Can be called either at first start or at a restart.
(Note: cannot be named 'start' at it interferes with the Thread class)
There are two cases:
- initial start: need to initialize everything
- restart with tracker alive
"""
lvl.debug("Performing level 'set/reset' then start")
if self.start_number == 0:
self.pre_first_start()
lvl.debug("Performing first start")
self.load_program()
else:
self.set_reset() # not necessary for first start as was already called by init
self.pre_restart()
lvl.debug("Performing a restart")
self.reload_program()
self.tracker.send_direct_command("start")
self.inferior_started = True
self.start_number += 1
# update the local variables, line in code viewer, then notify
self.update_all()
if self.start_number == 1:
self.post_first_start()
else:
self.post_restart()
def restart(self):
"""
Called to trigger a level restart.
"""
assert False
# Not the correct way to do anymore
# self.send_console_command("start", prompt=True) # make gdb start/restart the program
def show_next_line(self):
source_file = self.tracker.next_source_file
line_number = self.tracker.next_lineno
self.code_window.show_next_line(
source_file, self.source_level_dir, line_number
)
def handle_breakpoint(self, cont=False) -> bool:
"""
Handle stopping on a breakpoint during a gdb command.
:param cont is set to True if the breakpoint happened during a 'continue'-like
command:
* continue => just return and let _gdb_continue method loop work
* step => can happen with e.g. 'step 50': let _gdb_step method loop do the work
:return value is True if we must really stop there (e.g. we are on a user breakpoint)
"""
pause_reason = self.tracker.pause_reason
# Check if the breakpoint is user defined
br_no = int(pause_reason.args[0]) # type: ignore
if br_no not in self.internal_bp:
lvl.debug(f"Breakpoint number {br_no} is not internal")
self.is_handling_internal_brwatch = False
return True
self.is_handling_internal_brwatch = True
# This is an internal breakpoint, need to "finish" the current gdb
# command
callbacks = self.internal_bp[br_no]
lvl.debug(f"Handling internal bp {br_no} ({callbacks})")
# lvl.debug(f"Internal breakpoint, calling callback functions")
# lvl.debug(f"BREAKPOINTS: {self.internal_bp}")
# lvl.debug(f"My BP: {self.internal_bp[br_no]}")
for cb in callbacks:
cb()
# Now it will depend on the type of the last gdb command used
if cont:
# continue will happen inside the _gdb_continue method
# step also
self.is_handling_internal_brwatch = False
return False
# Otherwise, we must go up in frames until we reach a user bp or the
# function where command was called.
# With next, it will effectively place the PC on the next line
lvl.debug(
f"START going up the stack to find {self._internal_function_name} (currently at {self.tracker.get_current_function_name()})"
)
while True:
# we can already by at the correct position, e.g., if the next line
# had the breakpoint
if self.tracker.get_current_function_name() == self._internal_function_name: # type: ignore
# breakpoint()
lvl.debug(
f"Finished going up the stack: found {self._internal_function_name}"
)
# If on a function that stores the result in a variable, one
# start step is necessary to "finish" the current line
if self._internal_line_number == self.tracker.next_lineno:
self.tracker.step()
self.is_handling_internal_brwatch = False
return False # back to the initial frame
lvl.debug(
f"Finishing {self.tracker.get_current_function_name()} to go up the stack to find {self._internal_function_name}"
)
pause_reason = self.tracker.send_direct_command("finish")
if pause_reason.type is PauseReasonType.FUNCTION_FINISHED:
# that was expected :-) next round!
continue
# need to handle other wanted or not stops
if pause_reason.type is PauseReasonType.EXITED:
# somehow the program terminated
self.is_handling_internal_brwatch = False
return True # should not try to contine after program exited...
if pause_reason.type is PauseReasonType.BREAKPOINT:
# recursively handle the bp:
# - if user breakpoint, will return True
# - if internal breakpoint, will have gone up the stack back to
# original point and returned False
return self.handle_breakpoint(cont)
if pause_reason.type is PauseReasonType.WATCHPOINT:
raise NotImplementedError
if pause_reason.type is PauseReasonType.CALL:
raise NotImplementedError
self.handle_track(finish=False)
# TODO: debug just to know the pause reason when fin
lvl.error(f"Pause reason {pause_reason} not handled")
raise NotImplementedError
def add_short_cmds(self) -> None:
"""Extend the available commands with the gdb aliases."""
for command in self.available_commands:
if command == "next":
cmds = ["n"]
elif command == "step":
cmds = ["s"]
elif command == "break":
cmds = ["brea", "brea", "bre", "bre", "b"]
elif command == "run":
cmds = ["r"]
elif command == "continue":
cmds = ["c", "fg"]
elif command == "condition":
cmds = ["cond"]
else:
continue
self.available_commands.extend(cmds)
def _on_quit(self):
"""
Quit handler. Called when asking gdb to quit or exit in the console.
"""
lvl.warning("Exiting gdb...")
self.stop_level()
def _gdb_next(self, count: Optional[int] = None):
"""
Emulate the next gdb command.
We have to use a loop and not pass 'count' to gdb, as we need to handle
internal bp, and there is no way of knowing how many 'next' gdb has executed before
stopping on a bp.
"""
count = count or 1
if count > 1:
# hide all the commands sent to gdb in the user console
self.send_print_thread_command('pause')
for _ in range(count):
pause_reason = self.tracker.next()
# self.update_state()
if (
pause_reason.type is PauseReasonType.EXITED
or pause_reason.type is PauseReasonType.SIGNAL
):
self._internal_function_name = None
self._internal_line_number = None
break
elif pause_reason.type is PauseReasonType.WATCHPOINT:
raise NotImplementedError
self.handle_watch()
elif pause_reason.type is PauseReasonType.CALL:
raise NotImplementedError
self.handle_track() # Internal breakpoint
elif pause_reason.type is PauseReasonType.BREAKPOINT:
ret = self.handle_breakpoint()
if ret:
break
elif pause_reason.type is PauseReasonType.ENDSTEPPING_RANGE:
# normal reason: 'next' is finished
continue
elif pause_reason.type is PauseReasonType.ERROR:
lvl.error(f"Error from gdb resulting from _gdb_next")
raise ValueError("GDB error")
elif pause_reason.type is PauseReasonType.UNKNOWN:
lvl.error(f"Pause reason {pause_reason} badly handled in _gdb_next")
raise NotImplementedError
else:
lvl.error(f"Pause reason {pause_reason} not handled")
raise NotImplementedError
# break # Breakpoint while handling CALL or WATCHPOINT
if count > 1:
self.send_print_thread_command('last-and-resume')
def _gdb_continue(self, timeout: float|None = None):
"""Emulate the continue command.
TODO add ignore_count parameter
"""
while True:
lvl.debug(f"Emulating continue: resuming in loop")
pause_reason = self.tracker.resume()
if pause_reason.type is PauseReasonType.WATCHPOINT:
raise NotImplementedError
self.handle_watch()
elif pause_reason.type is PauseReasonType.CALL:
raise NotImplementedError
self.handle_track()
elif pause_reason.type is PauseReasonType.BREAKPOINT:
lvl.debug(f"Got a breakpoint, will check if need to really stop")
ret = self.handle_breakpoint(cont=True)
lvl.debug(f"Back from handle breakpt")
if ret:
lvl.debug(f"Must stop now")
break
else:
lvl.debug(f"Must stop because pause reason is {pause_reason.type}")
break
def _gdb_interrupt(self):
self.interrupt_inferior()
def _gdb_break(
self,
loc_str: Optional[str] = None,
line_no: Optional[int] = None,
) -> None:
"""Emulate the break command."""
if line_no is not None:
self.tracker.break_before_line(line_no)
elif loc_str is not None:
self.tracker.break_before_func(loc_str)
else:
current_line = self.tracker.next_lineno
if current_line is not None:
self.tracker.break_before_line(current_line)
def _gdb_step(self, count: Optional[int] = None):
"""Emulate gdb commands."""
count = count or 1
if count > 1:
self.send_print_thread_command('pause')
for _ in range(count):
pause_reason = self.tracker.step()
if pause_reason.type is PauseReasonType.WATCHPOINT:
raise NotImplementedError
self.handle_watch()
elif pause_reason.type is PauseReasonType.CALL:
raise NotImplementedError
self.handle_track()
elif pause_reason.type is PauseReasonType.BREAKPOINT:
ret = self.handle_breakpoint(cont=True)
if ret:
break
else:
break
if count > 1:
self.send_print_thread_command('last-and-resume')
def _gdb_finish(self):
self.tracker.send_direct_command("finish")
def send_print_thread_command(self, cmd):
self.print_thread.send_command(cmd)
# tracker.stdout_queue.put_nowait(('command', cmd))
def send_print_thread_error(self, msg):
self.print_thread.send_error(msg)
def info(self, msg):
"""
Just output informations from agdbentures (warning, errors, messages to player)
"""
print(msg)
# pylint: disable=too-many-branches
def parse_and_eval(self, cmdline: str) -> None | str:
"""
Parse a command and call the appropriate function.
:param cmd: The command to run.
:return: None if there is nothing else to do, or a string if subsequent
actions are required
"""
def not_implemented(cmd: str) -> None:
"""Verbose error if not implemented."""
self.info(f"Arguments for the '{cmd}' command are not handled yet")
def usage(cmd: str, args: str) -> None:
"""Print usage of command that takes one single integer."""
self.info(f"Usage: {cmd} {args}")
if not cmdline or cmdline.isspace():
if self.last_cmdline is None:
return # Nothing to do
cmdline = self.last_cmdline # Using the last command
# Saving function name
self._internal_function_name = self.tracker.get_current_function_name() # type: ignore
self._internal_line_number = self.tracker.next_lineno
# Cleaning up the command
cmdline = cmdline.strip()
# Saving the command unless the command is to restart
if cmdline != "start":
self.last_cmdline = cmdline
if self.recording:
self.record.append(["gdb", cmdline])
# Parsing argument and conversion to integer
cmd, *args = cmdline.split(maxsplit=1)
arg_str = None
arg_int = None
if args and len(args) == 1:
arg_str = args[0]
if arg_str.isnumeric():
arg_int = int(arg_str)
# if cmd not in self.available_commands and
# cmd not in {"quit", "q", "exit"}:
if False:
self.info(f"{cmd} is not a valid command.")
return
lcmd = len(cmd)
if cmd in ["run"]:
self.inferior_started = True
if cmd == "start":
# Do not forward to gdb, notify the level thread that we have to
# restart the level
return 'restart'
elif cmd in ["n", "next"]:
if arg_str is not None and arg_int is None:
usage(cmd, "[count_integer]")
return
self._gdb_next(arg_int)
elif cmd in ["s", "step"]:
if arg_str is not None and arg_int is None:
usage(cmd, "[count_integer]")
return
self._gdb_step(arg_int)
# elif cmd in ["r", "run"]:
# if arg_str is not None or arg_int is not None:
# not_implemented(cmd)
# return
# self._gdb_continue()
elif cmd in ["c", "cont", "continue"]:
self._gdb_continue()
elif cmd in ["b", "break"]:
self._gdb_break(arg_str, arg_int)
elif lcmd >= 3 and "finish".startswith(cmd):
self._gdb_finish()
elif cmd == "interrupt":
self._gdb_interrupt()
elif cmd in ["q", "qui", "quit", "exi", "exit"]:
lvl.warning("Not allowed to quit gdb directly")
# self._on_quit()
return "gdb_exit"
else:
lvl.info(f"Command {cmd} not implemented yet.")
try:
self.tracker.send_direct_command(cmdline)
except RuntimeError:
print("Command does not exist!")
# TODO add other commands (condition, ...)
# and extend argument of existing commands (run, continue)
def record_file(self):
return os.path.join(
self.source_level_dir,
"record.in",
)
def print_prompt(self):
self.send_print_thread_command('prompt')
def interrupt_validation(self):
"""
Used e.g., on an infinite loop during validation.
TODO: use a timer to automatically send interrupt.
"""
assert self.checker.tracker
self.checker.tracker.interrupt()
def send_out_queue(self, payload):
"""
Send message to whoever has started us
"""
self.out_queue.put_nowait(payload)
## Putting commands in our own queue
def send_console_command(self, command: str, cmdtype="gdb", prompt=False):
lvl.debug(f"Putting gdb command in level_in queue {command}")
if prompt:
self.print_prompt()
self.in_queue.put_nowait({"type": cmdtype, "command": command})
self.print_thread.show_gdb_command(command + '\n')
## Putting commands in our own queue
def send_level_control(self, control_command: str):
lvl.debug(f"Putting level command in level_in queue {control_command}")
self.in_queue.put_nowait({"type": "control", "command": control_command})
def start_record(self):
self.recording = True
self.send_console_command("start")
def save_record(self):
if not self.recording:
raise RuntimeError("No record file")
self.recording = False
with open(self.record_file(), "w") as f:
for source, command in self.record:
print(f"{source}:{command}", file=f)
# empty current record
self.record.clear()
def play_record(self):
if not os.path.exists(self.record_file()):
raise RuntimeError("No record file")
self.recording = False
with open(self.record_file()) as f:
commands = f.readlines()
# self.send_console_command("start")
for command in commands:
source, text = command.strip("\n").split(":")
# print("(gdb) ", end="")
# TODO nice loop to wait for GDB to run before sending to stdin
if source == "gdb":
self.send_console_command(text)
elif source == "input":
self.send_console_command(text)
else:
print(f"Unknown source in record {source}")
# print(self.level.tracker.get_global_variables(True))
def available_wop(self) -> set[tuple[int, int]]:
"""Return a list of WOP coordinates."""
coordinates = set()
for wop in self.map.wops:
if wop.visible:
coordinates.add((wop.coord_x, wop.coord_y))
return coordinates
def get_wop_message(self) -> dict[str, list[str]]:
"""
Return a dictionnary of messages, with the WOP
name as key and its message as value (as a list of lines).
"""
message_dict = {}
assert False, "Deprecated function?"
for wop in self.map.wops:
if wop.visible and wop.talks:
wop.triggered = True
message_dict[wop.name] = wop.message_list
wop.talks = False
return message_dict
def setup(self) -> None:
"""
Set the level. Needs to be defined in the level-specific .py file
"""
pass
def __del__(self):
self.stop_level()
def stop_level(self):
"""
Terminate the tracker
TODO: investigate, useless to send None to the GUI. Can we send something else?
"""
# self.out_queue.put_nowait({'topic': "quit"})
self.out_queue.put_nowait(None)
self.terminate_tracker()
def terminate_tracker(self):
"""
Safe function to call when we want to terminate the tracker.
Ensures the tracker exists before terminating it, and sets the tracker to None.
"""
if not self.tracker:
return
self.tracker.terminate()
self.tracker = None
@abstractmethod
def test(self):
"""Test the level."""
@abstractmethod
def run(self):
"""Run the level."""