Mentions légales du service

Skip to content
Snippets Groups Projects
Commit aace3476 authored by SZCZEPANSKI Marin's avatar SZCZEPANSKI Marin
Browse files

Merge branch 'main' of gitlab.inria.fr:CORSE/agdbentures into main

parents 71c3741f 62708ccf
No related branches found
No related tags found
No related merge requests found
......@@ -11,3 +11,8 @@ To get it:
Then checkout the last version using:
> git submodule update
## Generating a level
% python make_level.py levels/local_struct
Subproject commit 6f9b2a2cfb1911beebb2f9015fb41d86e50b0810
Subproject commit 49da81f86e7e7f45ad11e8af7e03c8dbbfabb235
......@@ -18,6 +18,8 @@
* track: load_map:mstack load_map:player_x load_map:player_y
*
* BUG: several bugs. One is a string comparison forgetting the \0, one is a template used instead of malloc, one is a stack alloc, the last one is an inversion of y and x
*
* engine: fullgame
* tag: inverse_xy stack_alloc template string_comparison
*
* HINT1: What is the value of current_map()->entities for switches?
......
......@@ -18,33 +18,40 @@ UP, DOWN, LEFT, RIGHT = 0, 1, 2, 3
DIR_NAME = ["up", "down", "left", "right"]
DELTA = [
# dx dy
# dx dy
(0, 1), # UP
(0, -1), # DOWN
(-1, 0), # LEFT
(0, -1), # DOWN
(-1, 0), # LEFT
(1, 0), # RIGHT
]
def translate_direction(dir):
if dir == 0: # Game up
if dir == 0: # Game up
game_dir = 0
elif dir == 1: # Game down
elif dir == 1: # Game down
game_dir = 2
elif dir == 2: # Game left
elif dir == 2: # Game left
game_dir = 3
else: # Game right
else: # Game right
game_dir = 1
return game_dir
class Player(arcade.Sprite):
"""Player Class."""
def __init__(
self,
sprites_dir, tile_size, map_height,
position_x, position_y, direction,
sprites_dir,
tile_size,
map_height,
position_x,
position_y,
direction,
scale=1,
movement_speed=4,
update_per_frame=5
update_per_frame=5,
):
"""
Player initialization.
......@@ -62,13 +69,15 @@ class Player(arcade.Sprite):
# Arcade's coordonate is centered at the bottom left corner.
# Hence, we need to invert the y axis.
center_x = (position_x + 1/2) * tile_size
center_y = (map_height - position_y - 1/2) * tile_size
center_x = (position_x + 1 / 2) * tile_size
center_y = (map_height - position_y - 1 / 2) * tile_size
# Initialization of the parent class.
super().__init__(
f"{sprites_dir}/player_idle_{DIR_NAME[direction]}.png",
scale, center_x=center_x, center_y=center_y
scale,
center_x=center_x,
center_y=center_y,
)
# Player info
......@@ -89,7 +98,8 @@ class Player(arcade.Sprite):
self.idle_texture = []
for i in range(4):
texture = arcade.load_texture(
f"{self.sprites_dir}/player_idle_{DIR_NAME[i]}.png")
f"{self.sprites_dir}/player_idle_{DIR_NAME[i]}.png"
)
self.idle_texture.append(texture)
# Moving textures
......@@ -98,7 +108,9 @@ class Player(arcade.Sprite):
for frame in range(3):
self.walk_texture[dir].append(
arcade.load_texture(
f"{self.sprites_dir}/player_walk_{DIR_NAME[dir]}_{frame}.png"))
f"{self.sprites_dir}/player_walk_{DIR_NAME[dir]}_{frame}.png"
)
)
def update(self):
"""Move the player."""
......@@ -121,16 +133,23 @@ class Player(arcade.Sprite):
# Slow down if we are reaching the target
# Corner case test (if the distance delta per frame do not
# allow us to reach the center of the target tile directly)
if (self.change_x > 0 and self.center_x > self.target_x or
self.change_x < 0 and self.center_x < self.target_x):
if (
self.change_x > 0
and self.center_x > self.target_x
or self.change_x < 0
and self.center_x < self.target_x
):
self.center_x = self.target_x
if (self.change_y > 0 and self.center_y > self.target_y or
self.change_y < 0 and self.center_y < self.target_y):
if (
self.change_y > 0
and self.center_y > self.target_y
or self.change_y < 0
and self.center_y < self.target_y
):
self.center_y = self.target_y
# End of motion
if (self.center_x == self.target_x and
self.center_y == self.target_y):
if self.center_x == self.target_x and self.center_y == self.target_y:
self.moving = False
# Update the current tile
......@@ -144,8 +163,8 @@ class Player(arcade.Sprite):
self.change_x = dx * self.movement_speed
self.change_y = dy * self.movement_speed
self.target_x = self.center_x + dx*self.tile_size
self.target_y = self.center_y + dy*self.tile_size
self.target_x = self.center_x + dx * self.tile_size
self.target_y = self.center_y + dy * self.tile_size
self.moving = True
......@@ -156,8 +175,8 @@ class Player(arcade.Sprite):
dx, dy = DELTA[self.direction]
self.points = [
self.center_x + dx*self.tile_size,
self.center_y + dy*self.tile_size
self.center_x + dx * self.tile_size,
self.center_y + dy * self.tile_size,
]
def right(self):
......@@ -167,8 +186,8 @@ class Player(arcade.Sprite):
dx, dy = DELTA[self.direction]
self.points = [
self.center_x + dx*self.tile_size,
self.center_y + dy*self.tile_size
self.center_x + dx * self.tile_size,
self.center_y + dy * self.tile_size,
]
def goto(self, coord_x, coord_y):
......@@ -179,7 +198,7 @@ class Player(arcade.Sprite):
self.direction = LEFT if delta_x < 0 else RIGHT
dx, _ = DELTA[self.direction]
dist_x = abs(delta_x)
self.target_x = self.center_x + dx*dist_x*self.tile_size
self.target_x = self.center_x + dx * dist_x * self.tile_size
self.change_x = dx * self.movement_speed
# Moving on the y axis
......@@ -188,14 +207,15 @@ class Player(arcade.Sprite):
self.direction = UP if delta_y < 0 else DOWN
_, dy = DELTA[self.direction]
dist_y = abs(delta_y)
self.target_y = self.center_y + dy*dist_y*self.tile_size
self.target_y = self.center_y + dy * dist_y * self.tile_size
self.change_y = dy * self.movement_speed
self. moving= True
self.moving = True
class ArcadeUI(arcade.Window, threading.Thread):
"""Arcade GUI of the map."""
def __init__(self, map, scaling, sprites_dir):
"""
Initialize the interface.
......@@ -209,18 +229,13 @@ class ArcadeUI(arcade.Window, threading.Thread):
# Window size
self.tile_size = self.tile_map.tile_height * self.tile_map.scaling
width = int(min(MAX_WIN_WIDTH,
self.tile_map.width*self.tile_size))
height = int(min(MAX_WIN_HEIGHT,
self.tile_map.height*self.tile_size))
width = int(min(MAX_WIN_WIDTH, self.tile_map.width * self.tile_size))
height = int(min(MAX_WIN_HEIGHT, self.tile_map.height * self.tile_size))
# Parent class initialization
arcade.Window.__init__(
self,
width, height,
title="agdbentures",
resizable=True
) # type: ignore
self, width, height, title="agdbentures", resizable=True
) # type: ignore
threading.Thread.__init__(self, daemon=True)
# Command queue used for inter-thread comminucation.
self.queue = queue.SimpleQueue()
......@@ -229,10 +244,9 @@ class ArcadeUI(arcade.Window, threading.Thread):
self.tile_scaling = 4
self.scene = self.tile_map.sprite_lists["scene"]
self.walls = self.tile_map.sprite_lists["walls"]
arcade.set_background_color(
self.tile_map.background_color) # type: ignore
player_x = self.tile_map.properties['player_x'] # type: ignore
player_y = self.tile_map.properties['player_y'] # type: ignore
arcade.set_background_color(self.tile_map.background_color) # type: ignore
player_x = self.tile_map.properties["player_x"] # type: ignore
player_y = self.tile_map.properties["player_y"] # type: ignore
self.player = Player(
sprites_dir,
......@@ -240,7 +254,7 @@ class ArcadeUI(arcade.Window, threading.Thread):
self.tile_map.height,
position_x=player_x,
position_y=player_y,
direction=RIGHT
direction=RIGHT,
)
# Cameras
......@@ -307,10 +321,8 @@ class ArcadeUI(arcade.Window, threading.Thread):
def center_camera_to_player(self):
# Find where player is, then calculate lower left corner from that
screen_center_x = (self.player.center_x
- self.camera_sprites.viewport_width/2)
screen_center_y = (self.player.center_y
- self.camera_sprites.viewport_height/2)
screen_center_x = self.player.center_x - self.camera_sprites.viewport_width / 2
screen_center_y = self.player.center_y - self.camera_sprites.viewport_height / 2
# Set some limits on how far we scroll
if screen_center_x < 0:
......@@ -320,7 +332,7 @@ class ArcadeUI(arcade.Window, threading.Thread):
# Here's our center, move to it
player_centered = screen_center_x, screen_center_y
self.camera_sprites.move_to(player_centered) # type: ignore
self.camera_sprites.move_to(player_centered) # type: ignore
def on_key_release(self, key, modifiers):
"""Called when the user releases a key."""
......
"""
Events implementation for the python motor.
"""
from collections import defaultdict
from inspect import signature
from typing import Callable, Optional, Any
class EventList:
"""
Event list class.
Stores events in a dictionnary of events.
The value of this dictionnary are callback functions.
"""
def __init__(self, variables):
"""
Event list initialization.
:param variables: Dictionnary of variables.
"""
self.events = defaultdict(set)
self.variables = variables
def connect(self, event: str, function: Callable) -> None:
"""
Connect a function to an event.
:param event: The name of the event to listen.
:param function: The function to call on update.
If the function arity is 2, then on update,
it will be called with the variable name and its
new value. If the function arity is 1, it will be
called with the updated value.
"""
self.events[event].add(function)
def disconnect(
self, function: Callable, event: Optional[str] = None
) -> None:
"""
Disconnect a function from an event.
:param function: The function to disconnect.
:param event: The event to work on. If not given, disconnects the
function from all the events.
"""
if event is not None:
self.events[event].remove(function)
else:
for function_set in self.events.values():
function_set.discard(function)
def on_value_change(self, variable_list: list[str]) -> None:
"""
Trigger the update of list of variables.
:param variable_list: List of variables that have been updated.
"""
for variable in variable_list:
for function in self.events[f"value_change:{variable}"]:
# Checking the function arity
arity = len(signature(function).parameters)
if arity not in [1, 2]:
raise RuntimeError(f"Wrong arity for event callback funtion {function.__name__}")
if arity == 1:
function(self.variables[variable])
else:
function(variable, self.variables[variable])
def trigger(self, event: str, *args: Optional[list[Any]]) -> None:
"""
Trigger an event.
:param event: The event to trigger.
:param args: The arguments to give to the functions connected to the event.
"""
for function in self.events[event]:
function(*args)
This diff is collapsed.
"""Abstract level implementation."""
from abc import ABC, abstractmethod
from typing import Optional, Any
import subprocess
import sys
from easytracker import init_tracker, PauseReasonType
from level.events import EventList
from level.map import Map
from level.objects import Player, Object, Wop
from level.variables import value, list_variables, expand_expr
# pylint: disable=too-many-instance-attributes fixme
class AbstractLevel(ABC):
"""
Abstract level class that allows to use different
configurations.
"""
def __init__(self, metadata_dict: dict):
"""
Initialize the level
:param metadata_dict: the dictionnary that describes the level
:param cursed: boolean that tells if a curses UI should be used
"""
self.level_number = metadata_dict["level_number"]
self.level_name = metadata_dict["level_name"]
self.program_name = metadata_dict["program_name"]
# Map
# TODO add tiles dicts and fallback tile to metadata_dict
# FIXME TODO connect the map to the value change of map_array
# self.get_variable("map_array")
self.map: Map = Map(metadata_dict["map_width"], metadata_dict["map_height"])
# Available commands
self.available_commands = metadata_dict["available_commands"]
self.internal_commands = ["quit", "q", "exit"]
# Variable names
self.alias = metadata_dict["alias"]
# Bugs description
self.bug = metadata_dict["bug"]
# Hints
self.hints = metadata_dict["hints"]
# Wowms
self.wowms = metadata_dict["wowms"]
# Variables to watch
self.watch = metadata_dict["watch"]
# Functions to track
self.track = metadata_dict["track"]
# Internal infos
self.last_cmd = None
self.prompt = "command: "
self.server_off_warned = False
self.add_short_cmds()
self.variables: dict[str, Any] = {}
self.function_name = "main"
# Tracker
self.tracker = init_tracker("GDB")
# Event list
self.event_list = EventList(self.variables)
# Internal watchpoints
self.internal_wp = set()
# Add the player to the map
player = Player(
self.variables,
expand_expr("player_x", self.alias),
expand_expr("player_y", self.alias),
expand_expr("player_direction", self.alias),
)
self.map.add_player(player)
# Connect the player to event change
for expr in [player.var_x, player.var_x, player.var_dir]:
# Getting the variable if it is a struct
var, *_ = expr.split(".", 1)
self.event_list.connect(f"value_change:{var}", player.update)
# Adding the exit to the map
exit_obj = Object(
self.variables,
expand_expr("exit_x", self.alias),
expand_expr("exit_y", self.alias),
char_rep="@",
)
self.map.add_exit(exit_obj)
# TODO refactor this (code duplication)
for expr in [exit_obj.var_x, exit_obj.var_y]:
var, *_ = expr.split(".", 1)
self.event_list.connect(f"value_change:{var}", exit_obj.update)
# Create a WOP
for wop_name, wop_dict in metadata_dict["wowms"].items():
wop = Wop(self.variables, wop_name, wop_dict)
self.map.add_wop(wop)
# Watch necessary variables
variables_to_watch = (
list_variables(wop_dict["message_conditions"], self.alias)
| list_variables(wop_dict["visibility_conditions"], self.alias)
)
for variable in variables_to_watch:
self.event_list.connect(f"value_change:{variable}", wop.update)
def update_state(self) -> None:
"""Updates the copy of the inferiors' variables."""
# Update variables
updated_vars = self.update_global_variables()
pause_type = self.tracker.pause_reason.type
if pause_type is PauseReasonType.WATCHPOINT:
updated_vars += self.handle_watch()
elif pause_type is PauseReasonType.BREAKPOINT:
updated_vars += self.handle_bp()
# Trigger value updates
self.event_list.on_value_change(updated_vars)
def run_tracker(self) -> None:
"""Start the tracker and loads the binary program."""
self.tracker.load_program(self.program_name)
self.tracker.start()
# Saving infos
self.function_name = self.tracker.get_current_function_name()
# Set the watchpoints
for var in self.watch:
wp_no = self.tracker.watch(var)
self.internal_wp.add(wp_no)
# Track functions
for func in self.track:
self.tracker.track_function(func)
def handle_watch(self) -> list[str]:
"""
Updates the variables value change when a watchpoint is
triggered.
Makes sure that we go back to the initial frame.
:param init_pause_reason: The first WATCHPOINT pause reason.
:return: List of variable that have changed.
"""
def update_vars(args: list[str]) -> Optional[str]:
"""
Update the variables by reading the arguments of a watchpoint.
:param arg: List argument list of the watchpoint's pausereason.
:return: The name of the variable that has changed.
"""
# Nothing has changed
if len(args) != 4:
return None
var = args[1]
content = self.tracker.get_variable_value(var, as_raw_python_objects=True)
# Storing it
self.variables[var] = content.value # type: ignore
return var
updated_vars = []
pause_reason = self.tracker.pause_reason
# Check if the breakpoint is user defined
bp_no = int(pause_reason.args[0]) # type: ignore
user_defined = bp_no in self.internal_wp
# Going up in frames
if not user_defined:
pause_reason = self.tracker.pause_reason
# Continue til the correct breakpoint
while True:
if pause_reason.type is PauseReasonType.WATCHPOINT:
var = update_vars(pause_reason.args) # type: ignore
if var:
updated_vars.append(var)
elif pause_reason.type is PauseReasonType.BREAKPOINT:
updated = self.handle_bp(finish=False)
if updated:
updated_vars += updated
else:
break # Reached an user defined breakpoint
# Break in the same function
if self.tracker.get_current_function_name() == self.function_name:
return updated_vars
pause_reason = self.tracker.send_direct_command("continue")
return updated_vars # Variables that have changed
def handle_bp(self, finish: bool = True) -> list[str]:
"""
Test if a breakpoint is user defined or not.
If not, read the values of the variables of the tracked
function.
:param finish: Tells if a finish command should be used.
:return: List of updated variables
"""
# FIXME -> waiting for easytracker implementation of bpno
# Is it user defined?
user_defined = False
if user_defined:
return []
updated_vars = []
current_fun = self.tracker.get_current_function_name()
for var in self.track[current_fun]:
content = self.tracker.get_variable_value(var, as_raw_python_objects=True)
# Storing it
self.variables[var] = content.value # type: ignore
updated_vars.append(var)
# Stepping out of the function
if finish and current_fun != "main":
self.tracker.send_direct_command("finish")
return updated_vars # Variables that have changed
def show_next_line(self) -> None:
"""Highlight the next line in the remote vim server."""
with subprocess.Popen(
"vim --servername tuto --remote-expr "
f"'HighlightLine({self.tracker.next_lineno})'",
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, # silent if no server found
) as process:
process.wait()
def add_short_cmds(self) -> None:
"""Extend the available commands with the gdb aliases."""
for command in self.available_commands:
if command == "next":
cmds = ["n"]
elif command == "step":
cmds = ["s"]
elif command == "break":
cmds = ["brea", "brea", "bre", "bre", "b"]
elif command == "run":
cmds = ["r"]
elif command == "continue":
cmds = ["c", "fg"]
elif command == "condition":
cmds = ["cond"]
else:
continue
self.available_commands.extend(cmds)
# pylint: disable=too-many-branches
def parse_and_eval(self, cmd: str) -> None:
"""
Parse a command and call the appropriate function.
:param cmd: The command to run.
"""
def not_implemented(cmd: str) -> None:
"""Verbose error if not implemented."""
print(f"Arguments for the '{cmd}' command are not handled yet")
def usage(cmd: str, args: str) -> None:
"""Print usage of command that takes one single integer."""
print(f"Usage: {cmd} {args}")
if not cmd or cmd.isspace():
if self.last_cmd is None:
return # Nothing to do
cmd = self.last_cmd # Using the last command
# Saving the command
self.last_cmd = cmd
# Parsing argument and conversion to integer
cmd, *args = cmd.split(maxsplit=1)
if len(args) == 0:
arg = None
arg_int = 0 if cmd in ["q", "quit", "exit"] else 1
elif len(args) == 1:
arg = args[0]
arg_int = int(arg) if arg.isnumeric else "nan"
else:
arg = " ".join(args)
arg_int = "nan"
# TODO: Do not print to verbose errors. Use a method that can be overrided.
if cmd not in self.available_commands and cmd not in self.internal_commands:
print(f"{cmd} not accessible in the level.")
return
if cmd in ["n", "next"]:
if arg_int == "nan":
usage(cmd, "[count_integer]")
return
# Add a breakpoint so we can recover from watchpoints
self.tracker.send_direct_command(f"tbreak +{arg_int}")
self.tracker.next(arg_int)
elif cmd in ["s", "step"]:
if arg_int == "nan":
usage(cmd, "[count_integer]")
return
self.tracker.step(arg_int)
elif cmd in ["r", "run"]:
if arg:
not_implemented(cmd)
return
self.tracker.run()
elif cmd in ["c", "continue"]:
if arg:
not_implemented(cmd)
return
self.tracker.resume()
elif cmd in ["b", "break"]:
if arg and arg_int != "nan":
self.tracker.break_before_line(arg_int)
elif arg:
self.tracker.break_before_func(arg)
else:
current_line = self.tracker.next_lineno
if current_line is not None:
self.tracker.break_before_line(current_line)
elif cmd in ["q", "quit", "exit"]:
sys.exit()
else:
print(f"Command {cmd} not implemented yet.")
# TODO add other commands (condition, ...)
# and extend argument of existing commands (run, continue)
def player_char(self, direction: int) -> str:
"""Return the oriented character"""
up_value = int(self.alias["up"])
down_value = int(self.alias["down"])
left_value = int(self.alias["left"])
right_value = int(self.alias["right"])
if direction == up_value:
return "^"
if direction == down_value:
return "v"
if direction == left_value:
return "<"
if direction == right_value:
return ">"
# Unknown direction
return "?"
def update_global_variables(self) -> list[str]:
"""
Update the global variables.
:return: The list of the variables that have changed.
"""
globs = self.tracker.get_global_variables(as_raw_python_objects=True)
updated_vars = []
# Reading global variables
for var_name, var in globs.items():
if var_name not in self.variables or var.value != self.variables[var_name]:
self.variables[var_name] = var.value # type: ignore
updated_vars.append(var_name)
return updated_vars
def get_variable(self, var: str) -> Any:
"""
Fetch the value of a variable.
:param var: The variable to fetch.
"""
# Translation into the inferior variable name
target = self.alias[var]
return value(target, self.variables)
def eval_condition(self, condition: str) -> Optional[bool]:
"""
Evaluates a condition while making sure that we use
the inferior variable names for the information like
player_x or player_x.
:param condition: The condition to evaluate.
:return: The boolean value of the condition.
"""
# Expand the expression
resolved_condition = expand_expr(condition, self.alias)
return value(resolved_condition, self.variables)
def available_wowm(self) -> set[tuple[int, int]]:
"""Return a list of WOWM coordinates."""
coordinates = set()
for wop in self.map.wops:
if wop.visible:
coordinates.add((wop.coord_x, wop.coord_y))
return coordinates
def get_wowm_message(self) -> dict[str, list[str]]:
"""
Return a dictionnary of messages, with the WOWM
name as key and its message as value (as a list of lines).
"""
message_dict = {}
for wop in self.map.wops:
if wop.visible and wop.talks:
wop.triggered = True
message_dict[wop.name] = wop.message_list
wop.talks = False
return message_dict
@abstractmethod
def run(self):
"""Run the level."""
"""Arcade level implementation."""
from easytracker import PauseReasonType
from level.arcade_ui import ArcadeUI
from level.utils import create_map
from level.level_abc import AbstractLevel
# pylint: disable=fixme
class LevelArcade(AbstractLevel):
"""Level using arcade."""
def __init__(self, metadata_dict: dict):
# Parent initialization
super().__init__(metadata_dict)
# Creating the UI
# TODO, FIXME: Hardcoded path
map_path = "level/resources/map.tmx"
create_map(metadata_dict, map_path)
self.gui = ArcadeUI(map_path, scaling=4, sprites_dir="level/resources/sprites")
# TODO, FIXME: Hardcoded path
# TODO add buttons
@staticmethod
def read_cmd():
"""Read a command from stdin."""
# TODO use readline
try:
cmd = input("agdb: ")
except EOFError:
cmd = "q"
return cmd
def update_map(self):
"""Update the GUI map."""
player_x = self.get_variable("player_x")
player_y = self.get_variable("player_y")
player_direction = self.get_variable("player_direction")
self.gui.queue.put(f"goto {player_x} {player_y}")
self.gui.queue.put(f"set_direction {player_direction}")
def run(self):
self.gui.start()
self.run_tracker()
while self.tracker.pause_reason.type != PauseReasonType.EXITED:
self.show_next_line()
self.update_global_variables()
self.update_map()
# self.print_wowm_msg() # TODO implement this
cmd = self.read_cmd()
self.parse_and_eval(cmd)
# TODO write this in the GUI
if self.tracker.exit_code == 0:
print("VICTORY")
else:
print("DEFEAT")
"""Curses level implementation."""
from typing import Callable
import curses
import curses.textpad
from easytracker import PauseReasonType
from level.level_abc import AbstractLevel
# pylint: disable=fixme
class LevelCurses(AbstractLevel):
"""Level using curses."""
# pylint: disable=too-many-locals
def show_map(self, stdscr: curses.window, pad: curses.window) -> None:
"""Update the cursed map according the variables."""
pad.clear()
pad.box()
# Fetching the map
list_map = self.map.get_ascii_map()
for i, line in enumerate(list_map, 1):
line_str = "".join(line)
pad.addstr(i, 1, line_str)
# Adding the WOWM
for wowm_x, wowm_y in self.available_wowm():
pad.addch(wowm_y + 1, wowm_x + 1, "W")
# Update and refresh
lines, cols = stdscr.getmaxyx()
smincol = int(cols / 2 - self.map.width / 2 - 1)
sminrow = int(lines / 2 - self.map.height / 2 - 1)
smaxcol = smincol + self.map.width + 2
smaxrow = sminrow + self.map.height + 2
stdscr.clear()
stdscr.refresh()
pad.refresh(0, 0, sminrow, smincol, smaxrow, smaxcol)
@staticmethod
def box_msg(
stdscr: curses.window, message_list: list[str], title: str = ""
) -> None:
"""
Show a message in a box.
:param stdscr: The window to display the message in.
:param messages: The message to display, given as a list of lines.
:param title: The title of the box.
"""
# Pad init
max_msg_len = max(map(len, message_list)) + 2
pad_height = len(message_list) + 2
pad_width = max(max_msg_len, len(title) + 2)
pad = curses.newpad(pad_height, pad_width)
pad.box()
# Add the title of the box
if title:
pad.addstr(0, 1, title)
# Add the content
for line, message in enumerate(message_list, start=1):
pad.addstr(line, 1, message)
# Center the pad
lines, cols = stdscr.getmaxyx()
smincol = int(cols / 2 - pad_width / 2)
sminrow = int(lines / 2 - pad_height / 2)
smaxcol = smincol + pad_width + 2
smaxrow = sminrow + pad_height + 2
pad.refresh(0, 0, sminrow, smincol, smaxrow, smaxcol)
# Wait for confirmation
stdscr.getkey()
def read_cmd(
self,
stdscr: curses.window,
input_win: curses.window,
resize_func: Callable[[], None],
) -> str:
"""
Read a command while handling resizes.
:param stdscr: The main window to write in.
:param input_win: The input window.
:param resize_func: The function to call in a resize event.
:return: The raw input.
"""
# Show a prompt
lines, _ = stdscr.getmaxyx()
stdscr.addstr(lines - 1, 0, self.prompt)
stdscr.refresh()
# New Input Box:
cmd_pad = curses.textpad.Textbox(input_win)
# Show the cursor
curses.curs_set(True)
def on_resize() -> None:
"""Update the windows on resize."""
# Update the map
resize_func()
# Update the input window's coordinates
lines, cols = stdscr.getmaxyx()
input_win.resize(1, cols - len(self.prompt))
input_win.mvwin(lines - 1, len(self.prompt))
# Show the prompt
stdscr.addstr(lines - 1, 0, self.prompt)
stdscr.refresh()
def validator(char: int) -> int:
"""
Textpad validator.
By default, the `edit` method of the Textbox instance
terminates the input when Control-G is received.
We want it to end with the enter key.
"""
if char == curses.KEY_RESIZE:
on_resize()
elif char == ord("\n"):
char = 7 # End the command (Control-G)
return char
# Read the command
input_win.clear()
cmd = cmd_pad.edit(validator)
# Hide the cursor
curses.curs_set(False)
return cmd
def print_wowm_msg(self, stdscr: curses.window) -> None:
"""
Print the WOWM message.
:param stdscr: The main window.
"""
for wowm_id, wowm_message in self.get_wowm_message().items():
self.box_msg(stdscr, wowm_message, title=f"WOWM{wowm_id}")
def run(self) -> None:
"""Run in a curses interface."""
def runner(stdscr: curses.window):
"""Runner that will be called by the curses wrapper."""
self.run_tracker()
pause_type = self.tracker.pause_reason.type
# Prepare curses
lines, cols = stdscr.getmaxyx()
pad = curses.newpad(self.map.height + 2, self.map.width + 2)
input_win = curses.newwin(
1, cols - len(self.prompt), lines - 1, len(self.prompt)
)
curses.curs_set(False)
stdscr.clear()
while pause_type != PauseReasonType.EXITED:
# Printing the output
# TODO add a dedicated pad for the stdout
stdout = self.tracker.get_inferior_stdout()
if stdout:
self.box_msg(stdscr, stdout.split("\n"), title="stdout")
# Saving the function name
self.function_name = self.tracker.get_current_function_name()
# Update the code visualizator
self.show_next_line()
# Fetch variables
self.update_state()
# Show the map
self.show_map(stdscr, pad)
self.print_wowm_msg(stdscr)
# Read a command and run it
resize_func = lambda: self.show_map(stdscr, pad)
cmd = self.read_cmd(stdscr, input_win, resize_func)
self.parse_and_eval(cmd)
pause_type = self.tracker.pause_reason.type
if self.tracker.exit_code == 0:
self.box_msg(stdscr, ["VICTORY"])
else:
self.box_msg(stdscr, ["DEFEAT"])
# Call the runner
curses.wrapper(runner)
"""Text level implementation."""
from easytracker import PauseReasonType
from level.level_abc import AbstractLevel
class LevelText(AbstractLevel):
"""Level using text output."""
def show_map(self) -> None:
"""Print a ascii map."""
# Fetching the map
ascii_map = self.map.get_ascii_map()
# Adding the WOWM
for wowm_x, wowm_y in self.available_wowm():
ascii_map[wowm_y][wowm_x] = "W"
# Printing the middle lines
for line in ascii_map:
print("".join(line))
def print_wowm_msg(self) -> None:
"""Print the WOWM message."""
for wowm_id, message_list in self.get_wowm_message().items():
wowm_message = "\n".join(message_list)
print(f"WOMW{wowm_id} says:\n{wowm_message}")
def read_cmd(self) -> str:
"""Read a command from keyboard."""
try:
cmd = input(self.prompt)
except EOFError:
cmd = "q"
return cmd
def run(self) -> None:
"""Run in a text interface."""
self.run_tracker()
pause_type = self.tracker.pause_reason.type
while pause_type != PauseReasonType.EXITED:
# Printing the prog's output
stdout = self.tracker.get_inferior_stdout()
if stdout:
print(f"stdout: {stdout}")
# Saving the function name
self.function_name = self.tracker.get_current_function_name()
# Update the code visualizator
self.show_next_line()
# Fetch variables
self.update_state()
# Show the map
self.show_map()
self.print_wowm_msg()
# Read a command and run it
cmd = self.read_cmd()
self.parse_and_eval(cmd)
pause_type = self.tracker.pause_reason.type
if self.tracker.exit_code == 0:
print("VICTORY")
else:
print("DEFEAT")
"""Map implementation."""
from copy import deepcopy
from typing import Optional, Union
from level.objects import Object, Player, Wop
_TILES_DICT = {"WATER": "~", "DOOR": "D", "WALL": " "}
# pylint: disable=too-many-instance-attributes
class Map:
"""Map class."""
def __init__(
self,
width: int,
height: int,
tiles_dict: Optional[dict[str, str]] = None,
fallback_tile: str = " ",
):
"""
Map initialization.
:param width: The map width.
:param height: The map height.
:param tiles_dict: The dictionnary that give the string mapping
of the enums.
:param fallback_tile: Tile to use if the value is not defined.
"""
self.width = width
self.height = height
self.map = [[" "] * self.width for _ in range(self.height)]
if tiles_dict is None:
self.tiles = _TILES_DICT
else:
self.tiles = tiles_dict
self.fallback_tile = fallback_tile
self.unnammed_objects: set[Object] = set()
self.nammed_objects: dict[str, Object] = {}
self.player: Union[Player, None] = None
self.exit: Union[Object, None] = None
self.wops: set[Wop] = set()
def enum_to_char(self, enum: str) -> str:
"""
Convert an enum to a char (tile of the map).
Override this method to have a custom tiles
:param enum: The enum to convert.
"""
# Maybe using defaultdict?
if enum in self.tiles:
return self.tiles[enum]
return self.fallback_tile
def update_floor(self, map_enum: list[str]) -> None:
"""
Update the map from a list of strings.
:param map_enum: List that describes the map.
"""
for line in self.map:
for i, _ in enumerate(line):
line[i] = " "
if not map_enum or map_enum == (None,):
return
assert len(map_enum) == self.width * self.height
# Filling the map
for i in range(self.width):
for j in range(self.height):
self.map[j][i] = self.enum_to_char(map_enum[i + j * self.width])
def get_ascii_map(self) -> list[list[str]]:
"""
Return a list that represents the map.
:return: The ascii map with its objects.
"""
ascii_map = deepcopy(self.map)
# Adding unnammed_objects
for obj in self.unnammed_objects:
if obj.visible:
ascii_map[obj.coord_y][obj.coord_x] = obj.char_rep # type: ignore
# Adding nammed objects
for name, obj in self.nammed_objects.items():
if obj.visible and name not in ["player", "exit"]:
ascii_map[obj.coord_y][obj.coord_x] = obj.char_rep # type: ignore
# Adding the player and the exit
for obj in [self.exit, self.player]:
if obj and obj.visible:
ascii_map[obj.coord_y][obj.coord_x] = obj.char_rep # type: ignore
return ascii_map
def add_object(self, obj: Object, name: Optional[str] = None) -> None:
"""
Add an object to the map.
:param obj: The object to add.
:param name: The obj's name.
"""
if name is None:
self.unnammed_objects.add(obj)
else:
self.nammed_objects[name] = obj
def add_player(self, player: Player) -> None:
"""Add a player to the map."""
self.add_object(player, "player")
self.player = player
def add_exit(self, exit_obj: Object) -> None:
"""Add an exit to the map."""
self.add_object(exit_obj, "exit")
self.exit = exit_obj
def add_wop(self, wop: Wop) -> None:
"""Add a wop to the map."""
self.wops.add(wop)
"""Map objects."""
from typing import Any, Union, Optional
from level.variables import value
# Default direction
_DIRECTION_DICT = {"up": 0, "down": 1, "left": 2, "right": 3}
# pylint: disable=too-many-instance-attributes too-many-arguments
# pylint: disable=too-few-public-methods fixme
class Object:
"""Object class."""
def __init__(
self,
variables_env: dict[str, Any],
var_x: str,
var_y: str,
var_dir: Optional[str] = None,
char_rep: str = "o",
visibility_conditions: str = "True",
direction_dict: Optional[dict[str, Union[int, str]]] = None,
):
"""
Object initialization.
:param variables_env: The environment to use to evaluate the conditions.
:param var_x: The variable that stores its x coordinate.
:param var_y: The variable that stores its y coordinate.
:param var_dir: The variable that stores its direction.
:param char_rep: Its ascii character representation.
:param visibility_conditions: The python condition that determines its
:param direction_dict: Dictionnary that maps string directions to integers.
visibility.
"""
self.env = variables_env
self.var_x = var_x
self.var_y = var_y
self.var_dir = var_dir
self._char_rep = char_rep
self.visibility_conditions = visibility_conditions
self.dir_mapping = _DIRECTION_DICT if direction_dict is None else direction_dict
# dynamic properties
self.coord_x: Union[int, None] = None
self.coord_y: Union[int, None] = None
self.direction: Union[int, None] = None
self.visible: bool = False
# properties initialization
Object.update(self, "", None) # Ugly I know right...
def update(self, var: str, new_value: Union[int, None]) -> None:
"""
Update object property.
:param var: The variable that has been updated.
:param new_value: The updated value.
"""
assert not var or self.env[var] == new_value
if self.var_x.startswith(var):
self.coord_x = value(self.var_x, self.env)
if self.var_y.startswith(var):
self.coord_y = value(self.var_y, self.env)
if self.var_dir is not None and self.var_dir.startswith(var):
self.direction = value(self.var_dir, self.env)
self.visible = (
value(self.visibility_conditions, self.env)
and self.coord_x is not None
and self.coord_x >= 0
and self.coord_y is not None
and self.coord_y >= 0
and (
self.var_dir is None
or self.direction is not None
and self.direction >= 0
)
)
@property
def char_rep(self) -> str:
"""Get the character representation of the player."""
return self._char_rep
class Player(Object):
"""Player class."""
@property
def char_rep(self) -> str:
"""Get the character representation of the player."""
up_value = int(self.dir_mapping["up"])
down_value = int(self.dir_mapping["down"])
left_value = int(self.dir_mapping["left"])
right_value = int(self.dir_mapping["right"])
if self.direction == up_value:
return "^"
if self.direction == down_value:
return "v"
if self.direction == left_value:
return "<"
if self.direction == right_value:
return ">"
return "?" # Unknown direction
def update(self, var: str, new_value: Union[int, None]) -> None:
"""
Update object property.
:param var: The variable that has been updated.
:param new_value: The updated value.
"""
# Call the parent update
super().update(var, new_value)
# Post-update hook that adds the player vars to the env.
self.env["player_x"] = self.coord_x
self.env["player_y"] = self.coord_y
self.env["player_direction"] = self.direction
# TODO rename all occurences of WOWM to WOP
class Wop(Object):
"""Wise old person class."""
def __init__(
self,
variables_env: dict[str, Any],
name: str,
wop: dict,
direction_dict: Optional[dict[str, Union[int, str]]] = None,
):
"""
Wop initialization.
:param variables_env: The environment to use to evaluate the conditions.
:param name: The name/label of the WOP.
:param wop: Dictionnary that describes the wop.
:param direction_dict: Dictionnary that maps string directions to integers.
"""
# Keyword conditions expansion
def expand_condition(cond: str) -> str:
"""Keyword expansion."""
cond = cond.strip()
if cond == "always":
return "True"
if cond == "near":
# TODO variable range, non constant position
return (
f"abs(player_x - {wop['x']}) <= 1"
f" and abs(player_y - {wop['y']}) <= 1"
)
return cond
visibility_conditions = expand_condition(wop["visibility_conditions"])
message_conditions = expand_condition(wop["message_conditions"])
# Object initialization
super().__init__(
variables_env,
str(wop["x"]),
str(wop["y"]),
char_rep="W",
direction_dict=direction_dict,
visibility_conditions=visibility_conditions,
)
# Infos
self.labels = wop["labels"]
self.message_conditions = message_conditions
self.message_list = wop["message"]
self.name = name
self.triggered = False
self.talks = False
def update(self, var: str, new_value: Union[int, None]) -> None:
"""
Update object property.
:param var: The variable that has been updated.
:param new_value: The updated value.
"""
if self.triggered:
return # TODO handle non oneshot wop
# Call parent's update
super().update(var, new_value)
# Post-update hook that updates the message condition
self.talks = self.visible and value(self.message_conditions, self.env)
......@@ -252,7 +252,7 @@ def generate_dict(file):
else:
# splitting by space and appending
[key, value] = info.split(" ", 1)
wowm_dict[wowm_index][key] = value
wowm_dict[wowm_index][key] = value.strip()
comments = strip_comments(file)
metadata = {
......@@ -427,7 +427,12 @@ def create_map(metadata, file_out):
"""Create a layer."""
# Scene layer
scene_layer = ET.SubElement(
root, "layer", id=str(index), name=name, width=str(width), height=str(height)
root,
"layer",
id=str(index),
name=name,
width=str(width),
height=str(height),
)
data = ET.SubElement(scene_layer, "data", encoding="csv")
# Conversion of the data into a csv string
......@@ -452,16 +457,3 @@ def create_map(metadata, file_out):
final_xml = ET.ElementTree(root)
ET.indent(final_xml, space=" ")
final_xml.write(file_out, encoding="UTF-8", xml_declaration=True)
def _test():
"""Local test."""
# json test
file = "test.c"
data = generate_dict(file)
dict_to_json(dict, "test")
# tmx map generation test
create_map(data, "level/resources/map.tmx")
if __name__ == "__main__":
_test()
"""Variables monitoring/expansion tools."""
import ast
from typing import Any, Optional
# pylint: disable=eval-used
def value(var: str, env: dict[str, Any]) -> Any:
"""Gets expression value."""
try:
return eval(var, {}, env)
except (KeyError, AttributeError, TypeError, NameError):
return None
def expand_expr(expr: str, alias: dict[str, str]) -> str:
"""
Substitute aliased variables.
:param expr: The expression to resolve.
:param alias: Dictionnary of aliases.
:return: The resolved expression.
"""
for var, inferior_var in alias.items():
expr = expr.replace(var, inferior_var)
return expr
# pylint: disable=invalid-name
class AstVarFinder(ast.NodeVisitor):
"""Ast walker that finds variables."""
def __init__(self):
"""AstVarFinder initialization."""
self.variables = set()
def visit_Call(self, node):
"""Visits a Call node. Skips the function name"""
for arg in node.args:
self.visit(arg)
for keyword in node.keywords:
self.visit(keyword)
def visit_Name(self, node):
"""Visits a Name node. Adds the ids to a list"""
self.variables.add(node.id)
def find_variables(self, node):
"""Finds all the variables from a given node."""
self.variables = set()
self.visit(node)
# We only need one walker
_walker = AstVarFinder()
def list_variables(expr: str, alias: Optional[dict[str, str]] = None) -> set[str]:
"""
Returns the variables that are used in a given expression.
:param expr: The expression to parse.
:return: The variables that are used in the expression.
"""
if alias is not None:
expr = expand_expr(expr, alias)
tree = ast.parse(expr, mode="eval")
_walker.find_variables(tree)
return _walker.variables
......@@ -9,8 +9,7 @@ from level.level import level
# List that stores and the values of the variable that we watch
VALUES = []
def event_callback(new_value: int) -> None:
def event_callback(var: str, new_value: int) -> None:
"""
Append the new value to the global list
when a value_change event is received.
......
......@@ -43,7 +43,7 @@
"hi, stay still. you are half way to the end!",
":-)"
],
"message_conditions": " player_x == 5 and player_y == 3",
"message_conditions": "player_x == 5 and player_y == 3",
"triggered": false,
"visibility_conditions": "player_x == 5 and player_y == 3",
"visible": false,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment