Module ifra.central_server

Expand source code
from datetime import datetime
from time import time, sleep
from typing import Union

from ruleskit import RuleSet
from tablewriter import TableWriter
import logging

from .configs import CentralConfig
from .actor import Actor
from .decorator import emit

logger = logging.getLogger(__name__)


class CentralServer(Actor):
    """Implementation of the notion of central server in federated learning.

    It monitors changes in a remote GCP directory where the aggregator is expected to write its model.
    Upon changes of the model file, the central server downloads it, updates its own model using
    `ifra.central_model_updaters.central_model_update`, and saves the result to a directory.
    This directory is read by the nodes to update their own models.

    Attributes
    ----------
    central_configs: CentralConfig
        See `ifra.configs.CentralConfig`
    model: RuleSet
        The central server's model
    last_fetch: datetime
        Last time the aggregated model was fetched
    """

    def __init__(
        self,
        central_configs: CentralConfig,
    ):
        """
        Parameters
        ----------
        central_configs: CentralConfig
            This central server's configuration. See `ifra.configs.CentralConfig`.
        """
        self.central_configs = None
        self.model = None
        self.last_fetch = None
        super().__init__(central_configs=central_configs)

    @emit
    def create(self):
        self.model = RuleSet()

    @emit
    def update(self):
        """Add new rules in aggregated_model to central model's models.
        If new rules were found, save the updated model to `ifra.central_server.CentralServer`'s 
        *central_configs.central_model_path*.
        
        Returns
        -------
        bool
            True if new rules were added, False if aggregated_model contained no new rules
        """
        aggregated_model = RuleSet()
        aggregated_model.load(self.central_configs.aggregated_model_path)
        self.last_fetch = datetime.now()
        if self.model is None:
            self.model = aggregated_model
        else:
            for r in aggregated_model:
                if r in self.model:
                    logger.warning(
                        f"Central Server - Rule '{r}' was aggregated despite being already in the central server. "
                        "It is ignored, its prediction is discarded."
                    )
                else:
                    self.model.append(r, update_activation=False)
        logger.info(f"Central Server - Fetched new model from aggregator at {self.last_fetch}")
        self.model_to_file()

    @emit
    def model_to_file(self) -> None:
        """Saves `ifra.node.Node` *model* to `ifra.configs.CentralConfig` *central_model_path*,
        overwritting any existing file here, and in another file in the same directory but with a unique name.
        Will also produce a .pdf of the model using TableWriter.
        Does not do anything if `ifra.node.Node` *model* is None
        """
        model = self.model
        if model is None:
            # Do nothing if no model has been produced yet
            return

        iteration = 0
        name = self.central_configs.central_model_path.stem
        path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"
        while path.is_file():
            iteration += 1
            path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"

        path = path.with_suffix(".csv")
        if not path.parent.isdir():
            path.parent.mkdir(parents=True)
        model.save(path)
        model.save(self.central_configs.central_model_path)
        logger.info(f"Central Server - Saved central model in '{self.central_configs.central_model_path}'")

        try:
            path_table = path.with_suffix(".pdf")
            TableWriter(
                path_table, path.read(index_col=0).apply(lambda x: x.round(3) if x.dtype == float else x), paperwidth=30
            ).compile(clean_tex=True)
        except ValueError:
            logger.warning("Central Server - Failed to produce tablewriter. Is LaTeX installed ?")

    @emit
    def run(self, timeout: Union[int, float] = 0, sleeptime: Union[int, float] = 5):
        """Monitors new changes in the nodes, every ''sleeptime'' seconds for ''timeout'' seconds, triggering
        aggregation and pushing central model to nodes when enough new node models are available.

        Stops if all nodes gave a model but no new rules could be found from them. Tells the nodes to stop too.

        Parameters
        ----------
        timeout: Union[int, float]
            How many seconds should the run last. If <= 0, will last until killed by the user. Default value = 0.
        sleeptime: Union[int, float]
            How many seconds between each checks for new nodes models. Default value = 5.
        """

        started = self.iterations != 0  # To force at least one loop of the while to trigger

        if timeout <= 0:
            logger.warning(
                "Central Server - You did not specify a timeout for your run. It will last until manually stopped."
            )
        logger.info(
            "Central Server - Starting central server. Monitoring changes in"
            f" {self.central_configs.aggregated_model_path}."
        )

        t = time()
        while time() - t < timeout or timeout <= 0 or started is False:
            started = True
            if len(self.model) == 0:
                if self.central_configs.aggregated_model_path.is_file():
                    self.update()
                    self.iterations += 1
            else:
                if (
                        self.central_configs.aggregated_model_path.is_file()
                        and self.central_configs.aggregated_model_path.info()["mtime"] > self.last_fetch.timestamp()
                ):
                    self.update()
                    self.iterations += 1

            sleep(sleeptime)

        logger.info(f"Central Server - Timeout of {timeout} seconds reached, stopping central server.")
        if self.model is None:
            logger.warning("Learning failed to produce a central model. No output generated.")
        logger.info(
            f"Central Server - Made {self.iterations} complete iterations between central server and aggregator."
        )
        logger.info(f"Central Server - Results saved in {self.central_configs.central_model_path}")

Classes

class CentralServer (central_configs: CentralConfig)

Implementation of the notion of central server in federated learning.

It monitors changes in a remote GCP directory where the aggregator is expected to write its model. Upon changes of the model file, the central server downloads it, updates its own model using ifra.central_model_updaters.central_model_update, and saves the result to a directory. This directory is read by the nodes to update their own models (a minimal usage sketch is shown below, after the parameters).

Attributes

central_configs : CentralConfig
See CentralConfig
model : RuleSet
The central server's model
last_fetch : datetime
Last time the aggregated model was fetched

Parameters

central_configs : CentralConfig
This central server's configuration. See CentralConfig.
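
A minimal usage sketch, assuming CentralConfig can be built from a json configuration file (the constructor argument and file name below are assumptions, not taken from this module):

from ifra.configs import CentralConfig
from ifra.central_server import CentralServer

# Hypothetical configuration file defining aggregated_model_path, central_model_path, etc.
central_configs = CentralConfig("central_configs.json")
server = CentralServer(central_configs=central_configs)
server.run(timeout=3600, sleeptime=10)  # check the aggregated model every 10 s for one hour
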
Expand source code
class CentralServer(Actor):
    """Implementation of the notion of central server in federated learning.

    It monitors changes in a remote GCP directory where the aggregator is expected to write its model.
    Upon changes of the model file, the central server downloads it, updates its own model using
    `ifra.central_model_updaters.central_model_update`, and saves the result to a directory.
    This directory is read by the nodes to update their own models.

    Attributes
    ----------
    central_configs: CentralConfig
        See `ifra.configs.CentralConfig`
    model: RuleSet
        The central server's model
    last_fetch: datetime
        Last time the aggregated model was fetched
    """

    def __init__(
        self,
        central_configs: CentralConfig,
    ):
        """
        Parameters
        ----------
        central_configs: CentralConfig
            This central server's configuration. See `ifra.configs.CentralConfig`.
        """
        self.central_configs = None
        self.model = None
        self.last_fetch = None
        super().__init__(central_configs=central_configs)

    @emit
    def create(self):
        self.model = RuleSet()

    @emit
    def update(self):
        """Add new rules in aggregated_model to central model's models.
        If new rules were found, save the updated model to `ifra.central_server.CentralServer`'s 
        *central_configs.central_model_path*.
        
        Returns
        -------
        bool
            True if new rules were added, False if aggregated_model contained no new rules
        """
        aggregated_model = RuleSet()
        aggregated_model.load(self.central_configs.aggregated_model_path)
        self.last_fetch = datetime.now()
        if self.model is None:
            self.model = aggregated_model
        else:
            for r in aggregated_model:
                if r in self.model:
                    logger.warning(
                        f"Central Server - Rule '{r}' was aggregated despite being already in the central server. "
                        "It is ignored, its prediction is discarded."
                    )
                else:
                    self.model.append(r, update_activation=False)
        logger.info(f"Central Server - Fetched new model from aggregator at {self.last_fetch}")
        self.model_to_file()

    @emit
    def model_to_file(self) -> None:
        """Saves `ifra.node.Node` *model* to `ifra.configs.CentralConfig` *central_model_path*,
        overwritting any existing file here, and in another file in the same directory but with a unique name.
        Will also produce a .pdf of the model using TableWriter.
        Does not do anything if `ifra.node.Node` *model* is None
        """
        model = self.model
        if model is None:
            # Do nothing if no model has been produced yet
            return

        iteration = 0
        name = self.central_configs.central_model_path.stem
        path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"
        while path.is_file():
            iteration += 1
            path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"

        path = path.with_suffix(".csv")
        if not path.parent.isdir():
            path.parent.mkdir(parents=True)
        model.save(path)
        model.save(self.central_configs.central_model_path)
        logger.info(f"Central Server - Saved central model in '{self.central_configs.central_model_path}'")

        try:
            path_table = path.with_suffix(".pdf")
            TableWriter(
                path_table, path.read(index_col=0).apply(lambda x: x.round(3) if x.dtype == float else x), paperwidth=30
            ).compile(clean_tex=True)
        except ValueError:
            logger.warning("Central Server - Failed to produce tablewriter. Is LaTeX installed ?")

    @emit
    def run(self, timeout: Union[int, float] = 0, sleeptime: Union[int, float] = 5):
        """Monitors new changes in the nodes, every ''sleeptime'' seconds for ''timeout'' seconds, triggering
        aggregation and pushing central model to nodes when enough new node models are available.

        Stops if all nodes gave a model but no new rules could be found from them. Tells the nodes to stop too.

        Parameters
        ----------
        timeout: Union[int, float]
            How many seconds should the run last. If <= 0, will last until killed by the user. Default value = 0.
        sleeptime: Union[int, float]
            How many seconds between each checks for new nodes models. Default value = 5.
        """

        started = self.iterations != 0  # To force at least one loop of the while to trigger

        if timeout <= 0:
            logger.warning(
                "Central Server - You did not specify a timeout for your run. It will last until manually stopped."
            )
        logger.info(
            "Central Server - Starting central server. Monitoring changes in"
            f" {self.central_configs.aggregated_model_path}."
        )

        t = time()
        while time() - t < timeout or timeout <= 0 or started is False:
            started = True
            if len(self.model) == 0:
                if self.central_configs.aggregated_model_path.is_file():
                    self.update()
                    self.iterations += 1
            else:
                if (
                        self.central_configs.aggregated_model_path.is_file()
                        and self.central_configs.aggregated_model_path.info()["mtime"] > self.last_fetch.timestamp()
                ):
                    self.update()
                    self.iterations += 1

            sleep(sleeptime)

        logger.info(f"Central Server - Timeout of {timeout} seconds reached, stopping central server.")
        if self.model is None:
            logger.warning("Learning failed to produce a central model. No output generated.")
        logger.info(
            f"Central Server - Made {self.iterations} complete iterations between central server and aggregator."
        )
        logger.info(f"Central Server - Results saved in {self.central_configs.central_model_path}")

Ancestors

ifra.actor.Actor

Methods

def model_to_file(self) ‑> None

Saves CentralServer model to CentralConfig central_model_path, overwriting any existing file there, and to another file in the same directory with a unique name. Will also produce a .pdf of the model using TableWriter. Does nothing if CentralServer model is None.
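
A minimal sketch of the unique-name scheme, using pathlib and an assumed central_model_path of models/central_model.csv (the path value is hypothetical):

from pathlib import Path

central_model_path = Path("models/central_model.csv")  # assumed value, for illustration only

iteration = 0
versioned = central_model_path.parent / f"{central_model_path.stem}_{iteration}.csv"
while versioned.is_file():
    iteration += 1
    versioned = central_model_path.parent / f"{central_model_path.stem}_{iteration}.csv"

# The first save produces central_model_0.csv, the next central_model_1.csv, and so on,
# while central_model.csv itself is always overwritten with the latest model.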

Expand source code
@emit
def model_to_file(self) -> None:
    """Saves `ifra.node.Node` *model* to `ifra.configs.CentralConfig` *central_model_path*,
    overwritting any existing file here, and in another file in the same directory but with a unique name.
    Will also produce a .pdf of the model using TableWriter.
    Does not do anything if `ifra.node.Node` *model* is None
    """
    model = self.model
    if model is None:
        # Do nothing if no model has been produced yet
        return

    iteration = 0
    name = self.central_configs.central_model_path.stem
    path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"
    while path.is_file():
        iteration += 1
        path = self.central_configs.central_model_path.parent / f"{name}_{iteration}.csv"

    path = path.with_suffix(".csv")
    if not path.parent.isdir():
        path.parent.mkdir(parents=True)
    model.save(path)
    model.save(self.central_configs.central_model_path)
    logger.info(f"Central Server - Saved central model in '{self.central_configs.central_model_path}'")

    try:
        path_table = path.with_suffix(".pdf")
        TableWriter(
            path_table, path.read(index_col=0).apply(lambda x: x.round(3) if x.dtype == float else x), paperwidth=30
        ).compile(clean_tex=True)
    except ValueError:
        logger.warning("Central Server - Failed to produce tablewriter. Is LaTeX installed ?")
def run(self, timeout: Union[int, float] = 0, sleeptime: Union[int, float] = 5)

Monitors the aggregated model every sleeptime seconds for timeout seconds, updating the central model and saving it for the nodes whenever the aggregator writes a new model (see the usage sketch after the parameter list below).

Stops when the timeout is reached. If timeout is not positive, runs until manually stopped.

Parameters

timeout : Union[int, float]
How many seconds the run should last. If <= 0, will last until killed by the user. Default value = 0.
sleeptime : Union[int, float]
How many seconds to wait between each check for a new aggregated model. Default value = 5.
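
A short usage sketch, assuming server is an already-constructed CentralServer (see the class example above):

server.run(timeout=600, sleeptime=5)  # check for a new aggregated model every 5 s, for 10 minutes
server.run()                          # timeout <= 0: run until manually stopped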
Expand source code
@emit
def run(self, timeout: Union[int, float] = 0, sleeptime: Union[int, float] = 5):
    """Monitors new changes in the nodes, every ''sleeptime'' seconds for ''timeout'' seconds, triggering
    aggregation and pushing central model to nodes when enough new node models are available.

    Stops if all nodes gave a model but no new rules could be found from them. Tells the nodes to stop too.

    Parameters
    ----------
    timeout: Union[int, float]
        How many seconds should the run last. If <= 0, will last until killed by the user. Default value = 0.
    sleeptime: Union[int, float]
        How many seconds between each checks for new nodes models. Default value = 5.
    """

    started = self.iterations != 0  # To force at least one loop of the while to trigger

    if timeout <= 0:
        logger.warning(
            "Central Server - You did not specify a timeout for your run. It will last until manually stopped."
        )
    logger.info(
        "Central Server - Starting central server. Monitoring changes in"
        f" {self.central_configs.aggregated_model_path}."
    )

    t = time()
    while time() - t < timeout or timeout <= 0 or started is False:
        started = True
        if len(self.model) == 0:
            if self.central_configs.aggregated_model_path.is_file():
                self.update()
                self.iterations += 1
        else:
            if (
                    self.central_configs.aggregated_model_path.is_file()
                    and self.central_configs.aggregated_model_path.info()["mtime"] > self.last_fetch.timestamp()
            ):
                self.update()
                self.iterations += 1

        sleep(sleeptime)

    logger.info(f"Central Server - Timeout of {timeout} seconds reached, stopping central server.")
    if self.model is None:
        logger.warning("Learning failed to produce a central model. No output generated.")
    logger.info(
        f"Central Server - Made {self.iterations} complete iterations between central server and aggregator."
    )
    logger.info(f"Central Server - Results saved in {self.central_configs.central_model_path}")
def update(self)

Adds the new rules found in the aggregated model to the central model. If new rules were found, saves the updated model to CentralServer's central_configs.central_model_path (a sketch of the merge is given below, after the Returns section).

Returns

bool
True if new rules were added, False if aggregated_model contained no new rules
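
A minimal sketch of the merge-and-deduplicate behaviour, using plain Python lists as stand-ins for ruleskit.RuleSet objects (the rule names are made up):

central_rules = ["rule_a", "rule_b"]     # stand-in for the current central model
aggregated_rules = ["rule_b", "rule_c"]  # stand-in for the newly aggregated model

for rule in aggregated_rules:
    if rule in central_rules:
        # Already known: ignored, like the rules flagged in update()'s warning
        continue
    central_rules.append(rule)

# central_rules is now ["rule_a", "rule_b", "rule_c"]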
Expand source code
@emit
def update(self):
    """Add new rules in aggregated_model to central model's models.
    If new rules were found, save the updated model to `ifra.central_server.CentralServer`'s 
    *central_configs.central_model_path*.
    
    Returns
    -------
    bool
        True if new rules were added, False if aggregated_model contained no new rules
    """
    aggregated_model = RuleSet()
    aggregated_model.load(self.central_configs.aggregated_model_path)
    self.last_fetch = datetime.now()
    if self.model is None:
        self.model = aggregated_model
    else:
        for r in aggregated_model:
            if r in self.model:
                logger.warning(
                    f"Central Server - Rule '{r}' was aggregated despite being already in the central server. "
                    "It is ignored, its prediction is discarded."
                )
            else:
                self.model.append(r, update_activation=False)
    logger.info(f"Central Server - Fetched new model from aggregator at {self.last_fetch}")
    self.model_to_file()

Inherited members