From 857049170fa54f00ce8c73e2d6a10cc7bf5daccf Mon Sep 17 00:00:00 2001 From: John Wesley Hostetter Date: Thu, 15 Aug 2024 13:11:06 -0400 Subject: [PATCH] Begin uploading code --- examples/__init__.py | 0 examples/discrete/__init__.py | 0 examples/discrete/age.py | 69 ++ examples/discrete/student.py | 119 +++ pyproject.toml | 43 ++ requirements.txt | 7 + src/fuzzy/__init__.py | 0 src/fuzzy/relations/__init__.py | 0 src/fuzzy/relations/continuous/__init__.py | 0 src/fuzzy/relations/continuous/aggregation.py | 77 ++ src/fuzzy/relations/continuous/tnorm.py | 68 ++ src/fuzzy/relations/discrete/__init__.py | 0 src/fuzzy/relations/discrete/complement.py | 32 + src/fuzzy/relations/discrete/extension.py | 203 ++++++ src/fuzzy/relations/discrete/snorm.py | 28 + src/fuzzy/relations/discrete/tnorm.py | 28 + src/fuzzy/sets/__init__.py | 0 src/fuzzy/sets/continuous/__init__.py | 0 src/fuzzy/sets/continuous/abstract.py | 680 ++++++++++++++++++ src/fuzzy/sets/continuous/group.py | 466 ++++++++++++ src/fuzzy/sets/continuous/impl.py | 450 ++++++++++++ src/fuzzy/sets/continuous/membership.py | 39 + src/fuzzy/sets/continuous/utils.py | 99 +++ src/fuzzy/sets/discrete.py | 252 +++++++ tests/__init__.py | 0 tests/test_relations/__init__.py | 0 tests/test_relations/test_aggregation.py | 226 ++++++ tests/test_relations/test_extension.py | 73 ++ tests/test_relations/test_tnorms.py | 94 +++ tests/test_sets/__init__.py | 0 tests/test_sets/continuous/__init__.py | 0 tests/test_sets/continuous/impl/__init__.py | 0 tests/test_sets/continuous/impl/common.py | 21 + .../continuous/impl/test_gaussian.py | 389 ++++++++++ .../continuous/impl/test_logistic.py | 61 ++ .../continuous/impl/test_triangular.py | 273 +++++++ tests/test_sets/continuous/test_continuous.py | 110 +++ tests/test_sets/continuous/test_group.py | 73 ++ tests/test_sets/test_discrete.py | 148 ++++ 39 files changed, 4128 insertions(+) create mode 100644 examples/__init__.py create mode 100644 examples/discrete/__init__.py create mode 100644 examples/discrete/age.py create mode 100644 examples/discrete/student.py create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 src/fuzzy/__init__.py create mode 100644 src/fuzzy/relations/__init__.py create mode 100644 src/fuzzy/relations/continuous/__init__.py create mode 100644 src/fuzzy/relations/continuous/aggregation.py create mode 100644 src/fuzzy/relations/continuous/tnorm.py create mode 100644 src/fuzzy/relations/discrete/__init__.py create mode 100644 src/fuzzy/relations/discrete/complement.py create mode 100644 src/fuzzy/relations/discrete/extension.py create mode 100644 src/fuzzy/relations/discrete/snorm.py create mode 100644 src/fuzzy/relations/discrete/tnorm.py create mode 100644 src/fuzzy/sets/__init__.py create mode 100644 src/fuzzy/sets/continuous/__init__.py create mode 100644 src/fuzzy/sets/continuous/abstract.py create mode 100644 src/fuzzy/sets/continuous/group.py create mode 100644 src/fuzzy/sets/continuous/impl.py create mode 100644 src/fuzzy/sets/continuous/membership.py create mode 100644 src/fuzzy/sets/continuous/utils.py create mode 100644 src/fuzzy/sets/discrete.py create mode 100644 tests/__init__.py create mode 100644 tests/test_relations/__init__.py create mode 100644 tests/test_relations/test_aggregation.py create mode 100644 tests/test_relations/test_extension.py create mode 100644 tests/test_relations/test_tnorms.py create mode 100644 tests/test_sets/__init__.py create mode 100644 tests/test_sets/continuous/__init__.py create mode 100644 tests/test_sets/continuous/impl/__init__.py create mode 100644 tests/test_sets/continuous/impl/common.py create mode 100644 tests/test_sets/continuous/impl/test_gaussian.py create mode 100644 tests/test_sets/continuous/impl/test_logistic.py create mode 100644 tests/test_sets/continuous/impl/test_triangular.py create mode 100644 tests/test_sets/continuous/test_continuous.py create mode 100644 tests/test_sets/continuous/test_group.py create mode 100644 tests/test_sets/test_discrete.py diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/discrete/__init__.py b/examples/discrete/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/discrete/age.py b/examples/discrete/age.py new file mode 100644 index 0000000..90ed30e --- /dev/null +++ b/examples/discrete/age.py @@ -0,0 +1,69 @@ +""" +Demo of working with discrete fuzzy sets for a toy task regarding age. +""" + +from sympy import Symbol, Interval, oo # oo is infinity + +from fuzzy.relations.discrete.snorm import StandardUnion +from fuzzy.relations.discrete.tnorm import StandardIntersection +from fuzzy.relations.discrete.complement import standard_complement +from fuzzy.sets.discrete import DiscreteFuzzySet, FuzzyVariable + + +def a_1(): + """ + A sample construction of a Fuzzy Set called 'A1'. + """ + formulas = [] + element = Symbol("x") + formulas.append((1, Interval.Lopen(-oo, 20))) + formulas.append(((35 - element) / 15, Interval.open(20, 35))) + formulas.append((0, Interval.Ropen(35, oo))) + return DiscreteFuzzySet(formulas, "A1") + + +def a_2(): + """ + A sample construction of a Fuzzy Set called 'A2'. + """ + formulas = [] + element = Symbol("x") + formulas.append((0, Interval.Lopen(-oo, 20))) + formulas.append(((element - 20) / 15, Interval.open(20, 35))) + formulas.append((1, Interval(35, 45))) + formulas.append(((60 - element) / 15, Interval.open(45, 60))) + formulas.append((0, Interval.Ropen(60, oo))) + return DiscreteFuzzySet(formulas, "A2") + + +def a_3(): + """ + A sample construction of a Fuzzy Set called 'A3'. + """ + formulas = [] + element = Symbol("x") + formulas.append((0, Interval.Lopen(-oo, 45))) + formulas.append(((element - 45) / 15, Interval.open(45, 60))) + formulas.append((1, Interval.Ropen(60, oo))) + return DiscreteFuzzySet(formulas, "A3") + + +a1 = a_1() +a2 = a_2() +a3 = a_3() + +FuzzyVariable([a1, a2, a3], "Age").plot(0, 80) +b = StandardIntersection([a1, a2], "B") +b.plot(0, 80) +c = StandardIntersection([a2, a3], "C") +c.plot(0, 80) +StandardUnion([b, c], "B Union C").plot(0, 80) +standard_complement(a1) +a1.plot(0, 80) +standard_complement(a3) +a3.plot(0, 80) +StandardIntersection([a1, a3], "Not(A1) Intersection Not(A3)").plot(0, 80) +standard_complement(a1) +standard_complement(a3) +# doesn't work yet +# StandardComplement(StandardUnion([b, c], 'Not (B Union C)')).graph(0, 80) diff --git a/examples/discrete/student.py b/examples/discrete/student.py new file mode 100644 index 0000000..875c090 --- /dev/null +++ b/examples/discrete/student.py @@ -0,0 +1,119 @@ +""" +Demo of working with discrete fuzzy sets for a toy task regarding knowledge of material. +""" + +from sympy import Symbol, Interval, oo # oo is infinity + +from fuzzy.relations.discrete.tnorm import StandardIntersection +from fuzzy.relations.discrete.extension import AlphaCut, SpecialFuzzySet +from fuzzy.sets.discrete import DiscreteFuzzySet, FuzzyVariable + + +# https://www-sciencedirect-com.prox.lib.ncsu.edu/science/article/pii/S0957417412008056 + + +def unknown(): + """ + Create a fuzzy set for the linguistic term 'unknown'. + + Returns: + OrdinaryDiscreteFuzzySet + """ + formulas = [] + element = Symbol("x") + formulas.append((1, Interval.Lopen(-oo, 55))) + formulas.append((1 - (element - 55) / 5, Interval.open(55, 60))) + formulas.append((0, Interval.Ropen(60, oo))) + return DiscreteFuzzySet(formulas, "Unknown") + + +def known(): + """ + Create a fuzzy set for the linguistic term 'known'. + + Returns: + OrdinaryDiscreteFuzzySet + """ + formulas = [] + element = Symbol("x") + formulas.append(((element - 70) / 5, Interval.open(70, 75))) + formulas.append((1, Interval(75, 85))) + formulas.append((1 - (element - 85) / 5, Interval.open(85, 90))) + formulas.append((0, Interval.Lopen(-oo, 70))) + formulas.append((0, Interval.Ropen(90, oo))) + return DiscreteFuzzySet(formulas, "Known") + + +def unsatisfactory_unknown(): + """ + Create a fuzzy set for the linguistic term 'unsatisfactory unknown'. + + Returns: + OrdinaryDiscreteFuzzySet + """ + formulas = [] + element = Symbol("x") + formulas.append(((element - 55) / 5, Interval.open(55, 60))) + formulas.append((1, Interval(60, 70))) + formulas.append((1 - (element - 70) / 5, Interval.open(70, 75))) + formulas.append((0, Interval.Lopen(-oo, 55))) + formulas.append((0, Interval.Ropen(75, oo))) + return DiscreteFuzzySet(formulas, "Unsatisfactory Unknown") + + +def learned(): + """ + Create a fuzzy set for the linguistic term 'learned'. + + Returns: + OrdinaryDiscreteFuzzySet + """ + formulas = [] + element = Symbol("x") + formulas.append(((element - 85) / 5, Interval.open(85, 90))) + formulas.append((1, Interval(90, 100))) + formulas.append((0, Interval.Lopen(-oo, 85))) + return DiscreteFuzzySet(formulas, "Learned") + + +if __name__ == "__main__": + terms = [unknown(), known(), unsatisfactory_unknown(), learned()] + + fuzzy_variable = FuzzyVariable(fuzzy_sets=terms, name="Student Knowledge") + fig, _ = fuzzy_variable.plot(samples=150) + fig.show() + + # --- DEMO --- Classify 'element' + + example_element_membership = fuzzy_variable.degree(element=73) + + alpha_cut = AlphaCut(known(), 0.6, "AlphaCut") + fig, _ = alpha_cut.plot(samples=250) + fig.show() + special_fuzzy_set = SpecialFuzzySet(known(), 0.5, "Special") + fig, _ = special_fuzzy_set.plot() + fig.show() + + alpha_cuts = [] + idx, MAX_HEIGHT, IDX_OF_MAX = 0, 0, 0 + for idx, membership_to_fuzzy_term in enumerate(example_element_membership): + if example_element_membership[idx] > 0: + special_fuzzy_set = SpecialFuzzySet( + terms[idx], membership_to_fuzzy_term, terms[idx].name + ) + alpha_cuts.append( + StandardIntersection( + [special_fuzzy_set, terms[idx]], name=f"A{idx + 1}" + ) + ) + + # maximum membership principle + if membership_to_fuzzy_term > MAX_HEIGHT: + MAX_HEIGHT, IDX_OF_MAX = membership_to_fuzzy_term, idx + + confluence: FuzzyVariable = FuzzyVariable(fuzzy_sets=alpha_cuts, name="Confluence") + fig, _ = confluence.plot() + fig.show() + + # maximum membership principle + print(f"Maximum Membership Principle: {terms[idx].name}") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5da5ba9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,43 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "fuzzy-theory" +version = "0.0.1" +authors = [ + { name="John Wesley Hostetter", email="jhostetter16@gmail.com" }, +] +description = "The fuzzy-theory library provides a PyTorch interface to fuzzy set theory and fuzzy logic operations." +readme = "README.md" +requires-python = ">=3.9" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +Homepage = "https://github.com/johnHostetter/fuzzy-theory" +Issues = "https://github.com/johnHostetter/fuzzy-theory/issues" + +[tool.hatch.build] +include = [ + "src/fuzzy/**", + "README.md", + "LICENSE", +] +exclude = [ + "tests/**", + "*.pyc", + "*.pyo", + ".git/**", + "build/**", + "dist/**", + ".venv/**", +] +# Ignore VCS +ignore = ["*.git", "*.hg", ".git/**", ".hg/**"] + +[tool.hatch.build.targets.wheel] +packages = ["src/fuzzy"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e862889 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +numpy==1.26.4 # new is 2.0.1 (update when possible - tests fail otherwise) +sympy==1.13.2 # for torch +torch==2.4.0 +torchquad==0.4.0 +matplotlib==3.9.1 # for torchquad +SciencePlots==2.1.1 # for scientific publication plots +natsort==8.4.0 # for natural sorting diff --git a/src/fuzzy/__init__.py b/src/fuzzy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/relations/__init__.py b/src/fuzzy/relations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/relations/continuous/__init__.py b/src/fuzzy/relations/continuous/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/relations/continuous/aggregation.py b/src/fuzzy/relations/continuous/aggregation.py new file mode 100644 index 0000000..553fdb4 --- /dev/null +++ b/src/fuzzy/relations/continuous/aggregation.py @@ -0,0 +1,77 @@ +""" +Implements aggregation operators in fuzzy theory. +""" + +import torch + + +class OrderedWeightedAveraging(torch.nn.Module): + """ + Yager's On Ordered Weighted Averaging Aggregation Operators in + Multicriteria Decisionmaking (1988) + + An operator that lies between the 'anding' or the 'oring' of multiple criteria. + The weight vector allows us to easily adjust the degree of 'anding' and 'oring' + implicit in the aggregation. + """ + + def __init__(self, in_features, weights): + super().__init__() + self.in_features = in_features + if self.in_features != len(weights): + raise AttributeError( + "The number of input features expected in the Ordered Weighted Averaging operator " + "is expected to equal the number of elements in the weight vector." + ) + with torch.no_grad(): + if weights.sum() == 1.0: + self.weights = torch.nn.parameter.Parameter(torch.abs(weights)) + else: + raise AttributeError( + "The weight vector of the Ordered Weighted Averaging operator must sum to 1.0." + ) + + def orness(self): + """ + A degree of 1 means the OWA operator is the 'or' operator, + and this occurs when the first element of the weight vector is equal to 1 + and all other elements in the weight vector are zero. + + Returns: + The degree to which the Ordered Weighted Averaging operator is an 'or' operator. + """ + return (1 / (self.in_features - 1)) * torch.tensor( + [ + (self.in_features - i) * self.weights[i - 1] + for i in range(1, self.in_features + 1) + ] + ).sum() + + def dispersion(self): + """ + The measure of dispersion; essentially, it is a measure of entropy that is related to the + Shannon information concept. The more disperse the weight vector, the more information + is being used in the aggregation of the aggregate value. + + Returns: + The amount of dispersion in the weight vector. + """ + # there is exactly one entry where it is equal to one + if len(torch.where(self.weights == 1.0)[0]) == 1: + return torch.zeros(1) + return -1 * (self.weights * torch.log(self.weights)).sum() + + def forward(self, input_observation): + """ + Applies the Ordered Weighted Averaging operator. First, it will sort the argument + in descending order, then multiply by the weight vector, and finally sum over the entries. + + Args: + input_observation: Argument vector, unordered. + + Returns: + The aggregation of the ordered argument vector with the weight vector. + """ + # namedtuple with 'values' and 'indices' properties + ordered_argument_vector = torch.sort(input_observation, descending=True) + return (self.weights * ordered_argument_vector.values).sum() diff --git a/src/fuzzy/relations/continuous/tnorm.py b/src/fuzzy/relations/continuous/tnorm.py new file mode 100644 index 0000000..e990283 --- /dev/null +++ b/src/fuzzy/relations/continuous/tnorm.py @@ -0,0 +1,68 @@ +""" +Implements the t-norm fuzzy relations. +""" + +from enum import Enum + +import torch + + +class TNorm(Enum): + """ + Enumerates the types of t-norms. + """ + + PRODUCT = "product" # i.e., algebraic product + MINIMUM = "minimum" + ACZEL_ALSINA = "aczel_alsina" # not yet implemented + SOFTMAX_SUM = "softmax_sum" + SOFTMAX_MEAN = "softmax_mean" + LUKASIEWICZ = "generalized_lukasiewicz" + # the following are to be implemented + DRASTIC = "drastic" + NILPOTENT = "nilpotent" + HAMACHER = "hamacher" + EINSTEIN = "einstein" + YAGER = "yager" + DUBOIS = "dubois" + DIF = "dif" + + +class AlgebraicProduct(torch.nn.Module): # TODO: remove this class + """ + Implementation of the Algebraic Product t-norm (Fuzzy AND). + """ + + def __init__(self, in_features=None, importance=None): + """ + Initialization. + INPUT: + - in_features: shape of the input + - centers: trainable parameter + - sigmas: trainable parameter + importance is initialized to a one vector by default + """ + super().__init__() + self.in_features = in_features + + # initialize antecedent importance + if importance is None: + self.importance = torch.nn.parameter.Parameter(torch.tensor(1.0)) + self.importance.requires_grad = False + else: + if not isinstance(importance, torch.Tensor): + importance = torch.Tensor(importance) + self.importance = torch.nn.parameter.Parameter( + torch.abs(importance) + ) # importance can only be [0, 1] + self.importance.requires_grad = True + + def forward(self, elements): + """ + Forward pass of the function. + Applies the function to the input elementwise. + """ + self.importance = torch.nn.parameter.Parameter( + torch.abs(self.importance) + ) # importance can only be [0, 1] + return torch.prod(torch.mul(elements, self.importance)) diff --git a/src/fuzzy/relations/discrete/__init__.py b/src/fuzzy/relations/discrete/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/relations/discrete/complement.py b/src/fuzzy/relations/discrete/complement.py new file mode 100644 index 0000000..e2a7077 --- /dev/null +++ b/src/fuzzy/relations/discrete/complement.py @@ -0,0 +1,32 @@ +""" +Implements the various fuzzy complement definitions for the discrete fuzzy sets. +""" + +from fuzzy.sets.discrete import BaseDiscreteFuzzySet + + +def standard_complement(fuzzy_set): + """ + Obtains the standard complement of a fuzzy set as defined by Lotfi A. Zadeh. + + Returns True if successful, else returns False. + + Parameters + ---------- + fuzzy_set : 'OrdinaryDiscreteFuzzySet' + + Returns + ------- + success : 'bool' + """ + + if isinstance(fuzzy_set, BaseDiscreteFuzzySet): + formulas = [] + for formula in fuzzy_set.formulas: + formula = list(formula) + formula[0] = 1 - formula[0] + formula = tuple(formula) + formulas.append(formula) + fuzzy_set.formulas = formulas + return True + return False diff --git a/src/fuzzy/relations/discrete/extension.py b/src/fuzzy/relations/discrete/extension.py new file mode 100644 index 0000000..2a1c9ce --- /dev/null +++ b/src/fuzzy/relations/discrete/extension.py @@ -0,0 +1,203 @@ +""" +Implementation of the special fuzzy set and the alpha cut operation for discrete fuzzy sets. +""" + +from typing import List, Union + +import sympy + +from fuzzy.sets.discrete import BaseDiscreteFuzzySet, DiscreteFuzzySet + + +class SpecialFuzzySet(BaseDiscreteFuzzySet): + """ + The special fuzzy set membership function for a given element x in the universe of + discourse X, is defined as the alpha value multiplied by the element x's degree of + membership within the fuzzy set's alpha cut. + """ + + def __init__(self, fuzzyset, alpha, name=None): + """ + Parameters + ---------- + fuzzyset : 'OrdinaryDiscreteFuzzySet' + An ordinary fuzzy set to retrieve the special fuzzy set given the alpha. + alpha : 'float' + The alpha value that elements' membership degree must exceed or be equal to. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy sets in the same space. + """ + alpha_cut = AlphaCut(fuzzyset, alpha) + sympy.Interval = alpha_cut.formulas[0][1] + for formula in alpha_cut.formulas[1:]: + sympy.Interval = sympy.Union(sympy.Interval, formula[1]) + BaseDiscreteFuzzySet.__init__( + self, formulas=[(alpha, sympy.Interval)], name=name + ) + self.alpha = alpha + + def degree(self, element: float) -> float: + """ + Calculates degree of membership for the provided 'element' where element is a(n) int/float. + + Parameters + ---------- + element : 'float' + The element is from the universe of discourse X. + + Returns + ------- + membership : 'float' + The degree of membership for the element. + """ + result = self.fetch(element) + if result is not None: + formula = result[0] + else: + return 0 + try: + membership = float(formula.subs(sympy.Symbol("x"), element)) + except AttributeError: + membership = formula + return membership + + def height(self) -> float: + """ + Calculates the height of the special fuzzy set. + + Returns + ------- + height : 'float' + The height, or supremum, of the Special Fuzzy Set. + """ + return self.alpha + + +class AlphaCut(BaseDiscreteFuzzySet): + """ + The alpha cut of a fuzzy set yields a crisp set. + """ + + def __init__(self, fuzzy_set, alpha: float, name: Union[str, None] = None): + """ + Parameters + ---------- + fuzzy_set : 'list' + A list of 2-tuples. The first element in the tuple at index 0 is the formula + equal to f(x) and the second element in the tuple at index 1 is the sympy.Interval + where the formula in the tuple is valid. + alpha : 'float' + The alpha value that elements' membership degree must exceed or be equal to. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy sets in the same space. + """ + self.alpha = alpha + formulas = [] + for formula in fuzzy_set.formulas: + if isinstance(formula[0], sympy.Expr): + # x = inversefunc( + # lambdify(sympy.Symbol("x"), formula[0], "numpy"), y_values=alpha + # ) + interval_1 = sympy.solve(alpha <= formula[0]) + interval_2 = sympy.solve(formula[0] <= alpha) + x_interval = sympy.Intersection( + interval_1.as_set(), interval_2.as_set() + ) + element = x_interval.atoms().pop() + if formula[1].contains(element): + # the element is within the sympy.Interval, now check the direction + membership = formula[0].subs(sympy.Symbol("x"), element - (1e-6)) + if membership >= alpha: + # then all values less than or equal to element are valid + if formula[1].left_open: + sympy.Interval = sympy.Interval.Lopen( + formula[1].inf, element + ) + else: + sympy.Interval = sympy.Interval(formula[1].inf, element) + else: + # then all values greater than or equal to element are valid + if formula[1].right_open: + sympy.Interval = sympy.Interval.Ropen( + element, formula[1].sup + ) + else: + sympy.Interval = sympy.Interval(element, formula[1].sup) + formula = list(formula) + formula[1] = sympy.Interval + formula = tuple(formula) + formulas.append(formula) + else: + if formula[0] >= alpha: + formulas.append(formula) + BaseDiscreteFuzzySet.__init__(self, formulas=formulas, name=name) + + def degree(self, element): + """ + Calculates degree of membership for the provided 'element' where element is a(n) int/float. + + Parameters + ---------- + element : 'float' + The element is from the universe of discourse X. + + Returns + ------- + membership : 'float' + The degree of membership for the element. + """ + result = self.fetch(element) + if result is not None: + formula = result[0] + else: + return 0 + try: + membership = float(formula.subs(sympy.Symbol("x"), element)) + except AttributeError: + membership = formula + return min(self.alpha, membership) + + +class DiscreteFuzzyRelation(BaseDiscreteFuzzySet): + """ + A fuzzy relation is a fuzzy set of ordered pairs. This class is a subclass of DiscreteFuzzySet. + The DiscreteFuzzyRelation class is used to represent a fuzzy relation between two universes of + discourse. More specifically, the DiscreteFuzzyRelation class is used to represent a fuzzy + relation such as t-norm or s-norm discrete fuzzy relations. + """ + + def __init__( + self, formulas: List[DiscreteFuzzySet], name=None, mode: callable = min + ): + """ + Parameters + ---------- + formulas : 'list' + A list of elements each of type OrdinaryDiscreteFuzzySet. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy fets in the same space. + """ + BaseDiscreteFuzzySet.__init__(self, formulas=formulas, name=name) + self.mode = mode + + def degree(self, element: Union[int, float]): + """ + Calculates the degree of membership for the provided element value + where element is a(n) int/float. + + Args: + element: The element is from the universe of discourse X. + + Returns: + The degree of membership for the element. + """ + degrees = [] + for formula in self.formulas: + degrees.append(formula.degree(element)) + return self.mode(degrees) diff --git a/src/fuzzy/relations/discrete/snorm.py b/src/fuzzy/relations/discrete/snorm.py new file mode 100644 index 0000000..f5be76c --- /dev/null +++ b/src/fuzzy/relations/discrete/snorm.py @@ -0,0 +1,28 @@ +""" +Implements the s-norm fuzzy relations. +""" + +from typing import List + +from fuzzy.sets.discrete import DiscreteFuzzySet +from fuzzy.relations.discrete.extension import DiscreteFuzzyRelation + + +class StandardUnion(DiscreteFuzzyRelation): + """ + A standard union of one or more ordinary fuzzy sets. + """ + + def __init__(self, fuzzy_sets: List[DiscreteFuzzySet], name=None): + """ + Parameters + ---------- + fuzzy_sets : 'list' + A list of elements each of type OrdinaryDiscreteFuzzySet. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy fets in the same space. + """ + DiscreteFuzzyRelation.__init__(self, formulas=fuzzy_sets, name=name, mode=max) + self.fuzzy_sets = fuzzy_sets diff --git a/src/fuzzy/relations/discrete/tnorm.py b/src/fuzzy/relations/discrete/tnorm.py new file mode 100644 index 0000000..e50168b --- /dev/null +++ b/src/fuzzy/relations/discrete/tnorm.py @@ -0,0 +1,28 @@ +""" +Implements the t-norm fuzzy relations. +""" + +from typing import List + +from fuzzy.sets.discrete import DiscreteFuzzySet +from fuzzy.relations.discrete.extension import DiscreteFuzzyRelation + + +class StandardIntersection(DiscreteFuzzyRelation): + """ + A standard intersection of one or more ordinary fuzzy sets. + """ + + def __init__(self, fuzzy_sets: List[DiscreteFuzzySet], name=None): + """ + Parameters + ---------- + fuzzy_sets : 'list' + A list of elements each of type OrdinaryDiscreteFuzzySet. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy fets in the same space. + """ + DiscreteFuzzyRelation.__init__(self, formulas=fuzzy_sets, name=name, mode=min) + self.fuzzy_sets = fuzzy_sets diff --git a/src/fuzzy/sets/__init__.py b/src/fuzzy/sets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/sets/continuous/__init__.py b/src/fuzzy/sets/continuous/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fuzzy/sets/continuous/abstract.py b/src/fuzzy/sets/continuous/abstract.py new file mode 100644 index 0000000..faafbec --- /dev/null +++ b/src/fuzzy/sets/continuous/abstract.py @@ -0,0 +1,680 @@ +""" +Implements an abstract class called ContinuousFuzzySet using PyTorch. All fuzzy sets defined over +a continuous domain are derived from this class. Further, the Membership class is defined within, +which contains a helpful interface understanding membership degrees. +""" + +import abc +import inspect +from pathlib import Path +from abc import abstractmethod +from typing import List, NoReturn, Union, MutableMapping, Any, Type, Tuple + +import sympy +import torch +import torchquad +import numpy as np +import scienceplots +import matplotlib as mpl +import matplotlib.pyplot as plt +from torchquad.utils.set_up_backend import set_up_backend + +from .utils import all_subclasses +from .membership import Membership + + +class ContinuousFuzzySet(torch.nn.Module, metaclass=abc.ABCMeta): + """ + A generic and abstract torch.nn.Module class that implements continuous fuzzy sets. + + This is the most important Python class regarding fuzzy sets within this Soft Computing library. + + Defined here are most of the common methods made available to all fuzzy sets. Fuzzy sets that + will later be used in other features such as neuro-fuzzy networks are expected to abide by the + conventions outlined within. For example, parameters 'centers' and 'widths' are often expected, + but inference engines (should) only rely on the fuzzy set membership degrees. + + However, for convenience, some aspects of the SelfOrganize code may search for vertices that + have attributes of type 'ContinuousFuzzySet'. Thus, if it is pertinent that a vertex within + the KnowledgeBase is recognized as a fuzzy set, it is very likely one might be interested in + inheriting or extending from ContinuousFuzzySet. + """ + + def __init__( + self, + centers: np.ndarray, + widths: np.ndarray, + device: torch.device, + use_sparse_tensor=False, + labels: List[str] = None, + ): + super().__init__() + self.device = device + if not isinstance(centers, np.ndarray): + # ensure that the centers are a numpy array (done for consistency) + # specifically, we want to internally control the dtype and device of the centers + raise ValueError( + f"The centers of a ContinuousFuzzySet must be a numpy array, " + f"but got {type(centers)}" + ) + if not isinstance(widths, np.ndarray): + # ensure that the widths are a numpy array (done for consistency) + # specifically, we want to internally control the dtype and device of the widths + raise ValueError( + f"The widths of a ContinuousFuzzySet must be a numpy array, but got {type(widths)}" + ) + + if centers.ndim != widths.ndim: + raise ValueError( + f"The number of dimensions for the centers ({centers.ndim}) and widths " + f"({widths.ndim}) must be the same." + ) + + if centers.ndim == 0 or widths.ndim == 0: + raise ValueError( + f"The centers and widths of a ContinuousFuzzySet must have at least one dimension. " + f"Centers has {centers.ndim} dimensions and widths has {widths.ndim} dimensions." + ) + + if centers.ndim == 1: + # assuming that the array is a single linguistic variable + centers = centers[None, :] + if widths.ndim == 1: + # assuming that the array is a single linguistic variable + widths = widths[None, :] + + # avoid allocating new memory for the centers and widths + # use torch.float32 to save memory and speed up computations + self._centers = torch.nn.ParameterList([self.make_parameter(centers)]) + self._widths = torch.nn.ParameterList([self.make_parameter(widths)]) + self.use_sparse_tensor = use_sparse_tensor + # self._mask = torch.nn.ParameterList( + # [ + # self.make_mask(widths) + # ] + # ) + self._mask = [self.make_mask(widths)] + self.labels = labels # TODO: possibly remove this attribute + + def to(self, *args, **kwargs): + """ + Move the ContinuousFuzzySet to a new device. + + Returns: + None + """ + # Call the parent class's `to` method to handle parameters and submodules + super().to(*args, **kwargs) + + # special handling for the non-parameter tensors, such as mask + self._mask = [mask.to(*args, **kwargs) for mask in self._mask] + return self + + def make_parameter(self, parameter: np.ndarray) -> torch.nn.Parameter: + """ + Create a torch.nn.Parameter from a numpy array, with the appropriate dtype and device. + + Args: + parameter: The numpy array to convert to a torch.nn.Parameter (e.g., centers or widths). + + Returns: + A torch.nn.Parameter object. + """ + return torch.nn.Parameter( + torch.as_tensor(parameter, dtype=torch.float32, device=self.device), + requires_grad=True, # explicitly set to True + ) + + def make_mask(self, widths: np.ndarray) -> torch.Tensor: + """ + Create a mask for the fuzzy set, where the mask is used to filter out fuzzy sets that are + not real. This is particularly useful when the fuzzy set is not fully defined, and some + fuzzy sets are missing. The mask is a binary tensor that is used to filter out fuzzy sets + that are not real. If the mask is 0, then the fuzzy set is not real; otherwise, it is real. + + Args: + widths: The widths of the fuzzy set. + + Returns: + A torch.Tensor object. + """ + return torch.as_tensor(widths > 0.0, dtype=torch.int8, device=self.device) + # return torch.nn.Parameter( + # torch.as_tensor(widths > 0.0, dtype=torch.int8, device=self.device), + # requires_grad=False, # explicitly set to False (mask is not trainable) + # ) + + @classmethod + def create( + cls, + number_of_variables: int, + number_of_terms: int, + device: torch.device, + **kwargs, + ) -> Union[NoReturn, "ContinuousFuzzySet"]: + """ + Create a fuzzy set with the given number of variables and terms, where each variable + has the same number of terms. For example, if we have two variables, then we might have + three terms for each variable, such as "low", "medium", and "high". This would result in + a total of nine fuzzy sets. The centers and widths are initialized randomly. + + Args: + number_of_variables: The number of variables. + number_of_terms: The number of terms. + device: The device to use. + + Returns: + A ContinuousFuzzySet object, or a NotImplementedError if the method is not implemented. + """ + if inspect.isabstract(cls): + # this error is thrown if the class is abstract, such as ContinuousFuzzySet, but + # the method is not implemented (e.g., self.calculate_membership) + raise NotImplementedError( + "The ContinuousFuzzySet has no defined membership function. Please create a class " + "and inherit from ContinuousFuzzySet, or use a predefined class, such as Gaussian." + ) + + if isinstance(device, str): + device = torch.device(device) + + centers: np.ndarray = np.random.randn(number_of_variables, number_of_terms) + widths: np.ndarray = np.random.randn(number_of_variables, number_of_terms) + return cls(centers=centers, widths=widths, device=device, **kwargs) + + def __eq__(self, other: Any) -> bool: + """ + Check if the fuzzy set is equal to another fuzzy set. + + Args: + other: The other fuzzy set to compare to. + + Returns: + True if the fuzzy sets are equal, False otherwise. + """ + return ( + isinstance(other, type(self)) + and torch.equal(self.get_centers(), other.get_centers()) + and torch.equal(self.get_widths(), other.get_widths()) + ) + + def __hash__(self): + """ + Hash the fuzzy set. + + Returns: + The hash of the fuzzy set. + """ + return hash((type(self), self.get_centers(), self.get_widths(), self.labels)) + + def get_centers(self) -> torch.Tensor: + """ + Get the concatenated centers of the fuzzy set from its corresponding ParameterList. + + Returns: + The concatenated centers of the fuzzy set. + """ + return torch.cat(list(self._centers), dim=-1) + + def get_widths(self) -> torch.Tensor: + """ + Get the concatenated widths of the fuzzy set from its corresponding ParameterList. + + Returns: + The concatenated widths of the fuzzy set. + """ + return torch.cat(list(self._widths), dim=-1) + + def get_mask(self) -> torch.Tensor: + """ + Get the concatenated mask of the fuzzy set from its corresponding ParameterList. + + Returns: + The concatenated mask of the fuzzy set. + """ + return torch.cat(list(self._mask), dim=-1) + + @classmethod + def render_formula(cls) -> sympy.Expr: + """ + Render of the fuzzy set's membership function. + + Note: This is more beneficial for Python Console or Jupyter Notebook usage. + + Returns: + Render of the fuzzy set's membership function. + """ + sympy.init_printing(use_unicode=True) + return cls.sympy_formula() + + @classmethod + def latex_formula(cls) -> str: + """ + String LaTeX representation of the fuzzy set's membership function. + + Note: This is more beneficial for animations or LaTeX documents. + + Returns: + The LaTeX representation of the fuzzy set's membership function. + """ + return sympy.latex(cls.sympy_formula()) + + def save(self, path: Path): + """ + Save the fuzzy set to a file. + + Note: This does not preserve the ParameterList structure, but rather concatenates the + parameters into a single tensor, which is then saved to a file. + + Returns: + None + """ + state_dict: MutableMapping = self.state_dict() + state_dict["centers"] = self.get_centers() # concatenate the centers + state_dict["widths"] = self.get_widths() # concatenate the widths + state_dict["mask"] = self.get_mask() # currently not used + state_dict["labels"] = self.labels + state_dict["class_name"] = self.__class__.__name__ + if ".pt" not in path.name and ".pth" not in path.name: + raise ValueError( + f"The path to save the fuzzy set must have a file extension of '.pt', " + f"but got {path.name}" + ) + if ".pth" in path.name: + raise ValueError( + f"The path to save the fuzzy set must have a file extension of '.pt', " + f"but got {path.name}. Please change the file extension to '.pt' as it is not " + f"recommended to use '.pth' for PyTorch models, as it conflicts with Python path" + f"configuration files." + ) + torch.save(state_dict, path) + return state_dict + + @staticmethod + @torch.jit.ignore + def get_subclass(class_name: str) -> Union["ContinuousFuzzySet"]: + """ + Get the subclass of ContinuousFuzzySet with the given class name. + + Args: + class_name: The name of the subclass of ContinuousFuzzySet. + + Returns: + A subclass of ContinuousFuzzySet. + """ + fuzzy_set_class = None + for subclass in all_subclasses(ContinuousFuzzySet): + if subclass.__name__ == class_name: + fuzzy_set_class = subclass + break + if fuzzy_set_class is None: + raise ValueError( + f"The fuzzy set class {class_name} was not found in the subclasses of " + f"ContinuousFuzzySet. Please ensure that the fuzzy set class is a subclass of " + f"ContinuousFuzzySet." + ) + return fuzzy_set_class + + @classmethod + def load(cls, path: Path, device: torch.device) -> "ContinuousFuzzySet": + """ + Load the fuzzy set from a file and put it on the specified device. + + Returns: + None + """ + state_dict: MutableMapping = torch.load(path) + centers = state_dict.pop("centers") + widths = state_dict.pop("widths") + labels = state_dict.pop("labels") + class_name = state_dict.pop("class_name") + return cls.get_subclass(class_name)( + centers=centers.cpu().detach().numpy(), + widths=widths.cpu().detach().numpy(), + labels=labels, + device=device, + ) + + def extend(self, centers: torch.Tensor, widths: torch.Tensor, mode: str): + """ + Given additional parameters, centers and widths, extend the existing self.centers and + self.widths, respectively. Additionally, update the necessary backend logic. + + Args: + centers: The centers of new fuzzy sets. + widths: The widths of new fuzzy sets. + + Returns: + None + """ + if mode == "vertical": + method_of_extension: callable = torch.cat + elif mode == "horizontal": + method_of_extension: callable = torch.hstack + else: + raise ValueError( + f"The mode must be either 'horizontal' or 'vertical', but got {mode}" + ) + with torch.no_grad(): + self._centers[0] = torch.nn.Parameter( + method_of_extension([self._centers[0], centers]) + ) + self._widths[0] = torch.nn.Parameter( + method_of_extension([self._widths[0], widths]) + ) + + def area_helper(self, fuzzy_sets) -> List[List[float]]: + """ + Splits the fuzzy set (if representing a fuzzy variable) into individual fuzzy sets (the + fuzzy variable's possible fuzzy terms), and does so recursively until the base case is + reached. Once the base case is reached (i.e., a single fuzzy set), the area under its + curve within the integration_domain is calculated. The result is a + + Args: + fuzzy_sets: The fuzzy set to split into smaller fuzzy sets. + + Returns: + A list of floats. + """ + all_areas: List[List[float]] = [] + for variable_params in zip(fuzzy_sets.get_centers(), fuzzy_sets.get_widths()): + variable_centers, variable_widths = variable_params[0], variable_params[1] + variable_areas = [] + for term_params in zip(variable_centers, variable_widths): + centers, widths = term_params[0].item(), term_params[1].item() + # has to be "cpu" device for torchquad.Simpson to work + fuzzy_set = self.__class__( + centers=np.array([centers]), + widths=np.array([widths]), + device=self.device, + ) + + # Enable GPU support if available and set the floating point precision + set_up_backend("torch", data_type="float32") + + simpson_method = torchquad.Simpson() + area: float = simpson_method.integrate( + fuzzy_set.calculate_membership, + dim=1, + N=101, + integration_domain=[ + [ + fuzzy_set.get_centers().item() + - fuzzy_set.get_widths().item(), + fuzzy_set.get_centers().item() + + fuzzy_set.get_widths().item(), + ] + ], + backend="torch", + ).item() + if fuzzy_set.get_widths().item() <= 0 and area != 0.0: + # if the width of a fuzzy set is negative or zero, it is a special flag that + # the fuzzy set does not exist; thus, the calculated area of a fuzzy set w/ a + # width <= 0 should be zero. However, in the case this does not occur, + # a zero will substitute to be sure that this issue does not affect results + area = 0.0 + variable_areas.append(area) + all_areas.append(variable_areas) + return all_areas + + def area(self) -> torch.Tensor: + """ + Calculate the area beneath the fuzzy curve (i.e., membership function) using torchquad. + + This is a slightly expensive operation, but it is used for approximating the Mamdani fuzzy + inference with arbitrary continuous fuzzy sets. + + Typically, the results will be cached somewhere, so that the area value can be reused. + + Returns: + torch.Tensor + """ + return torch.tensor( + self.area_helper(self), device=self.device, dtype=torch.float32 + ) + + def split_by_variables(self) -> Union[list, List[Type["ContinuousFuzzySet"]]]: + """ + This operation takes the ContinuousFuzzySet and converts it to a list of ContinuousFuzzySet + objects, if applicable. For example, rather than using a single Gaussian object to represent + all Gaussian membership functions in the input space, this function will convert that to a + list of Gaussian objects, where each Gaussian function is defined and restricted to a single + input dimension. This is particularly helpful when modifying along a specific dimension. + + Returns: + A list of ContinuousFuzzySet objects, where the length is equal to the number + of input dimensions. + """ + variables = [] + for centers, widths in zip(self.get_centers(), self.get_widths()): + centers = centers.cpu().detach().tolist() + widths = widths.cpu().detach().tolist() + + # the centers and widths must be trimmed to remove missing fuzzy set placeholders + trimmed_centers, trimmed_widths = [], [] + for center, width in zip(centers, widths): + if width > 0: + # if an input dimension has less fuzzy sets than another, + # then it is possible for the width entry to have '-1' as a + # placeholder indicating so + trimmed_centers.append(center) + trimmed_widths.append(width) + + variables.append( + type(self)( + centers=np.array(trimmed_centers), + widths=np.array(trimmed_widths), + device=self.device, + ) + ) + + return variables + + def plot( + self, output_dir: Path, selected_terms: List[Tuple[int, int]] = None + ) -> None: + """ + Plot the fuzzy set. + + Args: + output_dir: The path to the directory where to save the plot(s). + selected_terms: The terms to highlight in the plot. + + Returns: + None + """ + if selected_terms is None: + selected_terms = [] + + with plt.style.context(["science", "no-latex", "high-contrast"]): + for variable_idx in range(self.get_centers().shape[0]): + _, ax = plt.subplots(1, figsize=(6, 4), dpi=100) + mpl.rcParams["figure.figsize"] = (6, 4) + mpl.rcParams["figure.dpi"] = 100 + mpl.rcParams["savefig.dpi"] = 100 + mpl.rcParams["font.size"] = 20 + mpl.rcParams["legend.fontsize"] = "medium" + mpl.rcParams["figure.titlesize"] = "medium" + mpl.rcParams["lines.linewidth"] = 2 + ax.tick_params(width=2, length=6) + plt.xticks(fontsize=20) + plt.yticks(fontsize=20) + real_centers: List[float] = [ + self.get_centers()[variable_idx, term_idx].item() + for term_idx, mask_value in enumerate(self.get_mask()[variable_idx]) + if mask_value == 1 + ] + real_widths: List[float] = [ + self.get_widths()[variable_idx, term_idx].item() + for term_idx, mask_value in enumerate(self.get_mask()[variable_idx]) + if mask_value == 1 + ] + x_values = torch.linspace( + min(real_centers) - 2 * max(real_widths), + max(real_centers) + 2 * max(real_widths), + steps=1000, + device=self.device, + ) + + if self.get_centers().ndim == 1 or self.get_centers().shape[0] == 1: + x_values = x_values[:, None] + elif self.get_centers().ndim == 2 or self.get_centers().shape[0] > 1: + x_values = x_values[:, None, None] + + memberships: torch.Tensor = self.calculate_membership(x_values) + + if memberships.ndim == 2: + memberships = memberships.unsqueeze( + dim=1 + ) # add a temporary dimension for the variable + + memberships = memberships.cpu().detach().numpy() + x_values = x_values.squeeze().cpu().detach().numpy() + + for term_idx in range(memberships.shape[-1]): + if self.get_mask()[variable_idx, term_idx] == 0: + continue # not a real fuzzy set + y_values = memberships[:, variable_idx, term_idx] + label: str = ( + r"$\mu_{" + + str(variable_idx + 1) + + "," + + str(term_idx + 1) + + "}$" + ) + if (variable_idx, term_idx) in selected_terms: + # edgecolor="#0bafa9" # beautiful with facecolor=None (AAMAS 2023) + plt.fill_between( + x_values, y_values, alpha=0.5, hatch="///", label=label + ) + else: + plt.plot(x_values, y_values, alpha=0.5, label=label) + plt.legend( + bbox_to_anchor=(0.5, -0.2), + loc="upper center", + ncol=len(real_centers), + ) + plt.subplots_adjust(bottom=0.3, wspace=0.33) + output_dir.mkdir(parents=True, exist_ok=True) + plt.savefig(output_dir / f"mu_{variable_idx}.png") + plt.clf() + + @staticmethod + def count_granule_terms(granules: List["ContinuousFuzzySet"]) -> np.ndarray: + """ + Count the number of granules that occur in each dimension. + + Args: + granules: A list of granules, where each granule is a ContinuousFuzzySet object. + + Returns: + A Numpy array with shape (len(granules), ) and the data type is integer. + """ + return np.array( + [ + ( + params.get_centers().size(dim=-1) + if params.get_centers().dim() > 0 + else 0 + ) + for params in granules + ], + dtype=np.int8, + ) + + @staticmethod + def stack( + granules: List["ContinuousFuzzySet"], + ) -> "ContinuousFuzzySet": + """ + Create a condensed and stacked representation of the given granules. + + Args: + granules: A list of granules, where each granule is a ContinuousFuzzySet object. + + Returns: + A ContinuousFuzzySet object. + """ + if list(granules)[0].training: + missing_center, missing_width = 0.0, -1.0 + else: + missing_center = missing_width = torch.nan + + centers = torch.vstack( + [ + ( + torch.nn.functional.pad( + params.get_centers(), + pad=( + 0, + ContinuousFuzzySet.count_granule_terms(granules).max() + - params.get_centers().shape[-1], + ), + mode="constant", + value=missing_center, + ) + if params.get_centers().dim() > 0 + else torch.tensor(missing_center).repeat( + ContinuousFuzzySet.count_granule_terms(granules).max() + ) + ) + for params in granules + ] + ) + widths = torch.vstack( + [ + ( + torch.nn.functional.pad( + params.get_widths(), + pad=( + 0, + ContinuousFuzzySet.count_granule_terms(granules).max() + - params.get_widths().shape[-1], + ), + mode="constant", + value=missing_width, + ) + if params.get_centers().dim() > 0 + else torch.tensor(missing_center).repeat( + ContinuousFuzzySet.count_granule_terms(granules).max() + ) + ) + for params in granules + ] + ) + + # prepare a condensed and stacked representation of the granules + mf_type = type(granules[0]) + return mf_type( + centers=centers.cpu().detach().numpy(), + widths=widths.cpu().detach().numpy(), + device=centers.device, + ) + + @classmethod + @abstractmethod + def sympy_formula(cls) -> sympy.Expr: + """ + The abstract method that defines the membership function of the fuzzy set using sympy. + + Returns: + A sympy.Expr object that represents the membership function of the fuzzy set. + """ + raise NotImplementedError("The sympy_formula method must be implemented.") + + @abc.abstractmethod + def forward(self, observations) -> Membership: + """ + Forward pass of the function. Applies the function to the input elementwise. + + Args: + observations: Two-dimensional matrix of observations, + where a row is a single observation and each column + is related to an attribute measured during that observation. + + Returns: + The membership degrees of the observations for the Gaussian fuzzy set. + """ + raise NotImplementedError( + "The ContinuousFuzzySet has no defined forward function. Please create a class and " + "inherit from ContinuousFuzzySet, or use a predefined class, such as Gaussian." + ) diff --git a/src/fuzzy/sets/continuous/group.py b/src/fuzzy/sets/continuous/group.py new file mode 100644 index 0000000..9a5a734 --- /dev/null +++ b/src/fuzzy/sets/continuous/group.py @@ -0,0 +1,466 @@ +""" +This module contains the GroupedFuzzySets class, which is a generic and abstract torch.nn.Module +class that contains a torch.nn.ModuleList of ContinuousFuzzySet objects. The expectation here is +that each ContinuousFuzzySet may define fuzzy sets of different conventions, such as Gaussian, +Triangular, Trapezoidal, etc. Then, subsequent inference engines can handle these heterogeneously +defined fuzzy sets with no difficulty. Further, this class was specifically designed to incorporate +dynamic addition of new fuzzy sets in the construction of neuro-fuzzy networks via network morphism. +""" + +import pickle +import inspect +from pathlib import Path +from typing import List, Tuple, Any, Dict, Set, Type, Union + +import torch +from natsort import natsorted + +from .membership import Membership +from .abstract import ContinuousFuzzySet +from .utils import get_object_attributes, find_widths + + +class GroupedFuzzySets(torch.nn.Module): + """ + A generic and abstract torch.nn.Module class that contains a torch.nn.ModuleList + of ContinuousFuzzySet objects. The expectation here is that each ContinuousFuzzySet + may define fuzzy sets of different conventions, such as Gaussian, Triangular, Trapezoidal, etc. + Then, subsequent inference engines can handle these heterogeneously defined fuzzy sets + with no difficulty. Further, this class was specifically designed to incorporate dynamic + addition of new fuzzy sets in the construction of neuro-fuzzy networks via network morphism. + + However, this class does *not* carry out any functionality that is necessarily tied to fuzzy + sets, it is simply named so as this was its intended purpose - grouping fuzzy sets. In other + words, the same "trick" of using a torch.nn.ModuleList of torch.nn.Module objects applies to + any kind of torch.nn.Module object. + """ + + def __init__(self, *args, modules_list=None, expandable=False, **kwargs): + super().__init__(*args, **kwargs) + if modules_list is None: + modules_list = [] + self.modules_list = torch.nn.ModuleList(modules_list) + self.expandable = expandable + self.pruning = False + self.epsilon = 1.5 # epsilon-completeness + # keep track of minimums and maximums if for fuzzy set width calculation + self.minimums: torch.Tensor = torch.empty(0, 0) + self.maximums: torch.Tensor = torch.empty(0, 0) + # store data that we have seen to later add new fuzzy sets + self.data_seen: torch.Tensor = torch.empty(0, 0) + # after we see this many data points, we will update the fuzzy sets + self.data_limit_until_update: int = 64 + + def __getattribute__(self, item): + try: + if item in ("centers", "widths", "mask"): + modules_list = self.__dict__["_modules"]["modules_list"] + if len(modules_list) > 0: + module_attributes: List[torch.Tensor] = ( + [] + ) # the secondary response denoting module filter + for module in modules_list: + # get the method for the module and then call it + item_method: callable = getattr(module, f"get_{item}") + module_attributes.append(item_method()) + return torch.cat(module_attributes, dim=-1) + raise ValueError( + "The torch.nn.ModuleList of GroupedFuzzySets is empty." + ) + return object.__getattribute__(self, item) + except AttributeError: + return self.__getattr__(item) + + def save(self, path: Path) -> None: + """ + Save the model to the given path. + + Args: + path: The path to save the GroupedFuzzySet to. + + Returns: + None + """ + # get the attributes that are local to the class, but not inherited from the super class + local_attributes_only = get_object_attributes(self) + + # save a reference to the attributes (and their values) so that when iterating over them, + # we do not modify the dictionary while iterating over it (which would cause an error) + # we modify the dictionary by removing attributes that have a value of torch.nn.ModuleList + # because we want to save the modules in the torch.nn.ModuleList separately + local_attributes_only_items: List[Tuple[str, Any]] = list( + local_attributes_only.items() + ) + for attr, value in local_attributes_only_items: + if isinstance( + value, torch.nn.ModuleList + ): # e.g., attr may be self.modules_list + for idx, module in enumerate(value): + subdirectory = path / attr / str(idx) + subdirectory.mkdir(parents=True, exist_ok=True) + if isinstance(module, ContinuousFuzzySet): + # save the fuzzy set using the fuzzy set's special protocol + module.save( + path / attr / str(idx) / f"{module.__class__.__name__}.pt" + ) + else: + # unknown and unrecognized module, but attempt to save the module + torch.save( + module, + path / attr / str(idx) / f"{module.__class__.__name__}.pt", + ) + # remove the torch.nn.ModuleList from the local attributes + del local_attributes_only[attr] + + # save the remaining attributes + with open(path / f"{self.__class__.__name__}.pickle", "wb") as handle: + pickle.dump(local_attributes_only, handle, protocol=pickle.HIGHEST_PROTOCOL) + + @classmethod + def load(cls, path: Path, device: Union[str, torch.device]) -> "GroupedFuzzySets": + """ + Load the model from the given path. + + Args: + path: The path to load the GroupedFuzzySet from. + device: The device to load the GroupedFuzzySet to. + + Returns: + The loaded GroupedFuzzySet. + """ + if isinstance(device, str): + device = torch.device(device) + modules_list = [] + local_attributes_only: Dict[str, Any] = {} + for file_path in path.iterdir(): + if ".pickle" in file_path.name: + # load the remaining attributes + with open(file_path, "rb") as handle: + local_attributes_only.update(pickle.load(handle)) + elif file_path.is_dir(): + for subdirectory in natsorted(file_path.iterdir()): + if subdirectory.is_dir(): + module_path: Path = list(subdirectory.glob("*.pt"))[0] + # load the fuzzy set using the fuzzy set's special protocol + class_name: str = module_path.name.split(".pt")[0] + try: + modules_list.append( + ContinuousFuzzySet.get_subclass(class_name).load( + module_path, device=device + ) + ) + except ValueError: + # unknown and unrecognized module, but attempt to load the module + modules_list.append(torch.load(module_path)) + else: + raise UserWarning( + f"Unexpected file found in {file_path}: {module_path}" + ) + local_attributes_only[file_path.name] = modules_list + + # of the remaining attributes, we must determine which are shared between the + # super class and the local class, otherwise we will get an error when trying to + # initialize the local class (more specifically, the torch.nn.Module __init__ method + # requires self.call_super_init to be set to True, but then the attribute would exist + # as a super class attribute, and not a local class attribute) + shared_args: Set[str] = set( + inspect.signature(cls).parameters.keys() + ).intersection(local_attributes_only.keys()) + + # create the GroupedFuzzySet object with the shared arguments + # (e.g., modules_list, expandable) + grouped_fuzzy_set: GroupedFuzzySets = cls( + **{ + key: value + for key, value in local_attributes_only.items() + if key in shared_args + } + ) + + # determine the remaining attributes + remaining_args: Dict[str, Any] = { + key: value + for key, value in local_attributes_only.items() + if key not in shared_args + } + + # set the remaining attributes + for attr, value in remaining_args.items(): + setattr(grouped_fuzzy_set, attr, value) + + return grouped_fuzzy_set + + def calculate_module_responses(self, observations) -> Membership: + """ + Calculate the responses from the modules in the torch.nn.ModuleList of GroupedFuzzySets. + """ + if len(self.modules_list) > 0: + # modules' responses are membership degrees when modules are ContinuousFuzzySet + if len(self.modules_list) == 1: + # for computational efficiency, return the response from the only module + return self.modules_list[0](observations) + + # this can be computationally expensive, but it is necessary to calculate the responses + # from all the modules in the torch.nn.ModuleList of GroupedFuzzySets + # ideally this should be done in parallel, but it is not possible with the current + # implementation; only use this if the torch.nn.Module objects are different + module_elements: List[torch.Tensor] = [] + module_memberships: List[torch.Tensor] = ( + [] + ) # the primary response from the module + module_masks: List[torch.Tensor] = ( + [] + ) # the secondary response denoting module filter + for module in self.modules_list: + membership: Membership = module(observations) + module_elements.append(membership.elements) + module_memberships.append(membership.degrees) + module_masks.append(membership.mask) + return Membership( + elements=torch.cat(module_elements, dim=-1), + degrees=torch.cat(module_memberships, dim=-1), + mask=torch.cat(module_masks, dim=-1), + ) + raise ValueError("The torch.nn.ModuleList of GroupedFuzzySets is empty.") + + def expand( + self, observations, module_responses, module_masks + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + Expand the GroupedFuzzySets if necessary. + """ + if self.expandable and self.training: + # save the data that we have seen + if self.data_seen.shape[0] == 0: # buffer is empty, shape of (0, 0) + self.data_seen = observations + else: + self.data_seen = torch.cat([self.data_seen, observations], dim=0) + + if self.data_seen.shape[0] % self.data_limit_until_update == 0: + # keep a running tally of mins and maxs of the domain + minimums = self.data_seen.min(dim=0).values + maximums = self.data_seen.max(dim=0).values + + if ( + self.minimums.shape[0] == 0 and self.maximums.shape[0] == 0 + ): # first time + self.minimums = minimums + self.maximums = maximums + else: + self.minimums = torch.min(minimums, self.minimums).detach() + self.maximums = torch.max(maximums, self.maximums).detach() + + # find where the new centers should be added, if any + # LogGaussian was used, then use following to check for real membership degrees: + # for module in self.modules_list: + # if isinstance(module, LogGaussian) and not isinstance( + # module, Gaussian + # ): + # with torch.no_grad(): + # assert ( + # module_responses.exp() * module_masks + # ).max().item() <= 1.0, "Membership degrees are not in the range [0, 1]." + + exemplars: List[torch.Tensor] = [] + + max_peaks: int = 3 + for var_idx in range(self.data_seen.shape[-1]): + discovered_exemplars: torch.Tensor = self.evenly_spaced_exemplars( + self.data_seen[:, var_idx], max_peaks + ) + if discovered_exemplars.ndim == 1: + discovered_exemplars = discovered_exemplars[:, None] + + num_of_exemplars_found = discovered_exemplars.shape[0] + if num_of_exemplars_found < max_peaks: + # pad the exemplars with torch.nan if there are not enough exemplars + discovered_exemplars = torch.nn.functional.pad( + discovered_exemplars, + pad=(0, 0, 0, max_peaks - num_of_exemplars_found), + value=torch.nan, + ) + + exemplars.append(discovered_exemplars.transpose(0, 1)) + + if len(exemplars) == 0: + # no exemplars found in any dimension + return observations, module_responses, module_masks + + exemplars: torch.Tensor = torch.vstack(exemplars).transpose(0, 1) + + # Create a new matrix with nan values + new_centers = torch.full_like(exemplars, float("nan")) + + # Use torch.where to update values that satisfy the condition + new_centers = torch.where( + self.calculate_module_responses(exemplars) + .degrees.exp() + .max(dim=-1) # TODO: assuming LogGaussian was used (exp) + .values + < self.epsilon, + exemplars, + new_centers, + ) + + if not new_centers.isnan().all(): # add new centers + # TODO: this find_centers_and_widths call is problematic + new_widths: torch.Tensor = find_widths( + data_point=new_centers.nan_to_num(0.0).mean(dim=0), + minimums=self.minimums, + maximums=self.maximums, + alpha=0.3, + ) + + # new_widths = torch.tensor( + # [term["widths"] for term in terms], device=self.data_seen.device + # ) + + # assert new_widths.isnan().any() is False + + # create the widths for the new centers + new_widths = ( + # only keep the widths for the entries that are not torch.nan + ~torch.isnan(new_centers) + * new_widths + ) + (torch.isnan(new_centers) * -1) + + # above result is tensor that contains new centers, but also contains torch.nan + # in the places where a new center is not needed + + new_centers = ( + new_centers.nan_to_num(0.0) + .transpose(0, 1) + .max(dim=-1, keepdim=True) + .values + ) + new_widths = ( + new_widths.transpose(0, 1).max(dim=-1, keepdim=True).values + ) + + # TODO: this code does not work for torch.jit.script + # the following assumes only the first module is to be expanded + module = self.modules_list[0] + module._centers.append(module.make_parameter(parameter=new_centers)) + module._widths.append(module.make_parameter(parameter=new_widths)) + module._mask.append(module.make_mask(widths=new_widths)) + + # TODO: this code does not work for torch.jit.script + # the following assumes an entire new module is to be added + # module_type = type(self.modules_list[0]) # cannot call type + # if issubclass(module_type, ContinuousFuzzySet): + # # cannot call .get_subclass + # granule = ContinuousFuzzySet.get_subclass(module_type.__name__)( + # centers=new_centers, + # widths=new_widths, + # ) # cannot dynamically create a PyTorch module in torch.jit.script + # else: + # raise ValueError( + # "The module type is not ContinuousFuzzySet, and therefore cannot " + # "be used for dynamic expansion." + # ) + + # granule = LogGaussian( + # centers=new_centers.nan_to_num(0.0) + # .transpose(0, 1) + # .max(dim=-1, keepdim=True) + # .values, + # widths=new_widths.transpose(0, 1) + # .max(dim=-1, keepdim=True) + # .values, + # device=self.data_seen.device + # ) + # print( + # f"add {granule.centers.shape}; modules already: {len(self.modules_list)}" + # ) + # print(f"to dimensions: {set(range(len(sets))) - set(empty_sets)}") + # self.modules_list.add_module(str(len(self.modules_list)), granule) + + # clear the history + self.data_seen = torch.empty(0, 0) + + # reduce the number of torch.nn.Modules in the list for computational efficiency + # (this is not necessary, but it is a good idea) + # self.prune(module_type) + + ( + _, + module_responses, + module_masks, + ) = self.calculate_module_responses(observations) + return observations, module_responses, module_masks + + @staticmethod + def evenly_spaced_exemplars(data: torch.Tensor, max_peaks: int) -> torch.Tensor: + """ + Find the peaks in the data and return the peaks, or a subset of the peaks if there are + more than max_peaks. + + Args: + data: The data to find the peaks in. + max_peaks: The maximum number of peaks to return. + + Returns: + The peaks, or a subset of the peaks if there are more than max_peaks. + """ + # Find the peaks in a 1D tensor + peaks = (data[1:-1] > data[:-2]) & (data[1:-1] > data[2:]) + peak_indices = torch.nonzero(peaks).squeeze() + 1 + if peak_indices.ndim > 0 and len(peak_indices) <= max_peaks: + sampled_peak_values = data[peak_indices][ + :, None + ] # return the peaks' values + else: + sampled_peaks_indices = torch.linspace( + 0, len(peak_indices) - 1, max_peaks, dtype=torch.int + ) + sampled_peaks = peak_indices[sampled_peaks_indices] + sampled_peak_values = data[sampled_peaks][:, None] + return torch.as_tensor( + sampled_peak_values, dtype=torch.float16, device=data.device + ) + + def prune(self, module_type: Type[ContinuousFuzzySet]) -> None: + """ + Prune the torch.nn.ModuleList of GroupedFuzzySets by keeping the first module, but + collapsing the rest of the modules into a single module. This is done to reduce the + number of torch.nn.Modules in the list for computational efficiency. + """ + if self.pruning and len(self.modules_list) > 5: + centers, widths = [], [] + for module in self.modules_list[1:]: + if module.centers.shape[-1] > 1: + centers.append(module.centers.mean(dim=-1, keepdim=True)) + widths.append(module.widths.max(dim=-1, keepdim=True).values) + else: + centers.append(module.centers) + widths.append(module.widths) + if issubclass(module_type, ContinuousFuzzySet): + module = ContinuousFuzzySet.get_subclass(module_type.__name__)( + centers=torch.cat(centers, dim=-1), + widths=torch.cat(widths, dim=-1), + ) + print(module.centers.shape) + else: + raise ValueError( + "The module type is not ContinuousFuzzySet, and therefore cannot " + "be used for dynamic expansion." + ) + self.modules_list = torch.nn.ModuleList([self.modules_list[0], module]) + + def forward(self, observations) -> Membership: + """ + Calculate the responses from the modules in the torch.nn.ModuleList of GroupedFuzzySets. + Expand the GroupedFuzzySets if necessary. + """ + ( + _, # module_elements + module_responses, + module_masks, + ) = self.calculate_module_responses(observations) + + # TODO: this code does not work for torch.jit.script + # self.expand(observations, module_responses, module_masks) + + return Membership( + elements=observations, degrees=module_responses, mask=module_masks + ) diff --git a/src/fuzzy/sets/continuous/impl.py b/src/fuzzy/sets/continuous/impl.py new file mode 100644 index 0000000..7054adf --- /dev/null +++ b/src/fuzzy/sets/continuous/impl.py @@ -0,0 +1,450 @@ +""" +Implements various membership functions by inheriting from ContinuousFuzzySet. +""" + +from typing import List, Union + +import sympy +import torch + +from .membership import Membership +from .abstract import ContinuousFuzzySet + + +class LogGaussian(ContinuousFuzzySet): + """ + Implementation of the Log Gaussian membership function, written in PyTorch. + This is a modified version that helps when the dimensionality is high, + and TSK product inference engine will be used. + """ + + def __init__( + self, + centers=None, + widths=None, + width_multiplier: float = 1.0, # in fuzzy logic, convention is usually 1.0, but can be 2.0 + labels: List[str] = None, + device: Union[str, torch.device] = torch.device("cpu"), + ): + super().__init__(centers=centers, widths=widths, labels=labels, device=device) + self.width_multiplier = width_multiplier + assert int(self.width_multiplier) in [1, 2] + + # @property + # @torch.jit.ignore + # def sigmas(self) -> torch.Tensor: + # """ + # Gets the sigma for the Gaussian fuzzy set; alias for the 'widths' parameter. + # + # Returns: + # torch.Tensor + # """ + # return self.widths + # + # @sigmas.setter + # @torch.jit.ignore + # def sigmas(self, sigmas) -> None: + # """ + # Sets the sigma for the Gaussian fuzzy set; alias for the 'widths' parameter. + # + # Returns: + # None + # """ + # self.widths = sigmas + + @staticmethod + def internal_calculate_membership( + observations: torch.Tensor, + centers: torch.Tensor, + widths: torch.Tensor, + width_multiplier: float, + ) -> torch.Tensor: + """ + Calculate the membership of the observations to the Log Gaussian fuzzy set. + This is a static method, so it can be called without instantiating the class. + This static method is particularly useful when animating the membership function. + + Warning: This method is not meant to be called directly, as it does not take into account + the mask that likely should exist. Use the calculate_membership method instead. + + Args: + observations: The observations to calculate the membership for. + centers: The centers of the Log Gaussian fuzzy set. + widths: The widths of the Log Gaussian fuzzy set. + width_multiplier: The width multiplier of the Log Gaussian fuzzy set. + + Returns: + The membership degrees of the observations for the Log Gaussian fuzzy set. + """ + return -1.0 * ( + torch.pow( + observations - centers, + 2, + ) + / (width_multiplier * torch.pow(widths, 2) + 1e-32) + ) + + @classmethod + @torch.jit.ignore + def sympy_formula(cls) -> sympy.Expr: + # centers (c), widths (sigma) and observations (x) + center_symbol = sympy.Symbol("c") + width_symbol = sympy.Symbol("sigma") + input_symbol = sympy.Symbol("x") + return sympy.sympify( + f"-1.0 * pow(({input_symbol} - {center_symbol}), 2) / (2.0 * pow({width_symbol}, 2))" + ) + + def calculate_membership(self, observations: torch.Tensor) -> torch.Tensor: + """ + Calculate the membership of the observations to the Log Gaussian fuzzy set. + + Args: + observations: The observations to calculate the membership for. + + Returns: + The membership degrees of the observations for the Log Gaussian fuzzy set. + """ + return LogGaussian.internal_calculate_membership( + observations=observations, + centers=self.get_centers(), + widths=self.get_widths(), + width_multiplier=self.width_multiplier, + ) + + def forward(self, observations) -> Membership: + if observations.ndim == self.get_centers().ndim: + observations = observations.unsqueeze(dim=-1) + # we do not need torch.float64 for observations + degrees: torch.Tensor = self.calculate_membership(observations.float()) + + # assert ( + # not degrees.isnan().any() + # ), "NaN values detected in the membership degrees." + # assert ( + # not degrees.isinf().any() + # ), "Infinite values detected in the membership degrees." + + return Membership( + elements=observations, + degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, + mask=self.get_mask(), + ) + + +class Gaussian(LogGaussian): + """ + Implementation of the Gaussian membership function, written in PyTorch. + """ + + @staticmethod + def internal_calculate_membership( + observations: torch.Tensor, + centers: torch.Tensor, + widths: torch.Tensor, + width_multiplier: float = 1.0, # in fuzzy logic, convention is usually 1.0, but can be 2.0 + ) -> torch.Tensor: + """ + Calculate the membership of the observations to the Gaussian fuzzy set. + This is a static method, so it can be called without instantiating the class. + This static method is particularly useful when animating the membership function. + + Warning: This method is not meant to be called directly, as it does not take into account + the mask that likely should exist. Use the calculate_membership method instead. + + Args: + observations: The observations to calculate the membership for. + centers: The centers of the Gaussian fuzzy set. + widths: The widths of the Gaussian fuzzy set. + width_multiplier: The width multiplier of the Gaussian fuzzy set. + + Returns: + The membership degrees of the observations for the Gaussian fuzzy set. + """ + return torch.exp( + -1.0 + * ( + torch.pow( + observations - centers, + 2, + ) + / (width_multiplier * torch.pow(widths, 2) + 1e-32) + ) + ) + # return LogGaussian.internal_calculate_membership( + # centers=centers, + # widths=widths, + # width_multiplier=width_multiplier, + # observations=observations, + # ).exp() + + @classmethod + @torch.jit.ignore + def sympy_formula(cls) -> sympy.Expr: + return sympy.exp(LogGaussian.sympy_formula()) + + def calculate_membership(self, observations: torch.Tensor) -> torch.Tensor: + return Gaussian.internal_calculate_membership( + observations=observations, + centers=self.get_centers(), + widths=self.get_widths(), + width_multiplier=1.0, + ) + + def forward(self, observations) -> Membership: + if observations.ndim == self.get_centers().ndim: + observations = observations.unsqueeze(dim=-1) + # we do not need torch.float64 for observations + degrees: torch.Tensor = self.calculate_membership(observations.float()) + + # assert ( + # not degrees.isnan().any() + # ), "NaN values detected in the membership degrees." + # assert ( + # not degrees.isinf().any() + # ), "Infinite values detected in the membership degrees." + + return Membership( + elements=observations, + degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, + mask=self.get_mask(), + ) + + +class Lorentzian(ContinuousFuzzySet): + """ + Implementation of the Lorentzian membership function, written in PyTorch. + """ + + def __init__( + self, + centers=None, + widths=None, + labels: List[str] = None, + device: Union[str, torch.device] = torch.device("cpu"), + ): + super().__init__(centers=centers, widths=widths, labels=labels, device=device) + + @property + @torch.jit.ignore + def sigmas(self) -> torch.Tensor: + """ + Gets the sigma for the Lorentzian fuzzy set; alias for the 'widths' parameter. + + Returns: + torch.Tensor + """ + return self.widths + + @sigmas.setter + @torch.jit.ignore + def sigmas(self, sigmas) -> None: + """ + Sets the sigma for the Lorentzian fuzzy set; alias for the 'widths' parameter. + + Returns: + None + """ + self.widths = sigmas + + @staticmethod + def internal_calculate_membership( + observations: torch.Tensor, centers: torch.Tensor, widths: torch.Tensor + ) -> torch.Tensor: + """ + Calculate the membership of the observations to the Lorentzian fuzzy set. + This is a static method, so it can be called without instantiating the class. + This static method is particularly useful when animating the membership function. + + Warning: This method is not meant to be called directly, as it does not take into account + the mask that likely should exist. Use the calculate_membership method instead. + + Args: + observations: The observations to calculate the membership for. + centers: The centers of the Lorentzian fuzzy set. + widths: The widths of the Lorentzian fuzzy set. + + Returns: + The membership degrees of the observations for the Lorentzian fuzzy set. + """ + return 1 / (1 + torch.pow((centers - observations) / (0.5 * widths), 2)) + + @classmethod + @torch.jit.ignore + def sympy_formula(cls) -> sympy.Expr: + # centers (c), widths (sigma) and observations (x) + center_symbol = sympy.Symbol("c") + width_symbol = sympy.Symbol("sigma") + input_symbol = sympy.Symbol("x") + return sympy.sympify( + f"1 / (1 + pow(({center_symbol} - {input_symbol}) / (0.5 * {width_symbol}), 2))" + ) + + def calculate_membership(self, observations: torch.Tensor) -> torch.Tensor: + """ + Calculate the membership of the observations to the Lorentzian fuzzy set. + + Args: + observations: The observations to calculate the membership for. + + Returns: + The membership degrees of the observations for the Lorentzian fuzzy set. + """ + return Lorentzian.internal_calculate_membership( + observations=observations, + centers=self.get_centers(), + widths=self.get_widths(), + ) + + def forward(self, observations) -> Membership: + if observations.ndim == self.get_centers().ndim: + observations = observations.unsqueeze(dim=-1) + # we do not need torch.float64 for observations + degrees: torch.Tensor = self.calculate_membership(observations.float()) + + assert ( + not degrees.isnan().any() + ), "NaN values detected in the membership degrees." + assert ( + not degrees.isinf().any() + ), "Infinite values detected in the membership degrees." + + return Membership( + elements=observations, + degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, + mask=self.get_mask(), + ) + + +class LogisticCurve(torch.nn.Module): + """ + A generic torch.nn.Module class that implements a logistic curve, which allows us to + tune the midpoint, and growth of the curve, with a fixed supremum (the supremum is + the maximum value of the curve). + """ + + def __init__( + self, + midpoint: float, + growth: float, + supremum: float, + device: Union[str, torch.device] = "cpu", + ): + super().__init__() + if isinstance(device, str): + device = torch.device(device) + self.device: torch.device = device + self.midpoint = torch.nn.Parameter( + torch.as_tensor(midpoint, dtype=torch.float16, device=self.device), + requires_grad=True, # explicitly set to True for clarity + ) + self.growth = torch.nn.Parameter( + torch.as_tensor(growth, dtype=torch.float16, device=self.device), + requires_grad=True, # explicitly set to True for clarity + ) + self.supremum = torch.nn.Parameter( + torch.as_tensor(supremum, dtype=torch.float16, device=self.device), + requires_grad=False, # not a parameter, so we don't want to track it + ) + + def forward(self, tensors: torch.Tensor) -> torch.Tensor: + """ + Calculate the value of the logistic curve at the given point. + + Args: + tensors: + + Returns: + + """ + return self.supremum / ( + 1 + torch.exp(-1.0 * self.growth * (tensors - self.midpoint)) + ) + + +class Triangular(ContinuousFuzzySet): + """ + Implementation of the Triangular membership function, written in PyTorch. + """ + + def __init__( + self, + centers=None, + widths=None, + labels: List[str] = None, + device: Union[str, torch.device] = torch.device("cpu"), + ): + super().__init__(centers=centers, widths=widths, labels=labels, device=device) + + @staticmethod + def internal_calculate_membership( + centers: torch.Tensor, widths: torch.Tensor, observations: torch.Tensor + ) -> torch.Tensor: + """ + Calculate the membership of the observations to the Triangular fuzzy set. + This is a static method, so it can be called without instantiating the class. + This static method is particularly useful when animating the membership function. + + Warning: This method is not meant to be called directly, as it does not take into account + the mask that likely should exist. Use the calculate_membership method instead. + + Args: + centers: The centers of the Triangular fuzzy set. + widths: The widths of the Triangular fuzzy set. + observations: The observations to calculate the membership for. + + Returns: + The membership degrees of the observations for the Triangular fuzzy set. + """ + return torch.max( + 1.0 - (1.0 / widths) * torch.abs(observations - centers), + torch.tensor(0.0), + ) + + @classmethod + @torch.jit.ignore + def sympy_formula(cls) -> sympy.Expr: + # centers (c), widths (w) and observations (x) + center_symbol = sympy.Symbol("c") + width_symbol = sympy.Symbol("w") + input_symbol = sympy.Symbol("x") + return sympy.sympify( + f"max(1.0 - (1.0 / {width_symbol}) * abs({input_symbol} - {center_symbol}), 0.0)" + ) + + def calculate_membership(self, observations: torch.Tensor) -> torch.Tensor: + """ + Forward pass of the function. Applies the function to the input elementwise. + + Args: + observations: Two-dimensional matrix of observations, + where a row is a single observation and each column + is related to an attribute measured during that observation. + + Returns: + The membership degrees of the observations for the Triangular fuzzy set. + """ + return Triangular.internal_calculate_membership( + observations=observations, + centers=self.get_centers(), + widths=self.get_widths(), + ) + + def forward(self, observations) -> Membership: + if observations.ndim == self.get_centers().ndim: + observations = observations.unsqueeze(dim=-1) + # we do not need torch.float64 for observations + degrees: torch.Tensor = self.calculate_membership(observations.float()) + + assert ( + not degrees.isnan().any() + ), "NaN values detected in the membership degrees." + assert ( + not degrees.isinf().any() + ), "Infinite values detected in the membership degrees." + + return Membership( + elements=observations, + degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, + mask=self.get_mask(), + ) diff --git a/src/fuzzy/sets/continuous/membership.py b/src/fuzzy/sets/continuous/membership.py new file mode 100644 index 0000000..99a898e --- /dev/null +++ b/src/fuzzy/sets/continuous/membership.py @@ -0,0 +1,39 @@ +""" +The Membership class contains information describing both membership *degrees* and membership *mask* +for some given *elements*. The membership degrees are often the degree of membership, truth, +activation, applicability, etc. of a fuzzy set, or more generally, a concept. The membership mask is +shaped such that it helps filter or 'mask' out membership degrees that belong to fuzzy sets or +concepts that are not actually real. The distinction between the two is made as applying the mask +will zero out membership degrees that are not real, but this might be incorrectly interpreted as +having zero degree of membership to the fuzzy set. By including the elements' information with the +membership degrees and mask, it is possible to keep track of the original elements that were used to +calculate the membership degrees. This is useful for debugging purposes, and it is also useful for +understanding the membership degrees and mask in the context of the original elements. Also, it can +be used in conjunction with the mask to filter out membership degrees that are not real, as well as +assist in performing advanced operations. +""" + +from collections import namedtuple + + +class Membership( + namedtuple(typename="Membership", field_names=("elements", "degrees", "mask")) +): + """ + The Membership class contains information describing both membership *degrees* and + membership *mask* for some given *elements*. The membership degrees are often the degree of + membership, truth, activation, applicability, etc. of a fuzzy set, or more generally, a concept. + The membership mask is shaped such that it helps filter or 'mask' out membership degrees that + belong to fuzzy sets or concepts that are not actually real. + + The distinction between the two is made as applying the mask will zero out membership degrees + that are not real, but this might be incorrectly interpreted as having zero degree of + membership to the fuzzy set. + + By including the elements' information with the membership degrees and mask, it is possible to + keep track of the original elements that were used to calculate the membership degrees. This + is useful for debugging purposes, and it is also useful for understanding the membership + degrees and mask in the context of the original elements. Also, it can be used in conjunction + with the mask to filter out membership degrees that are not real, as well as assist in + performing advanced operations. + """ diff --git a/src/fuzzy/sets/continuous/utils.py b/src/fuzzy/sets/continuous/utils.py new file mode 100644 index 0000000..a67f0ad --- /dev/null +++ b/src/fuzzy/sets/continuous/utils.py @@ -0,0 +1,99 @@ +""" +Utility functions, such as for getting all the subclasses of a given class. +""" + +import inspect +from typing import Dict, Any, Set + +import torch + + +def all_subclasses(cls) -> Set[Any]: + """ + Get all subclasses of the given class, recursively. + + Returns: + A set of all subclasses of the given class. + """ + return {cls}.union(s for c in cls.__subclasses__() for s in all_subclasses(c)) + + +def get_object_attributes(obj_instance) -> Dict[str, Any]: + """ + Get the attributes of an object instance. + """ + # get the attributes that are local to the class, but may be inherited from the super class + local_attributes = inspect.getmembers( + obj_instance, + lambda attr: not (inspect.ismethod(attr)) and not (inspect.isfunction(attr)), + ) + # get the attributes that are inherited from (or found within) the super class + super_attributes = inspect.getmembers( + obj_instance.__class__.__bases__[0], + lambda attr: not (inspect.ismethod(attr)) and not (inspect.isfunction(attr)), + ) + # get the attributes that are local to the class, but not inherited from the super class + return { + attr: value + for attr, value in local_attributes + if (attr, value) not in super_attributes and not attr.startswith("_") + } + + +def regulator(sigma_1: torch.Tensor, sigma_2: torch.Tensor) -> torch.Tensor: + """ + Regulator function as defined in CLIP. + + Args: + sigma_1: The left sigma/width. + sigma_2: The right sigma/width. + + Returns: + sigma (float): An adjusted sigma so that the produced + Gaussian membership function is not warped. + """ + return (1 / 2) * (sigma_1 + sigma_2) + + +def find_widths(data_point, minimums, maximums, alpha: float) -> torch.Tensor: + """ + Find the centers and widths to be used for a newly created fuzzy set. + + Args: + data_point (1D Numpy array): A single input_data observation where + each column is a feature/attribute. + minimums (iterable): The minimum value per feature in X. + maximums (iterable): The maximum value per feature in X. + alpha (float): A hyperparameter to adjust the generated widths' coverage. + + Returns: + A list of dictionaries, where each dictionary contains the center and width + for a newly created fuzzy set (that is to be created later). + """ + # The variable 'theta' is added to accommodate for the instance in which an observation has + # values that are the minimum/maximum. Otherwise, when determining the Gaussian membership, + # a division by zero will occur; it essentially acts as an error tolerance. + theta: float = 1e-8 + sigmas = torch.empty((0, 0)) + for dim, attribute_value in enumerate(data_point): + left_width: torch.Tensor = torch.sqrt( + -1.0 + * ( + torch.pow((minimums[dim] - attribute_value) + theta, 2) + / torch.log(torch.as_tensor([alpha], device=attribute_value.device)) + ) + ) + right_width: torch.Tensor = torch.sqrt( + -1.0 + * ( + torch.pow((maximums[dim] - attribute_value) + theta, 2) + / torch.log(torch.as_tensor([alpha], device=attribute_value.device)) + ) + ) + aggregated_sigma: torch.Tensor = regulator(left_width, right_width) + if sigmas.shape[0] == 0: + sigmas = aggregated_sigma + else: + sigmas = torch.hstack((sigmas, aggregated_sigma)) + + return sigmas diff --git a/src/fuzzy/sets/discrete.py b/src/fuzzy/sets/discrete.py new file mode 100644 index 0000000..2e666f1 --- /dev/null +++ b/src/fuzzy/sets/discrete.py @@ -0,0 +1,252 @@ +""" +Implements the discrete fuzzy sets. +""" + +from abc import ABC, abstractmethod +from typing import Union, List, NoReturn, Tuple + +import sympy +import numpy as np +from sympy import Symbol +import matplotlib.pyplot as plt +from matplotlib.axes import Axes +from matplotlib.figure import Figure + + +# https://docs.sympy.org/latest/modules/integrals/integrals.html +# https://docs.sympy.org/latest/modules/sets.html +# https://numpydoc.readthedocs.io/en/latest/example.html + + +class BaseDiscreteFuzzySet(ABC): + """ + A parent class for all fuzzy sets to inherit. Allows the user to visualize the fuzzy set. + """ + + def __init__(self, formulas: list, name: str): + self.formulas = formulas + self.name = name + + @abstractmethod + def degree(self, element) -> NoReturn: + """ + Calculates degree of membership for the provided "element" where element is a(n) int/float. + + Args: + element: The element is from the universe of discourse. + + Returns: + NotImplementedError as this is an abstract method. + """ + raise NotImplementedError( + "Attempted to call an abstract method from BaseDiscreteFuzzySet." + ) + + def fetch(self, element: Union[int, float]): + """ + Fetch the corresponding formula for the provided element where element is a(n) int/float. + + Parameters + ---------- + element : 'float' + The element is from the universe of discourse X. + + Returns + ------- + formula : 'tuple'/'None' + Returns the tuple containing the formula and corresponding Interval. + Returns None if a formula for the element could not be found. + """ + for formula in self.formulas: + if formula[1].contains( + element + ): # check the formula's interval to see if it contains element + return formula + return None + + def plot( + self, lower: float = 0, upper: float = 100, samples: int = 100 + ) -> (Figure, Axes): + """ + Graphs the fuzzy set in the universe of elements. + + Parameters + ---------- + lower : 'float', optional + Default value is 0. Specifies the infimum value for the graph. + upper : 'float', optional + Default value is 100. Specifies the supremum value for the graph. + samples : 'int', optional + Default value is 100. Specifies the number of values to test in the domain + to approximate the graph. A higher sample value will yield a higher resolution + of the graph, but large values will lead to performance issues. + """ + fig, axes = plt.subplots() + x_list = np.linspace(lower, upper, samples) + y_list = [] + for x_value in x_list: + y_list.append(self.degree(x_value)) + if self.name is not None: + plt.title(f"{self.name} Fuzzy Set") + else: + plt.title("Unnamed Fuzzy Set") + + axes.set_xlim([lower, upper]) + axes.set_ylim([0, 1.1]) + axes.set_xlabel("Elements of Universe") + axes.set_ylabel("Degree of Membership") + axes.plot(x_list, y_list, color="grey", label="mu") + axes.legend() + return fig, axes + + +class DiscreteFuzzySet(BaseDiscreteFuzzySet): + """ + An ordinary fuzzy set that is of type 1 and level 1. + """ + + def __init__(self, formulas: List[tuple], name: str = None): + """ + Parameters + ---------- + formulas : 'list' + A list of 2-tuples. The first element in the tuple at index 0 is the formula + equal to f(x) and the second element in the tuple at index 1 is the Interval + where the formula in the tuple is valid. + + Warning: Formulas should be organized in the list such that the formulas and + their corresponding intervals are specified from the smallest possible x values + to the largest possible x values. + + The list of formulas provided constitutes the piece-wise function of the + fuzzy set's membership function. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy sets in the same space. + """ + BaseDiscreteFuzzySet.__init__(self, formulas, name) + + def degree(self, element: Union[int, float]): + """ + Calculates degree of membership for the provided element where element is a(n) int/float. + + Parameters + ---------- + element : 'float' + The element is from the universe of discourse X. + + Returns + ------- + mu : 'float' + The degree of membership for the element. + """ + formula = self.fetch(element)[0] + try: + membership = float(formula.subs(Symbol("x"), element)) + except AttributeError: + membership = formula + return membership + + def height(self) -> float: + """ + Calculates the height of the fuzzy set. + + Returns: + The height, or supremum, of the fuzzy set. + """ + heights = [] + for formula in self.formulas: + if isinstance(formula[0], sympy.Expr): + inf_x = formula[1].inf + sup_x = formula[1].sup + if formula[1].left_open: + inf_x += 1e-8 + if formula[1].right_open: + sup_x -= 1e-8 + inf_y = formula[0].subs(Symbol("x"), inf_x) + sup_y = formula[0].subs(Symbol("x"), sup_x) + heights.append(inf_y) + heights.append(sup_y) + else: + heights.append(formula[0]) + return max(heights) + + +class FuzzyVariable(BaseDiscreteFuzzySet): + """ + A fuzzy variable, or linguistic variable, that contains fuzzy sets. + """ + + def __init__(self, fuzzy_sets: List[BaseDiscreteFuzzySet], name=None): + """ + Parameters + ---------- + fuzzy_sets : 'list' + A list of elements each of type OrdinaryDiscreteFuzzySet. + name : 'str'/'None' + Default value is None. Allows the user to specify the name of the fuzzy set. + This feature is useful when visualizing the fuzzy set, and its interaction with + other fuzzy sets in the same space. + """ + BaseDiscreteFuzzySet.__init__(self, fuzzy_sets, name) + self.fuzzy_sets = fuzzy_sets + + def degree(self, element: Union[int, float]) -> Tuple[float]: + """ + Calculates the degree of membership for the provided element value + where element is a(n) int/float. + + Args: + element: The element from the universe of discourse. + + Returns: + The degree of membership for the element. + """ + degrees = [] + for fuzzy_set in self.formulas: + degrees.append(float(fuzzy_set.degree(element))) + return tuple(degrees) + + def plot( + self, lower: float = 0, upper: float = 100, samples: int = 100 + ) -> (Figure, Axes): + """ + Graphs the fuzzy set in the universe of elements. + + Args: + lower: Default value is 0. Specifies the infimum value for the graph. + upper: Default value is 100. Specifies the supremum value for the graph. + samples: Default value is 100. Specifies the number of values to test in the domain + to approximate the graph. A higher sample value will yield a higher resolution + of the graph, but large values will lead to performance issues. + + Returns: + None + """ + fig, axes = plt.subplots() + for fuzzy_set in self.fuzzy_sets: + x_list = np.linspace(lower, upper, samples) + y_list = [] + for x_value in x_list: + y_list.append(fuzzy_set.degree(x_value)) + axes.plot( + x_list, + y_list, + color=np.random.rand( + 3, + ), + label=fuzzy_set.name, + ) + + if self.name is not None: + axes.set_title(f"{self.name} Fuzzy Variable") + else: + axes.set_title("Unnamed Fuzzy Variable") + + axes.set_xlim([lower, upper]) + axes.set_ylim([0, 1.1]) + axes.set_xlabel("Elements of Universe") + axes.set_ylabel("Degree of Membership") + axes.legend() + return fig, axes diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_relations/__init__.py b/tests/test_relations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_relations/test_aggregation.py b/tests/test_relations/test_aggregation.py new file mode 100644 index 0000000..c76a5e2 --- /dev/null +++ b/tests/test_relations/test_aggregation.py @@ -0,0 +1,226 @@ +""" +Test the aggregation operator called the ordered weighted averaging operator. +""" + +import unittest + +import torch + +from fuzzy.relations.continuous.aggregation import OrderedWeightedAveraging as OWA + + +class TestOrderedWeightedAggregation(unittest.TestCase): + """ + Unit tests that check the ordered weighted averaging operator is working as intended. + """ + + def test_in_features_not_equal_to_weight_vector(self) -> None: + """ + Test that when trying to create a OrderedWeightedAveraging object with a weights vector + that does not equal the number of input features specified, that an AttributeError is + thrown. + + Returns: + None + """ + weights = torch.tensor([0.2, 0.3, 0.1]) + assert torch.isclose(weights.sum(), torch.tensor(0.6)) + try: + in_features = 4 + OWA(in_features, weights) + assert False + except AttributeError: + # an AttributeError exception should be thrown when the weights vector != in_features + assert True + + def test_weight_vector_sums_to_one(self) -> None: + """ + An OWA operator module should be created when given a weights vector that sums to 1. + + Returns: + None + """ + weights = torch.tensor([0.2, 0.3, 0.1, 0.4]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + + def test_weight_vector_not_sum_to_one(self) -> None: + """ + Attempting to create a OWA with a weight vector that does + not sum to 1 should throw an AttributeError exception. + + Returns: + None + """ + weights = torch.tensor([0.2, 0.3, 0.1, 0.3]) + assert torch.isclose(weights.sum(), torch.tensor(0.9)) + try: + in_features = len(weights) + OWA(in_features, weights) + assert False + except AttributeError: + # an AttributeError exception should be thrown when the weights do not sum to 1 + assert True + + def test_owa_calculation_1(self) -> None: + """ + A OWA operator should sort the argument vector to produce an 'ordered argument vector', + then calculate the dot product between the weights vector and ordered argument vector. + + This test replicates an example from the original paper. + + Returns: + None + """ + weights = torch.tensor([0.2, 0.3, 0.1, 0.4]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + argument_vector = torch.tensor([0.6, 1.0, 0.3, 0.5]) + assert torch.isclose(owa(argument_vector), torch.tensor(0.55)) + + def test_owa_calculation_2(self) -> None: + """ + A OWA operator should sort the argument vector to produce an 'ordered argument vector', + then calculate the dot product between the weights vector and ordered argument vector. + + This test replicates an example from the original paper. + + Returns: + None + """ + weights = torch.tensor([0.2, 0.3, 0.1, 0.4]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + argument_vector = torch.tensor([0, 0.7, 1.0, 0.2]) + assert torch.isclose(owa(argument_vector), torch.tensor(0.43)) + + def test_orness_1(self) -> None: + """ + The degree to which the Ordered Weighted Averaging (OWA) operator is the 'or' operator. + + A degree of 1 means the OWA operator is the 'or' operator, and this occurs when the first + element of the weights vector is equal to 1 and all other elements in the weights + vector are zero. + + This test replicates an example from the original paper. + + Returns: + None + """ + weights = torch.tensor([1.0, 0.0, 0.0, 0.0]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + assert owa.orness() == 1.0 + + def test_orness_2(self) -> None: + """ + The degree to which the Ordered Weighted Averaging (OWA) operator is the 'or' operator. + + A degree of 0 means the OWA operator is the 'and' operator, and this occurs when the last + element of the weights vector is equal to 1 and all other elements in the weights + vector are zero. + + This test replicates an example from the original paper. + + Returns: + None + """ + weights = torch.tensor([0.0, 0.0, 0.0, 1.0]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + assert owa.orness() == 0.0 + + def test_orness_3(self) -> None: + """ + The degree to which the Ordered Weighted Averaging (OWA) operator is the 'or' operator. + + A degree of 1 means the OWA operator is the 'or' operator, and this occurs when the first + element of the weights vector is equal to 1 and all other elements in the weights + vector are zero. + + This test follows an example from the original paper. + + Returns: + None + """ + number_of_elements = 4 + weights = torch.tensor([1 / number_of_elements] * number_of_elements) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + # will be 0.5 for any number of 'number_of_elements' if the weights vector + # consists of 1/number_of_elements values + assert owa.orness() == 0.5 + + def test_orness_4(self) -> None: + """ + The 'orness' measure can be misleading when Ordered Weighted Averaging operators are + defined with certain weights vectors as illustrated here. Both weights vectors should + have a 'orness' of 0.5, despite being clearly different. The measure of dispersion is + introduced to address this. + + This test replicates an example from the original paper. + + Returns: + None + """ + # considered to be more volatile and uses less input + weights = torch.tensor([0.0, 0.0, 1.0, 0.0, 0.0]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa1 = OWA(in_features, weights) + assert torch.isclose(owa1.weights, weights).all() + number_of_elements = 5 + weights = torch.tensor([1 / number_of_elements] * number_of_elements) + assert weights.sum() == 1.0 + in_features = len(weights) + owa2 = OWA(in_features, weights) + assert torch.isclose(owa2.weights, weights).all() + # will be 0.5 for any number of 'number_of_elements' if the weights vector + # consists of 1/number_of_elements values + assert owa1.orness() == 0.5 + assert owa2.orness() == 0.5 + assert owa1.orness() == owa2.orness() + + def test_dispersion_1(self) -> None: + """ + This scenario represents the minimum dispersion. + + Returns: + None + """ + # considered to be more volatile and uses less input + weights = torch.tensor([0.0, 0.0, 1.0, 0.0, 0.0]) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + assert owa.dispersion() == 0.0 + + def test_dispersion_2(self) -> None: + """ + This scenario represents the maximum dispersion and occurs when the entries in the + weights vector are 1/number_of_elements where number_of_elements is the number of + elements in the weights vector. + + Returns: + None + """ + number_of_elements = 5 + weights = torch.tensor([1 / number_of_elements] * number_of_elements) + assert weights.sum() == 1.0 + in_features = len(weights) + owa = OWA(in_features, weights) + assert torch.isclose(owa.weights, weights).all() + assert owa.dispersion() == torch.log(torch.tensor(number_of_elements)) diff --git a/tests/test_relations/test_extension.py b/tests/test_relations/test_extension.py new file mode 100644 index 0000000..56863cf --- /dev/null +++ b/tests/test_relations/test_extension.py @@ -0,0 +1,73 @@ +""" +Test extensions of the BaseDiscreteFuzzySet, such as AlphaCut or SpecialFuzzySet. +""" + +import unittest + +from fuzzy.relations.discrete.snorm import StandardUnion +from fuzzy.relations.discrete.tnorm import StandardIntersection +from fuzzy.relations.discrete.extension import AlphaCut, SpecialFuzzySet +from examples.discrete.student import known, learned + + +class TestAlphaCut(unittest.TestCase): + """ + Test the alpha cut operation works as intended. + """ + + def test_alpha_cut(self) -> None: + """ + Test the AlphaCut operation (class) works as intended. + + Returns: + None + """ + alpha_cut = AlphaCut(known(), 0.6, "AlphaCut") + assert alpha_cut.degree(80) == alpha_cut.alpha + + +class TestSpecialFuzzySet(unittest.TestCase): + """ + Test that we can create a special fuzzy set. + """ + + def test_special_fuzzy_set(self) -> None: + """ + Test the SpecialFuzzySet class works as intended. + + Returns: + None + """ + special_fuzzy_set = SpecialFuzzySet(known(), 0.5, "Special") + assert special_fuzzy_set.height() == special_fuzzy_set.alpha + assert special_fuzzy_set.degree(80) == special_fuzzy_set.alpha + + +class TestDiscreteFuzzyRelation(unittest.TestCase): + """ + Test the discrete fuzzy relations. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.terms = [known(), learned()] + + def test_standard_intersection(self) -> None: + """ + Test the standard intersection of two fuzzy sets. + + Returns: + None + """ + standard_intersection = StandardIntersection(fuzzy_sets=self.terms) + assert standard_intersection.degree(87) == 0.4 + + def test_standard_union(self) -> None: + """ + Test the standard union of two fuzzy sets. + + Returns: + None + """ + standard_union = StandardUnion(fuzzy_sets=self.terms) + assert standard_union.degree(87) == 0.6 diff --git a/tests/test_relations/test_tnorms.py b/tests/test_relations/test_tnorms.py new file mode 100644 index 0000000..d762b0e --- /dev/null +++ b/tests/test_relations/test_tnorms.py @@ -0,0 +1,94 @@ +""" +Test various t-norm operations, such as the algebraic product. +""" + +import unittest + +import torch +import numpy as np + +from fuzzy.relations.continuous.tnorm import AlgebraicProduct + + +def algebraic_product(elements: np.ndarray, importance: np.ndarray) -> np.float32: + """ + Numpy calculation of the algebraic product. + + Args: + elements: The elements to be multiplied. + importance: The importance of each element. + + Returns: + The algebraic product of the given elements. + """ + return np.prod(elements * importance) + + +class TestAlgebraicProduct(unittest.TestCase): + """ + Test the algebraic product operation. + """ + + def test_single_input(self) -> None: + """ + The t-norm of a single input (w/o importance) should be == input. + """ + element = torch.rand(1) + n_inputs = 1 + tnorm = AlgebraicProduct(n_inputs) + importance_before_calculation = tnorm.importance + mu_pytorch = tnorm(element) + mu_numpy = algebraic_product( + element.cpu().detach().numpy(), + importance_before_calculation.cpu().detach().numpy(), + ) + + # make sure the parameters are still identical afterward + assert torch.isclose(tnorm.importance, importance_before_calculation).all() + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all() + + def test_multi_input(self) -> None: + """ + Test that the algebraic product is correctly calculated when multiple inputs are given. + + Returns: + None + """ + elements = torch.rand(4) + n_inputs = len(elements) + tnorm = AlgebraicProduct(n_inputs) + importance_before_calculation = tnorm.importance + mu_pytorch = tnorm(elements) + mu_numpy = algebraic_product( + elements.cpu().detach().numpy(), + importance_before_calculation.cpu().detach().numpy(), + ) + + # make sure the parameters are still identical afterward + assert torch.isclose(tnorm.importance, importance_before_calculation).all() + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all() + + def test_multi_input_with_importance_given(self) -> None: + """ + Test that the algebraic product is correctly calculated when multiple inputs (and their + varying degrees of importance) are given. + + Returns: + None + """ + elements = torch.rand(5) + n_inputs = len(elements) + importance_before_calculation = torch.tensor([0.0, 0.25, 0.5, 0.75, 1.0]) + tnorm = AlgebraicProduct(n_inputs, importance=importance_before_calculation) + mu_pytorch = tnorm(elements) + mu_numpy = algebraic_product( + elements.cpu().detach().numpy(), + importance_before_calculation.cpu().detach().numpy(), + ) + + # make sure the parameters are still identical afterward + assert torch.isclose(tnorm.importance, importance_before_calculation).all() + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all() diff --git a/tests/test_sets/__init__.py b/tests/test_sets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sets/continuous/__init__.py b/tests/test_sets/continuous/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sets/continuous/impl/__init__.py b/tests/test_sets/continuous/impl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sets/continuous/impl/common.py b/tests/test_sets/continuous/impl/common.py new file mode 100644 index 0000000..682d30e --- /dev/null +++ b/tests/test_sets/continuous/impl/common.py @@ -0,0 +1,21 @@ +""" +Common functions for the unit testing of the continuous fuzzy sets. +""" + +import torch + + +def get_test_elements(device: torch.device) -> torch.Tensor: + """ + Get test elements for the unit testing of the continuous fuzzy sets. + + Args: + device: The device to use. + + Returns: + The test elements. + """ + return torch.tensor( + [[0.41737163], [0.78705574], [0.40919196], [0.72005216]], + device=device, + ) diff --git a/tests/test_sets/continuous/impl/test_gaussian.py b/tests/test_sets/continuous/impl/test_gaussian.py new file mode 100644 index 0000000..97956ae --- /dev/null +++ b/tests/test_sets/continuous/impl/test_gaussian.py @@ -0,0 +1,389 @@ +""" +Test the Gaussian fuzzy set (i.e., membership function). +""" + +import unittest + +import torch +import numpy as np + +from fuzzy.sets.continuous.impl import Gaussian +from tests.test_sets.continuous.impl.common import get_test_elements + + +AVAILABLE_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +def gaussian_numpy(element: np.ndarray, center: np.ndarray, sigma: np.ndarray): + """ + Gaussian membership function that receives an 'element' value, and uses + the 'center' and 'sigma' to determine a degree of membership for 'element'. + Implemented in Numpy and used in testing. + + Args: + element: The element which we want to retrieve its membership degree. + center: The center of the Gaussian fuzzy set. + sigma: The width of the Gaussian fuzzy set. + + Returns: + The membership degree of 'element'. + """ + return np.exp(-1.0 * (np.power(element - center, 2) / (1.0 * np.power(sigma, 2)))) + + +class TestGaussian(unittest.TestCase): + """ + Test the Gaussian fuzzy set (i.e., membership function). + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.elements = get_test_elements(device=AVAILABLE_DEVICE) + + def test_single_input(self) -> None: + """ + Test that single input works for the Gaussian membership function. + + Returns: + None + """ + element = torch.zeros(1, device=AVAILABLE_DEVICE) + gaussian_mf = Gaussian( + centers=np.array([1.5409961]), + widths=np.array([0.30742282]), + device=AVAILABLE_DEVICE, + ) + sigma = gaussian_mf.get_widths().cpu().detach().numpy() + center = gaussian_mf.get_centers().cpu().detach().numpy() + mu_pytorch = gaussian_mf(element).degrees.to_dense() + mu_numpy = gaussian_numpy(element.cpu().detach().numpy(), center, sigma) + + # make sure the Gaussian parameters are still identical afterward + assert torch.allclose( + gaussian_mf.get_widths(), torch.tensor(sigma, device=AVAILABLE_DEVICE) + ) + assert torch.allclose( + gaussian_mf.get_centers(), + torch.tensor(center, device=AVAILABLE_DEVICE), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-6) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted(element).degrees.to_dense(), + mu_pytorch, + ) + + def test_multi_input(self) -> None: + """ + Test that multiple input works for the Gaussian membership function. + + Returns: + None + """ + gaussian_mf = Gaussian( + centers=np.array([1.5410]), + widths=np.array([0.3074]), + device=AVAILABLE_DEVICE, + ) + centers, sigmas = ( + gaussian_mf.get_centers().cpu().detach().numpy(), + gaussian_mf.get_widths().cpu().detach().numpy(), + ) + mu_pytorch = gaussian_mf(self.elements).degrees.to_dense() + mu_numpy = gaussian_numpy(self.elements.cpu().detach().numpy(), centers, sigmas) + + # make sure the Gaussian parameters are still identical afterward + assert torch.allclose( + gaussian_mf.get_widths(), + torch.tensor(sigmas, device=AVAILABLE_DEVICE).float(), + ) + assert torch.allclose( + gaussian_mf.get_centers(), + torch.tensor(centers, device=AVAILABLE_DEVICE).float(), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + # note that the PyTorch version has an extra dimension (4, 1, 1) compared to Numpy's (4, 1) + assert np.allclose( + mu_pytorch.cpu().detach().numpy().flatten(), mu_numpy.flatten() + ) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_centers_given(self) -> None: + """ + Test that multiple input works for the Gaussian membership function when centers are + specified for the fuzzy sets. + + Returns: + None + """ + centers = np.array([0.0, 0.25, 0.5, 0.75, 1.0]) + sigmas = np.array([0.4962566, 0.7682218, 0.08847743, 0.13203049, 0.30742282]) + gaussian_mf = Gaussian( + centers=centers, + widths=sigmas, + device=AVAILABLE_DEVICE, + ) + mu_pytorch = gaussian_mf(self.elements).degrees.to_dense() + mu_numpy = gaussian_numpy(self.elements.cpu().detach().numpy(), centers, sigmas) + + # make sure the Gaussian parameters are still identical afterward + assert torch.allclose( + gaussian_mf.get_widths(), + torch.tensor(sigmas, device=AVAILABLE_DEVICE).float(), + ) + assert torch.allclose( + gaussian_mf.get_centers(), + torch.tensor(centers, device=AVAILABLE_DEVICE).float(), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, rtol=1e-4 + ) + + expected_areas = torch.tensor( + [0.7412324, 1.1474512, 0.13215375, 0.1972067, 0.45918167], + device=AVAILABLE_DEVICE, + ) + assert torch.allclose(gaussian_mf.area(), expected_areas) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_sigmas_given(self) -> None: + """ + Test that multiple input works for the Gaussian membership function when sigmas are + specified for the fuzzy sets. + + Returns: + None + """ + sigmas = np.array( + [0.1, 0.25, 0.5, 0.75, 1.0] + ) # negative widths are missing sets + gaussian_mf = Gaussian( + centers=np.array([1.5410]), + widths=sigmas, + device=AVAILABLE_DEVICE, + ) + mu_pytorch = gaussian_mf(self.elements).degrees.to_dense() + mu_numpy = gaussian_numpy( + self.elements.cpu().detach().numpy(), + gaussian_mf.get_centers().cpu().detach().numpy(), + sigmas, + ) + + # make sure the Gaussian parameters are still identical afterward + assert np.allclose(gaussian_mf.get_widths().cpu().detach().numpy(), sigmas) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, rtol=1e-6 + ) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_both_given(self) -> None: + """ + Test that multiple input works for the Gaussian membership function when centers and + sigmas are specified for the fuzzy sets. + + Returns: + None + """ + centers = np.array([-0.5, -0.25, 0.25, 0.5, 0.75]) + sigmas = np.array( + [0.1, 0.25, 0.5, 0.75, 1.0] + ) # negative widths are missing sets + gaussian_mf = Gaussian( + centers=centers, + widths=sigmas, + device=AVAILABLE_DEVICE, + ) + mu_pytorch = gaussian_mf(self.elements).degrees.to_dense() + mu_numpy = gaussian_numpy(self.elements.cpu().detach().numpy(), centers, sigmas) + + # make sure the Gaussian parameters are still identical afterward + assert np.allclose(gaussian_mf.get_centers().cpu().detach().numpy(), centers) + assert np.allclose(gaussian_mf.get_widths().cpu().detach().numpy(), sigmas) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, rtol=1e-6 + ) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_centers(self) -> None: + """ + Test that multidimensional centers work with the Gaussian membership function. + + Returns: + None + """ + elements = torch.tensor( + [ + [ + [0.6960, 0.8233, 0.8147], + [0.1024, 0.3122, 0.5160], + [0.8981, 0.6810, 0.2366], + ], + [ + [0.2447, 0.4218, 0.6146], + [0.8887, 0.6273, 0.6697], + [0.1439, 0.9383, 0.8101], + ], + ], + device=AVAILABLE_DEVICE, + ) + centers = np.array( + [ + [ + [0.1236, 0.4893, 0.8372], + [0.8275, 0.2979, 0.7192], + [0.2328, 0.1418, 0.1036], + ], + [ + [0.9651, 0.7622, 0.1544], + [0.1274, 0.5798, 0.6425], + [0.1518, 0.6554, 0.3799], + ], + ] + ) + sigmas = np.array([[[0.1, 0.25, 0.5]]]) # negative widths are missing sets + gaussian_mf = Gaussian( + centers=centers, + widths=sigmas, + device=AVAILABLE_DEVICE, + ) + mu_pytorch = gaussian_mf(elements.unsqueeze(dim=0)).degrees.to_dense() + mu_numpy = gaussian_numpy(elements.cpu().detach().numpy(), centers, sigmas) + + # make sure the Gaussian parameters are still identical afterward + assert np.allclose(gaussian_mf.get_centers().cpu().detach().numpy(), centers) + assert np.allclose(gaussian_mf.get_widths().cpu().detach().numpy(), sigmas) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=0).cpu().detach().numpy(), mu_numpy, rtol=1e-6 + ) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted( + elements.unsqueeze(dim=0), # add batch dimension (size is 1) + ).degrees.to_dense(), + mu_pytorch, + ) + + def test_consistency(self) -> None: + """ + Test that the results are consistent with the expected membership degrees. + + Returns: + None + """ + element = np.array( + [[0.0001712], [0.00393354], [-0.03641258], [-0.01936134]], dtype=np.float32 + ) + centers = np.array( + [ + [0.01497397, -1.3607662, 1.0883657, 1.9339248], + [-0.01367673, 2.3560243, -1.8339163, -3.3379893], + [-4.489564, -0.01467094, -0.13278057, 0.08638719], + [0.17008819, 0.01596639, -1.7408595, 2.797653], + ] + ) + sigmas = np.array( + [ + [1.16553577, 1.48497267, 0.91602303, 0.91602303], + [1.98733806, 2.53987592, 1.58646032, 1.24709336], + [1.24709336, 0.10437003, 0.12908118, 0.08517358], + [0.08517358, 1.54283158, 1.89779089, 1.27380911], + ] + ) + target_membership_degrees = torch.tensor( + gaussian_numpy( + element[:, :, None], + center=centers, + sigma=sigmas, + ), + device=AVAILABLE_DEVICE, + dtype=torch.float32, + ) + + gaussian_mf = Gaussian( + centers=centers, + widths=sigmas, + device=AVAILABLE_DEVICE, + ) + mu_pytorch = gaussian_mf( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense() + + # make sure the Gaussian parameters are still identical afterward + assert np.allclose( + gaussian_mf.get_centers().cpu().detach().numpy(), + centers, + ) + assert np.allclose(gaussian_mf.get_widths().cpu().detach().numpy(), sigmas) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert torch.allclose(mu_pytorch, target_membership_degrees, rtol=1e-1) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + mu_pytorch, + ) + + def test_create_random(self) -> None: + """ + Test that a random fuzzy set of this type can be created and that the results are consistent + with the expected membership degrees. + + Returns: + None + """ + gaussian_mf = Gaussian.create( + number_of_variables=4, number_of_terms=4, device=AVAILABLE_DEVICE + ) + element = np.array([[0.0001712, 0.00393354, -0.03641258, -0.01936134]]) + target_membership_degrees = gaussian_numpy( + element.reshape(4, 1), # column vector + gaussian_mf.get_centers().cpu().detach().numpy(), + gaussian_mf.get_widths().cpu().detach().numpy(), + ) + mu_pytorch = gaussian_mf( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense() + assert np.allclose( + mu_pytorch.cpu().detach().numpy(), target_membership_degrees, atol=1e-1 + ) + + # test that this is compatible with torch.jit.script + gaussian_mf_scripted = torch.jit.script(gaussian_mf) + assert torch.allclose( + gaussian_mf_scripted( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + mu_pytorch, + ) diff --git a/tests/test_sets/continuous/impl/test_logistic.py b/tests/test_sets/continuous/impl/test_logistic.py new file mode 100644 index 0000000..219b85f --- /dev/null +++ b/tests/test_sets/continuous/impl/test_logistic.py @@ -0,0 +1,61 @@ +""" +Test the LogisticCurve class. +""" + +import unittest + +import torch + +from fuzzy.sets.continuous.impl import LogisticCurve + + +AVAILABLE_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +class TestLogistic(unittest.TestCase): + """ + Test the LogisticCurve class. + """ + + def test_logistic_curve(self) -> None: + """ + Test the calculation of the logistic curve. + + Returns: + None + """ + elements = torch.tensor( + [ + [-1.1258, -1.1524, -0.2506], + [-0.4339, 0.8487, 0.6920], + [-0.3160, -2.1152, 0.4681], + [-0.1577, 1.4437, 0.2660], + [0.1665, 0.8744, -0.1435], + [-0.1116, 0.9318, 1.2590], + [2.0050, 0.0537, 0.6181], + [-0.4128, -0.8411, -2.3160], + ], + device=AVAILABLE_DEVICE, + ) + logistic_curve = LogisticCurve(midpoint=0.5, growth=10, supremum=1) + + self.assertTrue( + torch.allclose( + logistic_curve(elements), + torch.tensor( + [ + [8.6944e-08, 6.6637e-08, 5.4947e-04], + [8.7920e-05, 9.7032e-01, 8.7214e-01], + [2.8578e-04, 4.3886e-12, 4.2092e-01], + [1.3901e-03, 9.9992e-01, 8.7864e-02], + [3.4390e-02, 9.7689e-01, 1.6018e-03], + [2.2024e-03, 9.8685e-01, 9.9949e-01], + [1.0000e00, 1.1396e-02, 7.6513e-01], + [1.0857e-04, 1.4986e-06, 5.8921e-13], + ], + device=AVAILABLE_DEVICE, + ), + atol=1e-4, + rtol=1e-4, + ) + ) diff --git a/tests/test_sets/continuous/impl/test_triangular.py b/tests/test_sets/continuous/impl/test_triangular.py new file mode 100644 index 0000000..cf0ef0b --- /dev/null +++ b/tests/test_sets/continuous/impl/test_triangular.py @@ -0,0 +1,273 @@ +""" +Test the Triangular fuzzy set (i.e., membership function). +""" + +import unittest + +import torch +import numpy as np + +from fuzzy.sets.continuous.impl import Triangular +from tests.test_sets.continuous.impl.common import get_test_elements + + +AVAILABLE_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +def triangular_numpy(element: np.ndarray, center: np.ndarray, width: np.ndarray): + """ + Triangular membership function that receives an 'element' value, and uses + the 'center' and 'width' to determine a degree of membership for 'element'. + Implemented in Numpy and used in testing. + + https://www.mathworks.com/help/fuzzy/trimf.html + + Args: + element: The element which we want to retrieve its membership degree. + center: The center of the Triangular fuzzy set. + width: The width of the Triangular fuzzy set. + + Returns: + The membership degree of 'element'. + """ + values = 1.0 - (1.0 / width) * np.abs(element - center) + values[(values < 0)] = 0 + return values + + +class TestTriangular(unittest.TestCase): + """ + Test the Triangular fuzzy set (i.e., membership function). + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.elements = get_test_elements(device=AVAILABLE_DEVICE) + + def test_single_input(self) -> None: + """ + Test that single input works for the Triangular membership function. + + Returns: + None + """ + element = np.array([0.0], dtype=np.float32) + triangular_mf = Triangular( + centers=np.array([1.5409961]), + widths=np.array([0.30742282]), + device=AVAILABLE_DEVICE, + ) + center = triangular_mf.get_centers().cpu().detach().numpy() + width = triangular_mf.get_widths().cpu().detach().numpy() + mu_pytorch = triangular_mf( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense() + mu_numpy = triangular_numpy(element, center, width) + + # make sure the Triangular parameters are still identical afterward + assert torch.allclose( + triangular_mf.get_centers(), + torch.tensor(center, device=AVAILABLE_DEVICE), + ) + assert torch.allclose( + triangular_mf.get_widths(), + torch.tensor(width, device=AVAILABLE_DEVICE), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, atol=1e-2) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted( + torch.tensor(element, device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + mu_pytorch, + ) + + def test_multi_input(self) -> None: + """ + Test that multiple input works for the Triangular membership function. + + Returns: + None + """ + triangular_mf = Triangular( + centers=np.array([1.5410]), + widths=np.array([0.3074]), + device=AVAILABLE_DEVICE, + ) + centers, widths = ( + triangular_mf.get_centers().cpu().detach().numpy(), + triangular_mf.get_widths().cpu().detach().numpy(), + ) + mu_pytorch = triangular_mf(self.elements).degrees.to_dense() + mu_numpy = triangular_numpy( + self.elements.cpu().detach().numpy(), centers, widths + ) + + # make sure the Triangular parameters are still identical afterward + assert torch.allclose( + triangular_mf.get_centers(), + torch.tensor(centers, device=AVAILABLE_DEVICE), + ) + assert torch.allclose( + triangular_mf.get_widths(), + torch.tensor(widths, device=AVAILABLE_DEVICE), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, atol=1e-2 + ) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_centers_given(self) -> None: + """ + Test that multiple input works for the Triangular membership function when centers are + specified for the fuzzy sets. + + Returns: + None + """ + centers = np.array([0.0, 0.25, 0.5, 0.75, 1.0], dtype=np.float32) + triangular_mf = Triangular( + centers=centers, widths=np.array([0.4962566]), device=AVAILABLE_DEVICE + ) + widths = triangular_mf.get_widths().cpu().detach().numpy() + mu_pytorch = triangular_mf(self.elements).degrees.to_dense() + mu_numpy = triangular_numpy( + self.elements.cpu().detach().numpy(), centers, widths + ) + + # make sure the Triangular parameters are still identical afterward + assert torch.allclose( + triangular_mf.get_centers(), + torch.tensor(centers, device=AVAILABLE_DEVICE), + ) + assert torch.allclose( + triangular_mf.get_widths(), + torch.tensor(widths, device=AVAILABLE_DEVICE), + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, atol=1e-2 + ) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_widths_given(self) -> None: + """ + Test that multiple input works for the Triangular membership function when widths are + specified for the fuzzy sets. + + Returns: + None + """ + widths = np.array( + [0.1, 0.25, 0.5, 0.75, 1.0], dtype=np.float32 + ) # negative widths are missing sets + triangular_mf = Triangular( + centers=np.array([1.5409961]), widths=widths, device=AVAILABLE_DEVICE + ) + centers = triangular_mf.get_centers().cpu().detach().numpy() + mu_pytorch = triangular_mf(self.elements).degrees.to_dense() + mu_numpy = triangular_numpy( + self.elements.cpu().detach().numpy(), centers, widths + ) + + # make sure the Triangular parameters are still identical afterward + assert torch.allclose( + triangular_mf.get_centers(), torch.tensor(centers, device=AVAILABLE_DEVICE) + ) + assert torch.allclose( + triangular_mf.get_widths(), torch.tensor(widths, device=AVAILABLE_DEVICE) + ) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, atol=1e-2 + ) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_multi_input_with_both_given(self) -> None: + """ + Test that multiple input works for the Triangular membership function when centers + and widths are specified for the fuzzy sets. + + Returns: + None + """ + centers = np.array([-0.5, -0.25, 0.25, 0.5, 0.75]) + widths = np.array( + [0.1, 0.25, 0.5, 0.75, 1.0] + ) # negative widths are missing sets + triangular_mf = Triangular( + centers=centers, widths=widths, device=AVAILABLE_DEVICE + ) + mu_pytorch = triangular_mf(self.elements).degrees.to_dense() + mu_numpy = triangular_numpy( + self.elements.cpu().detach().numpy(), centers, widths + ) + + # make sure the Triangular parameters are still identical afterward + assert np.allclose(triangular_mf.get_centers().cpu().detach().numpy(), centers) + assert np.allclose(triangular_mf.get_widths().cpu().detach().numpy(), widths) + # the outputs of the PyTorch and Numpy versions should be approx. equal + assert np.allclose( + mu_pytorch.squeeze(dim=1).cpu().detach().numpy(), mu_numpy, atol=1e-2 + ) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted(self.elements).degrees.to_dense(), mu_pytorch + ) + + def test_create_random(self) -> None: + """ + Test that a random fuzzy set of this type can be created and that the results are consistent + with the expected membership degrees. + + Returns: + None + """ + triangular_mf = Triangular.create( + number_of_variables=4, number_of_terms=4, device=AVAILABLE_DEVICE + ) + element = np.array([[0.0001712, 0.00393354, -0.03641258, -0.01936134]]) + target_membership_degrees = triangular_numpy( + element, + triangular_mf.get_centers().cpu().detach().numpy(), + triangular_mf.get_widths().cpu().detach().numpy(), + ) + mu_pytorch = triangular_mf( + torch.tensor(element[0], device=AVAILABLE_DEVICE) + ).degrees.to_dense() + assert np.allclose( + mu_pytorch.cpu().detach().numpy(), + target_membership_degrees, + rtol=1e-1, + atol=1e-1, + ) + + # test that this is compatible with torch.jit.script + triangular_mf_scripted = torch.jit.script(triangular_mf) + assert torch.allclose( + triangular_mf_scripted( + torch.tensor(element[0], device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + mu_pytorch, + ) diff --git a/tests/test_sets/continuous/test_continuous.py b/tests/test_sets/continuous/test_continuous.py new file mode 100644 index 0000000..3db469e --- /dev/null +++ b/tests/test_sets/continuous/test_continuous.py @@ -0,0 +1,110 @@ +""" +Test that various continuous fuzzy set implementations are working as intended, such as the +Gaussian fuzzy set (i.e., membership function), and the Triangular fuzzy set (i.e., membership +function). +""" + +import os +import unittest +from pathlib import Path +from typing import MutableMapping +from collections import OrderedDict + +import torch + +from fuzzy.sets.continuous.impl import Gaussian +from fuzzy.sets.continuous.abstract import ContinuousFuzzySet + + +AVAILABLE_DEVICE: torch.device = torch.device( + "cuda" if torch.cuda.is_available() else "cpu" +) + + +class TestContinuousFuzzySet(unittest.TestCase): + """ + Test the abstract ContinuousFuzzySet class. + """ + + def test_illegal_attempt_to_create(self) -> None: + """ + Test that an illegal attempt to create a ContinuousFuzzySet raises an error. + + Returns: + None + """ + with self.assertRaises(NotImplementedError): + ContinuousFuzzySet.create( + number_of_variables=4, number_of_terms=2, device="cpu" + ) + + def test_save_and_load(self) -> None: + """ + Test that saving and loading a ContinuousFuzzySet works as intended. + + Returns: + None + """ + for subclass in ContinuousFuzzySet.__subclasses__(): + membership_func = subclass.create( + number_of_variables=4, number_of_terms=4, device=AVAILABLE_DEVICE + ) + state_dict: MutableMapping = membership_func.state_dict() + + # test that the path must be valid + with self.assertRaises(ValueError): + membership_func.save(Path("")) + with self.assertRaises(ValueError): + membership_func.save(Path("test")) + with self.assertRaises(ValueError): + membership_func.save( + Path("test.pth") + ) # this file extension is not supported; see error message to learn why + + # test that saving the state dict works + saved_state_dict: OrderedDict = membership_func.save( + Path("membership_func.pt") + ) + + # check that the saved state dict is the same as the original state dict + for key in state_dict.keys(): + assert key in saved_state_dict and torch.allclose( + state_dict[key], saved_state_dict[key] + ) + # except the saved state dict includes additional information not captured by + # the original state dict, such as the class name and the labels + assert "labels" in saved_state_dict.keys() + assert "class_name" in saved_state_dict.keys() and saved_state_dict[ + "class_name" + ] in (subclass.__name__ for subclass in ContinuousFuzzySet.__subclasses__()) + + loaded_membership_func = ContinuousFuzzySet.load( + Path("membership_func.pt"), device=AVAILABLE_DEVICE + ) + # check that the parameters and members are the same + assert membership_func == loaded_membership_func + assert torch.allclose( + membership_func.get_centers(), loaded_membership_func.get_centers() + ) + assert torch.allclose( + membership_func.get_widths(), loaded_membership_func.get_widths() + ) + if isinstance( + subclass, Gaussian + ): # Gaussian has an additional parameter (alias for widths) + assert torch.allclose( + membership_func.sigmas, loaded_membership_func.sigmas + ) + assert membership_func.labels == loaded_membership_func.labels + # check some functionality that it is still working + assert torch.allclose(membership_func.area(), loaded_membership_func.area()) + assert torch.allclose( + membership_func( + torch.tensor([[0.1, 0.2, 0.3, 0.4]], device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + loaded_membership_func( + torch.tensor([[0.1, 0.2, 0.3, 0.4]], device=AVAILABLE_DEVICE) + ).degrees.to_dense(), + ) + # delete the file + os.remove("membership_func.pt") diff --git a/tests/test_sets/continuous/test_group.py b/tests/test_sets/continuous/test_group.py new file mode 100644 index 0000000..c625233 --- /dev/null +++ b/tests/test_sets/continuous/test_group.py @@ -0,0 +1,73 @@ +""" +Test functionality relating to GroupedFuzzySets. +""" + +import shutil +import unittest +from pathlib import Path + +import torch + +from fuzzy.sets.continuous.impl import Gaussian +from fuzzy.sets.continuous.group import GroupedFuzzySets +from fuzzy.sets.continuous.utils import get_object_attributes + + +AVAILABLE_DEVICE: torch.device = torch.device( + "cuda" if torch.cuda.is_available() else "cpu" +) + + +class TestGroupedFuzzySets(unittest.TestCase): + """ + Test the GroupedFuzzySets class. + """ + + def test_save_grouped_fuzzy_sets(self): + """ + Test saving grouped fuzzy sets. + """ + grouped_fuzzy_sets: GroupedFuzzySets = GroupedFuzzySets( + modules_list=[ + Gaussian.create( + number_of_variables=2, number_of_terms=3, device=AVAILABLE_DEVICE + ), + Gaussian.create( + number_of_variables=2, number_of_terms=3, device=AVAILABLE_DEVICE + ), + ] + ) + + # test compatibility with torch.jit.script + torch.jit.script(grouped_fuzzy_sets) + + # test that GroupedFuzzySets can be saved and loaded + grouped_fuzzy_sets.save(Path("test_grouped_fuzzy_sets")) + loaded_grouped_fuzzy_sets: GroupedFuzzySets = grouped_fuzzy_sets.load( + Path("test_grouped_fuzzy_sets"), device=AVAILABLE_DEVICE + ) + + for idx, module in enumerate(grouped_fuzzy_sets.modules_list): + assert torch.equal( + module.get_centers(), + loaded_grouped_fuzzy_sets.modules_list[idx].get_centers(), + ) + assert torch.equal( + module.get_widths(), + loaded_grouped_fuzzy_sets.modules_list[idx].get_widths(), + ) + + # check the remaining attributes are the same + for attribute in get_object_attributes(grouped_fuzzy_sets): + value = getattr(grouped_fuzzy_sets, attribute) + if isinstance(value, torch.nn.ModuleList): + continue # already checked above + if isinstance(value, torch.Tensor): + assert torch.equal(value, getattr(loaded_grouped_fuzzy_sets, attribute)) + else: # for non-tensors + assert value == getattr(loaded_grouped_fuzzy_sets, attribute) + + # delete the temporary directory using shutil, ignore errors if there are any + # read-only files + + shutil.rmtree(Path("test_grouped_fuzzy_sets"), ignore_errors=True) diff --git a/tests/test_sets/test_discrete.py b/tests/test_sets/test_discrete.py new file mode 100644 index 0000000..4882a73 --- /dev/null +++ b/tests/test_sets/test_discrete.py @@ -0,0 +1,148 @@ +""" +Test the DiscreteFuzzySet and FuzzyVariable classes. +""" + +import unittest + +from fuzzy.sets.discrete import ( + BaseDiscreteFuzzySet, + DiscreteFuzzySet, + FuzzyVariable, +) +from examples.discrete.student import ( + unknown, + known, + unsatisfactory_unknown, + learned, +) + + +class TestDiscreteFuzzySet(unittest.TestCase): + """ + Test the DiscreteFuzzySet (i.e., Linguistic Term) class. + """ + + def test_abstract_methods(self): + """ + Test that the abstract methods correctly throw a NotImplementedError. + + Returns: + None + """ + self.assertRaises( + NotImplementedError, BaseDiscreteFuzzySet.degree, None, element=0 + ) + + def test_empty_discrete_fuzzy_set(self) -> None: + """ + Test that an empty DiscreteFuzzySet object can be created. + + Returns: + None + """ + discrete_fuzzy_set = DiscreteFuzzySet(formulas=[], name="") + assert len(discrete_fuzzy_set.formulas) == 0 + assert len(discrete_fuzzy_set.name) == 0 + assert discrete_fuzzy_set.fetch(element=0.0) is None + + def test_create_discrete_fuzzy_set(self) -> None: + """ + Test that an empty DiscreteFuzzySet object can be created. + + Returns: + None + """ + discrete_fuzzy_set = known() + assert len(discrete_fuzzy_set.formulas) == 5 + assert discrete_fuzzy_set.name == "Known" + + def test_discrete_fuzzy_set_height(self) -> None: + """ + Test that the height of the DiscreteFuzzySet is correct. + + Returns: + None + """ + discrete_fuzzy_set = known() + assert discrete_fuzzy_set.height() == 1 + + def test_discrete_fuzzy_set_plot(self) -> None: + """ + Test that the plot method for the DiscreteFuzzySet works as intended. + + Returns: + None + """ + discrete_fuzzy_set = known() + _, axes = discrete_fuzzy_set.plot( + lower=0, upper=100, samples=100 + ) # ignore figure + assert axes.get_title() == "Known Fuzzy Set" + assert axes.get_xlabel() == "Elements of Universe" + assert axes.get_ylabel() == "Degree of Membership" + assert axes.get_xlim() == (0, 100) + assert axes.get_ylim() == (0, 1.1) + + # now checking that removing the name changes it to an "Unnamed" DiscreteFuzzySet + discrete_fuzzy_set.name = None + _, axes = discrete_fuzzy_set.plot( + lower=0, upper=100, samples=100 + ) # ignore figure + assert axes.get_title() == "Unnamed Fuzzy Set" + + +class TestFuzzyVariable(unittest.TestCase): + """ + Test the FuzzyVariable (i.e., Linguistic Variable) class. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.terms = [unknown(), known(), unsatisfactory_unknown(), learned()] + self.fuzzy_variable = FuzzyVariable( + fuzzy_sets=self.terms, name="Student Knowledge" + ) + + def test_create_fuzzy_variable(self) -> None: + """ + Test that a FuzzyVariable object can be created. + + Returns: + None + """ + assert len(self.fuzzy_variable.fuzzy_sets) == len(self.terms) + assert self.fuzzy_variable.name == "Student Knowledge" + + def test_fuzzy_variable_membership(self) -> None: + """ + Test the degree of membership for the FuzzyVariable is correct. + + Returns: + None + """ + actual_membership = self.fuzzy_variable.degree(element=73) + expected_membership = (0.0, 0.6, 0.4, 0.0) + assert actual_membership == expected_membership + + def test_fuzzy_variable_plot(self) -> None: + """ + Test the plot method for the FuzzyVariable. + + Returns: + None + """ + _, axes = self.fuzzy_variable.plot( + lower=0, upper=100, samples=100 + ) # ignore figure + assert axes.get_title() == "Student Knowledge Fuzzy Variable" + assert axes.get_xlabel() == "Elements of Universe" + assert axes.get_ylabel() == "Degree of Membership" + assert axes.get_xlim() == (0, 100) + assert axes.get_ylim() == (0, 1.1) + + # now checking that removing the name changes it to an "Unnamed" FuzzyVariable + self.fuzzy_variable.name = None + _, axes = self.fuzzy_variable.plot( + lower=0, upper=100, samples=100 + ) # ignore figure + assert axes.get_title() == "Unnamed Fuzzy Variable"