Module ifra.duplicated_rules_updater
Expand source code
import numexpr as ne
from collections import Counter
from typing import Optional, Union
import numpy as np
from ruleskit import RuleSet, ClassificationRule, RegressionRule, Rule
import logging
logger = logging.getLogger(__name__)
def get_weight(w: Union[None, str], rule: Rule):
if w is None:
return None
if hasattr(rule, w):
return getattr(rule, w)
elif "rule." in w:
for attr in dir(rule):
if "rule." not in w:
break
if f"rule.{attr}" in w:
value = getattr(rule, attr)
if value is None:
raise ValueError(f"Attribute '{attr}' is not set")
if np.isnan(value):
return None
w = w.replace(f"rule.{attr}", str(getattr(rule, attr)))
if "rule." in w:
raise ValueError(f"Could not interprete all attributes specified in weight str : '{w}'")
return ne.evaluate(w).reshape(1)[0]
else:
raise ValueError(f"I do not undersand the weight string '{w}'")
def update_duplicated_rules(
aggregated_model: RuleSet,
weight: Optional[str] = None,
best: str = "max",
name: Optional[str] = None
) -> bool:
"""Used by `ifra.aggragations.Aggregation.aggregate` to take into account the new aggregated model.
It will update the rules predictions of duplicated rules. In case of classification rules, they become the most
frequent prediction. In case of regression rules, they become the weighted mean of the predictions, using
the rules attributes designed by 'weight'. This attributes are concidered better when they are taller if 'best'
is "max", and better when they are smaller if 'best' is "min".
It will set all rules attributes that are not the prediction to None, since theyr are only valid for a node's
dataset.
"""
attributes_to_keep = ["prediction"]
if len(aggregated_model) == 0:
logger.warning(f"{name} - No new rules in aggregated model")
return False
if aggregated_model.rule_type == ClassificationRule:
success = update_classif_preds(aggregated_model, name=name)
elif aggregated_model.rule_type == RegressionRule:
success = update_regr_preds(aggregated_model, weight, best, name=name)
else:
raise TypeError(f"{name} - Unexpected model's rule type {aggregated_model.rule_type}")
if not success:
return False
for attr in aggregated_model.rule_type.rule_index:
if attr in attributes_to_keep:
continue
for r in aggregated_model:
setattr(r, f"_{attr}", None)
return True
def update_classif_preds(model: RuleSet, name: Optional[str] = None) -> bool:
duplicated_conditions = {}
to_remove = []
conditions = [r.condition for r in model]
if list(set(conditions)) == conditions: # No duplicated conds -> no contradictory predictions -> do nothing
return True
for i in range(len(model)):
if conditions.count(conditions[i]) > 1:
if conditions[i] not in duplicated_conditions:
duplicated_conditions[conditions[i]] = [i]
else:
duplicated_conditions[conditions[i]].append(i)
for condition in duplicated_conditions:
preds = []
for i in duplicated_conditions[condition]:
preds.append(model[i].prediction)
if len(set(preds)) == 1: # Duplicated conditions predict the same : nothing to do
continue
counter = Counter(preds).most_common(2)
"""Is a list of 2 tuples containing the 2 most frequent elements in *preds* with the number of times they
are present"""
if counter[0][1] == counter[1][1]: # 2 or more different predictions are equally frequent : ignore rules
to_remove += duplicated_conditions[condition]
continue
good_pred = counter[0][0]
first = True
for i in duplicated_conditions[condition]:
# Only update first rule, and drop the others
if first:
# noinspection PyProtectedMember
model[i]._prediction = good_pred
first = False
else:
to_remove.append(i)
model._rules = [model[i] for i in range(len(model)) if i not in to_remove]
model.stack_activation = False
model._activation = None
model.stacked_activations = None
model._coverage = None
return True
def update_regr_preds(model: RuleSet, weight: str = "equi", best: str = "max", name: Optional[str] = None) -> bool:
duplicated_conditions = {}
to_remove = []
conditions = [r.condition for r in model]
if list(set(conditions)) == conditions: # No duplicated conds -> no contradictory predictions -> do nothing
return True
for i in range(len(model)):
if conditions.count(conditions[i]) > 1:
if conditions[i] not in duplicated_conditions:
duplicated_conditions[conditions[i]] = [i]
else:
duplicated_conditions[conditions[i]].append(i)
for condition in duplicated_conditions:
preds = []
weights = []
for i in duplicated_conditions[condition]:
w = 1
if weight != "equi":
w = get_weight(weight, model[i])
if w is None or np.isnan(w):
continue
preds.append(model[i].prediction)
weights.append(w)
weights = np.array(weights)
weights = weights / weights.sum()
if best == "min":
weights = 1 - weights # smaller weights become the best
weights = weights / weights.sum()
if weights.sum() == 0:
logger.warning(f"{name} - Sum of weights is null. Discarding ruleset.")
return False
pred = np.average(np.array(preds), weights=weights)
first = True
for i in duplicated_conditions[condition]:
# Only update first rule, and drop the others
if first:
# noinspection PyProtectedMember
model[i]._prediction = pred
first = False
else:
to_remove.append(i)
model._rules = [model[i] for i in range(len(model)) if i not in to_remove]
model.stack_activation = False
model._activation = None
model.stacked_activations = None
model._coverage = None
return True
Functions
def get_weight(w: Optional[str], rule: ruleskit.rule.Rule)
-
Expand source code
def get_weight(w: Union[None, str], rule: Rule): if w is None: return None if hasattr(rule, w): return getattr(rule, w) elif "rule." in w: for attr in dir(rule): if "rule." not in w: break if f"rule.{attr}" in w: value = getattr(rule, attr) if value is None: raise ValueError(f"Attribute '{attr}' is not set") if np.isnan(value): return None w = w.replace(f"rule.{attr}", str(getattr(rule, attr))) if "rule." in w: raise ValueError(f"Could not interprete all attributes specified in weight str : '{w}'") return ne.evaluate(w).reshape(1)[0] else: raise ValueError(f"I do not undersand the weight string '{w}'")
def update_classif_preds(model: ruleskit.ruleset.RuleSet, name: Optional[str] = None) ‑> bool
-
Expand source code
def update_classif_preds(model: RuleSet, name: Optional[str] = None) -> bool: duplicated_conditions = {} to_remove = [] conditions = [r.condition for r in model] if list(set(conditions)) == conditions: # No duplicated conds -> no contradictory predictions -> do nothing return True for i in range(len(model)): if conditions.count(conditions[i]) > 1: if conditions[i] not in duplicated_conditions: duplicated_conditions[conditions[i]] = [i] else: duplicated_conditions[conditions[i]].append(i) for condition in duplicated_conditions: preds = [] for i in duplicated_conditions[condition]: preds.append(model[i].prediction) if len(set(preds)) == 1: # Duplicated conditions predict the same : nothing to do continue counter = Counter(preds).most_common(2) """Is a list of 2 tuples containing the 2 most frequent elements in *preds* with the number of times they are present""" if counter[0][1] == counter[1][1]: # 2 or more different predictions are equally frequent : ignore rules to_remove += duplicated_conditions[condition] continue good_pred = counter[0][0] first = True for i in duplicated_conditions[condition]: # Only update first rule, and drop the others if first: # noinspection PyProtectedMember model[i]._prediction = good_pred first = False else: to_remove.append(i) model._rules = [model[i] for i in range(len(model)) if i not in to_remove] model.stack_activation = False model._activation = None model.stacked_activations = None model._coverage = None return True
def update_duplicated_rules(aggregated_model: ruleskit.ruleset.RuleSet, weight: Optional[str] = None, best: str = 'max', name: Optional[str] = None) ‑> bool
-
Used by
ifra.aggragations.Aggregation.aggregate
to take into account the new aggregated model.It will update the rules predictions of duplicated rules. In case of classification rules, they become the most frequent prediction. In case of regression rules, they become the weighted mean of the predictions, using the rules attributes designed by 'weight'. This attributes are concidered better when they are taller if 'best' is "max", and better when they are smaller if 'best' is "min".
It will set all rules attributes that are not the prediction to None, since theyr are only valid for a node's dataset.
Expand source code
def update_duplicated_rules( aggregated_model: RuleSet, weight: Optional[str] = None, best: str = "max", name: Optional[str] = None ) -> bool: """Used by `ifra.aggragations.Aggregation.aggregate` to take into account the new aggregated model. It will update the rules predictions of duplicated rules. In case of classification rules, they become the most frequent prediction. In case of regression rules, they become the weighted mean of the predictions, using the rules attributes designed by 'weight'. This attributes are concidered better when they are taller if 'best' is "max", and better when they are smaller if 'best' is "min". It will set all rules attributes that are not the prediction to None, since theyr are only valid for a node's dataset. """ attributes_to_keep = ["prediction"] if len(aggregated_model) == 0: logger.warning(f"{name} - No new rules in aggregated model") return False if aggregated_model.rule_type == ClassificationRule: success = update_classif_preds(aggregated_model, name=name) elif aggregated_model.rule_type == RegressionRule: success = update_regr_preds(aggregated_model, weight, best, name=name) else: raise TypeError(f"{name} - Unexpected model's rule type {aggregated_model.rule_type}") if not success: return False for attr in aggregated_model.rule_type.rule_index: if attr in attributes_to_keep: continue for r in aggregated_model: setattr(r, f"_{attr}", None) return True
def update_regr_preds(model: ruleskit.ruleset.RuleSet, weight: str = 'equi', best: str = 'max', name: Optional[str] = None) ‑> bool
-
Expand source code
def update_regr_preds(model: RuleSet, weight: str = "equi", best: str = "max", name: Optional[str] = None) -> bool: duplicated_conditions = {} to_remove = [] conditions = [r.condition for r in model] if list(set(conditions)) == conditions: # No duplicated conds -> no contradictory predictions -> do nothing return True for i in range(len(model)): if conditions.count(conditions[i]) > 1: if conditions[i] not in duplicated_conditions: duplicated_conditions[conditions[i]] = [i] else: duplicated_conditions[conditions[i]].append(i) for condition in duplicated_conditions: preds = [] weights = [] for i in duplicated_conditions[condition]: w = 1 if weight != "equi": w = get_weight(weight, model[i]) if w is None or np.isnan(w): continue preds.append(model[i].prediction) weights.append(w) weights = np.array(weights) weights = weights / weights.sum() if best == "min": weights = 1 - weights # smaller weights become the best weights = weights / weights.sum() if weights.sum() == 0: logger.warning(f"{name} - Sum of weights is null. Discarding ruleset.") return False pred = np.average(np.array(preds), weights=weights) first = True for i in duplicated_conditions[condition]: # Only update first rule, and drop the others if first: # noinspection PyProtectedMember model[i]._prediction = pred first = False else: to_remove.append(i) model._rules = [model[i] for i in range(len(model)) if i not in to_remove] model.stack_activation = False model._activation = None model.stacked_activations = None model._coverage = None return True