|
| 1 | +import itertools |
| 2 | +from collections.abc import Iterable |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +import pandas as pd |
| 6 | +from deap import base, creator |
| 7 | +from sklearn.base import ClassifierMixin |
| 8 | +from sklearn.linear_model import LinearRegression |
| 9 | +from sklearn.preprocessing import PolynomialFeatures |
| 10 | + |
| 11 | +from psyke import Target |
| 12 | +from psyke.fuzzy import fuzzify, plot_membership, fuzzy_labels, generate_fuzzy_rules, get_activations |
| 13 | +from psyke.genetic import regions_from_cuts, output_estimation |
| 14 | +from psyke.genetic.fgin import FGIn |
| 15 | + |
| 16 | + |
class FInGER:
    """Fuzzy-rule extractor driven by a genetic search over cut configurations.

    ``extract`` runs one ``FGIn`` optimisation for every combination of
    polynomial degree (``1..max_poly``) and per-feature number of cuts
    (``1..max_slices``), keeps one of the resulting individuals, and turns it
    into fuzzy membership functions, per-region outputs and textual rules.
    ``predict`` then combines the rule outputs according to the rule
    activations computed for each row.
    """

    # Regions holding fewer training samples than this are discarded by ``extract``.
    _MIN_REGION_SAMPLES = 3

    def __init__(self, predictor, features, sigmas, max_slices, min_rules=1, max_poly=1, alpha=0.5, indpb=0.5,
                 tournsize=3, n_gen=50, n_pop=50, membership_shape='trap', metric='R2', valid=None,
                 output=Target.REGRESSION):
        """Store the search configuration and register the DEAP classes.

        Args:
            predictor: Underlying model. Here it is only stored and inspected:
                if it is a scikit-learn ``ClassifierMixin`` the output type is
                forced to ``Target.CLASSIFICATION``.
            features: Features to cut; its length fixes how many per-feature
                slice counts are enumerated by ``extract``.
            sigmas: Forwarded unchanged to ``FGIn``.
            max_slices: Largest number of cuts tried for each feature.
            min_rules: Forwarded unchanged to ``FGIn``.
            max_poly: Largest polynomial degree tried by ``extract``.
            alpha, indpb, tournsize: Forwarded unchanged to ``FGIn``.
            n_gen, n_pop: Forwarded unchanged to ``FGIn.run``.
            membership_shape: Shape identifier forwarded to ``FGIn`` and ``fuzzify``.
            metric: Forwarded unchanged to ``FGIn``.
            valid: Forwarded unchanged to ``FGIn`` (presumably a validation
                set -- confirm against ``FGIn``).
            output: Output type used when ``predictor`` is not a classifier.
        """
        self.predictor = predictor
        self.features = features
        self.max_features = len(features)
        self.sigmas = sigmas
        self.max_slices = max_slices
        self.min_rules = min_rules
        self.poly = max_poly
        # A classifier always yields classification rules, whatever ``output`` says.
        self._output = Target.CLASSIFICATION if isinstance(predictor, ClassifierMixin) else output
        self.valid = valid
        self.trained_poly = None

        self.alpha = alpha
        self.indpb = indpb
        self.tournsize = tournsize
        self.metric = metric
        self.n_gen = n_gen
        self.n_pop = n_pop

        self.shape = membership_shape
        # The three attributes below are populated by ``extract``.
        self.valid_masks = None
        self.outputs = None
        self.functions_domains = {}

        # NOTE(review): these are re-registered on every instantiation; DEAP is
        # expected to warn when an existing name is overwritten -- consider
        # registering them once at import time.
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMax)

    # TODO: a class for methods and attributes supporting polynomial combinations
    def __poly_names(self):
        """Return one readable name per polynomial term of ``trained_poly``.

        Each term is rendered as its factors joined by ``' * '``; a factor is
        the bare feature name for power 1 and ``name**power`` for higher
        powers, while features with power 0 are omitted.
        """
        names = []
        for powers in self.trained_poly.powers_:
            factors = ''.join('' if power == 0 else f'{name} * ' if power == 1 else f'{name}**{power} * '
                              for power, name in zip(powers, self.trained_poly.feature_names_in_))
            # Drop the trailing ' * ' left by the last factor.
            names.append(factors[:-3])
        return names

    @staticmethod
    def _get_cuts(individual, slices):
        """Split a flat individual into one sorted list of cuts per feature.

        Args:
            individual: Flat sequence of cut values.
            slices: Number of cuts belonging to each feature, in order.

        Returns:
            A list with ``len(slices)`` sorted lists; the i-th one holds the
            ``slices[i]`` consecutive values of ``individual`` for feature i.
        """
        boundaries = np.cumsum([0] + list(slices))
        return [sorted(individual[start:stop]) for start, stop in zip(boundaries[:-1], boundaries[1:])]

    def _labelled_domains(self):
        """Return ``functions_domains`` with fuzzy labels appended to each entry."""
        return {name: (domain[0], domain[1], fuzzy_labels(len(domain[0])))
                for name, domain in self.functions_domains.items()}

    def extract(self, dataframe: pd.DataFrame) -> str:
        """Search for a cut configuration and build the fuzzy rules from it.

        Args:
            dataframe: Training data; the last column is used as the target
                and all preceding columns as inputs.

        Returns:
            The generated fuzzy rules, one per line.

        Raises:
            ValueError: If ``max_poly`` or ``max_slices`` is lower than 1, so
                that no configuration can be evaluated.
        """
        candidates = {}
        for degree in range(1, self.poly + 1):
            for slices in itertools.product(range(1, self.max_slices + 1), repeat=self.max_features):
                search = FGIn((dataframe.iloc[:, :-1], dataframe.iloc[:, -1]), self.valid, self.features,
                              self.sigmas, slices, min_rules=self.min_rules, poly=degree, alpha=self.alpha,
                              indpb=self.indpb, tournsize=self.tournsize, membership_shape=self.shape,
                              metric=self.metric, output=self._output, warm=True)

                individual, score, _, _ = search.run(n_gen=self.n_gen, n_pop=self.n_pop)
                candidates[(score, degree, slices)] = individual

        if not candidates:
            raise ValueError("No configuration to evaluate: max_poly and max_slices must both be at least 1 "
                             f"(got max_poly={self.poly}, max_slices={self.max_slices})")

        # NOTE(review): the lowest score is selected although the registered
        # DEAP fitness is maximising -- confirm whether FGIn.run returns a
        # lower-is-better value or whether this should be max().
        selected = min(candidates)
        _, degree, slices = selected
        individual = candidates[selected]
        self.trained_poly = PolynomialFeatures(degree=degree, include_bias=False)

        inputs, target = dataframe.iloc[:, :-1], dataframe.iloc[:, -1]
        cuts = FInGER._get_cuts(individual, slices)
        self.functions_domains = fuzzify(cuts, inputs, self.features,
                                         {f: i for i, f in enumerate(dataframe.columns[:-1])}, self.shape)

        # One boolean mask per region id; the number of ids is the product of
        # (cuts + 1) over the features.
        masks = np.array([regions_from_cuts(dataframe, cuts, self.features) == region
                          for region in range(np.prod([s + 1 for s in slices]))])
        # Keep only the regions that contain enough samples.
        self.valid_masks = masks.sum(axis=1) >= self._MIN_REGION_SAMPLES
        masks = masks[self.valid_masks]

        self.outputs = np.array([output_estimation(inputs, target, self._output, self.trained_poly, mask)
                                 for mask in masks]).T

        labels = {name: domain[2] for name, domain in self._labelled_domains().items()}
        return "\n".join(generate_fuzzy_rules(labels, self.outputs, dataframe.columns[:-1], self.valid_masks))

    def show_membership_functions(self):
        """Plot the membership functions computed by ``extract``."""
        plot_membership(self._labelled_domains())

    def predict(self, dataframe: pd.DataFrame) -> Iterable:
        """Predict one value per row of ``dataframe`` from the extracted rules.

        For classification the activations are summed per distinct rule
        output and the output with the largest total is returned. Otherwise
        the rule outputs (constants, or the predictions of the per-rule models
        on the polynomial expansion of ``dataframe``) are multiplied by the
        activations and summed for each row.

        Args:
            dataframe: Rows to predict.

        Returns:
            A numpy array with one prediction per row.
        """
        activations = np.array([get_activations(row, self.functions_domains, self.valid_masks)
                                for _, row in dataframe.iterrows()])

        if self._output == Target.CLASSIFICATION:
            classes, idx = np.unique(self.outputs, return_inverse=True)
            # Total activation gathered by each distinct class, one row per class.
            totals = np.vstack([activations[:, idx == i].sum(axis=1) for i in range(len(classes))])
            pred = classes[np.argmax(totals, axis=0)]
        else:
            if self._output == Target.CONSTANT:
                outputs = self.outputs
            else:
                # The expansion does not depend on the rule: compute it once.
                expanded = self.trained_poly.fit_transform(dataframe)
                outputs = np.vstack([model.predict(expanded) for model in self.outputs]).T
            pred = (outputs * activations).sum(axis=1)
        return np.array(pred)

    @property
    def n_rules(self):
        """Number of extracted rules; 0 until ``extract`` has been called."""
        return 0 if self.outputs is None else len(self.outputs)
0 commit comments