From d9e4b7ec571f86bc73ae75e50a3e87af0b776173 Mon Sep 17 00:00:00 2001 From: Joel Simoneau Date: Fri, 23 Apr 2021 17:36:35 -0400 Subject: [PATCH] removing unused scripts --- group_testing/decoder.py | 147 ----------------------- group_testing/group_testing_optimizer.py | 61 ---------- group_testing/group_testing_reporter.py | 4 - group_testing/ilp_preprocessing.py | 111 ----------------- group_testing/model_preprocessing.py | 113 ----------------- 5 files changed, 436 deletions(-) delete mode 100644 group_testing/decoder.py delete mode 100644 group_testing/group_testing_optimizer.py delete mode 100644 group_testing/group_testing_reporter.py delete mode 100644 group_testing/ilp_preprocessing.py delete mode 100644 group_testing/model_preprocessing.py diff --git a/group_testing/decoder.py b/group_testing/decoder.py deleted file mode 100644 index 6668af4..0000000 --- a/group_testing/decoder.py +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Created on Tue Mar 31 14:57:49 2020 - -@author: hoomanzabeti -""" - -import numpy as np -import copy -from sklearn.base import BaseEstimator, ClassifierMixin -import cplex -from sklearn.metrics import f1_score - -""" -Decoder for the group testing problem -""" -class GroupTestingDecoder(BaseEstimator, ClassifierMixin): - - """ - Initialization of the decoder - """ - def __init__(self, rc=1, rc_e=1, rc_p=0.1, rc_z=0.1, cpx_timelimit=None,\ - rule_max_length=None, FN_up=None, FP_up=None): - - self.rc = rc - self.rc_e = rc_e - self.rc_p = rc_p - self.rc_z = rc_z - self.cpx_timelimit = cpx_timelimit - self.rule_max_length = rule_max_length - self.FN_up = FN_up - self.FP_up = FP_up - self.FP_up_percent = FP_up - - """ - Method for training the classifier with CPLEX - """ - def fit(self, X, y): - - m, n = X.shape - row_sum = list(X.sum(axis=1)) - self.m_ = m - self.n_ = n - X = X.values.tolist() - - # Positive and zero split - P = [i for i in range(len(y)) if y[i] == 1] - Z = [i for i in range(len(y)) if i not in P] - # FN_up and FP_up are in percentage. We should convert them to get the - # actual numbers - if self.FN_up is not None: - self.FN_up = self.FN_up * len(P) - if self.FP_up is not None: - self.FP_up = self.FP_up * len(Z) - - # ----------- Cplex ILP problem setup ---------------------- - # Objective function general form: lambda_rc*(1^T w)+ lambda_rc_e*(lambda_rc_p*(1^T ep) + lambda_rc_z*(1^T ez)) - p_obj = [self.rc] * n + [self.rc_e * self.rc_p] * len(P) + [self.rc_e * self.rc_z] * len(Z) - # Variables lower bound - p_lb = [0] * n + [0] * m - # Variables upper bound - p_ub = [1] * n + [1] * m - # Variable types(Binary) - p_ctype = ["B"] * n + ["B"] * m - # Variable names - p_colnames = ["w" + str(i) for i in range(n)] + ["ep" + str(j) for j in P] + ["ez" + str(j) for j in Z] - # Constraints bounds and names - additional_rows = [x for x in [self.rule_max_length, self.FN_up, self.FP_up] if x is not None] - p_rownames = ["r" + str(i) for i in range(m + len(additional_rows))] - p_rhs = [1] * len(P) + [0] * len(Z) + additional_rows - p_sense = ["G"] * len(P) + ["G"] * len(Z) + ["L"] * len(additional_rows) - - rows = [] - prob = cplex.Cplex() - # Time limit for solving the problem - if self.cpx_timelimit is not None: - prob.parameters.timelimit.set(self.cpx_timelimit) - - - # -------------------------------------- - prob.objective.set_sense(prob.objective.sense.minimize) - prob.variables.add(obj=p_obj, lb=p_lb, ub=p_ub, types=p_ctype, names=p_colnames) - - # ---------------- Constraints ---------------------------- - for i in P: - row_P_name = ["w" + str(j) for j in range(n) if X[i][j] != 0] + ["ep" + str(i)] - row_P_value = [X[i][j] for j in range(n) if X[i][j] != 0] + [1] - rows.append(copy.copy([row_P_name, row_P_value])) - - for i in Z: - row_Z_name = ["w" + str(j) for j in range(n) if X[i][j] != 0] + ["ez" + str(i)] - row_Z_value = [-X[i][j] for j in range(n) if X[i][j] != 0] + [row_sum[i]] - rows.append(copy.copy([row_Z_name, row_Z_value])) - - # ----------------- Additional constraints ----------------- - if self.rule_max_length is not None: - row_w_name = ["w" + str(j) for j in range(n)] - row_w_value = [1 for j in range(n)] - rows.append(copy.copy([row_w_name, row_w_value])) - if self.FN_up is not None: - row_ep_name = ["ep" + str(j) for j in P] - row_ep_value = [1 for j in P] - rows.append(copy.copy([row_ep_name, row_ep_value])) - if self.FP_up is not None: - row_ez_name = ["ez" + str(j) for j in Z] - row_ez_value = [1 for j in Z] - rows.append(copy.copy([row_ez_name, row_ez_value])) - - # ---------------------------------------------------------- - prob.linear_constraints.add(lin_expr=rows, senses=p_sense, rhs=p_rhs, names=p_rownames) - #prob.set_log_stream(None) - #prob.set_error_stream(None) - #prob.set_warning_stream(None) - #prob.set_results_stream(None) - prob.solve() - self.prob_obj_ = prob - # Note that in the following line we should have prob.solution.get_values(v) != 0 - # However, since their might be small cal error we might have some numbers like - # "-1.2184445573610454e-13". Therefore we set prob.solution.get_values(v) >= 0.5. - self.w_solution_ = [int(v[1:]) for v in prob.variables.get_names() if - v[0] == 'w' and prob.solution.get_values(v) >= 0.5] - print(self.w_solution_) - print('------------------------------------------------') - print(len(self.w_solution_)) - return self - - """ - Method for predicting with classifier - """ - def predict(self, X, y=None): - - try: - getattr(self, "w_solution_") - w = [1 if i in self.w_solution_ else 0 for i in range(self.n_)] - y_pred = [1 if i != 0 else 0 for i in np.dot(X.values.tolist(), w)] - except AttributeError: - raise RuntimeError("You must train classifer before predicting data!") - return y_pred - - """ - Method for assigning a score usin the F1 metric - """ - def score(self, X, y=None, metric='F1'): - if metric == 'F1': - score_value = f1_score(y, self.predict(X)) - return score_value diff --git a/group_testing/group_testing_optimizer.py b/group_testing/group_testing_optimizer.py deleted file mode 100644 index ccda730..0000000 --- a/group_testing/group_testing_optimizer.py +++ /dev/null @@ -1,61 +0,0 @@ -import os - - -def GT_optimizer(file_path, param, name="cplex"): - if name == "cplex": - # TODO: cplex is imported here in case some users does not have the licence. In this case it would be called - # TODO: when "cplex" is selected. - import cplex - prob = cplex.Cplex() - prob.read(file_path) - log_stream_status = param['log_stream'] - error_stream_status = param['error_stream'] - warning_stream_status = param['warning_stream'] - result_stream_status = param['result_stream'] - - # prob.set_log_stream(log_stream_status) - # prob.set_error_stream(error_stream_status) - # prob.set_warning_stream(warning_stream_status) - # prob.set_results_stream(result_stream_status) - - # Solving the problem - prob.parameters.timelimit.set(60) - prob.solve() - groupTestingSln = {'w': [int(v[2:-1]) for v in prob.variables.get_names() if - v[0] == 'w' and prob.solution.get_values(v) >= 1e-05], - 'Fn': [int(v[3:-1]) for v in prob.variables.get_names() if - v[0:2] == 'ep' and prob.solution.get_values(v) >= 1e-05], - 'Fp': [int(v[3:-1]) for v in prob.variables.get_names() if - v[0:2] == 'en' and prob.solution.get_values(v) >= 1e-05]} - # print(prob.solution.get_objective_value()) - elif name == "gurobi": - import gurobipy as gp - from gurobipy import GRB - prob = gp.read(file_path) - prob.optimize() - groupTestingSln = {'w': [int(v.varName[2:-1]) for v in prob.getVars() if - v.varName[0] == 'w' and v.x >= 1e-05], - 'Fn': [int(v.varName[3:-1]) for v in prob.getVars() if - v.varName[0:2] == 'ep' and v.x >= 1e-05], - 'Fp': [int(v.varName[3:-1]) for v in prob.getVars() if - v.varName[0:2] == 'en' and v.x >= 1e-05]} - # print(prob.objVal) - - return groupTestingSln - - -if __name__ == '__main__': - current_directory = os.getcwd() - file_path = os.path.join(current_directory, r'problem.mps') - - param = {} - param['file_path'] = file_path - param['log_stream'] = None - param['error_stream'] = None - param['warning_stream'] = None - param['result_stream'] = None - - sln = GT_optimizer(file_path=file_path, param=param, name="cplex") - # print(sln) - # sln = GT_optimizer(file_path=file_path, param=param, name="gurobi") - print(sln) diff --git a/group_testing/group_testing_reporter.py b/group_testing/group_testing_reporter.py deleted file mode 100644 index f245954..0000000 --- a/group_testing/group_testing_reporter.py +++ /dev/null @@ -1,4 +0,0 @@ -import pandas as pd - -def decoder_reporter(ev_result): - pd.DataFrame.from_dict(ev_result, orient='index').to_csv('Results/CM.csv') \ No newline at end of file diff --git a/group_testing/ilp_preprocessing.py b/group_testing/ilp_preprocessing.py deleted file mode 100644 index 0838a29..0000000 --- a/group_testing/ilp_preprocessing.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -@author: hoomanzabeti -""" -import os -import numpy as np -from generate_test_results import gen_test_vector - -def mps_writer(A,label,param): - - file_path = param['file_path'] - lambda_w = param['lambda_w'] - lambda_p = param['lambda_p'] - lambda_n = param['lambda_n'] - m,n= A.shape - alpha = A.sum(axis=1) - positive_label = np.where(label==1)[0] - negative_label = np.where(label==0)[0] - - - opt_file = open(file_path, 'w') - opt_file.write('NAME\tProblem\n') - opt_file.write('OBJSENSE\n') - opt_file.write('\tMIN\n') - opt_file.write('OBJNAME\n') - opt_file.write('\tOBJ\n') - - # ---------------------- Print ROWS ------------------------- - opt_file.write('ROWS\n') - opt_file.write(' N OBJ\n') - - for i in range(m): - if i in positive_label: - opt_file.write(' G cp{}\n'.format(i)) - elif i in negative_label: - opt_file.write(' G cn{}\n'.format(i)) - - # ---------------------- Print COLUMNS ------------------------- - opt_file.write('COLUMNS\n') - w_column = '' - for i in range(n): - w_column+=' w{}\tOBJ\t{}\n'.format(i,lambda_w) - for j in np.where(A[:,i]==1)[0]: - if j in positive_label: - w_column+=' w{}\tcp{}\t1\n'.format(i,j) - elif j in negative_label: - w_column+=' w{}\tcn{}\t-1\n'.format(i,j) - opt_file.write(w_column) - - ep_column='' - for i in positive_label: - ep_column+=' ep{}\tOBJ\t{}\n'.format(i,lambda_p) - ep_column+=' ep{}\tcp{}\t1\n'.format(i,i) - opt_file.write(ep_column) - - en_column='' - for i in negative_label: - en_column+=' en{}\tOBJ\t{}\n'.format(i,lambda_n) - en_column+=' en{}\tcn{}\t{}\n'.format(i,i,alpha[i]) - opt_file.write(en_column) - - # ---------------------- Print RHS ------------------------- - opt_file.write('RHS\n') - rhs_columns = '' - for i in positive_label: rhs_columns+=' RHS1\tcp{}\t1\n'.format(i) - - opt_file.write(rhs_columns) - # ---------------------- Print BOUNDS ------------------------- - opt_file.write('BOUNDS\n') - for i in range(n): - opt_file.write(' BV BND\tw{}\n'.format(i)) -# for i in positive_label: -# opt_file.write(' BV BND\tep{}\n'.format(i)) - # ============================================================================= - for i in positive_label: - opt_file.write(' LO BND\tep{}\t0\n'.format(i)) - opt_file.write(' UP BND\tep{}\t1\n'.format(i)) - # ============================================================================= - for i in negative_label: - opt_file.write(' BV BND\ten{}\n'.format(i)) - opt_file.write('ENDATA') - opt_file.close() -if __name__ == '__main__': - - # options for plotting, verbose output, saving, seed - opts = {} - opts['m'] = 4 - opts['N'] = 6 - opts['verbose'] = True #False - opts['plotting'] = True #False - opts['saving'] = True - opts['run_ID'] = 'GT_test_result_vector_generation_component' - opts['test_noise_method'] = 'none' - opts['data_filename'] = opts['run_ID'] + '_generate_groups_output.mat' - opts['seed'] = 0 - - A = np.random.randint(2,size=(opts['m'],opts['N'])) - u = np.random.randint(2,size=opts['N']) - b = gen_test_vector(A, u, opts) - - current_directory = os.getcwd() - file_path = os.path.join(current_directory,r'problem.mps') - - param= {} - param['file_path'] = file_path - param['lambda_w'] = 1 - param['lambda_p'] = 0.1 - param['lambda_n'] = 0.2 - - mps_writer(A,b,param) \ No newline at end of file diff --git a/group_testing/model_preprocessing.py b/group_testing/model_preprocessing.py deleted file mode 100644 index 73bc965..0000000 --- a/group_testing/model_preprocessing.py +++ /dev/null @@ -1,113 +0,0 @@ -from pymprog import model -import os -import numpy as np -from generate_test_results import gen_test_vector - - -def problem_setup(A, label, param): - # This function provide a ILP in form of eq 11 in WABI paper. However it can be changed if we uncomment - # "Additional constraints" below. - file_path = param['file_path'] - lambda_w = param['lambda_w'] - lambda_p = param['lambda_p'] - lambda_n = param['lambda_n'] - fixed_defective_num = param['defective_num'] - sensitivity = param['sensitivity'] - specificity = param['specificity'] - noiseless_mode = param['noiseless_mode'] - LP_relaxation = param['LP_relaxation'] - m, n = A.shape - alpha = A.sum(axis=1) - # label = np.array(label) - positive_label = np.where(label == 1)[0] - negative_label = np.where(label == 0)[0] - - # We are using pymprog - # Initializing the ILP problem - p = model('GroupTesting') - p.verbose(param['verbose']) - # Variables kind - if LP_relaxation: - varKind = float - else: - varKind = bool - # Variable w - #TODO: Fix w lowerbound and upperbound - w = p.var(name='w', inds=n, bounds=(0, 1), kind=varKind) - # Variable ep - if len(positive_label)!=0: - ep = p.var(name='ep', inds=list(positive_label), kind=float, bounds=(0, 1)) - # Variable en - if len(negative_label)!=0: - if noiseless_mode: - en = p.var(name='en', inds=list(negative_label)) - else: - en = p.var(name='en', inds=list(negative_label), kind=bool) - # Defining the objective function - prob_obj = sum(lambda_w * w[i] for i in range(n)) + \ - sum(lambda_p * ep[j] for j in positive_label) + \ - sum(lambda_n * en[k] for k in negative_label) - - p.minimize(prob_obj, name='OBJ') - # Constraints - for i in positive_label: - sum(A[i][j] * w[j] for j in range(n)) + ep[i] >= 1 - for i in negative_label: - if noiseless_mode: - sum(A[i][j] * w[j] for j in range(n)) - en[i] == 0 - else: - sum(-1 * A[i][j] * w[j] for j in range(n)) + alpha[i] * en[i] >= 0 - - # Additional constraints - - # if fixed_defective_num is not None: - # sum(w[i] for i in range(n)) <= fixed_defective_num - - # if sensitivity is not None: - # sum(en[i] for i in negative_label) <= sensitivity* len(Z) - - # TODO: Right now we need to solve the problem before saving the file! Otherwise it wouldn't save the constraints. - # TODO: For now we can set a time limit - p.solver(int, tm_lim=1) - p.solve() - # Save fixed mps format - p.write_mps(1, None, file_path) - # Change the line above to p.write_mps(2, None, file_path) if you want free mps format. - p.end() - -if __name__ == '__main__': - # options for plotting, verbose output, saving, seed - opts = {} - opts['m'] = 3 - opts['N'] = 5 - opts['verbose'] = True # False - opts['plotting'] = True # False - opts['saving'] = True - opts['run_ID'] = 'GT_test_result_vector_generation_component' - opts['data_filename'] = opts['run_ID'] + '_generate_groups_output.mat' - opts['test_noise_methods'] = [] - opts['seed'] = 0 - - A = np.random.randint(2, size=(opts['m'], opts['N'])) - u = np.random.randint(2, size=opts['N']) - b = gen_test_vector(A, u, opts) - - # Test - #A = np.array([[1,0,0],[1,0,1],[0,1,0]]) - #b = np.array([1,0,1]) - current_directory = os.getcwd() - file_path = os.path.join(current_directory, r'problem.mps') - - param = {} - param['file_path'] = file_path - param['lambda_w'] = 1 - param['lambda_p'] = 100 - param['lambda_n'] = 100 - param['verbose'] = False - param['defective_num'] = None - param['sensitivity'] = None - param['specificity'] = None - param['noiseless_mode'] = True - param['LP_relaxation'] = False - - problem_setup(A, b, param)