Module ruleskit.ruleset

Expand source code
import inspect
from abc import ABC
import ast
import pandas as pd
from copy import copy
from typing import List, Union, Tuple, Any, Optional, Callable
from collections import Counter
import numpy as np
import itertools
from collections import OrderedDict

from .condition import HyperrectangleCondition

from .rule import Rule, ClassificationRule, RegressionRule
from .activation import Activation
from .utils import rfunctions as functions
import logging
import warnings

from .utils.rfunctions import (
    calc_ruleset_prediction_weighted_regressor_stacked,
    calc_ruleset_prediction_weighted_classificator_stacked,
    calc_ruleset_prediction_equally_weighted_classificator_stacked,
    calc_ruleset_prediction_equally_weighted_regressor_stacked,
    calc_ruleset_prediction_weighted_regressor_unstacked,
    calc_ruleset_prediction_equally_weighted_regressor_unstacked,
    calc_ruleset_prediction_weighted_classificator_unstacked,
    calc_ruleset_prediction_equally_weighted_classificator_unstacked,
    calc_zscore_external,
)

logger = logging.getLogger(__name__)


class RuleSet(ABC):
    """A set of rules"""

    # Half of the number of rules printed by str(self): long rulesets are shown as their first NLINES rules, a "..."
    # line, then their last NLINES rules.
    NLINES = 5  # half how many rules to show in str(self)
    # When True, __init__ runs check_duplicated_rules on the ruleset it has just built.
    CHECK_DUPLICATED = False
    # When True, fit and eval compute the rules' attributes for all rules at once from the stacked activations,
    # instead of calling fit/eval on each rule.
    STACKED_FIT = False
    # Mapping feature name -> feature index. When not empty, set_features_indexes uses it to assign the indexes of
    # the ruleset and of each of its rules.
    all_features_indexes = {}
    # For each base rule type, names of the ruleset-level attributes computed at the end of fit by calling
    # self.calc_<name>.
    attributes_from_train_set = {ClassificationRule: ["train_set_size"], RegressionRule: ["train_set_size"]}
    # For each base rule type, names of the ruleset-level attributes computed at the end of eval by calling
    # self.calc_<name>.
    attributes_from_test_set = {
        ClassificationRule: ["criterion", "test_set_size"],
        RegressionRule: ["criterion", "test_set_size"],
    }

    @staticmethod
    def check_duplicated_rules(rules, name_or_index: str = "index"):
        if name_or_index == "index":
            str_rules = [str(r.features_indexes) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
        else:
            str_rules = [str(r.features_names) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
        if len(set(str_rules)) < len(str_rules):
            duplicated = {}
            for r in str_rules:
                if r not in duplicated:
                    duplicated[r] = 0
                duplicated[r] += 1
            duplicated = [
                f"{r}: {duplicated[r]} (positions {[i for i, x in enumerate(str_rules) if x == r]})\n"
                f"   underlying rules : {[str(rules[i]) for i in [i for i, x in enumerate(str_rules) if x == r]]}"
                for r in duplicated
                if duplicated[r] > 1
            ]
            s = "\n -".join(duplicated)
            raise ValueError(f"There are {len(duplicated)} duplicated rules in your ruleset !\n {s}")

    def __init__(
        self,
        rules_list: Union[List[Rule], None] = None,
        compute_activation: bool = True,
        stack_activation: bool = False,
    ):
        """

        Parameters
        ----------
        rules_list: Union[List[Rule], None]
            The list of rules to start with. Can be None, since a RuleSet can be filled after its creation.
        compute_activation: bool
            The activation of the RuleSet is the logical OR of the activation of all its rules. It is only computed
            if compute_activation is True. (default value = True)
        stack_activation: bool
            If True, the RuleSet will keep in memory 2-D np.ndarray containing the activations of all its rules. This
            can take a lot of memory, but can save time if you apply numpy methods on this stacked vector instead of on
            each rule separately. (default value = False)
        """
        self._rules: List[Rule] = []
        self.features_names: List[str] = []
        self.features_indexes: List[int] = []
        self._activation: Optional[Activation] = None
        self._coverage: Optional[float] = None  # in case Activation is not available
        self.criterion: Optional[float] = None
        self.train_set_size: Optional[int] = None
        self.test_set_size: Optional[int] = None
        """Set by self.eval"""
        self.stacked_activations: Optional[pd.DataFrame] = None
        self.stack_activation: bool = stack_activation
        self._rule_type = None
        self._compute_activation = True  # Should NOT be equal to given compute_activation argument, but always True.
        if rules_list is not None:
            names_available = all([hasattr(r.condition, "features_names") for r in self])
            for rule in rules_list:
                if not isinstance(rule, Rule) and rule is not None:
                    raise TypeError(f"Some rules in given iterable were not of type 'Rule' but of type {type(rule)}")
                if rule is not None:
                    self.append(rule, update_activation=False)
            if compute_activation:
                self.compute_self_activation()
            if self.stack_activation:
                self.compute_stacked_activation()
            if names_available:
                self.features_names = list(set(itertools.chain(*[rule.features_names for rule in self])))
            self.set_features_indexes()
        if self.__class__.CHECK_DUPLICATED:
            self.check_duplicated_rules(self.rules, name_or_index="name" if len(self.features_names) > 0 else "index")

    # noinspection PyProtectedMember,PyTypeChecker
    def __iadd__(self, other: Union["RuleSet", Rule]):
        """In-place addition of a single rule, or of every rule of another RuleSet.

        The rule type of self is set from `other` when not known yet; otherwise `other` must be compatible with it.
        features_indexes is merged with the ones of `other`, features_names too when `other` exposes them, and the
        activation vector / stacked activations are updated when the corresponding flags of self are set.

        Raises
        ------
        TypeError
            If the rule type of `other` does not match the rule type already held by self.
        """
        if isinstance(other, Rule):
            if self._rule_type is None:
                self.rule_type = type(other)
            elif not isinstance(other, self.rule_type):
                raise TypeError(
                    f"Ruleset previously had rules of type {self.rule_type},"
                    f" so can not add rule of type {type(other)}"
                )
            self._rules.append(other)
        else:
            own_type_known = self._rule_type is not None
            other_type_known = other._rule_type is not None
            if own_type_known and other_type_known and self.rule_type != other.rule_type:
                raise TypeError(
                    f"Ruleset previously had rules of type {self.rule_type},"
                    f" so can not add rules of type {other.rule_type}"
                )
            if other_type_known and not own_type_known:
                self.rule_type = other.rule_type
            self._rules.extend(other._rules)

        merged_indexes = self.features_indexes + other.features_indexes
        self.features_indexes = list(set(merged_indexes))
        if hasattr(other, "features_names"):
            merged_names = self.features_names + other.features_names
            self.features_names = list(set(merged_names))

        if self._compute_activation:
            self._update_activation(other)
        if self.stack_activation:
            self._update_stacked_activation(other)
        return self

    def __add__(self, other: Union["RuleSet", Rule]):
        """Returns the RuleSet resulting from appending a rule, or each rule of another RuleSet, to self.

        When `other` is a RuleSet, rules present several times in the union are kept only once, and the result
        stacks its activations only if both operands do. The order of the rules is deterministic: rules of self
        first, then the new rules of `other`, each in their original order.

        Returns
        -------
        A new object of the same class as self; self and `other` are not modified.
        """
        stack_activation = self.stack_activation
        if isinstance(other, Rule):
            rules = self.rules + [other]
        else:
            stack_activation &= other.stack_activation
            # dict.fromkeys removes duplicates like set() would, but keeps first-seen order, so the resulting
            # ruleset does not depend on hash values.
            rules = list(dict.fromkeys(self.rules + other.rules))
        return self.__class__(rules, compute_activation=self._compute_activation, stack_activation=stack_activation)

    def __getattr__(self, item):
        """If item is not found in self, try to fetch it from its activation."""
        if item == "_activation":
            raise AttributeError(f"'RuleSet' object has no attribute '{item}'")

        if self._activation is not None and hasattr(self._activation, item):
            return getattr(self._activation, item)
        raise AttributeError(f"'RuleSet' object has no attribute '{item}'")

    def __len__(self) -> int:
        """The length of a RuleSet is the number of rules stored in it."""
        return len(self.rules)

    def __eq__(self, other: "RuleSet"):
        return set(self.rules) == set(other.rules)

    def __iter__(self):
        if hasattr(self, "_rules"):
            return self.rules.__iter__()
        else:
            return [].__iter__()

    def __getitem__(self, key):
        if isinstance(key, slice):
            indices = range(*key.indices(len(self.rules)))
            return self.__class__([self.rules[i] for i in indices])
        return self.rules.__getitem__(key)

    def __str__(self):
        if len(self) < 2 * self.__class__.NLINES:
            return "\n".join([str(self[i]) for i in range(len(self))])
        else:
            return "\n".join(
                [str(self[i]) for i in range(self.__class__.NLINES)]
                + ["..."]
                + [str(self[i]) for i in range(len(self) - self.__class__.NLINES, len(self))]
            )

    def __hash__(self) -> hash:
        return hash(frozenset(self.to_hash))

    # noinspection PyProtectedMember
    def __contains__(self, other: Rule) -> bool:
        """A RuleSet contains another Rule if the two rule's conditions and predictions are the same"""
        name_or_index = "name" if len(self.features_names) > 0 else "index"

        if name_or_index == "index":
            str_rules = [str(r.features_indexes) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in self]
            str_rule = str(other.features_indexes) + str(other.bmins) + str(other.bmaxs) + str(other.prediction)
        else:
            str_rules = [str(r.features_names) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in self]
            str_rule = str(other.features_names) + str(other.bmins) + str(other.bmaxs) + str(other.prediction)

        return str_rule in str_rules

    @property
    def rule_type(self) -> type:
        if self._rule_type is None:
            raise ValueError("Ruleset's rule type is not set !")
        return self._rule_type

    @rule_type.setter
    def rule_type(self, rule_type: type):
        """Set the type of the rules held by the RuleSet.

        Raises
        ------
        TypeError
            If rule_type is not a subclass of ClassificationRule, RegressionRule or Rule.
        """
        accepted_bases = (ClassificationRule, RegressionRule, Rule)
        if issubclass(rule_type, accepted_bases):
            self._rule_type = rule_type
            return
        raise TypeError(
            "Ruleset's rule type must be a subclass of ruleskit.RegressionRule or ruleskit.ClassificationRule, not"
            f" {rule_type}"
        )

    @property
    def rules(self) -> List[Rule]:
        """The list of rules stored in the RuleSet (the internal list itself, not a copy)."""
        return self._rules

    @rules.setter
    def rules(self, rules: Union[List[Rule], None]):
        ruleset = RuleSet(rules, stack_activation=self.stack_activation)
        self._rules = ruleset._rules
        self.features_names = ruleset.features_names
        self.features_indexes = ruleset.features_indexes
        self.stacked_activations = ruleset.stacked_activations
        self._activation = ruleset._activation

    @property
    def to_hash(self) -> Tuple[str]:
        if len(self) == 0:
            return "rs",
        to_hash = ("rs",)
        for r in self:
            rule_hash = r.to_hash[1:]
            to_hash += rule_hash
        return to_hash

    @property
    def activation_available(self) -> bool:
        """Returns True if the RuleSet has an activation vector, and if this Activation's object data is available."""
        if self._activation is None or self._activation.length == 0:
            return False
        if self._activation.data_format == "file":
            return self._activation.data.is_file()
        else:
            return self._activation.data is not None

    @property
    def stacked_activations_available(self) -> bool:
        """Returns True is the RuleSet has its rules' stacked activations."""
        if self.stack_activation is None or self._activation.length == 0:
            return False
        return True

    @property
    def activation(self) -> Union[None, np.ndarray]:
        """Returns the Activation vector's data in a form of a 1-D np.ndarray, or None if not available.

        Returns:
        --------
        Union[None, np.ndarray]
            of the form [0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, ...]
        """
        if self._activation:
            return self._activation.raw
        return None

    @property
    def ruleset_coverage(self) -> float:
        """Coverage is the fraction of points equal to 1 in the activation vector"""
        if not self.activation_available:
            return self._coverage
        else:
            return self._activation.coverage

    # noinspection PyProtectedMember,PyTypeChecker
    def _update_activation(self, other: Union[Rule, "RuleSet"]):
        """Merge the activation of a new Rule or RuleSet into the activation of self.

        Nothing happens when `other` has no available activation. If self has no activation yet (or an empty one),
        a new Activation is created from the activation of `other`; otherwise both are combined with '|'.
        """
        if not other.activation_available:
            return

        current = self._activation
        if current is None or current.length == 0:
            self._activation = Activation(other.activation, to_file=Rule.LOCAL_ACTIVATION)
            return
        self._activation = current | other._activation

    # noinspection PyProtectedMember,PyTypeChecker
    def _update_stacked_activation(self, other: Union[Rule, "RuleSet"]):
        """Updates the stacked activation vectors of the RuleSet with the activation vector of a new Rule or
        the stacked activation vectors of another RuleSet.

        Nothing happens when `other` has no available activation. For a Rule, a column named after
        str(rule.condition) is added. For a RuleSet, its stacked activations are concatenated column-wise to the
        ones of self.
        """
        if not other.activation_available:
            return

        adding_rule = isinstance(other, Rule)
        if self.stacked_activations is None:
            if adding_rule:
                self.stacked_activations = pd.DataFrame(
                    data=np.array(other.activation).T, columns=[str(other.condition)]
                )
            elif other.stacked_activations is not None:
                # Copy instead of sharing the DataFrame: columns added to self later on must not show up in
                # `other` (pd.concat in the branch below never aliases either).
                self.stacked_activations = other.stacked_activations.copy()
        elif adding_rule:
            self.stacked_activations[str(other.condition)] = other.activation
        else:
            self.stacked_activations = pd.concat([self.stacked_activations, other.stacked_activations], axis=1)

    def set_features_indexes(self):
        if len(self.__class__.all_features_indexes) > 0:
            self.features_indexes = [self.__class__.all_features_indexes[f] for f in self.features_names]
            for r in self._rules:
                # noinspection PyProtectedMember
                r._condition._features_indexes = [self.__class__.all_features_indexes[f] for f in r.features_names]
        else:
            list(set(itertools.chain(*[rule.features_indexes for rule in self])))

    # noinspection PyProtectedMember
    def fit(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        force_if_not_good: bool = False,
        **kwargs,
    ) -> List[Rule]:
        """Fits the ruleset on y and xs to produce the rules' activation vectors and attributes relevant to train set.

        Two modes exist, selected by the class attribute STACKED_FIT:

        * STACKED_FIT is True: the attributes listed in the rule type's attributes_from_train_set are computed for
          all rules at once by the methods self.calc_<attr>s, using the stacked activations, then set on each rule.
        * STACKED_FIT is False: each rule's own fit method is called.

        Rules that are not 'good' afterwards are removed from the ruleset and the activations are recomputed.
        Finally, the ruleset-level attributes listed in self.attributes_from_train_set are computed by calling
        self.calc_<attr>.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
            Target. If it is a pd.Series and stacked fit is used, its index is given to the stacked activations
            while the attributes are computed.
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            Features. If given, activations are recomputed from it. If None and all rules are already fitted,
            nothing is done.
        force_if_not_good: bool
            Forwarded to each rule's fit method (only used when STACKED_FIT is False).
        kwargs
            Forwarded to the calc_* methods (restricted to the arguments each of them accepts) and to the rules' fit
            method. Must not contain the key 'method'.

        Returns
        -------
        List[Rule]
            List of rules that were excluded from the ruleset after fitting because they were 'bad'

        Raises
        ------
        TypeError
            If the rule type is neither a ClassificationRule nor a RegressionRule.
        IndexError
            If 'method' is in kwargs, or if y is a np.ndarray whose length differs from the number of rows of the
            stacked activations.
        ValueError
            If 'activation' is listed in the rule type's attributes_from_train_set.
        """
        # Key used at the end of the method to find the ruleset-level attributes to compute
        if issubclass(self.rule_type, ClassificationRule):
            type_to_use = ClassificationRule
        elif issubclass(self.rule_type, RegressionRule):
            type_to_use = RegressionRule
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if "method" in kwargs:
            raise IndexError("Key 'method' can not be given to 'fit'")

        def launch_method(method, **kw):
            # Calls 'method' with the keyword arguments it declares only, unless it has a parameter named 'kwargs',
            # in which case every keyword argument is forwarded.
            expected_args = list(inspect.signature(method).parameters)
            if "kwargs" not in expected_args:
                kw = {item: kw[item] for item in kw if item in expected_args}
            return method(**kw)

        if xs is not None and len(xs) == 0:
            logger.warning("Given xs is empty")
            return []

        if len(self) == 0:
            logger.debug("Ruleset is empty. Nothing to fit.")
            return []

        # Everything is already fitted and no new data is given: nothing to do
        if all([r._fitted for r in self]) and xs is None:
            return []

        if self.__class__.STACKED_FIT:

            # clean_activation becomes True when stacking was only switched on temporarily for this fit, in which
            # case the stacked activations are deleted once the attributes are computed.
            clean_activation = False
            # Activation must always be computed from train set
            if xs is not None:
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.calc_activation(xs)
                self.stack_activation = not clean_activation
            elif self.stacked_activations is None:
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.compute_stacked_activation()
                self.stack_activation = not clean_activation

            if isinstance(y, np.ndarray):
                # noinspection PyUnresolvedReferences
                if not len(self.stacked_activations.index) == y.shape[0]:
                    raise IndexError(
                        "Stacked activation and y have different number of rows. Use pd.Series for y to"
                        " reindex stacked activations automatically."
                    )
            else:
                # Temporarily give y's index to the stacked activations (restored to a RangeIndex below)
                self.stacked_activations.index = y.index

            # Results of the calc_<attr>s methods, keyed by '<attr>s'. Results already computed are passed as
            # keyword arguments to the following calls.
            computed_attrs = {}
            # noinspection PyUnresolvedReferences
            for attr in self.rule_type.attributes_from_train_set:
                if attr == "activation":
                    raise ValueError("'activation' can not be specified in 'attributes_from_train_set'")
                computed_attrs[f"{attr}s"] = launch_method(
                    getattr(self, f"calc_{attr}s"), y=y, xs=xs, **computed_attrs, **kwargs
                )
            to_drop = []

            if clean_activation:
                self.del_stacked_activations()
            else:
                if isinstance(y, np.ndarray):
                    self.stacked_activations.index = pd.RangeIndex(y.shape[0])
                else:
                    self.stacked_activations.index = pd.RangeIndex(len(y.index))

            # Set the computed values on each rule (attribute '_<attr>') and check its thresholds.
            # Assumes the results are positionally aligned with self._rules -- TODO confirm.
            for ir in range(len(self)):
                self._rules[ir]._fitted = True
                for attr in computed_attrs:
                    setattr(self._rules[ir], f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                    if self._rules[ir].good:
                        self._rules[ir].check_thresholds(attr[:-1])
                    # NOTE(review): this test is inside the loop over attributes, so a rule that becomes bad before
                    # the last attribute is appended to to_drop several times.
                    if not self._rules[ir].good:
                        to_drop.append(self._rules[ir])
                # To check attributes that are set along side others, like coverage
                if self._rules[ir].good:
                    self._rules[ir].check_thresholds()
        else:
            # Not stacked: each rule fits itself
            [r.fit(xs=xs, y=y, force_if_not_good=force_if_not_good, **kwargs) for r in self]
            to_drop = [r for r in self if not r.good]

        if len(to_drop) > 0:
            # Rebuild the ruleset from the remaining rules, without updating activations rule by rule
            rules = [r for r in self.rules if r not in to_drop]
            self._rules = []
            if self._activation is not None:
                self._activation.clear()
            self.del_stacked_activations()
            for r in rules:
                self.append(r, update_activation=False)

            # Recompute activation now that bad rules have been dropped
            self._activation = None
            self.compute_self_activation()
            if self.stack_activation:
                self.stacked_activations = None
                self.compute_stacked_activation()
        # If not bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
        # been done  (needed to set self.coverage), but not stacked (useless)
        elif not self.__class__.STACKED_FIT:
            if self._activation is None or self._activation.length == 0 or xs is not None:
                self._activation = None
                self.compute_self_activation()
            if self.stack_activation and (self.stacked_activations is None or xs is not None):
                self.stacked_activations = None
                self.compute_stacked_activation()

        # Ruleset-level attributes relative to the train set
        for attr in self.__class__.attributes_from_train_set[type_to_use]:
            launch_method(
                getattr(self, f"calc_{attr}"),
                y=y,
                xs=xs,
                **kwargs,
            )

        return to_drop

    # noinspection PyProtectedMember
    def eval(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        keep_new_activations: bool = False,
        weights: Optional[Union[pd.Series, str]] = None,
        force_if_not_good: bool = False,
        **kwargs,
    ) -> List[Rule]:
        """Evaluate the ruleset on y and xs to produce the rules' attributes relevant to the test set. Will recompute
        the ruleset's stacked activation vector if using stacked fit.

        Two modes exist, selected by the class attribute STACKED_FIT:

        * STACKED_FIT is True: the attributes listed in the rule type's attributes_from_test_set are computed for
          all rules at once by the methods self.calc_<attr>s, then set on each rule.
        * STACKED_FIT is False: each rule's own eval method is called.

        Rules that are not 'good' afterwards are removed from the ruleset. Finally, the ruleset-level attributes
        listed in self.attributes_from_test_set are computed by calling self.calc_<attr>.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            Test set features. If None, the evaluation uses the activations already held by the ruleset and its
            rules.
        keep_new_activations: bool
            If True, activation vector and stacked activation vectors will be kept as ruleset's attribute and replace
            the ones computed using the train set, if any. Will also change the activation vectors of the rules.
            Forced to True when some rules are not fitted and xs is given.
        weights: Optional[Union[pd.Series, str]]
            Optional weights for calc_criterion. If is a pd.Series, expects the index to be the rules names.
            If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string
            (ex: it can be 'criterion')
        force_if_not_good: bool
            If the rule was seen as "bad", eval will not trigger unless this boolean is True (Default value = False).
            Forwarded to each rule's eval method (only used when STACKED_FIT is False).
        kwargs
            Forwarded to the calc_* methods (restricted to the arguments each of them accepts) and to the rules'
            eval method. Must not contain the key 'method'.


        Returns
        -------
        List[Rule]
            List of rules that were excluded from the ruleset after evaluation because they were 'bad'

        Raises
        ------
        TypeError
            If the rule type is neither a ClassificationRule nor a RegressionRule.
        IndexError
            If 'method' is in kwargs, or if y and the stacked activations have a different number of rows.
        ValueError
            If some rules are not fitted and xs is None, if the stacked activations can not be computed from the
            rules, or if 'activation' is listed in the rule type's attributes_from_test_set.
        """
        # Key used at the end of the method to find the ruleset-level attributes to compute
        if issubclass(self.rule_type, ClassificationRule):
            type_to_use = ClassificationRule
        elif issubclass(self.rule_type, RegressionRule):
            type_to_use = RegressionRule
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if "method" in kwargs:
            raise IndexError("Key 'method' can not be given to 'eval'")

        def launch_method(method, **kw):
            # Calls 'method' with the keyword arguments it declares only, unless it has a parameter named 'kwargs',
            # in which case every keyword argument is forwarded.
            expected_args = list(inspect.signature(method).parameters)
            if "kwargs" not in expected_args:
                kw = {item: kw[item] for item in kw if item in expected_args}
            return method(**kw)

        if xs is not None and len(xs) == 0:
            logger.warning("Given xs is empty")
            return []

        if len(self) == 0:
            # NOTE(review): the message says "fit" although this is eval
            logger.debug("Ruleset is empty. Nothing to fit.")
            return []

        if not all([r._fitted for r in self]):
            if xs is None:
                raise ValueError(
                    "Not all rules of the ruleset were fitted. Please do so before calling ruleset.eval or provide xs."
                )
            else:
                # Unfitted rules have no activation yet: the ones computed from xs must be kept
                keep_new_activations = True

        if self.__class__.STACKED_FIT:
            # clean_activation becomes True when stacking was only switched on temporarily for this evaluation, in
            # which case the stacked activations are deleted once the attributes are computed.
            clean_activation = False

            # If test set is given, compute the activation used to evaluate test relative attributes.
            # Not the same as self.stacked_activations, computed from the train set.
            # Else, we evaluate on the train set, so we use self.stacked_activations
            nones = None
            if xs is not None:
                if keep_new_activations:
                    clean_activation = not self.stack_activation
                    self.stack_activation = True
                    self.calc_activation(xs=xs)  # Will reset rules' activation vectors too
                    self.stack_activation = not clean_activation
                    stacked_activations = self.stacked_activations
                    activation = self._activation
                    # noinspection PyUnresolvedReferences
                    if "zscore" in self.rule_type.attributes_from_test_set:
                        nones = pd.Series({str(r.condition): r.nones for r in self})
                else:
                    # Activations on xs are evaluated without being stored in the ruleset
                    stacked_activations = self.evaluate_stacked_activations(xs)
                    # noinspection PyUnresolvedReferences
                    if "zscore" in self.rule_type.attributes_from_test_set:
                        activation, nones = self.evaluate_self_activation(xs, return_nones=True)
                    else:
                        activation = self.evaluate_self_activation(xs)
            else:
                # If self.stacked_activations is None, compute it from the rules' activations. They must be available.
                if self.stacked_activations is None:
                    # Else, will compute the stacked activation from the current rules activations
                    clean_activation = not self.stack_activation
                    self.stack_activation = True
                    self.compute_stacked_activation()
                    self.compute_self_activation()
                    if self.stacked_activations is None:
                        raise ValueError("Rules activations must have been computed previously.")
                    self.stack_activation = not clean_activation
                stacked_activations = self.stacked_activations
                activation = self._activation
                # noinspection PyUnresolvedReferences
                if "zscore" in self.rule_type.attributes_from_test_set:
                    nones = pd.Series({str(r.condition): r.nones for r in self})

            if isinstance(y, np.ndarray):
                if not len(stacked_activations.index) == y.shape[0]:
                    raise IndexError("Stacked activation and y have different number of rows.")
            else:
                if not len(stacked_activations.index) == len(y.index):
                    raise IndexError("Stacked activation and y have different number of rows.")
                # Temporarily give y's index to the stacked activations (restored to a RangeIndex below)
                stacked_activations.index = y.index

            # noinspection PyUnresolvedReferences
            if "prediction" in self.rule_type.attributes_from_train_set:
                # Do NOT recompute prediction from test set : it does not make sense !
                predictions = pd.Series({str(r.condition): r.prediction for r in self})
            else:
                predictions = None
            # Results of the calc_<attr>s methods, keyed by '<attr>s'. Results already computed are passed as
            # keyword arguments to the following calls.
            computed_attrs = {}

            # noinspection PyUnresolvedReferences
            for attr in self.rule_type.attributes_from_test_set:
                if attr == "activation":
                    # NOTE(review): the message mentions 'attributes_from_train_set' although the attribute read
                    # here is attributes_from_test_set
                    raise ValueError("'activation' can not be specified in 'attributes_from_train_set'")
                computed_attrs[f"{attr}s"] = launch_method(
                    getattr(self, f"calc_{attr}s"),
                    y=y,
                    xs=xs,
                    stacked_activations=stacked_activations,
                    activation=activation,
                    nones=nones,
                    predictions=predictions,
                    **computed_attrs,
                    **kwargs,
                )

            to_drop = []

            if clean_activation:
                self.del_stacked_activations()
            else:
                if isinstance(y, np.ndarray):
                    stacked_activations.index = pd.RangeIndex(y.shape[0])
                else:
                    stacked_activations.index = pd.RangeIndex(len(y.index))

            # Set the computed values on each rule (attribute '_<attr>') and check its thresholds.
            # Assumes the results are positionally aligned with self._rules -- TODO confirm.
            for ir in range(len(self)):
                self._rules[ir]._evaluated = True
                for attr in computed_attrs:
                    setattr(self._rules[ir], f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                    if self._rules[ir].good:
                        self._rules[ir].check_thresholds(attr[:-1])
                    # NOTE(review): this test is inside the loop over attributes, so a rule that becomes bad before
                    # the last attribute is appended to to_drop several times.
                    if not self._rules[ir].good:
                        to_drop.append(self._rules[ir])
                if self._rules[ir].good:
                    self._rules[ir].check_thresholds()
        else:
            # Not stacked: each rule evaluates itself
            [
                r.eval(
                    xs=xs, y=y, recompute_activation=keep_new_activations, force_if_not_good=force_if_not_good, **kwargs
                )
                for r in self
            ]
            to_drop = [r for r in self if not r.good]

        if len(to_drop) > 0:
            # Rebuild the ruleset from the remaining rules, without updating activations rule by rule
            rules = [r for r in self.rules if r not in to_drop]
            self._rules = []
            if self._activation is not None:
                self._activation.clear()
            self.del_stacked_activations()
            for r in rules:
                self.append(r, update_activation=False)

            # Recompute activation now that bad rules have been dropped
            if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
                self._activation = None
                self.compute_self_activation()
            if self.stack_activation and (
                    self.stacked_activations is None or (xs is not None and keep_new_activations)
            ):
                self.stacked_activations = None
                self.compute_stacked_activation()
        # If not bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
        # been done
        elif not self.__class__.STACKED_FIT:
            if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
                self._activation = None
                self.compute_self_activation()
            # NOTE(review): this condition tests self._activation where the equivalent branch of fit tests
            # self.stacked_activations -- confirm which one is intended.
            if self.stack_activation and (self._activation is None or (xs is not None and keep_new_activations)):
                self.stacked_activations = None
                self.compute_stacked_activation()

        # Ruleset-level attributes relative to the test set. xs is not passed when the new activations were kept.
        for attr in self.__class__.attributes_from_test_set[type_to_use]:
            launch_method(
                getattr(self, f"calc_{attr}"),
                y=y,
                xs=xs if not keep_new_activations else None,
                weights=weights,
                **kwargs,
            )

        return to_drop

    def append(self, rule: Rule, update_activation: bool = True):
        """Appends a new rule to self. The updates of activation vector and the stacked activation vectors can be
        blocked by specifying update_activation=False. Otherwise, will use self.stack_activation to determine if the
        updates should be done or not.

        Raises
        ------
        TypeError
            If `rule` is not a Rule, or if its type is not compatible with the rules already in the RuleSet.
        """

        if not isinstance(rule, Rule):
            raise TypeError(f"RuleSet's append method expects a Rule object, got {type(rule)}")
        stack_activation = self.stack_activation
        if not update_activation:
            # Temporarily switch off both updates performed by __iadd__
            self._compute_activation = False
            self.stack_activation = False
        try:
            self.__iadd__(rule)
        finally:
            # Restore the flags even if __iadd__ raised, so a rejected rule can not leave stacking disabled
            self._compute_activation = True
            self.stack_activation = stack_activation

    def sort(self, criterion: str = None, reverse: bool = False):
        """Reorders the rules of the RuleSet in place.

        * If criterion is not specified (None or ""):
            Rules are grouped by the features they use. Groups are ordered by number of features, then by the
            string form of their features names (or indexes if the ruleset has no features names), optionally
            reversed. Inside a group, rules are ordered by their bmins followed by their bmaxs.
            Nothing is done if the condition of the first rule has no 'bmins' or no 'bmaxs'.
        * If criterion is specified, it must be the name of an attribute of the rules. Rules are then ordered
            according to its value, optionally reversed.

        If self.stack_activation is True, the columns of self.stacked_activations are reordered to follow the
        new order of the rules.

        Raises
        ------
        ValueError
            If the first rule has no attribute named after 'criterion'.
        """
        if len(self) == 0:
            return

        if criterion is not None and criterion != "":
            if not hasattr(self[0], criterion):
                raise ValueError(f"Can not sort RuleSet according to criterion {criterion}")
            self._rules = sorted(self, key=lambda x: getattr(x, criterion), reverse=reverse)
        else:
            first_condition = self[0].condition
            if not hasattr(first_condition, "bmins") or not hasattr(first_condition, "bmaxs"):
                return
            by_name = len(self.features_names) > 0

            def features_key(r):
                return str(r.features_names) if by_name else str(r.features_indexes)

            # Bucket the distinct features keys by the number of features they hold
            keys_per_size = {}
            for key in {features_key(r) for r in self}:
                keys_per_size.setdefault(len(ast.literal_eval(key)), []).append(key)

            # Sizes are walked from 1 up to the largest one found (at least 1)
            largest = max([1, *keys_per_size])
            ordered_keys = []
            for size in range(1, largest + 1):
                ordered_keys.extend(sorted(keys_per_size.get(size, []), reverse=reverse))

            buckets = {key: [] for key in ordered_keys}
            for r in self:
                buckets[features_key(r)].append(r)

            reordered = []
            for key in ordered_keys:
                reordered.extend(sorted(buckets[key], key=lambda x: x.condition.bmins + x.condition.bmaxs))
            self._rules = reordered

        if self.stack_activation:
            self.stacked_activations = self.stacked_activations[[str(r.condition) for r in self]]

    # noinspection PyProtectedMember
    def evaluate_self_activation(
        self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None, return_nones: bool = False
    ):
        """Computes the activation vector of self from its rules, using time-efficient Activation.multi_logical_or.

        Parameters
        ----------
        xs: Optional[Union[np.ndarray, pd.DataFrame]]
            If specified, each rule's activation is evaluated on it through rule.evaluate_activation(xs).
            Otherwise the '_activation' attribute of each rule is used.
        return_nones: bool
            If True, also returns a pd.Series whose index is the string form of each rule's condition and whose
            values are the 'nones' attribute of the matching activation (default value = False)

        Returns
        -------
        The Activation of the ruleset (an Activation built from an empty array if self has no rules), or a tuple
        (Activation, pd.Series) if return_nones is True. Returns None if xs is not given and at least one rule
        has no available activation.
        """
        if len(self) == 0:
            act = Activation(np.array([]))
            if return_nones is True:
                return act, pd.Series(dtype=int)
            return act

        if xs is not None:
            activations = [r.evaluate_activation(xs) for r in self]
        else:
            activations = [r._activation for r in self]
            if not all([r.activation_available for r in self]):
                return None

        if len(self) == 1:
            only = activations[0]
            act = Activation(only.raw, optimize=only.optimize, to_file=only.to_file)
        else:
            try:
                act = Activation.multi_logical_or(activations)
            except MemoryError as e:
                # numpy's array allocation error is a MemoryError subclass, so it is caught here too without
                # reaching into numpy's private modules. Fall back on a pairwise OR.
                logger.warning(str(e))
                act = activations[0]
                for a in activations[1:]:
                    act = act | a

        if return_nones is True:
            return act, pd.Series({str(self[i].condition): activations[i].nones for i in range(len(self))})
        return act

    def compute_self_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Stores in self._activation the result of self.evaluate_self_activation(xs=xs).

        If xs is given it is forwarded as is, so the rules' activation vectors are re-evaluated on it, but their
        own 'activation' attributes are not set here."""
        combined = self.evaluate_self_activation(xs=xs)
        self._activation = combined

    def evaluate_stacked_activations(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Builds a DataFrame with one column per rule, named after the string form of the rule's condition.

        If xs is given, columns hold the 'raw' attribute of rule.evaluate_activation(xs). Otherwise they hold
        each rule's 'activation' attribute, provided every rule reports 'activation_available'; if not, None is
        returned. An empty DataFrame is returned if self has no rules."""
        if len(self) == 0:
            return pd.DataFrame(dtype=int)
        if xs is None:
            if not all([r.activation_available for r in self]):
                return None
            columns = {str(r.condition): r.activation for r in self}
        else:
            columns = {str(r.condition): r.evaluate_activation(xs).raw for r in self}
        return pd.DataFrame(columns)

    def compute_stacked_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Stores in self.stacked_activations the result of self.evaluate_stacked_activations(xs=xs).

        If xs is given it is forwarded as is, so the rules' activation vectors are re-evaluated on it, but their
        own 'activation' attributes are not set here."""
        stacked = self.evaluate_stacked_activations(xs=xs)
        self.stacked_activations = stacked

    def del_activations(self):
        """Calls del_activation() on every rule of self, in order."""
        for rule in self:
            rule.del_activation()

    def del_activation(self):
        """Calls delete() on self._activation when that attribute exists and is not None. The attribute itself
        is left in place."""
        activation = getattr(self, "_activation", None)
        if activation is None:
            return
        activation.delete()

    def del_stacked_activations(self):
        """Drops the stacked activation vectors of self and leaves the attribute set to None. Does nothing if
        the attribute is missing or already None."""
        if getattr(self, "stacked_activations", None) is None:
            return
        del self.stacked_activations
        self.stacked_activations = None

    def evaluate(self, xs: Union[pd.DataFrame, np.ndarray]) -> Activation:
        """Computes and returns the activation vector from an array of features.

        Parameters
        ----------
        xs: Union[pd.DataFrame, np.ndarray]
            The features on which the check whether the rule is activated or not. Must be a 2-D np.ndarray
            or pd.DataFrame.

        Returns
        -------
        Activation
        """
        if len(self) == 0:
            raise ValueError("Can not use evaluate : The ruleset is empty!")
        activations = [rule.evaluate_activation(xs) for rule in self.rules]
        return Activation.multi_logical_or(activations)

    def calc_activation(self, xs: Union[np.ndarray, pd.DataFrame]):
        """Calls calc_activation(xs) on every rule of self, then recomputes self's activation and, if
        self.stack_activation is True, its stacked activations.

        If xs is empty, a warning is logged and nothing else is done.

        Parameters
        ----------
        xs: Union[np.ndarray, pd.DataFrame]

        Raises
        ------
        ValueError
            If self has no rules.
        """
        if len(self) == 0:
            raise ValueError("Can not use calc_activation : The ruleset is empty!")
        if len(xs) == 0:
            logger.warning("Given xs is empty")
            return

        for rule in self.rules:
            rule.calc_activation(xs)

        # Reset before recomputing from the rules' fresh activations
        self._activation = None
        self.compute_self_activation()
        if not self.stack_activation:
            return
        self.stacked_activations = None
        self.compute_stacked_activation()

    def get_features_count(self) -> List[Tuple[Any, int]]:
        """
        Get a counter of all different features in the ruleset. If names are not available, will use indexes.

        Returns:
        --------
        count : List[Tuple[Any, int]]
            (feature, number of occurrences) pairs over all the rules, most common first. Empty list if the
            ruleset has no rules.
        """
        if len(self) == 0:
            return []
        if len(self.features_names) > 0:
            per_rule = (rule.features_names for rule in self)
        else:
            per_rule = (rule.features_indexes for rule in self)
        return Counter(itertools.chain.from_iterable(per_rule)).most_common()

    def load(self, path, **kwargs):
        """Loads rules, and the ruleset-level attributes stored with them, from 'path' into self.

        Parameters
        ----------
        path
            Either an object with a 'read' method (called as path.read(**kwargs)), or anything accepted by
            pd.read_csv.
        kwargs
            Forwarded to path.read or pd.read_csv.

        The first column of the loaded table is searched for the labels "ruleset coverage",
        "ruleset criterion", "ruleset train set size" and "ruleset test set size". For each label found, the
        value in the second column of that row is stored on self and the row is removed. Every remaining row is
        converted to a rule with self.series_to_rule.
        """
        if hasattr(path, "read"):
            rules = path.read(**kwargs)
        else:
            rules = pd.read_csv(path, **kwargs)
        # Ruleset-level rows: read the value from the second column, then drop the row
        if "ruleset coverage" in rules.iloc[:, 0].values:
            self._coverage = rules[rules.iloc[:, 0] == "ruleset coverage"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset coverage"].index)
        if "ruleset criterion" in rules.iloc[:, 0].values:
            self.criterion = rules[rules.iloc[:, 0] == "ruleset criterion"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset criterion"].index)
        if "ruleset train set size" in rules.iloc[:, 0].values:
            self.train_set_size = rules[rules.iloc[:, 0] == "ruleset train set size"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset train set size"].index)
            # Sizes may have been read back as strings
            if isinstance(self.train_set_size, str):
                self.train_set_size = int(self.train_set_size)
        if "ruleset test set size" in rules.iloc[:, 0].values:
            self.test_set_size = rules[rules.iloc[:, 0] == "ruleset test set size"].iloc[0, 1]
            if isinstance(self.test_set_size, str):
                self.test_set_size = int(self.test_set_size)
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset test set size"].index)
        # Whatever is left is one rule per row
        if rules.empty:
            self._rules = []
        else:
            self._rules = [self.series_to_rule(rules.loc[r]) for r in rules.index]
        # Rules without their own set sizes take the ruleset's ones; string sizes are converted to int
        for r in self:
            if r._train_set_size is None:
                r._train_set_size = self.train_set_size
            elif isinstance(r._train_set_size, str):
                r._train_set_size = int(r._train_set_size)
            if r._test_set_size is None:
                r._test_set_size = self.test_set_size
            elif isinstance(r._test_set_size, str):
                r._test_set_size = int(r._test_set_size)

        # The type of the first rule is taken as the rule type of the ruleset
        if len(self._rules) > 0:
            self.rule_type = type(self._rules[0])
        self.compute_self_activation()
        if self.stack_activation:
            self.compute_stacked_activation()
        # Distinct features names over all rules (set-based, so the resulting order is not defined)
        self.features_names = list(set(traverse([rule.features_names for rule in self])))

    def to_df(self, custom_rules_to_series: Optional[Callable] = None, ruleset_attributes: bool = True) -> pd.DataFrame:
        """Returns the ruleset as a DataFrame with one row per rule and one column per entry of the rule
        type's 'index'.

        The '_name' attribute of each rule is set to the label of its row.

        Parameters
        ----------
        custom_rules_to_series: Optional[Callable]
            If specified, called as custom_rules_to_series(index=idx) and expected to return the list of
            pd.Series to concatenate. Otherwise self.rule_to_series is applied to each rule.
        ruleset_attributes: bool
            If True, four rows are appended: "ruleset coverage", "ruleset criterion", "ruleset train set size"
            and "ruleset test set size". Each holds its value in the first column and NaN elsewhere.
            (default value = True)

        Returns
        -------
        pd.DataFrame
            Empty if self has no rules.
        """
        if len(self) == 0:
            return pd.DataFrame()
        idx = copy(self._rule_type.index)

        if custom_rules_to_series is None:
            dfs = [self.rule_to_series((i, r), index=idx) for i, r in enumerate(self.rules)]
        else:
            dfs = custom_rules_to_series(index=idx)

        if len(dfs) > 0:
            df = pd.concat(dfs, axis=1).T
        else:
            df = pd.DataFrame(columns=idx)

        for i, name in enumerate(df.index):
            # noinspection PyProtectedMember
            self._rules[i]._name = name

        if not ruleset_attributes:
            return df

        def attribute_row(label, value):
            # One-row frame: 'value' in the first column, NaN in every other column
            return pd.DataFrame(
                columns=df.columns,
                data=[[value if i == 0 else np.nan for i in range(len(df.columns))]],
                index=[label],
            )

        return pd.concat(
            [
                df,
                attribute_row("ruleset coverage", self.ruleset_coverage),
                attribute_row("ruleset criterion", self.criterion),
                attribute_row("ruleset train set size", self.train_set_size),
                attribute_row("ruleset test set size", self.test_set_size),
            ]
        )

    def save(self, path):
        """Writes self.to_df() to 'path'.

        If 'path' has a 'write' attribute, it is called with the DataFrame. Otherwise 'path' is given to the
        DataFrame's to_csv method."""
        frame = self.to_df()

        if not hasattr(path, "write"):
            frame.to_csv(path)
            return
        path.write(frame)

    # noinspection PyProtectedMember
    @staticmethod
    def rule_to_series(irule: Tuple[int, Rule], index: list) -> pd.Series:
        i = irule[0]
        rule = irule[1]
        if rule._name is not None:
            name = rule._name
        elif hasattr(rule, "sign"):
            name = f"R_{i}({len(rule)}){rule.sign}"
        else:
            name = f"R_{i}({len(rule)})"
        sr = pd.Series(data=[str(getattr(rule, ind)) for ind in index], name=name, index=index, dtype=str)
        return sr

    @staticmethod
    def series_to_rule(srule: pd.Series) -> Rule:
        """Builds a rule from a pd.Series whose index holds rule and condition attribute names.

        A RegressionRule is created if "std" is in the index, else a ClassificationRule if "criterion" is in
        the index, else a plain Rule. Entries listed in the class's 'condition_index' are parsed with
        ast.literal_eval and given to HyperrectangleCondition. Entries listed in the class's 'rule_index' are
        set on the rule as '_<name>' attributes. Any other entry is ignored.

        Raises
        ------
        IndexError
            If an index entry other than "Unnamed: 0" is not in the 'index' of Rule, RegressionRule or
            ClassificationRule.
        """

        for ind in srule.index:
            if ind == "Unnamed: 0":
                continue
            if ind not in Rule.index and ind not in RegressionRule.index and ind not in ClassificationRule.index:
                raise IndexError(f"Invalid rule attribute '{ind}'")

        if "std" in srule.index:
            rule = RegressionRule()
        elif "criterion" in srule.index:
            rule = ClassificationRule()
        else:
            rule = Rule()
        rule_idx = copy(rule.__class__.rule_index)
        condition_index = {c: None for c in rule.__class__.condition_index}
        # NOTE(review): this is the first *label* of the Series, not its first value - confirm this is the
        # intended rule name
        rule._name = srule.index[0]

        for rule_ind in srule.index:
            str_value = str(srule[rule_ind])
            if rule_ind in condition_index:
                condition_index[rule_ind] = ast.literal_eval(str_value)
            elif rule_ind in rule_idx:
                if rule_ind == "activation":
                    setattr(rule, f"_{rule_ind}", Activation(str_value))
                elif str_value == "nan":
                    setattr(rule, f"_{rule_ind}", np.nan)
                elif rule_ind == "sign":
                    setattr(rule, f"_{rule_ind}", str_value)
                elif rule_ind == "prediction":  # Prediction can be a str in case of classification
                    try:
                        setattr(rule, f"_{rule_ind}", ast.literal_eval(str_value))
                    except (ValueError, SyntaxError):
                        # Not a Python literal (literal_eval raises SyntaxError for labels such as
                        # "class A"): keep the raw string as the class label
                        setattr(rule, f"_{rule_ind}", str_value)
                else:
                    setattr(rule, f"_{rule_ind}", ast.literal_eval(str_value))

        rule._condition = HyperrectangleCondition(**condition_index)
        if hasattr(rule, rule.__class__.fitted_if_has) and getattr(rule, rule.__class__.fitted_if_has) is not None:
            rule._fitted = True
        return rule

    def calc_predictions(
        self, y: [np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None
    ) -> Union[pd.Series, pd.DataFrame]:
        """
        Computes the prediction of every rule of the ruleset, without setting anything on the rules or on the
        ruleset.

        Works on stacked activation vectors, so memory use grows with the number of rules.

        Parameters
        ----------
        y: [np.ndarray, pd.Series]
            The targets on which to evaluate the rules predictions. Must be a 1-D np.ndarray or pd.Series.
        stacked_activations: Optional[pd.DataFrame]
            If specified, used instead of self.stacked_activations

        Returns
        -------
        Union[pd.Series, pd.DataFrame]
            Regression : the result of functions.conditional_mean on the stacked activations and y\n
            Classification : for each column of functions.class_probabilities, the first label (in sorted
            order) reaching the column's maximum\n
            Empty ruleset : an empty pd.Series, or an empty pd.DataFrame for classification rules

        Raises
        ------
        ValueError
            If no stacked activations are given nor available on self.
        TypeError
            If the rule type is neither a classification nor a regression rule.
        """
        activations = self.stacked_activations if stacked_activations is None else stacked_activations
        if activations is None:
            raise ValueError("Stacked activation vectors are needed")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            if len(self) == 0:
                if self.rule_type is None:
                    return pd.Series(dtype=int)
                if issubclass(self.rule_type, ClassificationRule):
                    return pd.DataFrame(dtype=int)
                if issubclass(self.rule_type, RegressionRule):
                    return pd.Series(dtype=int)
                raise TypeError(f"Unexpected rule type '{self.rule_type}'")

            if issubclass(self.rule_type, ClassificationRule):
                probabilities = functions.class_probabilities(activations, y)
                # Keep only the cells equal to their column's maximum, then take the first label left
                best = probabilities[probabilities == probabilities.max()]
                return best.apply(lambda column: column.dropna().sort_index().index[0])
            if issubclass(self.rule_type, RegressionRule):
                return functions.conditional_mean(activations, y)
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_stds(self, y: [np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None) -> pd.Series:
        """
        Will compute the std of each rule in the ruleset. Does NOT set anything in either the rules nor the
        ruleset.

        This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
        not suffice.

        Parameters
        ----------
        y: [np.ndarray, pd.Series]
          The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D np.ndarray
          or pd.Series.
        stacked_activations: Optional[pd.DataFrame]
            If specified, uses those activations instead of self.stacked_activations

        Returns
        -------
        pd.Series
            The result of functions.conditional_std on the stacked activations and y. An empty pd.Series if the
            ruleset has no rules.

        Raises
        ------
        TypeError
            If the rule type is not a regression rule.
        ValueError
            If no stacked activations are given nor available on self.
        """
        # An empty ruleset whose rule type was never set is not a type error: it just has nothing to compute
        empty_untyped = len(self) == 0 and self._rule_type is None
        if not empty_untyped and not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'std' can not be computed for '{self.rule_type}'")

        if stacked_activations is None:
            stacked_activations = self.stacked_activations
            if stacked_activations is None:
                raise ValueError("Stacked activation vectors are needed")

        if len(self) == 0:
            return pd.Series(dtype=int)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            return functions.conditional_std(stacked_activations, y)

    def calc_signs(self, predictions: Optional[pd.Series] = None) -> pd.Series:
        """
        Returns the sign of every rule's prediction: "-" if it is below 0, "+" otherwise, None if it is None.

        If 'predictions' is not given, it is built from the 'prediction' attribute of each rule, indexed by
        the string form of the rule's condition.

        Raises
        ------
        TypeError
            If the rule type is not a regression rule.
        """
        if not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'sign' can not be computed for '{self.rule_type}'")
        if predictions is None:
            predictions = pd.Series({str(r.condition): r.prediction for r in self})

        def sign_of(value):
            if value is None:
                return None
            return "-" if value < 0 else "+"

        return predictions.apply(sign_of)

    def calc_zscores(
        self,
        y: np.ndarray,
        predictions: Optional[pd.Series] = None,
        nones: Optional[pd.Series] = None,
        horizon: int = 1,
    ) -> pd.Series:
        """
        Returns the result of calc_zscore_external for the rules of the ruleset.

        Parameters
        ----------
        y: np.ndarray
            The targets, forwarded to calc_zscore_external and, if needed, to self.calc_predictions.
        predictions: Optional[pd.Series]
            Prediction of each rule. If None, will call self.calc_predictions(y)
        nones: Optional[pd.Series]
            If None, built from the 'nones' attribute of each rule, indexed by the string form of the rule's
            condition.
        horizon: int
            Forwarded to calc_zscore_external (default value = 1)

        Raises
        ------
        TypeError
            If the rule type is not a regression rule.
        """
        if not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'zscore' can not be computed for '{self.rule_type}'")
        if nones is None:
            nones = pd.Series({str(r.condition): r.nones for r in self})

        if predictions is None:
            # unique prediction of each rules in a pd.Series
            predictions = self.calc_predictions(y=y)

        return calc_zscore_external(prediction=predictions, nones=nones, y=y, horizon=horizon)

    def calc_criterions(
        self,
        y: Union[np.ndarray, pd.Series],
        predictions: Optional[pd.Series] = None,
        stacked_activations: Optional[pd.DataFrame] = None,
        **kwargs,
    ) -> pd.Series:
        """
        Will compute the criterion of each rule in the ruleset. Does NOT set anything in either the rules nor the
        ruleset.

        This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
        not suffice.

        Parameters
        ----------
        y: [np.ndarray, pd.Series]
            The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
            np.ndarray or pd.Series.
        predictions: Optional[pd.Series]
            Prediction of each rule. If None, will call self.calc_predictions with the same stacked activations
            as the ones used for the criterion.
        stacked_activations: Optional[pd.DataFrame]
            If specified, uses those activations instead of self.stacked_activations
        kwargs
            Forwarded to the criterion function.

        Returns
        -------
        pd.Series
            Criterion values of the rules. Empty if the rule type of the ruleset is not set.

        Raises
        ------
        ValueError
            If no stacked activations are given nor available on self.
        TypeError
            If the rule type is neither a classification nor a regression rule.
        """
        if stacked_activations is None:
            stacked_activations = self.stacked_activations
            if stacked_activations is None:
                raise ValueError("Stacked activation vectors are needed")

        if predictions is None:
            # Predictions must come from the activations the criterion is evaluated on, not from whatever is
            # currently stored on self
            predictions = self.calc_predictions(y=y, stacked_activations=stacked_activations)

        if self._rule_type is None:
            return pd.Series(dtype=int)
        if issubclass(self.rule_type, ClassificationRule):
            return functions.calc_classification_criterion(stacked_activations, predictions, y, **kwargs)
        elif issubclass(self.rule_type, RegressionRule):
            # Cells where a rule is not activated become NaN; the others hold the rule's prediction
            return functions.calc_regression_criterion(
                stacked_activations.replace(0, np.nan) * predictions, y, **kwargs
            )
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_train_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
        """Returns a pd.Series giving the number of observations in 'y' for every rule, indexed by the string
        form of the rule's condition."""
        size = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
        return pd.Series(dict.fromkeys((str(r.condition) for r in self), size))

    def calc_test_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
        """Returns a pd.Series giving the number of observations in 'y' for every rule, indexed by the string
        form of the rule's condition."""
        size = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
        return pd.Series(dict.fromkeys((str(r.condition) for r in self), size))

    def predict(
        self,
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        weights: Optional[Union[pd.Series, str]] = None,
    ) -> pd.Series:
        """Computes the prediction vector of the whole ruleset from the predictions of its rules.

        The stacked computation is used when the class attribute STACKED_FIT is True and
        self.stacked_activations is not None; the unstacked one is used otherwise.

        Parameters
        ----------
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            If specified, the activations are evaluated on those features instead of using the stored ones.
        weights: Optional[Union[pd.Series, str]]
            Optional weights. If is a pd.Series, expects the index to be the rules names. If is a str, a pd.Series
            will be constructed by fetching each rules' attribute named after the given string
            (ex: it can be 'criterion')

        Returns
        -------
        pd.Series
            The prediction vector of the ruleset. Empty if the ruleset has no rules or no rule type.
        """
        if len(self) == 0 or self._rule_type is None:
            return pd.Series(dtype=int)

        use_stacked = self.__class__.STACKED_FIT and self.stacked_activations is not None
        if not use_stacked:
            return self._calc_prediction_unstacked(weights=weights, xs=xs)

        # Only re-evaluate the stacked activations when new features are supplied
        fresh_activations = self.evaluate_stacked_activations(xs=xs) if xs is not None else None
        return self._calc_prediction_stacked(weights=weights, stacked_activations=fresh_activations)

    def _calc_prediction_stacked(
        self, weights: Optional[Union[pd.Series, str]] = None, stacked_activations: Optional[pd.DataFrame] = None
    ) -> Union[None, pd.Series]:
        predictions = pd.Series({str(r.condition): getattr(r, "prediction") for r in self})

        if stacked_activations is None:
            stacked_activations = self.stacked_activations
        if stacked_activations is None:
            return None

        if isinstance(self[0].prediction, str):
            prediction_vectors = stacked_activations.replace(0, np.nan).replace(1.0, "") + predictions
        else:
            prediction_vectors = stacked_activations.replace(0, np.nan) * predictions
        if prediction_vectors.empty:
            return prediction_vectors
        if weights is not None:
            if isinstance(weights, str):
                weights = pd.Series({str(r.condition): getattr(r, weights) for r in self})
            weights = stacked_activations.replace(0, np.nan) * weights
            if issubclass(self.rule_type, RegressionRule):
                return calc_ruleset_prediction_weighted_regressor_stacked(
                    prediction_vectors=prediction_vectors, weights=weights
                )
            elif issubclass(self.rule_type, ClassificationRule):
                return calc_ruleset_prediction_weighted_classificator_stacked(
                    prediction_vectors=prediction_vectors, weights=weights
                )
            else:
                raise TypeError(f"Unexpected rule type '{self.rule_type}'")
        else:
            if issubclass(self.rule_type, RegressionRule):
                return calc_ruleset_prediction_equally_weighted_regressor_stacked(prediction_vectors=prediction_vectors)
            elif issubclass(self.rule_type, ClassificationRule):
                return calc_ruleset_prediction_equally_weighted_classificator_stacked(
                    prediction_vectors=prediction_vectors
                )
            else:
                raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def _calc_prediction_unstacked(
        self,
        weights: Optional[Union[pd.Series, str]] = None,
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    ) -> pd.Series:
        """Computes the ruleset's prediction vector rule by rule, without stacked activation vectors.

        Dispatches to the weighted or equally weighted function matching the rule type. If 'weights' is a str,
        the weights are fetched from the rules' attribute of that name.

        Raises
        ------
        TypeError
            If the rule type is neither a regression nor a classification rule.
        """
        if weights is None:
            if issubclass(self.rule_type, RegressionRule):
                return calc_ruleset_prediction_equally_weighted_regressor_unstacked(rules=self.rules, xs=xs)
            if issubclass(self.rule_type, ClassificationRule):
                return calc_ruleset_prediction_equally_weighted_classificator_unstacked(rules=self.rules, xs=xs)
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if isinstance(weights, str):
            weights = pd.Series({str(r.condition): getattr(r, weights) for r in self})
        if issubclass(self.rule_type, RegressionRule):
            return calc_ruleset_prediction_weighted_regressor_unstacked(rules=self.rules, weights=weights, xs=xs)
        if issubclass(self.rule_type, ClassificationRule):
            return calc_ruleset_prediction_weighted_classificator_unstacked(rules=self.rules, weights=weights, xs=xs)
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_criterion(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        weights: Optional[Union[pd.Series, str]] = None,
        predictions: Optional[pd.Series] = None,
        **kwargs,
    ):
        """Computes the criterion of the whole ruleset and stores it in self.criterion.

        Returns np.nan, without setting anything, if the ruleset has no rules or no rule type. In every other
        case nothing is returned. self.criterion is set to np.nan if the prediction vector is empty.

        Parameters
        ----------
        y: [np.ndarray, pd.Series]
            The targets on which to evaluate the ruleset criterion. Must be a 1-D np.ndarray or pd.Series.
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            If specified, forwarded to self.predict when 'predictions' is not given and, for classification
            rules, used to evaluate the ruleset's activation.
        weights: Optional[Union[pd.Series, str]]
            Optional weights, forwarded to self.predict. Useless if predictions is specified.
        predictions: Optional[pd.Series]
            The vector of predictions of the ruleset. If not specified, is computed using self.predict
        kwargs
            Forwarded to the criterion function.

        Raises
        ------
        TypeError
            If the rule type is neither a classification nor a regression rule.
        """
        if len(self) == 0 or self._rule_type is None:
            return np.nan

        if predictions is None:
            predictions = self.predict(xs=xs, weights=weights)
        if len(predictions) == 0:
            self.criterion = np.nan
            return

        if issubclass(self.rule_type, ClassificationRule):
            if xs is None:
                # Make sure the ruleset's own activation exists before reading it
                if self._activation is None or self._activation.length == 0:
                    self.compute_self_activation()
                activation = self.activation
            else:
                activation = self.evaluate_self_activation(xs=xs).raw
            self.criterion = functions.calc_classification_criterion(activation, predictions, y, **kwargs)
            return
        if issubclass(self.rule_type, RegressionRule):
            # noinspection PyTypeChecker
            self.criterion = functions.calc_regression_criterion(predictions, y, **kwargs)
            return
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_test_set_size(self, y: Union[np.ndarray, pd.Series]):
        """Stores the number of observations of 'y' in self.test_set_size."""
        has_index = isinstance(y, (pd.Series, pd.DataFrame))
        self.test_set_size = len(y.index) if has_index else len(y)

    def calc_train_set_size(self, y: Union[np.ndarray, pd.Series]):
        """Stores the number of observations of 'y' in self.train_set_size."""
        has_index = isinstance(y, (pd.Series, pd.DataFrame))
        self.train_set_size = len(y.index) if has_index else len(y)


def traverse(o, tree_types=(list, tuple, RuleSet)):
    """Lazily flattens nested containers, yielding their elementary elements in order.

    Any object that is an instance of one of 'tree_types' is walked recursively; anything else is yielded
    as is.

    Example
    -------
    >>> list(traverse([[[1, 2], 3,], [4, 5]]))
    [1, 2, 3, 4, 5]
    """
    if not isinstance(o, tree_types):
        yield o
        return
    for child in o:
        yield from traverse(child, tree_types)

Functions

def traverse(o, tree_types=(<class 'list'>, <class 'tuple'>, <class 'ruleskit.ruleset.RuleSet'>))

Yields each elementary element from a nested list or tuple

Example

>>> list(traverse([[[1, 2], 3,], [4, 5]]))
[1, 2, 3, 4, 5]
Expand source code
def traverse(o, tree_types=(list, tuple, RuleSet)):
    """Lazily flattens nested containers, yielding their elementary elements in order.

    Any object that is an instance of one of 'tree_types' is walked recursively; anything else is yielded
    as is.

    Example
    -------
    >>> list(traverse([[[1, 2], 3,], [4, 5]]))
    [1, 2, 3, 4, 5]
    """
    if not isinstance(o, tree_types):
        yield o
        return
    for child in o:
        yield from traverse(child, tree_types)

Classes

class RuleSet (rules_list: Optional[List[Rule]] = None, compute_activation: bool = True, stack_activation: bool = False)

A set of rules

Parameters

rules_list : Union[List[Rule], None]
The list of rules to start with. Can be None, since a RuleSet can be filled after its creation.
compute_activation : bool
The activation of the RuleSet is the logical OR of the activation of all its rules. It is only computed if compute_activation is True. (default value = True)
stack_activation : bool
If True, the RuleSet will keep in memory a 2-D table (stored as a pd.DataFrame, one column per rule) containing the activations of all its rules. This can take a lot of memory, but can save time if you apply vectorized methods on this stacked table instead of on each rule separately. (default value = False)
Expand source code
class RuleSet(ABC):
    """A set of rules"""

    # str(self) prints at most the first NLINES and the last NLINES rules (separated by "...")
    NLINES = 5
    # When True, __init__ calls check_duplicated_rules on the rules it was given
    CHECK_DUPLICATED = False
    # When True, fit/eval compute the rules' attributes from the stacked activations in one pass
    # instead of calling fit/eval on each rule
    STACKED_FIT = False
    # Optional mapping feature name -> feature index; when not empty, set_features_indexes uses it
    all_features_indexes = {}
    # Ruleset-level attributes computed by fit: for each name, fit calls self.calc_<name>
    attributes_from_train_set = {ClassificationRule: ["train_set_size"], RegressionRule: ["train_set_size"]}
    # Ruleset-level attributes computed by eval: for each name, eval calls self.calc_<name>
    attributes_from_test_set = {
        ClassificationRule: ["criterion", "test_set_size"],
        RegressionRule: ["criterion", "test_set_size"],
    }

    @staticmethod
    def check_duplicated_rules(rules, name_or_index: str = "index"):
        if name_or_index == "index":
            str_rules = [str(r.features_indexes) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
        else:
            str_rules = [str(r.features_names) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
        if len(set(str_rules)) < len(str_rules):
            duplicated = {}
            for r in str_rules:
                if r not in duplicated:
                    duplicated[r] = 0
                duplicated[r] += 1
            duplicated = [
                f"{r}: {duplicated[r]} (positions {[i for i, x in enumerate(str_rules) if x == r]})\n"
                f"   underlying rules : {[str(rules[i]) for i in [i for i, x in enumerate(str_rules) if x == r]]}"
                for r in duplicated
                if duplicated[r] > 1
            ]
            s = "\n -".join(duplicated)
            raise ValueError(f"There are {len(duplicated)} duplicated rules in your ruleset !\n {s}")

    def __init__(
        self,
        rules_list: Union[List[Rule], None] = None,
        compute_activation: bool = True,
        stack_activation: bool = False,
    ):
        """

        Parameters
        ----------
        rules_list: Union[List[Rule], None]
            The list of rules to start with. Can be None, since a RuleSet can be filled after its creation.
            None entries in the list are ignored.
        compute_activation: bool
            The activation of the RuleSet is the logical OR of the activation of all its rules. It is only computed
            if compute_activation is True. (default value = True)
        stack_activation: bool
            If True, the RuleSet will keep in memory a pd.DataFrame containing the activations of all its rules. This
            can take a lot of memory, but can save time if you apply vectorized methods on this stacked table instead
            of on each rule separately. (default value = False)

        Raises
        ------
        TypeError
            If an element of rules_list is neither None nor a Rule.
        """
        self._rules: List[Rule] = []
        self.features_names: List[str] = []
        self.features_indexes: List[int] = []
        self._activation: Optional[Activation] = None
        self._coverage: Optional[float] = None  # in case Activation is not available
        self.criterion: Optional[float] = None
        self.train_set_size: Optional[int] = None
        self.test_set_size: Optional[int] = None  # Set by self.eval
        self.stacked_activations: Optional[pd.DataFrame] = None
        self.stack_activation: bool = stack_activation
        self._rule_type = None
        self._compute_activation = True  # Should NOT be equal to given compute_activation argument, but always True.
        if rules_list is not None:
            for rule in rules_list:
                if rule is None:
                    continue
                if not isinstance(rule, Rule):
                    raise TypeError(f"Some rules in given iterable were not of type 'Rule' but of type {type(rule)}")
                self.append(rule, update_activation=False)
            if compute_activation:
                self.compute_self_activation()
            if self.stack_activation:
                self.compute_stacked_activation()
            # Must be evaluated once the rules are stored: iterating over self before the loop above would
            # iterate over an empty ruleset and always conclude that names are available.
            names_available = all(hasattr(r.condition, "features_names") for r in self)
            if names_available:
                self.features_names = list(set(itertools.chain(*[rule.features_names for rule in self])))
            self.set_features_indexes()
        if self.__class__.CHECK_DUPLICATED:
            self.check_duplicated_rules(self.rules, name_or_index="name" if len(self.features_names) > 0 else "index")

    # noinspection PyProtectedMember,PyTypeChecker
    def __iadd__(self, other: Union["RuleSet", Rule]):
        """In-place addition of a single Rule, or of every rule of another RuleSet.

        The rule type of the RuleSet is set from the first added element and must match for later ones.
        features_indexes (and features_names when ``other`` exposes them) are merged, and the activation vector
        and stacked activations are updated when the corresponding flags of self are on.

        Raises
        ------
        TypeError
            If the added rule(s) are not of the rule type already stored in self.
        """
        if isinstance(other, Rule):
            if self._rule_type is None:
                self.rule_type = type(other)
            elif not isinstance(other, self.rule_type):
                raise TypeError(
                    f"Ruleset previously had rules of type {self.rule_type},"
                    f" so can not add rule of type {type(other)}"
                )
            self._rules.append(other)
        else:
            own_type_known = self._rule_type is not None
            other_type_known = other._rule_type is not None
            if own_type_known and other_type_known and self.rule_type != other.rule_type:
                raise TypeError(
                    f"Ruleset previously had rules of type {self.rule_type},"
                    f" so can not add rules of type {other.rule_type}"
                )
            if other_type_known and not own_type_known:
                self.rule_type = other.rule_type
            self._rules.extend(other._rules)

        merged_indexes = self.features_indexes + other.features_indexes
        self.features_indexes = list(set(merged_indexes))
        if hasattr(other, "features_names"):
            merged_names = self.features_names + other.features_names
            self.features_names = list(set(merged_names))

        if self._compute_activation:
            self._update_activation(other)
        if self.stack_activation:
            self._update_stacked_activation(other)
        return self

    def __add__(self, other: Union["RuleSet", Rule]):
        """Returns the RuleSet resulting in appending a rule or each rule of another RuleSet to self.

        When adding another RuleSet, rules present in both are kept only once and the rules keep their order
        (rules of self first, then the new rules of other). The result stacks activations only if both
        operands do.
        """
        stack_activation = self.stack_activation
        if isinstance(other, Rule):
            rules = self.rules + [other]
        else:
            stack_activation &= other.stack_activation
            # dict.fromkeys de-duplicates through hash/eq like a set would, but keeps the first-seen order,
            # so the resulting rule order is deterministic.
            rules = list(dict.fromkeys(self.rules + other.rules))
        return self.__class__(rules, compute_activation=self._compute_activation, stack_activation=stack_activation)

    def __getattr__(self, item):
        """If item is not found in self, try to fetch it from its activation."""
        if item == "_activation":
            raise AttributeError(f"'RuleSet' object has no attribute '{item}'")

        if self._activation is not None and hasattr(self._activation, item):
            return getattr(self._activation, item)
        raise AttributeError(f"'RuleSet' object has no attribute '{item}'")

    def __len__(self):
        """The length of a RuleSet its the number of rules stored in it."""
        return len(self.rules)

    def __eq__(self, other: "RuleSet"):
        return set(self.rules) == set(other.rules)

    def __iter__(self):
        if hasattr(self, "_rules"):
            return self.rules.__iter__()
        else:
            return [].__iter__()

    def __getitem__(self, key):
        if isinstance(key, slice):
            indices = range(*key.indices(len(self.rules)))
            return self.__class__([self.rules[i] for i in indices])
        return self.rules.__getitem__(key)

    def __str__(self):
        if len(self) < 2 * self.__class__.NLINES:
            return "\n".join([str(self[i]) for i in range(len(self))])
        else:
            return "\n".join(
                [str(self[i]) for i in range(self.__class__.NLINES)]
                + ["..."]
                + [str(self[i]) for i in range(len(self) - self.__class__.NLINES, len(self))]
            )

    def __hash__(self) -> hash:
        return hash(frozenset(self.to_hash))

    # noinspection PyProtectedMember
    def __contains__(self, other: Rule) -> bool:
        """A RuleSet contains another Rule if the two rule's conditions and predictions are the same"""
        name_or_index = "name" if len(self.features_names) > 0 else "index"

        if name_or_index == "index":
            str_rules = [str(r.features_indexes) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in self]
            str_rule = str(other.features_indexes) + str(other.bmins) + str(other.bmaxs) + str(other.prediction)
        else:
            str_rules = [str(r.features_names) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in self]
            str_rule = str(other.features_names) + str(other.bmins) + str(other.bmaxs) + str(other.prediction)

        return str_rule in str_rules

    @property
    def rule_type(self) -> type:
        if self._rule_type is None:
            raise ValueError("Ruleset's rule type is not set !")
        return self._rule_type

    @rule_type.setter
    def rule_type(self, rule_type: type):
        """Set the type of the rules of the RuleSet, after checking it against the accepted rule classes.

        Raises
        ------
        TypeError
            If rule_type is not a subclass of ClassificationRule, RegressionRule or Rule.
        """
        accepted_bases = (ClassificationRule, RegressionRule, Rule)
        if issubclass(rule_type, accepted_bases):
            self._rule_type = rule_type
            return
        raise TypeError(
            "Ruleset's rule type must be a subclass of ruleskit.RegressionRule or ruleskit.ClassificationRule, not"
            f" {rule_type}"
        )

    @property
    def rules(self) -> List[Rule]:
        return self._rules

    @rules.setter
    def rules(self, rules: Union[List[Rule], None]):
        """Replace the rules of the RuleSet.

        A temporary RuleSet is built from the given rules (keeping self.stack_activation), then its rules,
        features names and indexes, stacked activations and activation are copied to self.
        The rule type is also refreshed when the new rules define one, so that a RuleSet that was empty
        does not stay without rule type after being filled through this setter.
        """
        ruleset = RuleSet(rules, stack_activation=self.stack_activation)
        self._rules = ruleset._rules
        self.features_names = ruleset.features_names
        self.features_indexes = ruleset.features_indexes
        self.stacked_activations = ruleset.stacked_activations
        self._activation = ruleset._activation
        # Keep the previous rule type if the new list is empty (nothing to infer it from)
        if ruleset._rule_type is not None:
            self._rule_type = ruleset._rule_type

    @property
    def to_hash(self) -> Tuple[str]:
        if len(self) == 0:
            return "rs",
        to_hash = ("rs",)
        for r in self:
            rule_hash = r.to_hash[1:]
            to_hash += rule_hash
        return to_hash

    @property
    def activation_available(self) -> bool:
        """Returns True if the RuleSet has an activation vector, and if this Activation's object data is available."""
        if self._activation is None or self._activation.length == 0:
            return False
        if self._activation.data_format == "file":
            return self._activation.data.is_file()
        else:
            return self._activation.data is not None

    @property
    def stacked_activations_available(self) -> bool:
        """Returns True is the RuleSet has its rules' stacked activations."""
        if self.stack_activation is None or self._activation.length == 0:
            return False
        return True

    @property
    def activation(self) -> Union[None, np.ndarray]:
        """Returns the Activation vector's data in a form of a 1-D np.ndarray, or None if not available.

        Returns:
        --------
        Union[None, np.ndarray]
            of the form [0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, ...]
        """
        if self._activation:
            return self._activation.raw
        return None

    @property
    def ruleset_coverage(self) -> float:
        """Coverage is the fraction of points equal to 1 in the activation vector"""
        if not self.activation_available:
            return self._coverage
        else:
            return self._activation.coverage

    # noinspection PyProtectedMember,PyTypeChecker
    def _update_activation(self, other: Union[Rule, "RuleSet"]):
        """Updates the activation vector of the RuleSet with the activation vector of a new Rule or RuleSet."""
        if other.activation_available:
            if self._activation is None or self._activation.length == 0:
                self._activation = Activation(other.activation, to_file=Rule.LOCAL_ACTIVATION)
            else:
                self._activation = self._activation | other._activation

    # noinspection PyProtectedMember,PyTypeChecker
    def _update_stacked_activation(self, other: Union[Rule, "RuleSet"]):
        """Updates the stacked activation vectors of the RuleSet with the activation vector of a new Rule or
        the stacked activation vectors of another RuleSet.

        Nothing happens if ``other`` has no available activation. Columns are named after ``str(rule.condition)``.
        When self has no stacked activations yet and ``other`` is a RuleSet, a copy of its table is stored, so
        that later updates of self do not modify ``other``.
        """
        if not other.activation_available:
            return

        if self.stacked_activations is None:
            if isinstance(other, Rule):
                self.stacked_activations = pd.DataFrame(
                    data=np.array(other.activation).T, columns=[str(other.condition)]
                )
            elif other.stacked_activations is not None:
                # Copy: sharing the DataFrame would add columns to other's table on the next single-rule update
                self.stacked_activations = other.stacked_activations.copy()
        else:
            if isinstance(other, Rule):
                self.stacked_activations[str(other.condition)] = other.activation
            else:
                self.stacked_activations = pd.concat([self.stacked_activations, other.stacked_activations], axis=1)

    def set_features_indexes(self):
        if len(self.__class__.all_features_indexes) > 0:
            self.features_indexes = [self.__class__.all_features_indexes[f] for f in self.features_names]
            for r in self._rules:
                # noinspection PyProtectedMember
                r._condition._features_indexes = [self.__class__.all_features_indexes[f] for f in r.features_names]
        else:
            list(set(itertools.chain(*[rule.features_indexes for rule in self])))

    # noinspection PyProtectedMember
    def fit(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        force_if_not_good: bool = False,
        **kwargs,
    ) -> List[Rule]:
        """Fits the ruleset on y and xs to produce the rules' activation vectors and attributes relevant to train set.

        If the class attribute STACKED_FIT is True, the attributes listed in the rule type's
        'attributes_from_train_set' are computed for all rules at once by the ruleset's 'calc_<attr>s' methods
        using the stacked activations. Otherwise each rule is fitted separately through its own 'fit' method.
        Rules that are not 'good' after fitting are removed from the ruleset, whose activation (and stacked
        activations if self.stack_activation is True) is then recomputed. Finally the ruleset's own attributes
        listed in 'attributes_from_train_set' are computed.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
            The target. If it is not a np.ndarray, its index is used to reindex the stacked activations during
            a stacked fit.
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            The features. If None, the activations already available are used. Nothing is done if it is empty,
            or if it is None and all rules are already fitted.
        force_if_not_good: bool
            Passed to each rule's 'fit' method (only used when STACKED_FIT is False).
        kwargs
            Passed to the methods computing the attributes. Must not contain the key 'method'.

        Returns
        -------
        List[Rule]
            List of rules that were excluded from the ruleset after fitting because they were 'bad'

        Raises
        ------
        TypeError
            If the rule type of the ruleset is neither a ClassificationRule nor a RegressionRule.
        IndexError
            If 'method' is in kwargs, or if y is a np.ndarray that does not have as many rows as the stacked
            activations.
        ValueError
            If 'activation' is listed in the rule type's 'attributes_from_train_set'.
        """
        if issubclass(self.rule_type, ClassificationRule):
            type_to_use = ClassificationRule
        elif issubclass(self.rule_type, RegressionRule):
            type_to_use = RegressionRule
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if "method" in kwargs:
            raise IndexError("Key 'method' can not be given to 'fit'")

        def launch_method(method, **kw):
            # Only pass the arguments the method accepts, unless it takes **kwargs
            expected_args = list(inspect.signature(method).parameters)
            if "kwargs" not in expected_args:
                kw = {item: kw[item] for item in kw if item in expected_args}
            return method(**kw)

        if xs is not None and len(xs) == 0:
            logger.warning("Given xs is empty")
            return []

        if len(self) == 0:
            logger.debug("Ruleset is empty. Nothing to fit.")
            return []

        if all([r._fitted for r in self]) and xs is None:
            return []

        if self.__class__.STACKED_FIT:

            # True if the stacked activations are only needed for this fit and must be deleted afterwards
            clean_activation = False
            # Activation must always be computed from train set
            if xs is not None:
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.calc_activation(xs)
                self.stack_activation = not clean_activation
            elif self.stacked_activations is None:
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.compute_stacked_activation()
                self.stack_activation = not clean_activation

            if isinstance(y, np.ndarray):
                # noinspection PyUnresolvedReferences
                if not len(self.stacked_activations.index) == y.shape[0]:
                    raise IndexError(
                        "Stacked activation and y have different number of rows. Use pd.Series for y to"
                        " reindex stacked activations automatically."
                    )
            else:
                self.stacked_activations.index = y.index

            computed_attrs = {}
            # noinspection PyUnresolvedReferences
            for attr in self.rule_type.attributes_from_train_set:
                if attr == "activation":
                    raise ValueError("'activation' can not be specified in 'attributes_from_train_set'")
                # Attributes already computed are passed to the next ones
                computed_attrs[f"{attr}s"] = launch_method(
                    getattr(self, f"calc_{attr}s"), y=y, xs=xs, **computed_attrs, **kwargs
                )

            if clean_activation:
                self.del_stacked_activations()
            else:
                # Put back a positional index
                if isinstance(y, np.ndarray):
                    self.stacked_activations.index = pd.RangeIndex(y.shape[0])
                else:
                    self.stacked_activations.index = pd.RangeIndex(len(y.index))

            for ir, rule in enumerate(self._rules):
                rule._fitted = True
                for attr in computed_attrs:
                    # computed_attrs keys are plural ('<attr>s'), rules' attributes are '_<attr>'
                    setattr(rule, f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                    if rule.good:
                        rule.check_thresholds(attr[:-1])
                # To check attributes that are set along side others, like coverage
                if rule.good:
                    rule.check_thresholds()
            # Collected once all checks are done: each bad rule appears once, including the rules that only
            # failed the last check_thresholds() call
            to_drop = [r for r in self if not r.good]
        else:
            [r.fit(xs=xs, y=y, force_if_not_good=force_if_not_good, **kwargs) for r in self]
            to_drop = [r for r in self if not r.good]

        if len(to_drop) > 0:
            rules = [r for r in self.rules if r not in to_drop]
            self._rules = []
            if self._activation is not None:
                self._activation.clear()
            self.del_stacked_activations()
            for r in rules:
                self.append(r, update_activation=False)

            # Recompute activation now that bad rules have been droped
            self._activation = None
            self.compute_self_activation()
            if self.stack_activation:
                self.stacked_activations = None
                self.compute_stacked_activation()
        # If not bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
        # been done  (needed to set self.coverage), but not stacked (useless)
        elif not self.__class__.STACKED_FIT:
            if self._activation is None or self._activation.length == 0 or xs is not None:
                self._activation = None
                self.compute_self_activation()
            if self.stack_activation and (self.stacked_activations is None or xs is not None):
                self.stacked_activations = None
                self.compute_stacked_activation()

        for attr in self.__class__.attributes_from_train_set[type_to_use]:
            launch_method(
                getattr(self, f"calc_{attr}"),
                y=y,
                xs=xs,
                **kwargs,
            )

        return to_drop

    # noinspection PyProtectedMember
    def eval(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        keep_new_activations: bool = False,
        weights: Optional[Union[pd.Series, str]] = None,
        force_if_not_good: bool = False,
        **kwargs,
    ) -> List[Rule]:
        """Evaluate the ruleset on y and xs to produce the rules' attributes relevant to the test set. Will recompute
        the ruleset's stacked activation vector if using stacked fit.

        Rules that are not 'good' after evaluation are removed from the ruleset. Finally the ruleset's own
        attributes listed in 'attributes_from_test_set' are computed.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
            The target.
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            The features. If None, the activations already available are used. Nothing is done if it is empty.
        keep_new_activations: bool
            If True, activation vector and stacked activation vectors will be kept as ruleset's attribute and replace
            the ones computed using the train set, if any. Will also change the activation vectors of the rules.
            Forced to True if some rules were not fitted and xs is given.
        weights: Optional[Union[pd.Series, str]]
            Optional weights for calc_criterion. If is a pd.Series, expects the index to be the rules names.
            If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string
            (ex: it can be 'criterion')
        force_if_not_good: bool
            If the rule was seen as "bad", eval will not trigger unless this boolean is True (Default value = False)
        kwargs
            Passed to the methods computing the attributes. Must not contain the key 'method'.

        Returns
        -------
        List[Rule]
            List of rules that were excluded from the ruleset after evaluation because they were 'bad'

        Raises
        ------
        TypeError
            If the rule type of the ruleset is neither a ClassificationRule nor a RegressionRule.
        IndexError
            If 'method' is in kwargs, or if y and the stacked activations have different number of rows.
        ValueError
            If some rules were not fitted and xs is None, if the rules' activations are needed but were not
            computed previously, or if 'activation' is listed in the rule type's 'attributes_from_test_set'.
        """
        if issubclass(self.rule_type, ClassificationRule):
            type_to_use = ClassificationRule
        elif issubclass(self.rule_type, RegressionRule):
            type_to_use = RegressionRule
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if "method" in kwargs:
            raise IndexError("Key 'method' can not be given to 'eval'")

        def launch_method(method, **kw):
            # Only pass the arguments the method accepts, unless it takes **kwargs
            expected_args = list(inspect.signature(method).parameters)
            if "kwargs" not in expected_args:
                kw = {item: kw[item] for item in kw if item in expected_args}
            return method(**kw)

        if xs is not None and len(xs) == 0:
            logger.warning("Given xs is empty")
            return []

        if len(self) == 0:
            logger.debug("Ruleset is empty. Nothing to fit.")
            return []

        if not all([r._fitted for r in self]):
            if xs is None:
                raise ValueError(
                    "Not all rules of the ruleset were fitted. Please do so before calling ruleset.eval or provide xs."
                )
            else:
                keep_new_activations = True

        if self.__class__.STACKED_FIT:
            # True if the stacked activations are only needed for this evaluation and must be deleted afterwards
            clean_activation = False

            # If test set is given, compute the activation used to evaluate test relative attributes.
            # Not the same as self.stacked_activations, computed from the train set.
            # Else, we evaluate on the train set, so we use self.stacked_actviation
            nones = None
            if xs is not None:
                if keep_new_activations:
                    clean_activation = not self.stack_activation
                    self.stack_activation = True
                    self.calc_activation(xs=xs)  # Will reset rules' activation vectors too
                    self.stack_activation = not clean_activation
                    stacked_activations = self.stacked_activations
                    activation = self._activation
                    # noinspection PyUnresolvedReferences
                    if "zscore" in self.rule_type.attributes_from_test_set:
                        nones = pd.Series({str(r.condition): r.nones for r in self})
                else:
                    stacked_activations = self.evaluate_stacked_activations(xs)
                    # noinspection PyUnresolvedReferences
                    if "zscore" in self.rule_type.attributes_from_test_set:
                        activation, nones = self.evaluate_self_activation(xs, return_nones=True)
                    else:
                        activation = self.evaluate_self_activation(xs)
            else:
                # If self.stacked_activations is None, compute it from the rules' activations. They must be available.
                if self.stacked_activations is None:
                    # Else, will compute the stacked activation from the current rules activations
                    clean_activation = not self.stack_activation
                    self.stack_activation = True
                    self.compute_stacked_activation()
                    self.compute_self_activation()
                    if self.stacked_activations is None:
                        raise ValueError("Rules activations must have been computed previously.")
                    self.stack_activation = not clean_activation
                stacked_activations = self.stacked_activations
                activation = self._activation
                # noinspection PyUnresolvedReferences
                if "zscore" in self.rule_type.attributes_from_test_set:
                    nones = pd.Series({str(r.condition): r.nones for r in self})

            if isinstance(y, np.ndarray):
                if not len(stacked_activations.index) == y.shape[0]:
                    raise IndexError("Stacked activation and y have different number of rows.")
            else:
                if not len(stacked_activations.index) == len(y.index):
                    raise IndexError("Stacked activation and y have different number of rows.")
                stacked_activations.index = y.index

            # noinspection PyUnresolvedReferences
            if "prediction" in self.rule_type.attributes_from_train_set:
                # Do NOT recompute prediction from test set : it does not make sense !
                predictions = pd.Series({str(r.condition): r.prediction for r in self})
            else:
                predictions = None
            computed_attrs = {}

            # noinspection PyUnresolvedReferences
            for attr in self.rule_type.attributes_from_test_set:
                if attr == "activation":
                    raise ValueError("'activation' can not be specified in 'attributes_from_test_set'")
                # Attributes already computed are passed to the next ones
                computed_attrs[f"{attr}s"] = launch_method(
                    getattr(self, f"calc_{attr}s"),
                    y=y,
                    xs=xs,
                    stacked_activations=stacked_activations,
                    activation=activation,
                    nones=nones,
                    predictions=predictions,
                    **computed_attrs,
                    **kwargs,
                )

            if clean_activation:
                self.del_stacked_activations()
            else:
                # Put back a positional index
                if isinstance(y, np.ndarray):
                    stacked_activations.index = pd.RangeIndex(y.shape[0])
                else:
                    stacked_activations.index = pd.RangeIndex(len(y.index))

            for ir, rule in enumerate(self._rules):
                rule._evaluated = True
                for attr in computed_attrs:
                    # computed_attrs keys are plural ('<attr>s'), rules' attributes are '_<attr>'
                    setattr(rule, f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                    if rule.good:
                        rule.check_thresholds(attr[:-1])
                if rule.good:
                    rule.check_thresholds()
            # Collected once all checks are done: each bad rule appears once, including the rules that only
            # failed the last check_thresholds() call
            to_drop = [r for r in self if not r.good]
        else:
            [
                r.eval(
                    xs=xs, y=y, recompute_activation=keep_new_activations, force_if_not_good=force_if_not_good, **kwargs
                )
                for r in self
            ]
            to_drop = [r for r in self if not r.good]

        if len(to_drop) > 0:
            rules = [r for r in self.rules if r not in to_drop]
            self._rules = []
            if self._activation is not None:
                self._activation.clear()
            self.del_stacked_activations()
            for r in rules:
                self.append(r, update_activation=False)

            # Recompute activation now that bad rules have been droped
            if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
                self._activation = None
                self.compute_self_activation()
            if self.stack_activation and (
                self.stacked_activations is None or (xs is not None and keep_new_activations)
            ):
                self.stacked_activations = None
                self.compute_stacked_activation()
        # If not bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
        # been done
        elif not self.__class__.STACKED_FIT:
            if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
                self._activation = None
                self.compute_self_activation()
            # Same condition as when rules were dropped: look at the stacked activations, not at self._activation
            if self.stack_activation and (
                self.stacked_activations is None or (xs is not None and keep_new_activations)
            ):
                self.stacked_activations = None
                self.compute_stacked_activation()

        for attr in self.__class__.attributes_from_test_set[type_to_use]:
            launch_method(
                getattr(self, f"calc_{attr}"),
                y=y,
                xs=xs if not keep_new_activations else None,
                weights=weights,
                **kwargs,
            )

        return to_drop

    def append(self, rule: Rule, update_activation: bool = True):
        """Appends a new rule to self. The updates of activation vector and the stacked activation vectors can be
        blocked by specifying update_activation=False. Otherwise, will use self.stack_activation to determine if the
        updates should be done or not.

        Raises
        ------
        TypeError
            If rule is not a Rule, or is not of the rule type already stored in the ruleset.
        """

        if not isinstance(rule, Rule):
            raise TypeError(f"RuleSet's append method expects a Rule object, got {type(rule)}")
        stack_activation = self.stack_activation
        if not update_activation:
            self._compute_activation = False
            self.stack_activation = False
        try:
            self.__iadd__(rule)
        finally:
            # Restore the flags even if the addition failed, else the ruleset would silently stop updating its
            # activations
            self._compute_activation = True
            self.stack_activation = stack_activation

    def sort(self, criterion: str = None, reverse: bool = False):
        """Sorts the rules of the RuleSet in place.

        * If criterion is not specified (None or empty string):
            Will sort the rules according to :
                1. The number of features they talk about
                2. For a same number of features (sorted in alphabetical order, or index if names are not available,
                    optionally reversed), the bmins and bmaxs of the rules
            Nothing is done if the condition of the first rule has no 'bmins' or no 'bmaxs' attribute.
        * If criterion is specified, it must be the name of an attribute of the rules (it is looked up with
            getattr on each rule). Then sorts according to this attribute, optionally reversed.

        If self.stack_activation is True, the columns of self.stacked_activations are reordered to follow the new
        order of the rules.

        Parameters
        ----------
        criterion: str
            Name of the rule attribute to sort on. None or "" triggers the default feature-based ordering.
        reverse: bool
            Reverses the ordering of the feature groups (default ordering) or of the criterion values.

        Raises
        ------
        ValueError
            If criterion is specified but the first rule has no attribute of that name.
        """
        if len(self) == 0:
            return

        if criterion is None or criterion == "":
            if not (hasattr(self[0].condition, "bmins") and hasattr(self[0].condition, "bmaxs")):
                return
            # The set of all the features the RuleSet talks about
            which = "index"
            if len(self.features_names) > 0:
                which = "name"
                fnames_or_indexes = list(set([str(r.features_names) for r in self]))
            else:
                fnames_or_indexes = list(set([str(r.features_indexes) for r in self]))
            # Group the stringified feature lists by how many features they contain
            dict_names = {}
            lmax = 1
            for f in fnames_or_indexes:
                # f is the str() of a list : parse it back to count its elements
                l_ = len(ast.literal_eval(f))
                if l_ > lmax:
                    lmax = l_
                if l_ not in dict_names:
                    dict_names[l_] = []
                dict_names[l_].append(f)
            for l_ in dict_names:
                dict_names[l_].sort(reverse=reverse)
            # Rebuild the ordered list of keys, by increasing number of features.
            # NOTE(review): the range starts at 1, so a key with zero features is not kept here and would make the
            # lookup below raise a KeyError -- confirm rules always have at least one feature.
            fnames_or_indexes = []
            for l_ in range(1, lmax + 1):
                if l_ in dict_names:
                    fnames_or_indexes += dict_names[l_]

            rules_by_fnames = OrderedDict({f: [] for f in fnames_or_indexes})
            for rule in self:
                if which == "name":
                    v = str(rule.features_names)
                else:
                    v = str(rule.features_indexes)
                rules_by_fnames[v].append(rule)
            # Inside a group of rules sharing the same features, order by bmins followed by bmaxs
            # ('reverse' is not applied to this inner sort)
            rules_by_fnames = {
                n: sorted(rules_by_fnames[n], key=lambda x: x.condition.bmins + x.condition.bmaxs)
                for n in rules_by_fnames
            }
            self._rules = []
            for n in rules_by_fnames:
                self._rules += rules_by_fnames[n]
        elif hasattr(self[0], criterion):
            self._rules = sorted(self, key=lambda x: getattr(x, criterion), reverse=reverse)
        else:
            raise ValueError(f"Can not sort RuleSet according to criterion {criterion}")
        if self.stack_activation:
            # Keep the stacked activation columns aligned with the new order of the rules
            self.stacked_activations = self.stacked_activations[[str(r.condition) for r in self]]

    # noinspection PyProtectedMember
    def evaluate_self_activation(
        self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None, return_nones: bool = False
    ):
        """Computes the activation vector of self from its rules, using time-efficient Activation.multi_logical_or.

        Parameters
        ----------
        xs: Optional[Union[np.ndarray, pd.DataFrame]]
            If specified, the activation of each rule is re-evaluated on it with rule.evaluate_activation. Otherwise
            the '_activation' attributes already stored in the rules are used.
        return_nones: bool
            If True, also returns a pd.Series whose index is str(rule.condition) and whose values are the 'nones'
            attribute of the corresponding rule activation.

        Returns
        -------
        The ruleset Activation, or the tuple (Activation, pd.Series) if return_nones is True.
        Returns None if xs is not specified and at least one rule has no activation available.
        """
        if len(self) == 0:
            act = Activation(np.array([]))
            if return_nones is True:
                return act, pd.Series(dtype=int)
            return act

        if xs is not None:
            activations = [r.evaluate_activation(xs) for r in self]
        else:
            activations = [r._activation for r in self]
            if not all([r.activation_available for r in self]):
                # Nothing can be computed without the activation of every rule
                return None

        if len(self) == 1:
            # A single rule : the ruleset activation is a copy of the rule's one
            only = activations[0]
            act = Activation(only.raw, optimize=only.optimize, to_file=only.to_file)
        else:
            try:
                act = Activation.multi_logical_or(activations)
            except MemoryError as e:
                # numpy's array allocation error derives from MemoryError, so this clause covers it without
                # referring to numpy private modules, whose location changes between numpy versions.
                logger.warning(str(e))
                # Less memory-hungry fallback : combine the activations one by one
                act = activations[0]
                for a in activations[1:]:
                    act = act | a

        if return_nones is True:
            return act, pd.Series({str(r.condition): a.nones for r, a in zip(self, activations)})
        return act

    def compute_self_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Evaluates the activation vector of the ruleset from its rules and stores it in self._activation.

        If xs is given, it is forwarded to evaluate_self_activation to remake the rules' activation vectors; the
        'activation' attributes of the rules themselves are not set."""
        ruleset_activation = self.evaluate_self_activation(xs=xs)
        self._activation = ruleset_activation

    def evaluate_stacked_activations(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Builds a DataFrame with one column per rule, named str(rule.condition), holding the rule's activation.

        With xs, each column is the 'raw' attribute of rule.evaluate_activation(xs). Without xs, each column is
        rule.activation, and None is returned if at least one rule has no activation available.
        An empty ruleset gives an empty DataFrame."""
        if len(self) == 0:
            return pd.DataFrame(dtype=int)

        if xs is None:
            if not all([r.activation_available for r in self]):
                return None
            # noinspection PyProtectedMember
            return pd.DataFrame({str(r.condition): r.activation for r in self})

        columns = {}
        for r in self:
            columns[str(r.condition)] = r.evaluate_activation(xs).raw
        return pd.DataFrame(columns)

    def compute_stacked_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
        """Evaluates the stacked activation vectors of the ruleset and stores them in self.stacked_activations.

        If xs is given, it is forwarded to evaluate_stacked_activations to remake the rules' activation vectors; the
        'activation' attributes of the rules themselves are not set."""
        stacked = self.evaluate_stacked_activations(xs=xs)
        self.stacked_activations = stacked

    def del_activations(self):
        """Calls del_activation() on every rule of the ruleset, so that the data of each rule's activation vector
        is deleted (the relevant attributes are kept)."""
        for rule in self:
            rule.del_activation()

    def del_activation(self):
        """Calls delete() on the ruleset's own activation, if there is one. The activation object itself is kept, so
        any attribute already computed on it remains available."""
        if not hasattr(self, "_activation"):
            return
        if self._activation is None:
            return
        self._activation.delete()

    def del_stacked_activations(self):
        """Drops the stacked activation vectors of the ruleset and leaves self.stacked_activations set to None.
        Does nothing if the attribute is missing or already None."""
        if getattr(self, "stacked_activations", None) is None:
            return
        del self.stacked_activations
        self.stacked_activations = None

    def evaluate(self, xs: Union[pd.DataFrame, np.ndarray]) -> Activation:
        """Evaluates every rule on the given features and returns the combined activation vector.

        Parameters
        ----------
        xs: Union[pd.DataFrame, np.ndarray]
            The features on which to check whether the rules are activated or not. Must be a 2-D np.ndarray
            or pd.DataFrame.

        Returns
        -------
        Activation
            Result of Activation.multi_logical_or over the activations of all the rules.

        Raises
        ------
        ValueError
            If the ruleset is empty.
        """
        if len(self) == 0:
            raise ValueError("Can not use evaluate : The ruleset is empty!")
        per_rule = []
        for rule in self.rules:
            per_rule.append(rule.evaluate_activation(xs))
        return Activation.multi_logical_or(per_rule)

    def calc_activation(self, xs: Union[np.ndarray, pd.DataFrame]):
        """Computes the activation vector of every rule from the features xs, then recomputes the ruleset's own
        activation, and its stacked activations if self.stack_activation is True.

        Parameters
        ----------
        xs: Union[np.ndarray, pd.DataFrame]
            The features. If empty, a warning is logged and nothing is computed.

        Raises
        ------
        ValueError
            If the ruleset is empty.
        """
        if len(self) == 0:
            raise ValueError("Can not use calc_activation : The ruleset is empty!")

        if len(xs) == 0:
            logger.warning("Given xs is empty")
            return

        for rule in self.rules:
            rule.calc_activation(xs)

        # Reset then rebuild the ruleset-level activation from the freshly computed rule activations
        self._activation = None
        self.compute_self_activation()
        if not self.stack_activation:
            return
        self.stacked_activations = None
        self.compute_stacked_activation()

    def get_features_count(self) -> List[Tuple[Any, int]]:
        """
        Get a counter of all different features in the ruleset. If names are not available, will use indexes.

        Returns:
        --------
        count : List[Tuple[Any, int]]
            (feature, number of occurrences) pairs over all the rules, most common first. Empty list if the ruleset
            is empty.
        """
        if len(self) == 0:
            return []
        if len(self.features_names) > 0:
            per_rule = (rule.features_names for rule in self)
        else:
            # Names are not available : count on the features indexes instead
            per_rule = (rule.features_indexes for rule in self)
        return Counter(itertools.chain.from_iterable(per_rule)).most_common()

    def load(self, path, **kwargs):
        """Loads the ruleset (rules and ruleset-level attributes) from a table, replacing the current rules.

        Parameters
        ----------
        path
            Either an object with a 'read' method, in which case path.read(**kwargs) must return the table, or
            anything accepted by pd.read_csv.
        kwargs
            Forwarded to path.read or pd.read_csv.

        The rows whose first column is "ruleset coverage", "ruleset criterion", "ruleset train set size" or
        "ruleset test set size" are read as ruleset attributes (value taken in the second column) and removed from
        the table. Every remaining row is converted to a rule with series_to_rule.
        """
        if hasattr(path, "read"):
            rules = path.read(**kwargs)
        else:
            rules = pd.read_csv(path, **kwargs)
        # Ruleset-level attributes are stored as special rows, identified by the value of the first column
        if "ruleset coverage" in rules.iloc[:, 0].values:
            self._coverage = rules[rules.iloc[:, 0] == "ruleset coverage"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset coverage"].index)
        if "ruleset criterion" in rules.iloc[:, 0].values:
            self.criterion = rules[rules.iloc[:, 0] == "ruleset criterion"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset criterion"].index)
        if "ruleset train set size" in rules.iloc[:, 0].values:
            self.train_set_size = rules[rules.iloc[:, 0] == "ruleset train set size"].iloc[0, 1]
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset train set size"].index)
            # The value may have been read as text : convert it back to an integer
            if isinstance(self.train_set_size, str):
                self.train_set_size = int(self.train_set_size)
        if "ruleset test set size" in rules.iloc[:, 0].values:
            self.test_set_size = rules[rules.iloc[:, 0] == "ruleset test set size"].iloc[0, 1]
            if isinstance(self.test_set_size, str):
                self.test_set_size = int(self.test_set_size)
            rules = rules.drop(rules[rules.iloc[:, 0] == "ruleset test set size"].index)
        if rules.empty:
            self._rules = []
        else:
            self._rules = [self.series_to_rule(rules.loc[r]) for r in rules.index]
        # Rules without their own set sizes inherit the ruleset's ones.
        # NOTE(review): self.train_set_size / self.test_set_size are read here even when the table had no such
        # row -- assumes they are always defined on the instance, confirm.
        for r in self:
            if r._train_set_size is None:
                r._train_set_size = self.train_set_size
            elif isinstance(r._train_set_size, str):
                r._train_set_size = int(r._train_set_size)
            if r._test_set_size is None:
                r._test_set_size = self.test_set_size
            elif isinstance(r._test_set_size, str):
                r._test_set_size = int(r._test_set_size)

        if len(self._rules) > 0:
            self.rule_type = type(self._rules[0])
        self.compute_self_activation()
        if self.stack_activation:
            self.compute_stacked_activation()
        # 'traverse' is not defined in the visible part of the file; presumably it flattens the nested lists of
        # names -- TODO confirm.
        self.features_names = list(set(traverse([rule.features_names for rule in self])))

    def to_df(self, custom_rules_to_series: Optional[Callable] = None, ruleset_attributes: bool = True) -> pd.DataFrame:
        """Converts the ruleset to a DataFrame with one row per rule.

        As a side effect, the '_name' attribute of each rule is set to the label of its row.

        Parameters
        ----------
        custom_rules_to_series: Optional[Callable]
            If specified, called as custom_rules_to_series(index=idx) and expected to return the list of pd.Series
            describing the rules. Otherwise self.rule_to_series is applied to every rule.
        ruleset_attributes: bool
            If True (default), four extra rows are appended : "ruleset coverage", "ruleset criterion",
            "ruleset train set size" and "ruleset test set size". Their value is stored in the first column, the
            other columns are NaN.

        Returns
        -------
        pd.DataFrame
            Empty if the ruleset is empty.
        """
        if len(self) == 0:
            return pd.DataFrame()
        idx = copy(self._rule_type.index)

        if custom_rules_to_series is None:
            dfs = [self.rule_to_series((i, r), index=idx) for i, r in enumerate(self.rules)]
        else:
            dfs = custom_rules_to_series(index=idx)

        if len(dfs) > 0:
            df = pd.concat(dfs, axis=1).T
        else:
            df = pd.DataFrame(columns=idx)

        for i, rule_name in enumerate(df.index):
            # noinspection PyProtectedMember
            self._rules[i]._name = rule_name

        if not ruleset_attributes:
            return df

        def attribute_row(label, value) -> pd.DataFrame:
            # One-row frame : the value in the first column, NaN everywhere else
            return pd.DataFrame(
                columns=df.columns,
                data=[[value if i == 0 else np.nan for i in range(len(df.columns))]],
                index=[label],
            )

        return pd.concat(
            [
                df,
                attribute_row("ruleset coverage", self.ruleset_coverage),
                attribute_row("ruleset criterion", self.criterion),
                attribute_row("ruleset train set size", self.train_set_size),
                # The test row must hold the test set size (it used to be filled with the train set size)
                attribute_row("ruleset test set size", self.test_set_size),
            ]
        )

    def save(self, path):
        """Writes the DataFrame returned by self.to_df() to 'path'.

        If 'path' has a 'write' method, it is called with the DataFrame; otherwise the DataFrame is written with
        DataFrame.to_csv(path)."""
        frame = self.to_df()

        if not hasattr(path, "write"):
            frame.to_csv(path)
            return
        path.write(frame)

    # noinspection PyProtectedMember
    @staticmethod
    def rule_to_series(irule: Tuple[int, Rule], index: list) -> pd.Series:
        """Converts a (position, rule) pair to a pd.Series of strings.

        The values are str(getattr(rule, attribute)) for every attribute name in 'index'. The series is named after
        the rule's '_name' if it is set, else "R_<position>(<len(rule)>)" followed by the rule's sign when the rule
        has a 'sign' attribute."""
        position, rule = irule[0], irule[1]
        name = rule._name
        if name is None:
            suffix = rule.sign if hasattr(rule, "sign") else ""
            name = f"R_{position}({len(rule)}){suffix}"
        values = []
        for attribute in index:
            values.append(str(getattr(rule, attribute)))
        return pd.Series(data=values, name=name, index=index, dtype=str)

    @staticmethod
    def series_to_rule(srule: pd.Series) -> Rule:
        """Rebuilds a rule from a pd.Series, such as one row of the table read by load().

        The kind of rule is deduced from the labels of the series : RegressionRule if "std" is present, else
        ClassificationRule if "criterion" is present, else Rule. Values are parsed from their string form with
        ast.literal_eval, except "activation" (given to Activation), "sign" (kept as str) and "nan" (np.nan).

        Raises
        ------
        IndexError
            If a label of the series, other than "Unnamed: 0", is not a known rule attribute.
        """
        for ind in srule.index:
            if ind == "Unnamed: 0":
                continue
            if ind not in Rule.index and ind not in RegressionRule.index and ind not in ClassificationRule.index:
                raise IndexError(f"Invalid rule attribute '{ind}'")

        if "std" in srule.index:
            rule = RegressionRule()
        elif "criterion" in srule.index:
            rule = ClassificationRule()
        else:
            rule = Rule()
        rule_idx = copy(rule.__class__.rule_index)
        condition_index = {c: None for c in rule.__class__.condition_index}
        # NOTE(review): this stores the first *label* of the series as the rule name (e.g. "Unnamed: 0" for a csv
        # read without index column), not the name it holds -- confirm this is intended.
        rule._name = srule.index[0]

        for rule_ind in srule.index:
            str_value = str(srule[rule_ind])
            if rule_ind in condition_index:
                condition_index[rule_ind] = ast.literal_eval(str_value)
            elif rule_ind in rule_idx:
                if rule_ind == "activation":
                    setattr(rule, f"_{rule_ind}", Activation(str_value))
                elif str_value == "nan":
                    setattr(rule, f"_{rule_ind}", np.nan)
                elif rule_ind == "sign":
                    setattr(rule, f"_{rule_ind}", str_value)
                elif rule_ind == "prediction":
                    # Prediction can be a str in case of classification. literal_eval raises ValueError for a
                    # bare word ("cat") but SyntaxError for labels that are not valid expressions ("my class").
                    try:
                        setattr(rule, f"_{rule_ind}", ast.literal_eval(str_value))
                    except (ValueError, SyntaxError):
                        setattr(rule, f"_{rule_ind}", str_value)
                else:
                    setattr(rule, f"_{rule_ind}", ast.literal_eval(str_value))

        rule._condition = HyperrectangleCondition(**condition_index)
        if hasattr(rule, rule.__class__.fitted_if_has) and getattr(rule, rule.__class__.fitted_if_has) is not None:
            rule._fitted = True
        return rule

    def calc_predictions(
        self, y: Union[np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None
    ) -> Union[pd.Series, pd.DataFrame]:
        """
        Will compute the prediction of each rule in the ruleset. Does NOT set anything in either the rules nor the
        ruleset.

        This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
        not suffice.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
            The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
            np.ndarray or pd.Series.
        stacked_activations: Optional[pd.DataFrame]
            If specified, uses these activations instead of self.stacked_activations

        Returns
        -------
        Union[pd.Series, pd.DataFrame]
            Regression : the result of functions.conditional_mean, a pd.Series with the rules as index and the values
            being the predictions\n
            Classification : for each column of functions.class_probabilities, the first label (in sorted order) at
            which the column reaches its maximum -- presumably the most probable class of each rule\n
            Empty ruleset : an empty pd.Series, or an empty pd.DataFrame for classification rules

        Raises
        ------
        ValueError
            If no stacked activations are given and self.stacked_activations is None.
        TypeError
            If the rule type is neither a ClassificationRule nor a RegressionRule.
        """
        if stacked_activations is None:
            stacked_activations = self.stacked_activations
            if stacked_activations is None:
                raise ValueError("Stacked activation vectors are needed")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            if len(self) == 0:
                if self.rule_type is None:
                    return pd.Series(dtype=int)
                elif issubclass(self.rule_type, ClassificationRule):
                    return pd.DataFrame(dtype=int)
                elif issubclass(self.rule_type, RegressionRule):
                    return pd.Series(dtype=int)
                else:
                    raise TypeError(f"Unexpected rule type '{self.rule_type}'")

            if issubclass(self.rule_type, ClassificationRule):
                class_probabilities = functions.class_probabilities(stacked_activations, y)
                maxs = class_probabilities.max()
                # Keep only the cells equal to their column maximum, then take the first remaining label
                return class_probabilities[class_probabilities == maxs].apply(
                    lambda x: x.dropna().sort_index().index[0]
                )
            elif issubclass(self.rule_type, RegressionRule):
                return functions.conditional_mean(stacked_activations, y)
            else:
                raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_stds(
        self, y: Union[np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None
    ) -> pd.Series:
        """
        Will compute the std of each rule in the ruleset. Does NOT set anything in either the rules nor the
        ruleset.

        This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
        not suffice.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
          The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D np.ndarray
          or pd.Series.
        stacked_activations: Optional[pd.DataFrame]
            If specified, uses these activations instead of self.stacked_activations

        Returns
        -------
        pd.Series
            The result of functions.conditional_std : a pd.Series with the rules as index and the values being the
            std. Empty pd.Series if the ruleset is empty.

        Raises
        ------
        TypeError
            If the rule type is not a RegressionRule.
        ValueError
            If no stacked activations are given and self.stacked_activations is None.
        """
        # Once this guard passed the rule type is known to be a RegressionRule, so it is not checked again below
        if not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'std' can not be computed for '{self.rule_type}'")

        if stacked_activations is None:
            stacked_activations = self.stacked_activations
            if stacked_activations is None:
                raise ValueError("Stacked activation vectors are needed")

        if len(self) == 0:
            return pd.Series(dtype=int)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            return functions.conditional_std(stacked_activations, y)

    def calc_signs(self, predictions: Optional[pd.Series] = None) -> pd.Series:
        """
        Computes the sign of each rule in the ruleset : "-" for a negative prediction, "+" otherwise, and None
        where the prediction is None.

        Rules predictions must have been set or be given to the method. Only available for regression rules,
        raises TypeError otherwise.
        """
        if not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'sign' can not be computed for '{self.rule_type}'")

        if predictions is None:
            predictions = pd.Series({str(r.condition): r.prediction for r in self})

        def sign_of(value):
            if value is None:
                return None
            return "-" if value < 0 else "+"

        return predictions.apply(sign_of)

    def calc_zscores(
        self,
        y: np.ndarray,
        predictions: Optional[pd.Series] = None,
        nones: Optional[pd.Series] = None,
        horizon: int = 1,
    ) -> pd.Series:
        """
        Computes the zscore of each rule in the ruleset by calling calc_zscore_external. Only available for
        regression rules.

        Parameters
        ----------
        y: np.ndarray
            The targets.
        predictions: Optional[pd.Series]
            Unique prediction of each rule. If None, will call self.calc_predictions(y).
        nones: Optional[pd.Series]
            If None, built from the 'nones' attribute of each rule, indexed by str(rule.condition).
        horizon: int
            Forwarded as is to calc_zscore_external.

        Returns
        -------
        pd.Series
            The value returned by calc_zscore_external.

        Raises
        ------
        TypeError
            If the rule type is not a RegressionRule.
        """
        if not issubclass(self.rule_type, RegressionRule):
            raise TypeError(f"'zscore' can not be computed for '{self.rule_type}'")
        if nones is None:
            nones = pd.Series({str(r.condition): r.nones for r in self})

        if predictions is None:
            # unique prediction of each rules in a pd.Series
            predictions = self.calc_predictions(y=y)

        return calc_zscore_external(prediction=predictions, nones=nones, y=y, horizon=horizon)

    def calc_criterions(
        self,
        y: Union[np.ndarray, pd.Series],
        predictions: Optional[pd.Series] = None,
        stacked_activations: Optional[pd.DataFrame] = None,
        **kwargs,
    ) -> pd.Series:
        """
        Will compute the criterion of each rule in the ruleset. Does NOT set anything in either the rules nor the
        ruleset.

        This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
        not suffice.

        Parameters
        ----------
        y: Union[np.ndarray, pd.Series]
            The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
            np.ndarray or pd.Series.
        predictions: Optional[pd.Series]
            Prediction of each rules. If None, will call self.calc_predictions with the same targets and the same
            stacked activations as the ones used for the criterion.
        stacked_activations: Optional[pd.DataFrame]
            If specified, uses those activations instead of self.stacked_activations
        kwargs
            Forwarded to the criterion function.

        Returns
        -------
        pd.Series
            Criterion values of the set of rules. Empty pd.Series if the rule type is unknown.

        Raises
        ------
        ValueError
            If no stacked activations are given and self.stacked_activations is None.
        TypeError
            If the rule type is neither a ClassificationRule nor a RegressionRule.
        """
        if stacked_activations is None:
            stacked_activations = self.stacked_activations
            if stacked_activations is None:
                raise ValueError("Stacked activation vectors are needed")

        if predictions is None:
            # Unique prediction of each rule in a pd.Series. The activations are passed along so that predictions
            # and criterions are computed on the same data when the caller provides its own stacked activations.
            predictions = self.calc_predictions(y=y, stacked_activations=stacked_activations)

        if self._rule_type is None:
            return pd.Series(dtype=int)
        if issubclass(self.rule_type, ClassificationRule):
            return functions.calc_classification_criterion(stacked_activations, predictions, y, **kwargs)
        elif issubclass(self.rule_type, RegressionRule):
            # Inactive cells (0) become NaN, so each column holds the rule prediction only where it is activated
            return functions.calc_regression_criterion(
                stacked_activations.replace(0, np.nan) * predictions, y, **kwargs
            )
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_train_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
        """Returns a pd.Series indexed by str(rule.condition) where every rule gets the number of observations in y
        as train set size."""
        n_obs = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
        sizes = {}
        for rule in self:
            sizes[str(rule.condition)] = n_obs
        return pd.Series(sizes)

    def calc_test_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
        """Returns a pd.Series indexed by str(rule.condition) where every rule gets the number of observations in y
        as test set size."""
        n_obs = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
        sizes = {}
        for rule in self:
            sizes[str(rule.condition)] = n_obs
        return pd.Series(sizes)

    def predict(
        self,
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        weights: Optional[Union[pd.Series, str]] = None,
    ) -> pd.Series:
        """Computes the prediction vector of the whole ruleset from the predictions and activations of its rules.
        Predictions of rules must have been computed beforehand if 'xs' is not specified.

        The stacked computation is used when the class attribute STACKED_FIT is true and self.stacked_activations is
        not None; the unstacked one is used otherwise.

        Parameters
        ----------
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            If specified, uses those features to compute the ruleset's activation. Does not change the activation
            vectors nor the predictions of the ruleset's rules nor its own activation and stacked activations.
        weights: Optional[Union[pd.Series, str]]
            Optional weights. If is a pd.Series, expects the index to be the rules names. If is a str, a pd.Series
            will be constructed by fetching each rules' attribute named after the given string
            (ex: it can be 'criterion')

        Returns
        -------
        pd.Series
            The prediction vector of the ruleset. Index are observations numbers, values are the predicted values when
            the ruleset predicts, else NaN. Empty if the ruleset is empty or its rule type is unknown.
        """
        if len(self) == 0 or self._rule_type is None:
            return pd.Series(dtype=int)

        use_stacked = self.__class__.STACKED_FIT and self.stacked_activations is not None
        if not use_stacked:
            return self._calc_prediction_unstacked(weights=weights, xs=xs)

        # Activations are only re-evaluated when new features are provided
        fresh_activations = self.evaluate_stacked_activations(xs=xs) if xs is not None else None
        return self._calc_prediction_stacked(weights=weights, stacked_activations=fresh_activations)

    def _calc_prediction_stacked(
        self, weights: Optional[Union[pd.Series, str]] = None, stacked_activations: Optional[pd.DataFrame] = None
    ) -> Union[None, pd.Series]:
        """Computes the ruleset prediction from stacked activation vectors.

        Uses 'stacked_activations' if given, else self.stacked_activations; returns None if both are None. When
        'weights' is a str, the weights are fetched from the rules' attribute of that name. Raises TypeError if the
        rule type is neither a RegressionRule nor a ClassificationRule."""
        rule_predictions = pd.Series({str(r.condition): getattr(r, "prediction") for r in self})

        activations = self.stacked_activations if stacked_activations is None else stacked_activations
        if activations is None:
            return None

        # One column per rule : the rule's prediction where it is activated, NaN elsewhere
        if isinstance(self[0].prediction, str):
            prediction_vectors = activations.replace(0, np.nan).replace(1.0, "") + rule_predictions
        else:
            prediction_vectors = activations.replace(0, np.nan) * rule_predictions
        if prediction_vectors.empty:
            return prediction_vectors

        weighted = weights is not None
        if weighted:
            if isinstance(weights, str):
                weights = pd.Series({str(r.condition): getattr(r, weights) for r in self})
            weights = activations.replace(0, np.nan) * weights

        if issubclass(self.rule_type, RegressionRule):
            if weighted:
                return calc_ruleset_prediction_weighted_regressor_stacked(
                    prediction_vectors=prediction_vectors, weights=weights
                )
            return calc_ruleset_prediction_equally_weighted_regressor_stacked(prediction_vectors=prediction_vectors)
        if issubclass(self.rule_type, ClassificationRule):
            if weighted:
                return calc_ruleset_prediction_weighted_classificator_stacked(
                    prediction_vectors=prediction_vectors, weights=weights
                )
            return calc_ruleset_prediction_equally_weighted_classificator_stacked(
                prediction_vectors=prediction_vectors
            )
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def _calc_prediction_unstacked(
        self,
        weights: Optional[Union[pd.Series, str]] = None,
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    ) -> pd.Series:
        """Computes the ruleset prediction rule by rule, without stacked activation vectors.

        When 'weights' is a str, the weights are fetched from the rules' attribute of that name. 'xs' is forwarded
        as is to the prediction function. Raises TypeError if the rule type is neither a RegressionRule nor a
        ClassificationRule."""
        weighted = weights is not None
        if weighted and isinstance(weights, str):
            weights = pd.Series({str(r.condition): getattr(r, weights) for r in self})

        if issubclass(self.rule_type, RegressionRule):
            if weighted:
                return calc_ruleset_prediction_weighted_regressor_unstacked(rules=self.rules, weights=weights, xs=xs)
            return calc_ruleset_prediction_equally_weighted_regressor_unstacked(rules=self.rules, xs=xs)
        if issubclass(self.rule_type, ClassificationRule):
            if weighted:
                return calc_ruleset_prediction_weighted_classificator_unstacked(
                    rules=self.rules, weights=weights, xs=xs
                )
            return calc_ruleset_prediction_equally_weighted_classificator_unstacked(rules=self.rules, xs=xs)
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_criterion(
        self,
        y: Union[np.ndarray, pd.Series],
        xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
        weights: Optional[Union[pd.Series, str]] = None,
        predictions: Optional[pd.Series] = None,
        **kwargs,
    ):
        """Computes the criterion of the whole ruleset and stores it in self.criterion. Predictions of rules must
        have been computed beforehand.

        Nothing is returned once self.criterion has been set. If the ruleset is empty or its rule type is unknown,
        np.nan is returned instead and self.criterion is left untouched.

        Parameters
        ----------
        y: [np.ndarray, pd.Series]
            The targets on which to evaluate the ruleset criterions. Must be a 1-D np.ndarray or pd.Series.
        xs: Optional[Union[pd.DataFrame, np.ndarray]]
            If specified, forwarded to self.predict and, for classification rules, used to evaluate the ruleset's
            activation with evaluate_self_activation.
        weights: Optional[Union[pd.Series, str]]
            Optional weights. If is a pd.Series, expected the index to be the rules names. If is a str, a pd.Series
            will be constructed by fetching each rules' attribute named after the given string
            (ex: it can be 'criterion').
            Useless if predictions is specified.
        predictions: Optional[pd.Series]
            The vector of predictions of the ruleset. If not specified, is computed using self.predict
        kwargs
            Forwarded to the criterion function.

        Raises
        ------
        TypeError
            If the rule type is neither a ClassificationRule nor a RegressionRule.
        """
        if len(self) == 0 or self._rule_type is None:
            return np.nan

        if predictions is None:
            predictions = self.predict(xs=xs, weights=weights)
        if len(predictions) == 0:
            self.criterion = np.nan
            return

        if issubclass(self.rule_type, ClassificationRule):
            if xs is None:
                # Use the stored activation, computing it first when it is missing or empty
                if self._activation is None or self._activation.length == 0:
                    self.compute_self_activation()
                activation = self.activation
            else:
                activation = self.evaluate_self_activation(xs=xs).raw
            self.criterion = functions.calc_classification_criterion(activation, predictions, y, **kwargs)
            return

        if issubclass(self.rule_type, RegressionRule):
            # noinspection PyTypeChecker
            self.criterion = functions.calc_regression_criterion(predictions, y, **kwargs)
            return

        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    def calc_test_set_size(self, y: Union[np.ndarray, pd.Series]):
        """Sets self.test_set_size to the number of observations in y."""
        is_pandas = isinstance(y, (pd.Series, pd.DataFrame))
        self.test_set_size = len(y.index) if is_pandas else len(y)

    def calc_train_set_size(self, y: Union[np.ndarray, pd.Series]):
        """Sets self.train_set_size to the number of observations in y."""
        is_pandas = isinstance(y, (pd.Series, pd.DataFrame))
        self.train_set_size = len(y.index) if is_pandas else len(y)

Ancestors

  • abc.ABC

Class variables

var CHECK_DUPLICATED
var NLINES
var STACKED_FIT
var all_features_indexes
var attributes_from_test_set
var attributes_from_train_set

Static methods

def check_duplicated_rules(rules, name_or_index: str = 'index')
Expand source code
@staticmethod
def check_duplicated_rules(rules, name_or_index: str = "index"):
    if name_or_index == "index":
        str_rules = [str(r.features_indexes) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
    else:
        str_rules = [str(r.features_names) + str(r.bmins) + str(r.bmaxs) + str(r.prediction) for r in rules]
    if len(set(str_rules)) < len(str_rules):
        duplicated = {}
        for r in str_rules:
            if r not in duplicated:
                duplicated[r] = 0
            duplicated[r] += 1
        duplicated = [
            f"{r}: {duplicated[r]} (positions {[i for i, x in enumerate(str_rules) if x == r]})\n"
            f"   underlying rules : {[str(rules[i]) for i in [i for i, x in enumerate(str_rules) if x == r]]}"
            for r in duplicated
            if duplicated[r] > 1
        ]
        s = "\n -".join(duplicated)
        raise ValueError(f"There are {len(duplicated)} duplicated rules in your ruleset !\n {s}")
def rule_to_series(irule: Tuple[int, Rule], index: list) ‑> pandas.core.series.Series
Expand source code
@staticmethod
def rule_to_series(irule: Tuple[int, Rule], index: list) -> pd.Series:
    """Converts a (position, rule) pair to a pd.Series of strings.

    The values are str(getattr(rule, attribute)) for every attribute name in 'index'. The series is named after
    the rule's '_name' if it is set, else "R_<position>(<len(rule)>)" followed by the rule's sign when the rule
    has a 'sign' attribute."""
    position, rule = irule[0], irule[1]
    name = rule._name
    if name is None:
        suffix = rule.sign if hasattr(rule, "sign") else ""
        name = f"R_{position}({len(rule)}){suffix}"
    values = []
    for attribute in index:
        values.append(str(getattr(rule, attribute)))
    return pd.Series(data=values, name=name, index=index, dtype=str)
def series_to_rule(srule: pandas.core.series.Series) ‑> Rule
Expand source code
@staticmethod
def series_to_rule(srule: pd.Series) -> Rule:
    """Rebuilds a rule from a pd.Series of its attributes.

    The kind of rule is chosen from the index of the series : a RegressionRule if it contains "std", else a
    ClassificationRule if it contains "criterion", else a plain Rule.

    Parameters
    ----------
    srule: pd.Series
        Series whose index holds rule attribute names and whose values are parsed from their string
        representation.

    Returns
    -------
    Rule
        The rebuilt rule, with its condition set to a HyperrectangleCondition made from the condition attributes
        found in the series.

    Raises
    ------
    IndexError
        If an entry of the index (other than "Unnamed: 0") is not a known attribute of Rule, RegressionRule or
        ClassificationRule.
    """

    for ind in srule.index:
        if ind == "Unnamed: 0":
            continue
        if ind not in Rule.index and ind not in RegressionRule.index and ind not in ClassificationRule.index:
            raise IndexError(f"Invalid rule attribute '{ind}'")

    if "std" in srule.index:
        rule = RegressionRule()
    elif "criterion" in srule.index:
        rule = ClassificationRule()
    else:
        rule = Rule()
    rule_idx = copy(rule.__class__.rule_index)
    condition_index = {c: None for c in rule.__class__.condition_index}
    # NOTE(review): this is the first label of the series' index, not srule.name (which is where rule_to_series
    # stores the rule's name) - confirm this is intended.
    rule._name = srule.index[0]

    for rule_ind in srule.index:
        str_value = str(srule[rule_ind])
        if rule_ind in condition_index:
            condition_index[rule_ind] = ast.literal_eval(str_value)
        elif rule_ind in rule_idx:
            private_name = f"_{rule_ind}"
            if rule_ind == "activation":
                setattr(rule, private_name, Activation(str_value))
            elif str_value == "nan":
                setattr(rule, private_name, np.nan)
            elif rule_ind == "sign":
                setattr(rule, private_name, str_value)
            elif rule_ind == "prediction":
                # Prediction can be a str in case of classification. ast.literal_eval raises ValueError for a bare
                # word but SyntaxError for labels that are not valid Python expressions (e.g. containing a space),
                # so both must fall back to the raw string.
                try:
                    setattr(rule, private_name, ast.literal_eval(str_value))
                except (ValueError, SyntaxError):
                    setattr(rule, private_name, str_value)
            else:
                setattr(rule, private_name, ast.literal_eval(str_value))
        # Entries that are neither condition nor rule attributes (e.g. "Unnamed: 0") are ignored

    rule._condition = HyperrectangleCondition(**condition_index)
    if hasattr(rule, rule.__class__.fitted_if_has) and getattr(rule, rule.__class__.fitted_if_has) is not None:
        rule._fitted = True
    return rule

Instance variables

var activation : Optional[None]

Returns the Activation vector's data in a form of a 1-D np.ndarray, or None if not available.

Returns:

Union[None, np.ndarray] of the form [0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, …]

Expand source code
@property
def activation(self) -> Union[None, np.ndarray]:
    """Gives the raw data of the ruleset's Activation object, or None if self._activation is falsy.

    Returns:
    --------
    Union[None, np.ndarray]
        of the form [0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, ...]
    """
    act = self._activation
    return act.raw if act else None
var activation_available : bool

Returns True if the RuleSet has an activation vector, and if this Activation's object data is available.

Expand source code
@property
def activation_available(self) -> bool:
    """Tells whether the RuleSet has an activation vector whose data can be accessed.

    Returns False if there is no Activation object or if its length is 0. Otherwise, if the data format is "file",
    returns whether the data file exists, else returns whether the data is not None.
    """
    act = self._activation
    if act is None or act.length == 0:
        return False
    if act.data_format == "file":
        return act.data.is_file()
    return act.data is not None
var rule_type : type
Expand source code
@property
def rule_type(self) -> type:
    """Gives the type of the rules of the ruleset.

    Raises
    ------
    ValueError
        If the rule type was not set (self._rule_type is None).
    """
    current = self._rule_type
    if current is not None:
        return current
    raise ValueError("Ruleset's rule type is not set !")
var rules : List[Rule]
Expand source code
@property
def rules(self) -> List[Rule]:
    """Returns the list of rules held by the ruleset (self._rules itself, not a copy)."""
    return self._rules
var ruleset_coverage : float

Coverage is the fraction of points equal to 1 in the activation vector

Expand source code
@property
def ruleset_coverage(self) -> float:
    """Coverage is the fraction of points equal to 1 in the activation vector.

    Read from the Activation object when its data is available, else the stored self._coverage is returned.
    """
    if self.activation_available:
        return self._activation.coverage
    return self._coverage
var stacked_activations_available : bool

Returns True if the RuleSet has its rules' stacked activations.

Expand source code
@property
def stacked_activations_available(self) -> bool:
    """Returns True if the RuleSet has its rules' stacked activations.

    Returns False if self.stacked_activations is missing or None, or if the ruleset's activation vector exists
    but has a length of 0.
    """
    # 'stack_activation' is only the boolean flag telling whether stacking is wanted : the data itself is in
    # 'stacked_activations', so that is what must be tested.
    if getattr(self, "stacked_activations", None) is None:
        return False
    # A ruleset without activation vector must not make this check crash
    act = getattr(self, "_activation", None)
    if act is not None and act.length == 0:
        return False
    return True
var test_set_size

Set by self.eval

var to_hash : Tuple[str]
Expand source code
@property
def to_hash(self) -> Tuple[str]:
    """Builds the tuple used to hash the ruleset.

    It starts with "rs" and is followed by the 'to_hash' of each rule, without its first element, in the order of
    the rules.
    """
    hashed = ("rs",)
    if len(self) == 0:
        return hashed
    for rule in self:
        hashed = hashed + rule.to_hash[1:]
    return hashed

Methods

def append(self, rule: Rule, update_activation: bool = True)

Appends a new rule to self. The updates of activation vector and the stacked activation vectors can be blocked by specifying update_activation=False. Otherwise, will use self.stack_activation to determine if the updates should be done or not.

Expand source code
def append(self, rule: Rule, update_activation: bool = True):
    """Appends a new rule to self. The updates of activation vector and the stacked activation vectors can be
    blocked by specifying update_activation=False. Otherwise, will use self.stack_activation to determine if the
    updates should be done or not.

    Parameters
    ----------
    rule: Rule
        The rule to add. It is added through self.__iadd__.
    update_activation: bool
        If False, self._compute_activation and self.stack_activation are set to False while the rule is added.

    Raises
    ------
    TypeError
        If rule is not a Rule object.
    """

    if not isinstance(rule, Rule):
        raise TypeError(f"RuleSet's append method expects a Rule object, got {type(rule)}")
    stack_activation = self.stack_activation
    if not update_activation:
        self._compute_activation = False
        self.stack_activation = False
    try:
        self.__iadd__(rule)
    finally:
        # Always put the flags back, even if adding the rule failed, so that a failed append does not silently
        # disable the activation updates of the ruleset.
        self._compute_activation = True
        self.stack_activation = stack_activation
def calc_activation(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray])

Uses input xs features data to compute the activation vector of all rules in self, and updates self's activation and stacked activation if self.stack_activation is True

Parameters

xs : Union[np.ndarray, pd.DataFrame]
 
Expand source code
def calc_activation(self, xs: Union[np.ndarray, pd.DataFrame]):
    """Computes the activation vector of every rule of self from the features xs, then rebuilds the activation of
    self, and its stacked activations if self.stack_activation is True.

    Does nothing but logging a warning if xs is empty.

    Parameters
    ----------
    xs: Union[np.ndarray, pd.DataFrame]

    Raises
    ------
    ValueError
        If the ruleset is empty.
    """
    if len(self) == 0:
        raise ValueError("Can not use calc_activation : The ruleset is empty!")

    if len(xs) == 0:
        logger.warning("Given xs is empty")
        return

    for rule in self.rules:
        rule.calc_activation(xs)

    # Forget the previous vector before rebuilding it from the rules
    self._activation = None
    self.compute_self_activation()
    if not self.stack_activation:
        return
    self.stacked_activations = None
    self.compute_stacked_activation()
def calc_criterion(self, y: Union[numpy.ndarray, pandas.core.series.Series], xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None, weights: Union[pandas.core.series.Series, str, ForwardRef(None)] = None, predictions: Optional[None] = None, **kwargs)

Computes the criterion of an entire ruleset. Predictions of rules must have been computed beforehand.

This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might not suffice.

Sets self.criterion so does not return anything

Parameters

y : [np.ndarray, pd.Series]
The targets on which to evaluate the ruleset criterions. Must be a 1-D np.ndarray or pd.Series.
xs : Optional[Union[pd.DataFrame, np.ndarray]]
If specified, uses those features to compute the ruleset's activation. Does not change the activation vectors nor the predictions of its rules. If specified and if self.stack_activation is True, will overwrite the ruleset's stacked activation vectors. Else, only ruleset's activation is overwritten.
weights : Optional[Union[pd.Series, str]]
Optional weights. If is a pd.Series, expected the index to be the rules names. If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string (ex: it can be 'criterion'). Useless if predictions is specified.
predictions : Optional[pd.Series]
The vector of predictions of the ruleset. If not specified, is computed using self.calc_prediction
kwargs
 
Expand source code
def calc_criterion(
    self,
    y: Union[np.ndarray, pd.Series],
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    weights: Optional[Union[pd.Series, str]] = None,
    predictions: Optional[pd.Series] = None,
    **kwargs,
):
    """Computes the criterion of an entire ruleset and stores it in self.criterion.

    Predictions of rules must have been computed beforehand.

    If the ruleset is empty or has no rule type, np.nan is returned and self.criterion is left untouched. If the
    predictions are empty, self.criterion is set to np.nan. In every other case the result is stored in
    self.criterion and nothing is returned.

    Parameters
    ----------
    y: [np.ndarray, pd.Series]
        The targets on which to evaluate the ruleset criterion. Must be a 1-D np.ndarray or pd.Series.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If specified, those features are given to self.predict and, for classification rules, used to evaluate the
        activation vector given to the criterion function.
    weights: Optional[Union[pd.Series, str]]
        Optional weights, given to self.predict. If is a pd.Series, expected the index to be the rules names.
        If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given
        string (ex: it can be 'criterion').
        Useless if predictions is specified.
    predictions: Optional[pd.Series]
        The vector of predictions of the ruleset. If not specified, is computed using self.predict
    kwargs
        Given to the function computing the criterion.

    Raises
    ------
    TypeError
        If the rule type is neither a ClassificationRule nor a RegressionRule.
    """
    if len(self) == 0 or self._rule_type is None:
        return np.nan

    if predictions is None:
        predictions = self.predict(xs=xs, weights=weights)
    if len(predictions) == 0:
        self.criterion = np.nan
        return

    rule_type = self.rule_type
    if issubclass(rule_type, ClassificationRule):
        if xs is None:
            # Use the ruleset's own activation vector, building it first if it is missing or empty
            if self._activation is None or self._activation.length == 0:
                self.compute_self_activation()
            activation = self.activation
        else:
            activation = self.evaluate_self_activation(xs=xs).raw
        self.criterion = functions.calc_classification_criterion(activation, predictions, y, **kwargs)
        return
    if issubclass(rule_type, RegressionRule):
        # noinspection PyTypeChecker
        self.criterion = functions.calc_regression_criterion(predictions, y, **kwargs)
        return
    raise TypeError(f"Unexpected rule type '{rule_type}'")
def calc_criterions(self, y: Union[numpy.ndarray, pandas.core.series.Series], predictions: Optional[None] = None, stacked_activations: Optional[pandas.core.frame.DataFrame] = None, **kwargs) ‑> pandas.core.series.Series

Will compute the criterion of each rule in the ruleset. Does NOT set anything in either the rules nor the ruleset.

This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might not suffice.

Parameters

y : [np.ndarray, pd.Series]
The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D np.ndarray or pd.Series.
predictions : Optional[pd.Series]
Prediction of each rules. If None, will call self.calc_predictions(y)
stacked_activations : Optional[pd.DataFrame]
If specified, uses those activations instead of self.stacked_activations
kwargs
 

Returns

pd.Series
Criterion values of a set of rules (pd.Series)
Expand source code
def calc_criterions(
    self,
    y: Union[np.ndarray, pd.Series],
    predictions: Optional[pd.Series] = None,
    stacked_activations: Optional[pd.DataFrame] = None,
    **kwargs,
) -> pd.Series:
    """
    Will compute the criterion of each rule in the ruleset. Does NOT set anything in either the rules nor the
    ruleset.

    This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
    not suffice.

    Parameters
    ----------
    y: [np.ndarray, pd.Series]
        The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
        np.ndarray or pd.Series.
    predictions: Optional[pd.Series]
        Prediction of each rules. If None, will call self.calc_predictions(y)
    stacked_activations: Optional[pd.DataFrame]
        If specified, uses those activations instead of self.stacked_activations
    kwargs
        Given to the function computing the criterion.

    Returns
    -------
    pd.Series
        Criterion values of a set of rules (pd.Series). Empty if the ruleset has no rule type.

    Raises
    ------
    ValueError
        If no stacked activations are given and self.stacked_activations is None.
    TypeError
        If the rule type is neither a ClassificationRule nor a RegressionRule.
    """
    if stacked_activations is None:
        stacked_activations = self.stacked_activations
        if stacked_activations is None:
            raise ValueError("Stacked activation vectors are needed")

    # Must be checked before computing the predictions : without a rule type there is nothing to predict, and the
    # empty result could otherwise never be reached when predictions are not given.
    if self._rule_type is None:
        return pd.Series(dtype=int)

    if predictions is None:
        # Unique prediction of each rule, in a pd.Series
        predictions = self.calc_predictions(y=y)

    if issubclass(self.rule_type, ClassificationRule):
        return functions.calc_classification_criterion(stacked_activations, predictions, y, **kwargs)
    elif issubclass(self.rule_type, RegressionRule):
        return functions.calc_regression_criterion(
            stacked_activations.replace(0, np.nan) * predictions, y, **kwargs
        )
    else:
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")
def calc_predictions(self, y: [], stacked_activations: Optional[pandas.core.frame.DataFrame] = None) ‑> Union[pandas.core.series.Series, pandas.core.frame.DataFrame]

Will compute the prediction of each rule in the ruleset. Does NOT set anything in either the rules nor the ruleset.

This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might not suffice.

Parameters

y : [np.ndarray, pd.Series]
The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D np.ndarray or pd.Series.
stacked_activations : Optional[pd.DataFrame]
If specified, uses this activation instead of self.activation

Returns

Union[pd.Series, pd.DataFrame]

Regression : A pd.Series with the rules as index and the values being the predictions

Classification : A pd.DataFrame with rules as index and classes as columns, and class probabilities as values

Expand source code
def calc_predictions(
    self, y: Union[np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None
) -> Union[pd.Series, pd.DataFrame]:
    """
    Will compute the prediction of each rule in the ruleset. Does NOT set anything in either the rules nor the
    ruleset.

    This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
    not suffice.

    Parameters
    ----------
    y: [np.ndarray, pd.Series]
        The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
        np.ndarray or pd.Series.
    stacked_activations: Optional[pd.DataFrame]
        If specified, uses those activations instead of self.stacked_activations

    Returns
    -------
    Union[pd.Series, pd.DataFrame]
        Regression : the conditional mean of y given the stacked activations.\n
        Classification : for each column of the class probabilities, the first label (in sorted order) whose
        probability equals the maximum of that column.\n
        Empty ruleset : an empty pd.Series, or an empty pd.DataFrame for classification rules.

    Raises
    ------
    ValueError
        If no stacked activations are given and self.stacked_activations is None.
    TypeError
        If the rule type is neither a ClassificationRule nor a RegressionRule.
    """
    if stacked_activations is None:
        stacked_activations = self.stacked_activations
        if stacked_activations is None:
            raise ValueError("Stacked activation vectors are needed")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        if len(self) == 0:
            # Read the private attribute : the 'rule_type' property raises when the type is not set, so it can
            # never be None.
            if self._rule_type is None:
                return pd.Series(dtype=int)
            elif issubclass(self.rule_type, ClassificationRule):
                return pd.DataFrame(dtype=int)
            elif issubclass(self.rule_type, RegressionRule):
                return pd.Series(dtype=int)
            else:
                raise TypeError(f"Unexpected rule type '{self.rule_type}'")

        if issubclass(self.rule_type, ClassificationRule):
            class_probabilities = functions.class_probabilities(stacked_activations, y)
            maxs = class_probabilities.max()
            return class_probabilities[class_probabilities == maxs].apply(
                lambda x: x.dropna().sort_index().index[0]
            )
        elif issubclass(self.rule_type, RegressionRule):
            return functions.conditional_mean(stacked_activations, y)
        else:
            raise TypeError(f"Unexpected rule type '{self.rule_type}'")
def calc_signs(self, predictions: Optional[None] = None) ‑> pandas.core.series.Series

Computes the sign of each rule in the ruleset.

Rules predictions must have been set or be given to the method.

Expand source code
def calc_signs(self, predictions: Optional[pd.Series] = None) -> pd.Series:
    """
    Computes the sign of each rule in the ruleset.

    Rules predictions must have been set or be given to the method.

    Parameters
    ----------
    predictions: Optional[pd.Series]
        Prediction of each rule. If None, they are fetched from the rules and indexed by the string representation
        of their condition.

    Returns
    -------
    pd.Series
        "-" for negative predictions, "+" for the others, None for missing predictions (None or NaN).

    Raises
    ------
    TypeError
        If the rules are not regression rules.
    """
    if issubclass(self.rule_type, RegressionRule):
        if predictions is None:
            predictions = pd.Series({str(r.condition): r.prediction for r in self})
        # pandas turns None into NaN as soon as the series also holds floats, so an identity test against None is
        # not enough : a missing prediction would be seen as NaN, and 'NaN < 0' being False it would get a "+".
        return predictions.apply(lambda x: None if pd.isna(x) else ("-" if x < 0 else "+"))
    else:
        raise TypeError(f"'sign' can not be computed for '{self.rule_type}'")
def calc_stds(self, y: [], stacked_activations: Optional[pandas.core.frame.DataFrame] = None) ‑> pandas.core.series.Series

Will compute the std of each rule in the ruleset. Does NOT set anything in either the rules nor the ruleset.

This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might not suffice.

Parameters

y : [np.ndarray, pd.Series]
 
The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D np.ndarray
or pd.Series.
stacked_activations : Optional[pd.DataFrame]
If specified, uses this activation instead of self.activation

Returns

pd.Series
A pd.Series with the rules as index and the values being the std
Expand source code
def calc_stds(self, y: [np.ndarray, pd.Series], stacked_activations: Optional[pd.DataFrame] = None) -> pd.Series:
    """
    Will compute the std of each rule in the ruleset. Does NOT set anything in either the rules nor the
    ruleset.

    This uses the ruleset's stacked activation, so do not use it with too large rulesets otherwise your memory might
    not suffice.

    Parameters
    ----------
    y: [np.ndarray, pd.Series]
        The targets on which to evaluate the rules predictions, and possibly other criteria. Must be a 1-D
        np.ndarray or pd.Series.
    stacked_activations: Optional[pd.DataFrame]
        If specified, uses those activations instead of self.stacked_activations

    Returns
    -------
    pd.Series
        The conditional std of y given the stacked activations, or an empty pd.Series if the ruleset is empty.

    Raises
    ------
    TypeError
        If the rules are not regression rules.
    ValueError
        If no stacked activations are given and self.stacked_activations is None.
    """
    if not issubclass(self.rule_type, RegressionRule):
        raise TypeError(f"'std' can not be computed for '{self.rule_type}'")

    activations = self.stacked_activations if stacked_activations is None else stacked_activations
    if activations is None:
        raise ValueError("Stacked activation vectors are needed")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        # From here on the rule type is known to be set and to be a regression one
        if len(self) == 0:
            return pd.Series(dtype=int)
        return functions.conditional_std(activations, y)
def calc_test_set_size(self, y: Union[numpy.ndarray, pandas.core.series.Series])
Expand source code
def calc_test_set_size(self, y: Union[np.ndarray, pd.Series]):
    """Stores the number of samples of y in self.test_set_size.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
        The targets of the test set. For pandas objects the length of the index is used, otherwise len(y).
    """
    is_pandas = isinstance(y, (pd.Series, pd.DataFrame))
    self.test_set_size = len(y.index) if is_pandas else len(y)
def calc_test_set_sizes(self, y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> pandas.core.series.Series
Expand source code
def calc_test_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
    """Gives the size of the test set for each rule of the ruleset.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
        The targets of the test set. For pandas objects the length of the index is used, otherwise len(y).

    Returns
    -------
    pd.Series
        The number of samples of y, repeated for each rule and indexed by the string representation of the rules'
        conditions.
    """
    n_samples = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
    return pd.Series({str(rule.condition): n_samples for rule in self})
def calc_train_set_size(self, y: Union[numpy.ndarray, pandas.core.series.Series])
Expand source code
def calc_train_set_size(self, y: Union[np.ndarray, pd.Series]):
    """Stores the number of samples of y in self.train_set_size.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
        The targets of the train set. For pandas objects the length of the index is used, otherwise len(y).
    """
    is_pandas = isinstance(y, (pd.Series, pd.DataFrame))
    self.train_set_size = len(y.index) if is_pandas else len(y)
def calc_train_set_sizes(self, y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> pandas.core.series.Series
Expand source code
def calc_train_set_sizes(self, y: Union[np.ndarray, pd.Series]) -> pd.Series:
    """Gives the size of the train set for each rule of the ruleset.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
        The targets of the train set. For pandas objects the length of the index is used, otherwise len(y).

    Returns
    -------
    pd.Series
        The number of samples of y, repeated for each rule and indexed by the string representation of the rules'
        conditions.
    """
    n_samples = len(y.index) if isinstance(y, (pd.Series, pd.DataFrame)) else len(y)
    return pd.Series({str(rule.condition): n_samples for rule in self})
def calc_zscores(self, y: numpy.ndarray, predictions: Optional[None] = None, nones: Optional[None] = None, horizon: int = 1) ‑> pandas.core.series.Series
Expand source code
def calc_zscores(
    self,
    y: np.ndarray,
    predictions: Optional[pd.Series] = None,
    nones: Optional[pd.Series] = None,
    horizon: int = 1,
) -> pd.Series:
    """Computes the zscore of each rule in the ruleset, using calc_zscore_external.

    Parameters
    ----------
    y: np.ndarray
        The targets, given to calc_zscore_external and, if needed, to self.calc_predictions.
    predictions: Optional[pd.Series]
        Prediction of each rule. If None, will call self.calc_predictions(y)
    nones: Optional[pd.Series]
        The 'nones' attribute of each rule. If None, it is fetched from the rules and indexed by the string
        representation of their condition.
    horizon: int
        Given as is to calc_zscore_external (Default value = 1)

    Returns
    -------
    pd.Series
        The result of calc_zscore_external.

    Raises
    ------
    TypeError
        If the rules are not regression rules.
    """
    if not issubclass(self.rule_type, RegressionRule):
        raise TypeError(f"'zscore' can not be computed for '{self.rule_type}'")
    if nones is None:
        nones = pd.Series({str(r.condition): r.nones for r in self})

    if predictions is None:
        # Unique prediction of each rule, in a pd.Series
        predictions = self.calc_predictions(y=y)

    return calc_zscore_external(prediction=predictions, nones=nones, y=y, horizon=horizon)
def compute_self_activation(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None)

Computes the activation vector of self from its rules. If xs is specified, uses it to remake the rules' activation vectors, but do not set them as the 'activation' attributes of the rules

Expand source code
def compute_self_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
    """Sets the activation vector of self to the result of self.evaluate_self_activation. If xs is specified, it
    is given to that method to remake the rules' activation vectors, without setting them as the 'activation'
    attributes of the rules"""
    new_activation = self.evaluate_self_activation(xs=xs)
    self._activation = new_activation
def compute_stacked_activation(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None)

Computes the stacked activation vectors of self from its rules. If xs is specified, uses it to remake the rules' activation vectors, but do not set them as the 'activation' attributes of the rules

Expand source code
def compute_stacked_activation(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
    """Sets the stacked activation vectors of self to the result of self.evaluate_stacked_activations. If xs is
    specified, it is given to that method to remake the rules' activation vectors, without setting them as the
    'activation' attributes of the rules"""
    new_stacked = self.evaluate_stacked_activations(xs=xs)
    self.stacked_activations = new_stacked
def del_activation(self)

Deletes the activation vector's data of self, but not the object itself, so any computed attribute remains available

Expand source code
def del_activation(self):
    """Calls 'delete' on the Activation object of self, if there is one. The object itself is kept as the
    '_activation' attribute."""
    act = getattr(self, "_activation", None)
    if act is None:
        return
    act.delete()
def del_activations(self)

Deletes the data, but not the relevant attributes, of the activation vector of each rule in self.

Expand source code
def del_activations(self):
    """Calls 'del_activation' on each rule in self."""
    for rule in self:
        rule.del_activation()
def del_stacked_activations(self)

Deletes stacked activation vectors of self. Set it to None.

Expand source code
def del_stacked_activations(self):
    """Deletes the stacked activation vectors of self, then sets the attribute to None. Does nothing if the
    attribute is missing or already None."""
    if getattr(self, "stacked_activations", None) is None:
        return
    del self.stacked_activations
    self.stacked_activations = None
def eval(self, y: Union[numpy.ndarray, pandas.core.series.Series], xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None, keep_new_activations: bool = False, weights: Union[pandas.core.series.Series, str, ForwardRef(None)] = None, force_if_not_good: bool = False, **kwargs) ‑> List[Rule]

Evaluate the ruleset on y and xs to produce the rules' attributes relevant to the test set. Will recompute the ruleset's stacked activation vector if using stacked fit.

Parameters

y : Union[np.ndarray, pd.Series]
 
xs : Optional[Union[pd.DataFrame, np.ndarray]]
 
keep_new_activations : bool
If True, activation vector and stacked activation vectors will be kept as ruleset's attribute and replace the ones computed using the train set, if any. Will also change the activation vectors of the rules.
weights : Optional[Union[pd.Series, str]]
Optional weights for calc_criterion. If is a pd.Series, expects the index to be the rules names. If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string (ex: it can be 'criterion')
force_if_not_good : bool
If the rule was seen as "bad", eval will not trigger unless this boolean is True (Default value = False)
kwargs
 

Returns

List[Rule]
List of rules that were excluded from the ruleset after fitting because they were 'bad'
Expand source code
def eval(
    self,
    y: Union[np.ndarray, pd.Series],
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    keep_new_activations: bool = False,
    weights: Optional[Union[pd.Series, str]] = None,
    force_if_not_good: bool = False,
    **kwargs,
) -> List[Rule]:
    """Evaluate the ruleset on y and xs to produce the rules' attributes relevant to the test set. Will recompute
    the ruleset's stacked activation vector if using stacked fit.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
    keep_new_activations: bool
        If True, activation vector and stacked activation vectors will be kept as ruleset's attribute and replace
        the ones computed using the train set, if any. Will also change the activation vectors of the rules.
        Forced to True if xs is given and some rules were not fitted.
    weights: Optional[Union[pd.Series, str]]
        Optional weights for calc_criterion. If is a pd.Series, expects the index to be the rules names.
        If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string
        (ex: it can be 'criterion')
    force_if_not_good: bool
        If the rule was seen as "bad", eval will not trigger unless this boolean is True (Default value = False).
        Only given to the rules' own 'eval', i.e. when stacked fit is not used.
    kwargs
        Given to the methods computing the attributes, when they accept them. Can not contain 'method'.

    Returns
    -------
    List[Rule]
        List of rules that were excluded from the ruleset after fitting because they were 'bad'. Each excluded rule
        appears once.

    Raises
    ------
    TypeError
        If the rule type is neither a ClassificationRule nor a RegressionRule.
    IndexError
        If 'method' is in kwargs, or if the stacked activations and y do not have the same number of rows.
    ValueError
        If some rules are not fitted and xs is not given, if the stacked activations can not be computed from the
        rules, or if 'activation' is listed in the rule type's 'attributes_from_test_set'.
    """
    if issubclass(self.rule_type, ClassificationRule):
        type_to_use = ClassificationRule
    elif issubclass(self.rule_type, RegressionRule):
        type_to_use = RegressionRule
    else:
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    if "method" in kwargs:
        raise IndexError("Key 'method' can not be given to 'eval'")

    def launch_method(method, **kw):
        # Only give 'method' the arguments it declares, unless it takes a catch-all named 'kwargs'
        expected_args = list(inspect.signature(method).parameters)
        if "kwargs" not in expected_args:
            kw = {item: kw[item] for item in kw if item in expected_args}
        return method(**kw)

    if xs is not None and len(xs) == 0:
        logger.warning("Given xs is empty")
        return []

    if len(self) == 0:
        logger.debug("Ruleset is empty. Nothing to fit.")
        return []

    if not all([r._fitted for r in self]):
        if xs is None:
            raise ValueError(
                "Not all rules of the ruleset were fitted. Please do so before calling ruleset.eval or provide xs."
            )
        else:
            keep_new_activations = True

    if self.__class__.STACKED_FIT:
        clean_activation = False

        # If test set is given, compute the activation used to evaluate test relative attributes.
        # Not the same as self.stacked_activations, computed from the train set.
        # Else, we evaluate on the train set, so we use self.stacked_activations
        nones = None
        if xs is not None:
            if keep_new_activations:
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.calc_activation(xs=xs)  # Will reset rules' activation vectors too
                self.stack_activation = not clean_activation
                stacked_activations = self.stacked_activations
                activation = self._activation
                # noinspection PyUnresolvedReferences
                if "zscore" in self.rule_type.attributes_from_test_set:
                    nones = pd.Series({str(r.condition): r.nones for r in self})
            else:
                stacked_activations = self.evaluate_stacked_activations(xs)
                # noinspection PyUnresolvedReferences
                if "zscore" in self.rule_type.attributes_from_test_set:
                    activation, nones = self.evaluate_self_activation(xs, return_nones=True)
                else:
                    activation = self.evaluate_self_activation(xs)
        else:
            # If self.stacked_activations is None, compute it from the rules' activations. They must be available.
            if self.stacked_activations is None:
                # Else, will compute the stacked activation from the current rules activations
                clean_activation = not self.stack_activation
                self.stack_activation = True
                self.compute_stacked_activation()
                self.compute_self_activation()
                if self.stacked_activations is None:
                    raise ValueError("Rules activations must have been computed previously.")
                self.stack_activation = not clean_activation
            stacked_activations = self.stacked_activations
            activation = self._activation
            # noinspection PyUnresolvedReferences
            if "zscore" in self.rule_type.attributes_from_test_set:
                nones = pd.Series({str(r.condition): r.nones for r in self})

        if isinstance(y, np.ndarray):
            if not len(stacked_activations.index) == y.shape[0]:
                raise IndexError("Stacked activation and y have different number of rows.")
        else:
            if not len(stacked_activations.index) == len(y.index):
                raise IndexError("Stacked activation and y have different number of rows.")
            stacked_activations.index = y.index

        # noinspection PyUnresolvedReferences
        if "prediction" in self.rule_type.attributes_from_train_set:
            # Do NOT recompute prediction from test set : it does not make sense !
            predictions = pd.Series({str(r.condition): r.prediction for r in self})
        else:
            predictions = None
        computed_attrs = {}

        # noinspection PyUnresolvedReferences
        for attr in self.rule_type.attributes_from_test_set:
            if attr == "activation":
                raise ValueError("'activation' can not be specified in 'attributes_from_test_set'")
            # Attributes already computed are given to the next methods, which might need them
            computed_attrs[f"{attr}s"] = launch_method(
                getattr(self, f"calc_{attr}s"),
                y=y,
                xs=xs,
                stacked_activations=stacked_activations,
                activation=activation,
                nones=nones,
                predictions=predictions,
                **computed_attrs,
                **kwargs,
            )

        to_drop = []

        if clean_activation:
            self.del_stacked_activations()
        else:
            if isinstance(y, np.ndarray):
                stacked_activations.index = pd.RangeIndex(y.shape[0])
            else:
                stacked_activations.index = pd.RangeIndex(len(y.index))

        for ir in range(len(self)):
            rule = self._rules[ir]
            rule._evaluated = True
            # A rule that is bad stays in the loop for its remaining attributes to be set : make sure it is
            # listed only once in to_drop.
            is_dropped = False
            for attr in computed_attrs:
                setattr(rule, f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                if rule.good:
                    rule.check_thresholds(attr[:-1])
                if not rule.good and not is_dropped:
                    to_drop.append(rule)
                    is_dropped = True
            # NOTE(review): a rule turned bad by this last check is not added to to_drop, while the non-stacked
            # branch below drops every rule that is not good - confirm this is intended.
            if rule.good:
                rule.check_thresholds()
    else:
        for r in self:
            r.eval(
                xs=xs, y=y, recompute_activation=keep_new_activations, force_if_not_good=force_if_not_good, **kwargs
            )
        to_drop = [r for r in self if not r.good]

    if len(to_drop) > 0:
        rules = [r for r in self.rules if r not in to_drop]
        self._rules = []
        if self._activation is not None:
            self._activation.clear()
        self.del_stacked_activations()
        for r in rules:
            self.append(r, update_activation=False)

        # Recompute activation now that bad rules have been dropped
        if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
            self._activation = None
            self.compute_self_activation()
        if self.stack_activation and (
                self.stacked_activations is None or (xs is not None and keep_new_activations)
        ):
            self.stacked_activations = None
            self.compute_stacked_activation()
    # If no bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
    # been done
    elif not self.__class__.STACKED_FIT:
        if self._activation is None or self._activation.length == 0 or (xs is not None and keep_new_activations):
            self._activation = None
            self.compute_self_activation()
        # Same test as when bad rules were dropped : what matters here is whether the stacked activations are
        # missing, not the ruleset's activation vector that was handled just above.
        if self.stack_activation and (
                self.stacked_activations is None or (xs is not None and keep_new_activations)
        ):
            self.stacked_activations = None
            self.compute_stacked_activation()

    for attr in self.__class__.attributes_from_test_set[type_to_use]:
        launch_method(
            getattr(self, f"calc_{attr}"),
            y=y,
            # If the new activations were kept, they are already the ruleset's ones : no need to give xs again
            xs=xs if not keep_new_activations else None,
            weights=weights,
            **kwargs,
        )

    return to_drop
def evaluate(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray]) ‑> Activation

Computes and returns the activation vector from an array of features.

Parameters

xs : Union[pd.DataFrame, np.ndarray]
The features on which to check whether the rules are activated or not. Must be a 2-D np.ndarray or pd.DataFrame.

Returns

Activation
 
Expand source code
def evaluate(self, xs: Union[pd.DataFrame, np.ndarray]) -> Activation:
    """Builds the ruleset's activation vector for the given features.

    Every rule of the ruleset is evaluated on xs and the resulting activations are merged with
    Activation.multi_logical_or.

    Parameters
    ----------
    xs: Union[pd.DataFrame, np.ndarray]
        The features on which to check whether the rules are activated or not. Must be a 2-D np.ndarray
        or pd.DataFrame.

    Returns
    -------
    Activation

    Raises
    ------
    ValueError
        If the ruleset contains no rule.
    """
    if len(self) == 0:
        raise ValueError("Can not use evaluate : The ruleset is empty!")
    return Activation.multi_logical_or([one_rule.evaluate_activation(xs) for one_rule in self.rules])
def evaluate_self_activation(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None, return_nones: bool = False)

Computes the activation vector of self from its rules, using time-efficient Activation.multi_logical_or.

Expand source code
def evaluate_self_activation(
    self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None, return_nones: bool = False
):
    """Computes the activation vector of self from its rules, using time-efficient Activation.multi_logical_or.

    Parameters
    ----------
    xs: Optional[Union[np.ndarray, pd.DataFrame]]
        If given, each rule's activation is evaluated on it. Otherwise the activations already stored in the
        rules are used.
    return_nones: bool
        If True, also returns a pd.Series mapping each rule's condition (as a string) to the 'nones' attribute
        of its activation.

    Returns
    -------
    The ruleset's Activation, or a tuple (Activation, pd.Series) if return_nones is True. Returns None when xs
    is not given and at least one rule has no activation available.
    """
    if len(self) == 0:
        empty = Activation(np.array([]))
        if return_nones is True:
            return empty, pd.Series(dtype=int)
        return empty

    if xs is not None:
        activations = [r.evaluate_activation(xs) for r in self]
    else:
        activations = [r._activation for r in self]
        if not all([r.activation_available for r in self]):
            # Nothing can be computed without every rule's activation
            return None

    if len(self) == 1:
        # Single rule: rebuild an Activation from the rule's one rather than sharing the same object
        only = activations[0]
        act = Activation(only.raw, optimize=only.optimize, to_file=only.to_file)
    else:
        try:
            act = Activation.multi_logical_or(activations)
        except MemoryError as e:
            # NumPy's array allocation error derives from MemoryError, so this single clause covers it without
            # reaching into NumPy's private modules. Fall back to a pairwise OR, which needs less memory at once.
            logger.warning(str(e))
            act = activations[0]
            for a in activations[1:]:
                act = act | a

    if return_nones is True:
        return act, pd.Series({str(self[i].condition): activations[i].nones for i in range(len(self))})
    return act
def evaluate_stacked_activations(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None)
Expand source code
def evaluate_stacked_activations(self, xs: Optional[Union[np.ndarray, pd.DataFrame]] = None):
    """Builds a DataFrame with one column per rule, named after the rule's condition.

    If xs is given, each column is the raw activation of the rule evaluated on xs. Otherwise each column is the
    rule's stored activation, and None is returned if at least one rule has no activation available.
    An empty ruleset yields an empty DataFrame.
    """
    if len(self) == 0:
        return pd.DataFrame(dtype=int)

    if xs is None:
        if not all([r.activation_available for r in self]):
            return None
        # noinspection PyProtectedMember
        columns = {str(r.condition): r.activation for r in self}
    else:
        columns = {str(r.condition): r.evaluate_activation(xs).raw for r in self}
    return pd.DataFrame(columns)
def fit(self, y: Union[numpy.ndarray, pandas.core.series.Series], xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None, force_if_not_good: bool = False, **kwargs) ‑> List[Rule]

Fits the ruleset on y and xs to produce the rules' activation vectors and attributes relevant to train set.

Parameters

y : Union[np.ndarray, pd.Series]
 
xs : Optional[Union[pd.DataFrame, np.ndarray]]
 
force_if_not_good : bool
 
kwargs
 

Returns

List[Rule]
List of rules that were excluded from the ruleset after fitting because they were 'bad'
Expand source code
def fit(
    self,
    y: Union[np.ndarray, pd.Series],
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    force_if_not_good: bool = False,
    **kwargs,
) -> List[Rule]:
    """Fits the ruleset on y and xs to produce the rules' activation vectors and attributes relevant to train set.

    Two strategies exist, selected by the class attribute STACKED_FIT:

    * STACKED_FIT is True: the attributes listed in 'self.rule_type.attributes_from_train_set' are computed for
      all rules at once through the ruleset's 'calc_<attr>s' methods, then copied onto each rule.
    * STACKED_FIT is False: each rule is fitted individually through its own 'fit' method.

    Rules that are not 'good' after fitting are removed from the ruleset, the ruleset's activation(s) are
    recomputed, and finally the ruleset-level attributes listed in 'attributes_from_train_set' are computed.

    Parameters
    ----------
    y: Union[np.ndarray, pd.Series]
        The target. In stacked mode, if it is a pd.Series its index is temporarily given to the stacked
        activations; if it is a np.ndarray it must have as many rows as the stacked activations.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        The features. If None, activations already present are used.
    force_if_not_good: bool
        Forwarded to each rule's 'fit'. Only used when STACKED_FIT is False.
    kwargs
        Forwarded to the called methods. Must not contain the key 'method'.

    Returns
    -------
    List[Rule]
        List of rules that were excluded from the ruleset after fitting because they were 'bad'

    Raises
    ------
    TypeError
        If the ruleset's rule type is neither a ClassificationRule nor a RegressionRule.
    IndexError
        If 'method' is given in kwargs, or if y is a np.ndarray whose number of rows differs from the stacked
        activations'.
    ValueError
        If 'activation' is listed in the rule type's 'attributes_from_train_set'.
    """
    # Select which entry of the class-level 'attributes_from_train_set' dict will be used at the end
    if issubclass(self.rule_type, ClassificationRule):
        type_to_use = ClassificationRule
    elif issubclass(self.rule_type, RegressionRule):
        type_to_use = RegressionRule
    else:
        raise TypeError(f"Unexpected rule type '{self.rule_type}'")

    if "method" in kwargs:
        raise IndexError("Key 'method' can not be given to 'fit'")

    def launch_method(method, **kw):
        # Call 'method' with only the keyword arguments it declares, unless it has a parameter named 'kwargs',
        # in which case everything is passed through.
        expected_args = list(inspect.signature(method).parameters)
        if "kwargs" not in expected_args:
            kw = {item: kw[item] for item in kw if item in expected_args}
        return method(**kw)

    # Early exits: nothing to fit on, nothing to fit, or nothing new to fit
    if xs is not None and len(xs) == 0:
        logger.warning("Given xs is empty")
        return []

    if len(self) == 0:
        logger.debug("Ruleset is empty. Nothing to fit.")
        return []

    if all([r._fitted for r in self]) and xs is None:
        return []

    if self.__class__.STACKED_FIT:

        # True when stacked activations were only enabled for the duration of this fit and must be deleted after
        clean_activation = False
        # Activation must always be computed from train set
        if xs is not None:
            clean_activation = not self.stack_activation
            self.stack_activation = True
            self.calc_activation(xs)
            # Restore the flag to the value it had before being forced to True
            self.stack_activation = not clean_activation
        elif self.stacked_activations is None:
            clean_activation = not self.stack_activation
            self.stack_activation = True
            self.compute_stacked_activation()
            self.stack_activation = not clean_activation

        if isinstance(y, np.ndarray):
            # noinspection PyUnresolvedReferences
            if not len(self.stacked_activations.index) == y.shape[0]:
                raise IndexError(
                    "Stacked activation and y have different number of rows. Use pd.Series for y to"
                    " reindex stacked activations automatically."
                )
        else:
            # Give y's index to the stacked activations for the duration of the computations
            self.stacked_activations.index = y.index

        # Maps '<attr>s' to the result of 'calc_<attr>s'. Already computed results are passed to the following
        # calls as keyword arguments.
        computed_attrs = {}
        # noinspection PyUnresolvedReferences
        for attr in self.rule_type.attributes_from_train_set:
            if attr == "activation":
                raise ValueError("'activation' can not be specified in 'attributes_from_train_set'")
            computed_attrs[f"{attr}s"] = launch_method(
                getattr(self, f"calc_{attr}s"), y=y, xs=xs, **computed_attrs, **kwargs
            )
        to_drop = []

        if clean_activation:
            self.del_stacked_activations()
        else:
            # Put a plain RangeIndex back on the stacked activations
            if isinstance(y, np.ndarray):
                self.stacked_activations.index = pd.RangeIndex(y.shape[0])
            else:
                self.stacked_activations.index = pd.RangeIndex(len(y.index))

        for ir in range(len(self)):
            self._rules[ir]._fitted = True
            for attr in computed_attrs:
                # 'attr' is '<name>s': strip the trailing 's' to get the rule's private attribute '_<name>'
                setattr(self._rules[ir], f"_{attr[:-1]}", computed_attrs[attr].iloc[ir])
                if self._rules[ir].good:
                    self._rules[ir].check_thresholds(attr[:-1])
                # NOTE(review): this runs once per attribute, so a rule that is not good may be appended to
                # 'to_drop' several times.
                if not self._rules[ir].good:
                    to_drop.append(self._rules[ir])
            # To check attributes that are set along side others, like coverage
            if self._rules[ir].good:
                self._rules[ir].check_thresholds()
    else:
        # Unstacked fit: each rule fits itself
        [r.fit(xs=xs, y=y, force_if_not_good=force_if_not_good, **kwargs) for r in self]
        to_drop = [r for r in self if not r.good]

    if len(to_drop) > 0:
        # Rebuild the ruleset from scratch with the remaining rules only
        rules = [r for r in self.rules if r not in to_drop]
        self._rules = []
        if self._activation is not None:
            self._activation.clear()
        self.del_stacked_activations()
        for r in rules:
            self.append(r, update_activation=False)

        # Recompute activation now that bad rules have been dropped
        self._activation = None
        self.compute_self_activation()
        if self.stack_activation:
            self.stacked_activations = None
            self.compute_stacked_activation()
    # If no bad rules were dropped and stacked fit was not used, still compute self.activation since it has not
    # been done (needed to set self.coverage). Stacked activations are only recomputed if they are enabled and
    # either missing or if new features were given.
    elif not self.__class__.STACKED_FIT:
        if self._activation is None or self._activation.length == 0 or xs is not None:
            self._activation = None
            self.compute_self_activation()
        if self.stack_activation and (self.stacked_activations is None or xs is not None):
            self.stacked_activations = None
            self.compute_stacked_activation()

    # Ruleset-level attributes, computed through the ruleset's 'calc_<attr>' methods
    for attr in self.__class__.attributes_from_train_set[type_to_use]:
        launch_method(
            getattr(self, f"calc_{attr}"),
            y=y,
            xs=xs,
            **kwargs,
        )

    return to_drop
def get_features_count(self) ‑> List[Tuple[Any, int]]

Get a counter of all different features in the ruleset. If names are not available, will use indexes.

Returns:

count : List[Tuple[Any, int]] Counter of all different features in the ruleset

Expand source code
def get_features_count(self) -> List[Tuple[Any, int]]:
    """
    Get a counter of all different features in the ruleset. If names are not available, will use indexes.

    Returns:
    --------
    count : List[Tuple[Any, int]]
        (feature, number of occurrences) pairs for all the features used by the ruleset's rules, most common
        first. Empty list if the ruleset is empty.
    """
    if len(self) == 0:
        return []
    if len(self.features_names) > 0:
        features = itertools.chain.from_iterable(rule.features_names for rule in self)
    else:
        # No names known at the ruleset level: count the features by index instead
        features = itertools.chain.from_iterable(rule.features_indexes for rule in self)
    return Counter(features).most_common()
def load(self, path, **kwargs)
Expand source code
def load(self, path, **kwargs):
    """Fills the ruleset from a saved table of rules.

    'path' is either an object exposing a 'read' method (called with kwargs) or anything accepted by
    pd.read_csv (also called with kwargs). Rows whose first column is one of the 'ruleset ...' labels hold
    ruleset-level attributes: they are read then removed. Every remaining row is converted to a rule through
    'self.series_to_rule'. Activations and features names of the ruleset are then recomputed.
    """
    table = path.read(**kwargs) if hasattr(path, "read") else pd.read_csv(path, **kwargs)

    def pop_labelled_row(frame, label):
        # Value stored in the second column of the first row carrying 'label', and the frame without those rows
        matching = frame[frame.iloc[:, 0] == label]
        return matching.iloc[0, 1], frame.drop(matching.index)

    if "ruleset coverage" in table.iloc[:, 0].values:
        self._coverage, table = pop_labelled_row(table, "ruleset coverage")
    if "ruleset criterion" in table.iloc[:, 0].values:
        self.criterion, table = pop_labelled_row(table, "ruleset criterion")
    if "ruleset train set size" in table.iloc[:, 0].values:
        self.train_set_size, table = pop_labelled_row(table, "ruleset train set size")
        # Sizes may have been read as strings
        if isinstance(self.train_set_size, str):
            self.train_set_size = int(self.train_set_size)
    if "ruleset test set size" in table.iloc[:, 0].values:
        self.test_set_size, table = pop_labelled_row(table, "ruleset test set size")
        if isinstance(self.test_set_size, str):
            self.test_set_size = int(self.test_set_size)

    self._rules = [] if table.empty else [self.series_to_rule(table.loc[label]) for label in table.index]

    # Rules without their own set sizes inherit the ruleset's ones
    for loaded in self:
        if loaded._train_set_size is None:
            loaded._train_set_size = self.train_set_size
        elif isinstance(loaded._train_set_size, str):
            loaded._train_set_size = int(loaded._train_set_size)
        if loaded._test_set_size is None:
            loaded._test_set_size = self.test_set_size
        elif isinstance(loaded._test_set_size, str):
            loaded._test_set_size = int(loaded._test_set_size)

    if len(self._rules) > 0:
        self.rule_type = type(self._rules[0])
    self.compute_self_activation()
    if self.stack_activation:
        self.compute_stacked_activation()
    self.features_names = list(set(traverse([rule.features_names for rule in self])))
def predict(self, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None, weights: Union[pandas.core.series.Series, str, ForwardRef(None)] = None) ‑> pandas.core.series.Series

Computes the prediction vector of an entire ruleset from its rules predictions and its activation vector. Predictions of rules must have been computed beforehand if 'xs' is not specified.

Parameters

xs : Optional[Union[pd.DataFrame, np.ndarray]]
If specified, uses those features to compute the ruleset's activation. Does not change the activation vectors nor the predictions of the ruleset's rules nor its own activation and stacked activations.
weights : Optional[Union[pd.Series, str]]
Optional weights. If is a pd.Series, expects the index to be the rules names. If is a str, a pd.Series will be constructed by fetching each rules' attribute named after the given string (ex: it can be 'criterion')

Returns

pd.Series
The prediction vector of the ruleset. Index are observations numbers, values are the predicted values when the ruleset predicts, else NaN.
Expand source code
def predict(
    self,
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
    weights: Optional[Union[pd.Series, str]] = None,
) -> pd.Series:
    """Computes the prediction vector of an entire ruleset from its rules predictions and its activation vector.
    Predictions of rules must have been computed beforehand if 'xs' is not specified.

    The stacked computation is used only if the class attribute STACKED_FIT is True and stacked activations
    exist; otherwise the unstacked computation is used.

    Parameters
    ----------
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If specified, uses those features to compute the ruleset's activation. Does not change the activation
        vectors nor the predictions of the ruleset's rules nor its own activation and stacked activations.
    weights: Optional[Union[pd.Series, str]]
        Optional weights. If is a pd.Series, expects the index to be the rules names. If is a str, a pd.Series
        will be constructed by fetching each rules' attribute named after the given string
        (ex: it can be 'criterion')

    Returns
    -------
    pd.Series
        The prediction vector of the ruleset. Index are observations numbers, values are the predicted values when
        the ruleset predicts, else NaN. Empty if the ruleset is empty or has no rule type.
    """
    if len(self) == 0 or self._rule_type is None:
        return pd.Series(dtype=int)

    use_stacked = self.__class__.STACKED_FIT and self.stacked_activations is not None
    if not use_stacked:
        return self._calc_prediction_unstacked(weights=weights, xs=xs)

    # With new features, stacked activations are evaluated on the fly and passed along, not stored
    fresh_activations = self.evaluate_stacked_activations(xs=xs) if xs is not None else None
    return self._calc_prediction_stacked(weights=weights, stacked_activations=fresh_activations)
def save(self, path)
Expand source code
def save(self, path):
    """Saves the ruleset, converted to a DataFrame by 'to_df'.

    If 'path' exposes a 'write' method, the DataFrame is handed to it. Otherwise 'path' is given to the
    DataFrame's 'to_csv'.
    """
    frame = self.to_df()

    if not hasattr(path, "write"):
        frame.to_csv(path)
        return
    path.write(frame)
def set_features_indexes(self)
Expand source code
def set_features_indexes(self):
    """Sets the features indexes of the ruleset.

    If the class-level mapping 'all_features_indexes' (feature name -> index) is filled, it is used to set both
    the ruleset's indexes and the indexes of each rule's condition from their features names. Otherwise, the
    ruleset's indexes are the distinct indexes found in its rules, and the rules are left untouched.
    """
    if len(self.__class__.all_features_indexes) > 0:
        self.features_indexes = [self.__class__.all_features_indexes[f] for f in self.features_names]
        for r in self._rules:
            # noinspection PyProtectedMember
            r._condition._features_indexes = [self.__class__.all_features_indexes[f] for f in r.features_names]
    else:
        # The result must be stored: it used to be computed and thrown away
        self.features_indexes = list(set(itertools.chain(*[rule.features_indexes for rule in self])))
def sort(self, criterion: str = None, reverse: bool = False)

Sorts the RuleSet.

  • If criterion is not specified: Will sort the rules according to : 1. The number of features they talk about 2. For a same number of features (sorted in alphabetical order, or index if names are not available, optionally reversed), the bmins and bmaxs of the rules
  • If criterion is specified, it must be a float or integer attribute of rule, condition or activation. Then sorts according to this criterion, optionally reversed.
Expand source code
def sort(self, criterion: str = None, reverse: bool = False):
    """Sorts the RuleSet.

    * If criterion is not specified:
        Will sort the rules according to :
            1. The number of features they talk about
            2. For a same number of features (sorted in alphabetical order, or index if names are not available,
                optionally reversed), the bmins and bmaxs of the rules
        Nothing is done if the first rule's condition has no 'bmins' or no 'bmaxs'.
    * If criterion is specified, it must be a float or integer attribute of rule, condition or activation. Then
        sorts according to this criterion, optionally reversed.

    If stacked activations are enabled, their columns are reordered to follow the new order of the rules.
    """
    if len(self) == 0:
        return

    if criterion is None or criterion == "":
        first_condition = self[0].condition
        if not (hasattr(first_condition, "bmins") and hasattr(first_condition, "bmaxs")):
            return

        # Rules are grouped by the string form of their features: names if the ruleset has some, else indexes
        use_names = len(self.features_names) > 0

        def group_key(a_rule):
            return str(a_rule.features_names) if use_names else str(a_rule.features_indexes)

        # Distinct groups, bucketed by how many features they hold
        keys_by_size = {}
        for key in {group_key(r) for r in self}:
            keys_by_size.setdefault(len(ast.literal_eval(key)), []).append(key)

        # Smallest groups first (from one feature up), alphabetical order inside a same size
        ordered_keys = []
        for size in sorted(keys_by_size):
            if size >= 1:
                ordered_keys.extend(sorted(keys_by_size[size], reverse=reverse))

        rules_by_key = {key: [] for key in ordered_keys}
        for rule in self:
            rules_by_key[group_key(rule)].append(rule)

        reordered = []
        for members in rules_by_key.values():
            reordered.extend(sorted(members, key=lambda x: x.condition.bmins + x.condition.bmaxs))
        self._rules = reordered
    elif hasattr(self[0], criterion):
        self._rules = sorted(self, key=lambda x: getattr(x, criterion), reverse=reverse)
    else:
        raise ValueError(f"Can not sort RuleSet according to criterion {criterion}")

    if self.stack_activation:
        self.stacked_activations = self.stacked_activations[[str(r.condition) for r in self]]
def to_df(self, custom_rules_to_series: Optional[Callable] = None, ruleset_attributes: bool = True) ‑> pandas.core.frame.DataFrame
Expand source code
def to_df(self, custom_rules_to_series: Optional[Callable] = None, ruleset_attributes: bool = True) -> pd.DataFrame:
    """Converts the ruleset to a DataFrame with one row per rule.

    As a side effect, each rule's '_name' is set to the label of its row.

    Parameters
    ----------
    custom_rules_to_series: Optional[Callable]
        If given, called with the keyword 'index' (a copy of the rule type's 'index') and must return the list
        of pd.Series to use, one per rule. If None, 'self.rule_to_series' is used on each rule.
    ruleset_attributes: bool
        If True (default), four rows are appended, labelled 'ruleset coverage', 'ruleset criterion',
        'ruleset train set size' and 'ruleset test set size'. Each holds its value in the first column and NaN
        in the others.

    Returns
    -------
    pd.DataFrame
        Empty if the ruleset is empty.
    """
    if len(self) == 0:
        return pd.DataFrame()
    idx = copy(self._rule_type.index)

    if custom_rules_to_series is None:
        dfs = [self.rule_to_series((i, r), index=idx) for i, r in enumerate(self.rules)]
    else:
        dfs = custom_rules_to_series(index=idx)

    if len(dfs) > 0:
        # Each series is a rule: concatenate them as columns, then transpose to get one row per rule
        df = pd.concat(dfs, axis=1).T
    else:
        df = pd.DataFrame(columns=idx)

    for i, name in enumerate(df.index):
        # noinspection PyProtectedMember
        self._rules[i]._name = name

    if not ruleset_attributes:
        return df

    def attribute_row(label, value):
        # One-row frame aligned on df's columns: the value in the first column, NaN elsewhere
        return pd.DataFrame(
            columns=df.columns,
            data=[[value if i == 0 else np.nan for i in range(len(df.columns))]],
            index=[label],
        )

    s_cov = attribute_row("ruleset coverage", self.ruleset_coverage)
    s_crit = attribute_row("ruleset criterion", self.criterion)
    s_train = attribute_row("ruleset train set size", self.train_set_size)
    # The test row must hold the test set size (it used to hold the train set size)
    s_test = attribute_row("ruleset test set size", self.test_set_size)
    return pd.concat([df, s_cov, s_crit, s_train, s_test])