Skip to content

Commit

Permalink
Add 3-dim tests, use relative tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
kochanczyk committed Aug 16, 2018
1 parent 44926a1 commit 17887a7
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 69 deletions.
23 changes: 9 additions & 14 deletions Readme.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Channel Capacity Estimator
==========================

Channel Capacity Estimator (**cce**) is a python module to estimate
`information capacity`_ of a communication channel. Mutual
information, computed as proposed by Kraskov *et al.* [Crossref_, arXiv_],
`information capacity`_ of a communication channel. Mutual information,
computed as proposed by `Kraskov et al.` (*Physical Review E*, 2004)
Eq. (8), is maximized over input probabilities by means of a constrained
gradient-based stochastic optimization. The only parameter of the Kraskov
algorithm is the number of neighbors, *k*, used in the nearest neighbor
Expand All @@ -17,17 +17,14 @@ as implemented in TensorFlow_. Thus, to use **cce**, you should have
TensorFlow (with python bindings) installed on your system. See file
requirements.txt for a complete list of dependencies.

Module **cce** features the research article "Limits to channel information
capacity for a MAPK pathway in response to pulsatile EGF stimulation" by
Grabowski *et al.*, submitted to *PLOS Computational Biology* in 2018. Version
1.0 of the code has been included as supplementary data of this article.
A prototypical implementation of the approach implemented in **cce** has
appeared earlier, supplementing the article by Tudelska *et al.*
(`*Scientific Reports*, 2017`_).
Module **cce** features the research article "Limits to the rate of
information transmission through MAPK pathway" by Grabowski *et al.*,
submitted to *PLOS Computational Biology* in 2018. Release 0.4 of the
code has been included as supplementary data of this article.

For any updates and fixes to **cce**, visit project homepage:`
For any updates and fixes to **cce**, please visit project homepage:
http://pmbm.ippt.pan.pl/software/cce
(which currently directs to a GitHub repository:
(this permalink currently directs to a GitHub repository:
https://github.com/pawel-czyz/channel-capacity-estimator).


Expand Down Expand Up @@ -161,9 +158,7 @@ This software is distributed under `GNU GPL 3.0 license`_.


.. _information capacity: https://en.wikipedia.org/wiki/Channel_capacity
.. _arXiv: https://arxiv.org/pdf/cond-mat/0305641.pdf
.. _Crossref: https://doi.org/10.1103/PhysRevE.69.066138
.. _*Scientific Reports*, 2017: https://doi.org/10.1038/s41598-017-16166-y
.. _Kraskov et al.: https://doi.org/10.1103/PhysRevE.69.066138
.. _TensorFlow: https://www.tensorflow.org
.. _Frederic Grabowski: https://github.com/grfrederic
.. _Paweł Czyż: https://github.com/pawel-czyz
Expand Down
2 changes: 1 addition & 1 deletion docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ help:
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
18 changes: 9 additions & 9 deletions docs/source/cce.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ cce package
Submodules
----------

cce\.calculate\_weighted\_loss module
-------------------------------------
cce\.score module
-----------------

.. automodule:: cce.calculate_weighted_loss
.. automodule:: cce.score
:members:
:undoc-members:
:show-inheritance:

cce\.dataeng module
-------------------
cce\.preprocess module
----------------------

.. automodule:: cce.dataeng
.. automodule:: cce.preprocess
:members:
:undoc-members:
:show-inheritance:
Expand All @@ -28,10 +28,10 @@ cce\.estimator module
:undoc-members:
:show-inheritance:

cce\.optimize\_weights module
-----------------------------
cce\.optimize module
--------------------

.. automodule:: cce.optimize_weights
.. automodule:: cce.optimize
:members:
:undoc-members:
:show-inheritance:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
numpy >= 1.12.0
scipy >= 1.0.0
tensorflow
tensorflow >= 1.4.0
57 changes: 44 additions & 13 deletions tests/test_use_case_1.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import unittest
import numpy as np
from itertools import product
from numpy import cos, sin, log2
from scipy.stats import multivariate_normal as multinorm
from cce.estimator import WeightedKraskovEstimator as wke
from tests.plain_kraskov import calculate_mi as plain_calculate_mi
from tests.noisy_channel import communicate

NN_K = 30
DELTA = 0.01
NN_K = 15
ATOL = 0.02
RTOL = 0.04

class TestUseCase1(unittest.TestCase):
"""Tests of mutual information (MI) estimation for input distributions
Expand All @@ -17,35 +20,63 @@ def test_equal_sizes_trivial_separate(self):
mi_est1 = wke(data).calculate_mi(k=NN_K)
mi_est2 = plain_calculate_mi(data=data, k=NN_K)
mi_exact = 1.0
self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA)
self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA)
self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL)
self.assertAlmostEqual((mi_est1 - mi_est2)/mi_est2, 0, delta=RTOL)

def test_equal_sizes_trivial_overlap(self):
data = communicate({'0': 10000, '1': 10000},
{'0': 0.0, '1': 0.0})
mi_est1 = wke(data).calculate_mi(k=NN_K)
mi_est2 = plain_calculate_mi(data=data, k=NN_K)
mi_exact = 0.0
self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA)
self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA)
self.assertAlmostEqual(mi_est1, mi_exact, delta=ATOL)
self.assertAlmostEqual(mi_est1, mi_est2, delta=ATOL)

def test_unequal_sizes(self):
data = communicate({'0': 5000, '1': 10000},
{'0': 0.0, '1': 1.0})
mi_est1 = wke(data).calculate_mi(k=NN_K)
mi_est2 = plain_calculate_mi(data=data, k=NN_K)
mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3)
self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA)
self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA)
mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3)
self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL)
self.assertAlmostEqual((mi_est1 - mi_est2)/mi_exact, 0, delta=RTOL)

def test_equal_sizes_partial_overlap(self):
data = communicate({'0': 5000, '1': 5000, '2': 5000},
{'0': 0.0, '1': 0.0, '2': 1.0})
mi_est1 = wke(data).calculate_mi(k=NN_K)
mi_est2 = plain_calculate_mi(data=data, k=NN_K)
mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3)
self.assertAlmostEqual(mi_est1, mi_exact, delta=DELTA)
self.assertAlmostEqual(mi_est1, mi_est2, delta=DELTA)
mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3)
self.assertAlmostEqual((mi_est1 - mi_exact)/mi_exact, 0, delta=RTOL)
self.assertAlmostEqual((mi_est1 - mi_est2)/mi_est2, 0, delta=RTOL)

def test_8gaussians3d_snake(self):
data = []
for dist_i, count in enumerate(8*[5000]):
mu, sigma = dist_i+1, 33 + 3*dist_i
means3 = [mu**2.5, 50*cos(mu/0.75), 200*sin(mu/1.5)]
covar3 = [[sigma**2, 0, 0],
[0, sigma**2, 0],
[0, 0, sigma**2]]
for v in multinorm(means3, covar3).rvs(count):
data.append((dist_i, v))
mi_est = wke(data).calculate_mi(k=NN_K)
mi_accur = 1.85959 # calculated in Mathematica for continuous distributions
self.assertAlmostEqual((mi_est - mi_accur)/mi_accur, 0, delta=RTOL)

def test_8gaussians3d_box(self):
box = list(product(range(2),repeat=3))
data = []
for i, count in enumerate(8*[5000]):
means3, sigma = (box[i], 0.25 + ((i + 1)/8 + sum(box[i]))/10)
covar3 = [[sigma**2, 0, 0],
[0, sigma**2, 0],
[0, 0, sigma**2]]
for v in multinorm(means3, covar3).rvs(count):
data.append((i, v))
mi_est = wke(data).calculate_mi(k=NN_K)
mi_accur = 1.72251 # calculated in Mathematica for continuous distributions
self.assertAlmostEqual((mi_est - mi_accur)/mi_accur, 0, delta=RTOL)


if __name__ == '__main__':
Expand Down
19 changes: 10 additions & 9 deletions tests/test_use_case_2.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest
import numpy as np
from numpy import log2
from cce.estimator import WeightedKraskovEstimator as wke
from tests.noisy_channel import communicate

NN_K = 30
DELTA = 0.01
NN_K = 15
ATOL = 0.02
RTOL = 0.04

class TestUseCase2(unittest.TestCase):
"""Tests of mutual information (MI) estimation for input distributions
Expand All @@ -15,32 +16,32 @@ def test_equal_sizes(self):
output_values={'A': 0.0, 'B': 1.0})
weights = {'A': 1/3, 'B': 2/3}
mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K)
mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3)
self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA)
mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3)
self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL)

def test_unequal_sizes(self):
data = communicate({'A': 2500, 'B': 5000},
{'A': 0.0, 'B': 1.0})
weights = {'A': 1/2, 'B': 1/2}
mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K)
mi_exact = 1.0
self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA)
self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL)

def test_unequal_sizes_trivial_overlap(self):
data = communicate({'A': 2500, 'B': 5000},
{'A': 0.0, 'B': 0.0})
weights = {'A': 1/2, 'B': 1/2}
mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K)
mi_exact = 0.0
self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA)
self.assertAlmostEqual(mi_est, mi_exact, delta=ATOL)

def test_output_overlap(self):
data = communicate({'A': 2500, 'B': 2500, 'C': 2500},
{'A': 0, 'B': 0, 'C': 1})
weights = {'A': 1/3, 'B': 1/3, 'C': 1/3}
mi_est = wke(data).calculate_weighted_mi(weights, k=NN_K)
mi_exact = -(np.log2(2/3)*2/3 + np.log2(1/3)*1/3)
self.assertAlmostEqual(mi_est, mi_exact, delta=DELTA)
mi_exact = -(log2(2/3)*2/3 + log2(1/3)*1/3)
self.assertAlmostEqual((mi_est - mi_exact)/mi_exact, 0, delta=RTOL)


if __name__ == '__main__':
Expand Down
74 changes: 52 additions & 22 deletions tests/test_use_case_3.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import unittest
import numpy as np
from itertools import product
from numpy import cos, sin, log2
from scipy.stats import multivariate_normal as multinorm
from scipy.stats.distributions import norm
from cce.estimator import WeightedKraskovEstimator as wke
from tests.noisy_channel import communicate

NN_K = 30
DELTA = 0.01
NN_K = 15
ATOL = 0.02
RTOL = 0.04

class TestUseCase3(unittest.TestCase):
"""Test of maximum mutual information (max MI, capacity) estimation
Expand All @@ -14,28 +17,28 @@ class TestUseCase3(unittest.TestCase):
def test_equal_sizes(self):
data = communicate(input_counts={'A': 5000, 'B': 5000},
output_values={'A': 0.0, 'B': 1.0})
capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
capacity_exact = 1.0
self.assertAlmostEqual(capacity_est, capacity_exact, delta=DELTA)
self.assertAlmostEqual(opt_ws['A'], 0.5, delta=DELTA)
self.assertAlmostEqual(opt_ws['B'], 0.5, delta=DELTA)
cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
cap_exact = 1.0
self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL)
self.assertAlmostEqual(opt_ws['A'], 0.5, delta=ATOL)
self.assertAlmostEqual(opt_ws['B'], 0.5, delta=ATOL)

def test_unequal_sizes(self):
data = communicate({'A': 2500, 'B': 5000},
{'A': 0.0, 'B': 1.0})
capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
capacity_exact = 1.0
self.assertAlmostEqual(capacity_est, capacity_exact, delta=DELTA)
self.assertAlmostEqual(opt_ws['A'], 0.5, delta=DELTA)
self.assertAlmostEqual(opt_ws['B'], 0.5, delta=DELTA)
cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
cap_exact = 1.0
self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL)
self.assertAlmostEqual(opt_ws['A'], 0.5, delta=ATOL)
self.assertAlmostEqual(opt_ws['B'], 0.5, delta=ATOL)

def test_equal_sizes_partial_overlap(self):
data = communicate({'A': 2500, 'B': 2500, 'C': 2500},
{'A': 0, 'B': 0, 'C': 1})
capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
capacity_exact = 1.0
self.assertAlmostEqual(capacity_est, capacity_exact, delta=0.01)
self.assertAlmostEqual(opt_ws['C'], 0.5, delta=DELTA)
cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
cap_exact = 1.0
self.assertAlmostEqual((cap_est - cap_exact)/cap_exact, 0, delta=RTOL)
self.assertAlmostEqual(opt_ws['C'], 0.5, delta=ATOL)

def test_eight_gaussians(self):
"""This test case is patterned after a test included in the source code
Expand All @@ -46,17 +49,44 @@ def _mu_sigma(i):
return (1.5*(i + 1), (i + 1)/4. + 1)

data = []
for dist_i, count in enumerate(8*[2500]):
for dist_i, count in enumerate(8*[5000]):
data.extend([(dist_i, [values])
for values in norm(*_mu_sigma(dist_i)).rvs(count)])

capacity_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
cap_est, opt_ws = wke(data).calculate_maximized_mi(k=NN_K)
largest_2_opt_ws_indices = list(map(lambda kv: kv[0],
sorted(opt_ws.items(), key=lambda kv: kv[1], reverse=True)[0:2]))
capacity_accurate, largest_2_opt_ws_indices_accurate = 1.14425, [0,7]
self.assertAlmostEqual(capacity_est, capacity_accurate, delta=DELTA)
self.assertEqual(largest_2_opt_ws_indices, largest_2_opt_ws_indices_accurate)
cap_accur, largest_2_opt_ws_indices_accur = 1.14425, [0,7]
self.assertAlmostEqual((cap_est - cap_accur)/cap_accur, 0, delta=RTOL)
self.assertEqual(largest_2_opt_ws_indices, largest_2_opt_ws_indices_accur)

def test_8gaussians3d_snake(self):
data = []
for dist_i, count in enumerate(8*[5000]):
mu, sigma = dist_i + 1, 33 + 3*dist_i
means3 = [mu**2.5, 50*cos(mu/0.75), 200*sin(mu/1.5)]
covar3 = [[sigma**2, 0, 0],
[0, sigma**2, 0],
[0, 0, sigma**2]]
for v in multinorm(means3, covar3).rvs(count):
data.append((dist_i, v))
cap_est = wke(data).calculate_maximized_mi(k=NN_K)
cap_accur = 1.91191 # calculated in Mathematica for continuous distributions
self.assertAlmostEqual((cap_est[0] - cap_accur)/cap_accur, 0, delta=RTOL)

def test_8gaussians3d_box(self):
corners = list(product(range(2),repeat=3))
data = []
for i, count in enumerate(8*[5000]):
means3, sigma = (corners[i], 0.25 + ((i + 1)/8 + sum(corners[i]))/10)
covar3 = [[sigma**2, 0, 0],
[0, sigma**2, 0],
[0, 0, sigma**2]]
for v in multinorm(means3, covar3).rvs(count):
data.append((i, v))
cap_est = wke(data).calculate_maximized_mi(k=NN_K)
cap_accur = 1.8035 # calculated in Mathematica for continuous distributions
self.assertAlmostEqual((cap_est[0] - cap_accur)/cap_accur, 0, delta=RTOL)

if __name__ == '__main__':
unittest.main()

0 comments on commit 17887a7

Please sign in to comment.