
# -*- coding: utf-8 -*-
"""
Tensor decomposition is a powerful unsupervised Machine Learning method that enables the modeling of multi-dimensional data, including malware data. We introduce a novel ensemble semi-supervised classification algorithm, named Random Forest of Tensors (RFoT), that utilizes tensor decomposition to extract the complex and multi-faceted latent patterns from data. Our hybrid model leverages the strength of multi-dimensional analysis combined with clustering to capture the sample groupings in the latent components, whose combinations distinguish malware and benign-ware. The patterns extracted from given data with tensor decomposition depend upon the configuration of the tensor, such as the dimension, entry, and rank selection. To capture the unique perspectives of different tensor configurations, we employ the “wisdom of crowds” philosophy and make use of decisions made by the majority of a randomly generated ensemble of tensors with varying dimensions, entries, and ranks.
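
.. admonition:: Ensemble Configurations

    Each ensemble member can be pictured as a small configuration record. The sketch below is illustrative only, with hypothetical column indices and rank; the concrete sampling is done in ``RFoT.utilities.sample_tensor_configs.setup_tensors``.

    .. code-block:: python

        # one randomly sampled tensor configuration (illustrative values)
        config = {
            "dimensions": [2, 5, 7],  # feature columns used as tensor modes
            "entry": 3,               # feature column used as the tensor entry
            "rank": 4,                # decomposition rank R
        }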

As the tensor decomposition backend, RFoT offers two CPD algorithms. First, the RFoT package includes the Python implementation of the **CP-ALS** algorithm that was originally introduced in the `MATLAB Tensor Toolbox <https://www.tensortoolbox.org/cp.html>`_ :cite:p:`TTB_Software,Bader2006,Bader2008`. The CP-ALS backend can also be used to **decompose each random tensor in a parallel manner**. Second, RFoT can be used with the Python implementation of the **CP-APR** algorithm with **GPU capability** :cite:p:`10.1145/3519602`. Use of the CP-APR backend allows decomposing each random tensor configuration both in an **embarrassingly parallel fashion on a single GPU** and in a **multi-GPU parallel execution**.
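
A minimal quick-start sketch is shown below, assuming a hypothetical feature matrix ``X`` and label vector ``y``; see ``RFoT.predict`` for a complete, runnable example.

.. code-block:: python

    from RFoT import RFoT

    # X: n-by-m feature matrix; y: labels, with -1 marking the unknown samples
    model = RFoT(n_estimators=80, decomp="cp_als", clustering="ms")
    y_pred = model.predict(X, y)  # entries still -1 in y_pred are abstaining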

"""

from pyCP_ALS import CP_ALS
from pyCP_APR import CP_APR

from .utilities.bin_columns import bin_columns
from .utilities.sample_tensor_configs import setup_tensors
from .utilities.build_tensor import setup_sptensor
from .utilities.istarmap import istarmap

from .clustering.gmm import gmm_cluster
from .clustering.ms import ms_cluster
from .clustering.component import component_cluster

from multiprocessing import Pool
from collections import Counter
import tqdm
import numpy as np
import pandas as pd
import operator


class RFoT:
    def __init__(
        self,
        max_depth=1,
        min_rank=2,
        max_rank=20,
        min_dimensions=3,
        max_dimensions=3,
        min_cluster_search=2,
        max_cluster_search=12,
        component_purity_tol=-1,
        cluster_purity_tol=0.9,
        n_estimators=80,
        rank="random",
        clustering="ms",
        decomp="cp_als",
        zero_tol=1e-08,
        dont_bin=list(),
        bin_scale=1.0,
        bin_entry=False,
        bin_max_map={"max": 10 ** 6, "bin": 10 ** 3},
        tol=1e-4,
        n_iters=50,
        verbose=True,
        decomp_verbose=False,
        fixsigns=True,
        random_state=42,
        n_jobs=1,
        n_gpus=1,
        gpu_id=0,
    ):
        """
        Initialize the **RFoT.RFoT** class.

        Parameters
        ----------
        max_depth : int, optional
            Maximum number of times to run RFoT. The default is 1.

            .. note::

                * If ``max_depth=1``, data is fit with RFoT once.
                * Otherwise, when ``max_depth`` is more than 1, each corresponding fit of the data with RFoT will work on the abstaining predictions from the prior fit.

        min_rank : int, optional
            Minimum tensor rank R to be randomly sampled. The default is 2.

            .. note::

                * Should be more than 1. ``min_rank`` should be less than ``max_rank``.
                * Only used when ``rank="random"``.

        max_rank : int, optional
            Maximum tensor rank R to be randomly sampled. The default is 20.

            .. note::

                * ``max_rank`` should be more than ``min_rank``.
                * Only used when ``rank="random"``.

        min_dimensions : int, optional
            When randomly sampling tensor configurations, minimum number of dimensions a tensor should have within the ensemble of random tensor configurations. The default is 3.
        max_dimensions : int, optional
            When randomly sampling tensor configurations, maximum number of dimensions a tensor should have within the ensemble of random tensor configurations. The default is 3.
        min_cluster_search : int, optional
            When searching for the number of clusters via likelihood in GMM, minimum number of clusters to try. The default is 2.
        max_cluster_search : int, optional
            When searching for the number of clusters via likelihood in GMM, maximum number of clusters to try. The default is 12.
        component_purity_tol : float or int, optional
            The purity score threshold for the latent factors. The default is -1.\n
            This threshold is calculated based on the known instances in the component.\n
            If the purity score of the latent factor is lower than the threshold ``component_purity_tol``, the component is discarded and is not used in obtaining clusters.

            .. note::

                * By default ``component_purity_tol=-1``.
                * When ``component_purity_tol=-1``, component uniformity is not used in deciding whether to discard the components, and only ``cluster_purity_tol`` is used.
                * Either ``component_purity_tol`` or ``cluster_purity_tol`` must be more than 0.

        cluster_purity_tol : float, optional
            The purity score threshold for the clusters. The default is 0.9.
            This threshold is calculated based on the known instances in the cluster.\n
            If the purity score of the cluster is lower than the threshold ``cluster_purity_tol``, the cluster is discarded and is not used in the semi-supervised class voting of the unknown samples in the same cluster.

            .. note::

                * When ``cluster_purity_tol=-1``, cluster uniformity is not used in deciding whether to discard the clusters, and only ``component_purity_tol`` is used.
                * Either ``component_purity_tol`` or ``cluster_purity_tol`` must be more than 0.

        n_estimators : int, optional
            Number of random tensor configurations in the ensemble. The default is 80.

            .. caution::

                * Based on the hyper-parameter configurations and the number of features in the dataset, it is possible to have fewer random tensor configurations than the number specified in ``n_estimators``.

        rank : int or string, optional
            Method for assigning rank for each random tensor to be decomposed. The default is "random".\n
            When ``rank="random"``, the rank for decomposition is sampled randomly from the range (``min_rank``, ``max_rank``).\n
            All the tensors in the ensemble can also be decomposed with the same rank (example: ``rank=2``).
        clustering : string, optional
            Clustering method to be used for capturing the patterns from the latent factors. The default is "ms".\n

            .. admonition:: Options

                * ``clustering="ms"`` (Mean Shift)
                * ``clustering="component"``
                * ``clustering="gmm"`` (Gaussian Mixture Model)

        decomp : string, optional
            Tensor decomposition backend/algorithm to be used. The default is "cp_als".\n

            .. admonition:: Options

                * ``decomp="cp_als"`` (Alternating least squares for CANDECOMP/PARAFAC Decomposition)
                * ``decomp="cp_apr"`` (CANDECOMP/PARAFAC Alternating Poisson Regression)
                * ``decomp="cp_apr_gpu"`` (CP-APR with GPU)
                * ``decomp="debug"``

            .. note::

                * GPU is used when ``decomp="cp_apr_gpu"``.
                * ``decomp="debug"`` allows serial computation where any error or warning is raised to the user level.

        zero_tol : float, optional
            Samples that are close to zero, where closeness is defined by ``zero_tol``, are removed from the latent factor. The default is 1e-08.
        dont_bin : list, optional
            List of column (feature) indices whose values should not be binned. The default is list().
        bin_scale : float, optional
            When using a given column (feature) as a tensor dimension, the feature values are binned to create a feature value to tensor dimension mapping. This allows a feature value to be represented by an index in the tensor dimension for that feature. The default is 1.0.\n
            When ``bin_scale=1.0``, the size of the dimension that represents the given feature will be equal to the number of unique values in that column (feature).

            .. seealso::

                * See `Pandas Cut <https://pandas.pydata.org/docs/reference/api/pandas.cut.html>`_ for value binning.

        bin_entry : bool, optional
            If ``bin_entry=True``, the features that are used as the tensor entry are also binned. The default is False.
        bin_max_map : dict, optional
            ``bin_max_map`` prevents any dimension of any of the tensors in the ensemble from being too large. The default is ``bin_max_map={"max": 10 ** 6, "bin": 10 ** 3}``.\n
            Specifically, ``bin_max_map["bin"]`` is used to determine the size of the dimension when:

            :math:`bin\_scale \cdot |f_i| > bin\_max\_map["max"]`

        tol : float, optional
            CP-ALS hyper-parameter. The default is 1e-4.
        n_iters : int, optional
            Maximum number of iterations (epochs) to run the tensor decomposition algorithm. The default is 50.
        verbose : bool, optional
            If ``verbose=True``, progress of the method is displayed. The default is True.
        decomp_verbose : bool, optional
            If ``decomp_verbose=True``, progress of the tensor decomposition backend is displayed for each random tensor. The default is False.
        fixsigns : bool, optional
            CP-ALS hyper-parameter. The default is True.
        random_state : int, optional
            Random seed. The default is 42.
        n_jobs : int, optional
            Number of parallel tensor decompositions to perform when decomposing the random tensors from the ensemble. The default is 1.
        n_gpus : int, optional
            Number of GPUs. The default is 1.

            .. note::

                * Only used when ``decomp="cp_apr_gpu"``.
                * When ``n_gpus`` is more than 1, and when ``n_jobs`` is more than one, multi-GPU parallel execution is performed. For example, ``n_gpus=2`` and ``n_jobs=2`` will use 2 GPUs, and 1 job will be run on each GPU in parallel.

        gpu_id : int, optional
            GPU device ID when using GPU. The default is 0.

            .. note::

                * Only used when ``decomp="cp_apr_gpu"``.
                * Not considered when ``n_gpus`` is more than 1.

        Raises
        ------
        Exception
            Invalid parameter selection.

        Returns
        -------
        None.

        """
        self.max_depth = max_depth
        self.min_rank = min_rank
        self.max_rank = max_rank
        self.min_dimensions = min_dimensions
        self.max_dimensions = max_dimensions
        self.min_cluster_search = min_cluster_search
        self.max_cluster_search = max_cluster_search
        self.component_purity_tol = component_purity_tol
        self.cluster_purity_tol = cluster_purity_tol
        self.n_estimators = n_estimators
        self.rank = rank
        self.clustering = clustering
        self.decomp = decomp
        self.zero_tol = zero_tol
        self.dont_bin = dont_bin
        self.bin_scale = bin_scale
        self.bin_entry = bin_entry
        self.bin_max_map = bin_max_map
        self.tol = tol
        self.n_iters = n_iters
        self.verbose = verbose
        self.decomp_verbose = decomp_verbose
        self.fixsigns = fixsigns
        self.random_state = random_state
        self.classes = None
        self.n_jobs = n_jobs
        self.n_gpus = n_gpus
        self.gpu_id = gpu_id
        self.allowed_decompositions = ["cp_als", "cp_apr", "cp_apr_gpu", "debug"]

        assert (
            self.cluster_purity_tol > 0 or self.component_purity_tol > 0
        ), "Cluster purity and/or component purity must be >0"

        if self.clustering == "gmm":
            self.cluster = gmm_cluster
        elif self.clustering == "ms":
            self.cluster = ms_cluster
        elif self.clustering == "component":
            self.cluster = component_cluster
        else:
            raise Exception("Unknown clustering method is chosen.")
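    # A hedged sketch of the binning rule documented above, written as comments
    # (illustrative only; the actual implementation lives in
    # RFoT.utilities.bin_columns). A feature column f_i binned for use as a
    # tensor dimension follows, per the docstring:
    #
    #   n_unique = X[col].nunique()
    #   if bin_scale * n_unique > bin_max_map["max"]:
    #       n_bins = bin_max_map["bin"]          # cap overly large dimensions
    #   else:
    #       n_bins = int(bin_scale * n_unique)
    #   X[col] = pd.cut(X[col], bins=n_bins, labels=False)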
    def get_params(self):
        """
        Returns the parameters of the RFoT object.

        Returns
        -------
        dict
            Parameters and data stored in the RFoT object.

        """
        return vars(self)
    def set_params(self, **parameters):
        """
        Used to set the parameters of the RFoT object.

        Parameters
        ----------
        **parameters : dict
            Dictionary of parameters where keys are the variable names.

        Returns
        -------
        object
            RFoT object.

        """
        for parameter, value in parameters.items():
            setattr(self, parameter, value)

        return self
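    # Hypothetical round-trip through the two accessors above, sketched as
    # comments (get_params() simply returns vars(self)):
    #
    #   model = RFoT(n_estimators=100)
    #   model.get_params()["n_estimators"]   # -> 100
    #   model.set_params(max_depth=2)        # returns self, scikit-learn style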
    def predict(self, X: np.array, y: np.ndarray):
        """
        Semi-supervised prediction of the unknown samples (with labels -1) based on the known samples.

        .. important::

            * Use -1 for the unknown samples.
            * In the returned ``y_pred``, samples with -1 predictions are said to be **abstaining predictions** (i.e. the model says **"we do not know what the label for that sample is"**).
            * The returned ``y_pred`` includes both known and unknown samples, where the labels of unknown samples may have changed from the original ``y``.

        .. admonition:: Example Usage

            .. code-block:: python

                from RFoT import RFoT
                from sklearn import datasets
                from sklearn.metrics import f1_score
                import numpy as np

                # load the dataset
                iris = datasets.load_iris()
                X = iris["data"]
                y = (iris["target"] == 2).astype(int)
                y_true = y.copy()
                y_experiment = y_true.copy()

                # label 30% as unknown
                rng = np.random.RandomState(42)
                random_unlabeled_points = rng.rand(y_experiment.shape[0]) < 0.3
                y_experiment[random_unlabeled_points] = -1

                # predict with RFoT
                model = RFoT(
                    bin_scale=1,
                    max_dimensions=3,
                    component_purity_tol=1.0,
                    min_rank=2,
                    max_rank=3,
                    n_estimators=50,
                    bin_entry=True,
                    clustering="ms",
                    max_depth=2,
                    n_jobs=50,
                )
                y_pred = model.predict(X, y_experiment)

                # look at the results
                unknown_indices = np.argwhere(y_experiment == -1).flatten()
                did_predict_indices = np.argwhere(y_pred[unknown_indices] != -1).flatten()
                abstaining_count = len(np.argwhere(y_pred == -1))
                f1 = f1_score(
                    y_true[unknown_indices][did_predict_indices],
                    y_pred[unknown_indices][did_predict_indices],
                    average="weighted",
                )

                print("------------------------")
                print("Num. of Abstaining", abstaining_count)
                print("Percent Abstaining", (abstaining_count / len(unknown_indices)) * 100, "%")
                print("F1=", f1)

        .. admonition:: Example Usage

            .. code-block:: python

                # y is the vector of known and unknown labels passed to RFoT
                # y_pred is the prediction returned by RFoT
                # y_true is the ground truth
                import numpy as np
                from sklearn.metrics import f1_score

                unknown_indices = np.argwhere(y == -1).flatten()
                did_predict_indices = np.argwhere(y_pred[unknown_indices] != -1).flatten()
                abstaining_count = len(np.argwhere(y_pred == -1))
                f1 = f1_score(
                    y_true[unknown_indices][did_predict_indices],
                    y_pred[unknown_indices][did_predict_indices],
                    average="weighted",
                )

                print("Num. of Abstaining", abstaining_count)
                print("Percent Abstaining", (abstaining_count / len(unknown_indices)) * 100, "%")
                print("F1=", f1)

        Parameters
        ----------
        X : np.array
            Features matrix X where columns are the m features and rows are the n samples.
        y : np.ndarray
            Vector of size n with the label for each sample. Unknown samples have the label -1.

        Returns
        -------
        y_pred : np.ndarray
            Predictions made over the original y. Known samples are kept as is. Unknown samples that are no longer labeled as -1 did receive a prediction. Samples that are still -1 are the abstaining predictions.

        """
        #
        # Input verification
        #
        if isinstance(X, pd.DataFrame):
            X = X.to_numpy()

        if isinstance(y, list):
            y = np.array(y)

        if np.count_nonzero(y == -1) == 0:
            raise Exception("No unknown samples found. Label unknown with -1 in y.")

        assert (
            np.count_nonzero(y == -2) == 0
        ), "Do not use -2 as the label as it is internally used!"
        assert len(X) == len(
            y
        ), "Number of samples does not match the number of labels!"
        assert (
            np.count_nonzero(y == -1) > 0
        ), "No unknown samples found! Use -1 in labels to mark the unknown samples."
        #
        # Setup
        #
        # add a column to X to serve as sample IDs
        X = np.insert(X, 0, np.arange(0, len(X)), axis=1)

        # convert the features matrix into a dataframe
        X = pd.DataFrame(X)

        y_pred = y.copy()
        y_pred_flag = y.copy()
        X_curr = X.copy()
        y_curr = y.copy()
        curr_indices = np.arange(0, len(X))

        #
        # Main loop, depth
        #
        for depth in range(self.max_depth):
            n_abstaining = np.count_nonzero(y_pred == -1)

            y_pred_curr, predicted_indices = self._tree(X_curr, y_curr, depth)
            y_pred[curr_indices] = y_pred_curr
            y_pred_flag[curr_indices[predicted_indices]] = -2

            n_abstaining_new = np.count_nonzero(y_pred == -1)

            # no change in abstaining predictions, no need to continue
            if n_abstaining_new == n_abstaining:
                break

            # form the data for the next depth
            curr_indices = np.argwhere(y_pred_flag >= -1).flatten()
            X_curr = X.iloc[curr_indices].copy()
            X_curr[0] = np.arange(0, len(X_curr))
            y_curr = y[curr_indices].copy()

        return y_pred
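    # Note on the internal flags used above: y marks unknown samples with -1,
    # and y_pred_flag additionally marks samples predicted at an earlier depth
    # with -2, so each later depth refits only the still-abstaining samples.
    # This is also why predict() rejects -2 as a user-provided label.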
    def _tree(self, X: np.array, y: np.array, depth: int):
        """
        Creates random tensor configurations, then builds the tensors in COO format.
        These tensors are then decomposed, and the sample patterns are captured with
        clustering over the latent factors for the first dimension. Then semi-supervised
        voting is performed over the clusters. These votes are returned.

        Parameters
        ----------
        X : np.array
            Features matrix X where columns are the m features and rows are the n samples.
        y : np.ndarray
            Vector of size n with the label for each sample. Unknown samples have the label -1.
        depth : int
            Number of times to run RFoT.

        Returns
        -------
        y_pred : np.ndarray
            Predictions made over the original y. Known samples are kept as is. Unknown samples that are no longer labeled as -1 did receive a prediction. Samples that are still -1 are the abstaining predictions.
        predicted_indices : np.ndarray
            The indices in y_pred where unknown labels did receive prediction values.

        """
        # unique classes
        classes = list(set(y))
        classes.pop(classes.index(-1))
        self.classes = classes

        #
        # Sample the first set of tensor options
        #
        tensor_configs = setup_tensors(
            self.min_dimensions,
            self.max_dimensions,
            X,
            self.random_state + depth,
            self.n_estimators,
            self.rank,
            self.min_rank,
            self.max_rank,
        )

        #
        # Work on each random tensor configuration
        #
        tensor_votes = list()
        tasks = []
        for idx, (key, config) in enumerate(tensor_configs.items()):
            if self.decomp in ["cp_apr_gpu"]:
                if self.n_gpus == 1:
                    tasks.append((config, X, y, self.gpu_id))
                else:
                    # round-robin assignment of tasks to the available GPUs
                    tasks.append((config, X, y, idx % self.n_gpus))
            else:
                tasks.append((config, X, y))

        if self.decomp in ["cp_als", "cp_apr", "cp_apr_gpu"]:
            pool = Pool(processes=self.n_jobs)
            for tv in tqdm.tqdm(
                pool.istarmap(self._get_tensor_votes, tasks, chunksize=1),
                total=len(tasks),
                disable=not (self.verbose),
            ):
                tensor_votes.append(tv)
        elif self.decomp in ["debug"]:
            for task in tqdm.tqdm(tasks, total=len(tasks), disable=not (self.verbose)):
                tv = self._get_tensor_votes(config=task[0], X=task[1], y=task[2])
                tensor_votes.append(tv)

        #
        # Combine votes from each tensor
        #
        votes = dict()
        for tv in tensor_votes:
            for sample_idx, sample_votes in tv.items():
                if sample_idx in votes:
                    for idx, v in enumerate(sample_votes):
                        votes[sample_idx][idx] += v
                else:
                    votes[sample_idx] = sample_votes

        #
        # Max vote on the results of the current depth
        #
        self.votes = votes
        predicted_indices = []
        y_pred = y.copy()
        for sample_idx, sample_votes in votes.items():

            # no decision was made (50-50)
            if len(set(sample_votes)) == 1:
                y_pred[sample_idx] = -1
                continue

            max_value = max(sample_votes)
            max_index = sample_votes.index(max_value)
            y_pred[sample_idx] = max_index
            predicted_indices.append(sample_idx)

        return y_pred, predicted_indices

    def _get_tensor_votes(self, config, X, y, gpu_id=0):
        """
        Sets up the tensor decomposition backend. Then bins the features to build
        the given current tensor. Then the tensor is decomposed, and the sample
        patterns are captured with clustering over the latent factors for the first
        dimension. Then semi-supervised voting is performed over the clusters.
        These votes are returned.

        Parameters
        ----------
        config : dict
            Dictionary of tensor configuration.
        X : np.array
            Features matrix X where columns are the m features and rows are the n samples.
        y : np.ndarray
            Vector of size n with the label for each sample. Unknown samples have the label -1.
        gpu_id : int, optional
            If running CP-APR, which GPU to use. The default is 0.

        Returns
        -------
        votes : dict
            Dictionary of votes for the samples.

        """
        #
        # setup backend
        #
        if self.decomp in ["cp_als", "debug"]:
            backend = CP_ALS(
                tol=self.tol,
                n_iters=self.n_iters,
                verbose=self.decomp_verbose,
                fixsigns=self.fixsigns,
                random_state=self.random_state,
            )
        elif self.decomp in ["cp_apr"]:
            backend = CP_APR(
                n_iters=self.n_iters,
                verbose=self.decomp_verbose,
                random_state=self.random_state,
            )
        elif self.decomp in ["cp_apr_gpu"]:
            backend = CP_APR(
                n_iters=self.n_iters,
                verbose=self.decomp_verbose,
                random_state=self.random_state,
                method="torch",
                device="gpu",
                device_num=gpu_id,
                return_type="numpy",
            )
        else:
            raise Exception(
                "Unknown tensor decomposition method. Choose from: "
                + ", ".join(self.allowed_decompositions)
            )

        # original indices
        all_indices = np.arange(0, len(X))
        votes = {}

        #
        # bin the dimensions
        #
        if self.bin_entry:
            curr_entry = config["entry"]
            curr_dims = config["dimensions"]
            curr_df = bin_columns(
                X[curr_dims + [curr_entry]].copy(),
                self.bin_max_map,
                self.dont_bin + [0],
                self.bin_scale,
            )
        else:
            curr_entry = config["entry"]
            curr_dims = config["dimensions"]
            curr_df = bin_columns(
                X[curr_dims + [curr_entry]].copy(),
                self.bin_max_map,
                self.dont_bin + [curr_entry] + [0],
                self.bin_scale,
            )

        #
        # Factorize the current tensor
        #
        curr_tensor = setup_sptensor(curr_df, config)
        decomp = backend.fit(
            coords=curr_tensor["nnz_coords"],
            values=curr_tensor["nnz_values"],
            rank=int(config["rank"]),
        )
        del backend

        # use the latent factor representing the samples (mode 0)
        latent_factor_0 = decomp["Factors"]["0"]

        #
        # Work on each component
        #
        for k in range(latent_factor_0.shape[1]):
            Mk = latent_factor_0[:, k]

            #
            # mask out elements close to 0
            #
            mask = ~np.isclose(Mk, 0, atol=self.zero_tol)
            M_m = Mk[mask]
            curr_y = y[mask]
            known_sample_indices = np.argwhere(y[mask] != -1).flatten()
            unknown_sample_indices = np.argwhere(y[mask] == -1).flatten()

            if len(curr_y) == 0:
                continue

            #
            # Capture clusters from the current component
            #
            params = {
                "M_k": M_m,
                "min_cluster_search": self.min_cluster_search,
                "max_cluster_search": self.max_cluster_search,
                "random_state": self.random_state,
            }
            try:
                cluster_labels, n_opt = self.cluster(params)
            except Exception:
                # error when clustering this component, skip
                continue

            #
            # Calculate Component Quality
            #
            if self.component_purity_tol > 0:
                purity_score = self._component_quality(
                    n_opt, cluster_labels, known_sample_indices, curr_y
                )

                # poor component quality, poor purity among clusters, skip component
                if purity_score < self.component_purity_tol:
                    continue

            #
            # Semi-supervised voting
            #
            votes = self._get_cluster_votes(
                n_opt,
                cluster_labels,
                known_sample_indices,
                unknown_sample_indices,
                curr_y,
                all_indices,
                mask,
                votes,
            )

        return votes

    def _get_cluster_votes(
        self,
        n_opt,
        cluster_labels,
        known_sample_indices,
        unknown_sample_indices,
        curr_y,
        all_indices,
        mask,
        votes,
    ):
        """
        Performs semi-supervised voting.

        Parameters
        ----------
        n_opt : int
            Number of clusters.
        cluster_labels : np.ndarray
            List of cluster labels for each sample.
        known_sample_indices : np.ndarray
            Indices of the known samples.
        unknown_sample_indices : np.ndarray
            Indices of the unknown samples.
        curr_y : np.ndarray
            Labels.
        all_indices : np.ndarray
            Original indices.
        mask : np.ndarray
            Zero tol mask.
        votes : dict
            Votes so far.

        Returns
        -------
        votes : dict
            Updated votes.
""" for c in range(n_opt): # current cluster sample informations cluster_c_indices = np.argwhere(cluster_labels == c).flatten() # empty cluster if len(cluster_c_indices) == 0: continue cluster_c_known_indices = np.intersect1d( known_sample_indices, cluster_c_indices ) cluster_c_unknown_indices = np.intersect1d( unknown_sample_indices, cluster_c_indices ) cluster_c_known_labels = curr_y[cluster_c_known_indices] # everyone is known in the cluster if len(cluster_c_unknown_indices) == 0: continue # no known samples in the cluster - abstaining prediction if len(cluster_c_known_labels) == 0: continue # count the known labels in the cluster cluster_c_known_label_counts = dict(Counter(cluster_c_known_labels)) # cluster quality if self.cluster_purity_tol > 0: cluster_quality_score = max( cluster_c_known_label_counts.values() ) / sum(cluster_c_known_label_counts.values()) # cluster quality is poor, skip this cluster if cluster_quality_score < self.cluster_purity_tol: continue # vote vote_label = max( cluster_c_known_label_counts.items(), key=operator.itemgetter(1) )[0] org_unknown_indices = all_indices[mask][cluster_c_unknown_indices] for idx in org_unknown_indices: if idx in votes: votes[idx][vote_label] += 1 else: votes[idx] = [0] * len(self.classes) votes[idx][vote_label] += 1 return votes def _component_quality(self, n_opt, cluster_labels, known_sample_indices, curr_y): """ Calculates component quality based on cluster purity score. Parameters ---------- n_opt : int Number of clusters. cluster_labels : np.ndarray Labels for the samples in the cluster. known_sample_indices : np.ndarray Array of indices for known samples. curr_y : np.ndarray Labels for known and unknown samples. Returns ------- float Purity score. """ maximums = [] total = 0 for c in range(n_opt): cluster_c_indices = np.argwhere(cluster_labels == c).flatten() # empty cluster if len(cluster_c_indices) == 0: continue cluster_c_known_indices = np.intersect1d( known_sample_indices, cluster_c_indices ) # no known samples in the cluster - abstaining prediction if len(cluster_c_known_indices) == 0: continue cluster_c_known_labels = curr_y[cluster_c_known_indices] cluster_c_known_label_counts = dict(Counter(cluster_c_known_labels)) maximums.append(max(cluster_c_known_label_counts.values())) total += len(cluster_c_known_indices) # if none of the clusters had known instances if total == 0: return -np.inf purity_score = sum(maximums) / total return purity_score