From 17887a7420f2731d533b978120825467855ad6ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Kocha=C5=84czyk?= Date: Thu, 16 Aug 2018 16:08:11 +0200 Subject: [PATCH] Add 3-dim tests, use relative tolerance --- Readme.rst | 23 +++++-------- docs/Makefile | 2 +- docs/source/cce.rst | 18 +++++----- requirements.txt | 2 +- tests/test_use_case_1.py | 57 ++++++++++++++++++++++++------- tests/test_use_case_2.py | 19 ++++++----- tests/test_use_case_3.py | 74 ++++++++++++++++++++++++++++------------ 7 files changed, 126 insertions(+), 69 deletions(-) diff --git a/Readme.rst b/Readme.rst index 8b55d61..925a26c 100644 --- a/Readme.rst +++ b/Readme.rst @@ -3,8 +3,8 @@ Channel Capacity Estimator ========================== Channel Capacity Estimator (**cce**) is a python module to estimate -`information capacity`_ of a communication channel. Mutual -information, computed as proposed by Kraskov *et al.* [Crossref_, arXiv_], +`information capacity`_ of a communication channel. Mutual information, +computed as proposed by `Kraskov et al.` (*Physical Review E*, 2004) Eq. (8), is maximized over input probabilities by means of a constrained gradient-based stochastic optimization. The only parameter of the Kraskov algorithm is the number of neighbors, *k*, used in the nearest neighbor @@ -17,17 +17,14 @@ as implemented in TensorFlow_. Thus, to use **cce**, you should have TensorFlow (with python bindings) installed on your system. See file requirements.txt for a complete list of dependencies. -Module **cce** features the research article "Limits to channel information -capacity for a MAPK pathway in response to pulsatile EGF stimulation" by -Grabowski *et al.*, submitted to *PLOS Computational Biology* in 2018. Version -1.0 of the code has been included as supplementary data of this article. -A prototypical implementation of the approach implemented in **cce** has -appeared earlier, supplementing the article by Tudelska *et al.* -(`*Scientific Reports*, 2017`_). +Module **cce** features the research article "Limits to the rate of +information transmission through MAPK pathway" by Grabowski *et al.*, +submitted to *PLOS Computational Biology* in 2018. Release 0.4 of the +code has been included as supplementary data of this article. -For any updates and fixes to **cce**, visit project homepage:` +For any updates and fixes to **cce**, please visit project homepage: http://pmbm.ippt.pan.pl/software/cce -(which currently directs to a GitHub repository: +(this permalink currently directs to a GitHub repository: https://github.com/pawel-czyz/channel-capacity-estimator). @@ -161,9 +158,7 @@ This software is distributed under `GNU GPL 3.0 license`_. .. _information capacity: https://en.wikipedia.org/wiki/Channel_capacity -.. _arXiv: https://arxiv.org/pdf/cond-mat/0305641.pdf -.. _Crossref: https://doi.org/10.1103/PhysRevE.69.066138 -.. _*Scientific Reports*, 2017: https://doi.org/10.1038/s41598-017-16166-y +.. _Kraskov et al.: https://doi.org/10.1103/PhysRevE.69.066138 .. _TensorFlow: https://www.tensorflow.org .. _Frederic Grabowski: https://github.com/grfrederic .. _Paweł Czyż: https://github.com/pawel-czyz diff --git a/docs/Makefile b/docs/Makefile index 1538168..f642b30 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -17,4 +17,4 @@ help: # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). %: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/source/cce.rst b/docs/source/cce.rst index ba14fc1..777d6d5 100644 --- a/docs/source/cce.rst +++ b/docs/source/cce.rst @@ -4,18 +4,18 @@ cce package Submodules ---------- -cce\.calculate\_weighted\_loss module -------------------------------------- +cce\.score module +----------------- -.. automodule:: cce.calculate_weighted_loss +.. automodule:: cce.score :members: :undoc-members: :show-inheritance: -cce\.dataeng module -------------------- +cce\.preprocess module +---------------------- -.. automodule:: cce.dataeng +.. automodule:: cce.preprocess :members: :undoc-members: :show-inheritance: @@ -28,10 +28,10 @@ cce\.estimator module :undoc-members: :show-inheritance: -cce\.optimize\_weights module ------------------------------ +cce\.optimize module +-------------------- -.. automodule:: cce.optimize_weights +.. automodule:: cce.optimize :members: :undoc-members: :show-inheritance: diff --git a/requirements.txt b/requirements.txt index 147311e..e6df28f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ numpy >= 1.12.0 scipy >= 1.0.0 -tensorflow +tensorflow >= 1.4.0 diff --git a/tests/test_use_case_1.py b/tests/test_use_case_1.py index 7ef4d92..114bbf3 100644 --- a/tests/test_use_case_1.py +++ b/tests/test_use_case_1.py @@ -1,11 +1,14 @@ import unittest -import numpy as np +from itertools import product +from numpy import cos, sin, log2 +from scipy.stats import multivariate_normal as multinorm from cce.estimator import WeightedKraskovEstimator as wke from tests.plain_kraskov import calculate_mi as plain_calculate_mi from tests.noisy_channel import communicate -NN_K = 30 -DELTA = 0.01 +NN_K = 15 +ATOL = 0.02 +RTOL = 0.04 class TestUseCase1(unittest.TestCase): """Tests of mutual information (MI) estimation for input distributions @@ -17,8 +20,8 @@ def test_equal_sizes_trivial_separate(self): mi_est1 = wke(data).calculate_mi(k=NN_K) mi_est2 = plain_calculate_mi(data=data, k=NN_K) mi_exact = 1.0 - self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA) - self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA) + self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL) + self.assertAlmostEqual((mi_est1 - mi_est2)/mi_est2, 0, delta=RTOL) def test_equal_sizes_trivial_overlap(self): data = communicate({'0': 10000, '1': 10000}, @@ -26,26 +29,54 @@ def test_equal_sizes_trivial_overlap(self): mi_est1 = wke(data).calculate_mi(k=NN_K) mi_est2 = plain_calculate_mi(data=data, k=NN_K) mi_exact = 0.0 - self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA) - self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA) + self.assertAlmostEqual(mi_est1, mi_exact, delta=ATOL) + self.assertAlmostEqual(mi_est1, mi_est2, delta=ATOL) def test_unequal_sizes(self): data = communicate({'0': 5000, '1': 10000}, {'0': 0.0, '1': 1.0}) mi_est1 = wke(data).calculate_mi(k=NN_K) mi_est2 = plain_calculate_mi(data=data, k=NN_K) - mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3) - self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA) - self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA) + mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3) + self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL) + self.assertAlmostEqual((mi_est1 - mi_est2)/mi_exact, 0, delta=RTOL) def test_equal_sizes_partial_overlap(self): data = communicate({'0': 5000, '1': 5000, '2': 5000}, {'0': 0.0, '1': 0.0, '2': 1.0}) mi_est1 = wke(data).calculate_mi(k=NN_K) mi_est2 = plain_calculate_mi(data=data, k=NN_K) - mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3) - self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA) - self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA) + mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3) + self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL) + self.assertAlmostEqual((mi_est1 - mi_est2)/mi_est2, 0, delta=RTOL) + + def test_8gaussians3d_snake(self): + data = [] + for dist_i, count in enumerate(8*[5000]): + mu, sigma = dist_i+1, 33 + 3*dist_i + means3 = [mu**2.5, 50*cos(mu/0.75), 200*sin(mu/1.5)] + covar3 = [[sigma**2, 0, 0], + [0, sigma**2, 0], + [0, 0, sigma**2]] + for v in multinorm(means3, covar3).rvs(count): + data.append((dist_i, v)) + mi_est = wke(data).calculate_mi(k=NN_K) + mi_accur = 1.85959 # calculated in Mathematica for continuous distributions + self.assertAlmostEqual((mi_est - mi_accur)/mi_accur, 0, delta=RTOL) + + def test_8gaussians3d_box(self): + box = list(product(range(2),repeat=3)) + data = [] + for i, count in enumerate(8*[5000]): + means3, sigma = (box[i], 0.25 + ((i + 1)/8 + sum(box[i]))/10) + covar3 = [[sigma**2, 0, 0], + [0, sigma**2, 0], + [0, 0, sigma**2]] + for v in multinorm(means3, covar3).rvs(count): + data.append((i, v)) + mi_est = wke(data).calculate_mi(k=NN_K) + mi_accur = 1.72251 # calculated in Mathematica for continuous distributions + self.assertAlmostEqual((mi_est - mi_accur)/mi_accur, 0, delta=RTOL) if __name__ == '__main__': diff --git a/tests/test_use_case_2.py b/tests/test_use_case_2.py index 288c84e..331ca55 100644 --- a/tests/test_use_case_2.py +++ b/tests/test_use_case_2.py @@ -1,10 +1,11 @@ import unittest -import numpy as np +from numpy import log2 from cce.estimator import WeightedKraskovEstimator as wke from tests.noisy_channel import communicate -NN_K = 30 -DELTA = 0.01 +NN_K = 15 +ATOL = 0.02 +RTOL = 0.04 class TestUseCase2(unittest.TestCase): """Tests of mutual information (MI) estimation for input distributions @@ -15,8 +16,8 @@ def test_equal_sizes(self): output_values={'A': 0.0, 'B': 1.0}) weights = {'A': 1/3, 'B': 2/3} mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K) - mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3) - self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA) + mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3) + self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL) def test_unequal_sizes(self): data = communicate({'A': 2500, 'B': 5000}, @@ -24,7 +25,7 @@ def test_unequal_sizes(self): weights = {'A': 1/2, 'B': 1/2} mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K) mi_exact = 1.0 - self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA) + self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL) def test_unequal_sizes_trivial_overlap(self): data = communicate({'A': 2500, 'B': 5000}, @@ -32,15 +33,15 @@ def test_unequal_sizes_trivial_overlap(self): weights = {'A': 1/2, 'B': 1/2} mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K) mi_exact = 0.0 - self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA) + self.assertAlmostEqual(mi_est, mi_exact, delta=ATOL) def test_output_overlap(self): data = communicate({'A': 2500, 'B': 2500, 'C': 2500}, {'A': 0, 'B': 0, 'C': 1}) weights = {'A': 1/3, 'B': 1/3, 'C': 1/3} mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K) - mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3) - self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA) + mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3) + self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL) if __name__ == '__main__': diff --git a/tests/test_use_case_3.py b/tests/test_use_case_3.py index bf2433f..e6f7c42 100644 --- a/tests/test_use_case_3.py +++ b/tests/test_use_case_3.py @@ -1,11 +1,14 @@ import unittest -import numpy as np +from itertools import product +from numpy import cos, sin, log2 +from scipy.stats import multivariate_normal as multinorm from scipy.stats.distributions import norm from cce.estimator import WeightedKraskovEstimator as wke from tests.noisy_channel import communicate -NN_K = 30 -DELTA = 0.01 +NN_K = 15 +ATOL = 0.02 +RTOL = 0.04 class TestUseCase3(unittest.TestCase): """Test of maximum mutual information (max MI, capacity) estimation @@ -14,28 +17,28 @@ class TestUseCase3(unittest.TestCase): def test_equal_sizes(self): data = communicate(input_counts={'A': 5000, 'B': 5000}, output_values={'A': 0.0, 'B': 1.0}) - capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) - capacity_exact = 1.0 - self.assertAlmostEqual(capacity_est, capacity_exact, delta=DELTA) - self.assertAlmostEqual(opt_ws['A'], 0.5, delta=DELTA) - self.assertAlmostEqual(opt_ws['B'], 0.5, delta=DELTA) + cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) + cap_exact = 1.0 + self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL) + self.assertAlmostEqual(opt_ws['A'], 0.5, delta=ATOL) + self.assertAlmostEqual(opt_ws['B'], 0.5, delta=ATOL) def test_unequal_sizes(self): data = communicate({'A': 2500, 'B': 5000}, {'A': 0.0, 'B': 1.0}) - capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) - capacity_exact = 1.0 - self.assertAlmostEqual(capacity_est, capacity_exact, delta=DELTA) - self.assertAlmostEqual(opt_ws['A'], 0.5, delta=DELTA) - self.assertAlmostEqual(opt_ws['B'], 0.5, delta=DELTA) + cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) + cap_exact = 1.0 + self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL) + self.assertAlmostEqual(opt_ws['A'], 0.5, delta=ATOL) + self.assertAlmostEqual(opt_ws['B'], 0.5, delta=ATOL) def test_equal_sizes_partial_overlap(self): data = communicate({'A': 2500, 'B': 2500, 'C': 2500}, {'A': 0, 'B': 0, 'C': 1}) - capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) - capacity_exact = 1.0 - self.assertAlmostEqual(capacity_est, capacity_exact, delta=0.01) - self.assertAlmostEqual(opt_ws['C'], 0.5, delta=DELTA) + cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) + cap_exact = 1.0 + self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL) + self.assertAlmostEqual(opt_ws['C'], 0.5, delta=ATOL) def test_eight_gaussians(self): """This test case is patterned after a test included in the source code @@ -46,17 +49,44 @@ def _mu_sigma(i): return (1.5*(i + 1), (i + 1)/4. + 1) data = [] - for dist_i, count in enumerate(8*[2500]): + for dist_i, count in enumerate(8*[5000]): data.extend([(dist_i, [values]) for values in norm(*_mu_sigma(dist_i)).rvs(count)]) - capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) + cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K) largest_2_opt_ws_indices = list(map(lambda kv: kv[0], sorted(opt_ws.items(), key=lambda kv: kv[1], reverse=True)[0:2])) - capacity_accurate, largest_2_opt_ws_indices_accurate = 1.14425, [0,7] - self.assertAlmostEqual(capacity_est, capacity_accurate, delta=DELTA) - self.assertEqual(largest_2_opt_ws_indices, largest_2_opt_ws_indices_accurate) + cap_accur, largest_2_opt_ws_indices_accur = 1.14425, [0,7] + self.assertAlmostEqual((cap_est - cap_accur)/cap_accur, 0, delta=RTOL) + self.assertEqual(largest_2_opt_ws_indices, largest_2_opt_ws_indices_accur) + def test_8gaussians3d_snake(self): + data = [] + for dist_i, count in enumerate(8*[5000]): + mu, sigma = dist_i + 1, 33 + 3*dist_i + means3 = [mu**2.5, 50*cos(mu/0.75), 200*sin(mu/1.5)] + covar3 = [[sigma**2, 0, 0], + [0, sigma**2, 0], + [0, 0, sigma**2]] + for v in multinorm(means3, covar3).rvs(count): + data.append((dist_i, v)) + cap_est = wke(data).calculate_maximized_mi(k=NN_K) + cap_accur = 1.91191 # calculated in Mathematica for continuous distributions + self.assertAlmostEqual((cap_est[0] - cap_accur)/cap_accur, 0, delta=RTOL) + + def test_8gaussians3d_box(self): + corners = list(product(range(2),repeat=3)) + data = [] + for i, count in enumerate(8*[5000]): + means3, sigma = (corners[i], 0.25 + ((i + 1)/8 + sum(corners[i]))/10) + covar3 = [[sigma**2, 0, 0], + [0, sigma**2, 0], + [0, 0, sigma**2]] + for v in multinorm(means3, covar3).rvs(count): + data.append((i, v)) + cap_est = wke(data).calculate_maximized_mi(k=NN_K) + cap_accur = 1.8035 # calculated in Mathematica for continuous distributions + self.assertAlmostEqual((cap_est[0] - cap_accur)/cap_accur, 0, delta=RTOL) if __name__ == '__main__': unittest.main()