diff --git a/requirements.txt b/requirements.txt
index e862889..f5a6776 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,6 @@
 numpy==1.26.4 # new is 2.0.1 (update when possible - tests fail otherwise)
+igraph==0.11.6
+scipy==1.13.1 # for sparse matrices
 sympy==1.13.2 # for torch
 torch==2.4.0
 torchquad==0.4.0
diff --git a/src/fuzzy/relations/continuous/n_ary.py b/src/fuzzy/relations/continuous/n_ary.py
new file mode 100644
index 0000000..e49e8d0
--- /dev/null
+++ b/src/fuzzy/relations/continuous/n_ary.py
@@ -0,0 +1,249 @@
+"""
+Classes for representing n-ary fuzzy relations, such as t-norms and t-conorms. These relations
+are used to combine multiple membership values into a single value. The n-ary relations (of
+differing types) can then be combined into a compound relation.
+"""
+
+from typing import Union, Tuple, List
+
+import igraph
+import torch
+import numpy as np
+import scipy.sparse as sps
+
+from fuzzy.sets.continuous.membership import Membership
+
+
+class NAryRelation(torch.nn.Module):
+    """
+    This class represents an n-ary fuzzy relation. An n-ary fuzzy relation is a relation that takes
+    n arguments and returns a (float) value. This class is useful for representing fuzzy relations
+    that take multiple arguments, such as a t-norm that takes two or more arguments and returns a
+    truth value.
+    """
+
+    # pylint: disable=too-many-instance-attributes
+
+    def __init__(
+        self,
+        *indices: Union[Tuple[int, int], List[Tuple[int, int]]],
+        device: torch.device,
+        nan_replacement: float = 0.0,
+        **kwargs,
+    ):
+        """
+        Construct an n-ary relation over the given indices (i.e., the relation's matrix) on the provided device.
+
+        Args:
+            indices: The 2-tuple indices to apply the n-ary relation to (e.g., (0, 1), (1, 0)).
+            device: The device to use for the relation.
+            nan_replacement: The value to use when a value is missing in the relation (i.e., nan);
+                this is useful for when input to the relation is not complete. Default is 0.0
+                (penalize), a value of 1.0 would ignore missing values (i.e., do not penalize).
+        """
+        super().__init__(**kwargs)
+        self.matrix = None  # this will be created later (via self._rebuild)
+        self.graph = None  # this will be created later (via self._rebuild)
+        self.device: torch.device = device
+        if nan_replacement not in [0.0, 1.0]:
+            raise ValueError("The nan_replacement must be either 0.0 or 1.0.")
+        self.nan_replacement: float = nan_replacement
+        self.indices: List[List[Tuple[int, int]]] = []
+
+        if not isinstance(indices[0], list):
+            indices = [indices]
+
+        # this scenario is for when we have multiple compound indices that use the same relation
+        # this is useful for computational efficiency (i.e., not having to use a for loop)
+        self._coo_matrix: List[sps._coo.coo_matrix] = []
+        self._original_shape: List[Tuple[int, int]] = []
+        for relation_indices in indices:
+            if len(set(relation_indices)) < len(relation_indices):
+                raise ValueError(
+                    "The indices must be unique for the relation to be well-defined."
+                )
+            coo_matrix = self.convert_indices_to_matrix(relation_indices)
+            self._original_shape.append(coo_matrix.shape)
+            self._coo_matrix.append(coo_matrix)
+        # now convert to a list of matrices
+        max_var = max(t[0] for t in self._original_shape)
+        max_term = max(t[1] for t in self._original_shape)
+        self.indices.extend(indices)
+        self._rebuild(*(max_var, max_term))
+
+    @staticmethod
+    def convert_indices_to_matrix(indices) -> sps._coo.coo_matrix:
+        """
+        Convert the given indices to a COO matrix.
+
+        Args:
+            indices: The indices where a '1' will be placed at each index.
+
+        Returns:
+            The COO matrix with a '1' at each index.
+        """
+        data = np.ones(len(indices))  # a '1' indicates a relation exists
+        row, col = zip(*indices)
+        return sps.coo_matrix((data, (row, col)), dtype=np.int8)
+
+    def create_ndarray(self, max_var: int, max_term: int) -> None:
+        """
+        Make (or update) the numpy matrix from the COO matrices.
+
+        Args:
+            max_var: The maximum number of variables.
+            max_term: The maximum number of terms.
+
+        Returns:
+            None
+        """
+        matrices = []
+        for coo_matrix in self._coo_matrix:
+            # first resize
+            coo_matrix.resize(max_var, max_term)
+            matrices.append(coo_matrix.toarray())
+        # make a new axis and stack along that axis
+        self.matrix: np.ndarray = np.stack(matrices).swapaxes(0, 1).swapaxes(1, 2)
+
+    def create_igraph(self) -> None:
+        """
+        Create the graph representation of the relation(s).
+
+        Returns:
+            None
+        """
+        graphs: List[igraph.Graph] = []
+        for relation in self.indices:
+            # create a directed (mode="in") star graph with the relation as the center (vertex 0)
+            graphs.append(igraph.Graph.Star(n=len(relation) + 1, mode="in", center=0))
+            # relation vertices are the first vertices in the graph
+            relation_vertex: igraph.Vertex = graphs[-1].vs.find(0)  # located at index 0
+            # set item and tags for the relation vertex for easy retrieval; name is for graph union
+            (
+                relation_vertex["name"],
+                relation_vertex["item"],
+                relation_vertex["tags"],
+            ) = (hash(self) + hash(tuple(relation)), self, {"relation"})
+            # anchor vertices are the var-term pairs that are involved in the relation vertex
+            anchor_vertices: List[igraph.Vertex] = relation_vertex.predecessors()
+            # set anchor vertices' item and tags for easy retrieval; name is for graph union
+            for anchor_vertex, index_pair in zip(anchor_vertices, relation):
+                anchor_vertex["name"], anchor_vertex["item"], anchor_vertex["tags"] = (
+                    index_pair,
+                    index_pair,
+                    {"anchor"},
+                )
+        self.graph = igraph.union(graphs, byname=True)
+
+    def _rebuild(self, *shape) -> None:
+        """
+        Rebuild the relation's matrix and graph.
+
+        Args:
+            shape: The new shape of the n-ary fuzzy relation; assuming shape is (max_var, max_term).
+
+        Returns:
+            None
+        """
+        # re-create the self.matrix
+        self.create_ndarray(shape[0], shape[1])
+        # re-create the self.graph
+        self.create_igraph()
+        # update the self.mask to reflect the new shape
+        # this mask is used to zero out the values that are not part of the relation
+        self.mask = torch.tensor(self.matrix, dtype=torch.float32, device=self.device)
+
+    def resize(self, *shape) -> None:
+        """
+        Resize the matrix in-place to the given shape, and then rebuild the relations' members.
+
+        Args:
+            shape: The new shape of the matrix.
+
+        Returns:
+            None
+        """
+        for coo_matrix in self._coo_matrix:
+            coo_matrix.resize(*shape)
+        self._rebuild(*shape)
+
+    def apply_mask(self, membership: Membership) -> torch.Tensor:
+        """
+        Apply the n-ary relation's mask to the given memberships.
+
+        Args:
+            membership: The membership values to apply the n-ary relation's mask to.
+
+        Returns:
+            The masked membership values (zero may or may not be a valid degree of truth).
+        """
+        membership_shape: torch.Size = membership.degrees.shape
+        if self.matrix.shape[:-1] != membership_shape[1:]:
+            # if len(membership_shape) > 2:
+            # this is for the case where masks have been stacked due to compound relations
+            membership_shape = membership_shape[1:]  # get the last two dimensions
+            self.resize(*membership_shape)
+        # select memberships that are not zeroed out (i.e., involved in the relation)
+        after_mask = membership.degrees.unsqueeze(dim=-1) * self.mask.unsqueeze(0)
+        # the complement (1 - mask) puts a 1 where the mask is zero, so uninvolved entries do not affect the product
+        # nan_to_num is used to replace nan values with the nan_replacement value (often not needed)
+        return (
+            (after_mask + (1 - self.mask))
+            .prod(dim=2, keepdim=False)
+            .nan_to_num(self.nan_replacement)
+        )
+
+    def forward(self, membership: Membership) -> Membership:
+        """
+        Apply the n-ary relation to the given memberships.
+
+        Args:
+            membership: The membership values to apply the n-ary relation to.
+
+        Returns:
+            The membership values, according to the n-ary relation (i.e., which truth values
+            to actually consider).
+        """
+        raise NotImplementedError(
+            f"The {self.__class__.__name__} has no defined forward function. Please create a class "
+            f"and inherit from {self.__class__.__name__}, or use a predefined class."
+        )
+
+
+class Compound(torch.nn.Module):
+    """
+    This class represents an n-ary compound relation; it expects one or more
+    instances of NAryRelation.
+    """
+
+    def __init__(self, *relations: NAryRelation, **kwargs):
+        """
+        Initialize the compound relation with the given n-ary relation(s).
+
+        Args:
+            relations: The n-ary relation(s) to compound.
+        """
+        super().__init__(**kwargs)
+        # store the relations as a module list (as they are also modules)
+        self.relations = torch.nn.ModuleList(relations)
+
+    def forward(self, membership: Membership) -> Membership:
+        """
+        Apply the compound n-ary relation to the given membership values.
+
+        Args:
+            membership: The membership values to apply the compound n-ary relation to.
+
+        Returns:
+            The stacked output of the compound n-ary relation; ready for subsequent operations.
+        """
+        # apply each n-ary relation to the membership values
+        memberships: List[Membership] = [
+            relation(membership=membership) for relation in self.relations
+        ]
+        degrees: torch.Tensor = torch.cat(
+            [membership.degrees for membership in memberships], dim=-1
+        ).unsqueeze(dim=-1)
+        # create a new mask that accounts for the different masks for each relation
+        mask = torch.stack([relation.mask for relation in self.relations])
+        return Membership(elements=membership.elements, degrees=degrees, mask=mask)
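The masking trick in `apply_mask` is the heart of this module: terms that are not part of the relation are turned into 1s so they cannot affect a subsequent minimum or product. Below is a minimal sketch of that behavior with a toy tensor; the values and shapes are illustrative only and are not taken from this PR.

import torch

# toy inputs: 1 observation, 2 variables, 2 terms per variable
degrees = torch.tensor([[[0.9, 0.2],
                         [0.6, 0.4]]])               # shape (batch=1, vars=2, terms=2)
# mask for one relation over (var 0, term 1) and (var 1, term 0), as built from the COO matrix
mask = torch.tensor([[0.0, 1.0],
                     [1.0, 0.0]]).unsqueeze(-1)      # shape (vars=2, terms=2, relations=1)

after_mask = degrees.unsqueeze(-1) * mask.unsqueeze(0)   # zero out terms not in the relation
per_variable = (after_mask + (1 - mask)).prod(dim=2)     # uninvolved entries become 1
print(per_variable)                       # tensor([[[0.2000], [0.6000]]])
print(per_variable.min(dim=-2).values)    # tensor([[0.2000]]) -- what a Minimum relation keeps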
+ """ + membership_shape: torch.Size = membership.degrees.shape + if self.matrix.shape[:-1] != membership_shape[1:]: + # if len(membership_shape) > 2: + # this is for the case where masks have been stacked due to compound relations + membership_shape = membership_shape[1:] # get the last two dimensions + self.resize(*membership_shape) + # select memberships that are not zeroed out (i.e., involved in the relation) + after_mask = membership.degrees.unsqueeze(dim=-1) * self.mask.unsqueeze(0) + # the complement mask adds zeros where the mask is zero, these are not part of the relation + # nan_to_num is used to replace nan values with the nan_replacement value (often not needed) + return ( + (after_mask + (1 - self.mask)) + .prod(dim=2, keepdim=False) + .nan_to_num(self.nan_replacement) + ) + + def forward(self, membership: Membership) -> torch.Tensor: + """ + Apply the n-ary relation to the given memberships. + + Args: + membership: The membership values to apply the minimum n-ary relation to. + + Returns: + The minimum membership value, according to the n-ary relation (i.e., which truth values + to actually consider). + """ + raise NotImplementedError( + f"The {self.__class__.__name__} has no defined forward function. Please create a class " + f"and inherit from {self.__class__.__name__}, or use a predefined class." + ) + + +class Compound(torch.nn.Module): + """ + This class represents an n-ary compound relation, where it expects at least 1 or more + instance of NAryRelation. + """ + + def __init__(self, *relations: NAryRelation, **kwargs): + """ + Initialize the compound relation with the given n-ary relation(s). + + Args: + relation: The n-ary compound relation. + """ + super().__init__(**kwargs) + # store the relations as a module list (as they are also modules) + self.relations = torch.nn.ModuleList(relations) + + def forward(self, membership: Membership) -> Membership: + """ + Apply the compound n-ary relation to the given membership values. + + Args: + membership: The membership values to apply the compound n-ary relation to. + + Returns: + The stacked output of the compound n-ary relation; ready for subsequent follow-up. + """ + # apply the compound n-ary relation to the membership values + memberships: List[Membership] = [ + relation(membership=membership) for relation in self.relations + ] + degrees: torch.Tensor = torch.cat( + [membership.degrees for membership in memberships], dim=-1 + ).unsqueeze(dim=-1) + # create a new mask that accounts for the different masks for each relation + mask = torch.stack([relation.mask for relation in self.relations]) + return Membership(elements=membership.elements, degrees=degrees, mask=mask) diff --git a/src/fuzzy/relations/continuous/old_tnorm.py b/src/fuzzy/relations/continuous/old_tnorm.py new file mode 100644 index 0000000..b0d6f21 --- /dev/null +++ b/src/fuzzy/relations/continuous/old_tnorm.py @@ -0,0 +1,28 @@ +""" +Implements the t-norm fuzzy relations. +""" + +from enum import Enum + +from fuzzy.relations.continuous.t_norm import Product, Minimum, SoftmaxSum + + +class TNorm(Enum): + """ + Enumerates the types of t-norms. 
+ """ + + PRODUCT = Product # i.e., algebraic product + MINIMUM = Minimum + ACZEL_ALSINA = "aczel_alsina" # not yet implemented + SOFTMAX_SUM = SoftmaxSum # not yet implemented + SOFTMAX_MEAN = "softmax_mean" + LUKASIEWICZ = "generalized_lukasiewicz" + # the following are to be implemented + DRASTIC = "drastic" + NILPOTENT = "nilpotent" + HAMACHER = "hamacher" + EINSTEIN = "einstein" + YAGER = "yager" + DUBOIS = "dubois" + DIF = "dif" diff --git a/src/fuzzy/relations/continuous/t_norm.py b/src/fuzzy/relations/continuous/t_norm.py new file mode 100644 index 0000000..6b45d26 --- /dev/null +++ b/src/fuzzy/relations/continuous/t_norm.py @@ -0,0 +1,95 @@ +""" +This module contains the implementation of the n-ary t-norm fuzzy relations. These relations +are used to combine multiple membership values into a single value. The minimum and product +relations are implemented here. +""" + +import torch + +from fuzzy.sets.continuous.membership import Membership +from fuzzy.relations.continuous.n_ary import NAryRelation + + +class Minimum(NAryRelation): + """ + This class represents the minimum n-ary fuzzy relation. This is a special case of + the n-ary fuzzy relation where the minimum value is returned. + """ + + def forward(self, membership: Membership) -> Membership: + """ + Apply the minimum n-ary relation to the given memberships. + + Args: + membership: The membership values to apply the minimum n-ary relation to. + + Returns: + The minimum membership, according to the n-ary relation (i.e., which truth values + to actually consider). + """ + # first filter out the values that are not part of the relation + # then take the minimum value of those that remain in the last dimension + return Membership( + elements=membership.elements, + degrees=self.apply_mask(membership=membership) + .min(dim=-2, keepdim=False) + .values, + mask=self.mask, + ) + + +class Product(NAryRelation): + """ + This class represents the algebraic product n-ary fuzzy relation. This is a special case of + the n-ary fuzzy relation where the product value is returned. + """ + + def forward(self, membership: Membership) -> Membership: + """ + Apply the algebraic product n-ary relation to the given memberships. + + Args: + membership: The membership values to apply the algebraic product n-ary relation to. + + Returns: + The algebraic product membership value, according to the n-ary relation + (i.e., which truth values to actually consider). + """ + # first filter out the values that are not part of the relation + # then take the minimum value of those that remain in the last dimension + return Membership( + elements=membership.elements, + degrees=self.apply_mask(membership=membership).prod(dim=-2, keepdim=False), + mask=self.mask, + ) + + +class SoftmaxSum(NAryRelation): + """ + This class represents the softmax sum n-ary fuzzy relation. This is a special case when dealing + with high-dimensional TSK systems, where the softmax sum is used to leverage Gaussians' + defuzzification relationship to the softmax function. + """ + + def forward(self, membership: Membership) -> Membership: + """ + Calculates the fuzzy compound's applicability using the softmax sum inference engine. + This is particularly useful for when dealing with high-dimensional data, and is considered + a traditional variant of TSK fuzzy stems on high-dimensional datasets. + + Args: + membership: The memberships. + + Returns: + The applicability of the fuzzy compounds (e.g., fuzzy logic rules). 
+ """ + # intermediate_values = self.calc_intermediate_input(antecedents_memberships) + # pylint: disable=fixme + # TODO: these dimensions are possibly not correct, need to be fixed/tested + firing_strengths = membership.degrees.sum(dim=1) + max_values, _ = firing_strengths.max(dim=-1, keepdim=True) + return Membership( + elements=membership.elements, + degrees=torch.nn.functional.softmax(firing_strengths - max_values, dim=-1), + mask=self.mask, + ) diff --git a/src/fuzzy/relations/continuous/tnorm.py b/src/fuzzy/relations/continuous/tnorm.py deleted file mode 100644 index d0f92dc..0000000 --- a/src/fuzzy/relations/continuous/tnorm.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -Implements the t-norm fuzzy relations. -""" - -from enum import Enum - -import torch - - -class TNorm(Enum): - """ - Enumerates the types of t-norms. - """ - - PRODUCT = "product" # i.e., algebraic product - MINIMUM = "minimum" - ACZEL_ALSINA = "aczel_alsina" # not yet implemented - SOFTMAX_SUM = "softmax_sum" - SOFTMAX_MEAN = "softmax_mean" - LUKASIEWICZ = "generalized_lukasiewicz" - # the following are to be implemented - DRASTIC = "drastic" - NILPOTENT = "nilpotent" - HAMACHER = "hamacher" - EINSTEIN = "einstein" - YAGER = "yager" - DUBOIS = "dubois" - DIF = "dif" - - -class AlgebraicProduct(torch.nn.Module): - """ - Implementation of the Algebraic Product t-norm (Fuzzy AND). - """ - - def __init__(self, in_features=None, importance=None): - """ - Initialization. - INPUT: - - in_features: shape of the input - - centers: trainable parameter - - sigmas: trainable parameter - importance is initialized to a one vector by default - """ - super().__init__() - self.in_features = in_features - - # initialize antecedent importance - if importance is None: - self.importance = torch.nn.parameter.Parameter(torch.tensor(1.0)) - self.importance.requires_grad = False - else: - if not isinstance(importance, torch.Tensor): - importance = torch.Tensor(importance) - self.importance = torch.nn.parameter.Parameter( - torch.abs(importance) - ) # importance can only be [0, 1] - self.importance.requires_grad = True - - def forward(self, elements): - """ - Forward pass of the function. - Applies the function to the input elementwise. - """ - self.importance = torch.nn.parameter.Parameter( - torch.abs(self.importance) - ) # importance can only be [0, 1] - return torch.prod(torch.mul(elements, self.importance)) diff --git a/src/fuzzy/sets/continuous/group.py b/src/fuzzy/sets/continuous/group.py index b0c5132..8d55a8e 100644 --- a/src/fuzzy/sets/continuous/group.py +++ b/src/fuzzy/sets/continuous/group.py @@ -246,12 +246,8 @@ def expand( self.minimums = minimums self.maximums = maximums else: - self.minimums = torch.min( - minimums, self.minimums - ).detach() - self.maximums = torch.max( - maximums, self.maximums - ).detach() + self.minimums = torch.min(minimums, self.minimums).detach() + self.maximums = torch.max(maximums, self.maximums).detach() # find where the new centers should be added, if any # LogGaussian was used, then use following to check for real membership degrees: diff --git a/src/fuzzy/sets/continuous/impl.py b/src/fuzzy/sets/continuous/impl.py index c244dc6..ae9eaf4 100644 --- a/src/fuzzy/sets/continuous/impl.py +++ b/src/fuzzy/sets/continuous/impl.py @@ -125,7 +125,7 @@ def forward(self, observations) -> Membership: # ), "Infinite values detected in the membership degrees." 
return Membership( - elements=observations, + elements=observations.squeeze(dim=-1), # remove the last dimension degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, mask=self.get_mask(), ) @@ -204,7 +204,7 @@ def forward(self, observations) -> Membership: # ), "Infinite values detected in the membership degrees." return Membership( - elements=observations, + elements=observations.squeeze(dim=-1), # remove the last dimension degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, mask=self.get_mask(), ) @@ -308,7 +308,7 @@ def forward(self, observations) -> Membership: ), "Infinite values detected in the membership degrees." return Membership( - elements=observations, + elements=observations.squeeze(dim=-1), # remove the last dimension degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, mask=self.get_mask(), ) @@ -441,7 +441,7 @@ def forward(self, observations) -> Membership: ), "Infinite values detected in the membership degrees." return Membership( - elements=observations, + elements=observations.squeeze(dim=-1), # remove the last dimension degrees=degrees.to_sparse() if self.use_sparse_tensor else degrees, mask=self.get_mask(), ) diff --git a/tests/test_relations/test_n_ary.py b/tests/test_relations/test_n_ary.py new file mode 100644 index 0000000..4c0f2ae --- /dev/null +++ b/tests/test_relations/test_n_ary.py @@ -0,0 +1,463 @@ +""" +Test the fuzzy n-ary relations work as expected. +""" + +import unittest +from typing import List, Tuple + +import torch +import igraph +import numpy as np + +from fuzzy.sets.continuous.impl import Gaussian +from fuzzy.sets.continuous.membership import Membership +from fuzzy.sets.continuous.group import GroupedFuzzySets +from fuzzy.sets.continuous.abstract import ContinuousFuzzySet +from fuzzy.relations.continuous.t_norm import Minimum, Product +from fuzzy.relations.continuous.n_ary import NAryRelation, Compound + +N_TERMS: int = 2 +N_VARIABLES: int = 4 +N_OBSERVATIONS: int = 3 +N_COMPOUNDS: int = 5 +AVAILABLE_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +class TestNAryRelation(unittest.TestCase): + """ + Test the abstract n-ary relation, including functionality that is common to all n-ary relations. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.gaussian_mf = Gaussian( + centers=np.array([[i, i + 1] for i in range(N_VARIABLES)]), + widths=np.array([[(i + 1) / 2, (i + 1) / 3] for i in range(N_VARIABLES)]), + device=AVAILABLE_DEVICE, + ) + self.data: np.ndarray = np.array( + [ + [0.0412, 0.4543, 0.1980, 0.3821], + [0.9327, 0.5900, 0.1569, 0.6902], + [0.0894, 0.9433, 0.9903, 0.5800], + ] + ) + + def test_gaussian_membership(self) -> Membership: + """ + Although this test is not directly related to the NAryRelation class, and is possibly + redundant due to Gaussian's unit testing, it is used to double-check that the Gaussian + membership function is working as intended as these unit tests rely on correct values + from the Gaussian membership function to work. + + Returns: + The membership values for the Gaussian membership function. 
+ """ + membership: Membership = self.gaussian_mf( + torch.tensor(self.data, dtype=torch.float32, device=AVAILABLE_DEVICE) + ) + + self.assertEqual(membership.degrees.shape[0], N_OBSERVATIONS) + self.assertEqual(membership.degrees.shape[1], N_VARIABLES) + self.assertEqual(membership.degrees.shape[2], N_TERMS) + + # check that the membership is correct + expected_membership_degrees: torch.Tensor = torch.tensor( + [ + [ + [9.9323326e-01, 2.5514542e-04], + [7.4245834e-01, 4.6277978e-03], + [2.3617040e-01, 3.8928282e-04], + [1.8026091e-01, 6.3449936e-04], + ], + [ + [3.0816132e-02, 9.6005607e-01], + [8.4526926e-01, 1.1410457e-02], + [2.2095737e-01, 3.0867624e-04], + [2.6347569e-01, 2.1079029e-03], + ], + [ + [9.6853620e-01, 5.7408627e-04], + [9.9679035e-01, 8.1074789e-02], + [6.3564914e-01, 1.7616944e-02], + [2.3128603e-01, 1.3889252e-03], + ], + ], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(membership.degrees, expected_membership_degrees)) + return membership + + def test_n_ary_relation(self) -> None: + """ + Test the abstract n-ary relation. + + Returns: + None + """ + n_ary = NAryRelation((0, 1), (1, 0), device=AVAILABLE_DEVICE) + # the forward pass should not be implemented + self.assertRaises(NotImplementedError, n_ary.forward, None) + # check that the matrix shape is correct + self.assertEqual(n_ary._coo_matrix[0].shape, (2, 2)) + # check that the original shape is stored + self.assertEqual(n_ary._original_shape[0], (2, 2)) + # matrix size can increase (in-place) for more potential rows (vars) and columns (terms) + n_ary._coo_matrix[0].resize(3, 3) + self.assertEqual(n_ary._coo_matrix[0].shape, (3, 3)) + # check that the original shape is still kept after resizing + self.assertEqual(n_ary._original_shape[0], (2, 2)) + + def test_duplicates(self) -> None: + """ + Test that the NAryRelation class throws an error when given duplicate indices. Otherwise, a + duplicate index will result in a value greater than 1 in the mask, which is not allowed. + + Returns: + None + """ + self.assertRaises( + ValueError, + NAryRelation, + (0, 1), + (1, 0), + (1, 0), + device=AVAILABLE_DEVICE, + ) + + def test_graph(self) -> None: + """ + Test that a graph representation of the relation can be created. 
+ + Returns: + None + """ + indices: List[Tuple[int, int]] = [(0, 1), (1, 0)] + single_n_ary = NAryRelation(*indices, device=AVAILABLE_DEVICE) + single_n_ary_graph: igraph.Graph = single_n_ary.graph + self.assertTrue(single_n_ary_graph is not None) + self.assertEqual( + single_n_ary_graph.vcount(), 3 + ) # 2 index pairs + 1 for relation + self.assertEqual(single_n_ary_graph.ecount(), 2) # 2 edges (relations) + + # check vertex attributes are as we expect + self.assertEqual(single_n_ary_graph.vs[0]["tags"], {"relation"}) + for index in (1, 2): + self.assertEqual(single_n_ary_graph.vs[index]["tags"], {"anchor"}) + self.assertEqual(single_n_ary_graph.vs[index]["item"], indices[index - 1]) + + # check edges are as we expect + for index in (0, 1): + self.assertEqual(single_n_ary_graph.es[index].source, index + 1) + self.assertEqual(single_n_ary_graph.es[index].target, 0) + + indices: List[List[Tuple[int, int]]] = [[(0, 1), (1, 0)], [(1, 1), (2, 1)]] + multiple_n_ary = NAryRelation(*indices, device=AVAILABLE_DEVICE) + multiple_n_ary_graph: igraph.Graph = multiple_n_ary.graph + self.assertTrue(multiple_n_ary_graph is not None) + self.assertEqual( + multiple_n_ary_graph.vcount(), 6 + ) # 4 index pairs + 2 for relations + self.assertEqual(multiple_n_ary_graph.ecount(), 4) # 4 edges (relations) + + # check vertex attributes are as we expect + relation_vertices: igraph.VertexSeq = multiple_n_ary_graph.vs.select( + tags_eq={"relation"} + ) + self.assertEqual(len(relation_vertices), 2) + relation_index: int = 0 + for relation_vertex in relation_vertices: + self.assertEqual(relation_vertex["tags"], {"relation"}) + predecessors: List[igraph.Vertex] = relation_vertex.predecessors() + for predecessor in predecessors: + self.assertEqual(predecessor["tags"], {"anchor"}) + # below does not work consistently + # self.assertEqual(predecessor["item"], indices[relation_index][index]) + relation_index += 1 + + # check that relations involving the same index references share the same vertex + + multiple_n_ary = NAryRelation( + [(0, 1), (1, 0)], [(1, 1), (0, 1)], device=AVAILABLE_DEVICE + ) + multiple_n_ary_graph_with_uniques: igraph.Graph = multiple_n_ary.graph + self.assertTrue(multiple_n_ary_graph_with_uniques is not None) + self.assertEqual( + multiple_n_ary_graph_with_uniques.vcount(), 5 + ) # 3 unique index pairs + 2 for relations + self.assertEqual( + multiple_n_ary_graph_with_uniques.ecount(), 4 # 4 edges (relations) + ) + + +class TestProduct(TestNAryRelation): + """ + Test the Product n-ary relation. + """ + + def test_algebraic_product(self) -> None: + """ + Test the n-ary product operation given a single relation. 
+ + Returns: + + """ + n_ary = Product((0, 1), (1, 0), device=AVAILABLE_DEVICE) + membership = self.test_gaussian_membership() + + # test the mask application + after_mask = n_ary.apply_mask(membership=membership) + expected_after_mask = torch.tensor( + [ + [[2.5514542e-04], [7.4245834e-01], [1.0000000e00], [1.0000000e00]], + [[9.6005607e-01], [8.4526926e-01], [1.0000000e00], [1.0000000e00]], + [[5.7408627e-04], [9.9679035e-01], [1.0000000e00], [1.0000000e00]], + ], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(after_mask, expected_after_mask)) + + # test the forward pass + prod_membership: Membership = n_ary.forward(membership) + expected_prod_values = torch.tensor( + [ + [7.4245834e-01 * 2.5514542e-04], + [8.4526926e-01 * 9.6005607e-01], + [9.9679035e-01 * 5.7408627e-04], + ], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(prod_membership.degrees, expected_prod_values)) + + # check that it is torch.jit scriptable (currently not working) + # n_ary_script = torch.jit.script(n_ary) + # + # after_mask_script = n_ary_script.apply_mask(membership=membership) + # self.assertTrue(torch.allclose(after_mask_script, expected_after_mask)) + # + # min_values_script = n_ary_script.forward(membership) + # self.assertTrue(torch.allclose(min_values_script, expected_min_values)) + + def test_multiple_indices_passed_as_list(self) -> None: + """ + Test the Product operation given multiple relations, where some variables are never used + by those relations. This is a test to ensure that the Product operation can handle + relations that do not use all variables (i.e., does not wrongly output zeros). + + Returns: + None + """ + n_ary = Product( + [(0, 1), (1, 0)], + [(1, 1), (2, 1)], + [(2, 1), (2, 0)], + [(0, 1), (2, 0)], + [(1, 1), (0, 1)], + device=AVAILABLE_DEVICE, + ) + membership = self.test_gaussian_membership() + prod_membership: Membership = n_ary(membership) + expected_prod_values = torch.tensor( + [ + [ + membership.degrees[0][0][1].item() + * membership.degrees[0][1][0].item(), + membership.degrees[0][1][1].item() + * membership.degrees[0][2][1].item(), + membership.degrees[0][2][1].item() + * membership.degrees[0][2][0].item(), + membership.degrees[0][0][1].item() + * membership.degrees[0][2][0].item(), + membership.degrees[0][1][1].item() + * membership.degrees[0][0][1].item(), + ], + [ + membership.degrees[1][0][1].item() + * membership.degrees[1][1][0].item(), + membership.degrees[1][1][1].item() + * membership.degrees[1][2][1].item(), + membership.degrees[1][2][1].item() + * membership.degrees[1][2][0].item(), + membership.degrees[1][0][1].item() + * membership.degrees[1][2][0].item(), + membership.degrees[1][1][1].item() + * membership.degrees[1][0][1].item(), + ], + [ + membership.degrees[2][0][1].item() + * membership.degrees[2][1][0].item(), + membership.degrees[2][1][1].item() + * membership.degrees[2][2][1].item(), + membership.degrees[2][2][1].item() + * membership.degrees[2][2][0].item(), + membership.degrees[2][0][1].item() + * membership.degrees[2][2][0].item(), + membership.degrees[2][1][1].item() + * membership.degrees[2][0][1].item(), + ], + ], + device=AVAILABLE_DEVICE, + ) + self.assertEqual(prod_membership.degrees.shape[0], N_OBSERVATIONS) + self.assertEqual(prod_membership.degrees.shape[1], N_COMPOUNDS) + self.assertEqual(prod_membership.degrees.shape, expected_prod_values.shape) + self.assertTrue(torch.allclose(prod_membership.degrees, expected_prod_values)) + + +class TestMinimum(TestNAryRelation): + """ + Test the Minimum n-ary relation. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.hypercube = GroupedFuzzySets( + modules_list=[ + ContinuousFuzzySet.stack( + [ + Gaussian( + centers=np.array([-1, 0.0, 1.0]), + widths=np.array([1.0, 1.0, 1.0]), + device=AVAILABLE_DEVICE, + ), + Gaussian( + centers=np.array([-1.0, 0.0, 1.0]), + widths=np.array([1.0, 1.0, 1.0]), + device=AVAILABLE_DEVICE, + ), + ] + ) + ] + ) + + def test_minimum(self) -> None: + """ + Test the n-ary minimum operation given a single relation. + + Returns: + None + """ + n_ary = Minimum((0, 1), (1, 0), device=AVAILABLE_DEVICE) + membership = self.test_gaussian_membership() + + # test the mask application + after_mask = n_ary.apply_mask(membership=membership) + expected_after_mask = torch.tensor( + [ + [[2.5514542e-04], [7.4245834e-01], [1.0000000e00], [1.0000000e00]], + [[9.6005607e-01], [8.4526926e-01], [1.0000000e00], [1.0000000e00]], + [[5.7408627e-04], [9.9679035e-01], [1.0000000e00], [1.0000000e00]], + ], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(after_mask, expected_after_mask)) + + # test the forward pass + min_membership: Membership = n_ary.forward(membership) + expected_min_values = torch.tensor( + [[2.5514542e-04], [8.4526926e-01], [5.7408627e-04]], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(min_membership.degrees, expected_min_values)) + + # check that it is torch.jit scriptable (currently not working) + # n_ary_script = torch.jit.script(n_ary) + # + # after_mask_script = n_ary_script.apply_mask(membership=membership) + # self.assertTrue(torch.allclose(after_mask_script, expected_after_mask)) + # + # min_values_script = n_ary_script.forward(membership) + # self.assertTrue(torch.allclose(min_values_script, expected_min_values)) + + def test_multiple_indices_passed_as_list(self) -> None: + """ + Test the Minimum operation given multiple relations, where some variables are never used + by those relations. This is a test to ensure that the Minimum operation can handle + relations that do not use all variables (i.e., does not wrongly output zeros). + + Returns: + None + """ + data = torch.tensor( + [ + [1.5409961, -0.2934289], + [-2.1787894, 0.56843126], + [-1.0845224, -1.3985955], + [0.40334684, 0.83802634], + ], + device=AVAILABLE_DEVICE, + ) + minimum = Minimum( + [(0, 0), (1, 0)], + [(0, 0), (1, 1)], + [(0, 1), (1, 0)], + [(0, 1), (1, 1)], + [(0, 1), (1, 2)], + device=AVAILABLE_DEVICE, + ) + + membership: Membership = self.hypercube(data) + min_membership: Membership = minimum(membership) + expected_degrees = torch.tensor( + [ + [0.00157003, 0.00157003, 0.09304529, 0.09304529, 0.09304529], + [ + 8.5436940e-02, + 2.4918883e-01, + 8.6766202e-03, + 8.6766e-03, + 8.6766202e-03, + ], + [0.8531001, 0.14141318, 0.3084521, 0.14141318, 0.00317242], + [0.034104, 0.13954304, 0.034104, 0.49545035, 0.8498557], + ], + device=AVAILABLE_DEVICE, + ) + + self.assertTrue(torch.allclose(min_membership.degrees, expected_degrees)) + + +class TestCompound(TestNAryRelation): + """ + Test the Compound n-ary relation, which allows the user to compound/aggregate multiple n-ary + relations together. + """ + + def test_combination_of_t_norms(self) -> None: + """ + Test we can create a combination of t-norms to reflect more complex compound propositions. 
+ + Returns: + None + """ + n_ary_min = Minimum((0, 1), (1, 0), device=AVAILABLE_DEVICE) + n_ary_prod = Product((0, 1), (1, 0), device=AVAILABLE_DEVICE) + membership = self.test_gaussian_membership() + + t_norm = Compound(n_ary_min, n_ary_prod) + compound_values = t_norm(membership=membership) + expected_compound_values = torch.cat( + [ + n_ary_min(membership=membership).degrees, + n_ary_prod(membership=membership).degrees, + ], + dim=-1, + ).unsqueeze(dim=-1) + self.assertTrue( + torch.allclose(compound_values.degrees, expected_compound_values) + ) + + # we can then follow it up with another t-norm + + n_ary_next_min = Minimum((0, 1), (1, 0), device=AVAILABLE_DEVICE) + min_membership: Membership = n_ary_next_min(compound_values) + expected_min_values = torch.tensor( + [ + [7.4245834e-01 * 2.5514542e-04], + [8.4526926e-01 * 9.6005607e-01], + [9.9679035e-01 * 5.7408627e-04], + ], + device=AVAILABLE_DEVICE, + ) + self.assertTrue(torch.allclose(min_membership.degrees, expected_min_values)) diff --git a/tests/test_relations/test_tnorms.py b/tests/test_relations/test_tnorms.py deleted file mode 100644 index d762b0e..0000000 --- a/tests/test_relations/test_tnorms.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Test various t-norm operations, such as the algebraic product. -""" - -import unittest - -import torch -import numpy as np - -from fuzzy.relations.continuous.tnorm import AlgebraicProduct - - -def algebraic_product(elements: np.ndarray, importance: np.ndarray) -> np.float32: - """ - Numpy calculation of the algebraic product. - - Args: - elements: The elements to be multiplied. - importance: The importance of each element. - - Returns: - The algebraic product of the given elements. - """ - return np.prod(elements * importance) - - -class TestAlgebraicProduct(unittest.TestCase): - """ - Test the algebraic product operation. - """ - - def test_single_input(self) -> None: - """ - The t-norm of a single input (w/o importance) should be == input. - """ - element = torch.rand(1) - n_inputs = 1 - tnorm = AlgebraicProduct(n_inputs) - importance_before_calculation = tnorm.importance - mu_pytorch = tnorm(element) - mu_numpy = algebraic_product( - element.cpu().detach().numpy(), - importance_before_calculation.cpu().detach().numpy(), - ) - - # make sure the parameters are still identical afterward - assert torch.isclose(tnorm.importance, importance_before_calculation).all() - # the outputs of the PyTorch and Numpy versions should be approx. equal - assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all() - - def test_multi_input(self) -> None: - """ - Test that the algebraic product is correctly calculated when multiple inputs are given. - - Returns: - None - """ - elements = torch.rand(4) - n_inputs = len(elements) - tnorm = AlgebraicProduct(n_inputs) - importance_before_calculation = tnorm.importance - mu_pytorch = tnorm(elements) - mu_numpy = algebraic_product( - elements.cpu().detach().numpy(), - importance_before_calculation.cpu().detach().numpy(), - ) - - # make sure the parameters are still identical afterward - assert torch.isclose(tnorm.importance, importance_before_calculation).all() - # the outputs of the PyTorch and Numpy versions should be approx. equal - assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all() - - def test_multi_input_with_importance_given(self) -> None: - """ - Test that the algebraic product is correctly calculated when multiple inputs (and their - varying degrees of importance) are given. 
- - Returns: - None - """ - elements = torch.rand(5) - n_inputs = len(elements) - importance_before_calculation = torch.tensor([0.0, 0.25, 0.5, 0.75, 1.0]) - tnorm = AlgebraicProduct(n_inputs, importance=importance_before_calculation) - mu_pytorch = tnorm(elements) - mu_numpy = algebraic_product( - elements.cpu().detach().numpy(), - importance_before_calculation.cpu().detach().numpy(), - ) - - # make sure the parameters are still identical afterward - assert torch.isclose(tnorm.importance, importance_before_calculation).all() - # the outputs of the PyTorch and Numpy versions should be approx. equal - assert np.isclose(mu_pytorch.cpu().detach().numpy(), mu_numpy, rtol=1e-8).all()
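The removed NumPy cross-check above has no direct replacement in the new tests, but an analogous sanity check against the new Product relation could look like the following sketch; the data values and the Gaussian set used here are assumptions for illustration, not part of this PR.

import numpy as np
import torch

from fuzzy.sets.continuous.impl import Gaussian
from fuzzy.relations.continuous.t_norm import Product

device = torch.device("cpu")
gaussian = Gaussian(
    centers=np.array([[0.0, 1.0], [1.0, 2.0]]),
    widths=np.array([[0.5, 0.5], [0.5, 0.5]]),
    device=device,
)
membership = gaussian(torch.tensor([[0.1, 0.9]], dtype=torch.float32, device=device))
product = Product((0, 1), (1, 0), device=device)

# NumPy reference: multiply exactly the degrees selected by the relation's indices
expected = membership.degrees[0, 0, 1].item() * membership.degrees[0, 1, 0].item()
assert np.isclose(product(membership).degrees.item(), expected, rtol=1e-6)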