# Source code for holisticai.robustness.attackers.classification.zeroth_order_optimization

"""
This module implements the zeroth-order optimization attack `ZooAttack`. This is a black-box attack. This attack is a
variant of the Carlini and Wagner attack which uses ADAM coordinate descent to perform numerical estimation of
gradients.

| Paper link: https://arxiv.org/abs/1708.03999
"""

from __future__ import annotations

from typing import Any, Optional

import numpy as np
import pandas as pd
from holisticai.robustness.attackers.classification.commons import (
    format_function_predict_proba,
    to_categorical,
    x_array_to_df,
    x_to_nd_array,
)
from scipy.ndimage import zoom

# Number of samples processed per attack batch. `ZooAttack.__init__` copies this into `self.batch_size`,
# and `ZooAttack._initialize_vars` rejects any value other than 1 when the input is a feature vector.
BATCH_SIZE = 1


class ZooAttack:
    """
    The black-box zeroth-order optimization attack from Pin-Yu Chen et al. (2018).

    This attack is a variant of the C&W attack which uses ADAM coordinate descent to perform numerical
    estimation of gradients.

    Parameters
    ----------
    name : str, optional
        The name of the attack. The default is "Zoo".
    confidence : float, optional
        Confidence of adversarial examples. A higher value produces examples that are farther away, but more
        strongly classified as adversarial. The default is 0.0.
    targeted : bool, optional
        Indicates whether the attack is targeted. The default is False. If True, the positive ground truth is
        used as the target.
    learning_rate : float, optional
        The learning rate for the ADAM optimizer. The default is 1e-2.
    max_iter : int, optional
        The maximum number of iterations. The default is 20.
    binary_search_steps : int, optional
        The number of binary search steps. The default is 10.
    initial_const : float, optional
        The initial constant used to scale the adversarial perturbation. The default is 1e-3.
    abort_early : bool, optional
        Indicates whether to abort the optimization early. The default is True.
    use_resize : bool, optional
        Indicates whether to use resizing. The default is False. Forced to False for feature-vector inputs.
    use_importance : bool, optional
        Indicates whether to use importance sampling. The default is False. Forced to False for
        feature-vector inputs.
    nb_parallel : int, optional
        The number of parallel coordinates to update. The default is 1.
    variable_h : float, optional
        The step `h` used for the symmetric finite-difference gradient estimate. The default is 0.2.
    verbose : bool, optional
        Indicates whether to print verbose output. The default is True.
    input_is_feature_vector : bool, optional
        Indicates whether the input is a feature vector. The default is False. Recomputed from the input
        shape on every call to `generate`.
    proxy : object, optional
        The model used to predict the probabilities of the input. It must expose `learning_task` and
        `predict_proba`. The default is None, in which case `generate` raises a `ValueError`.
    input_size : int, optional
        The size of the input. The default is 0. Recomputed from the input shape on every call to `generate`.
    nb_classes : int, optional
        The number of classes. The default is 2.
    adam_mean : Optional[NDArray|ArrayLike|None], optional
        The mean of the ADAM optimizer. The default is None.
    adam_var : Optional[NDArray|ArrayLike|None], optional
        The variance of the ADAM optimizer. The default is None.
    adam_epochs : Optional[NDArray|ArrayLike|None], optional
        The epochs of the ADAM optimizer. The default is None.
    channels_first : bool, optional
        Layout of 4-D (image) inputs: True for `(N, C, H, W)`, False for `(N, H, W, C)`. Only consulted on the
        `use_resize` / `use_importance` code paths. The default is True.
    """

    def __init__(
        self,
        name="Zoo",
        confidence=0.0,
        targeted=False,
        learning_rate=1e-2,
        max_iter=20,
        binary_search_steps=10,
        initial_const=1e-3,
        abort_early=True,
        use_resize=False,
        use_importance=False,
        nb_parallel=1,
        variable_h=0.2,
        verbose=True,
        input_is_feature_vector=False,
        proxy=None,
        input_size=0,
        nb_classes=2,
        adam_mean=None,
        adam_var=None,
        adam_epochs=None,
        channels_first=True,
    ):
        self.name = name
        self.confidence = confidence
        self.targeted = targeted
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.binary_search_steps = binary_search_steps
        self.initial_const = initial_const
        self.abort_early = abort_early
        self.use_resize = use_resize
        self.use_importance = use_importance
        self.nb_parallel = nb_parallel
        self.batch_size = BATCH_SIZE
        self.variable_h = variable_h
        self.verbose = verbose
        self.input_is_feature_vector = input_is_feature_vector
        # `proxy=None` is the declared default, so do not dereference it here; `generate` reports the
        # missing model with an explicit error instead.
        self.predict_proba_fn = (
            format_function_predict_proba(proxy.learning_task, proxy.predict_proba) if proxy is not None else None
        )
        self.input_size = input_size
        self.nb_classes = nb_classes
        self.adam_mean = adam_mean
        self.adam_var = adam_var
        self.adam_epochs = adam_epochs
        # Read by the image resize / importance-sampling helpers.
        self.channels_first = channels_first

    def _initialize_vars(self, x: np.ndarray) -> None:
        """
        Initialize the internal state that depends on the shape of the input.

        Parameters
        ----------
        x : np.ndarray
            The input samples. Only `x.shape` is read.

        Raises
        ------
        ValueError
            If the input is a feature vector and `self.batch_size` is not 1.
        """
        self.input_shape = tuple(x.shape[1:])
        self.input_size = np.prod(self.input_shape)
        self.input_is_feature_vector = len(self.input_shape) == 1
        if self.input_is_feature_vector and self.batch_size != 1:
            raise ValueError(
                "The current implementation of Zeroth-Order Optimisation attack only supports "
                "`batch_size=1` with feature vectors as input."
            )

        # Initialize some internal variables
        self._init_size = 32
        if self.abort_early:
            self._early_stop_iters = self.max_iter // 10 if self.max_iter >= 10 else self.max_iter

        # Resizing and importance sampling are image-only features
        if self.input_is_feature_vector:
            self.use_resize = False
            self.use_importance = False

        # Initialize noise variable to zero
        if self.use_resize:
            if self.channels_first:
                dims = (self.batch_size, self.input_shape[0], self._init_size, self._init_size)
            else:
                dims = (self.batch_size, self._init_size, self._init_size, self.input_shape[-1])
            self._current_noise = np.zeros(dims, dtype=np.float32)
        else:
            self._current_noise = np.zeros((self.batch_size, *self.input_shape), dtype=np.float32)
        self._sample_prob = np.ones(self._current_noise.size, dtype=np.float32) / self._current_noise.size

    def _loss(
        self, x: np.ndarray, x_adv: np.ndarray, target: np.ndarray, c_weight: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute the loss function values.

        Parameters
        ----------
        x : np.ndarray
            The original input.
        x_adv : np.ndarray
            The adversarial input.
        target : np.ndarray
            The target values (0-1 hot).
        c_weight : np.ndarray
            The weight of the constant.

        Returns
        -------
        tuple[np.ndarray, np.ndarray, np.ndarray]
            The predictions, the L2 distances, and the loss values.
        """
        l2dist = np.sum(np.square(x - x_adv).reshape(x_adv.shape[0], -1), axis=1)

        # The model is always queried at the original input resolution, so rescale `x_adv` to it first
        ratios = [1.0] + [int(new_size) / int(old_size) for new_size, old_size in zip(self.input_shape, x.shape[1:])]
        preds = self.predict_proba(np.array(zoom(x_adv, zoom=ratios)))

        z_target = np.sum(preds * target, axis=1)
        # Largest score among the non-target classes; the target entry is pushed below every other score
        z_other = np.max(
            preds * (1 - target) + (np.min(preds, axis=1) - 1)[:, np.newaxis] * target,
            axis=1,
        )

        if self.targeted:
            # If targeted, optimize for making the target class most likely
            loss = np.maximum(z_other - z_target + self.confidence, 0)
        else:
            # If untargeted, optimize for making any other class most likely
            loss = np.maximum(z_target - z_other + self.confidence, 0)

        return preds, l2dist, c_weight * loss + l2dist

    def generate(self, x_df: pd.DataFrame, y: Optional[np.ndarray] = None) -> pd.DataFrame:
        """
        Generate adversarial samples and return them in a data frame.

        Parameters
        ----------
        x_df : pd.DataFrame
            The input samples.
        y : Optional[np.ndarray], optional
            The target labels. The default is None, in which case the class predicted by the model for each
            sample is used (only valid for untargeted attacks).

        Returns
        -------
        pd.DataFrame
            The adversarial samples, with the same columns as `x_df`.

        Raises
        ------
        ValueError
            If no `proxy` model was supplied, if the attack is targeted and `y` is None, or if the model is a
            binary classifier with a single output column.
        """
        if self.predict_proba_fn is None:
            raise ValueError("A `proxy` model exposing `learning_task` and `predict_proba` is required.")

        self._initialize_vars(x_df)
        feature_names = list(x_df.columns)
        self.predict_proba = lambda x: self.predict_proba_fn(x, feature_names)

        x = x_to_nd_array(x_df)
        self._clip_values = (np.min(x), np.max(x))

        if y is not None:
            y = to_categorical(y, nb_classes=self.nb_classes)

        # Check that `y` is provided for targeted attacks
        if self.targeted and y is None:  # pragma: no cover
            raise ValueError("Target labels `y` need to be provided for a targeted attack.")

        # No labels provided, use model prediction as correct class. The loss and the binary search expect
        # 0-1 hot targets, so the predicted probabilities are converted to one-hot vectors.
        if y is None:
            probs = np.asarray(self.predict_proba(x))
            y = np.zeros_like(probs, dtype=np.float32)
            y[np.arange(probs.shape[0]), np.argmax(probs, axis=1)] = 1.0

        if self.nb_classes == 2 and y.shape[1] == 1:  # pragma: no cover
            raise ValueError(
                "This attack has not yet been tested for binary classification with a single output classifier."
            )

        # Compute adversarial examples with implicit batching
        nb_batches = int(np.ceil(x.shape[0] / float(self.batch_size)))
        x_adv_list = []
        for batch_id in range(nb_batches):
            start, stop = batch_id * self.batch_size, (batch_id + 1) * self.batch_size
            x_adv_list.append(self._generate_batch(x[start:stop], y[start:stop]))
        x_adv = np.vstack(x_adv_list)

        # NOTE: no clipping to `self._clip_values` is applied to the final result.
        return x_array_to_df(x_adv, feature_names=feature_names)

    def _generate_batch(self, x_batch: np.ndarray, y_batch: np.ndarray) -> np.ndarray:
        """
        Run the attack on a batch of samples and labels.

        Parameters
        ----------
        x_batch : np.ndarray
            A batch of original examples.
        y_batch : np.ndarray
            A batch of targets (0-1 hot).

        Returns
        -------
        np.ndarray
            A batch of adversarial examples. Samples for which no adversarial example was found are returned
            unchanged.
        """
        nb_samples = x_batch.shape[0]

        # Initialize binary search
        c_current = self.initial_const * np.ones(nb_samples)
        c_lower_bound = np.zeros(nb_samples)
        c_upper_bound = 1e10 * np.ones(nb_samples)

        # Initialize best distortions and best attacks globally
        o_best_dist = np.inf * np.ones(nb_samples)
        o_best_attack = x_batch.copy()

        # Start with a binary search
        for _ in range(self.binary_search_steps):
            # Run with 1 specific binary search step
            best_dist, best_label, best_attack = self._generate_bss(x_batch, y_batch, c_current)

            # Update best results so far
            improved = best_dist < o_best_dist
            o_best_attack[improved] = best_attack[improved]
            o_best_dist[improved] = best_dist[improved]

            # Adjust the constant as needed
            c_current, c_lower_bound, c_upper_bound = self._update_const(
                y_batch, best_label, c_current, c_lower_bound, c_upper_bound
            )

        return o_best_attack

    def _update_const(
        self,
        y_batch: np.ndarray,
        best_label: np.ndarray,
        c_batch: np.ndarray,
        c_lower_bound: np.ndarray,
        c_upper_bound: np.ndarray,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Update constant `c_batch` from the ZOO objective.

        This characterizes the trade-off between attack strength and amount of noise introduced. The three
        constant arrays are updated in place and also returned.

        Parameters
        ----------
        y_batch : np.ndarray
            A batch of targets (0-1 hot).
        best_label : np.ndarray
            The best labels; `-np.inf` marks samples for which no adversarial example was found.
        c_batch : np.ndarray
            A batch of constants.
        c_lower_bound : np.ndarray
            The lower bound of the constant.
        c_upper_bound : np.ndarray
            The upper bound of the constant.

        Returns
        -------
        tuple[np.ndarray, np.ndarray, np.ndarray]
            The updated constant, lower bound, and upper bound.
        """
        for i in range(len(c_batch)):
            succeeded = self._compare(best_label[i], np.argmax(y_batch[i])) and best_label[i] != -np.inf
            if succeeded:
                # Successful attack: tighten the upper bound and bisect
                c_upper_bound[i] = min(c_upper_bound[i], c_batch[i])
                if c_upper_bound[i] < 1e9:
                    c_batch[i] = (c_lower_bound[i] + c_upper_bound[i]) / 2
            else:
                # Failure attack: raise the lower bound, then bisect or grow the constant tenfold while no
                # finite upper bound is known yet
                c_lower_bound[i] = max(c_lower_bound[i], c_batch[i])
                if c_upper_bound[i] < 1e9:
                    c_batch[i] = (c_lower_bound[i] + c_upper_bound[i]) / 2
                else:
                    c_batch[i] = c_batch[i] * 10

        return c_batch, c_lower_bound, c_upper_bound

    def _compare(self, object1: Any, object2: Any) -> bool:
        """
        Check two objects for equality if the attack is targeted, otherwise check for inequality.

        Parameters
        ----------
        object1 : Any
            The first object.
        object2 : Any
            The second object.

        Returns
        -------
        bool
            The result of the comparison.
        """
        return object1 == object2 if self.targeted else object1 != object2

    def _upscale(self, x_adv: np.ndarray, x_orig: np.ndarray, size: int) -> tuple[np.ndarray, np.ndarray]:
        """
        Upscale the current adversarial images to `size` x `size` and zoom the originals to the same shape.

        Parameters
        ----------
        x_adv : np.ndarray
            The current adversarial examples (4-D).
        x_orig : np.ndarray
            The (resized) original examples (4-D).
        size : int
            The new spatial size.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            The upscaled adversarial examples and originals.
        """
        x_adv = self._resize_image(x_adv, size, size)
        x_orig = zoom(
            x_orig,
            [
                1,
                x_adv.shape[1] / x_orig.shape[1],
                x_adv.shape[2] / x_orig.shape[2],
                x_adv.shape[3] / x_orig.shape[3],
            ],
        )
        return x_adv, x_orig

    def _generate_bss(
        self, x_batch: np.ndarray, y_batch: np.ndarray, c_batch: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Generate adversarial examples for a batch of inputs with a specific batch of constants.

        Parameters
        ----------
        x_batch : np.ndarray
            A batch of original examples.
        y_batch : np.ndarray
            A batch of targets (0-1 hot).
        c_batch : np.ndarray
            A batch of constants.

        Returns
        -------
        tuple[np.ndarray, np.ndarray, np.ndarray]
            The best distortions, the best labels, and the best attacks. A best label of `-np.inf` means no
            adversarial example was found for that sample.
        """
        x_orig = x_batch.astype(np.float32)
        fine_tuning = np.full(x_batch.shape[0], False, dtype=bool)
        prev_loss = 1e6 * np.ones(x_batch.shape[0])
        prev_l2dist = np.zeros(x_batch.shape[0])

        # Resize and initialize Adam
        if self.use_resize:
            x_orig = self._resize_image(x_orig, self._init_size, self._init_size, True)
            assert (x_orig != 0).any()
            x_adv = x_orig.copy()
        else:
            x_orig = x_batch
            self._reset_adam(np.prod(self.input_shape).item())
            if x_batch.shape == self._current_noise.shape:
                self._current_noise.fill(0)
            else:
                self._current_noise = np.zeros(x_batch.shape, dtype=np.float32)
            x_adv = x_orig.copy()

        # Initialize best distortions, best changed labels and best attacks
        best_dist = np.inf * np.ones(x_adv.shape[0])
        best_label = -np.inf * np.ones(x_adv.shape[0])
        best_attack = x_adv.copy()

        labels_batch = np.argmax(y_batch, axis=1)

        for iter_ in range(self.max_iter):
            # Upscaling for very large number of iterations
            if self.use_resize:
                if iter_ == 2000:
                    x_adv, x_orig = self._upscale(x_adv, x_orig, 64)
                elif iter_ == 10000:
                    x_adv, x_orig = self._upscale(x_adv, x_orig, 128)

            # Compute adversarial examples and loss
            x_adv = self._optimizer(x_adv, y_batch, c_batch)
            preds, l2dist, loss = self._loss(x_orig, x_adv, y_batch, c_batch)

            # Reset Adam if a valid example has been found to avoid overshoot
            mask_fine_tune = (~fine_tuning) & (loss == l2dist) & (prev_loss != prev_l2dist)
            fine_tuning[mask_fine_tune] = True
            self._reset_adam(self.adam_mean.size, np.repeat(mask_fine_tune, x_adv[0].size))  # type: ignore
            prev_l2dist = l2dist

            # Abort early if no improvement is obtained
            if self.abort_early and iter_ % self._early_stop_iters == 0:
                if (loss > 0.9999 * prev_loss).all():
                    break
                prev_loss = loss

            # Adjust the best result
            for i, (dist, pred) in enumerate(zip(l2dist, np.argmax(preds, axis=1))):
                if dist < best_dist[i] and self._compare(pred, labels_batch[i]):
                    best_dist[i] = dist
                    best_attack[i] = x_adv[i]
                    best_label[i] = pred

        # Resize images to original size before returning
        if self.use_resize:
            if not self.channels_first:
                factors = [
                    1,
                    int(x_batch.shape[1]) / best_attack.shape[1],
                    int(x_batch.shape[2]) / best_attack.shape[2],
                    1,
                ]
            else:
                # Height and width are scaled independently so non-square images keep their shape
                factors = [
                    1,
                    1,
                    int(x_batch.shape[2]) / best_attack.shape[2],
                    int(x_batch.shape[3]) / best_attack.shape[3],
                ]
            best_attack = zoom(best_attack, factors)

        return best_dist, best_label, best_attack

    def _optimizer(self, x: np.ndarray, targets: np.ndarray, c_batch: np.ndarray) -> np.ndarray:
        """
        Run one step of the ADAM coordinate optimizer for a batch of inputs.

        Parameters
        ----------
        x : np.ndarray
            A batch of original examples.
        targets : np.ndarray
            A batch of targets (0-1 hot).
        c_batch : np.ndarray
            A batch of constants.

        Returns
        -------
        np.ndarray
            A batch of adversarial examples (`x` plus the updated noise).

        Raises
        ------
        ValueError
            If more coordinates are requested than exist, or if the ADAM state has not been allocated.
        """
        nb_coords = self.nb_parallel * self._current_noise.shape[0]

        # Variation of input for computing loss, same as in original implementation
        coord_batch = np.repeat(self._current_noise, 2 * self.nb_parallel, axis=0)
        coord_batch = coord_batch.reshape(2 * nb_coords, -1)

        # Sample indices to prioritize for optimization; `p=None` means uniform sampling
        if self.use_importance and np.unique(self._sample_prob).size != 1:
            sample_prob = self._sample_prob.flatten()
        else:
            sample_prob = None
        try:
            indices = (
                np.random.choice(
                    coord_batch.shape[-1] * x.shape[0],
                    nb_coords,
                    replace=False,
                    p=sample_prob,
                )
                % coord_batch.shape[-1]
            )
        except ValueError as error:  # pragma: no cover
            if "Cannot take a larger sample than population when 'replace=False'" in str(error):
                raise ValueError(
                    "Too many samples are requested for the random indices. Try to reduce the number of parallel "
                    "coordinate updates `nb_parallel`."
                ) from error
            raise

        # Create the batch of modifications to run: rows 2i / 2i+1 hold the +h / -h variation of coordinate i
        plus_rows = 2 * np.arange(nb_coords)
        coord_batch[plus_rows, indices] += self.variable_h
        coord_batch[plus_rows + 1, indices] -= self.variable_h

        # Compute loss for all samples and coordinates, then optimize
        expanded_x = np.repeat(x, 2 * self.nb_parallel, axis=0).reshape((-1,) + x.shape[1:])
        expanded_targets = np.repeat(targets, 2 * self.nb_parallel, axis=0).reshape((-1,) + targets.shape[1:])
        expanded_c = np.repeat(c_batch, 2 * self.nb_parallel)
        _, _, loss = self._loss(
            expanded_x,
            expanded_x + coord_batch.reshape(expanded_x.shape),
            expanded_targets,
            expanded_c,
        )

        if self.adam_mean is None or self.adam_var is None or self.adam_epochs is None:
            raise ValueError("Unexpected `None` in `adam_mean`, `adam_var` or `adam_epochs` detected.")

        self._current_noise = self._optimizer_adam_coordinate(
            loss,
            indices,
            self.adam_mean,
            self.adam_var,
            self._current_noise,
            self.learning_rate,
            self.adam_epochs,
            True,
        )

        if self.use_importance and self._current_noise.shape[2] > self._init_size:
            self._sample_prob = self._get_prob(self._current_noise).flatten()

        return x + self._current_noise

    def _optimizer_adam_coordinate(
        self,
        losses: np.ndarray,
        index: np.ndarray,
        mean: np.ndarray,
        var: np.ndarray,
        current_noise: np.ndarray,
        learning_rate: float,
        adam_epochs: np.ndarray,
        proj: bool,
    ) -> np.ndarray:
        """
        Implementation of the ADAM optimizer for coordinate descent.

        `mean`, `var` and `adam_epochs` are updated in place at the positions given by `index`.

        Parameters
        ----------
        losses : np.ndarray
            Overall loss; consecutive pairs hold the loss for the +h and -h variation of one coordinate.
        index : np.ndarray
            Indices of the coordinates to update.
        mean : np.ndarray
            The mean of the gradient (first moment).
        var : np.ndarray
            The uncentered variance of the gradient (second moment).
        current_noise : np.ndarray
            The current noise.
        learning_rate : float
            Learning rate for Adam optimizer.
        adam_epochs : np.ndarray
            Per-coordinate step counters of the Adam optimizer.
        proj : bool
            Whether to clip the updated noise to `self._clip_values` (when set).

        Returns
        -------
        np.ndarray
            Updated noise for coordinate descent.
        """
        beta1, beta2 = 0.9, 0.999

        # Estimate grads from loss variation with a symmetric difference of step `self.variable_h`
        losses = np.asarray(losses)
        grads = (losses[0::2] - losses[1::2]) / (2 * self.variable_h)

        # ADAM update
        mean[index] = beta1 * mean[index] + (1 - beta1) * grads
        var[index] = beta2 * var[index] + (1 - beta2) * grads**2

        corr = (np.sqrt(1 - np.power(beta2, adam_epochs[index]))) / (1 - np.power(beta1, adam_epochs[index]))
        orig_shape = current_noise.shape
        current_noise = current_noise.reshape(-1)
        current_noise[index] -= learning_rate * corr * mean[index] / (np.sqrt(var[index]) + 1e-8)
        adam_epochs[index] += 1

        if proj and hasattr(self, "_clip_values") and self._clip_values is not None:
            clip_min, clip_max = self._clip_values
            current_noise[index] = np.clip(current_noise[index], clip_min, clip_max)

        return current_noise.reshape(orig_shape)

    def _reset_adam(self, nb_vars: int, indices: Optional[np.ndarray] = None) -> None:
        """
        Reset the ADAM optimizer state.

        Parameters
        ----------
        nb_vars : int
            The number of variables.
        indices : Optional[np.ndarray], optional
            The indices (or boolean mask) to reset. The default is None, which resets every variable.
        """
        # If variables are already there and at the right size, reset values
        if self.adam_mean is not None and self.adam_mean.size == nb_vars:
            if indices is None:
                self.adam_mean.fill(0)
                self.adam_var.fill(0)  # type: ignore
                self.adam_epochs.fill(1)  # type: ignore
            else:
                self.adam_mean[indices] = 0
                self.adam_var[indices] = 0  # type: ignore
                self.adam_epochs[indices] = 1  # type: ignore
        else:
            # Allocate Adam variables
            self.adam_mean = np.zeros(nb_vars, dtype=np.float32)
            self.adam_var = np.zeros(nb_vars, dtype=np.float32)
            self.adam_epochs = np.ones(nb_vars, dtype=int)

    def _resize_image(self, x: np.ndarray, size_x: int, size_y: int, reset: bool = False) -> np.ndarray:
        """
        Resize a batch of images to a specific size and re-initialize the noise, sampling and ADAM state.

        Parameters
        ----------
        x : np.ndarray
            The input images (4-D).
        size_x : int
            The size in the x direction.
        size_y : int
            The size in the y direction.
        reset : bool, optional
            If True, the sampling probabilities are reset to uniform; otherwise they are derived from the
            current noise. The default is False.

        Returns
        -------
        np.ndarray
            The resized images.
        """
        if not self.channels_first:
            dims = (x.shape[0], size_x, size_y, x.shape[-1])
        else:
            dims = (x.shape[0], x.shape[1], size_x, size_y)
        nb_vars = np.prod(dims).item()
        factors = (1, dims[1] / x.shape[1], dims[2] / x.shape[2], dims[3] / x.shape[3])

        if reset:
            # Reset variables to original size and value
            if dims == x.shape:
                resized_x = x
                if x.shape == self._current_noise.shape:
                    self._current_noise.fill(0)
                else:
                    self._current_noise = np.zeros(x.shape, dtype=np.float32)
            else:
                resized_x = zoom(x, factors)
                self._current_noise = np.zeros(dims, dtype=np.float32)
            self._sample_prob = np.ones(nb_vars, dtype=np.float32) / nb_vars
        else:
            # Rescale variables and reset values; the probabilities must be computed before the noise is
            # cleared because they are derived from it
            resized_x = zoom(x, factors)
            self._sample_prob = self._get_prob(self._current_noise, double=True).flatten()
            self._current_noise = np.zeros(dims, dtype=np.float32)

        # Reset Adam
        self._reset_adam(nb_vars)

        return resized_x

    def _get_prob(self, prev_noise: np.ndarray, double: bool = False) -> np.ndarray:
        """
        Compute the probability of each pixel to be selected for optimization.

        Parameters
        ----------
        prev_noise : np.ndarray
            The previous noise (4-D).
        double : bool, optional
            Indicates whether to double the spatial size. The default is False.

        Returns
        -------
        np.ndarray
            The probability of each pixel to be selected for optimization. Falls back to a uniform
            distribution when the pooled noise sums to zero.
        """
        dims = list(prev_noise.shape)
        channel_index = 1 if self.channels_first else 3

        # Double size if needed
        if double:
            dims = [2 * size if i not in [0, channel_index] else size for i, size in enumerate(dims)]

        prob = np.empty(shape=dims, dtype=np.float32)
        image = np.abs(prev_noise)

        for channel in range(prev_noise.shape[channel_index]):
            if self.channels_first:
                image_pool = self._max_pooling(image[:, channel, :, :], dims[2] // 8)
                if double:
                    prob[:, channel, :, :] = np.abs(zoom(image_pool, [1, 2, 2]))
                else:
                    prob[:, channel, :, :] = image_pool
            else:
                image_pool = self._max_pooling(image[:, :, :, channel], dims[1] // 8)
                if double:
                    prob[:, :, :, channel] = np.abs(zoom(image_pool, [1, 2, 2]))
                else:
                    prob[:, :, :, channel] = image_pool

        total = np.sum(prob)
        if total > 0:
            prob /= total
        else:
            # All-zero noise carries no importance information; avoid a 0/0 division producing NaNs
            prob.fill(1.0 / prob.size)

        return prob

    @staticmethod
    def _max_pooling(image: np.ndarray, kernel_size: int) -> np.ndarray:
        """
        Perform max pooling on a batch of single-channel images, keeping the input shape.

        Every `kernel_size` x `kernel_size` tile is filled with its maximum value.

        Parameters
        ----------
        image : np.ndarray
            The input images, of shape `(N, H, W)`.
        kernel_size : int
            The size of the kernel. Values below 1 are treated as 1 (identity pooling).

        Returns
        -------
        np.ndarray
            The pooled image.
        """
        # A kernel of 0 (image side smaller than 8 pixels in `_get_prob`) would make `range` fail
        kernel_size = max(kernel_size, 1)
        img_pool = np.copy(image)
        for i in range(0, image.shape[1], kernel_size):
            for j in range(0, image.shape[2], kernel_size):
                img_pool[:, i : i + kernel_size, j : j + kernel_size] = np.max(
                    image[:, i : i + kernel_size, j : j + kernel_size],
                    axis=(1, 2),
                    keepdims=True,
                )

        return img_pool