Module ifra.diff_privacy

Expand source code
from typing import Optional
import logging
import numpy as np
from ruleskit import RuleSet, ClassificationRule, RegressionRule

logger = logging.getLogger(__name__)


def lambda_function(delta_p, delta_v, n, p):
    """
    Computes ``(delta_p ** 2 / delta_v) * log((n - 1) * p / (1 - p))``.

    Parameters
    ----------
    delta_p
        Quantity squared in the numerator of the leading ratio.
    delta_v
        Denominator of the leading ratio.
    n
        Count entering the logarithm as ``n - 1``.
    p
        Value entering the logarithm through ``p / (1 - p)``.

    Returns
    -------
    The leading ratio multiplied by the natural logarithm (``np.log``) of ``(n - 1) * p / (1 - p)``.
    """
    ratio = delta_p ** 2 / delta_v
    log_argument = (n - 1) * p / (1 - p)
    return ratio * np.log(log_argument)


# noinspection PyProtectedMember
def apply_diff_privacy_classif(
    ruleset: RuleSet, y: np.ndarray, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Placeholder for the privacy treatment of classification rulesets.

    The body is empty: no argument is read, the ruleset is left untouched and None is returned.

    Parameters
    ----------
    ruleset: RuleSet
        Not used.
    y: np.ndarray
        Not used.
    c_min: Optional[float]
        Not used.
    name: Optional[str]
        Not used.

    Returns
    -------
    None
    """
    return None


# noinspection PyProtectedMember
def apply_diff_privacy_regression(
    ruleset: RuleSet, y: np.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Adds Laplace noise to the prediction and to the train set size of each rule of a regression ruleset, in place.

    A rule is dropped from ``ruleset.rules`` when its prediction is None or NaN, when the number of points used to
    bound the prediction variation is lower than one, or when one of the two noise scales returned by
    ``lambda_function`` is negative, non-finite or cannot be computed.

    Parameters
    ----------
    ruleset: RuleSet
        Ruleset to privatise. Kept rules are modified in place and ``ruleset.rules`` is replaced by the kept rules.
    y: np.ndarray
        Target values. Only its length, minimum and maximum are used.
    p: float
        Probability passed to ``lambda_function`` to compute the noise scale of the predictions.
    c_min: Optional[float]
        Coverage used to compute the minimum number of points of a rule. If None, each rule's own coverage is used.
    name: Optional[str]
        Prefix of the log messages.

    Returns
    -------
    None
    """
    n = len(y)

    # The extrema of y do not depend on the rule: scan y once here instead of several times per rule.
    # They are only read for rules with min_pts >= 1, which implies n > 0.
    if n > 0:
        y_min, y_max = min(y), max(y)
        delta_pred_max = y_max - y_min
    else:
        y_min = y_max = delta_pred_max = None

    delta_activated_min = 1  # max variation of number of activated points when changing one point is... well... 1 !
    delta_activated_max = n

    good_rules = []

    for r in ruleset:
        if r.prediction is None or np.isnan(r.prediction):
            continue
        c = r.coverage if c_min is None else c_min
        min_pts = int(n * c)
        if min_pts < 1:
            # min_pts is a divisor below: 0 would raise or yield inf/NaN noise scales.
            logger.warning(
                f"{name} - Less than one point to bound the prediction variation :"
                " not enough points to ensure privacy. Discarding rule."
            )
            continue
        # Distance between an extremum of y and the mean of (min_pts - 1) points at that extremum plus one point
        # at the opposite extremum.
        delta_pred_min = max(
            abs(y_min - (y_min * (min_pts - 1) + y_max) / min_pts),
            abs(y_max - (y_max * (min_pts - 1) + y_min) / min_pts),
        )
        try:
            lambda_value_pred = lambda_function(delta_p=delta_pred_min, delta_v=delta_pred_max, n=n, p=p)
            lambda_value_activated = lambda_function(
                delta_p=delta_activated_min, delta_v=delta_activated_max, n=n, p=r.coverage
            )
        except ZeroDivisionError:
            # Plain Python numbers raise where NumPy scalars return inf/NaN (e.g. a probability equal to 1).
            logger.warning(f"{name} - Could not compute lambda (division by zero). Discarding rule.")
            continue
        if lambda_value_pred < 0:
            logger.warning(f"{name} - Got a negative lambda : not enough points to ensure privacy. Discarding rule.")
            continue
        if lambda_value_activated < 0:
            logger.warning(f"{name} - got a negative lambda : not enough points to ensure privacy. Discarding rule.")
            continue
        if not (np.isfinite(lambda_value_pred) and np.isfinite(lambda_value_activated)):
            # NaN and +inf pass the "< 0" tests above but would turn the prediction into NaN/inf or make int() fail.
            logger.warning(f"{name} - Got a non-finite lambda : cannot calibrate the privacy noise. Discarding rule.")
            continue
        privacy_pred = r.prediction + np.random.laplace(0, lambda_value_pred)
        # NOTE(review): nothing prevents the noisy train set size from being negative - confirm this is acceptable.
        privacy_set_size = r.train_set_size + int(np.random.laplace(0, lambda_value_activated))
        r._prediction = privacy_pred
        r._train_set_size = privacy_set_size
        good_rules.append(r)
    ruleset.rules = good_rules


# noinspection PyProtectedMember
def apply_diff_privacy(
    ruleset: RuleSet, y: np.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Dispatches the privatisation of a ruleset to the function matching its rule type.

    Nothing is done when the ruleset is empty, when its activation is set and has a length of 0, or when its
    rule type is None.

    Parameters
    ----------
    ruleset: RuleSet
        Ruleset to privatise, forwarded to the type-specific function.
    y: np.ndarray
        Forwarded to the type-specific function.
    p: float
        Forwarded to apply_diff_privacy_regression only.
    c_min: Optional[float]
        Forwarded to the type-specific function.
    name: Optional[str]
        Forwarded to the type-specific function.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If the rule type of the ruleset derives neither from ClassificationRule nor from RegressionRule.
    """
    # Same short-circuit order as three successive guards: length, then activation, then rule type.
    nothing_to_do = (
        len(ruleset) == 0
        or (ruleset._activation is not None and ruleset._activation.length == 0)
        or ruleset.rule_type is None
    )
    if nothing_to_do:
        return

    shared_kwargs = {"ruleset": ruleset, "y": y, "c_min": c_min, "name": name}
    if issubclass(ruleset.rule_type, ClassificationRule):
        apply_diff_privacy_classif(**shared_kwargs)
        return
    if issubclass(ruleset.rule_type, RegressionRule):
        apply_diff_privacy_regression(p=p, **shared_kwargs)
        return
    raise TypeError(f"Invalid rule type {ruleset.rule_type}. Must derive from ClassificationRule or RegressionRule")

Functions

def apply_diff_privacy(ruleset: ruleskit.ruleset.RuleSet, y: numpy.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None)

Modifies the rules' predictions and train set sizes to make the learning private up to a probability 'p' of identifying their real values.

Parameters

ruleset : RuleSet
 
y : np.ndarray
 
p : float
 
c_min : Optional[float]
 
name : Optional[str]
 

Returns

None
 
Expand source code
def apply_diff_privacy(
    ruleset: RuleSet, y: np.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Dispatches the privatisation of a ruleset to the function matching its rule type.

    Nothing is done when the ruleset is empty, when its activation is set and has a length of 0, or when its
    rule type is None.

    Parameters
    ----------
    ruleset: RuleSet
        Ruleset to privatise, forwarded to the type-specific function.
    y: np.ndarray
        Forwarded to the type-specific function.
    p: float
        Forwarded to apply_diff_privacy_regression only.
    c_min: Optional[float]
        Forwarded to the type-specific function.
    name: Optional[str]
        Forwarded to the type-specific function.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If the rule type of the ruleset derives neither from ClassificationRule nor from RegressionRule.
    """
    # Same short-circuit order as three successive guards: length, then activation, then rule type.
    nothing_to_do = (
        len(ruleset) == 0
        or (ruleset._activation is not None and ruleset._activation.length == 0)
        or ruleset.rule_type is None
    )
    if nothing_to_do:
        return

    shared_kwargs = {"ruleset": ruleset, "y": y, "c_min": c_min, "name": name}
    if issubclass(ruleset.rule_type, ClassificationRule):
        apply_diff_privacy_classif(**shared_kwargs)
        return
    if issubclass(ruleset.rule_type, RegressionRule):
        apply_diff_privacy_regression(p=p, **shared_kwargs)
        return
    raise TypeError(f"Invalid rule type {ruleset.rule_type}. Must derive from ClassificationRule or RegressionRule")
def apply_diff_privacy_classif(ruleset: ruleskit.ruleset.RuleSet, y: numpy.ndarray, c_min: Optional[float] = None, name: Optional[str] = None)
Expand source code
def apply_diff_privacy_classif(
    ruleset: RuleSet, y: np.ndarray, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Placeholder for the privacy treatment of classification rulesets.

    The body is empty: no argument is read, the ruleset is left untouched and None is returned.

    Parameters
    ----------
    ruleset: RuleSet
        Not used.
    y: np.ndarray
        Not used.
    c_min: Optional[float]
        Not used.
    name: Optional[str]
        Not used.

    Returns
    -------
    None
    """
    return None
def apply_diff_privacy_regression(ruleset: ruleskit.ruleset.RuleSet, y: numpy.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None)
Expand source code
def apply_diff_privacy_regression(
    ruleset: RuleSet, y: np.ndarray, p: float, c_min: Optional[float] = None, name: Optional[str] = None
):
    """
    Adds Laplace noise to the prediction and to the train set size of each rule of a regression ruleset, in place.

    A rule is dropped from ``ruleset.rules`` when its prediction is None or NaN, when the number of points used to
    bound the prediction variation is lower than one, or when one of the two noise scales returned by
    ``lambda_function`` is negative, non-finite or cannot be computed.

    Parameters
    ----------
    ruleset: RuleSet
        Ruleset to privatise. Kept rules are modified in place and ``ruleset.rules`` is replaced by the kept rules.
    y: np.ndarray
        Target values. Only its length, minimum and maximum are used.
    p: float
        Probability passed to ``lambda_function`` to compute the noise scale of the predictions.
    c_min: Optional[float]
        Coverage used to compute the minimum number of points of a rule. If None, each rule's own coverage is used.
    name: Optional[str]
        Prefix of the log messages.

    Returns
    -------
    None
    """
    n = len(y)

    # The extrema of y do not depend on the rule: scan y once here instead of several times per rule.
    # They are only read for rules with min_pts >= 1, which implies n > 0.
    if n > 0:
        y_min, y_max = min(y), max(y)
        delta_pred_max = y_max - y_min
    else:
        y_min = y_max = delta_pred_max = None

    delta_activated_min = 1  # max variation of number of activated points when changing one point is... well... 1 !
    delta_activated_max = n

    good_rules = []

    for r in ruleset:
        if r.prediction is None or np.isnan(r.prediction):
            continue
        c = r.coverage if c_min is None else c_min
        min_pts = int(n * c)
        if min_pts < 1:
            # min_pts is a divisor below: 0 would raise or yield inf/NaN noise scales.
            logger.warning(
                f"{name} - Less than one point to bound the prediction variation :"
                " not enough points to ensure privacy. Discarding rule."
            )
            continue
        # Distance between an extremum of y and the mean of (min_pts - 1) points at that extremum plus one point
        # at the opposite extremum.
        delta_pred_min = max(
            abs(y_min - (y_min * (min_pts - 1) + y_max) / min_pts),
            abs(y_max - (y_max * (min_pts - 1) + y_min) / min_pts),
        )
        try:
            lambda_value_pred = lambda_function(delta_p=delta_pred_min, delta_v=delta_pred_max, n=n, p=p)
            lambda_value_activated = lambda_function(
                delta_p=delta_activated_min, delta_v=delta_activated_max, n=n, p=r.coverage
            )
        except ZeroDivisionError:
            # Plain Python numbers raise where NumPy scalars return inf/NaN (e.g. a probability equal to 1).
            logger.warning(f"{name} - Could not compute lambda (division by zero). Discarding rule.")
            continue
        if lambda_value_pred < 0:
            logger.warning(f"{name} - Got a negative lambda : not enough points to ensure privacy. Discarding rule.")
            continue
        if lambda_value_activated < 0:
            logger.warning(f"{name} - got a negative lambda : not enough points to ensure privacy. Discarding rule.")
            continue
        if not (np.isfinite(lambda_value_pred) and np.isfinite(lambda_value_activated)):
            # NaN and +inf pass the "< 0" tests above but would turn the prediction into NaN/inf or make int() fail.
            logger.warning(f"{name} - Got a non-finite lambda : cannot calibrate the privacy noise. Discarding rule.")
            continue
        privacy_pred = r.prediction + np.random.laplace(0, lambda_value_pred)
        # NOTE(review): nothing prevents the noisy train set size from being negative - confirm this is acceptable.
        privacy_set_size = r.train_set_size + int(np.random.laplace(0, lambda_value_activated))
        r._prediction = privacy_pred
        r._train_set_size = privacy_set_size
        good_rules.append(r)
    ruleset.rules = good_rules
def lambda_function(delta_p, delta_v, n, p)
Expand source code
def lambda_function(delta_p, delta_v, n, p):
    """
    Computes ``(delta_p ** 2 / delta_v) * log((n - 1) * p / (1 - p))``.

    Parameters
    ----------
    delta_p
        Quantity squared in the numerator of the leading ratio.
    delta_v
        Denominator of the leading ratio.
    n
        Count entering the logarithm as ``n - 1``.
    p
        Value entering the logarithm through ``p / (1 - p)``.

    Returns
    -------
    The leading ratio multiplied by the natural logarithm (``np.log``) of ``(n - 1) * p / (1 - p)``.
    """
    ratio = delta_p ** 2 / delta_v
    log_argument = (n - 1) * p / (1 - p)
    return ratio * np.log(log_argument)