Mentions légales du service

Skip to content
Snippets Groups Projects
Verified Commit 7937bee7 authored by ANDREY Paul's avatar ANDREY Paul
Browse files

Enable specifying message-related timeouts as floats.

parent bbe43fba
No related branches found
No related tags found
1 merge request!62Revise network communication backend and message-parsing logic.
......@@ -287,13 +287,13 @@ class NetworkClient(metaclass=abc.ABCMeta):
async def recv_message(
self,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> SerializedMessage:
"""Await a message from the server, with optional timeout.
Parameters
----------
timeout: int or None, default=None
timeout: float or None, default=None
Optional timeout delay, after which the server will send
a timeout notification to this client if no message is
available for it.
......@@ -342,7 +342,7 @@ class NetworkClient(metaclass=abc.ABCMeta):
async def check_message(
self,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> SerializedMessage:
"""Await a message from the server, with optional timeout.
......
......@@ -200,7 +200,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
self,
min_clients: int = 1,
max_clients: Optional[int] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Wait for clients to register for training, with given criteria.
......@@ -212,7 +212,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
required - once reached, registration will be closed.
max_clients: int or None, default=None
Maximum number of clients authorized to register.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum waiting time (in seconds) beyond which
to close registration and either return or raise.
......@@ -228,7 +228,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
self,
message: Message,
client: str,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Send a message to a given client and wait for it to be collected.
......@@ -238,7 +238,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
Message instance that is to be delivered to the client.
client: str
Identifier of the client to whom the message is addressed.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum delay (in seconds) beyond which to stop
waiting for collection and raise an asyncio.TimeoutError.
......@@ -253,7 +253,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
async def send_messages(
self,
messages: Mapping[str, Message],
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Send messages to an ensemble of clients and await their collection.
......@@ -261,7 +261,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
----------
messages: dict[str, Message]
Dict mapping client names to the messages addressed to them.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum delay (in seconds) beyond which to stop
waiting for collection and raise an asyncio.TimeoutError.
......@@ -281,7 +281,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
self,
message: Message,
clients: Optional[Set[str]] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Send a message to an ensemble of clients and await its collection.
......@@ -292,7 +292,7 @@ class NetworkServer(metaclass=abc.ABCMeta):
clients: set[str] or None, default=None
Optional subset of registered clients, messages from
whom to wait for. If None, set to `self.client_names`.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum delay (in seconds) beyond which to stop
waiting for collection and raise an asyncio.TimeoutError.
......@@ -336,14 +336,14 @@ class NetworkServer(metaclass=abc.ABCMeta):
async def wait_for_messages_with_timeout(
self,
timeout: int,
timeout: float,
clients: Optional[Set[str]] = None,
) -> Tuple[Dict[str, SerializedMessage], List[str]]:
"""Wait for an ensemble of clients to have sent a message.
Parameters
----------
timeout: int or None, default=None
timeout: float or None, default=None
Maximum waiting delay (in seconds) before returning
received messages, even if some are missing.
clients: set[str] or None, default=None
......
......@@ -226,7 +226,9 @@ class MessagesHandler:
"""Handle a message-receiving request."""
# Set up the optional timeout mechanism.
timeout = message.timeout
countdown = max(timeout // self.heartbeat, 1) if timeout else -1
countdown = (
max(math.ceil(timeout / self.heartbeat), 1) if timeout else -1
)
# Wait for a message to be available or timeout to be reached.
name = self.registered_clients[context]
while (not self.outgoing_messages.get(name)) and countdown:
......@@ -286,7 +288,7 @@ class MessagesHandler:
self,
message: str,
client: str,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Post a message for a client and wait for it to be collected.
......@@ -296,7 +298,7 @@ class MessagesHandler:
Message string that is to be posted for the client to collect.
client: str
Name of the client to whom the message is addressed.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum delay (in seconds) beyond which to stop
waiting for collection and raise an asyncio.TimeoutError.
......@@ -313,7 +315,9 @@ class MessagesHandler:
"""
# Post the message. Wait for it to have been collected.
self.post_message(message, client)
countdown = max(timeout // self.heartbeat, 1) if timeout else -1
countdown = (
max(math.ceil(timeout / self.heartbeat), 1) if timeout else -1
)
while self.outgoing_messages.get(client, False) and countdown:
await asyncio.sleep(self.heartbeat)
countdown -= 1
......@@ -352,7 +356,7 @@ class MessagesHandler:
async def recv_message(
self,
client: str,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> str:
"""Wait for a message to be received from a given client.
......@@ -360,7 +364,7 @@ class MessagesHandler:
----------
client: str
Name of the client whose emitted message to check for.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum delay (in seconds) beyond which to stop
waiting for a message and raise an asyncio.TimeoutError.
......@@ -380,7 +384,9 @@ class MessagesHandler:
See the `check_message` method to synchronously check whether
a message from the client is available and return it or None.
"""
countdown = max(timeout // self.heartbeat, 1) if timeout else -1
countdown = (
max(math.ceil(timeout / self.heartbeat), 1) if timeout else -1
)
while countdown:
message = self.check_message(client)
if message is not None:
......@@ -407,7 +413,7 @@ class MessagesHandler:
self,
min_clients: int = 1,
max_clients: Optional[int] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Wait for clients to register for training, with given criteria.
......@@ -419,7 +425,7 @@ class MessagesHandler:
required - once reached, registration will be closed.
max_clients: int or None, default=None
Maximum number of clients authorized to register.
timeout: int or None, default=None
timeout: float or None, default=None
Optional maximum waiting time (in seconds) beyond which
to close registration and either return or raise.
......@@ -441,7 +447,7 @@ class MessagesHandler:
self,
min_clients: int = 1,
max_clients: Optional[int] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
) -> None:
"""Backend of `wait_for_clients` method, without safeguards."""
# Parse information on the required number of clients.
......@@ -455,7 +461,9 @@ class MessagesHandler:
max_clients = max(min_clients, max_clients)
# Wait for the required number of clients to have joined.
self.open_clients_registration()
countdown = max(timeout // self.heartbeat, 1) if timeout else -1
countdown = (
max(math.ceil(timeout / self.heartbeat), 1) if timeout else -1
)
while countdown and (len(self.registered_clients) < max_clients):
await asyncio.sleep(self.heartbeat)
countdown -= 1
......
......@@ -103,7 +103,7 @@ class Ping(ActionMessage):
class Recv(ActionMessage):
"""Client action message to get content from the server."""
timeout: Optional[int] = None
timeout: Optional[float] = None
@dataclasses.dataclass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment