Mentions légales du service

Skip to content
Snippets Groups Projects
Select Git revision
  • 98e279fb0b14f5cb5b1eaf751407d1cf0450cfb0
  • devel default protected
  • dev/docker
  • test-run
  • dev/guillaume
  • dev/bp_accumul
  • dev/entities
  • dev/keyboards-event
  • dev/bp_cumul
  • Fix_stepping_through_libc
  • dev/breakpoint_cumul
  • Level_toxicode_restoration
  • thunder_display_fix
  • dev/command_args_map_bug
  • dev/bug-continue
  • main protected
  • dev/toxicode
  • dev/music_sounds
  • dev/thomas/auto_export_python_path_when_running_maketests
  • dev/flo/sandbox-bug-hunting
  • DEPRECATED/split_game.py
  • MainTheoPHD
  • mark
23 results

level_abc.py

Blame
  • level_abc.py 29.82 KiB
    """Abstract level implementation."""
    
    from abc import ABC, abstractmethod
    from typing import Optional, Any
    import subprocess
    import sys
    import os
    from os import chdir
    from os.path import join
    import logging
    import time
    
    from logs import lvl
    from utils import show_threads
    from easytracker import init_tracker, PauseReasonType
    from level.map import Map
    from level.objects import Player, Object, Wop, Coordinates
    from progress.progress_manager import progress
    from tracker_helper import TrackerHelp
    from verif_lib import LevelChecker
    from code_window import ServerWindow
    from config import Config
    
    PROMPT = "(gdb) "
    
    # pylint: disable=too-many-instance-attributes fixme
    class AbstractLevel(ABC, TrackerHelp):
        """
        Abstract level class that allows to use different configurations.
    
        param: level_input_queue: a queue to receive info from whover has started us, e.g., a GUI or a command line
        param: level_output_queue: a queue to send info with whoever has started us, e.g., a GUI
        """
    
        def __init__(
            self,
            metadata: dict,
            level_input_queue,
            level_output_queue,
            code_window,
        ):
            """
            Initialize the level
    
            :param metadata: the dictionnary that describes the level
            """
            # Communication queues
            # queue for output communication, e.g., with GUI
            self.out_queue = level_output_queue
            # queue for getting GDB command or inferior input
            self.in_queue = level_input_queue
    
            self.code_window = code_window
    
            self.metadata = metadata
            # Level infos
            self.level_number = metadata["level_number"]
            self.level_title = metadata["level_title"]
            self.level_name = metadata["level_name"]
            self.level_path = metadata["level_path"]
            self.source_level_dir = metadata["source_level_dir"] or os.path.join(Config.ROOT_DIR, Config.LEVEL_DIR, self.level_name)
            self.exec_name = metadata["exec_name"]
    
            # Available commands
            self.available_commands = metadata["available_commands"]
            # Bugs/hints description
            self.bug = metadata["bug"]
            self.hints = metadata["hints"]
    
            # number of times we have started or restarted the level
            self.start_number = 0
    
            # compile the level sources if required (files newer than executable)
            # TODO(flo): should not be here. Either the tests will compile, or 
            # arcade must handle it so it can catch errors even when starting up.
            # self.compile_source()
    
            # Tracker
            # Contains internal breakpoints used by level (e.g., pause on 'message' function)
            TrackerHelp.__init__(
                self,
                self.source_level_dir,
                self.exec_name,
            )
    
            # Maps and connection to the inferior's variable
            self.current_map = "main"
            self.maps = {}
            for map_name in metadata["arcade_maps"]:
                self.maps[map_name] = Map()
    
            if 'map_width' in self.metadata and 'map_height' in self.metadata:
                self.maps['main'].set_size(self.metadata['map_width'],self.metadata['map_height'])
    
    
            # Add the player to the map
            player_mode = metadata["player_mode"] or "global_variables"
    
            if "player_y" in metadata:
                player_y = int(metadata["player_y"])
                player_mode = "x_only"
            else:
                player_y = None  # can be None if not in x_only mode
    
            self.player = Player(player_mode, player_y)
            for map in self.maps.values():
                map.add_player(self.player)
    
            if "exit_x" in metadata:
                init_x = int(metadata["exit_x"])
                var_x = None
            else:
                init_x = None
                var_x = "exit_x"
    
            if "exit_y" in metadata:
                init_y = int(metadata["exit_y"])
                var_y = None
            else:
                init_y = None
                var_y = "exit_y"
    
            # Adding the exit to the map
            exit_obj = Object(
                init_x=init_x,
                init_y=init_y,
                var_x=var_x,
                var_y=var_y,
                char_rep="@",
                name="exit",
            )
            lvl.debug(f"Init {exit_obj.name}: {init_x} and {init_y}, watching {var_x} and {var_y}")
            self.maps["main"].add_exit(exit_obj)
            lvl.debug(f"Init {exit_obj.name}: var_x in object {exit_obj.var_x}")
            lvl.debug(f"Init {exit_obj.name}: var_y in object {exit_obj.var_y}")
    
            # raise RuntimeError("Make this thread fail in level ABC!!")
    
            # Create a WOP
            for wop_dict in metadata["wops"].values():
                lvl.debug(f"Adding wop {wop_dict}")
                self.maps["main"].add_object(Wop(wop_dict))
            self.map = self.maps["main"]
    
            # everything that also need to be reset for a restart
            self.set_reset()
    
    
        def set_reset(self):
            """
            Common code that must be called when we start/restart a level
            """
            # Internal infos
            self.last_cmdline = "next"  # TODO: change to None?
            self._internal_function_name = (
                None  # used for internal breakpoints, to know where to return to
            )
            self._internal_line_number = None
    
            self.recording = False
            self.record = []
    
            # checker: initially none, gets created on the fly.
            # kept in the current objects so it can be used in custom level.py
            # files to know whether a check has already been performed (and failed)
            self.checker = None
    
            self.inferior_started = False
            self.is_handling_internal_brwatch = False  # True if we are handling internal brwatch
            if self.magic_bp:  # if a magic breakpoint was placed, remove it
                self.delete_magic_breakpoint()
    
            self.interactive_bp = None
            self.interactive_mode = False
    
    
        @property
        def is_running(self):
            """
            When we are handling an internal breakpoint, we consider the level is
            still running as from the user perspective, this should be the case
            (internal breaks and watches should be transparents).
            """
            r = self.tracker and (self.tracker.is_running() or self.is_handling_internal_brwatch)
            return r
    
        @property
        def is_validating(self):
            return self.checker and self.checker.tracker and self.checker.tracker.is_running()
    
    
        def check_map_change(self, memory):
            """
            Map change in the inferior depends on the engine version
            """
            # TODO(flo): do this for all subsequent engines
            match self.metadata['engine_name']:
                case None | "none":
                    return
                case "simple_map" | "level_end" | "input_command" | "levers" | "config_file" | "read_map_str" | "command_args":
                    lvl.info("Map change check not implemented for {self.metadata['engine_name']}")
                    return
                case "map_stack":
                    pass
                case _ :
                    lvl.error(f"TODO: think of map changes for engines > map_stack ({self.metadata['engine_name']})")
                    raise NotImplementedError
    
            # Here, get the game instance and check current map
            game = self.tracker.get_variable_value_as_str("game")
    
    
    
        def update_state(self) -> None:
            """Run events for updated variables"""
            if (not self.inferior_started 
            or self.tracker._pause_reason.type == PauseReasonType.SIGNAL and 'SIGINT' in self.tracker._pause_reason.args):
                return
            try:
                memory = self.tracker.get_program_memory(as_raw_python_objects=True)
                if memory is None:
                    lvl.error(f"Cannot update state of level, memory is None")
                    return
            except ValueError as e:
                lvl.warning(f"Cannot update state of level: {e}")
                return
    
            # verify if there was a map change since last time
            self.check_map_change(memory)
    
            # lvl.debug(f"Globals variables: {globs}")
            # lvl.debug(f"Frames: {frames}")
            # update all objects on the current map
            self.maps[self.current_map].update_objects(memory)
    
    
        def update_all(self):
            lvl.debug("Running update_all for level")
    
            # Fetch and update variables if required
            self.update_state()
    
            # Update the code view
            self.show_next_line()
    
        def compile_source(self, force=False, target='all', capture_output=False):
            lvl.info(f"Compiling program in {self.source_level_dir}")
            cmd = ["make", "-C", self.source_level_dir, target]
            if lvl.getEffectiveLevel() > logging.DEBUG:
                cmd.append("--quiet")
            if force:
                cmd.append("--always-make")
            result = subprocess.run(
                cmd,
                capture_output=capture_output
            )
            lvl.debug(f"Compilation results: {result}")
            return result
    
        def unlock_next_levels(self):
            lvl.info(f"Success on level {self.level_name}, unlocking levels...")
            # mark the level as success
            progress.set_done(self.level_name)
    
        def pre_validation(self):
            """
            Redefine for a particular level
            """
            pass
    
        def post_validation(self):
            """
            Redefine for a particular level
            """
            pass
    
        def validate_level(self, timeout: float|None = None) -> (bool, str | None):
    
            """
            Do some general and specific checkings for current level.
            """
            lvl.info(f"Validating level")
    
            # Freezing temporarily current tracker to avoid using it by error during validation
            tracker_freeze = self.tracker
            assert tracker_freeze
            self.tracker = None
            valid = False
            reasons = None
    
            if timeout:
                self.checker = LevelChecker(self, timeout=timeout)
            else:
                self.checker = LevelChecker(self)
            if self.checker.result():
                lvl.info("Level validated")
                valid = True
            else:
                valid = False
                lvl.info("Level failed validation for the following reasons:")
                reasons = self.checker.get_reasons()
                lvl.info(reasons)
                lvl.debug(f"Check statistics: {self.checker.get_stats()}")
    
            # Restoring tracker, if we want to start again
            self.tracker = tracker_freeze
            assert self.tracker
            return valid, reasons
    
        def pre_first_start(self):
            """
            Called just before the first 'start' is sent to gdb.
            """
            pass
    
        def pre_restart(self):
            """
            Called just before the subsequent 'start' are sent to gdb.
            Synonym: 'reset'
            """
            self.player.reset()
            pass
    
        def post_first_start(self):
            """
            Called just after the first 'start' is sent to gdb.
            """
            pass
    
        def post_restart(self):
            """
            Called just after the subsequent 'start' are sent to gdb.
            """
            pass
    
    
    
        def do_start(self):
            """
            Start the level, by starting the inferior.
            Can be called either at first start or at a restart.
            (Note: cannot be named 'start' at it interferes with the Thread class)
    
            There are two cases:
                - initial start: need to initialize everything
                - restart with tracker alive
            """
    
            lvl.debug("Performing level 'set/reset' then start")
    
    
            if self.start_number == 0:
                self.pre_first_start()
                lvl.debug("Performing first start")
                self.load_program()
            else:
                self.set_reset() # not necessary for first start as was already called by init
                self.pre_restart()
                lvl.debug("Performing a restart")
                self.reload_program()
    
    
            self.tracker.send_direct_command("start")
            self.inferior_started = True
            self.start_number += 1
    
            # update the local variables, line in code viewer, then notify
            self.update_all()
    
            if self.start_number == 1:
                self.post_first_start()
            else:
                self.post_restart()
    
    
        def restart(self):
            """
            Called to trigger a level restart.
            """
            assert False
            # Not the correct way to do anymore
            # self.send_console_command("start", prompt=True)  # make gdb start/restart the program
    
        def show_next_line(self):
            source_file = self.tracker.next_source_file
            line_number = self.tracker.next_lineno
            self.code_window.show_next_line(
                source_file, self.source_level_dir, line_number
            )
    
    
        def handle_breakpoint(self, cont=False) -> bool:
            """
            Handle stopping on a breakpoint during a gdb command.
    
            :param cont is set to True if the breakpoint happened during a 'continue'-like
            command:
               * continue  => just return and let _gdb_continue method loop work
               * step      => can happen with e.g. 'step 50': let _gdb_step method loop do the work
    
            :return value is True if we must really stop there (e.g. we are on a user breakpoint)
    
            """
            pause_reason = self.tracker.pause_reason
    
            # Check if the breakpoint is user defined
            br_no = int(pause_reason.args[0])  # type: ignore
    
            if br_no not in self.internal_bp:
                lvl.debug(f"Breakpoint number {br_no} is not internal")
                self.is_handling_internal_brwatch = False
                return True
    
            self.is_handling_internal_brwatch = True
    
            # This is an internal breakpoint, need to "finish" the current gdb
            # command
            callbacks = self.internal_bp[br_no]
            lvl.debug(f"Handling internal bp {br_no} ({callbacks})")
            # lvl.debug(f"Internal breakpoint, calling callback functions")
            # lvl.debug(f"BREAKPOINTS: {self.internal_bp}")
            # lvl.debug(f"My BP: {self.internal_bp[br_no]}")
            for cb in callbacks:
                cb()
    
            # Now it will depend on the type of the last gdb command used
            if cont:
                # continue will happen inside the _gdb_continue method
                # step also
                self.is_handling_internal_brwatch = False
                return False
    
            # Otherwise, we must go up in frames until we reach a user bp or the
            # function where command was called.
            # With next, it will effectively place the PC on the next line
            lvl.debug(
                f"START going up the stack to find {self._internal_function_name} (currently at {self.tracker.get_current_function_name()})"
            )
    
            while True:
                # we can already by at the correct position, e.g., if the next line
                # had the breakpoint
                if self.tracker.get_current_function_name() == self._internal_function_name:  # type: ignore
                    # breakpoint()
                    lvl.debug(
                        f"Finished going up the stack: found {self._internal_function_name}"
                    )
                    # If on a function that stores the result in a variable, one 
                    # start step is necessary to "finish" the current line
                    if self._internal_line_number == self.tracker.next_lineno:
                        self.tracker.step()
                    self.is_handling_internal_brwatch = False
                    return False  # back to the initial frame
    
                lvl.debug(
                    f"Finishing {self.tracker.get_current_function_name()} to go up the stack to find {self._internal_function_name}"
                )
                pause_reason = self.tracker.send_direct_command("finish")
    
                if pause_reason.type is PauseReasonType.FUNCTION_FINISHED:
                    # that was expected :-) next round!
                    continue
    
                # need to handle other wanted or not stops
                if pause_reason.type is PauseReasonType.EXITED:
                    # somehow the program terminated
                    self.is_handling_internal_brwatch = False
                    return True  # should not try to contine after program exited...
    
                if pause_reason.type is PauseReasonType.BREAKPOINT:
                    # recursively handle the bp:
                    # - if user breakpoint, will return True
                    # - if internal breakpoint, will have gone up the stack back to
                    # original point and returned False
                    return self.handle_breakpoint(cont)
    
                if pause_reason.type is PauseReasonType.WATCHPOINT:
                    raise NotImplementedError
    
                if pause_reason.type is PauseReasonType.CALL:
                    raise NotImplementedError
                    self.handle_track(finish=False)
    
                # TODO: debug just to know the pause reason when fin
                lvl.error(f"Pause reason {pause_reason} not handled")
                raise NotImplementedError
    
    
        def add_short_cmds(self) -> None:
            """Extend the available commands with the gdb aliases."""
            for command in self.available_commands:
                if command == "next":
                    cmds = ["n"]
                elif command == "step":
                    cmds = ["s"]
                elif command == "break":
                    cmds = ["brea", "brea", "bre", "bre", "b"]
                elif command == "run":
                    cmds = ["r"]
                elif command == "continue":
                    cmds = ["c", "fg"]
                elif command == "condition":
                    cmds = ["cond"]
                else:
                    continue
                self.available_commands.extend(cmds)
    
        def _on_quit(self):
            """
            Quit handler. Called when asking gdb to quit or exit in the console.
            """
            lvl.warning("Exiting gdb...")
            self.stop_level()
    
        def _gdb_next(self, count: Optional[int] = None):
            """
            Emulate the next gdb command.
            We have to use a loop and not pass 'count' to gdb, as we need to handle
            internal bp, and there is no way of knowing how many 'next' gdb has executed before
            stopping on a bp.
            """
            count = count or 1
            if count > 1:
                # hide all the commands sent to gdb in the user console
                self.send_print_thread_command('pause')
            for _ in range(count):
                pause_reason = self.tracker.next()
                # self.update_state()
                if (
                    pause_reason.type is PauseReasonType.EXITED
                    or pause_reason.type is PauseReasonType.SIGNAL
                ):
                    self._internal_function_name = None
                    self._internal_line_number = None
                    break
                elif pause_reason.type is PauseReasonType.WATCHPOINT:
                    raise NotImplementedError
                    self.handle_watch()
                elif pause_reason.type is PauseReasonType.CALL:
                    raise NotImplementedError
                    self.handle_track()  # Internal breakpoint
                elif pause_reason.type is PauseReasonType.BREAKPOINT:
                    ret = self.handle_breakpoint()
                    if ret:
                        break
                elif pause_reason.type is PauseReasonType.ENDSTEPPING_RANGE:
                    # normal reason: 'next' is finished
                    continue
                elif pause_reason.type is PauseReasonType.ERROR:
                    lvl.error(f"Error from gdb resulting from _gdb_next")
                    raise ValueError("GDB error")
    
                elif pause_reason.type is PauseReasonType.UNKNOWN:
                    lvl.error(f"Pause reason {pause_reason} badly handled in _gdb_next")
                    raise NotImplementedError
                else:
                    lvl.error(f"Pause reason {pause_reason} not handled")
                    raise NotImplementedError
                    # break  # Breakpoint while handling CALL or WATCHPOINT
    
            if count > 1:
                self.send_print_thread_command('last-and-resume')
    
        def _gdb_continue(self, timeout: float|None = None):
            """Emulate the continue command.
            TODO add ignore_count parameter
            """
            while True:
                lvl.debug(f"Emulating continue: resuming in loop")
                pause_reason = self.tracker.resume()
                if pause_reason.type is PauseReasonType.WATCHPOINT:
                    raise NotImplementedError
                    self.handle_watch()
                elif pause_reason.type is PauseReasonType.CALL:
                    raise NotImplementedError
                    self.handle_track()
                elif pause_reason.type is PauseReasonType.BREAKPOINT:
                    lvl.debug(f"Got a breakpoint, will check if need to really stop")
                    ret = self.handle_breakpoint(cont=True)
                    lvl.debug(f"Back from handle breakpt")
                    if ret:
                        lvl.debug(f"Must stop now")
                        break
                else:
                    lvl.debug(f"Must stop because pause reason is {pause_reason.type}")
                    break
    
        def _gdb_interrupt(self):
            self.interrupt_inferior()
    
        def _gdb_break(
            self,
            loc_str: Optional[str] = None,
            line_no: Optional[int] = None,
        ) -> None:
            """Emulate the break command."""
            if line_no is not None:
                self.tracker.break_before_line(line_no)
            elif loc_str is not None:
                self.tracker.break_before_func(loc_str)
            else:
                current_line = self.tracker.next_lineno
                if current_line is not None:
                    self.tracker.break_before_line(current_line)
    
        def _gdb_step(self, count: Optional[int] = None):
            """Emulate gdb commands."""
            count = count or 1
            if count > 1:
                self.send_print_thread_command('pause')
            for _ in range(count):
                pause_reason = self.tracker.step()
                if pause_reason.type is PauseReasonType.WATCHPOINT:
                    raise NotImplementedError
                    self.handle_watch()
                elif pause_reason.type is PauseReasonType.CALL:
                    raise NotImplementedError
                    self.handle_track()
                elif pause_reason.type is PauseReasonType.BREAKPOINT:
                    ret = self.handle_breakpoint(cont=True)
                    if ret:
                        break
                else:
                    break
            if count > 1:
                self.send_print_thread_command('last-and-resume')
    
        def _gdb_finish(self):
            self.tracker.send_direct_command("finish")
    
        def send_print_thread_command(self, cmd):
            self.print_thread.send_command(cmd)
            # tracker.stdout_queue.put_nowait(('command', cmd))
    
        def send_print_thread_error(self, msg):
            self.print_thread.send_error(msg)
    
        def info(self, msg):
            """
            Just output informations from agdbentures (warning, errors, messages to player)
            """
            print(msg)
    
        # pylint: disable=too-many-branches
        def parse_and_eval(self, cmdline: str) -> None | str:
            """
            Parse a command and call the appropriate function.
    
            :param cmd: The command to run.
            :return: None if there is nothing else to do, or a string if subsequent 
            actions are required
            """
            def not_implemented(cmd: str) -> None:
                """Verbose error if not implemented."""
                self.info(f"Arguments for the '{cmd}' command are not handled yet")
    
            def usage(cmd: str, args: str) -> None:
                """Print usage of command that takes one single integer."""
                self.info(f"Usage: {cmd} {args}")
    
            if not cmdline or cmdline.isspace():
                if self.last_cmdline is None:
                    return  # Nothing to do
                cmdline = self.last_cmdline  # Using the last command
    
            # Saving function name
            self._internal_function_name = self.tracker.get_current_function_name()  # type: ignore
            self._internal_line_number = self.tracker.next_lineno
            # Cleaning up the command
            cmdline = cmdline.strip()
    
            # Saving the command unless the command is to restart
            if cmdline != "start":
                self.last_cmdline = cmdline
            if self.recording:
                self.record.append(["gdb", cmdline])
    
            # Parsing argument and conversion to integer
            cmd, *args = cmdline.split(maxsplit=1)
            arg_str = None
            arg_int = None
            if args and len(args) == 1:
                arg_str = args[0]
                if arg_str.isnumeric():
                    arg_int = int(arg_str)
                    # if cmd not in self.available_commands and
                    # cmd not in {"quit", "q", "exit"}:
            if False:
    
                self.info(f"{cmd} is not a valid command.")
                return
    
            lcmd = len(cmd)
    
            if cmd in ["run"]:
                self.inferior_started = True
    
            if cmd == "start":
                # Do not forward to gdb, notify the level thread that we have to 
                # restart the level
                return 'restart'
    
            elif cmd in ["n", "next"]:
                if arg_str is not None and arg_int is None:
                    usage(cmd, "[count_integer]")
                    return
                self._gdb_next(arg_int)
    
            elif cmd in ["s", "step"]:
                if arg_str is not None and arg_int is None:
                    usage(cmd, "[count_integer]")
                    return
                self._gdb_step(arg_int)
    
            # elif cmd in ["r", "run"]:
            #    if arg_str is not None or arg_int is not None:
            #        not_implemented(cmd)
            #        return
            #    self._gdb_continue()
    
            elif cmd in ["c", "cont", "continue"]:
                self._gdb_continue()
    
            elif cmd in ["b", "break"]:
                self._gdb_break(arg_str, arg_int)
    
            elif lcmd >= 3 and "finish".startswith(cmd):
                self._gdb_finish()
    
            elif cmd == "interrupt":
                self._gdb_interrupt()
    
            elif cmd in ["q", "qui", "quit", "exi", "exit"]:
                lvl.warning("Not allowed to quit gdb directly")
                # self._on_quit()
                return "gdb_exit"
            else:
                lvl.info(f"Command {cmd} not implemented yet.")
                try:
                    self.tracker.send_direct_command(cmdline)
                except RuntimeError:
                    print("Command does not exist!")
            # TODO add other commands (condition, ...)
            # and extend argument of existing commands (run, continue)
    
        def record_file(self):
            return os.path.join(
                self.source_level_dir,
                "record.in",
            )
    
        def print_prompt(self):
            self.send_print_thread_command('prompt')
    
    
        def interrupt_validation(self):
            """
            Used e.g., on an infinite loop during validation.
            TODO: use a timer to automatically send interrupt.
            """
            assert self.checker.tracker
            self.checker.tracker.interrupt()
    
    
        def send_out_queue(self, payload):
            """
            Send message to whoever has started us
            """
            self.out_queue.put_nowait(payload)
    
        ## Putting commands in our own queue
        def send_console_command(self, command: str, cmdtype="gdb", prompt=False):
            lvl.debug(f"Putting gdb command in level_in queue {command}")
            if prompt:
                self.print_prompt()
            self.in_queue.put_nowait({"type": cmdtype, "command": command})
            self.print_thread.show_gdb_command(command + '\n')
    
        ## Putting commands in our own queue
        def send_level_control(self, control_command: str):
            lvl.debug(f"Putting level command in level_in queue {control_command}")
            self.in_queue.put_nowait({"type": "control", "command": control_command})
    
        def start_record(self):
            self.recording = True
            self.send_console_command("start")
    
        def save_record(self):
            if not self.recording:
                raise RuntimeError("No record file")
            self.recording = False
            with open(self.record_file(), "w") as f:
                for source, command in self.record:
                    print(f"{source}:{command}", file=f)
    
            # empty current record
            self.record.clear()
    
        def play_record(self):
            if not os.path.exists(self.record_file()):
                raise RuntimeError("No record file")
    
            self.recording = False
    
            with open(self.record_file()) as f:
                commands = f.readlines()
    
            # self.send_console_command("start")
    
            for command in commands:
                source, text = command.strip("\n").split(":")
                # print("(gdb) ", end="")
                # TODO nice loop to wait for GDB to run before sending to stdin
                if source == "gdb":
                    self.send_console_command(text)
                elif source == "input":
                    self.send_console_command(text)
                else:
                    print(f"Unknown source in record {source}")
    
                # print(self.level.tracker.get_global_variables(True))
    
        def available_wop(self) -> set[tuple[int, int]]:
            """Return a list of WOP coordinates."""
            coordinates = set()
    
            for wop in self.map.wops:
                if wop.visible:
                    coordinates.add((wop.coord_x, wop.coord_y))
    
            return coordinates
    
        def get_wop_message(self) -> dict[str, list[str]]:
            """
            Return a dictionnary of messages, with the WOP
            name as key and its message as value (as a list of lines).
            """
            message_dict = {}
    
            assert False, "Deprecated function?"
            for wop in self.map.wops:
                if wop.visible and wop.talks:
                    wop.triggered = True
                    message_dict[wop.name] = wop.message_list
                    wop.talks = False
    
            return message_dict
    
        def setup(self) -> None:
            """
            Set the level. Needs to be defined in the level-specific .py file
            """
            pass
    
        def __del__(self):
            self.stop_level()
    
        def stop_level(self):
            """
            Terminate the tracker
            TODO: investigate, useless to send None to the GUI. Can we send something else?
            """
            # self.out_queue.put_nowait({'topic': "quit"})
            self.out_queue.put_nowait(None)
            self.terminate_tracker()
    
        def terminate_tracker(self):
            """
            Safe function to call when we want to terminate the tracker.
            Ensures the tracker exists before terminating it, and sets the tracker to None.
            """
            if not self.tracker:
                return
            self.tracker.terminate()
            self.tracker = None
    
    
        @abstractmethod
        def test(self):
            """Test the level."""
    
        @abstractmethod
        def run(self):
            """Run the level."""