"""
This module implements the HopSkipJump attack `HopSkipJump`. This is a black-box attack that only requires class
predictions. It is an advanced version of the Boundary attack.
| Paper link: https://arxiv.org/abs/1904.02144
"""
from __future__ import annotations
from typing import Optional, Union
import numpy as np
import pandas as pd
from holisticai.robustness.attackers.classification.commons import x_array_to_df, x_to_nd_array
[docs]
class HopSkipJump:
"""
Implementation of the HopSkipJump attack from Jianbo et al. (2019). This is a powerful black-box attack that
only requires final class prediction, and is an advanced version of the boundary attack.
Parameters
----------
name : str, optional
The name of the attack.
batch_size : int, optional
Batch size for the attack.
targeted : bool, optional
Indicates whether the attack is targeted or not. If True, the positive ground truth is used as the target.
norm : int, float, str, optional
The norm of the attack. Possible values: "inf", np.inf or 2.
max_iter : int, optional
The maximum number of iterations.
max_eval : int, optional
The maximum number of evaluations.
init_eval : int, optional
The number of initial evaluations.
init_size : int, optional
The number of initial samples.
verbose : bool, optional
Verbosity mode.
predictor : callable, optional
The model's prediction function. The default is None.
input_size : int, optional
The size of the input data.
theta : float, optional
The binary search threshold.
curr_iter : int, optional
The current iteration.
References
----------
.. [1] Chen, J., Jordan, M. I., & Wainwright, M. J. (2019). HopSkipJumpAttack: A query-efficient decision-based attack. In 2020 ieee symposium on security and privacy (sp) (pp. 1277-1294). IEEE.
"""
def __init__(
self,
name="HSJ",
batch_size=64,
targeted=False,
norm=2,
max_iter=50,
max_eval=10000,
init_eval=100,
init_size=100,
verbose=True,
predictor=None,
input_size=0,
theta=0.0,
curr_iter=0,
):
self.name = name
self.batch_size = batch_size
self.targeted = targeted
self.norm = norm
self.max_iter = max_iter
self.max_eval = max_eval
self.init_eval = init_eval
self.init_size = init_size
self.verbose = verbose
self.predictor = predictor
self.input_size = input_size
self.theta = theta
self.curr_iter = curr_iter
[docs]
def predict(self, x: np.ndarray):
"""
Perform prediction on the input data.
Parameters
----------
x : np.ndarray
The input data.
Returns
-------
np.ndarray
The model's prediction.
"""
x_df = x_array_to_df(x, feature_names=self.feature_names)
return np.array(self.predictor(x_df))
[docs]
def generate(
self, x_df: pd.DataFrame, y: Optional[np.ndarray] = None, mask: Optional[np.ndarray] = None, x_adv_init=None
) -> pd.DataFrame:
"""
Generate adversarial samples and return them in an array.
Parameters
----------
x_df : pd.DataFrame
The input data.
y : np.ndarray, optional
The target labels.
mask : np.ndarray, optional
The mask used to select the sensitive features.
x_adv_init : np.ndarray, optional
Initial array to act as an initial adversarial example.
Returns
-------
pd.DataFrame
The adversarial examples.
"""
self.input_shape = tuple(x_df.shape[1:])
self.input_size = np.prod(self.input_shape)
if self.norm == 2:
self.theta = 0.01 / np.sqrt(self.input_size)
else:
self.theta = 0.01 / self.input_size
self.feature_names = list(x_df.columns)
x = x_to_nd_array(x_df)
if y is None:
# Throw error if attack is targeted, but no targets are provided
if self.targeted: # pragma: no cover
raise ValueError("Target labels `y` need to be provided for a targeted attack.")
# Use model predictions as correct outputs
y = self.predict(x)
# Check whether users need a stateful attack
start = 0
# Check the mask
if mask is not None:
if len(mask.shape) != len(x.shape):
mask = np.array([mask] * x.shape[0])
else:
mask = np.array([None] * x.shape[0])
# Get clip_min and clip_max from the input data
clip_min, clip_max = np.min(x), np.max(x)
self._clip_min = clip_min
self._clip_max = clip_max
# Prediction from the original images
preds = self.predict(x)
# Prediction from the initial adversarial examples if not None
if x_adv_init is not None:
# Add mask param to the x_adv_init
for i in range(x.shape[0]):
if mask[i] is not None:
x_adv_init[i] = x_adv_init[i] * mask[i] + x[i] * (1 - mask[i])
# Do prediction on the init
init_preds = self.predict(x_adv_init)
else:
init_preds = [None] * len(x)
x_adv_init = [None] * len(x)
x_adv = x.copy()
# Generate the adversarial samples
for ind, val in enumerate(x_adv):
self.curr_iter = start
if self.targeted:
x_adv[ind] = self._perturb(
x=val,
y=y[ind], # type: ignore
y_p=preds[ind],
init_pred=init_preds[ind], # type: ignore
adv_init=x_adv_init[ind], # type: ignore
mask=mask[ind],
)
else:
x_adv[ind] = self._perturb(
x=val,
y=-1,
y_p=preds[ind],
init_pred=init_preds[ind], # type: ignore
adv_init=x_adv_init[ind], # type: ignore
mask=mask[ind],
)
return x_array_to_df(x_adv, feature_names=self.feature_names)
def _perturb(
self,
x: np.ndarray,
y: int,
y_p: int,
init_pred: int,
adv_init: np.ndarray,
mask: Optional[np.ndarray],
) -> np.ndarray:
"""
Internal attack function for one example.
Parameters
----------
x : np.ndarray
The original input.
y : int
The target label.
y_p : int
The predicted label of x.
init_pred : int
The predicted label of the initial image.
adv_init : np.ndarray
Initial array to act as an initial adversarial example.
mask : np.ndarray
An array with a mask to be applied to the adversarial perturbations. Shape needs to be broadcastable to the\\
shape of x. Any features for which the mask is zero will not be adversarially perturbed.
Returns
-------
np.ndarray
An adversarial example.
"""
# First, create an initial adversarial sample
initial_sample = self._init_sample(x, y, y_p, init_pred, adv_init, mask)
# If an initial adversarial example is not found, then return the original image
if initial_sample is None:
return x
# If an initial adversarial example found, then go with HopSkipJump attack
x_adv = self._attack(initial_sample[0], x, initial_sample[1], mask)
return x_adv
def _init_sample(
self,
x: np.ndarray,
y: int,
y_p: int,
init_pred: int,
adv_init: np.ndarray,
mask: Optional[np.ndarray],
) -> Optional[Union[np.ndarray, tuple[np.ndarray, int]]]:
"""
Find initial adversarial example for the attack.
Parameters
----------
x : np.ndarray
The original input.
y : int
The target label.
y_p : int
The predicted label of x.
init_pred : int
The predicted label of the initial image.
adv_init : np.ndarray
Initial array to act as an initial adversarial example.
mask : np.ndarray
An array with a mask to be applied to the adversarial perturbations. Shape needs to be broadcastable to the\\
shape of x. Any features for which the mask is zero will not be adversarially perturbed.
Returns
-------
Optional[Union[np.ndarray, tuple[np.ndarray, int]]]
An initial adversarial example.
"""
nprd = np.random.RandomState()
initial_sample = None
if self.targeted:
# Attack satisfied
if y == y_p:
return None
# Attack unsatisfied yet and the initial image satisfied
if adv_init is not None and init_pred == y:
return adv_init, init_pred
# Attack unsatisfied yet and the initial image unsatisfied
for _ in range(self.init_size):
random_img = nprd.uniform(self._clip_min, self._clip_max, size=x.shape).astype(x.dtype)
if mask is not None:
random_img = random_img * mask + x * (1 - mask)
random_class = self.predict(np.array([random_img]))
if random_class == y:
# Binary search to reduce the l2 distance to the original image
random_img = self._binary_search(
current_sample=random_img,
original_sample=x,
target=y,
norm=2,
threshold=0.001,
)
initial_sample = random_img, random_class
break
else:
# The initial image satisfied
if adv_init is not None and init_pred != y_p:
return adv_init, y_p
# The initial image unsatisfied
for _ in range(self.init_size):
random_img = nprd.uniform(self._clip_min, self._clip_max, size=x.shape).astype(x.dtype)
if mask is not None:
random_img = random_img * mask + x * (1 - mask)
random_class = self.predict(np.array([random_img]))
if random_class != y_p:
# Binary search to reduce the l2 distance to the original image
random_img = self._binary_search(
current_sample=random_img,
original_sample=x,
target=y_p,
norm=2,
threshold=0.001,
)
initial_sample = random_img, y_p
break
return initial_sample # type: ignore
def _attack(
self,
initial_sample: np.ndarray,
original_sample: np.ndarray,
target: int,
mask: Optional[np.ndarray],
) -> np.ndarray:
"""
Main function for the boundary attack.
Parameters
----------
initial_sample : np.ndarray
The initial adversarial example.
original_sample : np.ndarray
The original input.
target : int
The target label.
mask : np.ndarray
An array with a mask to be applied to the adversarial perturbations. Shape needs to be broadcastable to the\\
shape of x. Any features for which the mask is zero will not be adversarially perturbed.
Returns
-------
np.ndarray
An adversarial example.
"""
# Set current perturbed image to the initial image
current_sample = initial_sample
# Main loop to wander around the boundary
for _ in range(self.max_iter):
# First compute delta
delta = self._compute_delta(
current_sample=current_sample,
original_sample=original_sample,
)
# Then run binary search
current_sample = self._binary_search(
current_sample=current_sample,
original_sample=original_sample,
norm=self.norm,
target=target,
)
# Next compute the number of evaluations and compute the update
num_eval = min(int(self.init_eval * np.sqrt(self.curr_iter + 1)), self.max_eval)
update = self._compute_update(
current_sample=current_sample,
num_eval=num_eval,
delta=delta,
target=target,
mask=mask,
)
# Finally run step size search by first computing epsilon
if self.norm == 2:
dist = np.linalg.norm(original_sample - current_sample)
else:
dist = np.max(abs(original_sample - current_sample))
epsilon = 2.0 * dist / np.sqrt(self.curr_iter + 1)
success = False
while not success:
epsilon /= 2.0
potential_sample = current_sample + epsilon * update
success = self._adversarial_satisfactory( # type: ignore
samples=potential_sample[None],
target=target,
)
# Update current sample
current_sample = np.clip(potential_sample, self._clip_min, self._clip_max)
# Update current iteration
self.curr_iter += 1
# If attack failed. return original sample
if np.isnan(current_sample).any(): # pragma: no cover
return original_sample
return current_sample
def _binary_search(
self,
current_sample: np.ndarray,
original_sample: np.ndarray,
target: int,
norm: Union[int, float, str], # noqa: PYI041
threshold: Optional[float] = None,
) -> np.ndarray:
"""
Binary search to approach the boundary.
Parameters
----------
current_sample : np.ndarray
The current adversarial example.
original_sample : np.ndarray
The original input.
target : int
The target label.
norm : Union[int, float, str]
Order of the norm. Possible values: "inf", np.inf or 2.
threshold : float, optional
The threshold for the binary search.
Returns
-------
np.ndarray
An adversarial example.
"""
# First set upper and lower bounds as well as the threshold for the binary search
if norm == 2:
(upper_bound, lower_bound) = (1, 0)
if threshold is None:
threshold = self.theta
else:
(upper_bound, lower_bound) = (
np.max(abs(original_sample - current_sample)),
0,
)
if threshold is None:
threshold = np.minimum(upper_bound * self.theta, self.theta)
# Then start the binary search
while (upper_bound - lower_bound) > threshold: # type: ignore
# Interpolation point
alpha = (upper_bound + lower_bound) / 2.0
interpolated_sample = self._interpolate(
current_sample=current_sample,
original_sample=original_sample,
alpha=float(alpha),
norm=norm,
)
# Update upper_bound and lower_bound
satisfied = self._adversarial_satisfactory(
samples=interpolated_sample[None],
target=target,
)[0]
lower_bound = np.where(satisfied == 0, alpha, lower_bound)
upper_bound = np.where(satisfied == 1, alpha, upper_bound)
result = self._interpolate(
current_sample=current_sample,
original_sample=original_sample,
alpha=float(upper_bound),
norm=norm,
)
return result
def _compute_delta(
self,
current_sample: np.ndarray,
original_sample: np.ndarray,
) -> float:
"""
Compute the delta parameter.
Parameters
----------
current_sample : np.ndarray
The current adversarial example.
original_sample : np.ndarray
The original input.
Returns
-------
float
The delta parameter.
"""
# Note: This is a bit different from the original paper, instead we keep those that are
# implemented in the original source code of the authors
if self.curr_iter == 0:
return 0.1 * (self._clip_max - self._clip_min)
if self.norm == 2:
dist = np.linalg.norm(original_sample - current_sample)
delta = np.sqrt(np.prod(self.input_shape)) * self.theta * dist
else:
dist = np.max(abs(original_sample - current_sample))
delta = np.prod(self.input_shape) * self.theta * dist
return float(delta)
def _compute_update(
self,
current_sample: np.ndarray,
num_eval: int,
delta: float,
target: int,
mask: Optional[np.ndarray],
) -> np.ndarray:
"""
Compute the update in Eq.(14).
Parameters
----------
current_sample : np.ndarray
The current adversarial example.
num_eval : int
The number of evaluations.
delta : float
The delta parameter.
target : int
The target label.
mask : np.ndarray
An array with a mask to be applied to the adversarial perturbations. Shape needs to be broadcastable to the\\
shape of x. Any features for which the mask is zero will not be adversarially perturbed.
Returns
-------
np.ndarray
The updated perturbation.
"""
# Generate random noise
rnd_noise_shape = [num_eval, *self.input_shape]
if self.norm == 2:
rnd_noise = np.random.randn(*rnd_noise_shape)
else:
rnd_noise = np.random.uniform(low=-1, high=1, size=rnd_noise_shape)
# With mask
if mask is not None:
rnd_noise = rnd_noise * mask
# Normalize random noise to fit into the range of input data
rnd_noise = rnd_noise / np.sqrt(
np.sum(
rnd_noise**2,
axis=tuple(range(len(rnd_noise_shape)))[1:],
keepdims=True,
)
)
eval_samples = np.clip(current_sample + delta * rnd_noise, self._clip_min, self._clip_max)
rnd_noise = (eval_samples - current_sample) / delta
# Compute gradient: This is a bit different from the original paper, instead we keep those that are
# implemented in the original source code of the authors
satisfied = self._adversarial_satisfactory(samples=eval_samples, target=target)
f_val = 2 * satisfied.reshape([num_eval] + [1] * len(self.input_shape)) - 1.0
if np.mean(f_val) == 1.0:
grad = np.mean(rnd_noise, axis=0)
elif np.mean(f_val) == -1.0:
grad = -np.mean(rnd_noise, axis=0)
else:
f_val -= np.mean(f_val)
grad = np.mean(f_val * rnd_noise, axis=0)
# Compute update
result = grad / np.linalg.norm(grad) if self.norm == 2 else np.sign(grad)
return result
def _adversarial_satisfactory(self, samples: np.ndarray, target: int) -> np.ndarray:
"""
Check whether an image is adversarial.
Parameters
----------
samples : np.ndarray
The input data.
target : int
The target label.
Returns
-------
np.ndarray
An array of 0/1.
"""
samples = np.clip(samples, self._clip_min, self._clip_max)
preds = self.predict(samples)
result = preds == target if self.targeted else preds != target
return result
@staticmethod
def _interpolate(
current_sample: np.ndarray,
original_sample: np.ndarray,
alpha: float,
norm: Union[int, float, str], # noqa: PYI041
) -> np.ndarray:
"""
Interpolate a new sample based on the original and the current samples.
Parameters
----------
current_sample : np.ndarray
The current adversarial example.
original_sample : np.ndarray
The original input.
alpha : float
The interpolation factor.
norm : Union[int, float, str]
Order of the norm. Possible values: "inf", np.inf or 2.
Returns
-------
np.ndarray
The interpolated sample.
"""
if norm == 2:
result = (1 - alpha) * original_sample + alpha * current_sample
else:
result = np.clip(current_sample, original_sample - alpha, original_sample + alpha)
return result