diff --git a/framework/.gitignore b/framework/.gitignore new file mode 100644 index 0000000..cce01a4 --- /dev/null +++ b/framework/.gitignore @@ -0,0 +1,2 @@ +/outputs/** +!/outputs/**/.gitkeep \ No newline at end of file diff --git a/framework/README.md b/framework/README.md new file mode 100644 index 0000000..eeaaa18 --- /dev/null +++ b/framework/README.md @@ -0,0 +1,153 @@ +# Framework + +The Framework was built to provide an environment in which to implement and test various load balancing strategies. + +A high-level overview of the Framework modules, inputs, and outputs is shown in the following image. This picture represents the UML Component Diagram of the simulator. The main modules are highlighted in blue, their output artifacts in green, and the files required from the node modelling work in orange. + +![ComponentDiagram](../images/framework/component_diagram.png) + +The simulation takes place in **steps**, also called **simulation instants**; each of them corresponds to a different situation or configuration that must be simulated in terms of traffic to be balanced. + +## Main components + +### Instance Generator + +The purpose of this component is to generate the instance on which the simulation will be performed. Specifically, information regarding the peer-to-peer node network and the load that each node will have to handle for each simulation instant is generated in this phase. This script takes 4 CLI arguments: + +- **Nodes number**: Number of nodes to simulate; +- **Edge probability**: Probability of generating an edge between each pair of nodes; +- **Max percentage of overloaded nodes**: A percentage indicating the maximum number of nodes that may be overloaded during the simulation; +- **Seed**: Seed used as the starting point for the pseudo-random generation process. + +The artifacts produced by this component are a generated JSON instance and an image of the corresponding network topology. 
Both of these elements can be found in the **output/archive/[timestamp].zip** output directory. + +### Model +The main objective of the model component is to provide a representation of the metrics prediction models within the Framework. + +The component is implemented with two classes: +- **Model**: this class is used to represent models within the System. It has five private attributes that allow it to uniquely identify a particular model; +- **ModelProxy**: this class serves as an access point for all components of the System that need to obtain predictions. + +The following picture shows a UML class diagram of this component. +![SequenceDiagram](../images/framework/model_class.png) + + +### Simulator + +This component executes all the steps of the simulation: it calculates the load balancing weights of each agent towards the other agents of the network, using different techniques, and calculates the forwarding tables. The _simulation.py_ script is executed in this phase. + +It accepts as input a single parameter, namely **Model type**, which specifies which model to use among standard regression, 5% quantile regression, and 95% quantile regression. + +This component is the main part of the **Framework** package and uses a number of other components. +The following picture shows a UML sequence diagram of the simulator's main operations. + +![SequenceDiagram](../images/framework/sequence_simulator.png) + +The **simulator**, reading an instance file, needs to retrieve the metrics predictions of the current instant. These data are provided by the **Model** component, which requires as input the expected load values from the nodes in the network. + +During the simulation, each agent executes different load balancing strategies, which can make use of the metrics prediction models. How these classes are organized is represented in the following class diagram. 
+ +![AgentStrategies](../images/framework/agent.png) + +### Analyzer + +This component analyzes the **Simulator** output files and calculates the indexes used for comparison between different strategies. Like the Simulator, this module also makes use of models for its own purposes; for this reason, the type of model to be used (5% quantile regression, regression or 95% quantile regression) can be provided as input, and it should reflect the choice made in the Simulator. + +The main indexes calculated are: + +1) **Success rate**: Percentage of requests managed with success by the system, applying a specific load balancing technique; + +2) **Success rate during stress period**: Percentage of requests managed with success by the system, applying a specific load balancing technique, when the load is higher; + +3) **Mean Node Power Consumption**: this index is intended to allow comparison of the average power consumption among different strategies; + +4) **Max Node Power Consumption**: represents the maximum consumption value estimated on the nodes in the network; + +5) **Reject number**: Number of requests rejected by the system. + +This script exports a CSV file containing the mean and median of these indexes. It also exports a comparison graph that compares the success rates of the different strategies during the simulation (see figure below). + +![GraphExample](../images/framework/comparison_curl.png) + +### Simulation controller + +Using this component to run a simulation simplifies the experimental phase because it automates the sequential execution of internal modules that would otherwise have to be run manually. 
+ +Acting as an entry point for the components contained within it, the Simulation Controller is set up to handle all the parameters of those modules, with some differences: + +1) The Simulation Controller can receive as input one or more values of **Max percentage of overloaded nodes**, so that the same instance can be evaluated on different values of load; +2) The Simulation Controller allows you to specify an additional parameter, the **number of experiments**. This will generate and simulate n scenarios, and then aggregate the indices obtained in the various experiments into single values that give a more reliable indication of performance. If the number of experiments is greater than one, then values for the seed will not be accepted by the Simulation Controller. + +The following picture shows a UML sequence diagram of the Simulation Controller's main operations. + +![SequenceDiagram](../images/framework/simulation_controller.png) + +All artifacts produced by this process are gathered in a zip file named with the timestamp, under the [**output**](outputs/) directory. + +## Configuration Manager + +This utility class (_configuration/configuration\_manager.py_) contains all the parameters, variables, strategy names, and settings of the simulator. It also includes the definitions of the **parameters** used by the **Node Margin Strategy** and the **Power Saving Strategy**. + +- **Note**: if an algorithm is added to the simulator or some parameters need to change, they must be changed in this configuration class. + +## Implemented Strategies + +During the simulation, all load balancing strategies defined in the _behaviour_ module and added to the _configuration/configuration\_manager.py_ class are executed. + +The implemented strategies are: + +- **Base Strategy**. This strategy does not forward traffic; each agent uses only its own resources. This strategy is implemented in _behaviour/base\_strategy.py_; + +- **Equal Strategy**. 
This strategy shares excess traffic equally among neighbors. This strategy is implemented in _behaviour/equal\_strategy.py_; + +- **Node Margin Strategy**. This is the main distributed load balancing algorithm. With this strategy each node balances its load according to the **margin** offered by its neighborhood. This strategy is implemented in _behaviour/node\_margin\_strategy.py_; + +- **Power Saving Strategy**. A variant of the Node Margin Strategy that allows a limit to be placed on energy consumption. This strategy is implemented in _behaviour/power\_saving\_strategy.py_. + +All of the strategies return as output a set of weights for each function, from each agent towards the others. + +### How to add other strategies + +To add a new strategy, create a new file in the _behaviour_ module containing a class that implements the _behaviour/strategy.py_ "interface". The new class must implement the run method and return a dictionary containing the forwarding weights. + +It is also necessary to add the new strategy name in the _configuration/configuration\_manager.py_ file and to add a case for strategy creation in the _factory/strategy\_factory.py_ class. + +## How to execute a simulation + +To execute a simulation using the **simulation controller** as entry point, first move to the simulation folder from the project root. + +```console +cd simulation +``` + +After that, type and execute the following command: + +```console +python simulation_controller.py --nodesnum [integer number] --edgeprob [float probability between 0 and 1] --overloaded [one or more values of percentage] --expnum [integer number] --modeltype [a string among regression, quantile005, quantile095] --seed [integer number] +``` +- **Note**: All parameters are optional. 
+ +Example: + +```console +python simulation_controller.py --nodesnum 10 --edgeprob 0.3 --overloaded 30 60 --expnum 1 --modeltype regression --seed 701 +``` +- **Note**: If **expnum** is greater than 1, then the seed cannot be specified: + +```console +python simulation_controller.py --nodesnum 10 --edgeprob 0.3 --overloaded 30 60 --expnum 10 --modeltype regression +``` + +It is also possible to execute the single scripts: + +```console +python instance_generator.py --nodesnum 5 --edgeprob 0.3 --overloaded 40 --seed 711 +``` + +```console +python simulation.py --modeltype regression +``` + +```console +python analyzer.py --modeltype regression +``` \ No newline at end of file diff --git a/framework/__init__.py b/framework/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/agent/__init__.py b/framework/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/agent/agent.py b/framework/agent/agent.py new file mode 100644 index 0000000..ec92213 --- /dev/null +++ b/framework/agent/agent.py @@ -0,0 +1,41 @@ +from behaviour.strategy import Strategy + +class Agent(): # Inherit from Thread in the () brackets + + def __init__(self, id, logger, behaviour: Strategy, model_proxy): + super().__init__() + self._id = id + self._behaviour = behaviour + self._behaviour.set_id(id) + self._behaviour.set_model_proxy(model_proxy) + self._logger = logger + self._behaviour.set_logger(logger) + + # Used when this class extends Thread + def run(self) -> dict: + #self.loop() + w = self._behaviour.run() + #print(w) + return w + + @property + def strategy(self) -> Strategy: + """ + The Context maintains a reference to one of the Strategy objects. The + Context does not know the concrete class of a strategy. It should work + with all strategies via the Strategy interface. + """ + + return self._behaviour + + @strategy.setter + def strategy(self, _behaviour: Strategy) -> None: + """ + Usually, the Context allows replacing a Strategy object at runtime. 
+ """ + + self._behaviour = _behaviour + + def disable_logging(self): + #self._logger.isEnabledFor(50) # Used to do not print and gain in speed. + self._logger.disabled = True diff --git a/framework/analyzer.py b/framework/analyzer.py new file mode 100644 index 0000000..c8171f9 --- /dev/null +++ b/framework/analyzer.py @@ -0,0 +1,384 @@ +import pandas as pd +import os +import math +import numpy as np +import matplotlib.pyplot as plt +from configuration.config_manager import ConfigManager +from model.model_proxy import ModelProxy +from utils.utils import flatten +import json +from cli.cli import get_analyzer_and_simulator_args + +config_manager = ConfigManager() +model_manager = ModelProxy() + +def calculate_rates_globally(function_rate_x_node, df_presence, config_file, strategy_name): + # It contains the number of requests rejected in this instant for each function + func_reject_num = {} + + # It contains the success rate of this instant for each function + success_rate = {} + + # It contains the reject rate of this instant for each function + reject_rate = {} + + # Reject the requests of the functions that are not deployed in the node + for f in config_manager.FUNCTION_NAMES: + func_reject_num[f] = 0 + for index, row in df_presence.iterrows(): + if df_presence.loc[index, f] == 0: + func_reject_num[f] += function_rate_x_node.loc[index, f] + + # Average consumption of the node in the current istant + node_avg_power_consumption = 0 + # Max consumption of the nodes in the current istant + max_power_consumption = 0 + # Overloaded nodes counter + overloaded_nodes = 0 + + # Iterate over all the node + for index, row in df_presence.iterrows(): + # Reset the rate of the functions that are not deployed on the node + rate_only_present_functions = function_rate_x_node.loc[index] * df_presence.loc[index] + # Obtains data in group format + features_data = model_manager.transform_functions_in_groups(rate_only_present_functions) + + # Add the information about the node type + 
node_type = config_file[index]["node_type"] + features_data["node_type"] = config_manager.NODES_TYPES_IN_MODELS[node_type] + + node_predictions = model_manager.get_node_predictions(features_data) + node_power_consumption = node_predictions["power_usage_node"].iloc[0] + overload = node_predictions["overloaded_node"].iloc[0] + + if overload == 1: + overloaded_nodes += 1 + + # Percentage of load to discard + load_discard_percentage = 0.05 + while overload == 1 or (strategy_name == "power_saving_strategy" and node_power_consumption > config_manager.MAX_POWER_USAGE[node_type]) : + # Calculate the the percentage value for each element + percentage_values = [round(element * load_discard_percentage) for element in rate_only_present_functions] + + # If all calculated values are 0, then increment the percentage + while all(value == 0 for value in percentage_values): + load_discard_percentage += 0.01 + + # Calculate the new values + percentage_values = [round(element * load_discard_percentage) for element in rate_only_present_functions] + + # Remove a percentage of the requests for each function > 0 + for i in range(0, len(rate_only_present_functions)): + if rate_only_present_functions[i] > 0: + # Calculate the rate to remove for the selected function + rate_to_remove = percentage_values[i] + + # Remove the rate of the function from the node + rate_only_present_functions[i] -= rate_to_remove + + # Count the removed rate in the total reject req + func_reject_num[rate_only_present_functions.index[i]] += rate_to_remove + + # Return features data in the correct from to be predicted + features_data = model_manager.transform_functions_in_groups(rate_only_present_functions) + + # Add the information about the node type + features_data["node_type"] = config_manager.NODES_TYPES_IN_MODELS[config_file[index]["node_type"]] + + node_predictions = model_manager.get_node_predictions(features_data) + + node_power_consumption = node_predictions["power_usage_node"].iloc[0] + overload = 
node_predictions["overloaded_node"].iloc[0] + + if node_power_consumption > max_power_consumption: + max_power_consumption = node_power_consumption + + # Add the node consumption to the total + node_avg_power_consumption += node_power_consumption + + + # Get the average consumption for node + node_avg_power_consumption /= len(df_presence) + + # Calculate the success and the reject rate for each function + total_invoc_rate_x_function = function_rate_x_node.sum(axis = 0) + for func in func_reject_num: + success_number = total_invoc_rate_x_function[func] - func_reject_num[func] + success_rate[func] = (success_number / total_invoc_rate_x_function[func]) if total_invoc_rate_x_function[func] > 0 else 1.0 + reject_rate[func] = 1.0 - success_rate[func] + + return success_rate, reject_rate, func_reject_num, node_avg_power_consumption, max_power_consumption, overloaded_nodes + + +def export_for_minute_overloaded_nodes(overloaded_for_algo): + """ + Export plot that represent success rate during all minutes of experiment + """ + # Plot configurations + plt.figure(figsize=(20, 10)) + plt.title("Overloaded nodes over all the minutes of experiment") + plt.xlabel("Minute") + plt.ylabel("N° of overloaded node") + + df = pd.DataFrame(data=overloaded_for_algo, index=[i for i in range(0, config_manager.SIMULATION_MINUTES)]) + for column in df.columns: + plt.plot(df.index, df[column], label="Overloaded nodes for {}".format(column)) + + # Plot configurations + plt.legend(loc="lower left") + plt.grid() + + plt.savefig(create_path_if_not_exists(config_manager.ANALYZER_OUTPUT_PATH).joinpath("comparison_nodes.png")) + + +def export_for_minute_rates(func, rates): + """ + Export plot that represent success rate during all minutes of experiment + """ + # Plot configurations + plt.figure(figsize=(20, 10)) + plt.title("Success rate for function {} during {} minutes of experiment".format(func, config_manager.SIMULATION_MINUTES)) + plt.xlabel("Minute") + plt.ylabel("Success rate") + + df = 
pd.DataFrame(data=rates, index=[i for i in range(0, config_manager.SIMULATION_MINUTES)]) + #print(df) + + # Define line styles and colors + line_styles = ['-', '--', '-.', ':'] + colors = ['blue', 'green', 'red', 'orange'] + + # Plot each column with a different line style and color + for i, column in enumerate(df.columns): + linestyle = line_styles[i % len(line_styles)] + color = colors[i % len(colors)] + plt.plot(df.index, df[column], label="Success rate for {}".format(column), linestyle=linestyle, color=color) + + # Plot configurations + plt.legend(loc="lower left") + plt.grid() + + plt.savefig(create_path_if_not_exists(config_manager.ANALYZER_OUTPUT_PATH).joinpath("comparison_{}.png".format(func))) + + +def export_index_comparison_table(df): + """ + Export index comparison table of different strategies as CSV file + """ + df.to_csv(config_manager.INDEX_COMPARISON_FILE, sep=',', encoding='utf-8') + +# Create the specific path if not present in the machine +def create_path_if_not_exists(path): + if not os.path.exists(path): + os.makedirs(path) + return path + +def main(): + kargs = get_analyzer_and_simulator_args() + model_manager.set_model_type(kargs["modeltype"]) + rates_for_algo = {} + overloaded_for_algo = {} + index_comparison = pd.DataFrame(index=config_manager.INDEX_TO_COMPARE) + + # Read the instance file + instance_file = config_manager.OUTPUT_INSTANCE_JSON_FILE_PATH + with open(instance_file) as f: + config_file = json.load(f) + + # Initialize DataFrame + df_presence = pd.DataFrame(0, index=config_manager.FUNCTION_NAMES, columns=[]) + + # Populate DataFrame + for node_key, node_value in config_file.items(): + if "node_" in node_key: + node_functions = [ + function_info["function_name"] + for k, v in node_value["load"][0].items() + for function_info in v["functions"] + ] + df_presence[node_key] = df_presence.index.isin(node_functions).astype(int) + + df_presence = df_presence.sort_index(axis=1).T + + # Used only for initialization + for func in 
config_manager.FUNCTION_NAMES: + rates_for_algo[func] = {} + + # For each strategy type, for each minute and for each function read data exported + # by the simulation and use them to calculate rates and indexes for comparison + for algo in config_manager.STRATEGIES: + x_func_success_rate = {} + x_func_reject_rate = {} + x_func_reject_num = {} + x_node_power_consumption = [] + x_node_max_power_consumption = [] + x_overloaded_nodes_counter = [] + + # Initialize dictionary of rates for all functions + for func in config_manager.FUNCTION_NAMES: + x_func_success_rate[func] = [] + x_func_reject_rate[func] = [] + x_func_reject_num[func] = [] + + print("-------------------------- ALGO {} --------------------------".format(algo)) + + # Create path for recover tables + base_path = config_manager.SIMULATION_TABLES_OUTPUT_PATH.joinpath(algo) + + for minute in range(0, config_manager.SIMULATION_MINUTES): + print("MINUTE {}".format(minute)) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + + # Complete path for load tables + path = base_path.joinpath("minute_" + str(minute)) + + # Table which contains the rates after the exchanges of each function that each node has + function_rate_x_node = pd.DataFrame() + + # For each minute and foreach function load dataframe + for func in config_manager.FUNCTION_NAMES: + df = pd.read_csv(path.joinpath(func + ".csv"), delimiter='\t', header=0, index_col=0) + function_rate_x_node[func] = df.sum(axis=0) + + sr, rr, rn, an, mc, on = calculate_rates_globally(function_rate_x_node, df_presence, config_file, algo) + x_node_power_consumption.append(an) + x_node_max_power_consumption.append(mc) + x_overloaded_nodes_counter.append(on) + + + for func in sr: + x_func_success_rate[func].append(sr[func]) + x_func_reject_rate[func].append(rr[func]) + x_func_reject_num[func].append(rn[func]) + rates_for_algo[func][algo] = x_func_success_rate[func] + + 
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + + overloaded_for_algo[algo] = x_overloaded_nodes_counter + print("STATS FOR ALGO {}".format(algo)) + # Metrics prints + + ##### SUCCESS RATES METRICS ##### + # Mean success rate + mean_success_rate = np.mean([np.mean(srates) for k, srates in x_func_success_rate.items()]) * 100 + print(" > Mean success rate: {:0.2f}%".format( + mean_success_rate + )) + + # Mean node consumption + mean_node_consumption = sum(x_node_power_consumption) / len(x_node_power_consumption) + print(" > Mean node power consumption: {:0.2f}%".format( + mean_node_consumption + )) + + # Max node consumption + max_node_consumption = max(x_node_max_power_consumption) + print(" > Max node power consumption: {:0.2f}%".format( + max_node_consumption + )) + + # Success rate variance + # flat_list = [i * 100 for i in flatten(list(x_func_success_rate.values()))] + # success_rate_variance = np.var(flat_list) + # print(" > Success rate variance: {:0.2f}".format(success_rate_variance)) + + # Success rate median + flat_list = flatten(list(x_func_success_rate.values())) + success_rate_median = np.median(flat_list) * 100 + print(" > Success rate median: {:0.2f}%".format(success_rate_median)) + + # Success rate percentile + # flat_list = flatten(list(x_func_success_rate.values())) + # success_rate_percentile = np.percentile(flat_list, config_manager.ANALYSIS_PERCENTILE) * 100 + # print(" > Success rate {}% percentile: {:0.2f}%".format( + # config_manager.ANALYSIS_PERCENTILE, + # success_rate_percentile + # ) + # ) + + ##### SUCCESS RATES (STRESS PERIOD) METRICS ##### + # Mean success rate calculated during high traffic period + mid_instant = math.floor(config_manager.SIMULATION_MINUTES / 2) + low = mid_instant - 1 + high = mid_instant + 1 + mean_success_rate_stress_period = np.mean([np.mean(srates[low:high]) for k, srates in x_func_success_rate.items()]) * 100 + print(" > Mean success rate during stress period: {:0.2f}%".format( 
+ mean_success_rate_stress_period + )) + + # Success rate variance (stress period) + # flat_list = [i * 100 for i in flatten([item[1:6] for item in list(x_func_success_rate.values())])] + # success_rate_stress_period_variance = np.var(flat_list) + # print(" > Success rate variance during stress period (from minute 1 to 5): {:0.2f}" + # .format(success_rate_stress_period_variance)) + + # Success rate median (stress period) + flat_list = flatten([item[low:high] for item in list(x_func_success_rate.values())]) + success_rate_stress_period_median = np.median(flat_list) * 100 + print(" > Success rate median during stress period: {:0.2f}%" + .format(success_rate_stress_period_median)) + + # Success rate percentile (stress period) + # flat_list = flatten([item[1:6] for item in list(x_func_success_rate.values())]) + # success_rate_stress_period_percentile = np.percentile(flat_list, config_manager.ANALYSIS_PERCENTILE) * 100 + # print(" > Success rate {}% percentile during stress period (from minute 1 to 5): {:0.2f}%" + # .format( + # config_manager.ANALYSIS_PERCENTILE, + # success_rate_stress_period_percentile + # ) + # ) + + ##### REJECT RATES METRICS ##### + # Total rejected requests num calculated for each algorithm across minutes + total_reject_requests = np.sum([np.sum(rejnums) for k, rejnums in x_func_reject_num.items()]) + print(" > Total rejected requests: {} req".format( + total_reject_requests + )) + + # Reject number variance + # flat_list = flatten(list(x_func_reject_num.values())) + # reject_number_variance = np.var(flat_list) + # print(" > Reject num variance: {:0.2f}".format(reject_number_variance)) + + # Reject number median + # flat_list = flatten(list(x_func_reject_num.values())) + # reject_number_median = np.median(flat_list) + # print(" > Reject num median: {:0.2f}".format(reject_number_median)) + + # Reject number percentile + # flat_list = flatten(list(x_func_reject_num.values())) + # reject_number_percentile = np.percentile(flat_list, 
config_manager.ANALYSIS_PERCENTILE) + # print(" > Reject num {}% percentile: {:0.2f}".format( + # config_manager.ANALYSIS_PERCENTILE, + # reject_number_percentile + # ) + # ) + + print("----------------------------------------------------------------------------") + + index_comparison[algo] = [ + mean_success_rate, + success_rate_median, + mean_success_rate_stress_period, + success_rate_stress_period_median, + mean_node_consumption, + max_node_consumption, + total_reject_requests + ] + + # Export print for comparison + for func in config_manager.FUNCTION_NAMES: + export_for_minute_rates(func, rates_for_algo[func]) + + # Creates plot of overloaded nodes + export_for_minute_overloaded_nodes(overloaded_for_algo) + + # Export index comparison table + print("> INDEX COMPARISON TABLE") + print(index_comparison.T) + export_index_comparison_table(index_comparison.T) + + +# Call main program. +if __name__ == "__main__": + main() diff --git a/framework/behaviour/__init__.py b/framework/behaviour/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/behaviour/base_strategy.py b/framework/behaviour/base_strategy.py new file mode 100644 index 0000000..ea985d3 --- /dev/null +++ b/framework/behaviour/base_strategy.py @@ -0,0 +1,36 @@ +from .strategy import Strategy + +class BaseStrategy(Strategy): + + def __init__(self, config_json): + super().__init__(config_json) + + def run(self) -> dict: + return self.loop() + + + def loop(self) -> dict: + """ + This stategy does not perform any forward of request. 
+ In any case, the node keeps all the requests + """ + weights = {} + + # Extract loads data of the node from the config file + original_requests_groups, original_requests_functions = self._get_load_for_groups_and_functions(self._config_json) + + for fun in original_requests_functions.keys(): + weights[fun] = {} + weights[fun][self._id] = 1 + + for func, val in weights.items(): + self._logger.info("Weights for func {}: {}".format(func, val)) + + return weights + + def set_id(self, id): + self._id = self._config_manager.NODE_KEY_PREFIX + str(id) + self._prefix = "THREAD: " + self._id + + def set_logger(self, logger): + self._logger = logger diff --git a/framework/behaviour/equal_strategy.py b/framework/behaviour/equal_strategy.py new file mode 100644 index 0000000..cea8b49 --- /dev/null +++ b/framework/behaviour/equal_strategy.py @@ -0,0 +1,101 @@ +from .strategy import Strategy +import copy +import math + +class EqualStrategy(Strategy): + + def __init__(self, config_json): + super().__init__(config_json) + + def run(self) -> dict: + return self.loop() + + + def loop(self) -> dict: + weights = {} + + # Extract loads data of the node from the config file + original_requests_groups, original_requests_functions = self._get_load_for_groups_and_functions(self._config_json) + + # Add the information about the type of the node + original_requests_groups["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[self._config_json["node_type"]] + + # Get a copy for each original dictionary + remained_requests_groups = copy.deepcopy(original_requests_groups) + remained_requests_functions = copy.deepcopy(original_requests_functions) + + # List of functions deployed on the node + node_functions = list(original_requests_functions.keys()) + + # Iterator of the functions in the node + i = 0 + + # Iterate removing load until the node is underload + overload = self._config_json["node_metrics"]["overloaded_node"] + + load_discard_percentage = 0.01 + while overload == 1: + + # Remove a 
percentage of the requests for each function > 0 + for i in range(0, len(node_functions)): + if remained_requests_functions[node_functions[i]] > 0: + # Calculate the rate to remove for the selected function + load_to_discard = math.ceil(load_discard_percentage * remained_requests_functions[node_functions[i]]) + + # Remove the rate of the function from the node + remained_requests_functions[node_functions[i]] -= load_to_discard + + # Remove the rate group of the selected function + group_to_discard = self._get_group_of_function(node_functions[i]) + remained_requests_groups[group_to_discard] -= load_to_discard + + #Get prediction using the new features + node_predictions = self._model_proxy.get_node_predictions(remained_requests_groups) + + # Check if the node is still in overload + overload = node_predictions["overloaded_node"].iloc[0] + + # Reset the index of the functions + i += 1 + if i == len(node_functions): + i = 0 + + # It contains, for each functions, the removed and the mantained requests + rem_mant_request_function = {} + + for func in node_functions: + rem_mant_request_function[func] = {} + rem_mant_request_function[func]["removed"] = original_requests_functions[func] - remained_requests_functions[func] + rem_mant_request_function[func]["mantained"] = remained_requests_functions[func] + + + n_of_neigh = len(self._config_json["neighbours"]) + + for fun in rem_mant_request_function.keys(): + weights[fun] = {} + # Obtain total number of requests of the selected function + tot_n_of_req = rem_mant_request_function[fun]["removed"] + rem_mant_request_function[fun]["mantained"] + # Use divmod to get the quotient and remainder + quotient, remainder = divmod(rem_mant_request_function[fun]["removed"], n_of_neigh) + for node in self._config_json["neighbours"] + [self._id]: + if tot_n_of_req == 0: + weights[fun][node] = 0.0 + else: + # If node is a neighbour + if node != self._id: + weights[fun][node] = round((quotient + remainder) / tot_n_of_req, 4) + remainder = 0 + 
else: + weights[fun][node] = round(rem_mant_request_function[fun]["mantained"] / tot_n_of_req, 4) + + for func, val in weights.items(): + self._logger.info("Weights for func {}: {}".format(func, val)) + + return weights + + def set_id(self, id): + self._id = self._config_manager.NODE_KEY_PREFIX + str(id) + self._prefix = "THREAD: " + self._id + + def set_logger(self, logger): + self._logger = logger diff --git a/framework/behaviour/node_margin_strategy.py b/framework/behaviour/node_margin_strategy.py new file mode 100644 index 0000000..5fff3b3 --- /dev/null +++ b/framework/behaviour/node_margin_strategy.py @@ -0,0 +1,317 @@ +from .strategy import Strategy +import copy +import math + + +class NodeMarginStrategy(Strategy): + def __init__(self, config_json): + super().__init__(config_json) + + # For each node represents the percentage of usage that the neighbours can use + self._margin_percentage_in = {} + + # Represents the percentage of usage that is possible forward to each neighbour + self._margin_percentage_out = {} + + def run(self) -> dict: + return self.__loop() + + def set_id(self, id): + self._id = self._config_manager.NODE_KEY_PREFIX + str(id) + self._prefix = "THREAD: " + self._id + + def set_logger(self, logger): + self._logger = logger + + def __loop(self) -> dict: + self._calculate_margin() + self._exchange() + return self._calculate_weights() + + # Get all the functions deployed in the selected node + def __get_deployed_functions(self, actual_node): + functions_deployed = [] + for k, value in self._config_json[actual_node]["load"].items(): + if "USAGE" in k: + for i in range(0, len(value["functions"])): + functions_deployed.append(value["functions"][i]["function_name"]) + return functions_deployed + + # Get the number of neigh with at least one function in common with the actual node + def __neigh_with_at_least_one_fun_in_common(self, actual_node): + functions_deployed = self.__get_deployed_functions(actual_node) + + num = 0 + for neigh in 
self._config_json[actual_node]["neighbours"]: + found = False + for k, value in self._config_json[neigh]["load"].items(): + if "USAGE" in k: + for i in range(0, len(value["functions"])): + if value["functions"][i]["function_name"] in functions_deployed: + num += 1 + found = True + break + if found: + break + return num + + # Calculates the margin for each node in the net + def _calculate_margin(self): + for node, node_infos in self._config_json.items(): + self._margin_percentage_in[node] = {} + self._margin_percentage_in[node]["node_type"] = node_infos["node_type"] + # If the node is overloaded, the only information it produces is margin = 0, + # so that no information about its load is sent + if node_infos["node_metrics"]["overloaded_node"] == 0: + # Get the load of each group in the node + self._margin_percentage_in[node]["load"] = {} + for group, values in node_infos["load"].items(): + self._margin_percentage_in[node]["load"][group] = values["total_rate"] + neigh_num = self.__neigh_with_at_least_one_fun_in_common(node) + ##print(neigh_num) + if neigh_num == 0: + self._margin_percentage_in[node]["margin"] = 0 + else: + node_type = node_infos["node_type"] + # Calculate the percentage usage for each node metric + usage_percentage = {} + skip_calculations = False + for metric, values in self._config_manager.MAX_RESOURCES_USAGE.items(): + # If at least one metric is higher than the considered max value, then the margin is 0 + if node_infos["node_metrics"][metric] > values[node_type]: + self._margin_percentage_in[node]["margin"] = 0 + skip_calculations = True + break + else: + usage_percentage[metric] = (node_infos["node_metrics"][metric] * 100) / values[node_type] + ##print(usage_percentage) + # Skip the next calculations if at least one metric was higher than the considered max value + if skip_calculations: + continue + + # Obtain a single value of the node usage by averaging over all the node metrics + total_usage_percentage = 0 + for metric in 
usage_percentage: + total_usage_percentage += usage_percentage[metric] + total_usage_percentage /= len(usage_percentage.keys()) + ##print(total_usage_percentage) + + # Obtain the margin of the node + self._margin_percentage_in[node]["margin"] = round((100 - total_usage_percentage) / neigh_num, 4) + else: + self._margin_percentage_in[node]["margin"] = 0 + + def _exchange(self): + """ + Mocked phase of algo. All agents already has all information to execute weights calculation + During this phase "margin_percentage_in" are exchanged from each node to other neighbours + Note: during previous phase "margin_percentage_in" are calculated for each node so the exchange phase only transform margin_percentage_in in margin_percentage_out + """ + + for node, limits in self._margin_percentage_in.items(): + if self._id == node or not node in self._config_json[self._id]["neighbours"]: + continue + self._margin_percentage_out[node] = {} + for k, v in limits.items(): + self._margin_percentage_out[node][k] = v + + logger_dict = {} + for node, values in self._margin_percentage_out.items(): + logger_dict[node] = values["margin"] + self._logger.info("Margin percentage out: {}".format(logger_dict)) + + def _calculate_weights(self): + w = {} + + # Extract loads data of the node from the config file + original_requests_groups, original_requests_functions = self._get_load_for_groups_and_functions(self._config_json[self._id]) + + # Add the information about the type of the node + original_requests_groups["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[self._config_json[self._id]["node_type"]] + + # Get a copy for each original dictionary + remained_requests_groups = copy.deepcopy(original_requests_groups) + remained_requests_functions = copy.deepcopy(original_requests_functions) + + # List of functions deployed on the node + node_functions = list(original_requests_functions.keys()) + + # For each neighbour with margin > 0, list of the functions deployed in common with the current 
node + # If a neighbour does not have any function in common, it is not considered + functions_in_common = self.__get_common_functions(node_functions) + #print("Functions in common:", functions_in_common) + + # List of the neighbours available to receive load + neighbours = list(functions_in_common.keys()) + + # For each neighbour and for each function in common, contains the load forwarded by the current node + fwd_to_neigh = {} + for neigh, list_of_fun in functions_in_common.items(): + fwd_to_neigh[neigh] = {} + for i in range(0, len(list_of_fun)): + fwd_to_neigh[neigh][list_of_fun[i]] = 0 + + #print("Fwd to neigh original", fwd_to_neigh) + + # Iterator for: + # - Selected neighbour + # - Selected function for each neighbour + iterator = {} + iterator["neigh"] = 0 + for i in range (0, len(neighbours)): + iterator[neighbours[i]] = 0 + # Iterate removing load until the node is underload + overload = self._config_json[self._id]["node_metrics"]["overloaded_node"] + while overload == 1: + # Check if there's at least one neigh who can still receive load for some function + zero_neigh_available = True + for node_to in functions_in_common: + if len(functions_in_common[node_to]) != 0: + zero_neigh_available = False + break + + # If there're not neigh who can receive load, even if the node is in overload, it cannot transfer any request + if not zero_neigh_available: + # Get the name of the function and the node selected by the iterator + node_to = list(functions_in_common.keys())[iterator["neigh"]] + func_to = functions_in_common[node_to][iterator[node_to]] + + # Check if there're still requests to remove from the selected function + if remained_requests_functions[func_to] > 0: + # Extracts the load of the selected neighbour (this information has been forwarded toghether with the information about the margin) + original_node_to_features = copy.deepcopy(self._margin_percentage_out[node_to]["load"]) + # Add node type + original_node_to_features["node_type"] = 
self._config_manager.NODES_TYPES_IN_MODELS[self._margin_percentage_out[node_to]["node_type"]] + + original_node_to_predictions = self._model_proxy.get_node_predictions(original_node_to_features) + + # Calculate the percentage usage for each node metric + original_node_to_percentage = self.__calculate_usage_node(original_node_to_predictions, self._margin_percentage_out[node_to]["node_type"]) + #print("Original node" + node_to + " percentage: " + str(original_node_to_percentage)) + + # Calculate load to transfer + load_to_transfer = math.ceil(remained_requests_functions[func_to] * 0.01) + # Check if the selected "node_to" can receive the load + fwd_to_neigh[node_to][func_to] += load_to_transfer + #print("Fwd to neigh after trying to send a new request", fwd_to_neigh) + + # Get the correspondent group data of the current load chosen to be forwarded at "node_to" + group_data = self._model_proxy.transform_functions_in_groups(fwd_to_neigh[node_to]) + # Add the original load present in "node_to" + for group in self._margin_percentage_out[node_to]["load"]: + group_data["rate_group_" + group].iloc[0] += self._margin_percentage_out[node_to]["load"][group] + + # Add the information about the node type (it is forwarded togheter with the margin percentage) + group_data["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[self._margin_percentage_out[node_to]["node_type"]] + + #Get prediction using the new features + node_to_predictions = self._model_proxy.get_node_predictions(group_data) + + # Calculate the percentage usage for each node metric + node_to_percentage = self.__calculate_usage_node(node_to_predictions, self._margin_percentage_out[node_to]["node_type"]) + #print("New percentage after adding load:" + str(node_to_percentage)) + # Check if "node_to" can receive the request + if self._margin_percentage_out[node_to]["margin"] >= node_to_percentage - original_node_to_percentage: + remained_requests_functions[func_to] -= load_to_transfer + #print("New load after 
removing one request:", remained_requests_functions) + group_to_discard = self._get_group_of_function(func_to) + remained_requests_groups[group_to_discard] -= load_to_transfer + #Get prediction using the new features + node_predictions = self._model_proxy.get_node_predictions(remained_requests_groups) + + # Check if the current node is still in overload + overload = node_predictions["overloaded_node"].iloc[0] + #print("Is the node still in overload?:" + str(overload)) + else: + # Cancel the tentative transfer by undoing the full amount added above + fwd_to_neigh[node_to][func_to] -= load_to_transfer + + # Node_to cannot accept requests from func_to so remove the function from the possibilities + functions_in_common[node_to].remove(func_to) + # Reset the iterator of the functions of "Node_to" + iterator[node_to] = -1 + # If node_to has no more functions available, delete node_to from the possibilities + if len(functions_in_common[node_to]) == 0: + del functions_in_common[node_to] + iterator["neigh"] = -1 + + # Since the node has no more requests of the selected function to forward + # remove it from the possibilities of each neigh + else: + remained_nodes = list(functions_in_common.keys()) + for node in remained_nodes: + if func_to in functions_in_common[node]: + functions_in_common[node].remove(func_to) + iterator[node] = -1 + + if len(functions_in_common[node]) == 0: + del functions_in_common[node] + iterator["neigh"] = -1 + + # Reset iterator + iterator["neigh"] += 1 + if iterator["neigh"] >= len(list(functions_in_common.keys())): + iterator["neigh"] = 0 + if iterator[node_to] != -1: + iterator[node_to] += 1 + if iterator[node_to] == len(functions_in_common[node_to]): + iterator[node_to] = 0 + else: + break + + for fun in node_functions: + w[fun] = {} + # Obtain total number of requests of the selected function + tot_n_of_req = original_requests_functions[fun] + + for node in self._config_json[self._id]["neighbours"] + [self._id]: + if tot_n_of_req == 0: + w[fun][node] = 0.0 + else: + if node != self._id: + if 
node in list(fwd_to_neigh.keys()) and fun in list(fwd_to_neigh[node].keys()): + w[fun][node] = round(fwd_to_neigh[node][fun] / tot_n_of_req, 4) + else: + w[fun][node] = 0.0 + else: + w[fun][node] = round(remained_requests_functions[fun] / tot_n_of_req, 4) + + self._logger.info(w) + for func, val in w.items(): + self._logger.info("Weights for func {}: {}".format(func, val)) + return w + + # Calculate functions in common with each node in the net + def __get_common_functions(self, current_node_functions): + common_functions = {} + for node in self._config_json: + if node == self._id or not node in self._config_json[self._id]["neighbours"] or self._margin_percentage_out[node]["margin"] == 0: + continue + node_functions = self.__get_deployed_functions(node) + temp_common_functions = list(set(node_functions).intersection(current_node_functions)) + if len(temp_common_functions) > 0: + common_functions[node] = temp_common_functions + return common_functions + + # Calculate the percentage usage for each node metric + def __calculate_usage_node(self, predictions, node_type): + usage_percentage = {} + fwd_req = True + for metric, values in self._config_manager.MAX_RESOURCES_USAGE.items(): + # If at least one metric is higher than the considered max value, then the request cannot be forwarded + if predictions[metric].iloc[0] > values[node_type]: + fwd_req = False + break + else: + usage_percentage[metric] = (predictions[metric].iloc[0] * 100) / values[node_type] + if fwd_req: + # Obtain a single value of the node usage mediating over all the node metrics + total_usage_percentage = 0 + for metric in usage_percentage: + total_usage_percentage += usage_percentage[metric] + total_usage_percentage /= len(usage_percentage.keys()) + return total_usage_percentage + else: + return 100 + + diff --git a/framework/behaviour/power_saving_strategy.py b/framework/behaviour/power_saving_strategy.py new file mode 100644 index 0000000..7a213e7 --- /dev/null +++ 
b/framework/behaviour/power_saving_strategy.py @@ -0,0 +1,304 @@ +from .strategy import Strategy +import copy +import math + + +class PowerSavingStrategy(Strategy): + def __init__(self, config_json): + super().__init__(config_json) + + # For each node represents the percentage of usage that the neighbours can use + self._margin_percentage_in = {} + + # Represents the percentage of usage that is possible forward to each neighbour + self._margin_percentage_out = {} + + def run(self) -> dict: + return self.__loop() + + def set_id(self, id): + self._id = self._config_manager.NODE_KEY_PREFIX + str(id) + self._prefix = "THREAD: " + self._id + + def set_logger(self, logger): + self._logger = logger + + def __loop(self) -> dict: + self._calculate_margin() + self._exchange() + return self._calculate_weights() + + # Get all the functions deployed in the selected node + def __get_deployed_functions(self, actual_node): + functions_deployed = [] + for k, value in self._config_json[actual_node]["load"].items(): + if "USAGE" in k: + for i in range(0, len(value["functions"])): + functions_deployed.append(value["functions"][i]["function_name"]) + return functions_deployed + + # Get the number of neigh with at least one function in common with the actual node + def __neigh_with_at_least_one_fun_in_common(self, actual_node): + + functions_deployed = self.__get_deployed_functions(actual_node) + + num = 0 + for neigh in self._config_json[actual_node]["neighbours"]: + found = False + for k, value in self._config_json[neigh]["load"].items(): + if "USAGE" in k: + for i in range(0, len(value["functions"])): + if value["functions"][i]["function_name"] in functions_deployed: + num += 1 + found = True + break + if found == True: + break + return num + + # Calculates the margin in for each node in the net + def _calculate_margin(self): + for node, node_infos in self._config_json.items(): + ##print(node) + self._margin_percentage_in[node] = {} + node_type = node_infos["node_type"] + 
self._margin_percentage_in[node]["node_type"] = node_type + # In case the node is in overload or the power usage is greater then the max value, the only information that produces is margin = 0 + # avoiding to send information about the load + if node_infos["node_metrics"]["overloaded_node"] == 0 and node_infos["node_metrics"]["power_usage_node"] < self._config_manager.MAX_POWER_USAGE[node_type]: + # Get the load of each group in the node + self._margin_percentage_in[node]["load"] = {} + for group, values in node_infos["load"].items(): + self._margin_percentage_in[node]["load"][group] = values["total_rate"] + neigh_num = self.__neigh_with_at_least_one_fun_in_common(node) + ##print(neigh_num) + if neigh_num == 0: + self._margin_percentage_in[node]["margin"] = 0 + else: + total_usage_percentage = (node_infos["node_metrics"]["power_usage_node"] * 100) / self._config_manager.MAX_POWER_USAGE[node_type] + # Obtain the margin of the node + self._margin_percentage_in[node]["margin"] = round((100 - total_usage_percentage) / neigh_num, 4) + else: + self._margin_percentage_in[node]["margin"] = 0 + + def _exchange(self): + """ + Mocked phase of algo. 
All agents already has all information to execute weights calculation + During this phase "margin_percentage_in" are exchanged from each node to other neighbours + Note: during previous phase "margin_percentage_in" are calculated for each node so the exchange phase only transform margin_percentage_in + in margin_percentage_out + """ + + for node, limits in self._margin_percentage_in.items(): + if self._id == node or not node in self._config_json[self._id]["neighbours"]: + continue + self._margin_percentage_out[node] = {} + for k, v in limits.items(): + self._margin_percentage_out[node][k] = v + + logger_dict = {} + for node, values in self._margin_percentage_out.items(): + logger_dict[node] = values["margin"] + self._logger.info("Margin percentage out: {}".format(logger_dict)) + + def _calculate_weights(self): + w = {} + + # Extract loads data of the node from the config file + original_requests_groups, original_requests_functions = self._get_load_for_groups_and_functions(self._config_json[self._id]) + + # Add the information about the type of the node + node_type = self._config_json[self._id]["node_type"] + original_requests_groups["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[node_type] + + # Get a copy for each original dictionary + remained_requests_groups = copy.deepcopy(original_requests_groups) + remained_requests_functions = copy.deepcopy(original_requests_functions) + + # List of functions deployed on the node + node_functions = list(original_requests_functions.keys()) + + # For each neighbour with margin > 0, list of the functions deployed in common with the current node + # If a neighbour does not have any function in common, it is not considered + functions_in_common = self.__get_common_functions(node_functions) + #print("Functions in common:", functions_in_common) + + # List of the neighbours available to receive load + neighbours = list(functions_in_common.keys()) + + # For each neighbour and for each function in common, contains the load 
forwarded by the current node + fwd_to_neigh = {} + for neigh, list_of_fun in functions_in_common.items(): + fwd_to_neigh[neigh] = {} + for i in range(0, len(list_of_fun)): + fwd_to_neigh[neigh][list_of_fun[i]] = 0 + + #print("Fwd to neigh original", fwd_to_neigh) + + # Iterator for: + # - Selected neighbour + # - Selected function for each neighbour + iterator = {} + iterator["neigh"] = 0 + for i in range (0, len(neighbours)): + iterator[neighbours[i]] = 0 + # Iterate removing load until the node is underload + overload = self._config_json[self._id]["node_metrics"]["overloaded_node"] + node_consumption = self._config_json[self._id]["node_metrics"]["power_usage_node"] + while overload == 1 or node_consumption > self._config_manager.MAX_POWER_USAGE[node_type]: + # Check if there's at least one neigh who can still receive load for some function + zero_neigh_available = True + for node_to in functions_in_common: + if len(functions_in_common[node_to]) != 0: + zero_neigh_available = False + break + + # if there're not neigh who can receive load, even if the node is in overload, it cannot transfer any request + if not zero_neigh_available: + # Get the name of the function and the node selected by the iterator + node_to = list(functions_in_common.keys())[iterator["neigh"]] + func_to = functions_in_common[node_to][iterator[node_to]] + + # Check if there're still requests to remove from the selected function + if remained_requests_functions[func_to] > 0: + # Extracts the load of the selected neighbour (this information has been forwarded toghether with the information about the margin) + original_node_to_features = copy.deepcopy(self._margin_percentage_out[node_to]["load"]) + # Add node type + node_type_to = self._margin_percentage_out[node_to]["node_type"] + original_node_to_features["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[node_type_to] + + original_node_to_predictions = self._model_proxy.get_node_predictions(original_node_to_features) + + # Calculate the 
percentage usage for each node metric + original_node_to_percentage = (original_node_to_predictions["power_usage_node"].iloc[0] * 100) / self._config_manager.MAX_POWER_USAGE[node_type_to] + #print("Original node" + node_to + " percentage: " + str(original_node_to_percentage)) + + # Calculate load to transfer + load_to_transfer = math.ceil(remained_requests_functions[func_to] * 0.01) + # Check if the selected "node_to" can receive the load + fwd_to_neigh[node_to][func_to] += load_to_transfer + #print("Fwd to neigh after trying to send a new request", fwd_to_neigh) + + # Get the corresponding group data of the current load chosen to be forwarded to "node_to" + group_data = self._model_proxy.transform_functions_in_groups(fwd_to_neigh[node_to]) + # Add the original load present in "node_to" + for group in self._margin_percentage_out[node_to]["load"]: + group_data["rate_group_" + group].iloc[0] += self._margin_percentage_out[node_to]["load"][group] + + # Add the information about the node type (it is forwarded together with the margin percentage) + group_data["node_type"] = self._config_manager.NODES_TYPES_IN_MODELS[node_type_to] + + #Get prediction using the new features + node_to_predictions = self._model_proxy.get_node_predictions(group_data) + + # Calculate the power usage percentage of "node_to" after adding the load + node_to_percentage = (node_to_predictions["power_usage_node"].iloc[0] * 100) / self._config_manager.MAX_POWER_USAGE[node_type_to] + #print("New percentage after adding load:" + str(node_to_percentage)) + # Check if "node_to" can receive the request + if self._margin_percentage_out[node_to]["margin"] >= node_to_percentage - original_node_to_percentage: + remained_requests_functions[func_to] -= load_to_transfer + #print("New load after removing one request:", remained_requests_functions) + group_to_discard = self._get_group_of_function(func_to) + remained_requests_groups[group_to_discard] -= load_to_transfer + #Get prediction using the new features + node_predictions = 
self._model_proxy.get_node_predictions(remained_requests_groups) + + # Check if the current node is still in overload + overload = node_predictions["overloaded_node"].iloc[0] + node_consumption = node_predictions["power_usage_node"].iloc[0] + #print("Is the node still in overload?:" + str(overload)) + else: + # Cancel the transfer of the request + fwd_to_neigh[node_to][func_to] -= load_to_transfer + + # Node_to cannot accept requests from func_to so remove the function from the possibilities + functions_in_common[node_to].remove(func_to) + # Reset the iterator of the functions of "Node_to" + iterator[node_to] = -1 + # If node_to has no more functions available, delete node_to from the possibilities + if len(functions_in_common[node_to]) == 0: + del functions_in_common[node_to] + iterator["neigh"] = -1 + + # Since the node has no more requests of the selected function to forward + # remove it from the possibilities of each neigh + else: + remained_nodes = list(functions_in_common.keys()) + for node in remained_nodes: + if func_to in functions_in_common[node]: + functions_in_common[node].remove(func_to) + iterator[node] = -1 + + if len(functions_in_common[node]) == 0: + del functions_in_common[node] + iterator["neigh"] = -1 + + # Reset iterator + iterator["neigh"] += 1 + if iterator["neigh"] >= len(list(functions_in_common.keys())): + iterator["neigh"] = 0 + if iterator[node_to] != -1: + iterator[node_to] += 1 + if iterator[node_to] == len(functions_in_common[node_to]): + iterator[node_to] = 0 + else: + break + + + + for fun in node_functions: + w[fun] = {} + # Obtain total number of requests of the selected function + tot_n_of_req = original_requests_functions[fun] + + for node in self._config_json[self._id]["neighbours"] + [self._id]: + if tot_n_of_req == 0: + w[fun][node] = 0.0 + else: + if node != self._id: + if node in list(fwd_to_neigh.keys()) and fun in list(fwd_to_neigh[node].keys()): + w[fun][node] = round(fwd_to_neigh[node][fun] / tot_n_of_req, 4) + else: + 
w[fun][node] = 0.0 + else: + w[fun][node] = round(remained_requests_functions[fun] / tot_n_of_req, 4) + + for func, val in w.items(): + self._logger.info("Weights for func {}: {}".format(func, val)) + + return w + + # Calculate functions in common with each node in the net + def __get_common_functions(self, current_node_functions): + common_functions = {} + for node in self._config_json: + if node == self._id or not node in self._config_json[self._id]["neighbours"] or self._margin_percentage_out[node]["margin"] == 0: + continue + node_functions = self.__get_deployed_functions(node) + temp_common_functions = list(set(node_functions).intersection(current_node_functions)) + if len(temp_common_functions) > 0: + common_functions[node] = temp_common_functions + return common_functions + + # Calculate the percentage usage for each node metric + def __calculate_usage_node(self, predictions, node_type): + usage_percentage = {} + fwd_req = True + for metric, values in self._config_manager.MAX_RESOURCES_USAGE.items(): + # If at least one metric is higher than the considered max value, then the request cannot be forwarded + if predictions[metric].iloc[0] > values[node_type]: + fwd_req = False + break + else: + usage_percentage[metric] = (predictions[metric].iloc[0] * 100) / values[node_type] + if fwd_req: + # Obtain a single value of the node usage mediating over all the node metrics + total_usage_percentage = 0 + for metric in usage_percentage: + total_usage_percentage += usage_percentage[metric] + total_usage_percentage /= len(usage_percentage.keys()) + return total_usage_percentage + else: + return 100 + + diff --git a/framework/behaviour/strategy.py b/framework/behaviour/strategy.py new file mode 100644 index 0000000..332686d --- /dev/null +++ b/framework/behaviour/strategy.py @@ -0,0 +1,36 @@ +from __future__ import annotations +from abc import ABC, abstractmethod +from configuration.config_manager import ConfigManager +from model.model_proxy import ModelProxy + +class 
Strategy(ABC): + def __init__(self, config_file): + self._config_json = config_file + self._config_manager = ConfigManager() + + # Model manager is passed from simulation context + self._model_proxy = None + + @abstractmethod + def run(self) -> dict: + pass + + def set_model_proxy(self, model_manager: ModelProxy): + self._model_proxy = model_manager + + # Get load for each group and function in the node reading the config file + def _get_load_for_groups_and_functions(self, config_json): + groups_requests = {} + functions_requests = {} + for k, value in config_json["load"].items(): + if "USAGE" in k: + groups_requests[k] = value["total_rate"] + for i in range(0, len(value["functions"])): + functions_requests[value["functions"][i]["function_name"]] = value["functions"][i]["function_rate"] + return groups_requests, functions_requests + + # Get the name of the group of the function passed + def _get_group_of_function(self, function_name): + for group, functions in self._config_manager.GROUPS.items(): + if function_name in functions: + return group diff --git a/framework/cli/__init__.py b/framework/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/cli/cli.py b/framework/cli/cli.py new file mode 100644 index 0000000..cb7cd35 --- /dev/null +++ b/framework/cli/cli.py @@ -0,0 +1,95 @@ +import argparse +from configuration.config_manager import ConfigManager + +config_manager = ConfigManager() + +def analyzer_and_simulator_arguments(): + """ + Method used to handle arguments passed by terminal to the simulator + """ + parser =argparse.ArgumentParser() + parser.add_argument('-m', '--modeltype', type=str, default="regression", required=False, + help="Optional parameter used to choose the model type to use during the simulation (regression, quantile005, quantile095). 
Default value is \"regression\"") + + args = parser.parse_args() + if args.modeltype != "regression" and args.modeltype != "quantile005" and args.modeltype != "quantile095": + parser.error("Model type can only be \"regression\", \"quantile005\" or \"quantile095\"") + return args + + +def simulation_controller_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('-n', '--nodesnum', type=int, default=-1, required=False, + help="Number of nodes used in the simulation") + parser.add_argument('-s', '--seed', type=int, default=-1, required=False, + help="Optional seed used for random number generation. It cannot be used if expnum is greater than 1") + parser.add_argument('-p', '--edgeprob', type=float, default=-1.0, required=False, + help="Optional probability of creating an edge") + parser.add_argument('-o', '--overloaded', nargs='+', type=int, required=False, + help="Optional percentages of overloaded nodes to test on the same instance. Default values are 30, 60, 90") + parser.add_argument('-e', '--expnum', type=int, default=-1, required=False, + help="Optional number of experiments to perform. If not set, only one experiment is executed") + parser.add_argument('-m', '--modeltype', type=str, default="regression", required=False, + help="Optional parameter used to choose the model type to use during the simulation (regression, quantile005, quantile095). 
Default value is \"regression\"") + + args = parser.parse_args() + + if args.modeltype != "regression" and args.modeltype != "quantile005" and args.modeltype != "quantile095": + parser.error("Model type can only be \"regression\", \"quantile005\" or \"quantile095\"") + if args.nodesnum != -1 and args.nodesnum <= 0: + parser.error("Nodes number must be an integer greater than 0") + if args.expnum != -1 and args.expnum <= 0: + parser.error("Experiments number must be an integer greater than 0") + if args.seed != -1 and args.expnum > 1: + parser.error("Seed cannot be set when experiments number is greater than 1") + if args.seed != -1 and args.seed <= 0: + parser.error("Seed must be an integer greater than 0") + if args.edgeprob != -1.0 and (args.edgeprob < 0.0 or args.edgeprob > 1.0): + parser.error("Edge probability must be a float number between 0 and 1") + if args.overloaded is not None: + for rate in args.overloaded: + if rate < 0 or rate > 100: + parser.error("Overloaded percentage must be an integer between 0 and 100 (inclusive)") + return args + +def parse_arguments(): + """ + Parse the arguments passed from the terminal. 
+ """ + parser = argparse.ArgumentParser() + parser.add_argument('-n', '--nodesnum', type=int, default=-1, required=False, + help="Number of nodes used in the simulation") + parser.add_argument('-s', '--seed', type=int, default=-1, required=False, + help="Optional seed used for random number generation") + parser.add_argument('-p', '--edgeprob', type=float, default=-1.0, required=False, + help="Optional probability of creating an edge") + parser.add_argument('-o', '--overloaded', type=int, default=-1, required=False, + help="Optional percentage of overloaded nodes to test on the instance") + + args = parser.parse_args() + + if args.nodesnum != -1 and args.nodesnum <= 0: + parser.error("Nodes number must be an integer greater than 0") + if args.seed != -1 and args.seed <= 0: + parser.error("Seed must be an integer greater than 0") + if args.edgeprob != -1.0 and (args.edgeprob < 0.0 or args.edgeprob > 1.0): + parser.error("Edge probability must be a float number between 0 and 1") + if args.overloaded != -1 and (args.overloaded < 0 or args.overloaded > 100): + parser.error("Overloaded percentage must be an integer between 0 and 100 (inclusive)") + return args + + +def get_args(): + """ + Returns a dictionary built from the key-value params passed to the program. 
+ """ + kargs = dict(parse_arguments()._get_kwargs()) + return kargs + +def get_analyzer_and_simulator_args(): + kargs = dict(analyzer_and_simulator_arguments()._get_kwargs()) + return kargs + +def get_simulation_controller_args(): + kargs = dict(simulation_controller_arguments()._get_kwargs()) + return kargs \ No newline at end of file diff --git a/framework/configuration/__init__.py b/framework/configuration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/configuration/config_manager.py b/framework/configuration/config_manager.py new file mode 100644 index 0000000..f876fcd --- /dev/null +++ b/framework/configuration/config_manager.py @@ -0,0 +1,113 @@ +import os +import json +from pathlib import Path + + +class ConfigManager(object): + """ + Class used to create/read configuration file. + """ + + _config_manager = None # Singleton instance of this class. + + root_path = Path("../").resolve() + simulation_dir = root_path.joinpath("simulation") + output_dir = simulation_dir.joinpath("outputs") + + # Constants used in instance generator script + OUTPUT_INSTANCE_PATH = output_dir.joinpath("instance_gen_output") + OUTPUT_INSTANCE_JSON_FILE_PATH = OUTPUT_INSTANCE_PATH.joinpath("instance.json") # Path for output instance json file + OUTPUT_INSTANCE_GRAPH_FILE_PATH = OUTPUT_INSTANCE_PATH.joinpath("graph.png") + DATA_DIR = simulation_dir.joinpath("data") # Directory that contains experiment files + + NODES_TYPES = ["HEAVY", "MID", "LIGHT"] # Node types used in experiments + NODES_TYPE_PROBABILITY_DISTRIBUTION = [0.2, 0.3, 0.5] + NODES_TYPES_IN_MODELS = {"HEAVY": 0, "MID": 1, "LIGHT": 2} # Association of the node types with the values used in the models + PREDICTED_METRICS = ["cpu_usage_node", "overloaded_node", "power_usage_node", "ram_usage_node"] # Metrics predicted by the forecaster + MODEL_TYPES = ["quantile005", "quantile095", "regression"] + NODE_KEY_PREFIX = "node_" + + FUNCTION_GROUPS = 4 # Number of groups of functions + 
GROUP_KEY_PREFIX = "group_" + GROUPS_MAX_RATES = [70, 200, 70, 50] # Max rates for each group of function + INITIAL_LOAD_STEPS = 2 # Number of instances for increasing load + MODEL_BASE_PATH = '../metrics_predictions/system-forecaster-models/groups/' + SCALER_BASE_PATH = '../metrics_predictions/scalers/groups/' + GROUP_FILE_PATH = '../metrics_predictions/group_list.json' # group_list path + + # Read group_list file + with open(GROUP_FILE_PATH, 'r') as json_file: + groups_number = json.load(json_file) + + # Map groups number with the correspondent name + GROUPS = {} + for key, value in groups_number.items(): + if "figlet" in value: + GROUPS["LOW_USAGE"] = value + elif "nmap" in value: + GROUPS["HIGH_USAGE"] = value + else: + GROUPS["MEDIUM_USAGE"] = value + + # All the functions present in the config file + FUNCTION_NAMES = [] + + # Name of rates columns + GROUPS_COLUMNS_NAMES = [] + for group, functions in GROUPS.items(): + GROUPS_COLUMNS_NAMES.append('rate_group_' + group) + for function in functions: + FUNCTION_NAMES.append(function) + + # Parameters for Node Margin Strategy + MAX_RESOURCES_USAGE = {"cpu_usage_node": {"HEAVY": 460, "MID": 290, "LIGHT": 150}, + "ram_usage_node": {"HEAVY": 6000000000, "MID": 5500000000, "LIGHT": 4100000000}, + "power_usage_node": {"HEAVY": 3500000, "MID": 2200000, "LIGHT": 700000}} + + # Parameters for Power Saving Strategy + MAX_POWER_USAGE = {node_type: value * 0.70 for node_type, value in MAX_RESOURCES_USAGE["power_usage_node"].items()} + + # Constants used in simulation script + SIMULATION_MINUTES = 7 + + STRATEGIES = ["base_strategy", "equal_strategy", "node_margin_strategy", "power_saving_strategy"] + BASE_STRATEGY = STRATEGIES[0] + EQUAL_STRATEGY = STRATEGIES[1] + NODE_MARGIN_STRATEGY = STRATEGIES[2] + POWER_SAVING_STRATEGY = STRATEGIES[3] + + SIMULATION_OUTPUT_DIR = output_dir.joinpath("simulation_output") + SIMULATION_COMPLETE_CONFIGURATION_OUTPUT_PATH = SIMULATION_OUTPUT_DIR.joinpath("minute_config") + 
SIMULATION_TABLES_OUTPUT_PATH = SIMULATION_OUTPUT_DIR.joinpath("reports") + SIMULATION_AGENT_LOGGING_BASE_PATH = SIMULATION_OUTPUT_DIR.joinpath("logs") + + # Constants used in analyzer + ANALYSIS_PERCENTILE = 90 + INDEX_TO_COMPARE = [ + "Mean success rate", + "Success rate median", + "Mean success rate (stress period)", + "Success rate median (stress period)", + "Mean node power consumption", + "Max node power consumption", + "Tot. rejected requests" + ] + + ANALYZER_OUTPUT_PATH = output_dir.joinpath("analyzer_output") + INDEX_COMPARISON_FILE = ANALYZER_OUTPUT_PATH.joinpath("index_comparison.csv") + + # Constants used in simulation controller + NUMBER_OF_SIMULATION_EXECUTION = 5 + SIMULATION_CONTROLLER_OUTPUT_PATH = output_dir.joinpath("simulation_controller_output") + SIMULATION_CONTROLLER_OUTPUT_FILE = SIMULATION_CONTROLLER_OUTPUT_PATH.joinpath("experiment_results.csv") + SIMULATION_CONTROLLER_OUTPUT_MEAN_FILE = SIMULATION_CONTROLLER_OUTPUT_PATH.joinpath("experiment_mean_results.csv") + + # Constants used to store the summary of the simulation controller + SIMULATION_CONTROLLER_ARCHIVE_PATH = output_dir.joinpath("archive") + SIMULATION_CONTROLLER_ARCHIVE_COMPARISON_FILE_NAME = "final_comparison.txt" + + def __new__(self): + # If no instance exists yet, create it; otherwise return the only instance of this class. 
+ if not isinstance(self._config_manager, self): + self._config_manager = object.__new__(self) + return self._config_manager diff --git a/framework/factory/__init__.py b/framework/factory/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/factory/strategy_factory.py b/framework/factory/strategy_factory.py new file mode 100644 index 0000000..a1940cf --- /dev/null +++ b/framework/factory/strategy_factory.py @@ -0,0 +1,32 @@ +from behaviour.equal_strategy import EqualStrategy +from behaviour.base_strategy import BaseStrategy +from behaviour.node_margin_strategy import NodeMarginStrategy +from behaviour.power_saving_strategy import PowerSavingStrategy +from configuration.config_manager import ConfigManager + + +class StrategyFactory(): + @staticmethod + def create_strategy(strategy_type, node_key, final_config): + """ + Create strategy based on type passed as first parameter + """ + # Important: the list of neighbours is supposed to be already present in a node in the reality + # So it is not a message to forward between the nodes + + config_manager = ConfigManager() + if strategy_type == config_manager.BASE_STRATEGY: + # It only needs its configuration and a list of neighbours + return BaseStrategy(final_config[node_key]) + + if strategy_type == config_manager.EQUAL_STRATEGY: + # It only needs its configuration and a list of neighbours + return EqualStrategy(final_config[node_key]) + + if strategy_type == config_manager.NODE_MARGIN_STRATEGY: + # It needs the configuration of all the nodes in order to calculate phase 1 correctly + return NodeMarginStrategy(final_config) + + if strategy_type == config_manager.POWER_SAVING_STRATEGY: + # It needs the configuration of all the nodes in order to calculate phase 1 correctly + return PowerSavingStrategy(final_config) diff --git a/framework/instance_generator.py b/framework/instance_generator.py new file mode 100644 index 0000000..8bc5d39 --- /dev/null +++ b/framework/instance_generator.py @@ -0,0 
+1,314 @@ +import os +import json +import networkx as nx +import random +import matplotlib.pyplot as plt +import numpy as np +import math +from cli.cli import get_args +from itertools import combinations, groupby +from configuration.config_manager import ConfigManager +import pandas as pd + +config_manager = ConfigManager() + +def generate_configs(nodes_number, overloaded_max_percentage, seed): + net_gen = np.random.default_rng(seed) + load_gen = np.random.default_rng(seed) + + n_max_overload = round(overloaded_max_percentage * nodes_number) + actual_overload = 0 + # Fetching node types and their probability distribution from the config_manager + nodes_type = config_manager.NODES_TYPES + nodes_type_prob_dist = config_manager.NODES_TYPE_PROBABILITY_DISTRIBUTION + + # Randomly choosing node types based on the provided probability distribution + choices = net_gen.choice(nodes_type, nodes_number, p=nodes_type_prob_dist) + + # List to store the generated configurations + configurations = [] + + # Iterating over each chosen node type + for choice in choices: + # random.seed(seed) + # np.random.seed(seed) + # Configuration dictionary for the current node + node_config = {} + node_config["node_type"] = choice + node_config["load"] = [] + + # Randomly determining the number of groups for the current node + # Each node should have at least two groups in order to guarantee the overload in at least the peak instant + node_groups_number = net_gen.integers(1,4) + + # Randomly selecting groups from the available groups in config_manager + groups = net_gen.choice(list(config_manager.GROUPS.keys()), node_groups_number, replace=False) + + # Dictionary to store functions for each group + functions_groups = {} + for group in groups: + # Randomly selecting a subset of functions for each group + num_elements_to_choose = net_gen.integers(1, len(config_manager.GROUPS[group]) + 1) + # TODO: you can use net_gen's choice instead of sample + functions_groups[group] = 
net_gen.choice(config_manager.GROUPS[group], num_elements_to_choose, replace=False) + + max_rates_o = {"HEAVY": {"LOW_USAGE": 400, "HIGH_USAGE": 90, "MEDIUM_USAGE": 200}, + "MID": {"LOW_USAGE": 570, "HIGH_USAGE": 80, "MEDIUM_USAGE": 200}, + "LIGHT": {"LOW_USAGE": 330, "HIGH_USAGE": 40, "MEDIUM_USAGE": 180}} + + # Create a new dictionary with values as 15% of the original values + max_rates_u = {key1: {key2: value * 0.15 for key2, value in inner_dict.items()} for key1, inner_dict in max_rates_o.items()} + + + overloaded_node = 1 if load_gen.random() <= overloaded_max_percentage else 0 + + if overloaded_node == 1 and actual_overload < n_max_overload: + max_rates = max_rates_o + actual_overload += 1 + else: + max_rates = max_rates_u + + # For each time instant in the simulation + for instant in range(0, config_manager.SIMULATION_MINUTES): + + # Dictionary to store configurations for each group + config_group = {} + + # For each group in the node + for group in groups: + # List to store function configurations for the current group + group_list = [] + + # Triangle generation increasing until the middle simulation instant and decreasing after that + maximum_rate_instant = math.ceil(config_manager.SIMULATION_MINUTES/2) + + if instant < maximum_rate_instant: + group_load = load_gen.integers(math.floor(instant * (max_rates[choice][group] / maximum_rate_instant)), math.ceil((instant + 1) * (max_rates[choice][group] / maximum_rate_instant)), endpoint=True) + else: + top_instant = config_manager.SIMULATION_MINUTES + group_load = load_gen.integers(math.floor((top_instant - (instant + 1)) * (max_rates[choice][group] / maximum_rate_instant)), math.ceil((top_instant - instant) * (max_rates[choice][group] / maximum_rate_instant)), endpoint=True) + + group_load = int(group_load) + + # Generate random load values for each function + loads = [load_gen.integers(0, group_load, endpoint=True) for _ in range(len(functions_groups[group]))] + + # Normalize the loads to sum up to total_load 
+ load_sum = sum(loads) + + if load_sum == 0: + normalized_loads = [0] * len(functions_groups[group]) + else: + normalized_loads = [load * group_load // load_sum for load in loads] + + # Adjust for rounding errors + remaining_load = group_load - sum(normalized_loads) + normalized_loads[0] += remaining_load + + # Creates the config for each function + for i in range(0, len(functions_groups[group])): + function_config = { + "function_name": functions_groups[group][i], + "function_rate": int(normalized_loads[i]) + } + group_list.append(function_config) + + # Adding a new level of nesting for each group + config_group[group] = { + "functions": group_list, + "total_rate": group_load + } + + # Adding group configurations to the node's load + node_config["load"].append(config_group) + + # Adding node configuration to the list of configurations + #plot_node_load(node_config, groups) + configurations.append(node_config) + + + # Printing the generated configurations (for debugging purposes) + #print(configurations) + + return configurations + + + +def gnp_random_connected_graph(n, p): + """ + Generates a random undirected graph, similarly to an Erdős-Rényi + graph, but enforcing that the resulting graph is connected + + See: https://stackoverflow.com/questions/61958360/how-to-create-random-graph-where-each-node-has-at-least-1-edge-using-networkx + """ + edges = combinations([el[0] for el in n], 2) + G = nx.Graph() + G.add_nodes_from(n) + if p <= 0: + return G + if p >= 1: + return nx.complete_graph(n, create_using=G) + for _, node_edges in groupby(edges, key=lambda x: x[0]): + node_edges = list(node_edges) + random_edge = random.choice(node_edges) + G.add_edge(*random_edge) + for e in node_edges: + if random.random() < p: + G.add_edge(*e) + return G + +def plot_graph(G): + """ + Plot graph and export on file + """ + plt.figure(figsize=(10, 6)) + nx.draw(G, node_color='lightblue', + with_labels=True, + node_size=2000) + path = config_manager.OUTPUT_INSTANCE_PATH + if not 
os.path.exists(path): + os.makedirs(path) + plt.savefig(config_manager.OUTPUT_INSTANCE_GRAPH_FILE_PATH) + +def dict_key_substitution(data, old, new): + """ + Utility function used to substitute dictionary key + """ + data[new] = data[old] + del data[old] + +def plot_node_load(node_config, groups): + # Plot configurations + plt.figure(figsize=(20, 10)) + plt.title("Rate for each group in a {} node during {} minutes of experiment".format(node_config["node_type"], config_manager.SIMULATION_MINUTES)) + plt.xlabel("Minute") + plt.ylabel("Requests per second") + + # Initialize lists to store data + low_usage = [] + medium_usage = [] + high_usage = [] + + # Extract data into separate lists + for entry in node_config['load']: + low_total_rate = entry.get('LOW_USAGE', {}).get('total_rate', None) + medium_total_rate = entry.get('MEDIUM_USAGE', {}).get('total_rate', None) + high_total_rate = entry.get('HIGH_USAGE', {}).get('total_rate', None) + + low_usage.append(low_total_rate) + medium_usage.append(medium_total_rate) + high_usage.append(high_total_rate) + + # Create DataFrame + df = pd.DataFrame({ + 'LOW_USAGE': low_usage, + 'MEDIUM_USAGE': medium_usage, + 'HIGH_USAGE': high_usage + }) + + # Define line styles and colors + line_styles = ['-', '--', '-.', ':'] + colors = ['blue', 'green', 'red', 'orange'] + # Plot each column with a different line style and color + for i, column in enumerate(df.columns): + linestyle = line_styles[i % len(line_styles)] + color = colors[i % len(colors)] + plt.plot(df.index, df[column], label="r/s for {}".format(column), linestyle=linestyle, color=color) + + # Plot configurations + plt.legend(loc="upper right") + plt.grid() + + plt.savefig(config_manager.OUTPUT_INSTANCE_PATH.joinpath('temp.png')) + +def create_node_representation(key, start_config, G): + """ + Create node json representation in new format of instance file + """ + node_config = {} + + node_config["node_type"] = start_config["node_type"] + node_config["neighbours"] = 
list(G[key].keys()) + node_config["load"] = start_config["load"] + + return node_config + +def build_output_json(seed, nodes_num, edge_prob, overloaded_max_percentage, G): + """ + Function used to build output json file that represents the instance + """ + instance = {} + instance["seed"] = seed + instance["nodes_number"] = nodes_num + instance["edge_prob"] = edge_prob + instance["overloaded_perc"] = overloaded_max_percentage * 100 + + # Iterate over all graph nodes + for node in G.nodes(data=True): + key, config = node[0], node[1] + instance[key] = create_node_representation(key, config["config"], G) + + return instance + +def export_instance_file(instance): + with open(config_manager.OUTPUT_INSTANCE_JSON_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(instance, f, ensure_ascii=False, indent=4) + +def main(): + # Get args passed as params + kargs = get_args() + print(kargs) + nodes_num = kargs["nodesnum"] + seed = kargs["seed"] + probability = kargs["edgeprob"] + overloaded_max_percentage = kargs["overloaded"] + + # Assign default values to params if not defined + if nodes_num == -1: + nodes_num = 10 + if seed == -1: + seed = np.random.randint(4097) + if probability == -1.0: + probability = 0.1 + if overloaded_max_percentage == -1: + overloaded_max_percentage = 60 + + overloaded_max_percentage *= 0.01 + + #print(*(nodes_num, seed, probability, max_rates)) + # raise(Exception) + random.seed(seed) + np.random.seed(seed) + + # Load "nodes_num" configuration file + generated_configs = generate_configs(nodes_num, overloaded_max_percentage, seed) + + # print(loaded_files) + # Create a random graph with "nodes_num" nodes + # Nodes are tuple of (node_id, json_config) + # json_config is used as a node property + nodes = [] + for i, config in zip(range(0, nodes_num), generated_configs): + key = config_manager.NODE_KEY_PREFIX + str(i) + nodes.append((key, {"config": config})) + #print(nodes) + G = gnp_random_connected_graph(nodes, probability) + + # Export an image of 
graph + plot_graph(G) + + # Print neighbor for each node + # Utility print + print(" > Neighborhood:") + for n in nodes: + print(" > Node {} neighbors: {}".format(n[0], G[n[0]])) + + # Build instance file (JSON file) with all information + instance_json = build_output_json(seed, nodes_num, probability, overloaded_max_percentage, G) + + # Export instance file + export_instance_file(instance_json) + +# Call the main program. +if __name__ == "__main__": + main() diff --git a/framework/model/model.py b/framework/model/model.py new file mode 100644 index 0000000..7bc7112 --- /dev/null +++ b/framework/model/model.py @@ -0,0 +1,43 @@ +import joblib +import pandas as pd +import numpy as np + +from configuration.config_manager import ConfigManager + +class Model: + + __config_manager = ConfigManager() + + def __init__(self, metric, model_type): + self.metric = metric + self.model_type = model_type + + # Load the model by the joblib file produced by the forecaster + self.model = joblib.load(self.__config_manager.MODEL_BASE_PATH + metric + "/" + model_type + "/model.joblib") + + # For the overloaded metric, load only the scaler of the features + if "overloaded" in metric: + self.features_scaler = joblib.load(self.__config_manager.SCALER_BASE_PATH + "scaler_x/" + metric + ".joblib") + self.target_scaler = None + else: + self.features_scaler = joblib.load(self.__config_manager.SCALER_BASE_PATH + "scaler_x/features.joblib") + self.target_scaler = joblib.load(self.__config_manager.SCALER_BASE_PATH + "scaler_y/" + metric + ".joblib") + + def predict(self, input_data): + # Scale input dataset + input_data_scaled = self.features_scaler.transform(input_data) + + # Transform input dataset to dataframe + input_data_scaled = pd.DataFrame(input_data_scaled, columns=[*self.__config_manager.GROUPS_COLUMNS_NAMES, "node_type"]) + scaled_predictions = self.model.predict(input_data_scaled) + if "overloaded" in self.metric: + return scaled_predictions + else: + original_predictions = 
self.target_scaler.inverse_transform(scaled_predictions.reshape(-1,1)) + return np.round(original_predictions,2) + + + + + + diff --git a/framework/model/model_proxy.py b/framework/model/model_proxy.py new file mode 100644 index 0000000..b1fc71b --- /dev/null +++ b/framework/model/model_proxy.py @@ -0,0 +1,100 @@ +from configuration.config_manager import ConfigManager +from model.model import Model +import pandas as pd + +class ModelProxy: + """ + Class used as handler of all the models created by the forecaster + """ + _config_manager = ConfigManager() + _models = [] + + def __init__(self): + self._model_type = None + + def _create_model(self, metric): + if "overloaded" not in metric: + return Model(metric, self._model_type) + else: + return Model(metric, "") + + def _get_model(self, metric): + """ + Method used to find a particular model in the list + :metric: the metric predicted by the model + """ + for model in self._models: + if model.metric == metric and ("overloaded" in model.metric or model.model_type == self._model_type): + return model + model = self._create_model(metric) + self._models.append(model) + return model + #raise Exception("It has not been possible to find the requested model") + + + def _process_input(self, input_data): + """ + Method used to transform input_data in the correct data structure + :input_data: data to transform + """ + input_data_df = pd.DataFrame(index=range(0,1)) + + # Populate the df with the load of each group + for group in self._config_manager.GROUPS: + if group in input_data: + input_data_df['rate_group_' + group] = input_data[group] + else: + input_data_df['rate_group_' + group] = 0 + + # Insert the information about the node type in the df + input_data_df["node_type"] = input_data["node_type"] + return input_data_df + + def get_predictions(self, input_data, metric): + """ + Method used get predictions from a particular model + :input_data: features values + :metric: the metric predicted by the model + """ + # Check if 
the features are already in the correct structure + if isinstance(input_data, dict): + input_data_df = self._process_input(input_data) + else: + input_data_df = input_data + return self._get_model(metric).predict(input_data_df) + + def get_node_predictions(self, input_data): + """ + Method used get predictions of the all node metrics + :input_data: features values + """ + predictions = pd.DataFrame(index=range(0,1)) + + # Iterate over all the metric of the node + for metric in self._config_manager.PREDICTED_METRICS: + predictions[metric] = self.get_predictions(input_data, metric) + return predictions + + + def transform_functions_in_groups(self, functions_data): + """ + Method used to transform data from function to group form + :input_data: data in function form + """ + groups_data = pd.DataFrame(index=range(0,1)) + for group, functions in self._config_manager.GROUPS.items(): + temp_counter = 0 + for function in functions: + # This branch of the if is used in the stategies, and it could not contain the selected function + if isinstance(functions_data, dict): + if function in list(functions_data.keys()): + temp_counter += functions_data[function] + # This branch is used in the analyzer, and it contains already all the functions + else: + temp_counter += functions_data.get(function) + groups_data["rate_group_" + group] = temp_counter + return groups_data + + def set_model_type(self, model_type): + self._model_type = model_type + diff --git a/framework/simulation.py b/framework/simulation.py new file mode 100644 index 0000000..b06f6c1 --- /dev/null +++ b/framework/simulation.py @@ -0,0 +1,250 @@ +import logging +import time +import json +import pandas as pd +import numpy as np +from agent.agent import Agent +from configuration.config_manager import ConfigManager +from factory.strategy_factory import StrategyFactory +from model.model_proxy import ModelProxy +from cli.cli import get_analyzer_and_simulator_args +import os + +config_manager = ConfigManager() + +# Create 
the specific path if not present in the machine +def create_path_if_not_exists(path): + if not os.path.exists(path): + os.makedirs(path) + return path + +# Get a specific logger with passed configurations +def get_logger(name, log_file, level=logging.DEBUG): + """ + Get logger for agent logging + """ + handler = logging.FileHandler(log_file) + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + +def complete_fwd_table(weights, invoc_rate_table): + + fwd_requests = {} + + # Transform the weights in requests by applying weights * invoc_rate_table + for node_from, weights_x_func in weights.items(): + fwd_requests[node_from] = {} + for func, weights_x_node in weights_x_func.items(): + fwd_requests[node_from][func] = {} + for node_to, weight in weights_x_node.items(): + fwd_requests[node_from][func][node_to] = int(weight * invoc_rate_table[node_from][func]) + + + # Fill forwarding table with missing functions + for node_from, weights_x_func in fwd_requests.items(): + for f in config_manager.FUNCTION_NAMES: + if f not in list(weights_x_func.keys()): + fwd_requests[node_from][f] = {} + + # Set with all the nodes of the simulation + nodes_set = set(fwd_requests.keys()) + + # Complete the forwarding table with all the missing nodes to forward requests + for node_from, weights_x_func in fwd_requests.items(): + for func, weights_x_node in weights_x_func.items(): + if func in config_manager.FUNCTION_NAMES: + for node in nodes_set: + if node not in list(weights_x_node.keys()): + fwd_requests[node_from][func][node] = 0 + return fwd_requests + +def create_tables(fwd_requests, minute, strategy_type): + """ + Starting by forwarding requests create a table and export it in a CSV file + Also invocation rate and max rate table are create and exported in the same + format + """ + path = config_manager.SIMULATION_TABLES_OUTPUT_PATH.joinpath(strategy_type, "minute_" + str(minute)) + create_path_if_not_exists(path) + nodes_set = 
sorted(set(fwd_requests.keys())) + + # For each function and for each node, create a dataframe with the forwarded requests + for func in config_manager.FUNCTION_NAMES: + df_x_func = pd.DataFrame([], index=nodes_set) + for node_from in fwd_requests: + df_x_func[node_from] = [fwd_requests[node_from][func][k] + for k in sorted(fwd_requests[node_from][func].keys())] + # Invert rows and columns + df_x_func = df_x_func.T + df_x_func.to_csv(path.joinpath(func + ".csv"), sep='\t', encoding='utf-8') + + print(" > FWD_TABLE FOR FUNC {}".format(func)) + print(df_x_func) + + +def run_agent(agent): + """ + Run agent loop, calculate execution time and return it along with weights + """ + # time.perf_counter() returns a high-resolution clock value in fractional seconds + # The difference between two calls gives the elapsed time + # + # See: https://www.geeksforgeeks.org/time-perf_counter-function-in-python/ + start = time.perf_counter() + weights = agent.run() + end = time.perf_counter() + execution = end - start + + return weights, execution + + +def simulation(nodes_number, config_file, model_type): + """ + This function simulates various strategies for workload distribution + and uses weights to distribute the load across neighbours + """ + model_proxy = ModelProxy() + model_proxy.set_model_type(model_type) + + # Execution time dictionary + execution_times = {} + + # Initialize execution time map for each strategy + for s in config_manager.STRATEGIES: + execution_times[s] = [] + + for minute in range(0, config_manager.SIMULATION_MINUTES): # One iteration per simulated minute + # Dictionary that contains final json configuration of the current minute + minute_config = {} + + # Dictionaries used for export + simulation_weights_table = {} + simulation_invoc_rate_table = {} + + # Forwarding requests dictionary + fwd_requests = {} + + # Initialize maps for each strategy + for s in config_manager.STRATEGIES: + simulation_weights_table[s] = {} + fwd_requests[s] = {} + + # Create global configuration file with info of all nodes + for i in range(0, 
nodes_number): + key = config_manager.NODE_KEY_PREFIX + str(i) + minute_config[key] = {} + + # Add the information about the node type and the neighbours of the current node + minute_config[key]["node_type"] = config_file[key]["node_type"] + minute_config[key]["neighbours"] = config_file[key]["neighbours"] + + # Add the information about the load of the current minute + minute_config[key]["load"] = config_file[key]["load"][minute] + + simulation_invoc_rate_table[key] = {} + + features_data = {} + for k, value in minute_config[key]["load"].items(): + # Extracts the load of each group + features_data[k] = value["total_rate"] + + # Iterate over the functions of the group + for j in range(0, len(minute_config[key]["load"][k]["functions"])): + # Populate the dicitonary which contains the load of each function deployed on the node + simulation_invoc_rate_table[key][value["functions"][j]["function_name"]] = value["functions"][j]["function_rate"] + + # Insert the information about the node type in the features dict + features_data["node_type"] = config_manager.NODES_TYPES_IN_MODELS[minute_config[key]["node_type"]] + + # Get node predictions for the selected features + node_predictions = model_proxy.get_node_predictions(features_data) + + # Add node metrics to minute_config + minute_config[key]["node_metrics"] = node_predictions.to_dict(orient='records')[0] + + + + print("----------------------------------------------------------") + + + with open(create_path_if_not_exists(config_manager.SIMULATION_COMPLETE_CONFIGURATION_OUTPUT_PATH).joinpath( + 'config{}.json'.format(minute)), 'w', encoding='utf-8') as f: + json.dump(minute_config, f, ensure_ascii=False, indent=4) + + + # Call agent loop for each config that has been previously built + # + # With last update this code is executed for each type of behaviour + # (base, random and empirical) and for each agent in the network + for id in range(0, nodes_number): + key = config_manager.NODE_KEY_PREFIX + str(id) + # 
config_with_neigh = {} + # config_with_neigh[key] = minute_config[key] # Add this node + # neighbours = config_file[key]["neighbours"] + + # # Create configuration file with only neighbours + # for neighbour in neighbours: + # config_with_neigh[neighbour] = minute_config[neighbour] + + logger = get_logger( + "agent" + str(id) + "_minute_" + str(minute), + create_path_if_not_exists(config_manager.SIMULATION_AGENT_LOGGING_BASE_PATH).joinpath("agent_" + + str(id) + ".log"), + logging.INFO + ) + + logger.info("\n") + logger.info("-------- MINUTE {} --------".format(minute)) + + # Execute agent loop for each strategy + for s in config_manager.STRATEGIES: + # Build correct strategy + strategy = StrategyFactory.create_strategy(s, key, minute_config) + logger.info(" > STRATEGY: {} <".format(s)) + agent = Agent( + id, + logger, + strategy, + model_proxy + ) + #agent.disable_logging() # Disable logging for speed + weights, execution_time = run_agent(agent) + execution_times[s].append(execution_time) + simulation_weights_table[s][key] = weights + for s in config_manager.STRATEGIES: + fwd_requests[s] = complete_fwd_table(simulation_weights_table[s], simulation_invoc_rate_table) + + print("> START MINUTE {}".format(minute)) + + for s in config_manager.STRATEGIES: + # Create and export tables for three algorithms + print(" > {}".format(s)) + create_tables(fwd_requests[s], minute, s) + print("------------------------------------------------") + + print("> END MINUTE {}".format(minute)) + + return {k: np.mean(times_for_algo) for k, times_for_algo in execution_times.items()} + + +def main(instance_file=""): + kargs = get_analyzer_and_simulator_args() + model_type = kargs["modeltype"] + + # Instance file can come from simulation_controller + if instance_file == "": + instance_file = config_manager.OUTPUT_INSTANCE_JSON_FILE_PATH + + # Read the instance json produced by the instance generator + f = open(instance_file) + config_file = json.load(f) + 
simulation(config_file["nodes_number"], config_file, model_type) + + + +# Call main program. +if __name__ == "__main__": + main() diff --git a/framework/simulation_controller.py b/framework/simulation_controller.py new file mode 100644 index 0000000..b1b007e --- /dev/null +++ b/framework/simulation_controller.py @@ -0,0 +1,141 @@ +import instance_generator +import simulation +import analyzer +import pandas as pd +import numpy as np +import random +from utils.utils import * +from cli.cli import get_simulation_controller_args +from configuration.config_manager import ConfigManager +import subprocess + +config_manager = ConfigManager() + +# Create the specific path if not present in the machine +def create_path_if_not_exists(path): + if not os.path.exists(path): + os.makedirs(path) + return path + +def main(): + # Get cli args + kargs = get_simulation_controller_args() + print(kargs) + + max_percentage_values = kargs["overloaded"] + if max_percentage_values == None: + max_percentage_values = [30, 60, 90] + n_of_experiments = kargs["expnum"] + if n_of_experiments == -1: + n_of_experiments = 1 + + model_type = kargs["modeltype"] + # Final dataframe containing data for each experiment of the simulation + final_df = pd.DataFrame() + + # Create folder with timestamp as name for storing + # simulation data + dir_path = create_timestamp_folder(config_manager.SIMULATION_CONTROLLER_ARCHIVE_PATH) + for i in range(1, n_of_experiments + 1): + exp_path = dir_path.joinpath("exp_{}".format(i)) + + seed = kargs["seed"] + if seed == -1: + seed = np.random.randint(1000000) + print("Experiment" + str(i)) + for percentage in max_percentage_values: + print("Percentage: " + str(percentage)) + # Create a dir for each iteration of the simulation + path = exp_path.joinpath("simulation_with_load_{}".format(percentage)) + os.makedirs(path) + + # 1) Generate instance configuration using the passed parameters + print("> STEP 1 - Generating instance configuration...") + subprocess.run(["python", 
"./instance_generator.py", "-n", str(kargs["nodesnum"]), "-p", str(kargs["edgeprob"]), "-s", str(seed), "-o", str(percentage)]) + + # Move instance generator output to final folder + copy_dir(config_manager.OUTPUT_INSTANCE_PATH, path) + + # Before simulations starts, remove all agent logs file from base foulder + remove_dir_content(config_manager.SIMULATION_AGENT_LOGGING_BASE_PATH) + + + # 2) Single simulation based on configuration file generated before + print("> STEP 2 - Simulation of instance...") + subprocess.run(["python", "./simulation.py", "-m", model_type]) + + + # 3) Analyze simulation output + print("> STEP 3 - Analyze output...") + subprocess.run(["python", "./analyzer.py", "-m", model_type]) + + + # Move analyzer output files to final folder (separated for each iteration) + copy_dir(config_manager.ANALYZER_OUTPUT_PATH, path) + + # Copy to this foulder also simulation results (weights for each agent) + # Also clean src dir of all content (avoiding file overwriting) + copy_dir(config_manager.SIMULATION_AGENT_LOGGING_BASE_PATH, path) + remove_dir_content(config_manager.SIMULATION_AGENT_LOGGING_BASE_PATH) + + #time.sleep(2) + + # 4) Load analyzer output file that contains index for comparison + print("> STEP 4 - Load Index df...") + df = pd.read_csv(config_manager.INDEX_COMPARISON_FILE, + delimiter=',', header=0, index_col=0) + df.reset_index(drop=False, inplace=True, names = "Strategy") + #df.index.name = "strategy" + percentage_values = [percentage] * len(list(df.index.values)) + df.insert(0, 'Max overloaded percentage', percentage_values) + #df.set_index(["Max overloaded percentage", df.index], inplace=True) + + + seed_values = [seed] * len(list(df.index.values)) + df.insert(0, 'Seed', seed_values) + #print(df) + + if final_df.empty: + final_df = df + else: + final_df = pd.concat([final_df, df]) + + #print(final_df) + # time.sleep(5) + + # 5) Export final results comparison table + print("> STEP 5 - Export final results table...") + #print(final_df) + + 
create_path_if_not_exists(config_manager.SIMULATION_CONTROLLER_OUTPUT_PATH) + final_df.to_csv(config_manager.SIMULATION_CONTROLLER_OUTPUT_FILE, sep=',', encoding='utf-8', index=False) + #print(final_df) + + #df.reset_index(drop=False, inplace=True) + # Columns to exclude + columns_to_exclude = ['Seed'] + + # Drop the columns to exclude + df_filtered = final_df.drop(columns=columns_to_exclude) + #print(df_filtered) + + # Group by 'Max overloaded percentage' and 'Strategy', then calculate the mean of the other columns + mean_df = df_filtered.groupby(['Max overloaded percentage', 'Strategy']).mean() + mean_df.reset_index(drop=False, inplace=True, names = ["Max overloaded percentage", "Strategy"]) + mean_df.insert(0, 'N of experiments', pd.Series([n_of_experiments], index=[0])) + + mean_df.to_csv(config_manager.SIMULATION_CONTROLLER_OUTPUT_MEAN_FILE, sep=',', encoding='utf-8', index=False) + + + # Move simulation controller output to final folder + copy_dir(config_manager.SIMULATION_CONTROLLER_OUTPUT_PATH, dir_path) + + # Zip archive folder with all previously gathered data + zip_foulder(dir_path, dir_path) + + # Remove the directory that was just zipped + remove_dir_with_content(dir_path) + +# Call main program. 
+if __name__ == "__main__": + main() diff --git a/framework/utils/__init__.py b/framework/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/framework/utils/change-keys.py b/framework/utils/change-keys.py new file mode 100644 index 0000000..f7d3642 --- /dev/null +++ b/framework/utils/change-keys.py @@ -0,0 +1,31 @@ +import os +import sys +import json + +def dict_key_substitution(data, old, new): + """ + Utility function used to rename a dictionary key from [old] to [new] + """ + data[new] = data[old] + del data[old] + + +path = sys.argv[1] +print(path) + +json_files = [pos_json for pos_json in os.listdir(path) if pos_json.endswith('.json')] + +print(json_files) + +for exp in json_files: + exp = os.path.join(path, exp) + with open(exp, encoding='utf-8') as f: + json_doc = json.load(f) # Load the JSON file as a dictionary + + dict_key_substitution(json_doc["input"], "funcb_num", "qrcode_num") + dict_key_substitution(json_doc["input"], "funcc_num", "ocr_num") + dict_key_substitution(json_doc["input"], "funcb_wl", "qrcode_wl") + dict_key_substitution(json_doc["input"], "funcc_wl", "ocr_wl") + + with open(exp, 'w', encoding='utf-8') as f: + json.dump(json_doc, f, ensure_ascii=False, indent=4) diff --git a/framework/utils/utils.py b/framework/utils/utils.py new file mode 100644 index 0000000..bac7045 --- /dev/null +++ b/framework/utils/utils.py @@ -0,0 +1,78 @@ +import os +import shutil +from datetime import datetime + +def create_timestamp_folder(base_path): + """ + This function creates a directory with the current timestamp as its name + """ + dir_path = base_path.joinpath(datetime.now().strftime('%Y-%m-%d_%H-%M-%S')) + + mydir = os.path.join( + os.getcwd(), + dir_path + ) + try: + os.makedirs(mydir) + except FileExistsError: + # The directory already exists: nothing to do. + pass # Any other OSError still propagates to the caller. 
+ return dir_path + + +def copy_file(full_file_name, dest): + """ + Copy file [full_file_name] to [dest] + """ + shutil.copy(full_file_name, dest) + + +def copy_dir(src, dest): + """ + Copy the files (not subdirectories) of directory [src] to directory [dest] + """ + src_files = os.listdir(src) + for file_name in src_files: + full_file_name = os.path.join(src, file_name) + if os.path.isfile(full_file_name): + copy_file(full_file_name, dest) + + +def remove_file(path): + """ + Remove an existing file + """ + os.remove(path) + + +def remove_dir_content(dir): + """ + Remove all files from a directory [dir] + """ + files = os.listdir(dir) + for file_name in files: + path = os.path.join(dir, file_name) + if os.path.isfile(path): + remove_file(path) + + +def remove_dir_with_content(dir): + """ + Remove dir [dir] along with all its content + """ + shutil.rmtree(dir) + + +def zip_foulder(dir, out_path, format="zip"): + """ + Zip the folder specified by [dir] into [out_path] using the [format] format + Default format is "zip" + """ + shutil.make_archive(out_path, format, dir) + +def flatten(t): + """ + Flatten the list of lists [t] passed as param + Return a new flattened list + """ + return [item for sublist in t for item in sublist] diff --git a/images/framework/agent.png b/images/framework/agent.png new file mode 100644 index 0000000..16abcbe Binary files /dev/null and b/images/framework/agent.png differ diff --git a/images/framework/comparison_curl.png b/images/framework/comparison_curl.png new file mode 100644 index 0000000..bcb18bd Binary files /dev/null and b/images/framework/comparison_curl.png differ diff --git a/images/framework/component_diagram.png b/images/framework/component_diagram.png new file mode 100644 index 0000000..d6330fd Binary files /dev/null and b/images/framework/component_diagram.png differ diff --git a/images/framework/model_class.png b/images/framework/model_class.png new file mode 100644 index 0000000..936868f Binary files /dev/null and b/images/framework/model_class.png differ diff --git 
a/images/framework/sequence_simulator.png b/images/framework/sequence_simulator.png new file mode 100644 index 0000000..a3d2502 Binary files /dev/null and b/images/framework/sequence_simulator.png differ diff --git a/images/framework/simulation_controller.png b/images/framework/simulation_controller.png new file mode 100644 index 0000000..ba28ecd Binary files /dev/null and b/images/framework/simulation_controller.png differ