Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 48dd351f authored by Kevin Pouget's avatar Kevin Pouget
Browse files

start working

parents
No related branches found
No related tags found
No related merge requests found
import pkgutil, importlib, difflib # standard lib packages
import sys, subprocess, shlex, fcntl, os, time, errno # standard lib packages
import logging; log = logging.getLogger(__name__)
import mcgdb
mcGDB_PATH = mcgdb.__path__[0]
EXPECTED_FILE_PREFIX = "file://"
##############################################3
def all_subpackages(pkg_prefix, verbose, what_to_run, parent, prefix=""):
if not hasattr(parent, "__path__"):
print("ERROR: if this is mcgdb: {}, then you certainly have a *module* named 'mcgdb'. Please change it !".format(parent))
for _, modname, ispkg in pkgutil.walk_packages(parent.__path__, prefix="mcgdb.", onerror=lambda x : None):
if pkg_prefix and not modname.startswith(pkg_prefix):
continue
if ".testing" not in modname and not ispkg:
continue
mod = importlib.import_module(modname)
for what in what_to_run:
if not hasattr(mod, what): continue
run_module(verbose, modname, mod, getattr(mod, what))
def main(pkg_prefix=None, benchmark=True, test=True, verbose=False):
what_to_run = []
if test: what_to_run.append("test")
if benchmark: what_to_run.append("benchmark")
all_subpackages(pkg_prefix, verbose, what_to_run, mcgdb)
def run_benchmark(pkg_prefix=None, verbose=False):
GDB_Instance.CSV_mode = True
all_subpackages(pkg_prefix, verbose, ["benchmark"], mcgdb)
##############################################3
class CompilationError(Exception): pass
class TerminatedError(Exception): pass
class CSource:
def __init__(self, srcname):
self.srcname = srcname
self.path, _, self.binname = srcname.rpartition("/")
self.binname = self.binname.rpartition(".")[0]
if not self.path: self.path = "."
self._package = None
self.target_dir = None
@property
def package(self):
return self._package
@package.setter
def package(self, package):
self._package = package if hasattr(package, "__path__") else \
importlib.import_module(package.__package__)
self.src_dir = "{pkgpath}/{path}".format(pkgpath=self.package.__path__[0],
path=self.path)
self.target_dir = "{}/__binaries__/".format(self.src_dir)
def compile(self):
assert self.target_dir is not None
mkdir_p(self.target_dir)
bin = self.binary.replace(self.src_dir, "")
if bin.startswith("/"): bin = bin[1:]
cmd = "make -C {path} {bin} ".format(path=self.src_dir, bin=bin)
if 0 != os.system(cmd+" >/dev/null"):
raise CompilationError("Could not make {} ({})".format(self.binary, cmd))
@property
def binary(self):
assert self.target_dir is not None
return self.target_dir + self.binname
class FinishException(Exception):
pass
def run_module(verbose, modname, mod, do_that):
def run_once():
try:
gdb_instance = GDB_Instance(verbose, mod)
do_that(gdb_instance)
return True
except TerminatedError as e:
print(e)
except CompilationError as e:
print("ERROR: Could not compile ...")
print("ERROR: {}".format(e))
if not GDB_Instance.CSV_mode:
title = "### "+modname+" ###"
print("#" * len(title))
print(title)
print("#" * len(title))
try: run_once()
except FinishException: pass
else:
print("# "+modname)
GDB_Instance.CSV_header = True
run_once()
print("iteration") # finish the line
GDB_Instance.CSV_header = False
try:
for ite in range(GDB_Instance.CSV_REPEAT):
if not run_once():
break
print(ite) # finish the line
except FinishException:
pass
def expected_file(filename):
return EXPECTED_FILE_PREFIX + filename
class GDB_Output(list):
def __init__(self, command, out, err, correct):
list.__init__(self, [command, out, err, correct])
self.command = command
self.out = out
self.err = err
self.correct = correct
def print_all(self):
if self.err:
print(self.err)
if self.out:
print(self.out)
class GDB_Instance:
PROMPT = "(gdb) "
CSV_mode = False
CSV_header = False
CSV_REPEAT = 10
def __init__(self, verbose, package):
self.verbose = verbose
self._gdb = None
self.package = package
self._fout = None
self._fcmd = None
def start(self, source, no_mcgdb=False):
self.src = source
self.no_mcgdb = no_mcgdb
source.package = self.package
source.compile()
command_line = "gdb -nx"
self._gdb = subprocess.Popen(shlex.split(command_line),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Make stderr and stdout non-blocking.
for outfile in (self._gdb.stdout, self._gdb.stderr):
if outfile is not None:
fd = outfile.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self._fout = open("/tmp/gdb.out", "wb")
self._fcmd = open("/tmp/gdb.cmd", "wb")
out, err = self._read_til_prompt(None)
self.execute("set confirm off")
if not no_mcgdb:
self.execute("python sys.path.append('{}/..')".format(mcGDB_PATH))
self.execute("python import mcgdb; mcgdb.initialize()")
self.execute("file {}".format(source.binary))
def check_results(self, command, expected, results):
if expected.startswith(EXPECTED_FILE_PREFIX):
expected_filename = expected[len(EXPECTED_FILE_PREFIX):]
with open("{}/{}".format(self.src.src_dir, expected_filename)) as expected_file:
expected = "".join(expected_file.readlines())
return expected in results
def execute_many(self, commands, *args, **kwargs):
if not isinstance(commands, list):
commands = [commands]
return [self.execute(a_command, *args, **kwargs) for a_command in commands]
def execute(self, command, expected_results=None, expected_stderr=None, may_fail=False):
if GDB_Instance.CSV_header: return GDB_Output(command, "", "", True)
assert not isinstance(command, list)
out, err = self._read_til_prompt(command)
correct = all([
not expected_results or self.check_results(command, expected_results, out),
not expected_stderr or self.check_results(command, expected_results, err)])
if correct:
if self.verbose: print("PASSED")
elif may_fail:
print("EXPECTED FAILURE ({})".format(command))
correct = None
else:
print("FAILED {}".format(command))
if not correct and not may_fail and self.verbose:
for results, name in (out, "stdout"), (err, "stderr"):
if not results: continue
for line in difflib.context_diff(expected_results.split("\n"), results.split("\n"),
fromfile='Expected results', tofile='Actual results ({})'.format(name)):
print(line)
if not (out or err):
print("Expected results: (got nothing)")
print(expected_results)
#print(levenshtein_distance(expected, results))
return GDB_Output(command, out, err, correct)
def set_title(self, title):
if GDB_Instance.CSV_header:
print('"{}"'.format(title), end=" ")
elif GDB_Instance.CSV_mode:
pass
else:
print(title)
print("-" * len(title))
def save_py_value(self, name):
if GDB_Instance.CSV_header: return
output = self.execute("python print({})".format(name))
if GDB_Instance.CSV_mode:
print(output.out[:-1], end=", ", flush=True)
else:
output.print_all()
def save_value(self, name, format, unit):
if GDB_Instance.CSV_header: return
output = self.execute('printf "{format}{unit}\\n", {name}'.format(
name=name, format=format, unit=unit))
if GDB_Instance.CSV_mode:
print(output.out[:-1], end=", ", flush=True)
else:
output.print_all()
def _read_til_prompt(self, command):
if command is not None:
if not command.endswith("\n"): command = command + "\n"
for to_file in self._gdb.stdin, self._fout, self._fcmd:
if to_file is self._fout:
to_file.write(GDB_Instance.PROMPT.encode('ascii'))
to_file.write(command.encode('ascii'))
to_file.flush()
out_buffer = ''
err_buffer = ''
def read_utf8_buffer(buff):
bs = buff.read()
return bs.decode("utf-8") if bs else ""
while not out_buffer.endswith(GDB_Instance.PROMPT):
try:
self._gdb.poll()
if self._gdb.returncode is not None:
raise TerminatedError(self._gdb.returncode)
out_buffer += read_utf8_buffer(self._gdb.stdout)
err_buffer += read_utf8_buffer(self._gdb.stderr)
except IOError:
time.sleep(0.1)
except KeyboardInterrupt:
import pdb;pdb.set_trace()
print(out_buffer)
out_buffer = out_buffer[:-(len(GDB_Instance.PROMPT))].strip()
self._gdb.stderr.flush()
while True:
err = read_utf8_buffer(self._gdb.stderr)
if not len(err): break
err_buffer += err
if out_buffer:
if not out_buffer[-1] == '\n': out_buffer += '\n'
self._fout.write("--- OUT ---\n".encode("utf-8"))
self._fout.write(out_buffer.encode("utf-8"))
if err_buffer:
if not err_buffer[-1] == '\n': out_buffer += '\n'
self._fout.write("--- ERR ---\n".encode("utf-8"))
self._fout.write(err_buffer.encode("utf-8"))
if out_buffer or err_buffer:
self._fout.write("--- *** ---\n".encode("utf-8"))
self._fout.flush()
return out_buffer, err_buffer
def reset(self, hard=False):
self.execute("delete")
self.execute("kill")
if hard:
self.quit()
self.start(self.src, self.no_mcgdb)
def sync_outputs(self):
print("# Syncing ...")
i = 1
what = "nothing to start with"
while True:
self._gdb.stderr.flush()
buff = ""
while True:
b = self._gdb.stderr.read()
if b is None: break
buff += b.decode("utf-8")
if what in buff: break
what = "error{}".format(i)
res = self.execute(what).err
if what in res: break
time.sleep(0.3)
i += 1
def quit(self):
try:
self.execute("kill")
self.execute("quit")
self._gdb.terminate()
self._gdb.kill()
except TerminatedError as e:
return e.args[0]
finally:
self._fout.close()
self._fcmd.close()
##############################################3
def levenshtein_distance(first, second):
"""Find the Levenshtein distance between two strings."""
if len(first) > len(second):
first, second = second, first
if len(second) == 0:
return len(first)
first_length = len(first) + 1
second_length = len(second) + 1
distance_matrix = [[0] * second_length for x in range(first_length)]
for i in range(first_length):
distance_matrix[i][0] = i
for j in range(second_length):
distance_matrix[0][j]=j
for i in range(1, first_length):
for j in range(1, second_length):
deletion = distance_matrix[i-1][j] + 1
insertion = distance_matrix[i][j-1] + 1
substitution = distance_matrix[i-1][j-1]
if first[i-1] != second[j-1]:
substitution += 1
distance_matrix[i][j] = min(insertion, deletion, substitution)
return distance_matrix[first_length-1][second_length-1]
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST: pass
else: raise
##############################################3
if __name__ == "__main__":
main()
import testing
def run_tests(*args, **kwargs):
testing.main(*args, **kwargs)
def run_benchmark(*args, **kwargs):
testing.run_benchmark(*args, **kwargs)
import pkgutil, importlib, difflib # standard lib packages
import sys, subprocess, shlex, fcntl, os, time, errno # standard lib packages
import logging; log = logging.getLogger(__name__)
import mcgdb
mcGDB_PATH = mcgdb.__path__[0]
EXPECTED_FILE_PREFIX = "file://"
##############################################3
def all_subpackages(pkg_prefix, verbose, what_to_run, parent, prefix=""):
if not hasattr(parent, "__path__"):
print("ERROR: if this is mcgdb: {}, then you certainly have a *module* named 'mcgdb'. Please change it !".format(parent))
for _, modname, ispkg in pkgutil.walk_packages(parent.__path__, prefix="mcgdb.", onerror=lambda x : None):
if pkg_prefix and not modname.startswith(pkg_prefix):
continue
if ".testing" not in modname and not ispkg:
continue
mod = importlib.import_module(modname)
for what in what_to_run:
if not hasattr(mod, what): continue
run_module(verbose, modname, mod, getattr(mod, what))
def main(pkg_prefix=None, benchmark=True, test=True, verbose=False):
what_to_run = []
if test: what_to_run.append("test")
if benchmark: what_to_run.append("benchmark")
all_subpackages(pkg_prefix, verbose, what_to_run, mcgdb)
def run_benchmark(pkg_prefix=None, verbose=False):
GDB_Instance.CSV_mode = True
all_subpackages(pkg_prefix, verbose, ["benchmark"], mcgdb)
##############################################3
class CompilationError(Exception): pass
class TerminatedError(Exception): pass
class CSource:
def __init__(self, srcname):
self.srcname = srcname
self.path, _, self.binname = srcname.rpartition("/")
self.binname = self.binname.rpartition(".")[0]
if not self.path: self.path = "."
self._package = None
self.target_dir = None
@property
def package(self):
return self._package
@package.setter
def package(self, package):
self._package = package if hasattr(package, "__path__") else \
importlib.import_module(package.__package__)
self.src_dir = "{pkgpath}/{path}".format(pkgpath=self.package.__path__[0],
path=self.path)
self.target_dir = "{}/__binaries__/".format(self.src_dir)
def compile(self):
assert self.target_dir is not None
mkdir_p(self.target_dir)
bin = self.binary.replace(self.src_dir, "")
if bin.startswith("/"): bin = bin[1:]
cmd = "make -C {path} {bin} ".format(path=self.src_dir, bin=bin)
if 0 != os.system(cmd+" >/dev/null"):
raise CompilationError("Could not make {} ({})".format(self.binary, cmd))
@property
def binary(self):
assert self.target_dir is not None
return self.target_dir + self.binname
class FinishException(Exception):
pass
def run_module(verbose, modname, mod, do_that):
def run_once():
try:
gdb_instance = GDB_Instance(verbose, mod)
do_that(gdb_instance)
return True
except TerminatedError as e:
print(e)
except CompilationError as e:
print("ERROR: Could not compile ...")
print("ERROR: {}".format(e))
if not GDB_Instance.CSV_mode:
title = "### "+modname+" ###"
print("#" * len(title))
print(title)
print("#" * len(title))
try: run_once()
except FinishException: pass
else:
print("# "+modname)
GDB_Instance.CSV_header = True
run_once()
print("iteration") # finish the line
GDB_Instance.CSV_header = False
try:
for ite in range(GDB_Instance.CSV_REPEAT):
if not run_once():
break
print(ite) # finish the line
except FinishException:
pass
def expected_file(filename):
return EXPECTED_FILE_PREFIX + filename
class GDB_Output(list):
def __init__(self, command, out, err, correct):
list.__init__(self, [command, out, err, correct])
self.command = command
self.out = out
self.err = err
self.correct = correct
def print_all(self):
if self.err:
print(self.err)
if self.out:
print(self.out)
class GDB_Instance:
PROMPT = "(gdb) "
CSV_mode = False
CSV_header = False
CSV_REPEAT = 10
def __init__(self, verbose, package):
self.verbose = verbose
self._gdb = None
self.package = package
self._fout = None
self._fcmd = None
def start(self, source, no_mcgdb=False):
self.src = source
self.no_mcgdb = no_mcgdb
source.package = self.package
source.compile()
command_line = "gdb -nx"
self._gdb = subprocess.Popen(shlex.split(command_line),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Make stderr and stdout non-blocking.
for outfile in (self._gdb.stdout, self._gdb.stderr):
if outfile is not None:
fd = outfile.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self._fout = open("/tmp/gdb.out", "wb")
self._fcmd = open("/tmp/gdb.cmd", "wb")
out, err = self._read_til_prompt(None)
self.execute("set confirm off")
if not no_mcgdb:
self.execute("python sys.path.append('{}/..')".format(mcGDB_PATH))
self.execute("python import mcgdb; mcgdb.initialize()")
self.execute("file {}".format(source.binary))
def check_results(self, command, expected, results):
if expected.startswith(EXPECTED_FILE_PREFIX):
expected_filename = expected[len(EXPECTED_FILE_PREFIX):]
with open("{}/{}".format(self.src.src_dir, expected_filename)) as expected_file:
expected = "".join(expected_file.readlines())
return expected in results
def execute_many(self, commands, *args, **kwargs):
if not isinstance(commands, list):
commands = [commands]
return [self.execute(a_command, *args, **kwargs) for a_command in commands]
def execute(self, command, expected_results=None, expected_stderr=None, may_fail=False):
if GDB_Instance.CSV_header: return GDB_Output(command, "", "", True)
assert not isinstance(command, list)
out, err = self._read_til_prompt(command)
correct = all([
not expected_results or self.check_results(command, expected_results, out),
not expected_stderr or self.check_results(command, expected_results, err)])
if correct:
if self.verbose: print("PASSED")
elif may_fail:
print("EXPECTED FAILURE ({})".format(command))
correct = None
else:
print("FAILED {}".format(command))
if not correct and not may_fail and self.verbose:
for results, name in (out, "stdout"), (err, "stderr"):
if not results: continue
for line in difflib.context_diff(expected_results.split("\n"), results.split("\n"),
fromfile='Expected results', tofile='Actual results ({})'.format(name)):
print(line)
if not (out or err):
print("Expected results: (got nothing)")
print(expected_results)
#print(levenshtein_distance(expected, results))
return GDB_Output(command, out, err, correct)
def set_title(self, title):
if GDB_Instance.CSV_header:
print('"{}"'.format(title), end=" ")
elif GDB_Instance.CSV_mode:
pass
else:
print(title)
print("-" * len(title))
def save_py_value(self, name):
if GDB_Instance.CSV_header: return
output = self.execute("python print({})".format(name))
if GDB_Instance.CSV_mode:
print(output.out[:-1], end=", ", flush=True)
else:
output.print_all()
def save_value(self, name, format, unit):
if GDB_Instance.CSV_header: return
output = self.execute('printf "{format}{unit}\\n", {name}'.format(
name=name, format=format, unit=unit))
if GDB_Instance.CSV_mode:
print(output.out[:-1], end=", ", flush=True)
else:
output.print_all()
def _read_til_prompt(self, command):
if command is not None:
if not command.endswith("\n"): command = command + "\n"
for to_file in self._gdb.stdin, self._fout, self._fcmd:
if to_file is self._fout:
to_file.write(GDB_Instance.PROMPT.encode('ascii'))
to_file.write(command.encode('ascii'))
to_file.flush()
out_buffer = ''
err_buffer = ''
def read_utf8_buffer(buff):
bs = buff.read()
return bs.decode("utf-8") if bs else ""
while not out_buffer.endswith(GDB_Instance.PROMPT):
try:
self._gdb.poll()
if self._gdb.returncode is not None:
raise TerminatedError(self._gdb.returncode)
out_buffer += read_utf8_buffer(self._gdb.stdout)
err_buffer += read_utf8_buffer(self._gdb.stderr)
except IOError:
time.sleep(0.1)
except KeyboardInterrupt:
import pdb;pdb.set_trace()
print(out_buffer)
out_buffer = out_buffer[:-(len(GDB_Instance.PROMPT))].strip()
self._gdb.stderr.flush()
while True:
err = read_utf8_buffer(self._gdb.stderr)
if not len(err): break
err_buffer += err
if out_buffer:
if not out_buffer[-1] == '\n': out_buffer += '\n'
self._fout.write("--- OUT ---\n".encode("utf-8"))
self._fout.write(out_buffer.encode("utf-8"))
if err_buffer:
if not err_buffer[-1] == '\n': out_buffer += '\n'
self._fout.write("--- ERR ---\n".encode("utf-8"))
self._fout.write(err_buffer.encode("utf-8"))
if out_buffer or err_buffer:
self._fout.write("--- *** ---\n".encode("utf-8"))
self._fout.flush()
return out_buffer, err_buffer
def reset(self, hard=False):
self.execute("delete")
self.execute("kill")
if hard:
self.quit()
self.start(self.src, self.no_mcgdb)
def sync_outputs(self):
print("# Syncing ...")
i = 1
what = "nothing to start with"
while True:
self._gdb.stderr.flush()
buff = ""
while True:
b = self._gdb.stderr.read()
if b is None: break
buff += b.decode("utf-8")
if what in buff: break
what = "error{}".format(i)
res = self.execute(what).err
if what in res: break
time.sleep(0.3)
i += 1
def quit(self):
try:
self.execute("kill")
self.execute("quit")
self._gdb.terminate()
self._gdb.kill()
except TerminatedError as e:
return e.args[0]
finally:
self._fout.close()
self._fcmd.close()
##############################################3
def levenshtein_distance(first, second):
"""Find the Levenshtein distance between two strings."""
if len(first) > len(second):
first, second = second, first
if len(second) == 0:
return len(first)
first_length = len(first) + 1
second_length = len(second) + 1
distance_matrix = [[0] * second_length for x in range(first_length)]
for i in range(first_length):
distance_matrix[i][0] = i
for j in range(second_length):
distance_matrix[0][j]=j
for i in range(1, first_length):
for j in range(1, second_length):
deletion = distance_matrix[i-1][j] + 1
insertion = distance_matrix[i][j-1] + 1
substitution = distance_matrix[i-1][j-1]
if first[i-1] != second[j-1]:
substitution += 1
distance_matrix[i][j] = min(insertion, deletion, substitution)
return distance_matrix[first_length-1][second_length-1]
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST: pass
else: raise
##############################################3
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment