from __future__ import annotations
from collections import Counter
from typing import Optional, Union
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
[docs]
class Anonymize:
"""Anonymize
Class for performing tailored, model-guided anonymization of training datasets for ML models.
Parameters
----------
k : int
The privacy parameter that determines the number of records that will be indistinguishable from each
other (when looking at the quasi identifiers). Should be at least 2.
quasi_identifiers : np.ndarray or list of strings or integers.
The features that need to be minimized in case of pandas data, and indexes of features
in case of numpy data.
quasi_identifer_slices: list of lists of strings or integers.
If some of the quasi-identifiers represent 1-hot encoded features that need to remain
consistent after anonymization, provide a list containing the list of column names
or indexes that represent a single feature.
categorical_features : list
The list of categorical features (if supplied, these featurtes will be one-hot encoded
before using them to train the decision tree model).
is_regression : bool
Whether the model is a regression model or not (if False, assumes a classification model).
Default is False.
train_only_QI : bool
The required method to train data set for anonymization. Default is
to train the tree on all features.
References
----------
Goldsteen, Abigail, Gilad Ezov, Ron Shmelkin, Micha Moffie, and Ariel Farkash. "Anonymizing machine learning models."
In International Workshop on Data Privacy Management, pp. 121-136. Cham: Springer International Publishing, 2021.
"""
def __init__(
self,
k: int,
quasi_identifiers: Union[np.ndarray, list],
features: Optional[list] = None,
features_names: Optional[list] = None,
quasi_identifer_slices: Optional[list] = None,
categorical_features: Optional[list] = None,
is_regression: Optional[bool] = False,
train_only_QI: Optional[bool] = False,
):
if k < 2:
raise ValueError("k should be a positive integer with a value of 2 or higher")
if quasi_identifiers is None or len(quasi_identifiers) < 1:
raise ValueError("The list of quasi-identifiers cannot be empty")
self.k = k
self.quasi_identifiers = quasi_identifiers
self.categorical_features = categorical_features
self.is_regression = is_regression
self.train_only_QI = train_only_QI
self.features_names = features_names
self.features = features
self.quasi_identifer_slices = quasi_identifer_slices
[docs]
def anonymize(self, X_train, y_train):
"""
Description
-----------
Method for performing model-guided anonymization.
Parameters
----------
X_train : np.ndarray or pandas DataFrame
Dataset containing the training data for the model.
y_train : np.ndarray
The predictions of the original model on the training data.
Returns
-------
The anonymized training dataset as pandas DataFrame.
"""
if X_train.shape[1] != 0:
self.features = list(range(X_train.shape[1]))
else:
raise ValueError("No data provided")
if self.features_names is None:
# if no names provided, use numbers instead
self.features_names = self.features
if not set(self.quasi_identifiers).issubset(set(self.features_names)):
raise ValueError(
"Quasi identifiers should bs a subset of the supplied features or indexes in range of "
"the data columns"
)
if self.categorical_features and not set(self.categorical_features).issubset(set(self.features_names)):
raise ValueError(
"Categorical features should bs a subset of the supplied features or indexes in range of "
"the data columns"
)
# transform quasi identifiers to indexes
self.quasi_identifiers = [i for i, v in enumerate(self.features_names) if v in self.quasi_identifiers]
if self.quasi_identifer_slices:
temp_list = []
for qislice in self.quasi_identifer_slices:
new_slice = [i for i, v in enumerate(self.features_names) if v in qislice]
temp_list.append(new_slice)
self.quasi_identifer_slices = temp_list
if self.categorical_features:
self.categorical_features = [i for i, v in enumerate(self.features_names) if v in self.categorical_features]
transformed = self._anonymize(X_train.copy(), y_train)
df = pd.DataFrame(transformed, columns=self.features_names)
return df.astype(X_train.dtypes)
def _anonymize(self, x, y):
if x.shape[0] != y.shape[0]:
raise ValueError("x and y should have same number of rows")
if any(x.select_dtypes(include="category").columns):
if not self.categorical_features:
raise ValueError("when supplying an array with non-numeric data, categorical_features must be defined")
x_prepared = self._modify_categorical_features(x)
else:
x_prepared = x
x_anonymizer_train = x_prepared
if self.train_only_QI:
# build DT just on QI features
x_anonymizer_train = x_prepared[:, self.quasi_identifiers]
if self.is_regression:
self._anonymizer = DecisionTreeRegressor(random_state=10, min_samples_split=2, min_samples_leaf=self.k)
else:
self._anonymizer = DecisionTreeClassifier(random_state=10, min_samples_split=2, min_samples_leaf=self.k)
self._anonymizer.fit(x_anonymizer_train, y)
cells_by_id = self._calculate_cells(x, x_anonymizer_train)
return self._anonymize_data(x, x_anonymizer_train, cells_by_id)
def _calculate_cells(self, x, x_anonymizer_train):
# x is original data, x_anonymizer_train is only QIs + 1-hot encoded
cells_by_id = {}
leaves = []
for node, feature in enumerate(self._anonymizer.tree_.feature):
if feature == -2: # leaf node
leaves.append(node)
hist = [int(i) for i in self._anonymizer.tree_.value[node][0]]
# TODO we may change the method for choosing representative for cell # noqa: TD004
# label_hist = self.anonymizer.tree_.value[node][0]
# label = int(self.anonymizer.classes_[np.argmax(label_hist)])
cell = {"label": 1, "hist": hist, "id": int(node)}
cells_by_id[cell["id"]] = cell
self._nodes = leaves
self._find_representatives(x, x_anonymizer_train, cells_by_id.values())
return cells_by_id
def _find_representatives(self, x, x_anonymizer_train, cells):
# x is original data (always numpy), x_anonymizer_train is only QIs + 1-hot encoded
node_ids = self._find_sample_nodes(x_anonymizer_train)
if self.quasi_identifer_slices:
all_one_hot_features = {feature for encoded in self.quasi_identifer_slices for feature in encoded}
else:
all_one_hot_features = set()
for cell in cells:
cell["representative"] = {}
# get all rows in cell
indexes = [index for index, node_id in enumerate(node_ids) if node_id == cell["id"]]
# TODO: should we filter only those with majority label? (using hist)
rows = x.loc[indexes]
done = set()
for feature in self.quasi_identifiers: # self.quasi_identifiers are numerical indexes
if feature not in done:
# deal with 1-hot encoded features
if feature in all_one_hot_features:
# find features that belong together
for encoded in self.quasi_identifer_slices:
if feature in encoded:
values = rows[:, encoded]
unique_rows, counts = np.unique(values, axis=0, return_counts=True)
rep = unique_rows[np.argmax(counts)]
for i, e in enumerate(encoded):
done.add(e)
cell["representative"][e] = rep[i]
else: # rest of features
values = rows.iloc[:, feature]
if self.categorical_features and feature in self.categorical_features:
# find most common value
cell["representative"][feature] = Counter(values).most_common(1)[0][0]
else:
# find the mean value (per feature)
median = np.median(values)
min_value = max(values)
min_dist = float("inf")
for value in values:
# euclidean distance between two floating point values
dist = abs(value - median)
if dist < min_dist:
min_dist = dist
min_value = value
cell["representative"][feature] = min_value
def _find_sample_nodes(self, samples):
paths = self._anonymizer.decision_path(samples).toarray()
node_set = set(self._nodes)
return [next(iter({i for i, v in enumerate(p) if v == 1} & node_set)) for p in paths]
def _find_sample_cells(self, samples, cells_by_id):
node_ids = self._find_sample_nodes(samples)
return [cells_by_id[node_id] for node_id in node_ids]
def _anonymize_data(self, x, x_anonymizer_train, cells_by_id):
cells = self._find_sample_cells(x_anonymizer_train, cells_by_id)
index = 0
dfnew = pd.DataFrame(columns=x.columns)
for _, row in x.iterrows():
cell = cells[index]
index += 1
for feature in cell["representative"]:
row[feature] = cell["representative"][feature]
dfnew.loc[len(dfnew)] = row
return dfnew
def _modify_categorical_features(self, x):
# prepare data for DT
used_features = self.features
if self.train_only_QI:
used_features = self.quasi_identifiers
numeric_features = [f for f in self.features if f in used_features and f not in self.categorical_features]
categorical_features = [f for f in self.categorical_features if f in used_features]
numeric_transformer = Pipeline(steps=[("imputer", SimpleImputer(strategy="constant", fill_value=0))])
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
encoded = preprocessor.fit_transform(x)
return encoded