from typing import Tuple
import numpy as np
from numpy import (
bitwise_and as AND,
bitwise_not as NOT,
bitwise_or as OR,
bitwise_xor as XOR,
)
[docs]
def XNOR(input: np.ndarray, other: np.ndarray) -> np.ndarray:
"""Computes the XNOR gate. (semantically the same as `input == other`)
Args:
input (np.ndarray): Input array
other (np.ndarray): Other input array
Returns:
np.ndarray: XNOR(input, other) as an array
"""
return NOT(XOR(input, other))
[docs]
class RACER:
def __init__(
self,
alpha=0.9,
suppress_warnings=False,
benchmark=False,
):
"""Initialize the RACER class
Args:
alpha (float, optional): Value of alpha according to the RACER paper. Defaults to 0.9.
suppress_warnings (bool, optional): Whether to suppress any warnings raised during prediction. Defaults to False.
benchmark (bool, optional): Whether to time the `fit` method for benchmark purposes. Defaults to False.
"""
self._alpha, self._beta = alpha, 1.0 - alpha
self._suppress_warnings = suppress_warnings
self._benchmark = benchmark
self._has_fit = False
[docs]
def fit(self, X: np.ndarray, y: np.ndarray) -> None:
"""Fits the RACER algorithm on top of input data X and targets y.
The code is written in close correlation to the pseudo-code provided in the RACER paper with some slight modifications.
Args:
X (np.ndarray): Features vector
y (np.ndarray): Targets vector
"""
if self._benchmark:
from time import perf_counter
tic = perf_counter()
self._X, self._y = X, y
self._cardinality, self._rule_len = self._X.shape
self._classes = np.unique(self._y, axis=0)
self._class_indices = {
self._label_to_int(cls): np.where(XNOR(self._y, cls).min(axis=-1))[0]
for cls in self._classes
}
self._create_init_rules()
for cls in self._class_indices.keys():
for i in range(len(self._class_indices[cls])):
for j in range(i + 1, len(self._class_indices[cls])):
self._process_rules(
self._class_indices[cls][i], self._class_indices[cls][j]
)
independent_indices = NOT(self._extants_covered)
self._extants_if, self._extants_then, self._fitnesses = (
self._extants_if[independent_indices],
self._extants_then[independent_indices],
self._fitnesses[independent_indices],
)
self._generalize_extants()
# https://stackoverflow.com/questions/64238462/numpy-descending-stable-arg-sort-of-arrays-of-any-dtype
args = (
len(self._fitnesses)
- 1
- np.argsort(self._fitnesses[::-1], kind="stable")[::-1]
)
self._final_rules_if, self._final_rules_then, self._fitnesses = (
self._extants_if[args],
self._extants_then[args],
self._fitnesses[args],
)
self._finalize_rules()
self._has_fit = True
if self._benchmark:
self._bench_time = perf_counter() - tic
[docs]
def predict(self, X: np.ndarray, convert_dummies=True) -> np.ndarray:
"""Given input X, predict label using RACER
Args:
X (np.ndarray): Input features vector
convert_dummies (bool, optional): Whether to convert dummy labels back to integert format. Defaults to True.
Returns:
np.ndarray: Label as predicted by RACER
"""
assert self._has_fit, "RACER has not been fit yet."
labels = np.zeros((len(X), self._final_rules_then.shape[1]), dtype=bool)
found = np.zeros(len(X), dtype=bool)
for i in range(len(self._final_rules_if)):
covered = self._covered(X, self._final_rules_if[i])
labels[AND(covered, NOT(found))] = self._final_rules_then[i]
found[covered] = True
all_found = found.sum() == len(X)
if all_found:
break
if not all_found:
if not self._suppress_warnings:
print(
f"WARNING: RACER was unable to find a perfect match for {len(X) - found.sum()} instances out of {len(X)}"
)
print(
"These instances will be labelled as the majority class during training."
)
leftover_indices = np.where(NOT(found))[0]
for idx in leftover_indices:
labels[idx] = self._closest_match(X[idx])
if convert_dummies:
labels = np.argmax(labels, axis=-1)
return labels
[docs]
def _bool2str(self, bool_arr: np.ndarray) -> str:
"""Converts a boolean array to a human-readable string
Args:
bool_arr (np.ndarray): The input boolean array
Returns:
str: Human-readable string output
"""
return np.array2string(bool_arr.astype(int), separator="")
[docs]
def display_rules(self) -> None:
"""Print out the final rules"""
assert self._has_fit, "RACER has not been fit yet."
print("Algorithm Parameters:")
print(f"\t- Alpha: {self._alpha}")
if self._benchmark:
print(f"\t- Time to fit: {self._bench_time}s")
print(
f"\nFinal Rules ({len(self._final_rules_if)} total): (if --> then (label) | fitness)"
)
for i in range(len(self._final_rules_if)):
print(
f"\t{self._bool2str(self._final_rules_if[i])} -->"
f" {self._bool2str(self._final_rules_then[i])}"
f" ({self._label_to_int(self._final_rules_then[i])})"
f" | {self._fitnesses[i]}"
)
[docs]
def _closest_match(self, X: np.ndarray) -> np.ndarray:
"""Find the closest matching rule to `X` (This will be extended later)
Args:
X (np.ndarray): Input rule `X`
Returns:
np.ndarray: Matched rule
"""
return self._majority_then
[docs]
def score(self, X_test: np.ndarray, y_test: np.ndarray) -> float:
"""Returns accuracy on the provided test data.
Args:
X_test (np.ndarray): Test features vector
y_test (np.ndarray): Test targets vector
Returns:
float: Accuracy score
"""
assert self._has_fit, "RACER has not been fit yet."
try:
from sklearn.metrics import accuracy_score
except ImportError as e:
raise ImportError(
"scikit-learn is required to use the score function. Install wit `pip install scikit-learn`."
)
if y_test.ndim != 1 and y_test.shape[1] != 1:
y_test = np.argmax(y_test, axis=-1)
y_pred = self.predict(X_test)
return accuracy_score(y_test, y_pred)
[docs]
def _fitness_fn(self, rule_if: np.ndarray, rule_then: np.ndarray) -> np.ndarray:
"""Returns fitness for a given rule according to the RACER paper
Args:
rule_if (np.ndarray): If part of a rule (x)
rule_then (np.ndarray): Then part of a rule (y)
Returns:
np.ndarray: Fitness score for the rule as defined in the RACER paper
"""
n_covered, n_correct = self._confusion(rule_if, rule_then)
accuracy = n_correct / n_covered
coverage = n_covered / self._cardinality
return self._alpha * accuracy + self._beta * coverage
[docs]
def _covered(self, X: np.ndarray, rule_if: np.ndarray) -> np.ndarray:
"""Returns indices of instances if `X` that are covered by `rule_if`.
Note that rule covers instance if EITHER of the following holds in a bitwise manner:
1. instance[i] == 0
2. instance[i] == 1 AND rule[i] == 1
Args:
X (np.ndarray): Instances
rule_if (np.ndarray): If part of rule (x)
Returns:
np.ndarray: An array containing indices in `X` that are covered by `rule_if`
"""
covered = OR(NOT(X), AND(rule_if, X)).min(axis=-1)
return covered
[docs]
def _confusion(
self, rule_if: np.ndarray, rule_then: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Returns n_correct and n_covered for instances classified by a rule.
Args:
rule_if (np.ndarray): If part of rule (x)
rule_then (np.ndarray): Then part of rule (y)
Returns:
Tuple[np.ndarray, np.ndarray]: (n_covered, n_correct)
"""
covered = self._covered(self._X, rule_if)
n_covered = covered.sum()
y_covered = self._y[covered]
n_correct = XNOR(y_covered, rule_then).min(axis=-1).sum()
return n_covered, n_correct
[docs]
def _get_majority(self) -> np.ndarray:
"""Return the majority rule_then from self._y
Returns:
np.ndarray: Majority rule_then
"""
u, indices = np.unique(self._y, axis=0, return_inverse=True)
return u[np.bincount(indices).argmax()]
[docs]
def _create_init_rules(self) -> None:
"""Creates an initial set of rules from theinput feature vectors"""
self._extants_if = self._X.copy()
self._extants_then = self._y.copy()
self._extants_covered = np.zeros(len(self._X), dtype=bool)
self._majority_then = self._get_majority()
self._fitnesses = np.array(
[
self._fitness_fn(rule_if, rule_then)
for rule_if, rule_then in zip(self._X, self._y)
]
)
[docs]
def _composable(self, idx1: int, idx2: int) -> bool:
"""Returns true if two rules indicated by their indices are composable
Args:
idx1 (int): Index of the first rule
idx2 (int): Index of the second rule
Returns:
bool: True if labels match and neither of the rules are covered. False otherwise.
"""
labels_match = XNOR(self._extants_then[idx1], self._extants_then[idx2]).min()
return (
labels_match
and not self._extants_covered[idx1]
and not self._extants_covered[idx2]
)
[docs]
def _process_rules(self, idx1: int, idx2: int) -> None:
"""Process two rules indiciated by their indices
Args:
idx1 (int): Index of the first rule
idx2 (int): Index of the second rule
"""
if self._composable(idx1, idx2):
composition = self._compose(self._extants_if[idx1], self._extants_if[idx2])
composition_fitness = self._fitness_fn(
composition, self._extants_then[idx1]
)
if composition_fitness > np.maximum(
self._fitnesses[idx1], self._fitnesses[idx2]
):
self._update_extants(
idx1, composition, self._extants_then[idx1], composition_fitness
)
[docs]
def _compose(self, rule1: np.ndarray, rule2: np.ndarray) -> np.ndarray:
"""Composes rule1 with rule2
Args:
rule1 (np.ndarray): The first rule
rule2 (np.ndarray): The second rule
Returns:
np.ndarray: The composed rule which is simply the bitwise OR of the two rules
"""
return OR(rule1, rule2)
[docs]
def _update_extants(
self,
index: int,
new_rule_if: np.ndarray,
new_rule_then: np.ndarray,
new_rule_fitness: np.ndarray,
):
"""Remove all rules from current extants that are covered by `new_rule`.
Then append new rule to extants.
Args:
index (int): Index of `new_rule`
new_rule_if (np.ndarray): If part of `new_rule` (x)
new_rule_then (np.ndarray): Then part of `new_rule` (y)
new_rule_fitness (np.ndarray): Fitness of the `new_rule`
"""
same_class_indices = self._class_indices[self._label_to_int(new_rule_then)]
covered = self._covered(self._extants_if[same_class_indices], new_rule_if)
self._extants_covered[same_class_indices[covered]] = True
self._extants_covered[index] = False
self._extants_if[index], self._extants_then[index], self._fitnesses[index] = (
new_rule_if,
new_rule_then,
new_rule_fitness,
)
[docs]
def _label_to_int(self, label: np.ndarray) -> int:
"""Converts dummy label to int
Args:
label (np.ndarray): Label to convert
Returns:
int: Converted label
"""
return int(np.argmax(label))
[docs]
def _generalize_extants(self) -> None:
"""Generalize the extants by flipping every 0 to a 1 and checking if the fitness improves."""
new_extants_if = np.zeros_like(self._extants_if, dtype=bool)
for i in range(len(self._extants_if)):
for j in range(len(self._extants_if[i])):
if not self._extants_if[i][j]:
self._extants_if[i][j] = True
fitness = self._fitness_fn(
self._extants_if[i], self._extants_then[i]
)
if fitness > self._fitnesses[i]:
self._fitnesses[i] = fitness
else:
self._extants_if[i][j] = False
new_extants_if[i] = self._extants_if[i]
self._extants_if = new_extants_if
[docs]
def _finalize_rules(self) -> None:
"""Removes redundant rules to form the final ruleset"""
temp_rules_if = self._final_rules_if
temp_rules_then = self._final_rules_then
temp_rules_fitnesses = self._fitnesses
i = 0
while i < len(temp_rules_if) - 1:
mask = np.ones(len(temp_rules_if), dtype=bool)
covered = self._covered(temp_rules_if[i + 1 :], temp_rules_if[i])
mask[i + 1 :][covered] = False
temp_rules_if, temp_rules_then, temp_rules_fitnesses = (
temp_rules_if[mask],
temp_rules_then[mask],
temp_rules_fitnesses[mask],
)
i += 1
self._final_rules_if, self._final_rules_then, self._fitnesses = (
temp_rules_if,
temp_rules_then,
temp_rules_fitnesses,
)