Module ruleskit.utils.rfunctions
Expand source code
import numpy as np
from collections import Counter
from typing import Union, Tuple, List, Optional
import logging
import pandas as pd
logger = logging.getLogger(__name__)
def class_probabilities(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[np.ndarray, pd.DataFrame]:
    """Computes the class probabilities of one or several rules.

    Parameters
    ----------
    activation: Union[np.ndarray, pd.DataFrame, None]
        Either the activation vector of one rule (np.ndarray) or a DataFrame of activation vectors of many rules
        (one rule is one column). If None, no probability is computed: the most frequent value of y, as given by
        np.bincount, is returned instead.
    y: Union[np.ndarray, pd.Series]
        The target classes

    Returns
    -------
    Union[np.ndarray, pd.DataFrame]
        If given one activation vector, returns a np.ndarray of the form [(class1, prob 1), ..., (class n, prob n)],
        sorted by class. It has shape (0, 2) if no point is activated.
        If given a df of activation vectors, returns a df with the classes as index, the rules as columns and the
        probabilities as values.

    Raises
    ------
    TypeError
        If activation is neither None, a np.ndarray nor a pd.DataFrame
    ValueError
        If activation is a DataFrame and y has more than one column
    """
    if activation is None:
        # NOTE(review): this returns a single class, not probabilities. Kept as is because callers may rely on it.
        return np.bincount(y).argmax()
    if not isinstance(activation, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation' in class_probabilities must be None or a np.ndarray or a pd.DataFrame")

    if isinstance(activation, np.ndarray):
        y_conditional = np.extract(activation, y)
        n = len(y_conditional)
        if n == 0:
            # Without this guard the empty array below is 1-D and the column indexing raises IndexError
            return np.empty((0, 2))
        count = Counter(y_conditional)
        proba = np.array([(c, v / n) for c, v in count.items()])
        # Sort the (class, probability) pairs by class
        return proba[proba[:, 0].argsort()]

    if isinstance(y, pd.DataFrame):
        if len(y.columns) == 1:
            y = y.squeeze()
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    elif isinstance(y, np.ndarray):
        if len(y.shape) == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    # The class 0 is temporarily renamed "zero" so that it is not confused with the zeros of non-activated points
    y_conditional = (
        activation.mul(y.replace(0, "zero"), axis=0).replace(0, np.nan).replace("", np.nan).replace("zero", 0)
    )
    count = y_conditional.apply(lambda x: x.value_counts())
    count.index = count.index.astype(y.dtype)
    return count.apply(lambda x: x / x.dropna().sum())
def conditional_mean(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean of the values of y selected by the activation, ignoring NaNs.

    If activation is None, y is assumed to be already restricted to the activated points and its NaN-ignoring
    mean is returned directly. With a np.ndarray activation a float is returned (NaN if no activated point has a
    non-NaN target). With a DataFrame of activations (one column per rule) a Series with one mean per column is
    returned.

    Raises
    ------
    TypeError
        If activation is neither None, a np.ndarray nor a pd.DataFrame
    """
    if activation is None:
        return float(np.nanmean(y))
    if isinstance(activation, pd.DataFrame):
        return activation.apply(lambda column: np.nanmean(np.extract(column, y)))
    if not isinstance(activation, np.ndarray):
        raise TypeError("'activation' in conditional_mean must be None or a np.ndarray or a pd.DataFrame")

    activated_y = np.extract(activation, y)
    valid_y = activated_y[~np.isnan(activated_y)]
    if len(valid_y) > 0:
        return float(np.mean(valid_y))
    logger.debug("None of the activated points have a non-nan value in target y. Conditional mean is set to nan.")
    return np.nan
def conditional_std(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Standard deviation of the values of y selected by the activation.

    If activation is None, y is assumed to be already restricted to the activated points and np.nanstd(y) is
    returned. With a np.ndarray activation, NaN is returned if no point is activated, 0 if exactly one point is
    activated, and the NaN-ignoring standard deviation with ddof=1 otherwise. With a DataFrame of activations
    (one column per rule) a Series with one standard deviation per column is returned.

    Raises
    ------
    TypeError
        If activation is neither None, a np.ndarray nor a pd.DataFrame
    ValueError
        If activation is a DataFrame and y has more than one column
    """
    if activation is None:
        return float(np.nanstd(y))
    if not isinstance(activation, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation' in conditional_std must be None or a np.ndarray or a pd.DataFrame")

    if isinstance(activation, np.ndarray):
        activated_y = np.extract(activation, y)
        n_activated = len(activated_y)
        if n_activated == 0:
            return np.nan
        if n_activated == 1:
            return 0
        # ddof=1 gives the non-biased estimator, like pandas' default
        return float(np.nanstd(activated_y, ddof=1))

    # DataFrame of activations: y is first turned into a pd.Series
    if isinstance(y, pd.DataFrame):
        if len(y.columns) != 1:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
        y = y.squeeze()
    elif isinstance(y, np.ndarray):
        if y.ndim == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    # Target values equal to 0 are temporarily renamed "zero" to tell them apart from non-activated points
    masked_y = activation.mul(y.replace(0, "zero"), axis=0)
    masked_y = masked_y.replace(0, np.nan).replace("", np.nan).replace("zero", 0)
    return masked_y.std()
def mse_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean squared error
    "$ \\dfrac{1}{n} \\Sigma_{i=1}^{n} (\\hat{y}_i - y_i)^2 $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors (one column per rule). It means one or many sparse
        arrays with two different values: ymean if the rule is not active, and the prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        the mean squared error (one value per column if a DataFrame is given). NaN errors are skipped by the
        pandas mean.

    Raises
    ------
    TypeError
        If prediction_vector is neither a pd.Series nor a pd.DataFrame
    ValueError
        If prediction_vector and y do not have the same length
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in mse_function must be a pd.Series or a pd.DataFrame")
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    # axis=0 subtracts y row-wise from every column when prediction_vector is a DataFrame
    error_vector = prediction_vector.sub(y, axis=0)
    return (error_vector ** 2).mean()
def mse_norm(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean squared error of the predictions divided by the mean squared deviation of y around its mean.

    See mse_function for the accepted types of prediction_vector and the errors it raises.
    """
    mse = mse_function(prediction_vector, y)
    y_spread = np.mean((np.mean(y) - y) ** 2)
    return mse / y_spread
def mae_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean absolute error
    "$ \\dfrac{1}{n} \\Sigma_{i=1}^{n} |\\hat{y}_i - y_i| $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors (one column per rule). It means one or many sparse
        arrays with two different values: ymean if the rule is not active, and the prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        the mean absolute error (one value per column if a DataFrame is given). NaN errors are skipped by the
        pandas mean.

    Raises
    ------
    TypeError
        If prediction_vector is neither a pd.Series nor a pd.DataFrame
    ValueError
        If prediction_vector and y do not have the same length
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in mae_function must be a pd.Series or a pd.DataFrame")
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    # axis=0 subtracts y row-wise from every column when prediction_vector is a DataFrame
    error_vect = prediction_vector.sub(y, axis=0).abs()
    return error_vect.mean()
def mae_norm(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean absolute error of the predictions divided by the mean absolute deviation of y around its mean.

    See mae_function for the accepted types of prediction_vector and the errors it raises.
    """
    mae = mae_function(prediction_vector, y)
    # The absolute value is required: the signed deviations from the mean cancel out, so their mean is ~0
    # and dividing by it yields meaningless (huge or infinite) values.
    y_spread = np.mean(np.abs(np.mean(y) - y))
    return mae / y_spread
def aae_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean absolute error of the predictions, divided by the mean absolute deviation of y around
    its median
    "$ \\dfrac{\\Sigma_{i=1}^{n} |\\hat{y}_i - y_i|}{\\Sigma_{i=1}^{n} |y_i - median(y)|} $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors (one column per rule). It means one or many sparse
        arrays with two different values: ymean if the rule is not active, and the prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        the normalised absolute error, or a Series of normalised absolute errors (one per column)

    Raises
    ------
    TypeError
        If prediction_vector is neither a pd.Series nor a pd.DataFrame
    ValueError
        If prediction_vector and y do not have the same length
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in aae_function must be a pd.Series or a pd.DataFrame")
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    mean_abs_error = prediction_vector.sub(y, axis=0).abs().mean()
    # Normalisation term: mean absolute deviation of y around its median
    median_deviation = np.mean(np.abs(y - np.median(y)))
    return mean_abs_error / median_deviation
def calc_regression_criterion(
    prediction: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series], **kwargs
) -> Union[float, pd.Series]:
    """
    Compute the criterion

    Parameters
    ----------
    prediction: Union[pd.Series, pd.DataFrame]
        The prediction vector of one rule, or the stacked prediction vectors of a ruleset
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)
    kwargs:
        Can contain 'criterion_method', the case-insensitive name of the criterion to use: 'mse', 'mse_norm',
        'mae', 'mae_norm' or 'aae' (default is 'mse_norm')

    Returns
    -------
    criterion: Union[float, pd.Series]
        Criterion value of one rule or ruleset, or the Series of the criterion values of several rules

    Raises
    ------
    ValueError
        If 'criterion_method' is not one of the supported names
    """
    criterion_method = kwargs.get("criterion_method", "mse_norm")
    if isinstance(y, pd.Series):
        # Work on the raw values so that the index of y plays no role in the criterion functions
        y = y.values
    method = criterion_method.lower()
    if method == "mse":
        return mse_function(prediction, y)
    if method == "mse_norm":
        return mse_norm(prediction, y)
    if method == "mae":
        return mae_function(prediction, y)
    if method == "mae_norm":
        return mae_norm(prediction, y)
    if method == "aae":
        return aae_function(prediction, y)
    raise ValueError(
        f"Unknown criterion: {criterion_method}. Please choose among mse, mse_norm, mae, mae_norm and aae"
    )
def success_rate(
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64], y: Union[np.ndarray, pd.DataFrame]
) -> Union[float, pd.Series]:
    """
    Returns the fraction of y that equals the prediction.

    Parameters
    ----------
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64]
        The label prediction, of one rule (float, int or str) or of a set of rules (pd.Series),
        or a prediction vector (pd.Series)
    y: Union[np.ndarray, pd.DataFrame]
        The real target points activated by the rule (np.ndarray, without nans) or the rules
        (pd.DataFrame, can contain nans even alongside strings)

    Returns
    -------
    Union[float, pd.Series]
        The fraction of y that equal one rule or ruleset's prediction (float) or many rules predictions (pd.Series).
        With a scalar prediction and an empty y, NaN is returned; with a Series prediction and an empty y, an empty
        Series is returned.

    Raises
    ------
    TypeError
        If prediction has an unsupported type, or if prediction is a Series and y is neither a np.ndarray nor a
        pd.DataFrame
    """
    scalar_types = (int, float, str, np.integer, np.float64)
    if not isinstance(prediction, (pd.Series,) + scalar_types):
        raise TypeError(
            "'prediction' in success_rate must be an integer, a string or a pd.Series/np.ndarray of one of those,"
            f"got {prediction} ({type(prediction)})."
        )

    # One label compared to all the activated targets
    if isinstance(prediction, scalar_types):
        return sum(prediction == y) / len(y) if len(y) > 0 else np.nan

    # From here on, prediction is a pd.Series
    if isinstance(y, np.ndarray):
        # Prediction vector compared element-wise to the targets; NaN predictions are not counted
        if len(y) == 0:
            return pd.Series(dtype=int)
        return sum(prediction == y) / len(prediction.dropna())

    if not isinstance(y, pd.DataFrame):
        raise TypeError(
            f"If passing several predictions as a Series, then activated Y must be a DataFrame, not {type(y)}"
        )
    if len(y.index) == 0:
        return pd.Series(dtype=int)
    # One prediction per column of y: count, per column, the non-null targets and those equal to the prediction
    n_activated = (~y.isnull()).sum()
    n_correct = (y == prediction).sum()
    return n_correct / n_activated
def calc_classification_criterion(
    activation_vector: Union[np.ndarray, pd.DataFrame],
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64],
    y: Union[np.ndarray, pd.Series],
    **kwargs,
) -> Union[float, pd.Series]:
    """
    Computes the criterion

    Parameters
    ----------
    activation_vector: Union[np.ndarray, pd.DataFrame]
        The activation vector of one rule, or of one ruleset, or the stacked activation vectors of a ruleset
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64]
        The label prediction, of one rule (float, int, or str) or of a set of rules (pd.Series), or
        the label prediction of one ruleset at each observations (pd.Series)
    y: Union[np.ndarray, pd.Series, pd.DataFrame]
        The real target values. If a dataframe is given, it must have one column only.
    kwargs:
        Can contain 'criterion_method', indicating how to evaluate the criterion. For now, one can use:\n * success_rate (default)

    Returns
    -------
    Union[float, pd.Series]
        Criterion value of one rule or ruleset (float) or of a set of rules (pd.Series)

    Raises
    ------
    TypeError
        If activation_vector is neither a np.ndarray nor a pd.DataFrame, if prediction is a np.ndarray, or if
        activation_vector is a DataFrame and prediction is not a pd.Series
    IndexError
        If activation_vector is a np.ndarray and prediction is a Series of a different length
    ValueError
        If 'criterion_method' is unknown, or if y has more than one column
    """
    method = kwargs.get("criterion_method", "success_rate")
    if not isinstance(activation_vector, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation_vector' in calc_classification_criterion must be a np.ndarray or a pd.DataFrame")
    if isinstance(prediction, np.ndarray):
        raise TypeError("Prediction should not be a numpy array")
    # The same validation is applied to both branches so that an unknown method is never silently ignored
    unknown_method = method.lower() != "success_rate"
    unknown_method_message = f"Unknown criterion: {method}. Please choose among:\n* success_rate"

    if isinstance(activation_vector, np.ndarray):
        y_conditional = np.extract(activation_vector != 0, y)
        if isinstance(prediction, pd.Series):
            if len(prediction) != len(activation_vector):
                raise IndexError("Activation vector and prediction vector should have the same length")
            # Keep only the predictions made on activated points
            prediction = prediction[activation_vector != 0]
        if unknown_method:
            raise ValueError(unknown_method_message)
        return success_rate(prediction, y_conditional)

    if not isinstance(prediction, pd.Series):
        raise TypeError(
            "If passing several activation vector as a DataFrame, then prediction must be a pd.Series,"
            f" not {type(prediction)}"
        )
    if unknown_method:
        raise ValueError(unknown_method_message)
    if isinstance(y, pd.DataFrame):
        if len(y.columns) == 1:
            y = y.squeeze()
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    elif isinstance(y, np.ndarray):
        if len(y.shape) == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    # The class 0 is temporarily renamed "zero" so that it is not confused with the zeros of non-activated points
    y_conditional = (
        activation_vector.mul(y.replace(0, "zero"), axis=0)
        .replace(0, np.nan)
        .replace("", np.nan)
        .replace("zero", 0)
    )
    return success_rate(prediction, y_conditional)
def init_weights_stacked(prediction_vectors: pd.DataFrame, weights: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Aligns stacked prediction vectors and their weights before computing a weighted ruleset prediction.

    Zero weights are turned into NaN and the weight columns that are entirely NaN are dropped. The prediction
    columns without a remaining weight column are reported with a warning and removed. The returned weights are
    NaN wherever the corresponding prediction is NaN.

    Raises
    ------
    ValueError
        If weights is empty, or if no prediction is left once the rules without weight are removed
    """
    if weights.empty:
        raise ValueError("Weights not None but empty : can not evaluate prediction")
    weights = weights.replace(0, np.nan).fillna(value=np.nan).dropna(axis=1, how="all")

    has_weight = prediction_vectors.columns.isin(weights.columns)
    absent_rules = prediction_vectors.columns[~has_weight]
    present_rules = prediction_vectors.columns[has_weight]
    if len(absent_rules) > 0:
        header = (
            "Some rules given in the prediction vector did not have a weight and will be ignored in the"
            " computation of the ruleset prediction. The concerned rules are :"
        )
        logger.warning("\n".join([header] + list(absent_rules.astype(str))))

    prediction_vectors = prediction_vectors[present_rules]
    # 1 where a prediction exists, NaN elsewhere: multiplying by it blanks the weights of missing predictions
    has_prediction = (~prediction_vectors.isna() * 1).replace(0, np.nan)
    weights = has_prediction * weights
    if prediction_vectors.empty:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return prediction_vectors, weights
# noinspection PyUnresolvedReferences
def calc_ruleset_prediction_weighted_regressor_unstacked(
    rules: List["Rule"],
    weights: pd.Series,
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
) -> pd.Series:
    """Weighted average of the predictions of the given rules, accumulated one rule at a time.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset. Each one is looked up in weights through str(rule.condition).
    weights: pd.Series
        The weight of each rule. Rules with a missing, zero or NaN weight are skipped with a warning.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If given, the activation of each rule is evaluated on xs; otherwise rule.activation is used.

    Returns
    -------
    pd.Series
        The cumulated weighted predictions divided by the cumulated weights. Empty Series if rules is empty.

    Raises
    ------
    ValueError
        If no rule contributed to the prediction
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)

    cum_pred = None
    cum_w = None
    for rule in rules:
        rulename = str(rule.condition)
        if rulename not in weights.index:
            logger.warning(f"Rule '{rulename}' has no weight and is ignored in ruleset's prediction")
            continue
        rule_weight = weights[rulename]
        if rule_weight == 0:
            logger.warning(f"Rule '{rulename}' has a null weight and is ignored in ruleset's prediction")
            continue
        if np.isnan(rule_weight):
            logger.warning(f"Rule '{rulename}' has a nan weight and is ignored in ruleset's prediction")
            continue

        activation = rule.activation if xs is None else rule.evaluate_activation(xs).raw
        if activation is None or len(activation) == 0:
            continue

        if cum_pred is None:
            # First contributing rule: initialise both accumulators
            cum_pred = rule.calc_prediction_vector(activation=activation) * rule_weight
            cum_w = activation * rule_weight
            continue
        # Genuine 0 predictions are protected as "zero" while NaN (no prediction yet) is treated as 0 for the sum
        summed = cum_pred.replace(0, "zero").replace(np.nan, 0) + activation * rule.prediction * rule_weight
        cum_pred = summed.replace(0, np.nan).replace("zero", 0)
        cum_w += activation * rule_weight

    if cum_pred is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return cum_pred / cum_w
def calc_ruleset_prediction_weighted_regressor_stacked(
    prediction_vectors: pd.DataFrame, weights: pd.DataFrame
) -> pd.Series:
    """Weighted average, row by row, of stacked prediction vectors (one column per rule).

    Predictions and weights are first aligned by init_weights_stacked, which may raise ValueError.
    """
    prediction_vectors, weights = init_weights_stacked(prediction_vectors, weights)
    original_index = prediction_vectors.index
    weighted_sum = (prediction_vectors * weights).sum(axis=1)
    total_weight = weights.sum(axis=1)
    return (weighted_sum / total_weight).reindex(original_index)
# noinspection PyUnresolvedReferences
def calc_ruleset_prediction_equally_weighted_regressor_unstacked(
    rules: List["Rule"], xs: Optional[Union[pd.DataFrame, np.ndarray]] = None
) -> pd.Series:
    """Average of the predictions of the given rules, accumulated one rule at a time.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If given, the activation of each rule is evaluated on xs; otherwise rule.activation is used.

    Returns
    -------
    pd.Series
        The cumulated predictions divided by the cumulated activations. Empty Series if rules is empty.

    Raises
    ------
    ValueError
        If no rule contributed to the prediction
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)
    cum_pred = None
    cum_w = None
    for rule in rules:
        if xs is None:
            activation = rule.activation
        else:
            activation = rule.evaluate_activation(xs).raw
        if activation is None:
            continue
        if len(activation) == 0:
            continue
        if cum_pred is None:
            cum_pred = rule.calc_prediction_vector(activation=activation)
            cum_w = activation
        else:
            # Genuine 0 predictions are protected as "zero" while NaN (no prediction yet) is treated as 0
            cum_pred = (
                (cum_pred.replace(0, "zero").replace(np.nan, 0) + activation * rule.prediction)
                .replace(0, np.nan)
                .replace("zero", 0)
            )
            # Not in-place: cum_w may still be the first rule's own activation object, which must not be modified
            cum_w = cum_w + activation
    if cum_pred is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return cum_pred / cum_w
def calc_ruleset_prediction_equally_weighted_regressor_stacked(prediction_vectors: pd.DataFrame) -> pd.Series:
    """Row-wise mean of stacked prediction vectors (one column per rule)."""
    ruleset_prediction = prediction_vectors.mean(axis=1)
    return ruleset_prediction
# noinspection PyUnresolvedReferences
def calc_ruleset_prediction_weighted_classificator_unstacked(
    rules: List["Rule"], weights: pd.Series, xs: Optional[Union[pd.DataFrame, np.ndarray]] = None
) -> pd.Series:
    """Predicts, for each observation, the class with the largest cumulated weight among the given rules.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset. Each one is looked up in weights through str(rule.condition).
    weights: pd.Series
        The weight of each rule. Rules with a missing, zero or NaN weight are skipped with a warning.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If given, the activation of each rule is evaluated on xs; otherwise rule.activation is used.

    Returns
    -------
    pd.Series
        One predicted class per observation. Observations whose largest cumulated weight is 0, or is reached by
        several classes, appear to end up as NaN (they are dropped, then restored by the final reindex).
        Empty Series if rules is empty.

    Raises
    ------
    ValueError
        If no rule contributed to the prediction
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)

    def get_max(s: pd.Series):
        # s is one column of preds: cumulated weights indexed by predicted class.
        # Returns the class label at the position(s) of the maximum and NaN elsewhere.
        ps = s.index
        m = s.max()
        if isinstance(ps[0], str):
            # String labels can not be obtained by multiplication: they are written at the non-NaN positions
            s = (s == m).astype(int).replace(0, np.nan)
            s[~s.isna()] = ps[~s.isna()]
            return s
        else:
            return (s == m).astype(int).replace(0, np.nan) * ps

    # preds: one row per predicted class, holding the sum of activation * weight of the rules predicting it
    preds = None
    for rule in rules:
        rulename = str(rule.condition)
        if rulename not in weights.index:
            logger.warning(f"Rule '{rulename}' has no weight and is ignored in ruleset's prediction")
            continue
        if weights[rulename] == 0:
            logger.warning(f"Rule '{rulename}' has a null weight and is ignored in ruleset's prediction")
            continue
        if np.isnan(weights[rulename]):
            logger.warning(f"Rule '{rulename}' has a nan weight and is ignored in ruleset's prediction")
            continue
        if xs is None:
            activation = rule.activation
            nones = rule.nones
        else:
            activation = rule.evaluate_activation(xs)
            nones = activation.nones
            activation = activation.raw
        if activation is None:
            continue
        # presumably nones is the number of activated points — TODO confirm against the Rule/Activation classes
        if nones == 0:
            continue
        if preds is None:
            preds = pd.DataFrame(index=[rule.prediction], data=[activation * weights[rulename]])
        else:
            if rule.prediction not in preds.index:
                preds.loc[rule.prediction] = activation * weights[rulename]
            else:
                preds.loc[rule.prediction] += activation * weights[rulename]
    if preds is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    # Largest cumulated weight of each observation; a maximum of 0 is turned into NaN
    best_preds = preds.max().replace(0, np.nan)
    # Keep only the observations where exactly one class reaches the maximum
    best_preds = best_preds[(preds == best_preds).sum() == 1]
    # Blank the other observations so that dropna removes them before get_max is applied
    preds.loc[:, ~preds.columns.isin(best_preds.index)] = np.nan
    preds2 = preds.dropna(axis=1).apply(get_max)
    if isinstance(preds.index[0], str):
        # Each column holds a single label: bfill/ffill bring it to the first row, which is then extracted
        preds2 = preds2.bfill().ffill().iloc[0].reindex(preds.columns)
        preds2.name = None
        return preds2
    else:
        # Each column holds a single non-NaN label, so the column sum is that label
        return preds2.sum().reindex(preds.columns)
# noinspection PyUnresolvedReferences
def calc_ruleset_prediction_equally_weighted_classificator_unstacked(
    rules: List["Rule"],
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
) -> pd.Series:
    """Class prediction of a ruleset where every rule has the same weight.

    Delegates to calc_ruleset_prediction_weighted_classificator_unstacked with a weight of 1 for each rule,
    keyed by str(rule.condition).
    """
    unit_weights = pd.Series({str(r.condition): 1 for r in rules})
    return calc_ruleset_prediction_weighted_classificator_unstacked(rules, unit_weights, xs=xs)
def calc_ruleset_prediction_weighted_classificator_stacked(
    prediction_vectors: pd.DataFrame, weights: pd.DataFrame
) -> pd.Series:
    """Class prediction from stacked prediction vectors, keeping for each row only the heaviest rules.

    Predictions and weights are first aligned by init_weights_stacked. For each row, only the predictions whose
    weight equals the largest weight of that row are kept, and the result is then computed by
    calc_ruleset_prediction_equally_weighted_classificator_stacked.
    """
    prediction_vectors, weights = init_weights_stacked(prediction_vectors, weights)
    row_max_weight = weights.max(axis=1)
    # noinspection PyUnresolvedReferences
    has_max_weight = (weights.T == row_max_weight).T
    heaviest_predictions = prediction_vectors[has_max_weight]
    return calc_ruleset_prediction_equally_weighted_classificator_stacked(heaviest_predictions)
def calc_ruleset_prediction_equally_weighted_classificator_stacked(prediction_vectors: pd.DataFrame) -> pd.Series:
    """Majority vote, row by row, over stacked prediction vectors (one column per rule).

    Parameters
    ----------
    prediction_vectors: pd.DataFrame
        The predictions of each rule (columns) at each observation (rows)

    Returns
    -------
    pd.Series
        The most frequent prediction of each row, indexed like prediction_vectors. Rows without any prediction,
        or where several predictions tie for the most frequent, are NaN.
    """
    idx = prediction_vectors.index
    if pd.api.types.is_string_dtype(prediction_vectors.dtypes.iloc[0]):
        modes = prediction_vectors.fillna(value=np.nan).replace("nan", np.nan).mode(axis=1)
    else:
        modes = prediction_vectors.mode(axis=1)
    # Keep only the rows having exactly one mode: several modes mean a tie, none means no prediction
    unique_mode = modes.loc[modes.count(axis=1) == 1]
    if unique_mode.shape[1] == 0:
        most_freq_pred = pd.Series(dtype=float)
    else:
        # The modes of a row are stored from the first column on, so the unique mode is in the first column.
        # Selecting the column (rather than squeezing) keeps the row labels even when a single row is left.
        most_freq_pred = unique_mode.iloc[:, 0]
    most_freq_pred.name = None
    return most_freq_pred.reindex(idx)
def calc_zscore_external(
    prediction: Union[pd.Series, float], nones: Union[pd.Series, int], y: np.ndarray, horizon: int = 1
) -> Union[None, float, pd.Series]:
    """Computes |prediction - mean(y)| / (sqrt(horizon / nones) * std(y)), ignoring the NaNs of y.

    Parameters
    ----------
    prediction: Union[pd.Series, float]
        The prediction of one rule (float) or of several rules (pd.Series)
    nones: Union[pd.Series, int]
        presumably the number of activated points of the rule(s) — TODO confirm against callers
    y: np.ndarray
        The target values
    horizon: int
        Numerator of the ratio under the square root (default 1)

    Returns
    -------
    Union[None, float, pd.Series]
        The z-score(s), or NaN if nones is a scalar integer equal to 0
    """
    # np.integer is checked too: a NumPy integer zero is not a Python int and would lead to a division by zero
    if isinstance(nones, (int, np.integer)) and nones == 0:
        return np.nan
    num = abs(prediction - np.nanmean(y))
    deno = np.sqrt(horizon / nones) * np.nanstd(y)
    return num / deno
Functions
def aae_function(prediction_vector: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
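Compute the mean absolute error of the predictions, divided by the mean absolute deviation of y around its median "$ \dfrac{\Sigma_{i=1}^{n} |\hat{y}_i - y_i|}{\Sigma_{i=1}^{n} |y_i - median(y)|} $"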
Parameters
prediction_vector
:Union[pd.Series, pd.DataFrame]
- A prediction vector or stacked prediction vectors. It means one or many sparse arrays with two
- different values: ymean if the rule is not active, and the prediction if the rule is active.
y
:Union[np.ndarray, pd.Series]
The real target values (real numbers)
Returns
criterion
:Union[float, pd.Series]
the normalised absolute error, or a Series of normalised absolute errors
Expand source code
def aae_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean absolute error of the predictions, divided by the mean absolute deviation of y around
    its median
    "$ \\dfrac{\\Sigma_{i=1}^{n} |\\hat{y}_i - y_i|}{\\Sigma_{i=1}^{n} |y_i - median(y)|} $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors (one column per rule). It means one or many sparse
        arrays with two different values: ymean if the rule is not active, and the prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        the normalised absolute error, or a Series of normalised absolute errors (one per column)

    Raises
    ------
    TypeError
        If prediction_vector is neither a pd.Series nor a pd.DataFrame
    ValueError
        If prediction_vector and y do not have the same length
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in aae_function must be a pd.Series or a pd.DataFrame")
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    mean_abs_error = prediction_vector.sub(y, axis=0).abs().mean()
    # Normalisation term: mean absolute deviation of y around its median
    median_deviation = np.mean(np.abs(y - np.median(y)))
    return mean_abs_error / median_deviation
def calc_classification_criterion(activation_vector: Union[pandas.core.frame.DataFrame, numpy.ndarray], prediction: Union[float, int, str, pandas.core.series.Series, numpy.integer, numpy.float64], y: Union[numpy.ndarray, pandas.core.series.Series], **kwargs) ‑> Union[float, pandas.core.series.Series]
-
Computes the criterion
Parameters
activation_vector: Union[np.ndarray, pd.DataFrame] The activation vector of one rule, or of one ruleset, or the stacked activation vectors of a ruleset prediction: Union[float, int, str, pd.Series, np.integer, np.float64] The label prediction, of one rule (float, int, or str) or of a set of rules (pd.Series), or the label prediction of one ruleset at each observations (pd.Series) y: Union[np.ndarray, pd.Series, pd.DataFrame] The real target values. If a dataframe is given, it must have one column only. kwargs: Can contain 'criterion_method', indicating how to evaluate the criterion. For now, one can use: * success_rate (default)
Returns
Union[float, pd.Series] Criterion value of one rule or ruleset (float) or of a set of rules (pd.Series)
Expand source code
def calc_classification_criterion(
    activation_vector: Union[np.ndarray, pd.DataFrame],
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64],
    y: Union[np.ndarray, pd.Series],
    **kwargs,
) -> Union[float, pd.Series]:
    """
    Computes the criterion

    Parameters
    ----------
    activation_vector: Union[np.ndarray, pd.DataFrame]
        The activation vector of one rule, or of one ruleset, or the stacked activation vectors of a ruleset
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64]
        The label prediction, of one rule (float, int, or str) or of a set of rules (pd.Series), or
        the label prediction of one ruleset at each observations (pd.Series)
    y: Union[np.ndarray, pd.Series, pd.DataFrame]
        The real target values. If a dataframe is given, it must have one column only.
    kwargs:
        Can contain 'criterion_method', indicating how to evaluate the criterion. For now, one can use:\n * success_rate (default)

    Returns
    -------
    Union[float, pd.Series]
        Criterion value of one rule or ruleset (float) or of a set of rules (pd.Series)

    Raises
    ------
    TypeError
        If activation_vector is neither a np.ndarray nor a pd.DataFrame, if prediction is a np.ndarray, or if
        activation_vector is a DataFrame and prediction is not a pd.Series
    IndexError
        If activation_vector is a np.ndarray and prediction is a Series of a different length
    ValueError
        If 'criterion_method' is unknown, or if y has more than one column
    """
    method = kwargs.get("criterion_method", "success_rate")
    if not isinstance(activation_vector, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation_vector' in calc_classification_criterion must be a np.ndarray or a pd.DataFrame")
    if isinstance(prediction, np.ndarray):
        raise TypeError("Prediction should not be a numpy array")
    # The same validation is applied to both branches so that an unknown method is never silently ignored
    unknown_method = method.lower() != "success_rate"
    unknown_method_message = f"Unknown criterion: {method}. Please choose among:\n* success_rate"

    if isinstance(activation_vector, np.ndarray):
        y_conditional = np.extract(activation_vector != 0, y)
        if isinstance(prediction, pd.Series):
            if len(prediction) != len(activation_vector):
                raise IndexError("Activation vector and prediction vector should have the same length")
            # Keep only the predictions made on activated points
            prediction = prediction[activation_vector != 0]
        if unknown_method:
            raise ValueError(unknown_method_message)
        return success_rate(prediction, y_conditional)

    if not isinstance(prediction, pd.Series):
        raise TypeError(
            "If passing several activation vector as a DataFrame, then prediction must be a pd.Series,"
            f" not {type(prediction)}"
        )
    if unknown_method:
        raise ValueError(unknown_method_message)
    if isinstance(y, pd.DataFrame):
        if len(y.columns) == 1:
            y = y.squeeze()
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    elif isinstance(y, np.ndarray):
        if len(y.shape) == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    # The class 0 is temporarily renamed "zero" so that it is not confused with the zeros of non-activated points
    y_conditional = (
        activation_vector.mul(y.replace(0, "zero"), axis=0)
        .replace(0, np.nan)
        .replace("", np.nan)
        .replace("zero", 0)
    )
    return success_rate(prediction, y_conditional)
def calc_regression_criterion(prediction: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series], **kwargs) ‑> Union[float, pandas.core.series.Series]
-
Compute the criterion
Parameters
prediction
:Union[pd.Series, pd.DataFrame]
- The prediction vector of one rule, or the stacked prediction vectors of a ruleset
y
:Union[np.ndarray, pd.Series]
The real target values (real numbers). kwargs: Can contain 'criterion_method', the name of the criterion to use: 'mse', 'mse_norm', 'mae', 'mae_norm' or 'aae' (default is 'mse_norm')
Returns
criterion
:Union[float, pd.Series]
- Criterion value of one rule or ruleset, or the Series of the criterion values of several rules
Expand source code
def calc_regression_criterion(
    prediction: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series], **kwargs
) -> Union[float, pd.Series]:
    """
    Compute the criterion

    Parameters
    ----------
    prediction: Union[pd.Series, pd.DataFrame]
        The prediction vector of one rule, or the stacked prediction vectors of a ruleset
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)
    kwargs:
        Can contain 'criterion_method', the case-insensitive name of the criterion to use: 'mse', 'mse_norm',
        'mae', 'mae_norm' or 'aae' (default is 'mse_norm')

    Returns
    -------
    criterion: Union[float, pd.Series]
        Criterion value of one rule or ruleset, or the Series of the criterion values of several rules

    Raises
    ------
    ValueError
        If 'criterion_method' is not one of the supported names
    """
    criterion_method = kwargs.get("criterion_method", "mse_norm")
    if isinstance(y, pd.Series):
        # Work on the raw values so that the index of y plays no role in the criterion functions
        y = y.values
    method = criterion_method.lower()
    if method == "mse":
        return mse_function(prediction, y)
    if method == "mse_norm":
        return mse_norm(prediction, y)
    if method == "mae":
        return mae_function(prediction, y)
    if method == "mae_norm":
        return mae_norm(prediction, y)
    if method == "aae":
        return aae_function(prediction, y)
    raise ValueError(
        f"Unknown criterion: {criterion_method}. Please choose among mse, mse_norm, mae, mae_norm and aae"
    )
def calc_ruleset_prediction_equally_weighted_classificator_stacked(prediction_vectors: pandas.core.frame.DataFrame) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_equally_weighted_classificator_stacked(prediction_vectors: pd.DataFrame) -> pd.Series:
    """Majority vote, row by row, over stacked prediction vectors (one column per rule).

    Parameters
    ----------
    prediction_vectors: pd.DataFrame
        The predictions of each rule (columns) at each observation (rows)

    Returns
    -------
    pd.Series
        The most frequent prediction of each row, indexed like prediction_vectors. Rows without any prediction,
        or where several predictions tie for the most frequent, are NaN.
    """
    idx = prediction_vectors.index
    if pd.api.types.is_string_dtype(prediction_vectors.dtypes.iloc[0]):
        modes = prediction_vectors.fillna(value=np.nan).replace("nan", np.nan).mode(axis=1)
    else:
        modes = prediction_vectors.mode(axis=1)
    # Keep only the rows having exactly one mode: several modes mean a tie, none means no prediction
    unique_mode = modes.loc[modes.count(axis=1) == 1]
    if unique_mode.shape[1] == 0:
        most_freq_pred = pd.Series(dtype=float)
    else:
        # The modes of a row are stored from the first column on, so the unique mode is in the first column.
        # Selecting the column (rather than squeezing) keeps the row labels even when a single row is left.
        most_freq_pred = unique_mode.iloc[:, 0]
    most_freq_pred.name = None
    return most_freq_pred.reindex(idx)
def calc_ruleset_prediction_equally_weighted_classificator_unstacked(rules: List[ForwardRef('Rule')], xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_equally_weighted_classificator_unstacked(
    rules: List["Rule"],
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
) -> pd.Series:
    """Class prediction of a ruleset where every rule has the same weight.

    Delegates to calc_ruleset_prediction_weighted_classificator_unstacked with a weight of 1 for each rule,
    keyed by str(rule.condition).
    """
    unit_weights = pd.Series({str(r.condition): 1 for r in rules})
    return calc_ruleset_prediction_weighted_classificator_unstacked(rules, unit_weights, xs=xs)
def calc_ruleset_prediction_equally_weighted_regressor_stacked(prediction_vectors: pandas.core.frame.DataFrame) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_equally_weighted_regressor_stacked(prediction_vectors: pd.DataFrame) -> pd.Series:
    """Row-wise mean of stacked prediction vectors (one column per rule)."""
    ruleset_prediction = prediction_vectors.mean(axis=1)
    return ruleset_prediction
def calc_ruleset_prediction_equally_weighted_regressor_unstacked(rules: List[ForwardRef('Rule')], xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_equally_weighted_regressor_unstacked(
    rules: List["Rule"], xs: Optional[Union[pd.DataFrame, np.ndarray]] = None
) -> pd.Series:
    """Average of the predictions of the given rules, accumulated one rule at a time.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If given, the activation of each rule is evaluated on xs; otherwise rule.activation is used.

    Returns
    -------
    pd.Series
        The cumulated predictions divided by the cumulated activations. Empty Series if rules is empty.

    Raises
    ------
    ValueError
        If no rule contributed to the prediction
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)
    cum_pred = None
    cum_w = None
    for rule in rules:
        if xs is None:
            activation = rule.activation
        else:
            activation = rule.evaluate_activation(xs).raw
        if activation is None:
            continue
        if len(activation) == 0:
            continue
        if cum_pred is None:
            cum_pred = rule.calc_prediction_vector(activation=activation)
            cum_w = activation
        else:
            # Genuine 0 predictions are protected as "zero" while NaN (no prediction yet) is treated as 0
            cum_pred = (
                (cum_pred.replace(0, "zero").replace(np.nan, 0) + activation * rule.prediction)
                .replace(0, np.nan)
                .replace("zero", 0)
            )
            # Not in-place: cum_w may still be the first rule's own activation object, which must not be modified
            cum_w = cum_w + activation
    if cum_pred is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return cum_pred / cum_w
def calc_ruleset_prediction_weighted_classificator_stacked(prediction_vectors: pandas.core.frame.DataFrame, weights: pandas.core.frame.DataFrame) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_weighted_classificator_stacked(
    prediction_vectors: pd.DataFrame, weights: pd.DataFrame
) -> pd.Series:
    """Classification prediction from stacked prediction vectors and per-entry weights.

    After `init_weights_stacked` has cleaned both inputs, only the predictions carrying the largest weight
    of their row are kept; the equally weighted vote is then applied to what is left.

    Parameters
    ----------
    prediction_vectors: pd.DataFrame
        Stacked prediction vectors.
    weights: pd.DataFrame
        Weights, passed to `init_weights_stacked` together with `prediction_vectors`.

    Returns
    -------
    pd.Series
        The result of `calc_ruleset_prediction_equally_weighted_classificator_stacked` on the filtered frame.
    """
    prediction_vectors, weights = init_weights_stacked(prediction_vectors, weights)
    heaviest_per_row = weights.max(axis=1)
    # Transposing lets the row-wise maximum be compared against every column at once.
    # noinspection PyUnresolvedReferences
    is_heaviest = (weights.T == heaviest_per_row).T
    kept_predictions = prediction_vectors[is_heaviest]
    return calc_ruleset_prediction_equally_weighted_classificator_stacked(kept_predictions)
def calc_ruleset_prediction_weighted_classificator_unstacked(rules: List[ForwardRef('Rule')], weights: pandas.core.series.Series, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_weighted_classificator_unstacked(
    rules: List["Rule"], weights: pd.Series, xs: Optional[Union[pd.DataFrame, np.ndarray]] = None
) -> pd.Series:
    """Weighted classification prediction of a ruleset, computed rule by rule.

    A score table is built with one row per predicted class and one column per point: each usable rule
    adds `activation * weight` to the row of its prediction. For every point, the class with the strictly
    largest non-zero score wins; points with a tie or without any score get NaN.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset. Each rule must expose `condition`, `prediction`, `activation` and `nones`
        (or `evaluate_activation(xs)` returning an object with `raw` and `nones` when `xs` is given).
    weights: pd.Series
        Weight of each rule, indexed by `str(rule.condition)`. Rules whose weight is absent, 0 or NaN are
        skipped with a warning.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If None, the activation stored in each rule is used, otherwise it is recomputed from `xs`.

    Returns
    -------
    pd.Series
        The predicted class of each point. An empty Series is returned if `rules` is empty.

    Raises
    ------
    ValueError
        If no rule could contribute to the score table.
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)

    def get_max(scores: pd.Series):
        # For one point: keep the label of the class holding the maximal score, NaN for the others.
        labels = scores.index
        top = scores.max()
        winners = (scores == top).astype(int).replace(0, np.nan)
        if isinstance(labels[0], str):
            winners[~winners.isna()] = labels[~winners.isna()]
            return winners
        return winners * labels

    preds = None
    for rule in rules:
        rulename = str(rule.condition)
        if rulename not in weights.index:
            logger.warning(f"Rule '{rulename}' has no weight and is ignored in ruleset's prediction")
            continue
        weight = weights[rulename]
        if weight == 0:
            logger.warning(f"Rule '{rulename}' has a null weight and is ignored in ruleset's prediction")
            continue
        if np.isnan(weight):
            logger.warning(f"Rule '{rulename}' has a nan weight and is ignored in ruleset's prediction")
            continue

        if xs is None:
            activation = rule.activation
            nones = rule.nones
        else:
            evaluated = rule.evaluate_activation(xs)
            nones = evaluated.nones
            activation = evaluated.raw
        if activation is None:
            continue
        if nones == 0:
            continue

        contribution = activation * weight
        if preds is None:
            preds = pd.DataFrame(index=[rule.prediction], data=[contribution])
        elif rule.prediction not in preds.index:
            preds.loc[rule.prediction] = contribution
        else:
            preds.loc[rule.prediction] += contribution

    if preds is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )

    # Highest score of each point; a highest score of 0 means that no rule voted there.
    best_preds = preds.max().replace(0, np.nan)
    # Only points whose highest score is reached by exactly one class are decided.
    best_preds = best_preds[(preds == best_preds).sum() == 1]
    preds.loc[:, ~preds.columns.isin(best_preds.index)] = np.nan
    decided = preds.dropna(axis=1).apply(get_max)
    if isinstance(preds.index[0], str):
        # Each column holds a single label: bring it to the first row, then read that row.
        decided = decided.bfill().ffill().iloc[0].reindex(preds.columns)
        decided.name = None
        return decided
    return decided.sum().reindex(preds.columns)
def calc_ruleset_prediction_weighted_regressor_stacked(prediction_vectors: pandas.core.frame.DataFrame, weights: pandas.core.frame.DataFrame) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_weighted_regressor_stacked(
    prediction_vectors: pd.DataFrame, weights: pd.DataFrame
) -> pd.Series:
    """Regression prediction from stacked prediction vectors and per-entry weights.

    Both inputs first go through `init_weights_stacked`. The prediction of a row is then the weighted
    average of its entries: sum of `prediction * weight` divided by the sum of the weights of that row.

    Parameters
    ----------
    prediction_vectors: pd.DataFrame
        Stacked prediction vectors.
    weights: pd.DataFrame
        Weights, passed to `init_weights_stacked` together with `prediction_vectors`.

    Returns
    -------
    pd.Series
        The weighted average of each row, indexed like the cleaned prediction vectors.
    """
    prediction_vectors, weights = init_weights_stacked(prediction_vectors, weights)
    original_index = prediction_vectors.index
    weighted_sum = (prediction_vectors * weights).sum(axis=1)
    total_weight = weights.sum(axis=1)
    return (weighted_sum / total_weight).reindex(original_index)
def calc_ruleset_prediction_weighted_regressor_unstacked(rules: List[ForwardRef('Rule')], weights: pandas.core.series.Series, xs: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)] = None) ‑> pandas.core.series.Series
-
Expand source code
def calc_ruleset_prediction_weighted_regressor_unstacked(
    rules: List["Rule"],
    weights: pd.Series,
    xs: Optional[Union[pd.DataFrame, np.ndarray]] = None,
) -> pd.Series:
    """Weighted regression prediction of a ruleset, computed rule by rule.

    The prediction at each point is the sum of `prediction * weight` over the rules contributing there,
    divided by the sum of `activation * weight` at that point. Points where that denominator is zero
    get NaN.

    Parameters
    ----------
    rules: List["Rule"]
        The rules of the ruleset. Each rule must expose `condition`, `activation`, `prediction`,
        `calc_prediction_vector(activation=...)` and, when `xs` is given, `evaluate_activation(xs).raw`.
    weights: pd.Series
        Weight of each rule, indexed by `str(rule.condition)`. Rules whose weight is absent, 0 or NaN are
        skipped with a warning.
    xs: Optional[Union[pd.DataFrame, np.ndarray]]
        If None, each rule's stored `activation` is used. Otherwise the activation is recomputed with
        `rule.evaluate_activation(xs).raw`.

    Returns
    -------
    pd.Series
        The ruleset prediction. An empty Series is returned if `rules` is empty.

    Raises
    ------
    ValueError
        If no rule had a usable weight and a usable (non-None, non-empty) activation.
    """
    if len(rules) == 0:
        return pd.Series(dtype=int)

    cum_pred = None
    cum_w = None
    for rule in rules:
        rulename = str(rule.condition)
        if rulename not in weights.index:
            logger.warning(f"Rule '{rulename}' has no weight and is ignored in ruleset's prediction")
            continue
        weight = weights[rulename]
        if weight == 0:
            logger.warning(f"Rule '{rulename}' has a null weight and is ignored in ruleset's prediction")
            continue
        if np.isnan(weight):
            logger.warning(f"Rule '{rulename}' has a nan weight and is ignored in ruleset's prediction")
            continue

        activation = rule.activation if xs is None else rule.evaluate_activation(xs).raw
        if activation is None or len(activation) == 0:
            continue

        if cum_pred is None:
            cum_pred = rule.calc_prediction_vector(activation=activation) * weight
            cum_w = activation * weight
        else:
            # Missing entries count as 0 while summing. Points where no rule contributes end up as 0 / 0
            # in the final division, hence NaN, so no placeholder is needed to tell a legitimate 0 from
            # "no prediction".
            cum_pred = cum_pred.fillna(0) + activation * rule.prediction * weight
            cum_w = cum_w + activation * weight

    if cum_pred is None:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return cum_pred / cum_w
def calc_zscore_external(prediction: Union[float, pandas.core.series.Series], nones: Union[pandas.core.series.Series, int], y: numpy.ndarray, horizon: int = 1) ‑> Union[ForwardRef(None), float, pandas.core.series.Series]
-
Expand source code
def calc_zscore_external(
    prediction: Union[pd.Series, float], nones: Union[pd.Series, int], y: np.ndarray, horizon: int = 1
) -> Union[None, float, pd.Series]:
    """Distance between a prediction and the mean of `y`, scaled by a standard error.

    Computes `abs(prediction - nanmean(y)) / (sqrt(horizon / nones) * nanstd(y))`.

    Parameters
    ----------
    prediction: Union[pd.Series, float]
        One prediction, or one prediction per rule.
    nones: Union[pd.Series, int]
        Divisor of `horizon` under the square root (presumably the number of activated points of the
        rule(s) - to be confirmed against the callers).
    y: np.ndarray
        The target values; NaN are ignored when computing the mean and the standard deviation.
    horizon: int
        Numerator under the square root. Default is 1.

    Returns
    -------
    Union[None, float, pd.Series]
        NaN if `nones` is a scalar integer equal to 0, else the z-score (a float or a Series depending on
        the inputs).
    """
    # numpy integers are accepted alongside the built-in int, so that a zero count coming out of a
    # numpy computation is caught too instead of triggering a division by zero.
    if isinstance(nones, (int, np.integer)) and nones == 0:
        return np.nan
    num = abs(prediction - np.nanmean(y))
    deno = np.sqrt(horizon / nones) * np.nanstd(y)
    return num / deno
def class_probabilities(activation: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[pandas.core.frame.DataFrame, numpy.ndarray]
-
Computes the class probabilities of one rule, or of each rule of a set of rules
Parameters
activation
:Union[np.ndarray, pd.DataFrame, None]
- Either the activation vector of one rule (np.ndarray) or a DataFrame of activation vectors of many rules (one rule
- is one column)
y
:Union[np.ndarray, pd.Series]
The target classes
Returns
Union[np.ndarray, pd.DataFrame]
- If given one activation vector, returns a np.ndarray of the form [(class1, prob 1), …, (class n, prob n)]. If given a df of activation vectors, returns a df with the classes as index, the rules as columns and the probabilities as values.
Expand source code
def class_probabilities(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[np.ndarray, pd.DataFrame]:
    """Computes the class probabilities of one rule, or of each rule of a set of rules

    Parameters
    ----------
    activation: Union[np.ndarray, pd.DataFrame, None]
        Either the activation vector of one rule (np.ndarray) or a DataFrame of activation vectors of many rules (one
        rule is one column)
    y: Union[np.ndarray, pd.Series]
        The target classes

    Returns
    -------
    Union[np.ndarray, pd.DataFrame]
        If given one activation vector, returns a np.ndarray of the form [(class1, prob 1), ..., (class n, prob n)],
        sorted by class. It has shape (0, 2) when no point is activated.
        If given a df of activation vectors, returns a df with the classes as index, the rules as columns and the
        probabilities as values.

    Raises
    ------
    TypeError
        If `activation` is neither None, a np.ndarray nor a pd.DataFrame.
    ValueError
        In the DataFrame case, if `y` has more than one column.

    Notes
    -----
    NOTE(review): when `activation` is None the function returns `np.bincount(y).argmax()`, i.e. the most
    common class and not probabilities. This is kept as is for backward compatibility - to be confirmed
    against the callers.
    """
    if activation is None:
        return np.bincount(y).argmax()

    if not isinstance(activation, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation' in class_probabilities must be None or a np.ndarray or a pd.DataFrame")

    if isinstance(activation, np.ndarray):
        y_conditional = np.extract(activation, y)
        n = len(y_conditional)
        if n == 0:
            # No activated point: return an empty table rather than failing on the column sort below.
            return np.empty((0, 2))
        count = Counter(y_conditional)
        proba = np.array([(c, v / n) for c, v in count.items()])
        return proba[proba[:, 0].argsort()]

    if isinstance(y, pd.DataFrame):
        if len(y.columns) == 1:
            y = y.squeeze()
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
    elif isinstance(y, np.ndarray):
        if len(y.shape) == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")

    # Class 0 is temporarily replaced by the placeholder "zero", so that the zeros produced by the
    # multiplication with a null activation can be turned into NaN without losing the genuine class 0.
    y_conditional = (
        activation.mul(y.replace(0, "zero"), axis=0).replace(0, np.nan).replace("", np.nan).replace("zero", 0)
    )
    count = y_conditional.apply(lambda x: x.value_counts())
    count.index = count.index.astype(y.dtype)
    return count.apply(lambda x: x / x.dropna().sum())
def conditional_mean(activation: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Mean of all activated values
If activation is None, we assume the given y have already been extracted from the activation vector, which saves time.
Expand source code
def conditional_mean(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean of the target over the activated points, NaN being ignored.

    Parameters
    ----------
    activation: Union[np.ndarray, pd.DataFrame, None]
        The activation vector of one rule (np.ndarray), a DataFrame with one activation vector per column,
        or None. If None, `y` is assumed to hold the activated values only, which saves time.
    y: Union[np.ndarray, pd.Series]
        The target values.

    Returns
    -------
    Union[float, pd.Series]
        A float for None or one activation vector (NaN if no activated value is left once NaN are
        removed), a Series with one mean per column for a DataFrame.

    Raises
    ------
    TypeError
        If `activation` is neither None, a np.ndarray nor a pd.DataFrame.
    """
    if activation is None:
        return float(np.nanmean(y))

    if not isinstance(activation, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation' in conditional_mean must be None or a np.ndarray or a pd.DataFrame")

    if isinstance(activation, pd.DataFrame):
        return activation.apply(lambda column: np.nanmean(np.extract(column, y)))

    activated_y = np.extract(activation, y)
    valid_y = activated_y[~np.isnan(activated_y)]
    if len(valid_y) == 0:
        logger.debug(
            "None of the activated points have a non-nan value in target y." " Conditional mean is set to nan."
        )
        return np.nan
    return float(np.mean(valid_y))
def conditional_std(activation: Union[pandas.core.frame.DataFrame, numpy.ndarray, ForwardRef(None)], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Standard deviation of all activated values
If activation is None, we assume the given y have already been extracted from the activation vector, which saves time.
Expand source code
def conditional_std(
    activation: Union[np.ndarray, pd.DataFrame, None], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Standard deviation of the target over the activated points.

    Parameters
    ----------
    activation: Union[np.ndarray, pd.DataFrame, None]
        The activation vector of one rule (np.ndarray), a DataFrame with one activation vector per column,
        or None. If None, `y` is assumed to hold the activated values only, which saves time.
    y: Union[np.ndarray, pd.Series]
        The target values.

    Returns
    -------
    Union[float, pd.Series]
        A float for None or one activation vector (NaN without activated point, 0 with exactly one), a
        Series with one standard deviation per column for a DataFrame.

    Raises
    ------
    TypeError
        If `activation` is neither None, a np.ndarray nor a pd.DataFrame.
    ValueError
        In the DataFrame case, if `y` has more than one column.

    Notes
    -----
    The None case uses numpy's default `ddof`, whereas the np.ndarray case uses `ddof=1` and the DataFrame
    case uses pandas' `std`.
    """
    if activation is None:
        return float(np.nanstd(y))

    if not isinstance(activation, (pd.DataFrame, np.ndarray)):
        raise TypeError("'activation' in conditional_std must be None or a np.ndarray or a pd.DataFrame")

    if isinstance(activation, np.ndarray):
        activated_y = np.extract(activation, y)
        n_activated = len(activated_y)
        if n_activated == 0:
            return np.nan
        if n_activated == 1:
            return 0
        # ddof=1 makes numpy use the unbiased estimator of the std, like pandas does by default
        return float(np.nanstd(activated_y, ddof=1))

    # DataFrame of activations: y is first brought to a 1-D Series
    if isinstance(y, pd.DataFrame):
        if len(y.columns) != 1:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")
        y = y.squeeze()
    elif isinstance(y, np.ndarray):
        if len(y.shape) == 1:
            y = pd.Series(y)
        elif y.shape[1] == 1:
            y = pd.Series(y.squeeze())
        else:
            raise ValueError("y must be a 1-D DataFrame or ndarray or pd.Series")

    # Genuine zeros of y are protected by the placeholder "zero" while the zeros coming from a null
    # activation are turned into NaN.
    protected_y = y.replace(0, "zero")
    masked_y = activation.mul(protected_y, axis=0)
    masked_y = masked_y.replace(0, np.nan).replace("", np.nan).replace("zero", 0)
    return masked_y.std()
def init_weights_stacked(prediction_vectors: pandas.core.frame.DataFrame, weights: pandas.core.frame.DataFrame) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
-
Expand source code
def init_weights_stacked(prediction_vectors: pd.DataFrame, weights: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Cleans stacked prediction vectors and their weights before a ruleset prediction.

    Null weights are turned into NaN and the weight columns holding only NaN are dropped. The prediction
    columns without a remaining weight column are dropped too (a warning lists them). Finally, the weights
    are set to NaN wherever the corresponding prediction is NaN.

    Parameters
    ----------
    prediction_vectors: pd.DataFrame
        Stacked prediction vectors.
    weights: pd.DataFrame
        The weights, with columns named like those of `prediction_vectors`.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        The filtered prediction vectors and the masked weights.

    Raises
    ------
    ValueError
        If `weights` is empty, or if no prediction is left after the filtering.
    """
    if weights.empty:
        raise ValueError("Weights not None but empty : can not evaluate prediction")

    weights = weights.replace(0, np.nan).fillna(value=np.nan).dropna(axis=1, how="all")

    has_weight = prediction_vectors.columns.isin(weights.columns)
    absent_rules = prediction_vectors.columns[~has_weight]
    present_rules = prediction_vectors.columns[has_weight]
    if len(absent_rules) > 0:
        header = (
            "Some rules given in the prediction vector did not have a weight and will be ignored in the"
            " computation of the ruleset prediction. The concerned rules are :"
        )
        logger.warning("\n".join([header] + list(absent_rules.astype(str))))

    prediction_vectors = prediction_vectors[present_rules]
    # 1 where a prediction exists, NaN elsewhere: multiplying blanks the weights of missing predictions
    has_prediction = (prediction_vectors.notna() * 1).replace(0, np.nan)
    weights = has_prediction * weights
    if prediction_vectors.empty:
        raise ValueError(
            "No rules had non-zero/non-NaN weights, or all predictions left after applying weights where NaN"
        )
    return prediction_vectors, weights
def mae_function(prediction_vector: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Compute the mean absolute error "$ \dfrac{1}{n} \sum_{i=1}^{n} |\hat{y}_i - y_i| $"
Parameters
prediction_vector
:Union[pd.Series, pd.DataFrame]
- A prediction vector or stacked prediction vectors, i.e. one or many sparse arrays taking two
- different values: ymean if the rule is not active, and the rule's prediction if the rule is active.
y
:Union[np.ndarray, pd.Series]
The real target values (real numbers)
Returns
criterion
:Union[float, pd.Series]
- the mean absolute error
Expand source code
def mae_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean absolute error
    "$ \\dfrac{1}{n} \\sum_{i=1}^{n} |\\hat{y}_i - y_i| $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors, i.e. one or many sparse arrays taking two
        different values: ymean if the rule is not active, and the rule's prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        The mean absolute error: a float for a Series, one value per column for a DataFrame.

    Raises
    ------
    TypeError
        If `prediction_vector` is neither a pd.Series nor a pd.DataFrame.
    ValueError
        If `prediction_vector` and `y` do not have the same length.
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in mae_function must be a pd.Series or a pd.DataFrame")
    # The former np.ndarray branch could never run, the type check above rejecting arrays: it was removed.
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    error_vect = prediction_vector.sub(y, axis=0).abs()
    return error_vect.mean()
def mae_norm(prediction_vector: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Expand source code
def mae_norm(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean absolute error normalized by the mean absolute deviation of `y` around its mean.

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors, passed to `mae_function`.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    Union[float, pd.Series]
        `mae_function(prediction_vector, y)` divided by `mean(|mean(y) - y|)`.
    """
    # The absolute value is required: the plain deviations from the mean sum to zero, which would make
    # the denominator null (up to rounding errors). This mirrors the squared deviations of mse_norm.
    mean_abs_deviation = np.mean(np.abs(np.mean(y) - y))
    return mae_function(prediction_vector, y) / mean_abs_deviation
def mse_function(prediction_vector: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Compute the mean squared error "$ \dfrac{1}{n} \sum_{i=1}^{n} (\hat{y}_i - y_i)^2 $"
Parameters
prediction_vector
:Union[pd.Series, pd.DataFrame]
- A prediction vector or stacked prediction vectors, i.e. one or many sparse arrays taking two
- different values: ymean if the rule is not active, and the rule's prediction if the rule is active.
y
:Union[np.ndarray, pd.Series]
- The real target values (real numbers)
Returns
criterion
:Union[float, pd.Series]
- the mean squared error
Expand source code
def mse_function(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """
    Compute the mean squared error
    "$ \\dfrac{1}{n} \\sum_{i=1}^{n} (\\hat{y}_i - y_i)^2 $"

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors, i.e. one or many sparse arrays taking two
        different values: ymean if the rule is not active, and the rule's prediction if the rule is active.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    criterion: Union[float, pd.Series]
        The mean squared error: a float for a Series, one value per column for a DataFrame.

    Raises
    ------
    TypeError
        If `prediction_vector` is neither a pd.Series nor a pd.DataFrame.
    ValueError
        If `prediction_vector` and `y` do not have the same length.
    """
    if not isinstance(prediction_vector, (pd.DataFrame, pd.Series)):
        raise TypeError("'prediction_vector' in mse_function must be a pd.Series or a pd.DataFrame")
    # The former np.ndarray branch could never run, the type check above rejecting arrays: it was removed.
    if len(prediction_vector.index) != len(y):
        raise ValueError("Predictions and y must have the same length")
    error_vector = prediction_vector.sub(y, axis=0)
    return (error_vector ** 2).mean()
def mse_norm(prediction_vector: Union[pandas.core.series.Series, pandas.core.frame.DataFrame], y: Union[numpy.ndarray, pandas.core.series.Series]) ‑> Union[float, pandas.core.series.Series]
-
Expand source code
def mse_norm(
    prediction_vector: Union[pd.Series, pd.DataFrame], y: Union[np.ndarray, pd.Series]
) -> Union[float, pd.Series]:
    """Mean squared error normalized by the mean squared deviation of `y` around its mean.

    Parameters
    ----------
    prediction_vector: Union[pd.Series, pd.DataFrame]
        A prediction vector or stacked prediction vectors, passed to `mse_function`.
    y: Union[np.ndarray, pd.Series]
        The real target values (real numbers)

    Returns
    -------
    Union[float, pd.Series]
        `mse_function(prediction_vector, y)` divided by `mean((mean(y) - y) ** 2)`.
    """
    raw_error = mse_function(prediction_vector, y)
    mean_squared_deviation = np.mean((np.mean(y) - y) ** 2)
    return raw_error / mean_squared_deviation
def success_rate(prediction: Union[float, int, str, pandas.core.series.Series, numpy.integer, numpy.float64], y: Union[pandas.core.frame.DataFrame, numpy.ndarray]) ‑> Union[float, pandas.core.series.Series]
-
Returns the fraction of the values of y that are equal to the prediction.
Parameters
prediction
:Union[float, int, str, pd.Series, np.integer, np.float64]
- The label prediction of one rule (float, int or str) or of a set of rules (pd.Series), or a prediction vector (pd.Series)
y
:Union[np.ndarray, pd.DataFrame]
- The real target points activated by the rule (np.ndarray, without nans) or the rules (pd.DataFrame, can contain nans even alongside strings)
Returns
Union[float, pd.Series]
- The fraction of y that equal one rule or ruleset's prediction (float) or many rules predictions (pd.Series).
Expand source code
def success_rate(
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64], y: Union[np.ndarray, pd.DataFrame]
) -> Union[float, pd.Series]:
    """
    Returns the fraction of the values of y that are equal to the prediction.

    Parameters
    ----------
    prediction: Union[float, int, str, pd.Series, np.integer, np.float64]
        The label prediction of one rule (float, int or str) or of a set of rules (pd.Series), or a prediction
        vector (pd.Series)
    y: Union[np.ndarray, pd.DataFrame]
        The real target points activated by the rule (np.ndarray, without nans) or the rules (pd.DataFrame, can
        contain nans even alongside strings)

    Returns
    -------
    Union[float, pd.Series]
        The fraction of y that equal one rule or ruleset's prediction (float) or many rules predictions (pd.Series).
        With a scalar prediction and an empty y, NaN is returned; with a pd.Series prediction and an empty y, an
        empty Series is returned.

    Raises
    ------
    TypeError
        If `prediction` has an unsupported type, or if `prediction` is a pd.Series while `y` is neither a
        np.ndarray nor a pd.DataFrame.
    """
    scalar_types = (int, float, str, np.integer, np.float64)
    if not isinstance(prediction, (pd.Series,) + scalar_types):
        raise TypeError(
            "'prediction' in success_rate must be an integer, a string or a pd.Series/np.ndarray of one of those,"
            f"got {prediction} ({type(prediction)})."
        )

    # One prediction compared with every activated target value
    if isinstance(prediction, scalar_types):
        n_points = len(y)
        if n_points == 0:
            return np.nan
        return sum(prediction == y) / n_points

    # From here on, prediction is a pd.Series
    if isinstance(y, np.ndarray):
        if len(y) == 0:
            return pd.Series(dtype=int)
        n_predicted = len(prediction.dropna())
        return sum(prediction == y) / n_predicted

    if not isinstance(y, pd.DataFrame):
        raise TypeError(
            f"If passing several predictions as a Series, then activated Y must be a DataFrame, not {type(y)}"
        )
    if len(y.index) == 0:
        return pd.Series(dtype=int)
    # One column per rule: count the matches and the non-null targets column by column
    n_activated = (~y.isnull()).sum()
    n_correct = (y == prediction).sum()
    return n_correct / n_activated