From ef806039445e564900436a5d4893b78174eb04b3 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 20 Feb 2024 16:37:47 -0800 Subject: [PATCH 01/12] * use pathlib module for file paths * send in packing config data directly to analysis --- cellpack/bin/pack.py | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index a35c214a..b1f7423e 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -1,5 +1,5 @@ import fire -from os import path +from pathlib import Path import logging import logging.config import time @@ -12,10 +12,9 @@ from cellpack.autopack.loaders.config_loader import ConfigLoader from cellpack.autopack.loaders.recipe_loader import RecipeLoader from cellpack.autopack.loaders.analysis_config_loader import AnalysisConfigLoader -from cellpack.autopack.utils import get_seed_list ############################################################################### -log_file_path = path.abspath(path.join(__file__, "../../logging.conf")) +log_file_path = Path(__file__).parent.parent / "logging.conf" logging.config.fileConfig(log_file_path, disable_existing_loggers=False) log = logging.getLogger() ############################################################################### @@ -30,8 +29,6 @@ def pack(recipe, config_path=None, analysis_config_path=None): :return: void """ - log.info(f"Running in {__file__}") - packing_config_data = ConfigLoader(config_path).config recipe_data = RecipeLoader( recipe, packing_config_data["save_converted_recipe"] @@ -58,21 +55,10 @@ def pack(recipe, config_path=None, analysis_config_path=None): ) log.info(f"saving to {env.out_folder}") - seed_list = get_seed_list(packing_config_data, recipe_data) - analyze.doloop( - packing_config_data["number_of_packings"], + recipe_data, + packing_config_data, env.boundingBox, - plot_figures=packing_config_data.get("save_plot_figures", True), - show_grid=packing_config_data["show_grid_plot"], - seed_list=seed_list, - config_name=packing_config_data["name"], - recipe_version=recipe_data["version"], - image_export_options=packing_config_data.get("image_export_options"), - parallel=packing_config_data.get("parallel", False), - save_gradient_data_as_image=packing_config_data.get( - "save_gradient_data_as_image", False - ), ) if analysis_config_path is not None: analyze.run_analysis_workflow( From 10f2cae46f68d80d76e01848a44ef00b83deb59c Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 20 Feb 2024 16:39:45 -0800 Subject: [PATCH 02/12] Process arguments inside `doloop` function --- cellpack/autopack/Analysis.py | 52 ++++++++++++++++------------------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index f1d281e5..03bfec63 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -33,7 +33,7 @@ from cellpack.autopack.plotly_result import PlotlyAnalysis from cellpack.autopack.upy import colors as col from cellpack.autopack.upy.colors import map_colors -from cellpack.autopack.utils import check_paired_key, get_paired_key +from cellpack.autopack.utils import check_paired_key, get_paired_key, get_seed_list from cellpack.autopack.writers import Writer from cellpack.autopack.writers.ImageWriter import ImageWriter import concurrent.futures @@ -2342,17 +2342,10 @@ def pack_one_seed( def doloop( self, - number_of_packings, + recipe_data, + packing_config_data, bounding_box, get_distance_distribution=True, - plot_figures=True, - show_grid=True, 
- seed_list=None, - config_name="default", - recipe_version="1.0.0", - image_export_options=None, - parallel=False, - save_gradient_data_as_image=False, ): """ Runs multiple packings of the same recipe in a loop. This workflow @@ -2363,26 +2356,17 @@ def doloop( Parameters ---------- - number_of_packing : int - Number of repeats of a packing. Default: 1 - bounding_box : np.ndarray - bounding box from the environment + recipe_data: dict + Dictionary containing recipe data + packing_config_data: dict + Dictionary containing packing configuration data + bounding_box: list + List of two lists containing the minimum and maximum coordinates + of the bounding box get_distance_distribution: bool - specify whether to calculate distance distributions - render: bool - ??? - plot_figures: bool - specify whether to save figures generated by the analyses - show_grid: bool - specify whether to display packing grid in browser - fbox_bb: ??? - ??? - seed_list: List - list of seeds to use for the packing (for reproducibility) - config_name: string - name of the configuration used for the packing - recipe_version: string - version of the recipe used for the packing + Whether to calculate and store distance and angle distributions + seed_list: list + List of seeds to use for packings Outputs ------- @@ -2393,6 +2377,16 @@ def doloop( distributions as applicable for each seed, and a combined image across seeds """ + number_of_packings = packing_config_data["number_of_packings"] + plot_figures = packing_config_data.get("save_plot_figures", True) + show_grid = packing_config_data["show_grid_plot"] + image_export_options = packing_config_data.get("image_export_options") + parallel = packing_config_data.get("parallel", False) + save_gradient_data_as_image = packing_config_data.get( + "save_gradient_data_as_image", False + ) + + seed_list = get_seed_list(packing_config_data, recipe_data) if seed_list is None: seed_list = self.getHaltonUnique(number_of_packings) packing_basename = self.env.base_name From 591eea1dc03286a04b369d85b99aeabf47fa942c Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Wed, 21 Feb 2024 16:39:00 -0800 Subject: [PATCH 03/12] Remove unused functions --- cellpack/autopack/Analysis.py | 1146 +-------------------------------- 1 file changed, 25 insertions(+), 1121 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index f1d281e5..79198349 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -4,36 +4,22 @@ @author: ludo """ -import csv import json -import math import os from pathlib import Path from time import time import matplotlib import numpy -import pandas as pd -import seaborn as sns -import trimesh from matplotlib import pyplot as plt -from matplotlib.patches import Circle, Patch +from matplotlib.patches import Circle from mdutils.mdutils import MdUtils -from PIL import Image -from scipy import stats -from scipy.cluster import hierarchy -from scipy.spatial import distance -from sklearn.metrics import matthews_corrcoef -from tqdm import tqdm import cellpack.autopack as autopack -from cellpack.autopack.GeometryTools import GeometryTools, Rectangle +from cellpack.autopack.GeometryTools import GeometryTools from cellpack.autopack.ldSequence import halton -from cellpack.autopack.MeshStore import MeshStore from cellpack.autopack.plotly_result import PlotlyAnalysis -from cellpack.autopack.upy import colors as col -from cellpack.autopack.upy.colors import map_colors -from cellpack.autopack.utils import check_paired_key, 
get_paired_key +from cellpack.autopack.utils import check_paired_key, get_paired_key, get_seed_list from cellpack.autopack.writers import Writer from cellpack.autopack.writers.ImageWriter import ImageWriter import concurrent.futures @@ -88,24 +74,6 @@ def __init__( self.seed_to_results = {} autopack._colors = None - @staticmethod - def get_xyz_dict_from_all_pos_dict(all_pos_dict): - """ - returns array of x, y, and z positions for each seed for runs - in all_pos_dict - """ - all_objs = {} - for seed, object_dict in all_pos_dict.items(): - for obj, positions in object_dict.items(): - positions = numpy.array(positions) - if obj not in all_objs: - all_objs[obj] = {} - if seed not in all_objs[obj]: - all_objs[obj][seed] = {} - for ct, dim in enumerate(["x", "y", "z"]): - all_objs[obj][seed][dim] = positions[:, ct] - return all_objs - @staticmethod def cartesian_to_sph(xyz, center=None): """ @@ -126,216 +94,6 @@ def cartesian_to_sph(xyz, center=None): def get_list_of_dims(): return ["x", "y", "z", "r", "theta", "phi"] - @staticmethod - def save_array_to_file(array_to_save, file_path, seed_index): - f_handle = open(file_path, "a" if seed_index else "w") - numpy.savetxt( - f_handle, - array_to_save, - delimiter=",", - ) - f_handle.close() - - def getPositionsFromResFile(self): - # could actually restore file using histoVol. - # or not - # need to parse apr file here anyway - return [] - - def getPositionsFromObject(self, parents): - positions = [] - for parent in parents: - obparent = self.helper.getObject(parent) - children = self.helper.getChilds(obparent) - for ch in children: - ingr_name = self.helper.getName(ch) - meshp = self.helper.getObject("Meshs_" + ingr_name.split("_")[0]) - if meshp is None: - c = self.helper.getChilds(ch) - if not len(c): - continue - meshp_children = self.helper.getChilds( - c[0] - ) # continue #should get sphere/cylnder parent ? - else: - meshp_children = self.helper.getChilds(meshp) - for cc in meshp_children: - pos = self.helper.ToVec(self.helper.getTranslation(cc)) - positions.append(pos) - return positions - - def getDistanceFrom(self, target, parents=None, **options): - """ - target : name or host object target or target position - parent : name of host parent object for the list of object to measure distance from - objects : list of object or list of points - """ - # get distance from object to the target. 
- # all object are in env.packed_objects - # get options - - if type(target) == list or type(target) == tuple: - targetPos = target - elif type(target) == str: - o = self.helper.getObject(target) - if o is not None: - targetPos = self.helper.ToVec(self.helper.getTranslation(o)) # hostForm - else: - o = self.helper.getObject(target) - if o is not None: - targetPos = self.helper.ToVec(self.helper.getTranslation(o)) # hostForm - listCenters = [] - if self.result_file is None: - if parents is None and self.result_file is None: - listeParent = [self.env.name + "_cytoplasm"] - for o in self.env.compartments: - listeParent.append(o.name + "_Matrix") - listeParent.append(o.name + "_surface") - elif parents is not None and self.result_file is None: - listeParent = parents - listCenters = self.getPositionsFromObject(listeParent) - - delta = numpy.array(listCenters) - numpy.array(targetPos) - delta *= delta - distA = numpy.sqrt(delta.sum(1)) - return distA - - def getClosestDistance(self, parents=None, **options): - if self.result_file is None: - if parents is None and self.result_file is None: - listeParent = [self.env.name + "_cytoplasm"] - for o in self.env.compartments: - listeParent.append(o.name + "_Matrix") - listeParent.append(o.name + "_surface") - elif parents is not None and self.result_file is None: - listeParent = parents - listeCenters = self.getPositionsFromObject(listeParent) - else: - # use data from file - # TODO: currently getPositionsFromResFile returns an empty list - # listeCenters = self.getPositionsFromResFile(listeParent) - listeCenters = [] - # is the distance in the result array ? - listeDistance = numpy.zeros(len(listeCenters)) + 99999 - for i in range(len(listeCenters)): - for j in range(i + 1, len(listeCenters)): - # should use point - d = self.helper.measure_distance(listeCenters[i], listeCenters[j]) - if d < listeDistance[i]: - listeDistance[i] = d - return listeDistance - - def displayDistance( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, - ): - distances = numpy.array(self.env.grid.distToClosestSurf[:]) - mask = distances > cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = 0 # cutoff - base = self.helper.getObject(self.env.name + "distances_base") - if base is None: - base = self.helper.Sphere(self.env.name + "distances_base")[0] - p = self.helper.getObject(self.env.name + "distances") - if p is not None: - self.helper.deleteObject(p) # recursif? - p = self.helper.newEmpty(self.env.name + "distances") - # can use cube also - - def displayDistanceCube( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, - ): - distances = numpy.array(self.env.grid.distToClosestSurf[:]) - mask = distances > cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = 0 # cutoff - base = self.helper.getObject(self.env.name + "distances_base_cube") - if base is None: - # base=self.helper.Sphere(self.env.name+"distances_base")[0] - size = self.env.grid.gridSpacing - base = self.helper.box( - self.env.name + "distances_base_cube", - center=[0.0, 0.0, 0.0], - size=[size, size, size], - )[0] - parent_cube = self.helper.getObject(self.env.name + "distances_cubes") - if parent_cube is not None: - self.helper.deleteObject(parent_cube) # recursif? 
- parent_cube = self.helper.newEmpty(self.env.name + "distances_cubes") - - def displayDistancePlane( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, - ): - # which axis ? - distances = numpy.array(self.env.grid.distToClosestSurf[:]) - - ramp = col.getRamp([ramp_color1, ramp_color2], size=255) # color - mask = distances > cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = numpy.nonzero(mask)[0] - distances[ind] = 0 # cutoff - newd = numpy.append(distances, cutoff) - colors = map_colors(newd, ramp)[:-1] # 1D array of the grid x,y,1 - autopack._colors = colors - p = self.helper.getObject(self.env.name + "distances") - if p is not None: - self.helper.deleteObject(p) # recursif? - p = self.helper.newEmpty(self.env.name + "distances_p") - - d = numpy.array(self.env.grid.boundingBox[0]) - numpy.array( - self.env.grid.boundingBox[1] - ) - p, mpl = self.helper.plane( - self.env.name + "distances_plane", - center=self.env.grid.getCenter(), - size=[math.fabs(d[0]), math.fabs(d[1])], - parent=p, - ) - self.helper.rotateObj(p, [0, 0, -math.pi / 2.0]) - filename = ( - autopack.cache_results - + os.sep - + self.env.name - + "distances_plane_texture.png" - ) - c = colors.reshape( - ( - self.env.grid.nbGridPoints[0], - self.env.grid.nbGridPoints[1], - self.env.grid.nbGridPoints[2], - 3, - ) - ) - - im = Image.fromstring( - "RGB", (c.shape[0], c.shape[1]), numpy.uint8(c * 255.0).tostring() - ) - im.save(str(filename)) - mat = self.helper.createTexturedMaterial( - self.env.name + "planeMat", str(filename) - ) - # assign the material to the plane - self.helper.assignMaterial(p, mat, texture=True) - def writeJSON(self, filename, data): with open(filename, "w") as fp: # doesnt work with symbol link ? 
json.dump( @@ -347,287 +105,6 @@ def loadJSON(self, filename): data = json.load(data_file) return data - def grabResultFromJSON(self, n): - ingrrot = {} - ingrpos = {} - for i in range(n): - with open("results_seed_" + str(i) + ".json") as data_file: - data = json.load(data_file) - for recipe in data: - for ingrname in data[recipe]: - for k in range(len(data[recipe][ingrname]["results"])): - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - ingrrot[ingrname].append( - data[recipe][ingrname]["results"][k][1] - ) - ingrpos[ingrname].append( - data[recipe][ingrname]["results"][k][0] - ) - return ingrpos, ingrrot - - def grabResultFromTXT(self, n, doanalyze=False): - from autopack import transformation as t - - ingrrot = {} - ingrpos = {} - for i in range(1000): - files = open("results_seed_" + str(i) + ".txt", "r") - lines = files.readlines() - files.close() - for line in lines: - line = line.replace("<", " ").replace(">", " ") - elem = line.split() - ingrname = elem[-5] - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - ingrrot[ingrname].append(eval(elem[2])) - ingrpos[ingrname].append(eval(elem[0])) - for ingrname in ingrrot: - ingrrot[ingrname] = [ - numpy.array(m).reshape((4, 4)) for m in ingrrot[ingrname] - ] - if doanalyze: - for ingrname in ingrrot: - eulers3 = [t.euler_from_matrix(m, "rxyz") for m in ingrrot[ingrname]] - e3 = numpy.degrees(numpy.array(eulers3)).transpose() - numpy.savetxt( - ingrname + "_euler_X.csv", - numpy.array(e3[0]), - delimiter=",", - ) - numpy.savetxt( - ingrname + "_euler_Y.csv", - numpy.array(e3[1]), - delimiter=",", - ) - numpy.savetxt( - ingrname + "_euler_Z.csv", - numpy.array(e3[2]), - delimiter=",", - ) - self.histogram(e3[0], ingrname + "_euler_X.png") - self.histogram(e3[1], ingrname + "_euler_Y.png") - self.histogram(e3[2], ingrname + "_euler_Z.png") - return ingrpos, ingrrot - - # should take any type of list... - def save_csv(self, data, filename=None): - if filename is None: - filename = "output.csv" - resultFile = open(filename, "wb") - wr = csv.writer(resultFile, dialect="excel") - # wr.writerows(data) list of list ? 
- # resultFile.close() - for item in data: - wr.writerow([item]) - resultFile.close() - - def rectangle_circle_area(self, bbox, center, radius): - # http://www.eex-dev.net/index.php?id=100 - # [[0.,0,0],[1000,1000,1]] - # top,bottom, right, left - # rect=Rectangle(bbox[0][0],bbox[1][0],bbox[0][1],bbox[1][1])#top,bottom, right, left - rect = Rectangle( - bbox[1][1], bbox[0][1], bbox[1][0], bbox[0][0] - ) # top,bottom, right, left - m = [center[0], center[1]] - r = radius - area = math.pi * r**2 - chs = self.g.check_sphere_inside(rect, m, r) - if chs: # sph not completly inside - ch = self.g.check_rectangle_oustide(rect, m, r) - if ch: # rectangle not outside - leftBound, rightBound = self.g.getBoundary(rect, m, r) - area = self.g.get_rectangle_cercle_area( - rect, m, r, leftBound, rightBound - ) - else: - area = bbox[0][1] ** 2 - return area - - def getVolumeShell(self, bbox, radii, center): - # rectangle_circle_area - volumes = [] - box_size0 = bbox[1][0] - bbox[0][0] - for i in range(len(radii) - 1): - r1 = radii[i] - r2 = radii[i + 1] - v1 = self.g.calc_volume(r1, box_size0 / 2.0) - v2 = self.g.calc_volume(r2, box_size0 / 2.0) - volumes.append(v2 - v1) - return volumes - - def rdf_3d(self, ingr): - # see for intersection volume here http://crowsandcats.blogspot.com/2013/04/cube-sphere-intersection-volume.html - # and here http://crowsandcats.blogspot.com/2013/05/extending-radial-distributions.html - # will require scipy...worth it ? - # should be pairewise distance ? or not ? - distances = numpy.array(self.env.distances[ingr.name]) - basename = self.env.basename - numpy.savetxt( - basename + ingr.name + "_pos.csv", - numpy.array(self.env.ingredient_positions[ingr.name]), - delimiter=",", - ) - self.histogram(distances, basename + ingr.name + "_histo.png") - numpy.savetxt( - basename + ingr.name + "_distances.csv", - numpy.array(distances), - delimiter=",", - ) - # the bin should be not less than the biggest ingredient radius - # b=int(distances.max()/self.largest) - b = 100 - # bin_edges = numpy.arange(0, min(box_size) / 2, bin_width) - new_rdf, edges = numpy.histogramdd( - distances, bins=b, range=[(distances.min(), distances.max())] - ) - radii = edges[0] - # from http://isaacs.sourceforge.net/phys/rdfs.html - dnr = new_rdf - N = len(distances) - V = ( - self.env.grid.nbGridPoints[0] - * self.env.grid.nbGridPoints[1] - * self.env.grid.nbGridPoints[2] - * self.env.grid.gridSpacing**3 - ) - Vshell = numpy.array(self.getVolumeShell(self.bbox, radii, self.center)) - gr = (dnr * V) / (N * Vshell) - numpy.savetxt(basename + ingr.name + "_rdf.csv", numpy.array(gr), delimiter=",") - self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") - - def getAreaShell(self, bbox, radii, center): - # rectangle_circle_area - areas = [] - for i in range(len(radii) - 1): - r1 = radii[i] - r2 = radii[i + 1] - area1 = self.rectangle_circle_area(bbox, center, r1) - area2 = self.rectangle_circle_area(bbox, center, r2) - if area1 == 0 or area2 == 0: - areas.append(numpy.pi * (numpy.power(r2, 2) - numpy.power(r1, 2))) - else: - areas.append(area2 - area1) - return areas - - def ripley(self, positions, dr=25, rMax=None): - # K(t) = A*SUM(wij*I(i,j)/n**2) - # lambda = n/A A is the area of the region containing all points - # I indicator function 1 if its operand is true, 0 otherwise - # t is the search radius - # if homogenous K(s) = pi*s**2 - # L(t) = (K(t)/pi)**1/2 - # A common plot is a graph of t - \hat{L}(t) against t - # which will approximately follow the horizontal zero-axis with constant - # 
dispersion if the data follow a homogeneous Poisson process. - N = len(positions) - V = 1000**2 - diag = numpy.sqrt(1000**2 + 1000**2) - dr = dr # all_distance.min() - if rMax is None: - rMax = diag - edges = numpy.arange(dr, rMax + 1.1 * dr, dr) - k = numpy.zeros((N, len(edges))) - for i, p in enumerate(positions): - di = distance.cdist( - positions, - [p], - "euclidean", - ) - # dV = np.array(analyse.getAreaShell(analyse.bbox,edges,p)) - for j, e in enumerate(edges): - area0 = math.pi * e**2 # complete circle - area1 = self.rectangle_circle_area(self.bbox, p, e) - w = area1 / area0 - k[i, j] = w * len(numpy.nonzero(di < e)[0]) / N**2 - Kt = V * numpy.sum(k, axis=0) - Lt = (Kt / numpy.pi) ** 0.5 - return Kt, Lt - - def rdf(self, positions, dr=10, rMax=None): - N = len(positions) - V = 1000**2 - diag = numpy.sqrt(1000**2 + 1000**2) - dr = dr # all_distance.min() - if rMax is None: - rMax = diag - edges = numpy.arange(0.0, rMax + 1.1 * dr, dr) - g = numpy.zeros((N, len(edges) - 1)) - dv = [] - density = float(N) / float(V) - for i, p in enumerate(positions): - di = distance.cdist( - positions, - [p], - "euclidean", - ) - dN, bins = numpy.histogram(di, bins=edges) - dV = numpy.array(self.getAreaShell(self.bbox, edges, p)) - dv.append(dV) - g[i] = dN / (dV * density) - avg = numpy.average(g, axis=0) # /np.array(dv) - return avg - - def rdf_2d(self, ingr): - # dN/N / dV/V = dN/dV * V/N - distances = numpy.array(self.env.distances[ingr.name]) - basename = self.env.basename - numpy.savetxt( - basename + ingr.name + "_pos.csv", - numpy.array(self.env.ingredient_positions[ingr.name]), - delimiter=",", - ) - self.histogram(distances, basename + ingr.name + "_histo.png") - numpy.savetxt( - basename + ingr.name + "_distances.csv", - numpy.array(distances), - delimiter=",", - ) - # the bin should be not less than the biggest ingredient radius - # b=int(distances.max()/self.largest) - new_rdf, edges = numpy.histogramdd( - distances - ) # , bins=b, range=[(distances.min(), distances.max())],normed=0) - radii = edges[0] - # r=radii.tolist() - # r.insert(0,0.0) - # radii = numpy.array(r) - # rdf= new_rdf.tolist() - # rdf.insert(0,0) - # new_rdf = numpy.array(rdf) - # from http://isaacs.sourceforge.net/phys/rdfs.html - dnr = new_rdf[:] - N = len(distances) - V = ( - self.env.grid.nbGridPoints[0] - * self.env.grid.nbGridPoints[1] - * self.env.grid.gridSpacing**2 - ) - Vshell = numpy.array(self.getAreaShell(self.bbox, radii, self.center)) - # print Vshell - # Vshell1 = numpy.pi*density*(numpy.power(radii[1:],2)-numpy.power(radii[:-1], 2)) - # print Vshell1 - # print radii - gr = (dnr * V) / (N * Vshell) - numpy.savetxt(basename + ingr.name + "_rdf.csv", numpy.array(gr), delimiter=",") - self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") - # simpl approach Ni/Areai - G = dnr / Vshell - numpy.savetxt( - basename + ingr.name + "_rdf_simple.csv", - numpy.array(G), - delimiter=",", - ) - self.plot( - numpy.array(G), - radii[:-1], - basename + ingr.name + "_rdf_simple.png", - ) - def plot_position_distribution_total(self, all_positions): pos_xyz = numpy.array(all_positions) if pos_xyz.shape[0] <= 1: @@ -697,167 +174,6 @@ def plot_distance_distribution(self, all_ingredient_distances): y_label="count", ) - def correlation(self, ingr): - basename = self.env.basename - posxyz = numpy.array(self.env.ingredient_positions[ingr.name]).transpose() - g_average, radii, x, y, z = self.PairCorrelationFunction_3D( - posxyz, 1000, 900, 100 - ) - self.plot(g_average, radii, basename + ingr.name + "_corr.png") - - def 
PairCorrelationFunction_3D(self, data, S, rMax, dr): - """Compute the three-dimensional pair correlation function for a set of - spherical particles contained in a cube with side length S. This simple - function finds reference particles such that a sphere of radius rMax drawn - around the particle will fit entirely within the cube, eliminating the need - to compensate for edge effects. If no such particles exist, an error is - returned. Try a smaller rMax...or write some code to handle edge effects! ;) - Arguments: - x an array of x positions of centers of particles - y an array of y positions of centers of particles - z an array of z positions of centers of particles - S length of each side of the cube in space - rMax outer diameter of largest spherical shell - dr increment for increasing radius of spherical shell - - Returns a tuple: (g, radii, interior_x, interior_y, interior_z) - g(r) a numpy array containing the correlation function g(r) - radii a numpy array containing the radii of the - spherical shells used to compute g(r) - interior_x x coordinates of reference particles - interior_y y coordinates of reference particles - interior_z z coordinates of reference particles - """ - x = data[0] - y = data[1] - z = data[2] - # Find particles which are close enough to the cube center that a sphere of radius - # rMax will not cross any face of the cube - bools1 = x > rMax - bools2 = x < (S - rMax) - bools3 = y > rMax - bools4 = y < (S - rMax) - bools5 = z > rMax - bools6 = z < (S - rMax) - - (interior_indices,) = numpy.where( - bools1 * bools2 * bools3 * bools4 * bools5 * bools6 - ) - num_interior_particles = len(interior_indices) - - if num_interior_particles < 1: - raise RuntimeError( - "No particles found for which a sphere of radius rMax\ - will lie entirely within a cube of side length S. Decrease rMax\ - or increase the size of the cube." - ) - - edges = numpy.arange(0.0, rMax + 1.1 * dr, dr) - num_increments = len(edges) - 1 - g = numpy.zeros([num_interior_particles, num_increments]) - radii = numpy.zeros(num_increments) - numberDensity = len(x) / S**3 - - # Compute pairwise correlation for each interior particle - for p in range(num_interior_particles): - index = interior_indices[p] - d = numpy.sqrt( - (x[index] - x) ** 2 + (y[index] - y) ** 2 + (z[index] - z) ** 2 - ) - d[index] = 2 * rMax - - (result, bins) = numpy.histogram(d, bins=edges, normed=False) - g[p, :] = result / numberDensity - - # Average g(r) for all interior particles and compute radii - g_average = numpy.zeros(num_increments) - for i in range(num_increments): - radii[i] = (edges[i] + edges[i + 1]) / 2.0 - rOuter = edges[i + 1] - rInner = edges[i] - g_average[i] = numpy.average(g[:, i]) / ( - 4.0 / 3.0 * numpy.pi * (rOuter**3 - rInner**3) - ) - - return ( - g_average, - radii, - x[interior_indices], - y[interior_indices], - z[interior_indices], - ) - # Number of particles in shell/total number of particles/volume of shell/number density - # shell volume = 4/3*pi(r_outer**3-r_inner**3) - - def PairCorrelationFunction_2D(self, x, y, S, rMax, dr): - """Compute the two-dimensional pair correlation function, also known - as the radial distribution function, for a set of circular particles - contained in a square region of a plane. This simple function finds - reference particles such that a circle of radius rMax drawn around the - particle will fit entirely within the square, eliminating the need to - compensate for edge effects. If no such particles exist, an error is - returned. 
Try a smaller rMax...or write some code to handle edge effects! ;) - - Arguments: - x an array of x positions of centers of particles - y an array of y positions of centers of particles - S length of each side of the square region of the plane - rMax outer diameter of largest annulus - dr increment for increasing radius of annulus - - Returns a tuple: (g, radii, interior_x, interior_y) - g(r) a numpy array containing the correlation function g(r) - radii a numpy array containing the radii of the - annuli used to compute g(r) - interior_x x coordinates of reference particles - interior_y y coordinates of reference particles - """ - # Number of particles in ring/area of ring/number of reference particles/number density - # area of ring = pi*(r_outer**2 - r_inner**2) - # Find particles which are close enough to the box center that a circle of radius - # rMax will not cross any edge of the box - bools1 = x > 1.1 * rMax - bools2 = x < (S - 1.1 * rMax) - bools3 = y > rMax * 1.1 - bools4 = y < (S - rMax * 1.1) - (interior_indices,) = numpy.where(bools1 * bools2 * bools3 * bools4) - num_interior_particles = len(interior_indices) - - if num_interior_particles < 1: - raise RuntimeError( - "No particles found for which a circle of radius rMax\ - will lie entirely within a square of side length S. Decrease rMax\ - or increase the size of the square." - ) - - edges = numpy.arange(0.0, rMax + 1.1 * dr, dr) - num_increments = len(edges) - 1 - g = numpy.zeros([num_interior_particles, num_increments]) - radii = numpy.zeros(num_increments) - numberDensity = len(x) / S**2 - - # Compute pairwise correlation for each interior particle - for p in range(num_interior_particles): - index = interior_indices[p] - d = numpy.sqrt((x[index] - x) ** 2 + (y[index] - y) ** 2) - d[index] = 2 * rMax - - (result, bins) = numpy.histogram(d, bins=edges, normed=False) - g[p, :] = result / numberDensity - - # Average g(r) for all interior particles and compute radii - g_average = numpy.zeros(num_increments) - for i in range(num_increments): - radii[i] = (edges[i] + edges[i + 1]) / 2.0 - rOuter = edges[i + 1] - rInner = edges[i] - # divide by the area of sphere cut by sqyare - g_average[i] = numpy.average(g[:, i]) / ( - numpy.pi * (rOuter**2 - rInner**2) - ) - - return (g_average, radii, interior_indices) - def get_obj_dict(self, packing_results_path): """ Returns the object dictionary from the input path folder. 
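For reference, run_analysis_workflow dispatches on the top-level keys of the
analysis config via analysis_config.get(...). A minimal config exercising the
reporting path kept by this patch might look like the sketch below; the
"create_report" key is taken from the code, while the inner ingredient_keys
option (mirroring create_report's keyword argument) is illustrative only:

    {
        "create_report": {
            "ingredient_keys": null
        }
    }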
@@ -1275,418 +591,12 @@ def run_analysis_workflow( print("Starting analysis workflow...") - if analysis_config.get("similarity_analysis"): - self.run_similarity_analysis( - all_objs, **analysis_config["similarity_analysis"] - ) - - if analysis_config.get("parametrized_representation"): - self.get_parametrized_representation( - all_pos_list=all_pos_list, - **analysis_config["parametrized_representation"], - ) - if analysis_config.get("create_report"): self.create_report( recipe_data=recipe_data, **analysis_config["create_report"], ) - def normalize_similarity_df(self, similarity_df): - """ - Normalizes the similarity dataframe - """ - dims_to_normalize = self.get_list_of_dims() + ["pairwise_distance"] - for dim in dims_to_normalize: - values = similarity_df.loc[:, dim].values - normalized_values = (values - numpy.min(values)) / ( - numpy.max(values) - numpy.min(values) - ) - normalized_values[numpy.isnan(normalized_values)] = 0 - similarity_df.loc[:, dim] = normalized_values - return similarity_df - - def calc_avg_similarity_values_for_dim(self, similarity_vals_for_dim): - packing_inds = numpy.cumsum( - numpy.hstack([0, self.num_seeds_per_packing]) - ) # returns the indices where packings start and end - avg_similarity_values = -numpy.ones((self.num_packings, self.num_packings)) - - for p1_id in range(self.num_packings): - for p2_id in range(self.num_packings): - if avg_similarity_values[p1_id, p2_id] >= 0: - continue - p1_inds = numpy.arange( - packing_inds[p1_id], packing_inds[p1_id + 1] - ) # indices corresponding to packing p1_id - p2_inds = numpy.arange(packing_inds[p2_id], packing_inds[p2_id + 1]) - avg_similarity_values[p1_id, p2_id] = numpy.mean( - similarity_vals_for_dim[p1_inds, p2_inds] - ) - avg_similarity_values[p2_id, p1_id] = avg_similarity_values[ - p1_id, p2_id - ] - return avg_similarity_values - - def calc_similarity_df(self, all_objs, ingredient_key, save_path=None): - """ - Calculates a dataframe of similarity values between packings - """ - key_list = list(all_objs[ingredient_key].keys()) - similarity_df = pd.DataFrame( - index=key_list, - columns=pd.MultiIndex.from_product([self.get_list_of_dims(), key_list]), - dtype=float, - ) - similarity_df["packing_id"] = 0 - dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] - for seed1, pos_dict1 in tqdm(all_objs[ingredient_key].items()): - similarity_df.loc[seed1, "packing_id"] = seed1.split("_")[-1] - for seed2, pos_dict2 in all_objs[ingredient_key].items(): - for dim in dims_to_calc: - if dim == "pairwise_distance": - pos1 = numpy.array( - [pos_dict1["x"], pos_dict1["y"], pos_dict1["z"]] - ) - pos2 = numpy.array( - [pos_dict2["x"], pos_dict2["y"], pos_dict2["z"]] - ) - arr1 = distance.pdist(pos1.T) - arr2 = distance.pdist(pos2.T) - else: - arr1 = pos_dict1[dim] - arr2 = pos_dict2[dim] - - min_dim = numpy.min([arr1.ravel(), arr2.ravel()]) - max_dim = numpy.max([arr1.ravel(), arr2.ravel()]) - arr1 = (arr1 - min_dim) / max_dim - arr2 = (arr2 - min_dim) / max_dim - - if len(arr1) == 1 or len(arr2) == 1: - # cannot determine similarity when only one instance is packed - similarity_score = 0 - elif len(numpy.unique(arr1)) == 1 or len(numpy.unique(arr2)) == 1: - # if there is only one unique value, compare the value - if ( - len(numpy.unique(arr1)) == 1 - and len(numpy.unique(arr2)) == 1 - ): - # both packings have only one unique value, compare the value - similarity_score = ( - 1 if numpy.unique(arr1) == numpy.unique(arr2) else 0 - ) - else: - # one of the packings has more than one unique value, cannot compare - 
similarity_score = 0 - else: - # anderson-darling test - # ad_stat = stats.anderson_ksamp([arr1, arr2]) - # similarity_score = (ad_stat.significance_level - 0.001) / ( - # 0.25 - 0.001 - # ) - # similarity_score = 1 - ad_stat.statistic - - # 2 sample ks - ks_stat = stats.ks_2samp(arr1, arr2) - similarity_score = 1 - ks_stat.statistic - # similarity_score = ks_stat.pvalue - - # histograms for bhattacharyya and jensen-shannon distances - # hist1, bin_edges1 = numpy.histogram(arr1, bins="auto", density=True) - # hist2, bin_edges2 = numpy.histogram(arr2, bins=bin_edges1, density=True) - - # bhattacharyya distance - # similarity_score = numpy.sqrt(numpy.sum(numpy.sqrt(hist1 * hist2))) - - # jensen-shannon distance - # similarity_score = 1 - distance.jensenshannon(arr1, arr2) - - similarity_df.loc[seed1, (dim, seed2)] = similarity_score - - similarity_df = self.normalize_similarity_df(similarity_df) - if save_path is not None: - dfpath = save_path / f"similarity_df_{ingredient_key}.csv" - print(f"Saving similarity df to {dfpath}") - similarity_df.to_csv(dfpath) - - return similarity_df - - def plot_and_save_similarity_heatmaps(self, similarity_df, ingredient_key): - """ - Plots heatmaps with hierarchical clustering using similarity scores - """ - # accounting for changes when reading from csv - if similarity_df["packing_id"].ndim > 1: - packing_ids = similarity_df["packing_id"].iloc[:, 0] - else: - packing_ids = similarity_df["packing_id"] - lut = dict(zip(packing_ids.unique(), sns.color_palette())) - row_colors = packing_ids.map(lut) - row_colors.rename("Packing ID", inplace=True) - figdir = self.figures_path / "clustering" - figdir.mkdir(parents=True, exist_ok=True) - - dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] - for dim in dims_to_calc: - avg_similarity_values = self.calc_avg_similarity_values_for_dim( - similarity_df[dim].values - ) - numpy.savetxt( - figdir / f"avg_similarity_{ingredient_key}_{dim}.txt", - avg_similarity_values, - ) - - row_linkage = hierarchy.linkage( - 1 - distance.squareform(similarity_df[dim], checks=False), - method="ward", - optimal_ordering=True, - ) - - g = sns.clustermap( - similarity_df[dim], - row_colors=row_colors, - row_linkage=row_linkage, - col_linkage=row_linkage, - cbar_kws={"label": "similarity score"}, - ) - handles = [Patch(facecolor=lut[name]) for name in lut] - plt.legend( - handles, - lut, - title="Packing IDs", - bbox_to_anchor=(1, 1), - bbox_transform=plt.gcf().transFigure, - loc="upper right", - ) - g.ax_col_dendrogram.set_visible(False) - g.ax_row_dendrogram.set_visible(False) - g.ax_heatmap.set_xlabel(f"{ingredient_key}_{dim}") - g.savefig(figdir / f"clustermap_{ingredient_key}_{dim}", dpi=300) - - def run_similarity_analysis( - self, - all_objs, - ingredient_key, - save_heatmaps=False, - recalculate=False, - ): - """ - TODO: add docs - """ - print("Running similarity analysis...") - - if ingredient_key not in all_objs: - raise ValueError(f"Missing ingredient: {ingredient_key}") - - dfpath = self.output_path / f"similarity_df_{ingredient_key}.csv" - if dfpath.is_file() and not recalculate: - print(f"Loading similarity values from {dfpath}") - similarity_df = pd.read_csv(dfpath, header=[0, 1]) - else: - similarity_df = self.calc_similarity_df( - all_objs, - ingredient_key=ingredient_key, - save_path=self.output_path, - ) - - if save_heatmaps: - self.plot_and_save_similarity_heatmaps( - similarity_df, - ingredient_key=ingredient_key, - ) - - return similarity_df - - def calc_and_save_correlations( - self, - all_spilr_scaled, - 
ingredient_key, - ): - key_list = [ - f"{pc}_{sc}" - for pc in range(self.num_packings) - for sc in range(self.num_seeds_per_packing[pc]) - ] - corr_df = pd.DataFrame( - index=key_list, - columns=key_list, - dtype=float, - ) - corr_df["packing_id"] = numpy.nan - for pc1 in range(self.num_packings): - for sc1 in tqdm(range(self.num_seeds_per_packing[pc1])): - for pc2 in range(self.num_packings): - for sc2 in range(self.num_seeds_per_packing[pc2]): - corr_df.loc[f"{pc1}_{sc1}", "packing_id"] = pc1 - # do not calculate if: - # a) already calculated - # b) calculating for same packing - if ( - not numpy.isnan(corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"]) - ) or ((pc1 == pc2) and (sc1 == sc2)): - continue - corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"] = matthews_corrcoef( - all_spilr_scaled[pc1, sc1].flatten(), - all_spilr_scaled[pc2, sc2].flatten(), - ) - corr_df.loc[f"{pc2}_{sc2}", f"{pc1}_{sc1}"] = corr_df.loc[ - f"{pc1}_{sc1}", f"{pc2}_{sc2}" - ] - df_packing = corr_df.pop("packing_id") - lut = dict(zip(df_packing.unique(), sns.color_palette())) - row_colors = df_packing.map(lut) - corr_df.fillna(0, inplace=True) - g = sns.clustermap( - corr_df, - row_colors=row_colors, - cbar_kws={"label": "spilr correlation"}, - ) - g.savefig( - self.figures_path / f"spilr_correlation_{ingredient_key}.png", dpi=300 - ) - - def save_spilr_heatmap(self, input_dict, file_path, label_str=None): - fig, ax = plt.subplots() - g = sns.heatmap( - input_dict, - cbar=False, - xticklabels=False, - yticklabels=False, - ax=ax, - ) - g.set_xlabel("Angular coordinates") - g.set_ylabel(label_str) - g.invert_yaxis() - - fig.savefig( - file_path, - dpi=300, - ) - plt.close() - - def get_parametrized_representation( - self, - all_pos_list, - ingredient_key=None, - mesh_paths={}, - num_angular_points=64, - save_plots=False, - max_plots_to_save=1, - get_correlations=False, - ): - print("creating parametrized representations...") - - if "inner" not in mesh_paths or "outer" not in mesh_paths: - raise ValueError( - "Missing mesh paths required to generate parametrized representations" - ) - - inner_mesh = trimesh.load_mesh(mesh_paths.get("inner")) - outer_mesh = trimesh.load_mesh(mesh_paths.get("outer")) - - theta_vals = numpy.linspace(0, numpy.pi, 1 + num_angular_points) - phi_vals = numpy.linspace(0, 2 * numpy.pi, 1 + 2 * num_angular_points) - rad_vals = numpy.linspace(0, 1, 1 + num_angular_points) - - all_spilr = {} - for scaled_val in ["raw", "scaled"]: - all_spilr[scaled_val] = numpy.full( - ( - self.num_packings, - numpy.max(self.num_seeds_per_packing), - len(rad_vals), - len(theta_vals) * len(phi_vals), - ), - numpy.nan, - ) - - if save_plots: - save_dir = self.figures_path / "spilr_heatmaps" - os.makedirs(save_dir, exist_ok=True) - - for pc, (packing_id, packing_dict) in enumerate( - zip(self.packing_id_dict.values(), all_pos_list) - ): - num_saved_plots = 0 - for sc, (_, pos_dict) in enumerate(packing_dict.items()): - pos_list = numpy.array(pos_dict[ingredient_key]) - sph_pts = self.cartesian_to_sph(pos_list) - - ( - scaled_rad, - distance_between_surfaces, - inner_surface_distances, - ) = MeshStore.calc_scaled_distances_for_positions( - pos_list, inner_mesh, outer_mesh - ) - - trial_spilr = {} - for scaled_val in ["raw", "scaled"]: - rad_array = ( - numpy.linspace( - 0, distance_between_surfaces.max(), len(rad_vals) - ) - if scaled_val == "raw" - else rad_vals - ) - - trial_spilr[scaled_val] = numpy.zeros( - (len(rad_array), len(theta_vals), len(phi_vals)) - ) - - max_rad = ( - distance_between_surfaces.max() if 
scaled_val == "raw" else 1 - ) - - if numpy.any(rad_array > max_rad) or numpy.any(rad_array < 0): - raise ValueError("Check ray-mesh intersections!") - - rad_pos = ( - inner_surface_distances if scaled_val == "raw" else scaled_rad - ) - rad_inds = numpy.digitize(rad_pos, rad_array) - 1 - theta_inds = numpy.digitize(sph_pts[:, 1], theta_vals) - 1 - phi_inds = numpy.digitize(sph_pts[:, 2], phi_vals) - 1 - - trial_spilr[scaled_val][rad_inds, theta_inds, phi_inds] = 1 - - all_spilr[scaled_val][pc, sc] = trial_spilr[scaled_val].reshape( - (len(rad_array), -1) - ) - - if save_plots and (num_saved_plots <= max_plots_to_save): - label_str = f"Distance from Nuclear Surface, {packing_id}_{sc}, {scaled_val}" - file_path = ( - save_dir - / f"heatmap_{scaled_val}_{packing_id}_{sc}_{ingredient_key}" - ) - self.save_spilr_heatmap( - all_spilr[scaled_val][pc, sc], file_path, label_str - ) - num_saved_plots += 1 - - if get_correlations: - print("calculating correlations...") - self.calc_and_save_correlations( - all_spilr["scaled"], - ingredient_key=ingredient_key, - ) - - if save_plots: - for scaled_val in ["raw", "scaled"]: - average_spilr = numpy.nanmean(all_spilr[scaled_val], axis=1) - for pc, packing_id in enumerate(self.packing_id_dict.values()): - label_str = ( - f"Distance from Nuclear Surface, avg {packing_id}, {scaled_val}" - ) - file_path = ( - save_dir - / f"avg_heatmap_{scaled_val}_{packing_id}_{ingredient_key}" - ) - self.save_spilr_heatmap(average_spilr[pc], file_path, label_str) - - return all_spilr - def histogram(self, distances, filename, title_str="", x_label="", y_label=""): plt.clf() # calculate histogram @@ -2342,17 +1252,10 @@ def pack_one_seed( def doloop( self, - number_of_packings, + recipe_data, + packing_config_data, bounding_box, get_distance_distribution=True, - plot_figures=True, - show_grid=True, - seed_list=None, - config_name="default", - recipe_version="1.0.0", - image_export_options=None, - parallel=False, - save_gradient_data_as_image=False, ): """ Runs multiple packings of the same recipe in a loop. This workflow @@ -2363,26 +1266,17 @@ def doloop( Parameters ---------- - number_of_packing : int - Number of repeats of a packing. Default: 1 - bounding_box : np.ndarray - bounding box from the environment + recipe_data: dict + Dictionary containing recipe data + packing_config_data: dict + Dictionary containing packing configuration data + bounding_box: list + List of two lists containing the minimum and maximum coordinates + of the bounding box get_distance_distribution: bool - specify whether to calculate distance distributions - render: bool - ??? - plot_figures: bool - specify whether to save figures generated by the analyses - show_grid: bool - specify whether to display packing grid in browser - fbox_bb: ??? - ??? 
- seed_list: List - list of seeds to use for the packing (for reproducibility) - config_name: string - name of the configuration used for the packing - recipe_version: string - version of the recipe used for the packing + Whether to calculate and store distance and angle distributions + seed_list: list + List of seeds to use for packings Outputs ------- @@ -2393,6 +1287,16 @@ def doloop( distributions as applicable for each seed, and a combined image across seeds """ + number_of_packings = packing_config_data["number_of_packings"] + plot_figures = packing_config_data.get("save_plot_figures", True) + show_grid = packing_config_data["show_grid_plot"] + image_export_options = packing_config_data.get("image_export_options") + parallel = packing_config_data.get("parallel", False) + save_gradient_data_as_image = packing_config_data.get( + "save_gradient_data_as_image", False + ) + + seed_list = get_seed_list(packing_config_data, recipe_data) if seed_list is None: seed_list = self.getHaltonUnique(number_of_packings) packing_basename = self.env.base_name From e3774cf4b79a3368defccd3c856b2d1ca2f5660e Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Fri, 23 Feb 2024 16:51:38 -0800 Subject: [PATCH 04/12] Remove unnecessary imports from Analysis.py --- cellpack/autopack/Analysis.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index e5b8b86a..79198349 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -19,11 +19,6 @@ from cellpack.autopack.GeometryTools import GeometryTools from cellpack.autopack.ldSequence import halton from cellpack.autopack.plotly_result import PlotlyAnalysis -<<<<<<< HEAD -======= -from cellpack.autopack.upy import colors as col -from cellpack.autopack.upy.colors import map_colors ->>>>>>> feature/send_config_data_to_analysis from cellpack.autopack.utils import check_paired_key, get_paired_key, get_seed_list from cellpack.autopack.writers import Writer from cellpack.autopack.writers.ImageWriter import ImageWriter From d548737c93a972e00dfae04d5d65da8951e38175 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 7 May 2024 14:56:47 -0700 Subject: [PATCH 05/12] Remove unused functions and add documentation --- cellpack/autopack/Analysis.py | 148 +++++++--------------------------- 1 file changed, 28 insertions(+), 120 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index 79198349..8804ee58 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -72,7 +72,6 @@ def __init__( self.figures_path = self.output_path / "figures" self.figures_path.mkdir(parents=True, exist_ok=True) self.seed_to_results = {} - autopack._colors = None @staticmethod def cartesian_to_sph(xyz, center=None): @@ -177,7 +176,13 @@ def plot_distance_distribution(self, all_ingredient_distances): def get_obj_dict(self, packing_results_path): """ Returns the object dictionary from the input path folder. - TODO: add description of object dictionary + + Args: + packing_results_path (str): The path to the folder containing the packing results. + + Returns: + tuple: A tuple containing the object dictionary and the list of all positions. 
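+
+            A sketch of the object dictionary layout, assuming it mirrors
+            get_xyz_dict_from_all_pos_dict (ingredient key -> seed -> axis):
+                {"ingr_a": {"seed_0": {"x": [...], "y": [...], "z": [...]}}}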
+ """ file_list = Path(packing_results_path).glob("positions_*.json") all_pos_list = [] @@ -269,7 +274,7 @@ def get_ingredient_radii( ingredient_radii[object_key] = object_values["radius"] return ingredient_radii - def get_dict_from_glob( + def read_dict_from_glob_file( self, glob_str, ): @@ -500,7 +505,9 @@ def create_report( ) if not hasattr(self, "ingredient_key_dict"): - self.ingredient_key_dict = self.get_dict_from_glob("ingredient_keys_*") + self.ingredient_key_dict = self.read_dict_from_glob_file( + "ingredient_keys_*" + ) if ingredient_keys is None: ingredient_keys = list(self.ingredient_key_dict.keys()) @@ -511,7 +518,7 @@ def create_report( ingredient_radii = self.get_ingredient_radii(recipe_data=recipe_data) if not hasattr(self, "pairwise_distance_dict"): - self.pairwise_distance_dict = self.get_dict_from_glob( + self.pairwise_distance_dict = self.read_dict_from_glob_file( "pairwise_distances_*.json" ) @@ -685,88 +692,6 @@ def pack( self.plotly.add_ingredient_positions(self.env) self.plotly.show() - def calcDistanceMatrixFastEuclidean2(self, nDimPoints): - nDimPoints = numpy.array(nDimPoints) - n, m = nDimPoints.shape - delta = numpy.zeros((n, n), "d") - for d in range(m): - data = nDimPoints[:, d] - delta += (data - data[:, numpy.newaxis]) ** 2 - return numpy.sqrt(delta) - - def flush(self): - import gc - import pprint - - for i in range(2): - print("Collecting %d ..." % i) - n = gc.collect() - print("Unreachable objects:", n) - print("Remaining Garbage:") - pprint.pprint(gc.garbage) - del gc.garbage[:] - - def merge(self, d1, d2, merge=lambda x, y: y): - result = dict(d1) - for k, v in d2.items(): - if k in result: - result[k].extend(v) - else: - result[k] = v - return result - - def plotNResult2D(self, n, bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]]): - for i in range(n): - f = "results_seed_" + str(i) + ".json" - self.plot_one_result_2d(filename=f, bbox=bbox) - - def plot_one_result_2d( - self, - data=None, - filename=None, - bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]], - ): - if data is None and filename is None: - return - elif data is None and filename is not None: - with open(filename) as data_file: - data = json.load(data_file) - fig = plt.figure() - ax = fig.add_subplot(111) - radius = {} - ingrrot = {} - ingrpos = {} - for recipe in data: - for ingrname in data[recipe]: - for k in range(len(data[recipe][ingrname]["results"])): - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - radius[ingrname] = data[recipe][ingrname][ - "encapsulating_radius" - ] - ingrrot[ingrname].append(data[recipe][ingrname]["results"][k][1]) - ingrpos[ingrname].append(data[recipe][ingrname]["results"][k][0]) - for ingr in ingrpos: - for i, p in enumerate(ingrpos[ingr]): - ax.add_patch( - Circle( - (p[0], p[1]), - radius[ingr], - edgecolor="black", - facecolor="red", - ) - ) - ax.set_aspect(1.0) - plt.axhline(y=bbox[0][1], color="k") - plt.axhline(y=bbox[1][1], color="k") - plt.axvline(x=bbox[0][0], color="k") - plt.axvline(x=bbox[1][0], color="k") - plt.axis([bbox[0][0], bbox[1][0], bbox[0][1], bbox[1][1]]) - plt.savefig("plot" + ingr + ".png") - plt.close() # closes the current figure - return ingrpos - def set_ingredient_color(self, ingr): """ Sets the color of an ingredient @@ -853,7 +778,10 @@ def add_ingredient_positions_to_plot( ) # plot the sphere if ingr.use_rbsphere: - (ext_recipe, pts,) = ingr.getInterpolatedSphere( + ( + ext_recipe, + pts, + ) = ingr.getInterpolatedSphere( seed_ingredient_positions[-i - 1], seed_ingredient_positions[-i], ) @@ 
-868,21 +796,6 @@ def add_ingredient_positions_to_plot( ) return ax - def one_exp(self, seed, output_path, eid=0, nmol=1, periodicity=True, dim=2): - output = output_path + str(nmol) - if periodicity: - self.env.use_periodicity = True - autopack.testPeriodicity = True - else: - self.env.use_periodicity = False - autopack.testPeriodicity = False - if dim == 3: - autopack.biasedPeriodicity = [1, 1, 1] - else: - autopack.biasedPeriodicity = [1, 1, 0] - if not os.path.exists(output): - os.makedirs(output) - def getHaltonUnique(self, n): seeds_f = numpy.array(halton(int(n * 1.5))) * int(n * 1.5) seeds_int = numpy.array(numpy.round(seeds_f), "int") @@ -948,7 +861,7 @@ def update_distance_distribution_dictionaries( ingredient_occurence_dict, ) - def update_crosswise_distances( + def update_pairwise_distances( self, ingr, recipe, pairwise_distance_dict, seed_index ): """ @@ -962,9 +875,9 @@ def update_crosswise_distances( ingr.name, ingr2.name, ): - pairwise_distance_dict[seed_index][ - f"{ingr.name}_{ingr2.name}" - ] = self.env.calc_pairwise_distances(ingr.name, ingr2.name).tolist() + pairwise_distance_dict[seed_index][f"{ingr.name}_{ingr2.name}"] = ( + self.env.calc_pairwise_distances(ingr.name, ingr2.name).tolist() + ) return pairwise_distance_dict @@ -1017,7 +930,7 @@ def process_ingredients_in_recipe( ) # calculate cross ingredient_distances - pairwise_distance_dict = self.update_crosswise_distances( + pairwise_distance_dict = self.update_pairwise_distances( ingr, recipe, pairwise_distance_dict, seed_index ) @@ -1204,18 +1117,13 @@ def pack_one_seed( if plot_figures and two_d: ax.set_aspect(1.0) - plt.axhline(y=bounding_box[0][1], color="k") - plt.axhline(y=bounding_box[1][1], color="k") - plt.axvline(x=bounding_box[0][0], color="k") - plt.axvline(x=bounding_box[1][0], color="k") - plt.axis( - [ - bounding_box[0][0], - bounding_box[1][0], - bounding_box[0][1], - bounding_box[1][1], - ] - ) + ax.axhline(y=bounding_box[0][1], color="k") + ax.axhline(y=bounding_box[1][1], color="k") + ax.axvline(x=bounding_box[0][0], color="k") + ax.axvline(x=bounding_box[1][0], color="k") + ax.set_xlim([bounding_box[0][0], bounding_box[1][0]]) + ax.set_ylim([bounding_box[0][1], bounding_box[1][1]]) + plt.savefig(self.figures_path / f"packing_image_{seed_basename}.png") plt.close() # closes the current figure From a6d52f32b60fe9279c55ea9217e8869b9c7a27aa Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 7 May 2024 15:00:06 -0700 Subject: [PATCH 06/12] Refactor point chunking in MeshStore.py --- cellpack/autopack/MeshStore.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cellpack/autopack/MeshStore.py b/cellpack/autopack/MeshStore.py index 31a57381..781770e4 100644 --- a/cellpack/autopack/MeshStore.py +++ b/cellpack/autopack/MeshStore.py @@ -268,11 +268,8 @@ def contains_points_mesh(self, geomname, points): else: # chunk the points for i in range(0, len(points), CHUNK_SIZE): - chunk = points[i : min(i + CHUNK_SIZE, len(points))] - if i == 0: - inside = mesh.contains(chunk) - else: - inside = numpy.append(inside, mesh.contains(chunk)) + chunk = points[i: i + CHUNK_SIZE] + inside[i: i + CHUNK_SIZE] = mesh.contains(chunk) return inside return inside From 0f020c59a2ee2a2d38e1d13210156950a4a1278b Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Wed, 8 May 2024 09:28:39 -0700 Subject: [PATCH 07/12] organize imports --- cellpack/autopack/Analysis.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index 
8804ee58..f72d4f25 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -4,8 +4,9 @@ @author: ludo """ +import concurrent.futures import json -import os +import multiprocessing from pathlib import Path from time import time @@ -22,8 +23,6 @@ from cellpack.autopack.utils import check_paired_key, get_paired_key, get_seed_list from cellpack.autopack.writers import Writer from cellpack.autopack.writers.ImageWriter import ImageWriter -import concurrent.futures -import multiprocessing class Analysis: From 1a024a8012f564dd26f6f167a75a0c08dd02b297 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Wed, 8 May 2024 10:12:29 -0700 Subject: [PATCH 08/12] Move over functions from Analysis.py --- cellpack/autopack/utils.py | 1188 +++++++++++++++++++++++++++++++++++- 1 file changed, 1187 insertions(+), 1 deletion(-) diff --git a/cellpack/autopack/utils.py b/cellpack/autopack/utils.py index 01d856ce..a9cca11e 100644 --- a/cellpack/autopack/utils.py +++ b/cellpack/autopack/utils.py @@ -1,8 +1,9 @@ import collections import copy -import numpy import pickle +import numpy + def get_distance(pt1, pt2): return numpy.linalg.norm(pt2 - pt1) @@ -280,3 +281,1188 @@ def get_seed_list(packing_config_data, recipe_data): ] return seed_list + + +# These functions have been moved over from Analysis.py +""" +Created on Mon May 6 22:58:44 2013 + +@author: ludo +""" + +import csv +import json +import math +import os + +import numpy as np +import pandas as pd +import seaborn as sns +import trimesh +from matplotlib import pyplot as plt +from matplotlib.patches import Circle, Patch +from PIL import Image +from scipy import stats +from scipy.cluster import hierarchy +from scipy.spatial import distance +from sklearn.metrics import matthews_corrcoef +from tqdm import tqdm + +import cellpack.autopack as autopack +from cellpack.autopack.GeometryTools import Rectangle +from cellpack.autopack.MeshStore import MeshStore +from cellpack.autopack.upy import colors as col +from cellpack.autopack.upy.colors import map_colors + + +def get_xyz_dict_from_all_pos_dict(all_pos_dict): + """ + returns array of x, y, and z positions for each seed for runs + in all_pos_dict + """ + all_objs = {} + for seed, object_dict in all_pos_dict.items(): + for obj, positions in object_dict.items(): + positions = np.array(positions) + if obj not in all_objs: + all_objs[obj] = {} + if seed not in all_objs[obj]: + all_objs[obj][seed] = {} + for ct, dim in enumerate(["x", "y", "z"]): + all_objs[obj][seed][dim] = positions[:, ct] + return all_objs + + +def save_array_to_file(array_to_save, file_path, seed_index): + f_handle = open(file_path, "a" if seed_index else "w") + np.savetxt( + f_handle, + array_to_save, + delimiter=",", + ) + f_handle.close() + + +def getPositionsFromResFile(self): + # could actually restore file using histoVol. + # or not + # need to parse apr file here anyway + return [] + + +def getPositionsFromObject(self, parents): + positions = [] + for parent in parents: + obparent = self.helper.getObject(parent) + children = self.helper.getChilds(obparent) + for ch in children: + ingr_name = self.helper.getName(ch) + meshp = self.helper.getObject("Meshs_" + ingr_name.split("_")[0]) + if meshp is None: + c = self.helper.getChilds(ch) + if not len(c): + continue + meshp_children = self.helper.getChilds( + c[0] + ) # continue #should get sphere/cylnder parent ? 
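+            # a named mesh container exists: read positions from its children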
+            else:
+                meshp_children = self.helper.getChilds(meshp)
+            for cc in meshp_children:
+                pos = self.helper.ToVec(self.helper.getTranslation(cc))
+                positions.append(pos)
+    return positions
+
+
+def getDistanceFrom(self, target, parents=None, **options):
+    """
+    target : name or host object target or target position
+    parent : name of the host parent object for the list of objects to measure distance from
+    objects : list of objects or list of points
+    """
+    # get distance from object to the target.
+    # all objects are in env.packed_objects
+    # get options
+
+    if isinstance(target, (list, tuple)):
+        targetPos = target
+    elif isinstance(target, str):
+        o = self.helper.getObject(target)
+        if o is not None:
+            targetPos = self.helper.ToVec(self.helper.getTranslation(o))
+    else:
+        o = self.helper.getObject(target)
+        if o is not None:
+            targetPos = self.helper.ToVec(self.helper.getTranslation(o))
+    listCenters = []
+    if self.result_file is None:
+        if parents is None and self.result_file is None:
+            listeParent = [self.env.name + "_cytoplasm"]
+            for o in self.env.compartments:
+                listeParent.append(o.name + "_Matrix")
+                listeParent.append(o.name + "_surface")
+        elif parents is not None and self.result_file is None:
+            listeParent = parents
+        listCenters = self.getPositionsFromObject(listeParent)
+
+    delta = np.array(listCenters) - np.array(targetPos)
+    delta *= delta
+    distA = np.sqrt(delta.sum(1))
+    return distA
+
+
+def getClosestDistance(self, parents=None, **options):
+    if self.result_file is None:
+        if parents is None and self.result_file is None:
+            listeParent = [self.env.name + "_cytoplasm"]
+            for o in self.env.compartments:
+                listeParent.append(o.name + "_Matrix")
+                listeParent.append(o.name + "_surface")
+        elif parents is not None and self.result_file is None:
+            listeParent = parents
+        listeCenters = self.getPositionsFromObject(listeParent)
+    else:
+        # use data from file
+        # TODO: currently getPositionsFromResFile returns an empty list
+        # listeCenters = self.getPositionsFromResFile(listeParent)
+        listeCenters = []
+    # is the distance in the result array ?
+    listeDistance = np.zeros(len(listeCenters)) + 99999
+    for i in range(len(listeCenters)):
+        for j in range(i + 1, len(listeCenters)):
+            # should use point
+            d = self.helper.measure_distance(listeCenters[i], listeCenters[j])
+            if d < listeDistance[i]:
+                listeDistance[i] = d
+    return listeDistance
+
+
+def displayDistance(
+    self,
+    ramp_color1=[1, 0, 0],
+    ramp_color2=[0, 0, 1],
+    ramp_color3=None,
+    cutoff=60.0,
+):
+    distances = np.array(self.env.grid.distToClosestSurf[:])
+    mask = distances > cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = cutoff
+    mask = distances < 0  # -cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = 0  # cutoff
+    base = self.helper.getObject(self.env.name + "distances_base")
+    if base is None:
+        base = self.helper.Sphere(self.env.name + "distances_base")[0]
+    p = self.helper.getObject(self.env.name + "distances")
+    if p is not None:
+        self.helper.deleteObject(p)  # recursive?
+    p = self.helper.newEmpty(self.env.name + "distances")
+    # can use cube also
+
+
+def displayDistanceCube(
+    self,
+    ramp_color1=[1, 0, 0],
+    ramp_color2=[0, 0, 1],
+    ramp_color3=None,
+    cutoff=60.0,
+):
+    distances = np.array(self.env.grid.distToClosestSurf[:])
+    mask = distances > cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = cutoff
+    mask = distances < 0  # -cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = 0  # cutoff
+    base = self.helper.getObject(self.env.name + "distances_base_cube")
+    if base is None:
+        # base=self.helper.Sphere(self.env.name+"distances_base")[0]
+        size = self.env.grid.gridSpacing
+        base = self.helper.box(
+            self.env.name + "distances_base_cube",
+            center=[0.0, 0.0, 0.0],
+            size=[size, size, size],
+        )[0]
+    parent_cube = self.helper.getObject(self.env.name + "distances_cubes")
+    if parent_cube is not None:
+        self.helper.deleteObject(parent_cube)  # recursive?
+    parent_cube = self.helper.newEmpty(self.env.name + "distances_cubes")
+
+
+def displayDistancePlane(
+    self,
+    ramp_color1=[1, 0, 0],
+    ramp_color2=[0, 0, 1],
+    ramp_color3=None,
+    cutoff=60.0,
+):
+    # which axis ?
+    distances = np.array(self.env.grid.distToClosestSurf[:])
+
+    ramp = col.getRamp([ramp_color1, ramp_color2], size=255)  # color
+    mask = distances > cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = cutoff
+    mask = distances < 0  # -cutoff
+    ind = np.nonzero(mask)[0]
+    distances[ind] = 0  # cutoff
+    newd = np.append(distances, cutoff)
+    colors = map_colors(newd, ramp)[:-1]  # 1D array of the grid x,y,1
+    autopack._colors = colors
+    p = self.helper.getObject(self.env.name + "distances")
+    if p is not None:
+        self.helper.deleteObject(p)  # recursive?
+    p = self.helper.newEmpty(self.env.name + "distances_p")
+
+    d = np.array(self.env.grid.boundingBox[0]) - np.array(self.env.grid.boundingBox[1])
+    p, mpl = self.helper.plane(
+        self.env.name + "distances_plane",
+        center=self.env.grid.getCenter(),
+        size=[math.fabs(d[0]), math.fabs(d[1])],
+        parent=p,
+    )
+    self.helper.rotateObj(p, [0, 0, -math.pi / 2.0])
+    filename = (
+        autopack.cache_results + os.sep + self.env.name + "distances_plane_texture.png"
+    )
+    c = colors.reshape(
+        (
+            self.env.grid.nbGridPoints[0],
+            self.env.grid.nbGridPoints[1],
+            self.env.grid.nbGridPoints[2],
+            3,
+        )
+    )
+
+    im = Image.fromstring(
+        "RGB", (c.shape[0], c.shape[1]), np.uint8(c * 255.0).tostring()
+    )
+    im.save(str(filename))
+    mat = self.helper.createTexturedMaterial(self.env.name + "planeMat", str(filename))
+    # assign the material to the plane
+    self.helper.assignMaterial(p, mat, texture=True)
+
+
+def grabResultFromJSON(self, n):
+    ingrrot = {}
+    ingrpos = {}
+    for i in range(n):
+        with open("results_seed_" + str(i) + ".json") as data_file:
+            data = json.load(data_file)
+        for recipe in data:
+            for ingrname in data[recipe]:
+                for k in range(len(data[recipe][ingrname]["results"])):
+                    if ingrname not in ingrrot:
+                        ingrrot[ingrname] = []
+                        ingrpos[ingrname] = []
+                    ingrrot[ingrname].append(data[recipe][ingrname]["results"][k][1])
+                    ingrpos[ingrname].append(data[recipe][ingrname]["results"][k][0])
+    return ingrpos, ingrrot
+
+
+def grabResultFromTXT(self, n, doanalyze=False):
+    from autopack import transformation as t
+
+    ingrrot = {}
+    ingrpos = {}
+    for i in range(1000):
+        files = open("results_seed_" + str(i) + ".txt", "r")
+        lines = files.readlines()
+        files.close()
+        for line in lines:
+            line = line.replace("<", " ").replace(">", " ")
+            elem = line.split()
+            ingrname = elem[-5]
+            if ingrname not in ingrrot:
+                ingrrot[ingrname] = []
+                ingrpos[ingrname] = []
+            ingrrot[ingrname].append(eval(elem[2]))
+            ingrpos[ingrname].append(eval(elem[0]))
+    for ingrname in ingrrot:
+        ingrrot[ingrname] = [np.array(m).reshape((4, 4)) for m in ingrrot[ingrname]]
+    if doanalyze:
+        for ingrname in ingrrot:
+            eulers3 = [t.euler_from_matrix(m, "rxyz") for m in ingrrot[ingrname]]
+            e3 = np.degrees(np.array(eulers3)).transpose()
+            np.savetxt(
+                ingrname + "_euler_X.csv",
+                np.array(e3[0]),
+                delimiter=",",
+            )
+            np.savetxt(
+                ingrname + "_euler_Y.csv",
+                np.array(e3[1]),
+                delimiter=",",
+            )
+            np.savetxt(
+                ingrname + "_euler_Z.csv",
+                np.array(e3[2]),
+                delimiter=",",
+            )
+            self.histogram(e3[0], ingrname + "_euler_X.png")
+            self.histogram(e3[1], ingrname + "_euler_Y.png")
+            self.histogram(e3[2], ingrname + "_euler_Z.png")
+    return ingrpos, ingrrot
+
+
+# should take any type of list...
+def save_csv(self, data, filename=None):
+    if filename is None:
+        filename = "output.csv"
+    resultFile = open(filename, "wb")
+    wr = csv.writer(resultFile, dialect="excel")
+    # wr.writerows(data) list of list ?
+    # resultFile.close()
+    for item in data:
+        wr.writerow([item])
+    resultFile.close()
+
+
+def rectangle_circle_area(self, bbox, center, radius):
+    # http://www.eex-dev.net/index.php?id=100
+    # [[0.,0,0],[1000,1000,1]]
+    # top,bottom, right, left
+    # rect=Rectangle(bbox[0][0],bbox[1][0],bbox[0][1],bbox[1][1])#top,bottom, right, left
+    rect = Rectangle(
+        bbox[1][1], bbox[0][1], bbox[1][0], bbox[0][0]
+    )  # top,bottom, right, left
+    m = [center[0], center[1]]
+    r = radius
+    area = math.pi * r**2
+    chs = self.g.check_sphere_inside(rect, m, r)
+    if chs:  # sphere not completely inside
+        ch = self.g.check_rectangle_oustide(rect, m, r)
+        if ch:  # rectangle not outside
+            leftBound, rightBound = self.g.getBoundary(rect, m, r)
+            area = self.g.get_rectangle_cercle_area(rect, m, r, leftBound, rightBound)
+        else:
+            area = bbox[0][1] ** 2
+    return area
+
+
+def getVolumeShell(self, bbox, radii, center):
+    # rectangle_circle_area
+    volumes = []
+    box_size0 = bbox[1][0] - bbox[0][0]
+    for i in range(len(radii) - 1):
+        r1 = radii[i]
+        r2 = radii[i + 1]
+        v1 = self.g.calc_volume(r1, box_size0 / 2.0)
+        v2 = self.g.calc_volume(r2, box_size0 / 2.0)
+        volumes.append(v2 - v1)
+    return volumes
+
+
+def rdf_3d(self, ingr):
+    # see for intersection volume here http://crowsandcats.blogspot.com/2013/04/cube-sphere-intersection-volume.html
+    # and here http://crowsandcats.blogspot.com/2013/05/extending-radial-distributions.html
+    # will require scipy...worth it ?
+    # should be pairwise distance ? or not ?
+ distances = np.array(self.env.distances[ingr.name]) + basename = self.env.basename + np.savetxt( + basename + ingr.name + "_pos.csv", + np.array(self.env.ingredient_positions[ingr.name]), + delimiter=",", + ) + self.histogram(distances, basename + ingr.name + "_histo.png") + np.savetxt( + basename + ingr.name + "_distances.csv", + np.array(distances), + delimiter=",", + ) + # the bin should be not less than the biggest ingredient radius + # b=int(distances.max()/self.largest) + b = 100 + # bin_edges = np.arange(0, min(box_size) / 2, bin_width) + new_rdf, edges = np.histogramdd( + distances, bins=b, range=[(distances.min(), distances.max())] + ) + radii = edges[0] + # from http://isaacs.sourceforge.net/phys/rdfs.html + dnr = new_rdf + N = len(distances) + V = ( + self.env.grid.nbGridPoints[0] + * self.env.grid.nbGridPoints[1] + * self.env.grid.nbGridPoints[2] + * self.env.grid.gridSpacing**3 + ) + Vshell = np.array(self.getVolumeShell(self.bbox, radii, self.center)) + gr = (dnr * V) / (N * Vshell) + np.savetxt(basename + ingr.name + "_rdf.csv", np.array(gr), delimiter=",") + self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") + + +def getAreaShell(self, bbox, radii, center): + # rectangle_circle_area + areas = [] + for i in range(len(radii) - 1): + r1 = radii[i] + r2 = radii[i + 1] + area1 = self.rectangle_circle_area(bbox, center, r1) + area2 = self.rectangle_circle_area(bbox, center, r2) + if area1 == 0 or area2 == 0: + areas.append(np.pi * (np.power(r2, 2) - np.power(r1, 2))) + else: + areas.append(area2 - area1) + return areas + + +def ripley(self, positions, dr=25, rMax=None): + # K(t) = A*SUM(wij*I(i,j)/n**2) + # lambda = n/A A is the area of the region containing all points + # I indicator function 1 if its operand is true, 0 otherwise + # t is the search radius + # if homogenous K(s) = pi*s**2 + # L(t) = (K(t)/pi)**1/2 + # A common plot is a graph of t - \hat{L}(t) against t + # which will approximately follow the horizontal zero-axis with constant + # dispersion if the data follow a homogeneous Poisson process. 
+ N = len(positions) + V = 1000**2 + diag = np.sqrt(1000**2 + 1000**2) + dr = dr # all_distance.min() + if rMax is None: + rMax = diag + edges = np.arange(dr, rMax + 1.1 * dr, dr) + k = np.zeros((N, len(edges))) + for i, p in enumerate(positions): + di = distance.cdist( + positions, + [p], + "euclidean", + ) + # dV = np.array(analyse.getAreaShell(analyse.bbox,edges,p)) + for j, e in enumerate(edges): + area0 = math.pi * e**2 # complete circle + area1 = self.rectangle_circle_area(self.bbox, p, e) + w = area1 / area0 + k[i, j] = w * len(np.nonzero(di < e)[0]) / N**2 + Kt = V * np.sum(k, axis=0) + Lt = (Kt / np.pi) ** 0.5 + return Kt, Lt + + +def rdf(self, positions, dr=10, rMax=None): + N = len(positions) + V = 1000**2 + diag = np.sqrt(1000**2 + 1000**2) + dr = dr # all_distance.min() + if rMax is None: + rMax = diag + edges = np.arange(0.0, rMax + 1.1 * dr, dr) + g = np.zeros((N, len(edges) - 1)) + dv = [] + density = float(N) / float(V) + for i, p in enumerate(positions): + di = distance.cdist( + positions, + [p], + "euclidean", + ) + dN, bins = np.histogram(di, bins=edges) + dV = np.array(self.getAreaShell(self.bbox, edges, p)) + dv.append(dV) + g[i] = dN / (dV * density) + avg = np.average(g, axis=0) # /np.array(dv) + return avg + + +def rdf_2d(self, ingr): + # dN/N / dV/V = dN/dV * V/N + distances = np.array(self.env.distances[ingr.name]) + basename = self.env.basename + np.savetxt( + basename + ingr.name + "_pos.csv", + np.array(self.env.ingredient_positions[ingr.name]), + delimiter=",", + ) + self.histogram(distances, basename + ingr.name + "_histo.png") + np.savetxt( + basename + ingr.name + "_distances.csv", + np.array(distances), + delimiter=",", + ) + # the bin should be not less than the biggest ingredient radius + # b=int(distances.max()/self.largest) + new_rdf, edges = np.histogramdd( + distances + ) # , bins=b, range=[(distances.min(), distances.max())],normed=0) + radii = edges[0] + # r=radii.tolist() + # r.insert(0,0.0) + # radii = np.array(r) + # rdf= new_rdf.tolist() + # rdf.insert(0,0) + # new_rdf = np.array(rdf) + # from http://isaacs.sourceforge.net/phys/rdfs.html + dnr = new_rdf[:] + N = len(distances) + V = ( + self.env.grid.nbGridPoints[0] + * self.env.grid.nbGridPoints[1] + * self.env.grid.gridSpacing**2 + ) + Vshell = np.array(self.getAreaShell(self.bbox, radii, self.center)) + # print Vshell + # Vshell1 = np.pi*density*(np.power(radii[1:],2)-np.power(radii[:-1], 2)) + # print Vshell1 + # print radii + gr = (dnr * V) / (N * Vshell) + np.savetxt(basename + ingr.name + "_rdf.csv", np.array(gr), delimiter=",") + self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") + # simpl approach Ni/Areai + G = dnr / Vshell + np.savetxt( + basename + ingr.name + "_rdf_simple.csv", + np.array(G), + delimiter=",", + ) + self.plot( + np.array(G), + radii[:-1], + basename + ingr.name + "_rdf_simple.png", + ) + + +def correlation(self, ingr): + basename = self.env.basename + posxyz = np.array(self.env.ingredient_positions[ingr.name]).transpose() + g_average, radii, x, y, z = self.PairCorrelationFunction_3D(posxyz, 1000, 900, 100) + self.plot(g_average, radii, basename + ingr.name + "_corr.png") + + +def PairCorrelationFunction_3D(self, data, S, rMax, dr): + """Compute the three-dimensional pair correlation function for a set of + spherical particles contained in a cube with side length S. 
This simple + function finds reference particles such that a sphere of radius rMax drawn + around the particle will fit entirely within the cube, eliminating the need + to compensate for edge effects. If no such particles exist, an error is + returned. Try a smaller rMax...or write some code to handle edge effects! ;) + Arguments: + x an array of x positions of centers of particles + y an array of y positions of centers of particles + z an array of z positions of centers of particles + S length of each side of the cube in space + rMax outer diameter of largest spherical shell + dr increment for increasing radius of spherical shell + + Returns a tuple: (g, radii, interior_x, interior_y, interior_z) + g(r) a np array containing the correlation function g(r) + radii a np array containing the radii of the + spherical shells used to compute g(r) + interior_x x coordinates of reference particles + interior_y y coordinates of reference particles + interior_z z coordinates of reference particles + """ + x = data[0] + y = data[1] + z = data[2] + # Find particles which are close enough to the cube center that a sphere of radius + # rMax will not cross any face of the cube + bools1 = x > rMax + bools2 = x < (S - rMax) + bools3 = y > rMax + bools4 = y < (S - rMax) + bools5 = z > rMax + bools6 = z < (S - rMax) + + (interior_indices,) = np.where(bools1 * bools2 * bools3 * bools4 * bools5 * bools6) + num_interior_particles = len(interior_indices) + + if num_interior_particles < 1: + raise RuntimeError( + "No particles found for which a sphere of radius rMax\ +will lie entirely within a cube of side length S. Decrease rMax\ +or increase the size of the cube." + ) + + edges = np.arange(0.0, rMax + 1.1 * dr, dr) + num_increments = len(edges) - 1 + g = np.zeros([num_interior_particles, num_increments]) + radii = np.zeros(num_increments) + numberDensity = len(x) / S**3 + + # Compute pairwise correlation for each interior particle + for p in range(num_interior_particles): + index = interior_indices[p] + d = np.sqrt((x[index] - x) ** 2 + (y[index] - y) ** 2 + (z[index] - z) ** 2) + d[index] = 2 * rMax + + (result, bins) = np.histogram(d, bins=edges, normed=False) + g[p, :] = result / numberDensity + + # Average g(r) for all interior particles and compute radii + g_average = np.zeros(num_increments) + for i in range(num_increments): + radii[i] = (edges[i] + edges[i + 1]) / 2.0 + rOuter = edges[i + 1] + rInner = edges[i] + g_average[i] = np.average(g[:, i]) / ( + 4.0 / 3.0 * np.pi * (rOuter**3 - rInner**3) + ) + + return ( + g_average, + radii, + x[interior_indices], + y[interior_indices], + z[interior_indices], + ) + # Number of particles in shell/total number of particles/volume of shell/number density + # shell volume = 4/3*pi(r_outer**3-r_inner**3) + + +def PairCorrelationFunction_2D(self, x, y, S, rMax, dr): + """Compute the two-dimensional pair correlation function, also known + as the radial distribution function, for a set of circular particles + contained in a square region of a plane. This simple function finds + reference particles such that a circle of radius rMax drawn around the + particle will fit entirely within the square, eliminating the need to + compensate for edge effects. If no such particles exist, an error is + returned. Try a smaller rMax...or write some code to handle edge effects! 
;) + + Arguments: + x an array of x positions of centers of particles + y an array of y positions of centers of particles + S length of each side of the square region of the plane + rMax outer diameter of largest annulus + dr increment for increasing radius of annulus + + Returns a tuple: (g, radii, interior_x, interior_y) + g(r) a np array containing the correlation function g(r) + radii a np array containing the radii of the + annuli used to compute g(r) + interior_x x coordinates of reference particles + interior_y y coordinates of reference particles + """ + # Number of particles in ring/area of ring/number of reference particles/number density + # area of ring = pi*(r_outer**2 - r_inner**2) + # Find particles which are close enough to the box center that a circle of radius + # rMax will not cross any edge of the box + bools1 = x > 1.1 * rMax + bools2 = x < (S - 1.1 * rMax) + bools3 = y > rMax * 1.1 + bools4 = y < (S - rMax * 1.1) + (interior_indices,) = np.where(bools1 * bools2 * bools3 * bools4) + num_interior_particles = len(interior_indices) + + if num_interior_particles < 1: + raise RuntimeError( + "No particles found for which a circle of radius rMax\ + will lie entirely within a square of side length S. Decrease rMax\ + or increase the size of the square." + ) + + edges = np.arange(0.0, rMax + 1.1 * dr, dr) + num_increments = len(edges) - 1 + g = np.zeros([num_interior_particles, num_increments]) + radii = np.zeros(num_increments) + numberDensity = len(x) / S**2 + + # Compute pairwise correlation for each interior particle + for p in range(num_interior_particles): + index = interior_indices[p] + d = np.sqrt((x[index] - x) ** 2 + (y[index] - y) ** 2) + d[index] = 2 * rMax + + (result, bins) = np.histogram(d, bins=edges, normed=False) + g[p, :] = result / numberDensity + + # Average g(r) for all interior particles and compute radii + g_average = np.zeros(num_increments) + for i in range(num_increments): + radii[i] = (edges[i] + edges[i + 1]) / 2.0 + rOuter = edges[i + 1] + rInner = edges[i] + # divide by the area of sphere cut by sqyare + g_average[i] = np.average(g[:, i]) / (np.pi * (rOuter**2 - rInner**2)) + + return (g_average, radii, interior_indices) + + +def normalize_similarity_df(self, similarity_df): + """ + Normalizes the similarity dataframe + """ + dims_to_normalize = self.get_list_of_dims() + ["pairwise_distance"] + for dim in dims_to_normalize: + values = similarity_df.loc[:, dim].values + normalized_values = (values - np.min(values)) / ( + np.max(values) - np.min(values) + ) + normalized_values[np.isnan(normalized_values)] = 0 + similarity_df.loc[:, dim] = normalized_values + return similarity_df + + +def calc_avg_similarity_values_for_dim(self, similarity_vals_for_dim): + packing_inds = np.cumsum( + np.hstack([0, self.num_seeds_per_packing]) + ) # returns the indices where packings start and end + avg_similarity_values = -np.ones((self.num_packings, self.num_packings)) + + for p1_id in range(self.num_packings): + for p2_id in range(self.num_packings): + if avg_similarity_values[p1_id, p2_id] >= 0: + continue + p1_inds = np.arange( + packing_inds[p1_id], packing_inds[p1_id + 1] + ) # indices corresponding to packing p1_id + p2_inds = np.arange(packing_inds[p2_id], packing_inds[p2_id + 1]) + avg_similarity_values[p1_id, p2_id] = np.mean( + similarity_vals_for_dim[p1_inds, p2_inds] + ) + avg_similarity_values[p2_id, p1_id] = avg_similarity_values[p1_id, p2_id] + return avg_similarity_values + + +def calc_similarity_df(self, all_objs, ingredient_key, save_path=None): + 
""" + Calculates a dataframe of similarity values between packings + """ + key_list = list(all_objs[ingredient_key].keys()) + similarity_df = pd.DataFrame( + index=key_list, + columns=pd.MultiIndex.from_product([self.get_list_of_dims(), key_list]), + dtype=float, + ) + similarity_df["packing_id"] = 0 + dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] + for seed1, pos_dict1 in tqdm(all_objs[ingredient_key].items()): + similarity_df.loc[seed1, "packing_id"] = seed1.split("_")[-1] + for seed2, pos_dict2 in all_objs[ingredient_key].items(): + for dim in dims_to_calc: + if dim == "pairwise_distance": + pos1 = np.array([pos_dict1["x"], pos_dict1["y"], pos_dict1["z"]]) + pos2 = np.array([pos_dict2["x"], pos_dict2["y"], pos_dict2["z"]]) + arr1 = distance.pdist(pos1.T) + arr2 = distance.pdist(pos2.T) + else: + arr1 = pos_dict1[dim] + arr2 = pos_dict2[dim] + + min_dim = np.min([arr1.ravel(), arr2.ravel()]) + max_dim = np.max([arr1.ravel(), arr2.ravel()]) + arr1 = (arr1 - min_dim) / max_dim + arr2 = (arr2 - min_dim) / max_dim + + if len(arr1) == 1 or len(arr2) == 1: + # cannot determine similarity when only one instance is packed + similarity_score = 0 + elif len(np.unique(arr1)) == 1 or len(np.unique(arr2)) == 1: + # if there is only one unique value, compare the value + if len(np.unique(arr1)) == 1 and len(np.unique(arr2)) == 1: + # both packings have only one unique value, compare the value + similarity_score = ( + 1 if np.unique(arr1) == np.unique(arr2) else 0 + ) + else: + # one of the packings has more than one unique value, cannot compare + similarity_score = 0 + else: + # anderson-darling test + # ad_stat = stats.anderson_ksamp([arr1, arr2]) + # similarity_score = (ad_stat.significance_level - 0.001) / ( + # 0.25 - 0.001 + # ) + # similarity_score = 1 - ad_stat.statistic + + # 2 sample ks + ks_stat = stats.ks_2samp(arr1, arr2) + similarity_score = 1 - ks_stat.statistic + # similarity_score = ks_stat.pvalue + + # histograms for bhattacharyya and jensen-shannon distances + # hist1, bin_edges1 = np.histogram(arr1, bins="auto", density=True) + # hist2, bin_edges2 = np.histogram(arr2, bins=bin_edges1, density=True) + + # bhattacharyya distance + # similarity_score = np.sqrt(np.sum(np.sqrt(hist1 * hist2))) + + # jensen-shannon distance + # similarity_score = 1 - distance.jensenshannon(arr1, arr2) + + similarity_df.loc[seed1, (dim, seed2)] = similarity_score + + similarity_df = self.normalize_similarity_df(similarity_df) + if save_path is not None: + dfpath = save_path / f"similarity_df_{ingredient_key}.csv" + print(f"Saving similarity df to {dfpath}") + similarity_df.to_csv(dfpath) + + return similarity_df + + +def plot_and_save_similarity_heatmaps(self, similarity_df, ingredient_key): + """ + Plots heatmaps with hierarchical clustering using similarity scores + """ + # accounting for changes when reading from csv + if similarity_df["packing_id"].ndim > 1: + packing_ids = similarity_df["packing_id"].iloc[:, 0] + else: + packing_ids = similarity_df["packing_id"] + lut = dict(zip(packing_ids.unique(), sns.color_palette())) + row_colors = packing_ids.map(lut) + row_colors.rename("Packing ID", inplace=True) + figdir = self.figures_path / "clustering" + figdir.mkdir(parents=True, exist_ok=True) + + dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] + for dim in dims_to_calc: + avg_similarity_values = self.calc_avg_similarity_values_for_dim( + similarity_df[dim].values + ) + np.savetxt( + figdir / f"avg_similarity_{ingredient_key}_{dim}.txt", + avg_similarity_values, + ) + + 
row_linkage = hierarchy.linkage( + 1 - distance.squareform(similarity_df[dim], checks=False), + method="ward", + optimal_ordering=True, + ) + + g = sns.clustermap( + similarity_df[dim], + row_colors=row_colors, + row_linkage=row_linkage, + col_linkage=row_linkage, + cbar_kws={"label": "similarity score"}, + ) + handles = [Patch(facecolor=lut[name]) for name in lut] + plt.legend( + handles, + lut, + title="Packing IDs", + bbox_to_anchor=(1, 1), + bbox_transform=plt.gcf().transFigure, + loc="upper right", + ) + g.ax_col_dendrogram.set_visible(False) + g.ax_row_dendrogram.set_visible(False) + g.ax_heatmap.set_xlabel(f"{ingredient_key}_{dim}") + g.savefig(figdir / f"clustermap_{ingredient_key}_{dim}", dpi=300) + + +def run_similarity_analysis( + self, + all_objs, + ingredient_key, + save_heatmaps=False, + recalculate=False, +): + """ + TODO: add docs + """ + print("Running similarity analysis...") + + if ingredient_key not in all_objs: + raise ValueError(f"Missing ingredient: {ingredient_key}") + + dfpath = self.output_path / f"similarity_df_{ingredient_key}.csv" + if dfpath.is_file() and not recalculate: + print(f"Loading similarity values from {dfpath}") + similarity_df = pd.read_csv(dfpath, header=[0, 1]) + else: + similarity_df = self.calc_similarity_df( + all_objs, + ingredient_key=ingredient_key, + save_path=self.output_path, + ) + + if save_heatmaps: + self.plot_and_save_similarity_heatmaps( + similarity_df, + ingredient_key=ingredient_key, + ) + + return similarity_df + + +def calc_and_save_correlations( + self, + all_spilr_scaled, + ingredient_key, +): + key_list = [ + f"{pc}_{sc}" + for pc in range(self.num_packings) + for sc in range(self.num_seeds_per_packing[pc]) + ] + corr_df = pd.DataFrame( + index=key_list, + columns=key_list, + dtype=float, + ) + corr_df["packing_id"] = np.nan + for pc1 in range(self.num_packings): + for sc1 in tqdm(range(self.num_seeds_per_packing[pc1])): + for pc2 in range(self.num_packings): + for sc2 in range(self.num_seeds_per_packing[pc2]): + corr_df.loc[f"{pc1}_{sc1}", "packing_id"] = pc1 + # do not calculate if: + # a) already calculated + # b) calculating for same packing + if (not np.isnan(corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"])) or ( + (pc1 == pc2) and (sc1 == sc2) + ): + continue + corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"] = matthews_corrcoef( + all_spilr_scaled[pc1, sc1].flatten(), + all_spilr_scaled[pc2, sc2].flatten(), + ) + corr_df.loc[f"{pc2}_{sc2}", f"{pc1}_{sc1}"] = corr_df.loc[ + f"{pc1}_{sc1}", f"{pc2}_{sc2}" + ] + df_packing = corr_df.pop("packing_id") + lut = dict(zip(df_packing.unique(), sns.color_palette())) + row_colors = df_packing.map(lut) + corr_df.fillna(0, inplace=True) + g = sns.clustermap( + corr_df, + row_colors=row_colors, + cbar_kws={"label": "spilr correlation"}, + ) + g.savefig(self.figures_path / f"spilr_correlation_{ingredient_key}.png", dpi=300) + + +def save_spilr_heatmap(self, input_dict, file_path, label_str=None): + fig, ax = plt.subplots() + g = sns.heatmap( + input_dict, + cbar=False, + xticklabels=False, + yticklabels=False, + ax=ax, + ) + g.set_xlabel("Angular coordinates") + g.set_ylabel(label_str) + g.invert_yaxis() + + fig.savefig( + file_path, + dpi=300, + ) + plt.close() + + +def get_parametrized_representation( + self, + all_pos_list, + ingredient_key=None, + mesh_paths={}, + num_angular_points=64, + save_plots=False, + max_plots_to_save=1, + get_correlations=False, +): + print("creating parametrized representations...") + + if "inner" not in mesh_paths or "outer" not in mesh_paths: + raise 
ValueError( + "Missing mesh paths required to generate parametrized representations" + ) + + inner_mesh = trimesh.load_mesh(mesh_paths.get("inner")) + outer_mesh = trimesh.load_mesh(mesh_paths.get("outer")) + + theta_vals = np.linspace(0, np.pi, 1 + num_angular_points) + phi_vals = np.linspace(0, 2 * np.pi, 1 + 2 * num_angular_points) + rad_vals = np.linspace(0, 1, 1 + num_angular_points) + + all_spilr = {} + for scaled_val in ["raw", "scaled"]: + all_spilr[scaled_val] = np.full( + ( + self.num_packings, + np.max(self.num_seeds_per_packing), + len(rad_vals), + len(theta_vals) * len(phi_vals), + ), + np.nan, + ) + + if save_plots: + save_dir = self.figures_path / "spilr_heatmaps" + os.makedirs(save_dir, exist_ok=True) + + for pc, (packing_id, packing_dict) in enumerate( + zip(self.packing_id_dict.values(), all_pos_list) + ): + num_saved_plots = 0 + for sc, (_, pos_dict) in enumerate(packing_dict.items()): + pos_list = np.array(pos_dict[ingredient_key]) + sph_pts = self.cartesian_to_sph(pos_list) + + ( + scaled_rad, + distance_between_surfaces, + inner_surface_distances, + ) = MeshStore.calc_scaled_distances_for_positions( + pos_list, inner_mesh, outer_mesh + ) + + trial_spilr = {} + for scaled_val in ["raw", "scaled"]: + rad_array = ( + np.linspace(0, distance_between_surfaces.max(), len(rad_vals)) + if scaled_val == "raw" + else rad_vals + ) + + trial_spilr[scaled_val] = np.zeros( + (len(rad_array), len(theta_vals), len(phi_vals)) + ) + + max_rad = distance_between_surfaces.max() if scaled_val == "raw" else 1 + + if np.any(rad_array > max_rad) or np.any(rad_array < 0): + raise ValueError("Check ray-mesh intersections!") + + rad_pos = inner_surface_distances if scaled_val == "raw" else scaled_rad + rad_inds = np.digitize(rad_pos, rad_array) - 1 + theta_inds = np.digitize(sph_pts[:, 1], theta_vals) - 1 + phi_inds = np.digitize(sph_pts[:, 2], phi_vals) - 1 + + trial_spilr[scaled_val][rad_inds, theta_inds, phi_inds] = 1 + + all_spilr[scaled_val][pc, sc] = trial_spilr[scaled_val].reshape( + (len(rad_array), -1) + ) + + if save_plots and (num_saved_plots <= max_plots_to_save): + label_str = f"Distance from Nuclear Surface, {packing_id}_{sc}, {scaled_val}" + file_path = ( + save_dir + / f"heatmap_{scaled_val}_{packing_id}_{sc}_{ingredient_key}" + ) + self.save_spilr_heatmap( + all_spilr[scaled_val][pc, sc], file_path, label_str + ) + num_saved_plots += 1 + + if get_correlations: + print("calculating correlations...") + self.calc_and_save_correlations( + all_spilr["scaled"], + ingredient_key=ingredient_key, + ) + + if save_plots: + for scaled_val in ["raw", "scaled"]: + average_spilr = np.nanmean(all_spilr[scaled_val], axis=1) + for pc, packing_id in enumerate(self.packing_id_dict.values()): + label_str = ( + f"Distance from Nuclear Surface, avg {packing_id}, {scaled_val}" + ) + file_path = ( + save_dir / f"avg_heatmap_{scaled_val}_{packing_id}_{ingredient_key}" + ) + self.save_spilr_heatmap(average_spilr[pc], file_path, label_str) + + return all_spilr + + +def calcDistanceMatrixFastEuclidean2(self, nDimPoints): + nDimPoints = np.array(nDimPoints) + n, m = nDimPoints.shape + delta = np.zeros((n, n), "d") + for d in range(m): + data = nDimPoints[:, d] + delta += (data - data[:, np.newaxis]) ** 2 + return np.sqrt(delta) + + +def flush(self): + import gc + import pprint + + for i in range(2): + print("Collecting %d ..." 
% i) + n = gc.collect() + print("Unreachable objects:", n) + print("Remaining Garbage:") + pprint.pprint(gc.garbage) + del gc.garbage[:] + + +def merge(self, d1, d2, merge=lambda x, y: y): + result = dict(d1) + for k, v in d2.items(): + if k in result: + result[k].extend(v) + else: + result[k] = v + return result + + +def plotNResult2D(self, n, bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]]): + for i in range(n): + f = "results_seed_" + str(i) + ".json" + self.plot_one_result_2d(filename=f, bbox=bbox) + + +def plot_one_result_2d( + self, + data=None, + filename=None, + bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]], +): + if data is None and filename is None: + return + elif data is None and filename is not None: + with open(filename) as data_file: + data = json.load(data_file) + fig = plt.figure() + ax = fig.add_subplot(111) + radius = {} + ingrrot = {} + ingrpos = {} + for recipe in data: + for ingrname in data[recipe]: + for k in range(len(data[recipe][ingrname]["results"])): + if ingrname not in ingrrot: + ingrrot[ingrname] = [] + ingrpos[ingrname] = [] + radius[ingrname] = data[recipe][ingrname]["encapsulating_radius"] + ingrrot[ingrname].append(data[recipe][ingrname]["results"][k][1]) + ingrpos[ingrname].append(data[recipe][ingrname]["results"][k][0]) + for ingr in ingrpos: + for i, p in enumerate(ingrpos[ingr]): + ax.add_patch( + Circle( + (p[0], p[1]), + radius[ingr], + edgecolor="black", + facecolor="red", + ) + ) + ax.set_aspect(1.0) + plt.axhline(y=bbox[0][1], color="k") + plt.axhline(y=bbox[1][1], color="k") + plt.axvline(x=bbox[0][0], color="k") + plt.axvline(x=bbox[1][0], color="k") + plt.axis([bbox[0][0], bbox[1][0], bbox[0][1], bbox[1][1]]) + plt.savefig("plot" + ingr + ".png") + plt.close() # closes the current figure + return + + +def one_exp(self, seed, output_path, eid=0, nmol=1, periodicity=True, dim=2): + output = output_path + str(nmol) + if periodicity: + self.env.use_periodicity = True + autopack.testPeriodicity = True + else: + self.env.use_periodicity = False + autopack.testPeriodicity = False + if dim == 3: + autopack.biasedPeriodicity = [1, 1, 1] + else: + autopack.biasedPeriodicity = [1, 1, 0] + if not os.path.exists(output): + os.makedirs(output) From 36e654db0d312334c38b8ab8bebd6bc47439b653 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Wed, 8 May 2024 13:57:17 -0700 Subject: [PATCH 09/12] move imports to the top --- cellpack/autopack/utils.py | 46 ++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/cellpack/autopack/utils.py b/cellpack/autopack/utils.py index a9cca11e..5d86a9e5 100644 --- a/cellpack/autopack/utils.py +++ b/cellpack/autopack/utils.py @@ -1,8 +1,30 @@ import collections import copy +import csv +import json +import math +import os import pickle import numpy +import numpy as np +import pandas as pd +import seaborn as sns +import trimesh +from matplotlib import pyplot as plt +from matplotlib.patches import Circle, Patch +from PIL import Image +from scipy import stats +from scipy.cluster import hierarchy +from scipy.spatial import distance +from sklearn.metrics import matthews_corrcoef +from tqdm import tqdm + +import cellpack.autopack as autopack +from cellpack.autopack.GeometryTools import Rectangle +from cellpack.autopack.MeshStore import MeshStore +from cellpack.autopack.upy import colors as col +from cellpack.autopack.upy.colors import map_colors def get_distance(pt1, pt2): @@ -290,30 +312,6 @@ def get_seed_list(packing_config_data, recipe_data): @author: ludo """ -import csv -import json 
-import math -import os - -import numpy as np -import pandas as pd -import seaborn as sns -import trimesh -from matplotlib import pyplot as plt -from matplotlib.patches import Circle, Patch -from PIL import Image -from scipy import stats -from scipy.cluster import hierarchy -from scipy.spatial import distance -from sklearn.metrics import matthews_corrcoef -from tqdm import tqdm - -import cellpack.autopack as autopack -from cellpack.autopack.GeometryTools import Rectangle -from cellpack.autopack.MeshStore import MeshStore -from cellpack.autopack.upy import colors as col -from cellpack.autopack.upy.colors import map_colors - def get_xyz_dict_from_all_pos_dict(all_pos_dict): """ From 6a12dc8c8cd17965f95fff68d423b15f9ca3a30f Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Wed, 8 May 2024 13:57:34 -0700 Subject: [PATCH 10/12] format --- cellpack/autopack/Analysis.py | 11 ++++------- cellpack/autopack/MeshStore.py | 4 ++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index f72d4f25..0b5b22bf 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -777,10 +777,7 @@ def add_ingredient_positions_to_plot( ) # plot the sphere if ingr.use_rbsphere: - ( - ext_recipe, - pts, - ) = ingr.getInterpolatedSphere( + (ext_recipe, pts,) = ingr.getInterpolatedSphere( seed_ingredient_positions[-i - 1], seed_ingredient_positions[-i], ) @@ -874,9 +871,9 @@ def update_pairwise_distances( ingr.name, ingr2.name, ): - pairwise_distance_dict[seed_index][f"{ingr.name}_{ingr2.name}"] = ( - self.env.calc_pairwise_distances(ingr.name, ingr2.name).tolist() - ) + pairwise_distance_dict[seed_index][ + f"{ingr.name}_{ingr2.name}" + ] = self.env.calc_pairwise_distances(ingr.name, ingr2.name).tolist() return pairwise_distance_dict diff --git a/cellpack/autopack/MeshStore.py b/cellpack/autopack/MeshStore.py index 781770e4..17b549f4 100644 --- a/cellpack/autopack/MeshStore.py +++ b/cellpack/autopack/MeshStore.py @@ -268,8 +268,8 @@ def contains_points_mesh(self, geomname, points): else: # chunk the points for i in range(0, len(points), CHUNK_SIZE): - chunk = points[i: i + CHUNK_SIZE] - inside[i: i + CHUNK_SIZE] = mesh.contains(chunk) + chunk = points[i : i + CHUNK_SIZE] + inside[i : i + CHUNK_SIZE] = mesh.contains(chunk) return inside return inside From c78627f7f1f43e152ac2f397ff72d97fab3334b3 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 28 May 2024 14:37:12 -0700 Subject: [PATCH 11/12] remove unused functions from utils --- cellpack/autopack/utils.py | 1186 +----------------------------------- 1 file changed, 1 insertion(+), 1185 deletions(-) diff --git a/cellpack/autopack/utils.py b/cellpack/autopack/utils.py index 5d86a9e5..01d856ce 100644 --- a/cellpack/autopack/utils.py +++ b/cellpack/autopack/utils.py @@ -1,30 +1,7 @@ import collections import copy -import csv -import json -import math -import os -import pickle - import numpy -import numpy as np -import pandas as pd -import seaborn as sns -import trimesh -from matplotlib import pyplot as plt -from matplotlib.patches import Circle, Patch -from PIL import Image -from scipy import stats -from scipy.cluster import hierarchy -from scipy.spatial import distance -from sklearn.metrics import matthews_corrcoef -from tqdm import tqdm - -import cellpack.autopack as autopack -from cellpack.autopack.GeometryTools import Rectangle -from cellpack.autopack.MeshStore import MeshStore -from cellpack.autopack.upy import colors as col -from cellpack.autopack.upy.colors import 
map_colors +import pickle def get_distance(pt1, pt2): @@ -303,1164 +280,3 @@ def get_seed_list(packing_config_data, recipe_data): ] return seed_list - - -# These functions have been moved over from Analysis.py -""" -Created on Mon May 6 22:58:44 2013 - -@author: ludo -""" - - -def get_xyz_dict_from_all_pos_dict(all_pos_dict): - """ - returns array of x, y, and z positions for each seed for runs - in all_pos_dict - """ - all_objs = {} - for seed, object_dict in all_pos_dict.items(): - for obj, positions in object_dict.items(): - positions = np.array(positions) - if obj not in all_objs: - all_objs[obj] = {} - if seed not in all_objs[obj]: - all_objs[obj][seed] = {} - for ct, dim in enumerate(["x", "y", "z"]): - all_objs[obj][seed][dim] = positions[:, ct] - return all_objs - - -def save_array_to_file(array_to_save, file_path, seed_index): - f_handle = open(file_path, "a" if seed_index else "w") - np.savetxt( - f_handle, - array_to_save, - delimiter=",", - ) - f_handle.close() - - -def getPositionsFromResFile(self): - # could actually restore file using histoVol. - # or not - # need to parse apr file here anyway - return [] - - -def getPositionsFromObject(self, parents): - positions = [] - for parent in parents: - obparent = self.helper.getObject(parent) - children = self.helper.getChilds(obparent) - for ch in children: - ingr_name = self.helper.getName(ch) - meshp = self.helper.getObject("Meshs_" + ingr_name.split("_")[0]) - if meshp is None: - c = self.helper.getChilds(ch) - if not len(c): - continue - meshp_children = self.helper.getChilds( - c[0] - ) # continue #should get sphere/cylnder parent ? - else: - meshp_children = self.helper.getChilds(meshp) - for cc in meshp_children: - pos = self.helper.ToVec(self.helper.getTranslation(cc)) - positions.append(pos) - return positions - - -def getDistanceFrom(self, target, parents=None, **options): - """ - target : name or host object target or target position - parent : name of host parent object for the list of object to measure distance from - objects : list of object or list of points - """ - # get distance from object to the target. 
- # all object are in env.packed_objects - # get options - - if isinstance(target, (list, tuple)): - targetPos = target - elif isinstance(target, str): - o = self.helper.getObject(target) - if o is not None: - targetPos = self.helper.ToVec(self.helper.getTranslation(o)) - else: - o = self.helper.getObject(target) - if o is not None: - targetPos = self.helper.ToVec(self.helper.getTranslation(o)) - listCenters = [] - if self.result_file is None: - if parents is None and self.result_file is None: - listeParent = [self.env.name + "_cytoplasm"] - for o in self.env.compartments: - listeParent.append(o.name + "_Matrix") - listeParent.append(o.name + "_surface") - elif parents is not None and self.result_file is None: - listeParent = parents - listCenters = self.getPositionsFromObject(listeParent) - - delta = np.array(listCenters) - np.array(targetPos) - delta *= delta - distA = np.sqrt(delta.sum(1)) - return distA - - -def getClosestDistance(self, parents=None, **options): - if self.result_file is None: - if parents is None and self.result_file is None: - listeParent = [self.env.name + "_cytoplasm"] - for o in self.env.compartments: - listeParent.append(o.name + "_Matrix") - listeParent.append(o.name + "_surface") - elif parents is not None and self.result_file is None: - listeParent = parents - listeCenters = self.getPositionsFromObject(listeParent) - else: - # use data from file - # TODO: currently getPositionsFromResFile returns an empty list - # listeCenters = self.getPositionsFromResFile(listeParent) - listeCenters = [] - # is the distance in the result array ? - listeDistance = np.zeros(len(listeCenters)) + 99999 - for i in range(len(listeCenters)): - for j in range(i + 1, len(listeCenters)): - # should use point - d = self.helper.measure_distance(listeCenters[i], listeCenters[j]) - if d < listeDistance[i]: - listeDistance[i] = d - return listeDistance - - -def displayDistance( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, -): - distances = np.array(self.env.grid.distToClosestSurf[:]) - mask = distances > cutoff - ind = np.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = np.nonzero(mask)[0] - distances[ind] = 0 # cutoff - base = self.helper.getObject(self.env.name + "distances_base") - if base is None: - base = self.helper.Sphere(self.env.name + "distances_base")[0] - p = self.helper.getObject(self.env.name + "distances") - if p is not None: - self.helper.deleteObject(p) # recursif? - p = self.helper.newEmpty(self.env.name + "distances") - # can use cube also - - -def displayDistanceCube( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, -): - distances = np.array(self.env.grid.distToClosestSurf[:]) - mask = distances > cutoff - ind = np.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = np.nonzero(mask)[0] - distances[ind] = 0 # cutoff - base = self.helper.getObject(self.env.name + "distances_base_cube") - if base is None: - # base=self.helper.Sphere(self.env.name+"distances_base")[0] - size = self.env.grid.gridSpacing - base = self.helper.box( - self.env.name + "distances_base_cube", - center=[0.0, 0.0, 0.0], - size=[size, size, size], - )[0] - parent_cube = self.helper.getObject(self.env.name + "distances_cubes") - if parent_cube is not None: - self.helper.deleteObject(parent_cube) # recursif? 
- parent_cube = self.helper.newEmpty(self.env.name + "distances_cubes") - - -def displayDistancePlane( - self, - ramp_color1=[1, 0, 0], - ramp_color2=[0, 0, 1], - ramp_color3=None, - cutoff=60.0, -): - # which axis ? - distances = np.array(self.env.grid.distToClosestSurf[:]) - - ramp = col.getRamp([ramp_color1, ramp_color2], size=255) # color - mask = distances > cutoff - ind = np.nonzero(mask)[0] - distances[ind] = cutoff - mask = distances < 0 # -cutoff - ind = np.nonzero(mask)[0] - distances[ind] = 0 # cutoff - newd = np.append(distances, cutoff) - colors = map_colors(newd, ramp)[:-1] # 1D array of the grid x,y,1 - autopack._colors = colors - p = self.helper.getObject(self.env.name + "distances") - if p is not None: - self.helper.deleteObject(p) # recursif? - p = self.helper.newEmpty(self.env.name + "distances_p") - - d = np.array(self.env.grid.boundingBox[0]) - np.array(self.env.grid.boundingBox[1]) - p, mpl = self.helper.plane( - self.env.name + "distances_plane", - center=self.env.grid.getCenter(), - size=[math.fabs(d[0]), math.fabs(d[1])], - parent=p, - ) - self.helper.rotateObj(p, [0, 0, -math.pi / 2.0]) - filename = ( - autopack.cache_results + os.sep + self.env.name + "distances_plane_texture.png" - ) - c = colors.reshape( - ( - self.env.grid.nbGridPoints[0], - self.env.grid.nbGridPoints[1], - self.env.grid.nbGridPoints[2], - 3, - ) - ) - - im = Image.fromstring( - "RGB", (c.shape[0], c.shape[1]), np.uint8(c * 255.0).tostring() - ) - im.save(str(filename)) - mat = self.helper.createTexturedMaterial(self.env.name + "planeMat", str(filename)) - # assign the material to the plane - self.helper.assignMaterial(p, mat, texture=True) - - -def grabResultFromJSON(self, n): - ingrrot = {} - ingrpos = {} - for i in range(n): - with open("results_seed_" + str(i) + ".json") as data_file: - data = json.load(data_file) - for recipe in data: - for ingrname in data[recipe]: - for k in range(len(data[recipe][ingrname]["results"])): - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - ingrrot[ingrname].append(data[recipe][ingrname]["results"][k][1]) - ingrpos[ingrname].append(data[recipe][ingrname]["results"][k][0]) - return ingrpos, ingrrot - - -def grabResultFromTXT(self, n, doanalyze=False): - from autopack import transformation as t - - ingrrot = {} - ingrpos = {} - for i in range(1000): - files = open("results_seed_" + str(i) + ".txt", "r") - lines = files.readlines() - files.close() - for line in lines: - line = line.replace("<", " ").replace(">", " ") - elem = line.split() - ingrname = elem[-5] - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - ingrrot[ingrname].append(eval(elem[2])) - ingrpos[ingrname].append(eval(elem[0])) - for ingrname in ingrrot: - ingrrot[ingrname] = [np.array(m).reshape((4, 4)) for m in ingrrot[ingrname]] - if doanalyze: - for ingrname in ingrrot: - eulers3 = [t.euler_from_matrix(m, "rxyz") for m in ingrrot[ingrname]] - e3 = np.degrees(np.array(eulers3)).transpose() - np.savetxt( - ingrname + "_euler_X.csv", - np.array(e3[0]), - delimiter=",", - ) - np.savetxt( - ingrname + "_euler_Y.csv", - np.array(e3[1]), - delimiter=",", - ) - np.savetxt( - ingrname + "_euler_Z.csv", - np.array(e3[2]), - delimiter=",", - ) - self.histogram(e3[0], ingrname + "_euler_X.png") - self.histogram(e3[1], ingrname + "_euler_Y.png") - self.histogram(e3[2], ingrname + "_euler_Z.png") - return ingrpos, ingrrot - - -# should take any type of list... 
-def save_csv(self, data, filename=None): - if filename is None: - filename = "output.csv" - resultFile = open(filename, "wb") - wr = csv.writer(resultFile, dialect="excel") - # wr.writerows(data) list of list ? - # resultFile.close() - for item in data: - wr.writerow([item]) - resultFile.close() - - -def rectangle_circle_area(self, bbox, center, radius): - # http://www.eex-dev.net/index.php?id=100 - # [[0.,0,0],[1000,1000,1]] - # top,bottom, right, left - # rect=Rectangle(bbox[0][0],bbox[1][0],bbox[0][1],bbox[1][1])#top,bottom, right, left - rect = Rectangle( - bbox[1][1], bbox[0][1], bbox[1][0], bbox[0][0] - ) # top,bottom, right, left - m = [center[0], center[1]] - r = radius - area = math.pi * r**2 - chs = self.g.check_sphere_inside(rect, m, r) - if chs: # sph not completly inside - ch = self.g.check_rectangle_oustide(rect, m, r) - if ch: # rectangle not outside - leftBound, rightBound = self.g.getBoundary(rect, m, r) - area = self.g.get_rectangle_cercle_area(rect, m, r, leftBound, rightBound) - else: - area = bbox[0][1] ** 2 - return area - - -def getVolumeShell(self, bbox, radii, center): - # rectangle_circle_area - volumes = [] - box_size0 = bbox[1][0] - bbox[0][0] - for i in range(len(radii) - 1): - r1 = radii[i] - r2 = radii[i + 1] - v1 = self.g.calc_volume(r1, box_size0 / 2.0) - v2 = self.g.calc_volume(r2, box_size0 / 2.0) - volumes.append(v2 - v1) - return volumes - - -def rdf_3d(self, ingr): - # see for intersection volume here http://crowsandcats.blogspot.com/2013/04/cube-sphere-intersection-volume.html - # and here http://crowsandcats.blogspot.com/2013/05/extending-radial-distributions.html - # will require scipy...worth it ? - # should be pairewise distance ? or not ? - distances = np.array(self.env.distances[ingr.name]) - basename = self.env.basename - np.savetxt( - basename + ingr.name + "_pos.csv", - np.array(self.env.ingredient_positions[ingr.name]), - delimiter=",", - ) - self.histogram(distances, basename + ingr.name + "_histo.png") - np.savetxt( - basename + ingr.name + "_distances.csv", - np.array(distances), - delimiter=",", - ) - # the bin should be not less than the biggest ingredient radius - # b=int(distances.max()/self.largest) - b = 100 - # bin_edges = np.arange(0, min(box_size) / 2, bin_width) - new_rdf, edges = np.histogramdd( - distances, bins=b, range=[(distances.min(), distances.max())] - ) - radii = edges[0] - # from http://isaacs.sourceforge.net/phys/rdfs.html - dnr = new_rdf - N = len(distances) - V = ( - self.env.grid.nbGridPoints[0] - * self.env.grid.nbGridPoints[1] - * self.env.grid.nbGridPoints[2] - * self.env.grid.gridSpacing**3 - ) - Vshell = np.array(self.getVolumeShell(self.bbox, radii, self.center)) - gr = (dnr * V) / (N * Vshell) - np.savetxt(basename + ingr.name + "_rdf.csv", np.array(gr), delimiter=",") - self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") - - -def getAreaShell(self, bbox, radii, center): - # rectangle_circle_area - areas = [] - for i in range(len(radii) - 1): - r1 = radii[i] - r2 = radii[i + 1] - area1 = self.rectangle_circle_area(bbox, center, r1) - area2 = self.rectangle_circle_area(bbox, center, r2) - if area1 == 0 or area2 == 0: - areas.append(np.pi * (np.power(r2, 2) - np.power(r1, 2))) - else: - areas.append(area2 - area1) - return areas - - -def ripley(self, positions, dr=25, rMax=None): - # K(t) = A*SUM(wij*I(i,j)/n**2) - # lambda = n/A A is the area of the region containing all points - # I indicator function 1 if its operand is true, 0 otherwise - # t is the search radius - # if homogenous K(s) = pi*s**2 
- # L(t) = (K(t)/pi)**1/2 - # A common plot is a graph of t - \hat{L}(t) against t - # which will approximately follow the horizontal zero-axis with constant - # dispersion if the data follow a homogeneous Poisson process. - N = len(positions) - V = 1000**2 - diag = np.sqrt(1000**2 + 1000**2) - dr = dr # all_distance.min() - if rMax is None: - rMax = diag - edges = np.arange(dr, rMax + 1.1 * dr, dr) - k = np.zeros((N, len(edges))) - for i, p in enumerate(positions): - di = distance.cdist( - positions, - [p], - "euclidean", - ) - # dV = np.array(analyse.getAreaShell(analyse.bbox,edges,p)) - for j, e in enumerate(edges): - area0 = math.pi * e**2 # complete circle - area1 = self.rectangle_circle_area(self.bbox, p, e) - w = area1 / area0 - k[i, j] = w * len(np.nonzero(di < e)[0]) / N**2 - Kt = V * np.sum(k, axis=0) - Lt = (Kt / np.pi) ** 0.5 - return Kt, Lt - - -def rdf(self, positions, dr=10, rMax=None): - N = len(positions) - V = 1000**2 - diag = np.sqrt(1000**2 + 1000**2) - dr = dr # all_distance.min() - if rMax is None: - rMax = diag - edges = np.arange(0.0, rMax + 1.1 * dr, dr) - g = np.zeros((N, len(edges) - 1)) - dv = [] - density = float(N) / float(V) - for i, p in enumerate(positions): - di = distance.cdist( - positions, - [p], - "euclidean", - ) - dN, bins = np.histogram(di, bins=edges) - dV = np.array(self.getAreaShell(self.bbox, edges, p)) - dv.append(dV) - g[i] = dN / (dV * density) - avg = np.average(g, axis=0) # /np.array(dv) - return avg - - -def rdf_2d(self, ingr): - # dN/N / dV/V = dN/dV * V/N - distances = np.array(self.env.distances[ingr.name]) - basename = self.env.basename - np.savetxt( - basename + ingr.name + "_pos.csv", - np.array(self.env.ingredient_positions[ingr.name]), - delimiter=",", - ) - self.histogram(distances, basename + ingr.name + "_histo.png") - np.savetxt( - basename + ingr.name + "_distances.csv", - np.array(distances), - delimiter=",", - ) - # the bin should be not less than the biggest ingredient radius - # b=int(distances.max()/self.largest) - new_rdf, edges = np.histogramdd( - distances - ) # , bins=b, range=[(distances.min(), distances.max())],normed=0) - radii = edges[0] - # r=radii.tolist() - # r.insert(0,0.0) - # radii = np.array(r) - # rdf= new_rdf.tolist() - # rdf.insert(0,0) - # new_rdf = np.array(rdf) - # from http://isaacs.sourceforge.net/phys/rdfs.html - dnr = new_rdf[:] - N = len(distances) - V = ( - self.env.grid.nbGridPoints[0] - * self.env.grid.nbGridPoints[1] - * self.env.grid.gridSpacing**2 - ) - Vshell = np.array(self.getAreaShell(self.bbox, radii, self.center)) - # print Vshell - # Vshell1 = np.pi*density*(np.power(radii[1:],2)-np.power(radii[:-1], 2)) - # print Vshell1 - # print radii - gr = (dnr * V) / (N * Vshell) - np.savetxt(basename + ingr.name + "_rdf.csv", np.array(gr), delimiter=",") - self.plot(gr, radii[:-1], basename + ingr.name + "_rdf.png") - # simpl approach Ni/Areai - G = dnr / Vshell - np.savetxt( - basename + ingr.name + "_rdf_simple.csv", - np.array(G), - delimiter=",", - ) - self.plot( - np.array(G), - radii[:-1], - basename + ingr.name + "_rdf_simple.png", - ) - - -def correlation(self, ingr): - basename = self.env.basename - posxyz = np.array(self.env.ingredient_positions[ingr.name]).transpose() - g_average, radii, x, y, z = self.PairCorrelationFunction_3D(posxyz, 1000, 900, 100) - self.plot(g_average, radii, basename + ingr.name + "_corr.png") - - -def PairCorrelationFunction_3D(self, data, S, rMax, dr): - """Compute the three-dimensional pair correlation function for a set of - spherical particles contained 
in a cube with side length S. This simple - function finds reference particles such that a sphere of radius rMax drawn - around the particle will fit entirely within the cube, eliminating the need - to compensate for edge effects. If no such particles exist, an error is - returned. Try a smaller rMax...or write some code to handle edge effects! ;) - Arguments: - x an array of x positions of centers of particles - y an array of y positions of centers of particles - z an array of z positions of centers of particles - S length of each side of the cube in space - rMax outer diameter of largest spherical shell - dr increment for increasing radius of spherical shell - - Returns a tuple: (g, radii, interior_x, interior_y, interior_z) - g(r) a np array containing the correlation function g(r) - radii a np array containing the radii of the - spherical shells used to compute g(r) - interior_x x coordinates of reference particles - interior_y y coordinates of reference particles - interior_z z coordinates of reference particles - """ - x = data[0] - y = data[1] - z = data[2] - # Find particles which are close enough to the cube center that a sphere of radius - # rMax will not cross any face of the cube - bools1 = x > rMax - bools2 = x < (S - rMax) - bools3 = y > rMax - bools4 = y < (S - rMax) - bools5 = z > rMax - bools6 = z < (S - rMax) - - (interior_indices,) = np.where(bools1 * bools2 * bools3 * bools4 * bools5 * bools6) - num_interior_particles = len(interior_indices) - - if num_interior_particles < 1: - raise RuntimeError( - "No particles found for which a sphere of radius rMax\ -will lie entirely within a cube of side length S. Decrease rMax\ -or increase the size of the cube." - ) - - edges = np.arange(0.0, rMax + 1.1 * dr, dr) - num_increments = len(edges) - 1 - g = np.zeros([num_interior_particles, num_increments]) - radii = np.zeros(num_increments) - numberDensity = len(x) / S**3 - - # Compute pairwise correlation for each interior particle - for p in range(num_interior_particles): - index = interior_indices[p] - d = np.sqrt((x[index] - x) ** 2 + (y[index] - y) ** 2 + (z[index] - z) ** 2) - d[index] = 2 * rMax - - (result, bins) = np.histogram(d, bins=edges, normed=False) - g[p, :] = result / numberDensity - - # Average g(r) for all interior particles and compute radii - g_average = np.zeros(num_increments) - for i in range(num_increments): - radii[i] = (edges[i] + edges[i + 1]) / 2.0 - rOuter = edges[i + 1] - rInner = edges[i] - g_average[i] = np.average(g[:, i]) / ( - 4.0 / 3.0 * np.pi * (rOuter**3 - rInner**3) - ) - - return ( - g_average, - radii, - x[interior_indices], - y[interior_indices], - z[interior_indices], - ) - # Number of particles in shell/total number of particles/volume of shell/number density - # shell volume = 4/3*pi(r_outer**3-r_inner**3) - - -def PairCorrelationFunction_2D(self, x, y, S, rMax, dr): - """Compute the two-dimensional pair correlation function, also known - as the radial distribution function, for a set of circular particles - contained in a square region of a plane. This simple function finds - reference particles such that a circle of radius rMax drawn around the - particle will fit entirely within the square, eliminating the need to - compensate for edge effects. If no such particles exist, an error is - returned. Try a smaller rMax...or write some code to handle edge effects! 
;)
-
- Arguments:
- x an array of x positions of centers of particles
- y an array of y positions of centers of particles
- S length of each side of the square region of the plane
- rMax outer diameter of largest annulus
- dr increment for increasing radius of annulus
-
- Returns a tuple: (g, radii, interior_indices)
- g(r) a np array containing the correlation function g(r)
- radii a np array containing the radii of the
- annuli used to compute g(r)
- interior_indices indices of the reference particles,
- usable to index x and y for their coordinates
- """
- # Number of particles in ring/area of ring/number of reference particles/number density
- # area of ring = pi*(r_outer**2 - r_inner**2)
- # Find particles which are close enough to the box center that a circle of radius
- # rMax will not cross any edge of the box
- bools1 = x > 1.1 * rMax
- bools2 = x < (S - 1.1 * rMax)
- bools3 = y > rMax * 1.1
- bools4 = y < (S - rMax * 1.1)
- (interior_indices,) = np.where(bools1 * bools2 * bools3 * bools4)
- num_interior_particles = len(interior_indices)
-
- if num_interior_particles < 1:
- raise RuntimeError(
- "No particles found for which a circle of radius rMax\
- will lie entirely within a square of side length S. Decrease rMax\
- or increase the size of the square."
- )
-
- edges = np.arange(0.0, rMax + 1.1 * dr, dr)
- num_increments = len(edges) - 1
- g = np.zeros([num_interior_particles, num_increments])
- radii = np.zeros(num_increments)
- numberDensity = len(x) / S**2
-
- # Compute pairwise correlation for each interior particle
- for p in range(num_interior_particles):
- index = interior_indices[p]
- d = np.sqrt((x[index] - x) ** 2 + (y[index] - y) ** 2)
- d[index] = 2 * rMax
-
- (result, bins) = np.histogram(d, bins=edges, normed=False)
- g[p, :] = result / numberDensity
-
- # Average g(r) for all interior particles and compute radii
- g_average = np.zeros(num_increments)
- for i in range(num_increments):
- radii[i] = (edges[i] + edges[i + 1]) / 2.0
- rOuter = edges[i + 1]
- rInner = edges[i]
- # divide by the area of the circle cut by the square
- g_average[i] = np.average(g[:, i]) / (np.pi * (rOuter**2 - rInner**2))
-
- return (g_average, radii, interior_indices)
-
-
-def normalize_similarity_df(self, similarity_df):
- """
- Normalizes the similarity dataframe
- """
- dims_to_normalize = self.get_list_of_dims() + ["pairwise_distance"]
- for dim in dims_to_normalize:
- values = similarity_df.loc[:, dim].values
- normalized_values = (values - np.min(values)) / (
- np.max(values) - np.min(values)
- )
- normalized_values[np.isnan(normalized_values)] = 0
- similarity_df.loc[:, dim] = normalized_values
- return similarity_df
-
-
-def calc_avg_similarity_values_for_dim(self, similarity_vals_for_dim):
- packing_inds = np.cumsum(
- np.hstack([0, self.num_seeds_per_packing])
- ) # returns the indices where packings start and end
- avg_similarity_values = -np.ones((self.num_packings, self.num_packings))
-
- for p1_id in range(self.num_packings):
- for p2_id in range(self.num_packings):
- if avg_similarity_values[p1_id, p2_id] >= 0:
- continue
- p1_inds = np.arange(
- packing_inds[p1_id], packing_inds[p1_id + 1]
- ) # indices corresponding to packing p1_id
- p2_inds = np.arange(packing_inds[p2_id], packing_inds[p2_id + 1])
- avg_similarity_values[p1_id, p2_id] = np.mean(
- similarity_vals_for_dim[p1_inds, p2_inds]
- )
- avg_similarity_values[p2_id, p1_id] = avg_similarity_values[p1_id, p2_id]
- return avg_similarity_values
-
-
-def calc_similarity_df(self, all_objs, ingredient_key, save_path=None):
- 
""" - Calculates a dataframe of similarity values between packings - """ - key_list = list(all_objs[ingredient_key].keys()) - similarity_df = pd.DataFrame( - index=key_list, - columns=pd.MultiIndex.from_product([self.get_list_of_dims(), key_list]), - dtype=float, - ) - similarity_df["packing_id"] = 0 - dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] - for seed1, pos_dict1 in tqdm(all_objs[ingredient_key].items()): - similarity_df.loc[seed1, "packing_id"] = seed1.split("_")[-1] - for seed2, pos_dict2 in all_objs[ingredient_key].items(): - for dim in dims_to_calc: - if dim == "pairwise_distance": - pos1 = np.array([pos_dict1["x"], pos_dict1["y"], pos_dict1["z"]]) - pos2 = np.array([pos_dict2["x"], pos_dict2["y"], pos_dict2["z"]]) - arr1 = distance.pdist(pos1.T) - arr2 = distance.pdist(pos2.T) - else: - arr1 = pos_dict1[dim] - arr2 = pos_dict2[dim] - - min_dim = np.min([arr1.ravel(), arr2.ravel()]) - max_dim = np.max([arr1.ravel(), arr2.ravel()]) - arr1 = (arr1 - min_dim) / max_dim - arr2 = (arr2 - min_dim) / max_dim - - if len(arr1) == 1 or len(arr2) == 1: - # cannot determine similarity when only one instance is packed - similarity_score = 0 - elif len(np.unique(arr1)) == 1 or len(np.unique(arr2)) == 1: - # if there is only one unique value, compare the value - if len(np.unique(arr1)) == 1 and len(np.unique(arr2)) == 1: - # both packings have only one unique value, compare the value - similarity_score = ( - 1 if np.unique(arr1) == np.unique(arr2) else 0 - ) - else: - # one of the packings has more than one unique value, cannot compare - similarity_score = 0 - else: - # anderson-darling test - # ad_stat = stats.anderson_ksamp([arr1, arr2]) - # similarity_score = (ad_stat.significance_level - 0.001) / ( - # 0.25 - 0.001 - # ) - # similarity_score = 1 - ad_stat.statistic - - # 2 sample ks - ks_stat = stats.ks_2samp(arr1, arr2) - similarity_score = 1 - ks_stat.statistic - # similarity_score = ks_stat.pvalue - - # histograms for bhattacharyya and jensen-shannon distances - # hist1, bin_edges1 = np.histogram(arr1, bins="auto", density=True) - # hist2, bin_edges2 = np.histogram(arr2, bins=bin_edges1, density=True) - - # bhattacharyya distance - # similarity_score = np.sqrt(np.sum(np.sqrt(hist1 * hist2))) - - # jensen-shannon distance - # similarity_score = 1 - distance.jensenshannon(arr1, arr2) - - similarity_df.loc[seed1, (dim, seed2)] = similarity_score - - similarity_df = self.normalize_similarity_df(similarity_df) - if save_path is not None: - dfpath = save_path / f"similarity_df_{ingredient_key}.csv" - print(f"Saving similarity df to {dfpath}") - similarity_df.to_csv(dfpath) - - return similarity_df - - -def plot_and_save_similarity_heatmaps(self, similarity_df, ingredient_key): - """ - Plots heatmaps with hierarchical clustering using similarity scores - """ - # accounting for changes when reading from csv - if similarity_df["packing_id"].ndim > 1: - packing_ids = similarity_df["packing_id"].iloc[:, 0] - else: - packing_ids = similarity_df["packing_id"] - lut = dict(zip(packing_ids.unique(), sns.color_palette())) - row_colors = packing_ids.map(lut) - row_colors.rename("Packing ID", inplace=True) - figdir = self.figures_path / "clustering" - figdir.mkdir(parents=True, exist_ok=True) - - dims_to_calc = self.get_list_of_dims() + ["pairwise_distance"] - for dim in dims_to_calc: - avg_similarity_values = self.calc_avg_similarity_values_for_dim( - similarity_df[dim].values - ) - np.savetxt( - figdir / f"avg_similarity_{ingredient_key}_{dim}.txt", - avg_similarity_values, - ) - - 
row_linkage = hierarchy.linkage( - 1 - distance.squareform(similarity_df[dim], checks=False), - method="ward", - optimal_ordering=True, - ) - - g = sns.clustermap( - similarity_df[dim], - row_colors=row_colors, - row_linkage=row_linkage, - col_linkage=row_linkage, - cbar_kws={"label": "similarity score"}, - ) - handles = [Patch(facecolor=lut[name]) for name in lut] - plt.legend( - handles, - lut, - title="Packing IDs", - bbox_to_anchor=(1, 1), - bbox_transform=plt.gcf().transFigure, - loc="upper right", - ) - g.ax_col_dendrogram.set_visible(False) - g.ax_row_dendrogram.set_visible(False) - g.ax_heatmap.set_xlabel(f"{ingredient_key}_{dim}") - g.savefig(figdir / f"clustermap_{ingredient_key}_{dim}", dpi=300) - - -def run_similarity_analysis( - self, - all_objs, - ingredient_key, - save_heatmaps=False, - recalculate=False, -): - """ - TODO: add docs - """ - print("Running similarity analysis...") - - if ingredient_key not in all_objs: - raise ValueError(f"Missing ingredient: {ingredient_key}") - - dfpath = self.output_path / f"similarity_df_{ingredient_key}.csv" - if dfpath.is_file() and not recalculate: - print(f"Loading similarity values from {dfpath}") - similarity_df = pd.read_csv(dfpath, header=[0, 1]) - else: - similarity_df = self.calc_similarity_df( - all_objs, - ingredient_key=ingredient_key, - save_path=self.output_path, - ) - - if save_heatmaps: - self.plot_and_save_similarity_heatmaps( - similarity_df, - ingredient_key=ingredient_key, - ) - - return similarity_df - - -def calc_and_save_correlations( - self, - all_spilr_scaled, - ingredient_key, -): - key_list = [ - f"{pc}_{sc}" - for pc in range(self.num_packings) - for sc in range(self.num_seeds_per_packing[pc]) - ] - corr_df = pd.DataFrame( - index=key_list, - columns=key_list, - dtype=float, - ) - corr_df["packing_id"] = np.nan - for pc1 in range(self.num_packings): - for sc1 in tqdm(range(self.num_seeds_per_packing[pc1])): - for pc2 in range(self.num_packings): - for sc2 in range(self.num_seeds_per_packing[pc2]): - corr_df.loc[f"{pc1}_{sc1}", "packing_id"] = pc1 - # do not calculate if: - # a) already calculated - # b) calculating for same packing - if (not np.isnan(corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"])) or ( - (pc1 == pc2) and (sc1 == sc2) - ): - continue - corr_df.loc[f"{pc1}_{sc1}", f"{pc2}_{sc2}"] = matthews_corrcoef( - all_spilr_scaled[pc1, sc1].flatten(), - all_spilr_scaled[pc2, sc2].flatten(), - ) - corr_df.loc[f"{pc2}_{sc2}", f"{pc1}_{sc1}"] = corr_df.loc[ - f"{pc1}_{sc1}", f"{pc2}_{sc2}" - ] - df_packing = corr_df.pop("packing_id") - lut = dict(zip(df_packing.unique(), sns.color_palette())) - row_colors = df_packing.map(lut) - corr_df.fillna(0, inplace=True) - g = sns.clustermap( - corr_df, - row_colors=row_colors, - cbar_kws={"label": "spilr correlation"}, - ) - g.savefig(self.figures_path / f"spilr_correlation_{ingredient_key}.png", dpi=300) - - -def save_spilr_heatmap(self, input_dict, file_path, label_str=None): - fig, ax = plt.subplots() - g = sns.heatmap( - input_dict, - cbar=False, - xticklabels=False, - yticklabels=False, - ax=ax, - ) - g.set_xlabel("Angular coordinates") - g.set_ylabel(label_str) - g.invert_yaxis() - - fig.savefig( - file_path, - dpi=300, - ) - plt.close() - - -def get_parametrized_representation( - self, - all_pos_list, - ingredient_key=None, - mesh_paths={}, - num_angular_points=64, - save_plots=False, - max_plots_to_save=1, - get_correlations=False, -): - print("creating parametrized representations...") - - if "inner" not in mesh_paths or "outer" not in mesh_paths: - raise 
ValueError( - "Missing mesh paths required to generate parametrized representations" - ) - - inner_mesh = trimesh.load_mesh(mesh_paths.get("inner")) - outer_mesh = trimesh.load_mesh(mesh_paths.get("outer")) - - theta_vals = np.linspace(0, np.pi, 1 + num_angular_points) - phi_vals = np.linspace(0, 2 * np.pi, 1 + 2 * num_angular_points) - rad_vals = np.linspace(0, 1, 1 + num_angular_points) - - all_spilr = {} - for scaled_val in ["raw", "scaled"]: - all_spilr[scaled_val] = np.full( - ( - self.num_packings, - np.max(self.num_seeds_per_packing), - len(rad_vals), - len(theta_vals) * len(phi_vals), - ), - np.nan, - ) - - if save_plots: - save_dir = self.figures_path / "spilr_heatmaps" - os.makedirs(save_dir, exist_ok=True) - - for pc, (packing_id, packing_dict) in enumerate( - zip(self.packing_id_dict.values(), all_pos_list) - ): - num_saved_plots = 0 - for sc, (_, pos_dict) in enumerate(packing_dict.items()): - pos_list = np.array(pos_dict[ingredient_key]) - sph_pts = self.cartesian_to_sph(pos_list) - - ( - scaled_rad, - distance_between_surfaces, - inner_surface_distances, - ) = MeshStore.calc_scaled_distances_for_positions( - pos_list, inner_mesh, outer_mesh - ) - - trial_spilr = {} - for scaled_val in ["raw", "scaled"]: - rad_array = ( - np.linspace(0, distance_between_surfaces.max(), len(rad_vals)) - if scaled_val == "raw" - else rad_vals - ) - - trial_spilr[scaled_val] = np.zeros( - (len(rad_array), len(theta_vals), len(phi_vals)) - ) - - max_rad = distance_between_surfaces.max() if scaled_val == "raw" else 1 - - if np.any(rad_array > max_rad) or np.any(rad_array < 0): - raise ValueError("Check ray-mesh intersections!") - - rad_pos = inner_surface_distances if scaled_val == "raw" else scaled_rad - rad_inds = np.digitize(rad_pos, rad_array) - 1 - theta_inds = np.digitize(sph_pts[:, 1], theta_vals) - 1 - phi_inds = np.digitize(sph_pts[:, 2], phi_vals) - 1 - - trial_spilr[scaled_val][rad_inds, theta_inds, phi_inds] = 1 - - all_spilr[scaled_val][pc, sc] = trial_spilr[scaled_val].reshape( - (len(rad_array), -1) - ) - - if save_plots and (num_saved_plots <= max_plots_to_save): - label_str = f"Distance from Nuclear Surface, {packing_id}_{sc}, {scaled_val}" - file_path = ( - save_dir - / f"heatmap_{scaled_val}_{packing_id}_{sc}_{ingredient_key}" - ) - self.save_spilr_heatmap( - all_spilr[scaled_val][pc, sc], file_path, label_str - ) - num_saved_plots += 1 - - if get_correlations: - print("calculating correlations...") - self.calc_and_save_correlations( - all_spilr["scaled"], - ingredient_key=ingredient_key, - ) - - if save_plots: - for scaled_val in ["raw", "scaled"]: - average_spilr = np.nanmean(all_spilr[scaled_val], axis=1) - for pc, packing_id in enumerate(self.packing_id_dict.values()): - label_str = ( - f"Distance from Nuclear Surface, avg {packing_id}, {scaled_val}" - ) - file_path = ( - save_dir / f"avg_heatmap_{scaled_val}_{packing_id}_{ingredient_key}" - ) - self.save_spilr_heatmap(average_spilr[pc], file_path, label_str) - - return all_spilr - - -def calcDistanceMatrixFastEuclidean2(self, nDimPoints): - nDimPoints = np.array(nDimPoints) - n, m = nDimPoints.shape - delta = np.zeros((n, n), "d") - for d in range(m): - data = nDimPoints[:, d] - delta += (data - data[:, np.newaxis]) ** 2 - return np.sqrt(delta) - - -def flush(self): - import gc - import pprint - - for i in range(2): - print("Collecting %d ..." 
% i) - n = gc.collect() - print("Unreachable objects:", n) - print("Remaining Garbage:") - pprint.pprint(gc.garbage) - del gc.garbage[:] - - -def merge(self, d1, d2, merge=lambda x, y: y): - result = dict(d1) - for k, v in d2.items(): - if k in result: - result[k].extend(v) - else: - result[k] = v - return result - - -def plotNResult2D(self, n, bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]]): - for i in range(n): - f = "results_seed_" + str(i) + ".json" - self.plot_one_result_2d(filename=f, bbox=bbox) - - -def plot_one_result_2d( - self, - data=None, - filename=None, - bbox=[[0.0, 0, 0.0], [1000.0, 1000.0, 1000.0]], -): - if data is None and filename is None: - return - elif data is None and filename is not None: - with open(filename) as data_file: - data = json.load(data_file) - fig = plt.figure() - ax = fig.add_subplot(111) - radius = {} - ingrrot = {} - ingrpos = {} - for recipe in data: - for ingrname in data[recipe]: - for k in range(len(data[recipe][ingrname]["results"])): - if ingrname not in ingrrot: - ingrrot[ingrname] = [] - ingrpos[ingrname] = [] - radius[ingrname] = data[recipe][ingrname]["encapsulating_radius"] - ingrrot[ingrname].append(data[recipe][ingrname]["results"][k][1]) - ingrpos[ingrname].append(data[recipe][ingrname]["results"][k][0]) - for ingr in ingrpos: - for i, p in enumerate(ingrpos[ingr]): - ax.add_patch( - Circle( - (p[0], p[1]), - radius[ingr], - edgecolor="black", - facecolor="red", - ) - ) - ax.set_aspect(1.0) - plt.axhline(y=bbox[0][1], color="k") - plt.axhline(y=bbox[1][1], color="k") - plt.axvline(x=bbox[0][0], color="k") - plt.axvline(x=bbox[1][0], color="k") - plt.axis([bbox[0][0], bbox[1][0], bbox[0][1], bbox[1][1]]) - plt.savefig("plot" + ingr + ".png") - plt.close() # closes the current figure - return - - -def one_exp(self, seed, output_path, eid=0, nmol=1, periodicity=True, dim=2): - output = output_path + str(nmol) - if periodicity: - self.env.use_periodicity = True - autopack.testPeriodicity = True - else: - self.env.use_periodicity = False - autopack.testPeriodicity = False - if dim == 3: - autopack.biasedPeriodicity = [1, 1, 1] - else: - autopack.biasedPeriodicity = [1, 1, 0] - if not os.path.exists(output): - os.makedirs(output) From 22d453ff3c4b5ee1fc79f413890705cd55214755 Mon Sep 17 00:00:00 2001 From: Saurabh Mogre Date: Tue, 28 May 2024 15:02:33 -0700 Subject: [PATCH 12/12] remove unused attributes from analysis class --- cellpack/autopack/Analysis.py | 23 +---------------------- cellpack/bin/pack.py | 3 --- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/cellpack/autopack/Analysis.py b/cellpack/autopack/Analysis.py index 0b5b22bf..363ddd96 100644 --- a/cellpack/autopack/Analysis.py +++ b/cellpack/autopack/Analysis.py @@ -17,7 +17,6 @@ from mdutils.mdutils import MdUtils import cellpack.autopack as autopack -from cellpack.autopack.GeometryTools import GeometryTools from cellpack.autopack.ldSequence import halton from cellpack.autopack.plotly_result import PlotlyAnalysis from cellpack.autopack.utils import check_paired_key, get_paired_key, get_seed_list @@ -29,29 +28,13 @@ class Analysis: def __init__( self, env=None, - viewer=None, - result_file=None, packing_results_path=None, output_path=None, ): self.env = None - self.smallest = 99999.0 - self.largest = 0.0 if env: self.env = env - self.smallest = env.smallestProteinSize - self.largest = env.largestProteinSize - self.afviewer = viewer - self.helper = None - if viewer: - self.helper = self.afviewer.vi - self.result_file = result_file self.center = [0, 0, 0] - self.bbox = 
[[0, 0, 0], [1, 1, 1]] - self.g = GeometryTools() - self.g.Resolution = 1.0 # or grid step? - self.current_pos = None - self.current_distance = None self.plotly = PlotlyAnalysis() if packing_results_path is not None: @@ -995,11 +978,7 @@ def pack_one_seed( """ seed = int(seed_list[seed_index]) seed_basename = self.env.add_seed_number_to_base_name(seed) - # Clear - if self.afviewer: - self.afviewer.clearFill("Test_Spheres2D") - else: - self.env.reset() + self.env.reset() self.env.saveResult = True numpy.random.seed(seed) self.build_grid() diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index b1f7423e..80d052c8 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -43,15 +43,12 @@ def pack(recipe, config_path=None, analysis_config_path=None): env = Environment(config=packing_config_data, recipe=recipe_data) env.helper = helper - afviewer = None if ( packing_config_data["save_analyze_result"] or packing_config_data["number_of_packings"] > 1 ): analyze = Analysis( env=env, - viewer=afviewer, - result_file=None, ) log.info(f"saving to {env.out_folder}")
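
For reference, the statistics implemented by the analysis helpers removed in this series can each be reproduced standalone. The Ripley's K/L computation counts neighbours within growing radii; below is a minimal sketch for 2-D points in a square of side S, skipping the clipped-circle edge correction the removed code applied via rectangle_circle_area (all names are illustrative, not cellPACK API):

import numpy as np
from scipy.spatial import distance

def ripley_k_l(positions, S=1000.0, dr=10.0, r_max=None):
    # K(r) = area * mean number of neighbours within r / (n - 1);
    # under a homogeneous Poisson process K(r) ~ pi * r**2, so
    # L(r) = sqrt(K(r) / pi) ~ r and L(r) - r hovers around zero
    positions = np.asarray(positions)
    n = len(positions)
    area = S**2
    if r_max is None:
        r_max = np.sqrt(2) * S  # box diagonal, as in the removed code
    radii = np.arange(dr, r_max + 1.1 * dr, dr)
    d = distance.cdist(positions, positions)
    np.fill_diagonal(d, np.inf)  # exclude self-pairs
    k = np.array([(d < r).sum() for r in radii]) * area / (n * (n - 1))
    return radii, k, np.sqrt(k / np.pi)

rng = np.random.default_rng(0)
radii, k, l = ripley_k_l(rng.uniform(0, 1000, size=(200, 2)))
print(np.abs(l - radii)[:5])  # approximately zero for uniform random points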
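
The removed rdf helper applies the same shell-counting idea: neighbour counts per annulus, divided by annulus area and mean density. A stripped-down 2-D version without the per-particle getAreaShell edge correction (again a sketch, not the deleted implementation):

import numpy as np
from scipy.spatial import distance

def rdf_2d(positions, S=1000.0, dr=10.0, r_max=500.0):
    # g(r) ~ 1 at all r for an ideal, uncorrelated gas of points
    positions = np.asarray(positions)
    n = len(positions)
    density = n / S**2
    edges = np.arange(0.0, r_max + 1.1 * dr, dr)
    shell_areas = np.pi * (edges[1:] ** 2 - edges[:-1] ** 2)
    counts, _ = np.histogram(distance.pdist(positions), bins=edges)
    # each unordered pair contributes two ordered neighbour counts
    g = 2.0 * counts / (n * shell_areas * density)
    return 0.5 * (edges[:-1] + edges[1:]), g

rng = np.random.default_rng(0)
centers, g = rdf_2d(rng.uniform(0, 1000, size=(500, 2)))
print(g[:5])  # values near 1 for uniform random points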
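
The similarity score that calc_similarity_df assigns to a pair of packings is one minus the two-sample Kolmogorov-Smirnov statistic. In isolation, assuming two 1-D samples that each contain more than one unique value:

import numpy as np
from scipy import stats

def similarity_score(arr1, arr2):
    # the KS statistic is 0 for identical empirical distributions and
    # 1 for fully separated ones, so 1 - statistic lies in [0, 1]
    return 1.0 - stats.ks_2samp(arr1, arr2).statistic

rng = np.random.default_rng(0)
a = rng.normal(0.0, 1.0, 500)
print(similarity_score(a, rng.normal(0.0, 1.0, 500)))  # close to 1
print(similarity_score(a, rng.normal(5.0, 1.0, 500)))  # close to 0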
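
plot_and_save_similarity_heatmaps converts similarities to distances (1 - s), condenses them, and feeds a single Ward linkage to both axes of seaborn's clustermap so rows and columns cluster identically. The core recipe on a toy symmetric matrix:

import numpy as np
import seaborn as sns
from scipy.cluster import hierarchy
from scipy.spatial import distance

rng = np.random.default_rng(0)
sim = rng.uniform(0.5, 1.0, (8, 8))
sim = (sim + sim.T) / 2.0   # symmetrise
np.fill_diagonal(sim, 1.0)  # self-similarity is 1

linkage = hierarchy.linkage(
    1.0 - distance.squareform(sim, checks=False),  # condensed distances
    method="ward",
    optimal_ordering=True,
)
g = sns.clustermap(sim, row_linkage=linkage, col_linkage=linkage,
                   cbar_kws={"label": "similarity score"})
g.savefig("clustermap_demo.png", dpi=300)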
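
calc_and_save_correlations compares flattened binary spilr occupancy grids with the Matthews correlation coefficient. The same comparison on toy 0/1 grids:

import numpy as np
from sklearn.metrics import matthews_corrcoef

rng = np.random.default_rng(0)
grid_a = rng.integers(0, 2, size=(16, 16))
grid_b = grid_a.copy()
flip = rng.random(grid_b.shape) < 0.1
grid_b[flip] ^= 1  # flip roughly 10% of the cells

# MCC is 1 for identical grids, near 0 for unrelated ones, -1 for inverted
print(matthews_corrcoef(grid_a.flatten(), grid_b.flatten()))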
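
Finally, get_parametrized_representation builds those occupancy grids by digitizing each position's (radius, theta, phi) into fixed bin edges. The binning step in isolation, assuming scaled radii in [0, 1) and synthetic spherical coordinates in place of the mesh-derived ones:

import numpy as np

num_angular_points = 8
theta_vals = np.linspace(0, np.pi, 1 + num_angular_points)
phi_vals = np.linspace(0, 2 * np.pi, 1 + 2 * num_angular_points)
rad_vals = np.linspace(0, 1, 1 + num_angular_points)

rng = np.random.default_rng(0)
r = rng.random(100)                         # scaled radial position
theta = np.arccos(rng.uniform(-1, 1, 100))  # polar angle in [0, pi]
phi = rng.uniform(0, 2 * np.pi, 100)        # azimuth in [0, 2*pi)

occupancy = np.zeros((len(rad_vals), len(theta_vals), len(phi_vals)))
# np.digitize returns 1-based bin indices, hence the -1
occupancy[np.digitize(r, rad_vals) - 1,
          np.digitize(theta, theta_vals) - 1,
          np.digitize(phi, phi_vals) - 1] = 1
print(int(occupancy.sum()), "occupied bins")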