Module ifra.messenger
Expand source code
from copy import deepcopy, copy
from pathlib import Path
from time import time, sleep
from typing import Union
from json import load
from transparentpath import TransparentPath
import logging
logger = logging.getLogger(__name__)
def get_lock(path: TransparentPath, timeout: int = 5, exists: str = "wait") -> TransparentPath:
lockfile = path.append(".lock")
if not lockfile.parent.is_dir():
lockfile.parent.mkdir(parents=True)
lockfile.touch()
return lockfile
t = time()
while lockfile.is_file():
if exists != "wait":
raise ValueError(
f"Lock file '{lockfile}' exists. "
"This can indicate a crash of a previous execution, or the fact that two"
"sessions are running at the same time. Delete this file and restart."
)
sleep(0.5)
if time() - t > timeout:
raise FileExistsError(f"File '{lockfile}' exists : could not lock on corresponding file.")
lockfile.touch()
return lockfile
class Emitter:
"""Instance to send messages to a receiver. It uses a json file to write messages.
Uses the same logic than `ifra.configs.Config` to handle the json file that is read or written by
`ifra.node.Node` and `ifra.central_server.NodeGate`.
Only messages which names are in `ifra.messenger.Emitter.DEFAULT_MESSAGES` are allowed.
Attributes
----------
path: TransparentPath
The path to the json file containing the messages. Will be casted into a transparentpath if a str is
given, so make sure that if you pass a string that should be local, you did not set a global file system.
If it does not exist, it is created with the default messages present in
`ifra.messenger.Emitter.DEFAULT_MESSAGES`. If it exists and it indicates that the corresponding object is
doint something, will raise an error. It it exists and says taht the corresponding object is doing nothing,
overwrites the file.
messages: dict
A dictionnary of messages
Messages
--------
error: str
Set by the emitting object error occures. Will contain any error message raised.
doing: str
Name of the function currently being run by the emitting object
"""
DEFAULT_MESSAGES = {"error": None, "doing": None}
"""Overload in daughter classes."""
def __init__(self, path: Union[str, Path, TransparentPath]):
"""
Parameters
----------
path: Union[str, Path, TransparentPath]
The path to the json file containing the messages. Will be casted into a transparentpath if a str is
given, so make sure that if you pass a str that should be local, you did not set a global file system.
"""
if type(path) == str:
path = TransparentPath(path)
if not path.suffix == ".json":
raise ValueError("Emitter path must be a json")
lockfile = get_lock(path, exists="raise")
self.path = path
if self.path.is_file():
try:
if hasattr(self.path, "read"):
self.messages = self.path.read()
else:
with open(self.path) as opath:
self.messages = load(opath)
for key in self.messages:
if self.messages[key] == "":
self.messages[key] = None
lockfile.rm(absent="ignore")
except Exception as e:
lockfile.rm(absent="ignore")
raise e
else:
self.messages = {"doing": None, "error": None}
if self.doing is not None:
raise ValueError(
f"Message file '{path}' existed at Emitter creation and had 'doing={self.doing}'. "
f"This can indicate a crash of a previous execution, or the fact that two"
f"sessions are running at the same time. Delete this file and restart."
)
lockfile.rm(absent="ignore")
self.reset_messages()
def __getstate__(self):
return self.__dict__
def __setstate__(self, d: dict):
for item in d:
setattr(self, item, d[item])
def __getattr__(self, item):
if item == "messages":
raise ValueError("NodeMessenger object's 'messages' not set")
if item == "path":
raise ValueError("NodeMessenger object's 'path' not set")
if item not in self.messages:
raise ValueError(f"No messages named '{item}' was found")
return self.messages[item]
def __setattr__(self, item, value):
if item == "messages" or item == "path":
super().__setattr__(item, value)
return
if item not in self.messages and item not in self.__class__.DEFAULT_MESSAGES:
raise ValueError(f"Message named '{item}' is not allowed")
self.messages[item] = value
self.save()
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
setattr(result, "messages", deepcopy(self.messages, memo))
return result
def send(self, **kwargs):
"""Sends several messages at once. Keys in kwargs must be present in `ifra.messenger.Emitter.DEFAULT_MESSAGES`
"""
for key in kwargs:
if key not in self.messages and key not in self.__class__.DEFAULT_MESSAGES:
raise ValueError(f"Message named '{key}' is not allowed")
self.messages[key] = kwargs[key]
self.save()
def save(self) -> None:
"""Saves the current messages into the file it used to load."""
to_save = copy(self.messages)
lockfile = get_lock(self.path)
try:
for key in to_save:
if key not in self.__class__.DEFAULT_MESSAGES:
raise ValueError(f"Forbidden message '{key}'")
if to_save[key] is None:
# noinspection PyTypeChecker
to_save[key] = ""
self.path.write(to_save)
lockfile.rm(absent="ignore")
except Exception as e:
lockfile.rm(absent="ignore")
raise e
def reset_messages(self):
"""Reset messages to their default values specified in `ifra.messenger.Emitter.DEFAULT_MESSAGES`"""
self.messages = copy(self.__class__.DEFAULT_MESSAGES)
self.save()
def rm(self):
"""Removes the Emitter's json file."""
lockfile = get_lock(self.path)
self.path.rm(absent="ignore")
lockfile.rm()
class Receiver:
"""Instance to read messages from an emitter. It uses a json file to read messages.
Uses the same logic than `ifra.configs.Config` to handle the json file.
Only messages which names are in `ifra.messenger.Receiver.DEFAULT_MESSAGES` are allowed.
Attributes
----------
path: TransparentPath
The path to the json file containing the messages. Will be casted into a transparentpath if a str is
given, so make sure that if you pass a string that should be local, you did not set a global file system.
If it does not exist, Receiver waits until the file exists, or raises TimeoutError if it waited more than
`ifra.messenger.Receiver.timeout_when_missing` seconds (default is 600s).
messages: dict
A dictionnary of messages
"""
DEFAULT_MESSAGES = {}
"""Overload in daughter classes. Must match the `ifra.messenger.Emitter.DEFAULT_MESSAGES` of the emitter that
will write to the file the receiver will read."""
timeout_when_missing = 600
"""How long the messenger is supposed to wait for its file to come back when using
`ifra.messenger.NodeMessenger.get_latest_messages`"""
def __init__(self, path: Union[str, Path, TransparentPath], wait: bool = True):
"""
Parameters
----------
path: Union[str, Path, TransparentPath]
The path to the json file containing the messages. Will be casted into a transparentpath if a str is
given, so make sure that if you pass a str that should be local, you did not set a global file system.
wait: bool
If True, will wait for 'path' to point to an existing file for `ifra.messenger.Emitter.DEFAULT_MESSAGES`
seconds at Receiver creation if it does not exist. Else, messages will be those in
`ifra.messenger.Emitter.DEFAULT_MESSAGES`
"""
if type(path) == str:
path = TransparentPath(path)
if not path.suffix == ".json":
raise ValueError("Messenger path must be a json")
self.path = path
self.messages = {}
self.new = True
self.get_latest_messages(wait)
def __getstate__(self):
return self.__dict__
def __setstate__(self, d: dict):
for item in d:
setattr(self, item, d[item])
def __getattr__(self, item):
if item == "messages":
raise ValueError("Messenger object's 'messages' not set")
if item == "path":
raise ValueError("Messenger object's 'path' not set")
if item not in self.messages:
raise ValueError(f"No messages named '{item}' was found")
return self.messages[item]
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
setattr(result, "messages", deepcopy(self.messages, memo))
return result
def get_latest_messages(self, wait: bool = False) -> bool:
"""Gets latest messages by reading the messages json file. If file does not exist and it is the first time we
read it (object init), wait for it for at most `ifra.messenger.Receiver.timeout_when_missing` seconds then
raises TimeoutError. If file does not exist and it is not the first time we read it, returns False. If
messages are read, returns True"""
first_message = True
if self.__class__.timeout_when_missing is None:
raise ValueError("'timeout_when_missing' can not be None")
t = time()
while not self.path.is_file() and time() - t < self.timeout_when_missing:
if not self.new:
logger.warning(f"Message file {self.path} disapread. No message received.")
return False
if not wait:
logger.warning(
f"Message file {self.path} does not exist. No message received, using defaults messages."
)
self.messages = copy(self.DEFAULT_MESSAGES)
return False
if first_message:
first_message = False
logger.info(f"Message file {self.path} not here yet. Waiting for it to arrive...")
sleep(1)
if not self.path.is_file():
raise TimeoutError(f"Could not find message file '{self.path}' in less that "
f"{self.__class__.timeout_when_missing} seconds. Aborting.")
if first_message is False:
logger.info("...message file found. Resuming.")
self.new = False
lockfile = get_lock(self.path)
try:
if hasattr(self.path, "read"):
self.messages = self.path.read()
else:
with open(self.path) as opath:
self.messages = load(opath)
for key in self.messages:
if self.messages[key] == "":
self.messages[key] = None
lockfile.rm(absent="ignore")
except Exception as e:
lockfile.rm(absent="ignore")
raise e
return True
def reset_messages(self):
"""Reset messages to their default values specified in `ifra.messenger.Receiver.DEFAULT_MESSAGES`"""
self.messages = copy(self.DEFAULT_MESSAGES)
self.save()
Functions
def get_lock(path: transparentpath.gcsutils.transparentpath.TransparentPath, timeout: int = 5, exists: str = 'wait') ‑> transparentpath.gcsutils.transparentpath.TransparentPath
-
Expand source code
def get_lock(path: TransparentPath, timeout: int = 5, exists: str = "wait") -> TransparentPath: lockfile = path.append(".lock") if not lockfile.parent.is_dir(): lockfile.parent.mkdir(parents=True) lockfile.touch() return lockfile t = time() while lockfile.is_file(): if exists != "wait": raise ValueError( f"Lock file '{lockfile}' exists. " "This can indicate a crash of a previous execution, or the fact that two" "sessions are running at the same time. Delete this file and restart." ) sleep(0.5) if time() - t > timeout: raise FileExistsError(f"File '{lockfile}' exists : could not lock on corresponding file.") lockfile.touch() return lockfile
Classes
class Emitter (path: Union[str, pathlib.Path, transparentpath.gcsutils.transparentpath.TransparentPath])
-
Instance to send messages to a receiver. It uses a json file to write messages.
Uses the same logic than
Config
to handle the json file that is read or written byNode
andifra.central_server.NodeGate
.Only messages which names are in
Emitter.DEFAULT_MESSAGES
are allowed.Attributes
path: TransparentPath The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a string that should be local, you did not set a global file system. If it does not exist, it is created with the default messages present in
Emitter.DEFAULT_MESSAGES
. If it exists and it indicates that the corresponding object is doint something, will raise an error. It it exists and says taht the corresponding object is doing nothing, overwrites the file. messages: dict A dictionnary of messagesMessages
error: str Set by the emitting object error occures. Will contain any error message raised. doing: str Name of the function currently being run by the emitting object
Parameters
path
:Union[str, Path, TransparentPath]
- The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a str that should be local, you did not set a global file system.
Expand source code
class Emitter: """Instance to send messages to a receiver. It uses a json file to write messages. Uses the same logic than `ifra.configs.Config` to handle the json file that is read or written by `ifra.node.Node` and `ifra.central_server.NodeGate`. Only messages which names are in `ifra.messenger.Emitter.DEFAULT_MESSAGES` are allowed. Attributes ---------- path: TransparentPath The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a string that should be local, you did not set a global file system. If it does not exist, it is created with the default messages present in `ifra.messenger.Emitter.DEFAULT_MESSAGES`. If it exists and it indicates that the corresponding object is doint something, will raise an error. It it exists and says taht the corresponding object is doing nothing, overwrites the file. messages: dict A dictionnary of messages Messages -------- error: str Set by the emitting object error occures. Will contain any error message raised. doing: str Name of the function currently being run by the emitting object """ DEFAULT_MESSAGES = {"error": None, "doing": None} """Overload in daughter classes.""" def __init__(self, path: Union[str, Path, TransparentPath]): """ Parameters ---------- path: Union[str, Path, TransparentPath] The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a str that should be local, you did not set a global file system. """ if type(path) == str: path = TransparentPath(path) if not path.suffix == ".json": raise ValueError("Emitter path must be a json") lockfile = get_lock(path, exists="raise") self.path = path if self.path.is_file(): try: if hasattr(self.path, "read"): self.messages = self.path.read() else: with open(self.path) as opath: self.messages = load(opath) for key in self.messages: if self.messages[key] == "": self.messages[key] = None lockfile.rm(absent="ignore") except Exception as e: lockfile.rm(absent="ignore") raise e else: self.messages = {"doing": None, "error": None} if self.doing is not None: raise ValueError( f"Message file '{path}' existed at Emitter creation and had 'doing={self.doing}'. " f"This can indicate a crash of a previous execution, or the fact that two" f"sessions are running at the same time. Delete this file and restart." ) lockfile.rm(absent="ignore") self.reset_messages() def __getstate__(self): return self.__dict__ def __setstate__(self, d: dict): for item in d: setattr(self, item, d[item]) def __getattr__(self, item): if item == "messages": raise ValueError("NodeMessenger object's 'messages' not set") if item == "path": raise ValueError("NodeMessenger object's 'path' not set") if item not in self.messages: raise ValueError(f"No messages named '{item}' was found") return self.messages[item] def __setattr__(self, item, value): if item == "messages" or item == "path": super().__setattr__(item, value) return if item not in self.messages and item not in self.__class__.DEFAULT_MESSAGES: raise ValueError(f"Message named '{item}' is not allowed") self.messages[item] = value self.save() def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result setattr(result, "messages", deepcopy(self.messages, memo)) return result def send(self, **kwargs): """Sends several messages at once. Keys in kwargs must be present in `ifra.messenger.Emitter.DEFAULT_MESSAGES` """ for key in kwargs: if key not in self.messages and key not in self.__class__.DEFAULT_MESSAGES: raise ValueError(f"Message named '{key}' is not allowed") self.messages[key] = kwargs[key] self.save() def save(self) -> None: """Saves the current messages into the file it used to load.""" to_save = copy(self.messages) lockfile = get_lock(self.path) try: for key in to_save: if key not in self.__class__.DEFAULT_MESSAGES: raise ValueError(f"Forbidden message '{key}'") if to_save[key] is None: # noinspection PyTypeChecker to_save[key] = "" self.path.write(to_save) lockfile.rm(absent="ignore") except Exception as e: lockfile.rm(absent="ignore") raise e def reset_messages(self): """Reset messages to their default values specified in `ifra.messenger.Emitter.DEFAULT_MESSAGES`""" self.messages = copy(self.__class__.DEFAULT_MESSAGES) self.save() def rm(self): """Removes the Emitter's json file.""" lockfile = get_lock(self.path) self.path.rm(absent="ignore") lockfile.rm()
Class variables
var DEFAULT_MESSAGES
-
Overload in daughter classes.
Methods
def reset_messages(self)
-
Reset messages to their default values specified in
Emitter.DEFAULT_MESSAGES
Expand source code
def reset_messages(self): """Reset messages to their default values specified in `ifra.messenger.Emitter.DEFAULT_MESSAGES`""" self.messages = copy(self.__class__.DEFAULT_MESSAGES) self.save()
def rm(self)
-
Removes the Emitter's json file.
Expand source code
def rm(self): """Removes the Emitter's json file.""" lockfile = get_lock(self.path) self.path.rm(absent="ignore") lockfile.rm()
def save(self) ‑> None
-
Saves the current messages into the file it used to load.
Expand source code
def save(self) -> None: """Saves the current messages into the file it used to load.""" to_save = copy(self.messages) lockfile = get_lock(self.path) try: for key in to_save: if key not in self.__class__.DEFAULT_MESSAGES: raise ValueError(f"Forbidden message '{key}'") if to_save[key] is None: # noinspection PyTypeChecker to_save[key] = "" self.path.write(to_save) lockfile.rm(absent="ignore") except Exception as e: lockfile.rm(absent="ignore") raise e
def send(self, **kwargs)
-
Sends several messages at once. Keys in kwargs must be present in
Emitter.DEFAULT_MESSAGES
Expand source code
def send(self, **kwargs): """Sends several messages at once. Keys in kwargs must be present in `ifra.messenger.Emitter.DEFAULT_MESSAGES` """ for key in kwargs: if key not in self.messages and key not in self.__class__.DEFAULT_MESSAGES: raise ValueError(f"Message named '{key}' is not allowed") self.messages[key] = kwargs[key] self.save()
class Receiver (path: Union[str, pathlib.Path, transparentpath.gcsutils.transparentpath.TransparentPath], wait: bool = True)
-
Instance to read messages from an emitter. It uses a json file to read messages.
Uses the same logic than
Config
to handle the json file.Only messages which names are in
Receiver.DEFAULT_MESSAGES
are allowed.Attributes
path
:TransparentPath
- The path to the json file containing the messages. Will be casted into a transparentpath if a str is
- given, so make sure that if you pass a string that should be local, you did not set a global file system.
- If it does not exist, Receiver waits until the file exists, or raises TimeoutError if it waited more than
Receiver.timeout_when_missing
seconds (default is 600s).messages
:dict
A dictionnary of messages
Parameters
path
:Union[str, Path, TransparentPath]
- The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a str that should be local, you did not set a global file system.
wait
:bool
- If True, will wait for 'path' to point to an existing file for
Emitter.DEFAULT_MESSAGES
seconds at Receiver creation if it does not exist. Else, messages will be those inEmitter.DEFAULT_MESSAGES
Expand source code
class Receiver: """Instance to read messages from an emitter. It uses a json file to read messages. Uses the same logic than `ifra.configs.Config` to handle the json file. Only messages which names are in `ifra.messenger.Receiver.DEFAULT_MESSAGES` are allowed. Attributes ---------- path: TransparentPath The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a string that should be local, you did not set a global file system. If it does not exist, Receiver waits until the file exists, or raises TimeoutError if it waited more than `ifra.messenger.Receiver.timeout_when_missing` seconds (default is 600s). messages: dict A dictionnary of messages """ DEFAULT_MESSAGES = {} """Overload in daughter classes. Must match the `ifra.messenger.Emitter.DEFAULT_MESSAGES` of the emitter that will write to the file the receiver will read.""" timeout_when_missing = 600 """How long the messenger is supposed to wait for its file to come back when using `ifra.messenger.NodeMessenger.get_latest_messages`""" def __init__(self, path: Union[str, Path, TransparentPath], wait: bool = True): """ Parameters ---------- path: Union[str, Path, TransparentPath] The path to the json file containing the messages. Will be casted into a transparentpath if a str is given, so make sure that if you pass a str that should be local, you did not set a global file system. wait: bool If True, will wait for 'path' to point to an existing file for `ifra.messenger.Emitter.DEFAULT_MESSAGES` seconds at Receiver creation if it does not exist. Else, messages will be those in `ifra.messenger.Emitter.DEFAULT_MESSAGES` """ if type(path) == str: path = TransparentPath(path) if not path.suffix == ".json": raise ValueError("Messenger path must be a json") self.path = path self.messages = {} self.new = True self.get_latest_messages(wait) def __getstate__(self): return self.__dict__ def __setstate__(self, d: dict): for item in d: setattr(self, item, d[item]) def __getattr__(self, item): if item == "messages": raise ValueError("Messenger object's 'messages' not set") if item == "path": raise ValueError("Messenger object's 'path' not set") if item not in self.messages: raise ValueError(f"No messages named '{item}' was found") return self.messages[item] def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result setattr(result, "messages", deepcopy(self.messages, memo)) return result def get_latest_messages(self, wait: bool = False) -> bool: """Gets latest messages by reading the messages json file. If file does not exist and it is the first time we read it (object init), wait for it for at most `ifra.messenger.Receiver.timeout_when_missing` seconds then raises TimeoutError. If file does not exist and it is not the first time we read it, returns False. If messages are read, returns True""" first_message = True if self.__class__.timeout_when_missing is None: raise ValueError("'timeout_when_missing' can not be None") t = time() while not self.path.is_file() and time() - t < self.timeout_when_missing: if not self.new: logger.warning(f"Message file {self.path} disapread. No message received.") return False if not wait: logger.warning( f"Message file {self.path} does not exist. No message received, using defaults messages." ) self.messages = copy(self.DEFAULT_MESSAGES) return False if first_message: first_message = False logger.info(f"Message file {self.path} not here yet. Waiting for it to arrive...") sleep(1) if not self.path.is_file(): raise TimeoutError(f"Could not find message file '{self.path}' in less that " f"{self.__class__.timeout_when_missing} seconds. Aborting.") if first_message is False: logger.info("...message file found. Resuming.") self.new = False lockfile = get_lock(self.path) try: if hasattr(self.path, "read"): self.messages = self.path.read() else: with open(self.path) as opath: self.messages = load(opath) for key in self.messages: if self.messages[key] == "": self.messages[key] = None lockfile.rm(absent="ignore") except Exception as e: lockfile.rm(absent="ignore") raise e return True def reset_messages(self): """Reset messages to their default values specified in `ifra.messenger.Receiver.DEFAULT_MESSAGES`""" self.messages = copy(self.DEFAULT_MESSAGES) self.save()
Class variables
var DEFAULT_MESSAGES
-
Overload in daughter classes. Must match the
Emitter.DEFAULT_MESSAGES
of the emitter that will write to the file the receiver will read. var timeout_when_missing
-
How long the messenger is supposed to wait for its file to come back when using
ifra.messenger.NodeMessenger.get_latest_messages
Methods
def get_latest_messages(self, wait: bool = False) ‑> bool
-
Gets latest messages by reading the messages json file. If file does not exist and it is the first time we read it (object init), wait for it for at most
Receiver.timeout_when_missing
seconds then raises TimeoutError. If file does not exist and it is not the first time we read it, returns False. If messages are read, returns TrueExpand source code
def get_latest_messages(self, wait: bool = False) -> bool: """Gets latest messages by reading the messages json file. If file does not exist and it is the first time we read it (object init), wait for it for at most `ifra.messenger.Receiver.timeout_when_missing` seconds then raises TimeoutError. If file does not exist and it is not the first time we read it, returns False. If messages are read, returns True""" first_message = True if self.__class__.timeout_when_missing is None: raise ValueError("'timeout_when_missing' can not be None") t = time() while not self.path.is_file() and time() - t < self.timeout_when_missing: if not self.new: logger.warning(f"Message file {self.path} disapread. No message received.") return False if not wait: logger.warning( f"Message file {self.path} does not exist. No message received, using defaults messages." ) self.messages = copy(self.DEFAULT_MESSAGES) return False if first_message: first_message = False logger.info(f"Message file {self.path} not here yet. Waiting for it to arrive...") sleep(1) if not self.path.is_file(): raise TimeoutError(f"Could not find message file '{self.path}' in less that " f"{self.__class__.timeout_when_missing} seconds. Aborting.") if first_message is False: logger.info("...message file found. Resuming.") self.new = False lockfile = get_lock(self.path) try: if hasattr(self.path, "read"): self.messages = self.path.read() else: with open(self.path) as opath: self.messages = load(opath) for key in self.messages: if self.messages[key] == "": self.messages[key] = None lockfile.rm(absent="ignore") except Exception as e: lockfile.rm(absent="ignore") raise e return True
def reset_messages(self)
-
Reset messages to their default values specified in
Receiver.DEFAULT_MESSAGES
Expand source code
def reset_messages(self): """Reset messages to their default values specified in `ifra.messenger.Receiver.DEFAULT_MESSAGES`""" self.messages = copy(self.DEFAULT_MESSAGES) self.save()