From 22b0af35b38700fcbc5a5a3945d4ba5e2323c207 Mon Sep 17 00:00:00 2001
From: BIGAUD Nathan <nathan.bigaud@inria.fr>
Date: Fri, 24 Mar 2023 12:12:38 +0100
Subject: [PATCH] Debugging and documenting

---
 declearn/quickrun/__init__.py    |  20 ++
 declearn/quickrun/_run.py        | 148 ---------------
 declearn/quickrun/_split_data.py |  68 ++++---
 declearn/quickrun/run.py         | 310 +++++++++++++++++++++++++++++++
 examples/quickrun/config.toml    |   6 +-
 5 files changed, 380 insertions(+), 172 deletions(-)
 create mode 100644 declearn/quickrun/__init__.py
 delete mode 100644 declearn/quickrun/_run.py
 create mode 100644 declearn/quickrun/run.py

diff --git a/declearn/quickrun/__init__.py b/declearn/quickrun/__init__.py
new file mode 100644
index 00000000..2bcb6b0f
--- /dev/null
+++ b/declearn/quickrun/__init__.py
@@ -0,0 +1,20 @@
+# coding: utf-8
+
+# Copyright 2023 Inria (Institut National de Recherche en Informatique
+# et Automatique)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Script to quickly run examples locally using declearn."""
+
+from .run import quickrun
diff --git a/declearn/quickrun/_run.py b/declearn/quickrun/_run.py
deleted file mode 100644
index 3348c794..00000000
--- a/declearn/quickrun/_run.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# coding: utf-8
-
-# Copyright 2023 Inria (Institut National de Recherche en Informatique
-# et Automatique)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -"""TODO""" -import importlib -from glob import glob - -from declearn.communication import NetworkClientConfig, NetworkServerConfig -from declearn.dataset import InMemoryDataset -from declearn.main import FederatedClient, FederatedServer -from declearn.main.config import FLOptimConfig, FLRunConfig -from declearn.test_utils import make_importable -from declearn.utils import run_as_processes - -DEFAULT_FOLDER = "./examples/quickrun" - - -def _run_server( - model: str, - network: NetworkServerConfig, - optim: FLOptimConfig, - config: FLRunConfig, -) -> None: - """Routine to run a FL server, called by `run_declearn_experiment`.""" - server = FederatedServer(model, network, optim) - server.run(config) - - -def _parse_data_folder(folder: str): - """Utils parsing a data folder following a standard format into a nested" - dictionnary""" - # Get data dir - data_folder = glob("data_*", root_dir=folder) - if len(data_folder) == 0: - raise ValueError( - f"No folder starting with 'data_' found in {folder}" - "Please store your data under a 'data_*' folder" - ) - if len(data_folder) > 1: - raise ValueError( - "More than one folder starting with 'data_' found" - f"in {folder}. Please store your data under a single" - "parent folder" - ) - data_folder = f"{folder}/{data_folder[0]}" - # Get clients dir - clients_folders = glob("client_*", root_dir=data_folder) - if len(clients_folders) == 0: - raise ValueError( - f"No folder starting with 'client_' found in {data_folder}" - "Please store your individual under client data under" - "a 'client_*' folder" - ) - clients = {c: {} for c in clients_folders} - # Get train and valid files - for c in clients.keys(): - path = f"{data_folder}/{c}/" - data_items = [ - "train_data", - "train_target", - "valid_data", - "valid_target", - ] - for d in data_items: - files = glob(f"{d}*", root_dir=path) - if len(files) != 1: - raise ValueError( - f"Could not find unique file named '{d}.*' in {path}" - ) - clients[c][d] = files[0] - - return clients - - -def _run_client( - network: str, - name: str, - paths: dict, -) -> None: - """Routine to run a FL client, called by `run_declearn_experiment`.""" - # Run the declearn FL client routine. - netwk = NetworkClientConfig.from_toml(network) - # Overwrite client name based on folder name - netwk.name = name - # Wrap train and validation data as Dataset objects. - train = InMemoryDataset( - paths.get("train_data"), - target=paths.get("train_target"), - expose_classes=True, - ) - valid = InMemoryDataset( - paths.get("valid_data"), - target=paths.get("valid_target"), - ) - client = FederatedClient(netwk, train, valid) - client.run() - - -def quickrun( - folder: str = None, -) -> None: - """Run a server and its clients using multiprocessing.""" - # default to the 101 example - if not folder: - folder = DEFAULT_FOLDER # TODO check data was run - # Parse toml file to ServerConfig and ClientConfig - toml = f"{folder}/config.toml" - ntk_server = NetworkServerConfig.from_toml(toml, False, "network_server") - optim = FLOptimConfig.from_toml(toml, False, "optim") - run = FLRunConfig.from_toml(toml, False, "run") - ntk_client = NetworkClientConfig.from_toml(toml, False, "network_client") - # get Model - module, name = f"{folder}/model.py", "MyModel" - mod = importlib.import_module(module) - model_cls = getattr(mod, name) - model = model_cls() - # Set up a (func, args) tuple specifying the server process. 
-    p_server = (_run_server, (model, ntk_server, optim, run))
-    # Get datasets and client_names from folder
-    client_dict = _parse_data_folder(folder)
-    # Set up the (func, args) tuples specifying client-wise processes.
-    p_client = []
-    for name, data_dict in client_dict.items():
-        client = (_run_client, (ntk_client, name, data_dict))
-        p_client.append(client)
-    # Run each and every process in parallel.
-    success, outputs = run_as_processes(p_server, *p_client)
-    assert success, "The FL process failed:\n" + "\n".join(
-        str(exc) for exc in outputs if isinstance(exc, RuntimeError)
-    )
-
-
-if __name__ == "__main__":
-    quickrun()
diff --git a/declearn/quickrun/_split_data.py b/declearn/quickrun/_split_data.py
index a45e5c64..f1eff42d 100644
--- a/declearn/quickrun/_split_data.py
+++ b/declearn/quickrun/_split_data.py
@@ -1,5 +1,20 @@
 # coding: utf-8
 
+# Copyright 2023 Inria (Institut National de Recherche en Informatique
+# et Automatique)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 """Script to split data into heterogeneous shards and save them.
 
 Available splitting scheme:
@@ -20,7 +35,6 @@ instance sparse data
 
 import argparse
 import io
-import json
 import os
 import re
 import textwrap
@@ -33,8 +47,6 @@ import requests  # type: ignore
 from declearn.dataset import load_data_array
 
 SOURCE_URL = "https://pjreddie.com/media/files"
-DEFAULT_FOLDER = "./examples/quickrun/data"
-# TODO remove duplicate with _run.py
 
 
 def load_mnist(
@@ -174,7 +186,7 @@ def _split_biased(
 
 
 def split_data(
-    folder: str = DEFAULT_FOLDER,
+    folder: str,
     n_shards: int = 5,
     data: Optional[str] = None,
     target: Optional[Union[str, int]] = None,
@@ -182,17 +194,32 @@ def split_data(
     perc_train: float = 0.8,
     seed: Optional[int] = None,
 ) -> None:
-    """Download and randomly split the MNIST dataset into shards.
-    #TODO
+    """Download and randomly split a dataset into shards.
+
+    The resulting folder structure is:
+    folder/
+    └─── data*/
+        └─── client*/
+        │      train_data.* - training data
+        │      train_target.* - training labels
+        │      valid_data.* - validation data
+        │      valid_target.* - validation labels
+        └─── client*/
+        │      ...
 
     Parameters
     ----------
     folder: str
         Path to the folder where to export shard-wise files.
     n_shards: int
-        Number of shards between which to split the MNIST training data.
+        Number of shards between which to split the data.
     data: str or None, default=None
-        Optional path to a folder where to find or download the raw MNIST
-        data. If None, use a temporary folder.
+        Optional path to a folder where to find the data.
+        If None, default to the MNIST example.
+    target: str or int or None, default=None
+        If str, path to the labels file to import. If int, column of
+        the data file to be used as labels. Required if data is not None,
+        ignored if data is None.
     scheme: {"iid", "labels", "biased"}, default="iid"
         Splitting scheme to use. In all cases, shards contain mutually-
         exclusive samples and cover the full raw training data.
@@ -201,13 +228,11 @@ def split_data(
           with mutually-exclusive target classes.
- If "biased", split the dataset through random sampling according to a shard-specific random labels distribution. + perc_train: float, default= 0.8 + Train/validation split in each client dataset, must be in the + ]0,1] range. seed: int or None, default=None Optional seed to the RNG used for all sampling operations. - use_csv: bool, default=False - Whether to export shard-wise csv files rather than pairs of .npy - files. This uses twice as much disk space and requires using the - `load_mnist_from_csv` function to reload instead of `numpy.load` - but is mandatory to have compatibility with the Fed-BioMed API. """ # Select the splitting function to be used. if scheme == "iid": @@ -221,20 +246,22 @@ def split_data( # Set up the RNG, download the raw dataset and split it. rng = np.random.default_rng(seed) inputs, labels = load_data(data, target) - os.makedirs(folder, exist_ok=True) print(f"Splitting data into {n_shards} shards using the {scheme} scheme") split = func(inputs, labels, n_shards, rng) # Export the resulting shard-wise data to files. + folder = os.path.join(folder, f"data_{scheme}") def np_save(data, i, name): - np.save(os.path.join(folder, f"client_{i}/{name}.npy"), data) + data_dir = os.path.join(folder, f"client_{i}") + os.makedirs(data_dir, exist_ok=True) + np.save(os.path.join(data_dir, f"{name}.npy"), data) for i, (inp, tgt) in enumerate(split): if not perc_train: np_save(inp, i, "train_data") np_save(tgt, i, "train_target") else: - if ~(perc_train <= 1.0) or ~(perc_train > 0.0): + if perc_train > 1.0 or perc_train < 0.0: raise ValueError("perc_train should be a float in ]0,1]") n_train = round(len(inp) * perc_train) t_inp, t_tgt = inp[:n_train], tgt[:n_train] @@ -248,12 +275,7 @@ def split_data( def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace: """Set up and run a command-line arguments parser.""" usage = """ - Download and split MNIST data into heterogeneous shards. - - This script automates the random splitting of the MNIST digits- - recognition images dataset's 60k training samples into shards, - based on various schemes. Shards contain mutually-exclusive - samples and cover the full raw dataset. + Download and split data into heterogeneous shards. The implemented schemes are the following: * "iid": diff --git a/declearn/quickrun/run.py b/declearn/quickrun/run.py new file mode 100644 index 00000000..c6562bbd --- /dev/null +++ b/declearn/quickrun/run.py @@ -0,0 +1,310 @@ +# coding: utf-8 + +# Copyright 2023 Inria (Institut National de Recherche en Informatique +# et Automatique) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Script to quickly run example locally using declearn. + +The script requires to be provided with the path to a folder containing: + +* A declearn model +* A TOML file with all the elements required to configurate an FL experiment +* A data folder, structured in a specific way + +If not provided with this, the script defaults to the MNIST example provided +by declearn in `declearn.example.quickrun`. 
+
+The script then locally runs the FL experiment as laid out in the TOML file,
+using the provided model and data, and stores its results in the same folder.
+"""
+
+import argparse
+import importlib
+import os
+import re
+import textwrap
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from declearn.communication import NetworkClientConfig, NetworkServerConfig
+from declearn.dataset import InMemoryDataset
+from declearn.main import FederatedClient, FederatedServer
+from declearn.main.config import FLOptimConfig, FLRunConfig
+from declearn.test_utils import make_importable
+from declearn.utils import run_as_processes
+
+__all__ = ["quickrun"]
+
+DEFAULT_FOLDER = "./examples/quickrun"
+
+# Perform local imports.
+# pylint: disable=wrong-import-order, wrong-import-position
+with make_importable(os.path.dirname(__file__)):
+    from _split_data import split_data
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+def _run_server(
+    model: str,
+    network: NetworkServerConfig,
+    optim: FLOptimConfig,
+    config: FLRunConfig,
+) -> None:
+    """Routine to run an FL server, called by `run_declearn_experiment`."""
+    server = FederatedServer(model, network, optim)
+    server.run(config)
+
+
+def parse_data_folder(folder: str) -> Dict:
+    """Parse a data folder following a standard format into a nested
+    dictionary.
+
+    The expected format is:
+
+    folder/
+    └─── data*/
+        └─── client*/
+        │      train_data.* - training data
+        │      train_target.* - training labels
+        │      valid_data.* - validation data
+        │      valid_target.* - validation labels
+        └─── client*/
+        │      ...
+    """
+    # Get data dir
+    gen_folders = Path(folder).glob("data*")
+    data_folder = next(gen_folders, False)
+    if not data_folder:
+        raise ValueError(
+            f"No folder starting with 'data' found in {folder}. "
+            "Please store your data under a 'data_*' folder."
+        )
+    if next(gen_folders, False):
+        raise ValueError(
+            "More than one folder starting with 'data' found "
+            f"in {folder}. Please store your data under a single "
+            "parent folder."
+        )
+    # Get clients dir
+    gen_folders = data_folder.glob("client*")  # type: ignore
+    first_client = next(gen_folders, False)
+    if not first_client:
+        raise ValueError(
+            f"No folder starting with 'client' found in {data_folder}. "
+            "Please store each client's data under "
+            "a 'client*' folder."
+        )
+    clients = {str(first_client).rsplit("/", 1)[-1]: {}}
+    while client := next(gen_folders, False):
+        clients[str(client).rsplit("/", 1)[-1]] = {}
+    # Get train and valid files
+    data_items = [
+        "train_data",
+        "train_target",
+        "valid_data",
+        "valid_target",
+    ]
+    for client, files in clients.items():
+        for d in data_items:
+            gen_file = Path(data_folder / client).glob(f"{d}*")  # type: ignore
+            file = next(gen_file, False)
+            if not file:
+                raise ValueError(
+                    f"Could not find a file named '{d}.*' in {client}"
+                )
+            if next(gen_file, False):
+                raise ValueError(
+                    f"Found more than one file named '{d}.*' in {client}"
+                )
+            files[d] = str(file)
+
+    return clients
+
+
+def _run_client(
+    network: str,
+    name: str,
+    paths: dict,
+) -> None:
+    """Routine to run an FL client, called by `run_declearn_experiment`."""
+    # Run the declearn FL client routine.
+    netwk = NetworkClientConfig.from_toml(network)
+    # Overwrite client name based on folder name
+    netwk.name = name
+    # Wrap train and validation data as Dataset objects.
+    train = InMemoryDataset(
+        paths.get("train_data"),
+        target=paths.get("train_target"),
+        expose_classes=True,
+    )
+    valid = InMemoryDataset(
+        paths.get("valid_data"),
+        target=paths.get("valid_target"),
+    )
+    client = FederatedClient(netwk, train, valid)
+    client.run()
+
+
+def quickrun(
+    folder: Optional[str] = None,
+    **kwargs: Any,
+) -> None:
+    """Run a server and its clients using multiprocessing.
+
+    The kwargs are the arguments expected by split_data,
+    see [the documentation][declearn.quickrun._split_data].
+    """
+    # default to the MNIST example
+    if not folder:
+        folder = DEFAULT_FOLDER
+    folder = os.path.abspath(folder)
+    # Get datasets and client_names from folder
+    try:
+        client_dict = parse_data_folder(folder)
+    except ValueError:
+        split_data(folder, **kwargs)
+        client_dict = parse_data_folder(folder)
+    # Parse toml file to ServerConfig and ClientConfig
+    toml = f"{folder}/config.toml"
+    ntk_server = NetworkServerConfig.from_toml(toml, False, "network_server")
+    optim = FLOptimConfig.from_toml(toml, False, "optim")
+    run = FLRunConfig.from_toml(toml, False, "run")
+    ntk_client = NetworkClientConfig.from_toml(toml, False, "network_client")
+    # get Model
+    name = "MyModel"
+    with make_importable(folder):
+        mod = importlib.import_module("model")
+        model_cls = getattr(mod, name)
+        model = model_cls
+    # Set up a (func, args) tuple specifying the server process.
+    p_server = (_run_server, (model, ntk_server, optim, run))
+    # Set up the (func, args) tuples specifying client-wise processes.
+    p_client = []
+    for name, data_dict in client_dict.items():
+        client = (_run_client, (ntk_client, name, data_dict))
+        p_client.append(client)
+    # Run each and every process in parallel.
+    success, outputs = run_as_processes(p_server, *p_client)
+    assert success, "The FL process failed:\n" + "\n".join(
+        str(exc) for exc in outputs if isinstance(exc, RuntimeError)
+    )
+
+
+def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
+    """Set up and run a command-line arguments parser."""
+    usage = """
+    Quickly run an example locally using declearn.
+    The script must be provided with the path to a folder
+    containing:
+    * A declearn model
+    * A TOML file with all the elements required to configure an FL
+      experiment
+    * A data folder, structured in a specific way
+
+    If no folder is provided, the script defaults to the MNIST example
+    provided by declearn in `declearn.example.quickrun`.
+
+    Once launched, this script splits data into heterogeneous shards. It
+    then locally runs the FL experiment as laid out in the TOML file,
+    using the provided model and data, and stores its results in the same folder.
+
+    The implemented schemes are the following:
+    * "iid":
+        Split the dataset through iid random sampling.
+    * "labels":
+        Split the dataset into shards that hold all samples
+        that have mutually-exclusive target classes.
+    * "biased":
+        Split the dataset through random sampling according
+        to a shard-specific random labels distribution.
+ """ + usage = re.sub("\n *(?=[a-z])", " ", textwrap.dedent(usage)) + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter, + usage=re.sub("- ", "-", usage), + ) + parser.add_argument( + "--n_shards", + type=int, + default=5, + help="Number of shards between which to split the data.", + ) + parser.add_argument( + "--root", + default=None, + dest="folder", + help="Path to the root folder where to export data.", + ) + parser.add_argument( + "--data_path", + default=None, + dest="data", + help="Path to the data to be split", + ) + parser.add_argument( + "--target_path", + default=None, + dest="target", + help="Path to the labels to be split", + ) + schemes_help = """ + Splitting scheme(s) to use, among {"iid", "labels", "biased"}. + If this argument is not specified, all "iid" is used. + See details above on the schemes' definition. + """ + parser.add_argument( + "--scheme", + action="append", + choices=["iid", "labels", "biased"], + default=["iid"], + dest="schemes", + nargs="+", + help=textwrap.dedent(schemes_help), + ) + parser.add_argument( + "--train_split", + default=0.8, + dest="perc_train", + type=float, + help="What proportion of the data to use for training vs validation", + ) + parser.add_argument( + "--seed", + default=20221109, + dest="seed", + type=int, + help="RNG seed to use (default: 20221109).", + ) + return parser.parse_args(args) + + +def main(args: Optional[List[str]] = None) -> None: + """Quikcrun based on commandline-input arguments.""" + cmdargs = parse_args(args) + for scheme in cmdargs.schemes: + quickrun( + folder=cmdargs.folder, + n_shards=cmdargs.n_shards, + data=cmdargs.data, + target=cmdargs.target, + scheme=scheme, + perc_train=cmdargs.perc_train, + seed=cmdargs.seed, + ) + + +if __name__ == "__main__": + main() diff --git a/examples/quickrun/config.toml b/examples/quickrun/config.toml index 21eef2d4..dd45f263 100644 --- a/examples/quickrun/config.toml +++ b/examples/quickrun/config.toml @@ -14,7 +14,7 @@ server_opt = 1.0 [optim.client_opt] lrate = 0.001 - regularizers = ["lasso", {alpha = 0.1}] + regularizers = [["lasso", {alpha = 0.1}]] [run] rounds = 10 @@ -27,5 +27,9 @@ rounds = 10 batch_size = 48 drop_remainder = false + [run.evaluate] + batch_size = 128 + + -- GitLab