diff --git a/change_analyzer/agents/explorer_agent.py b/change_analyzer/agents/explorer_agent.py index cb7d51c..9d5cf7c 100644 --- a/change_analyzer/agents/explorer_agent.py +++ b/change_analyzer/agents/explorer_agent.py @@ -1,7 +1,6 @@ import glob import os import re -import time import sys import pandas as pd @@ -23,9 +22,8 @@ class ExplorerAgent(Agent): SEQUENCE_COLUMNS = ['Screen', 'Image', 'PageSource', 'AvailableActions'] TARGET_DF_COLUMNS = ['ActionImage', 'ActionText', 'DistanceFromCenter', 'DistanceFromTopLeftCorner', 'PreviousSteps', 'ActionImageMainColorR', 'ActionImageMainColorG', 'ActionImageMainColorB'] - # ActiveScreen is removed from TARGET_DF_COLUMNS MODEL_DF_COLUMNS = TARGET_DF_COLUMNS + ['Reward'] - INITIAL_STEP = [] # previously ["Main view"] + INITIAL_STEP = [] # We need to set numpy print options to maximum, in order to avoid truncating numpy arrays np.set_printoptions(threshold=sys.maxsize) @@ -37,12 +35,11 @@ def __init__(self, env: gym.Env, total_steps: int = randrange(10)) -> None: key=os.path.getmtime) self.config = self.read_config_file() # Initialize needed dataframes and model - # Target dataframe - an intermediate dataframe which will hold the available actions - # from current screen (used to make predictions) so starting_screen is the default initialization - # Model dataframe - a dataframe that contains new training data for the Ludwig model - + # Target dataframe - dataframe which will hold the available actions from current screen (used for predictions) self.target_df = pd.DataFrame(columns=ExplorerAgent.TARGET_DF_COLUMNS) + # Model dataframe - a dataframe that contains new training data self.model_df = pd.DataFrame(columns=ExplorerAgent.MODEL_DF_COLUMNS) + # Ludwig model - initially none self.model = None self.image_before = Image.new(mode='RGB', size=(0, 0)) @@ -53,26 +50,19 @@ def __init__(self, env: gym.Env, total_steps: int = randrange(10)) -> None: self.previous_screen = "" self.sequence_steps = ExplorerAgent.INITIAL_STEP - # To avoid ValueError: tf.function-decorated function tried to create variables on non-first call' - # tf.config.run_functions_eagerly(True) - def run(self) -> None: for i in range(self.total_steps): - # print("Starting step", i + 1) try: action = self.get_action() - print(f"Perform {str(action)}") element_to_use = str(action).replace('click on ', '') # Before performing the step, we need to collect the data (session may be lost once we perform the step) - # Get image from the current screen, image_before - as numpy array self.image_before = Image.fromarray(self.env.render("rgb_array")) # Get page_source_before, using SequenceRecorder class self.page_source_before = self.sequence_recorder_obj.get_enriched_page_source(action.el.parent) - # print("page_source_before within try:", self.page_source_before) obs, _, done, info = self.env.step(action) @@ -89,12 +79,9 @@ def run(self) -> None: step_reward = self.reward(self.image_before, self.image_after, self.sequence_steps) # Update model dataframe - print("Update model dataframe") - self.update_model_df(available_actions=available_actions, - element_to_use=element_to_use, + self.update_model_df(element_to_use=element_to_use, step_reward=step_reward, sequence_steps=self.sequence_steps) - # self.update_model_df(available_actions, element_to_use, step_reward, self.sequence_steps) # Update model config self.update_config() @@ -115,15 +102,14 @@ def run(self) -> None: except Exception as e: self._logger.info("Action couldn't be performed due to an exception") self._logger.info(e) - # We assume that the step failed and we are outside the SUT. + # We assume that the step failed, and we are outside the SUT. # We need to reset the self.image_after and self.page_source_after as there is nothing after self.image_after = Image.new(mode='RGB', size=(0, 0)) self.page_source_after = None step_reward = self.reward(self.image_before, self.image_after, self.sequence_steps) - self.update_model_df(available_actions=[], - element_to_use=element_to_use, + self.update_model_df(element_to_use=element_to_use, step_reward=step_reward, sequence_steps=self.sequence_steps) # We have no available actions done = True @@ -134,20 +120,7 @@ def run(self) -> None: break # We need to ensure that we save the model Dataframe - self.save_model_dataframe() - self._logger.info( - f"Episode done in {self.total_steps} steps, total reward {self.total_reward}" - ) - - def get_model_df(self): - pass - - @staticmethod - def show_pil_image(pil_image: Image): - print("Show image") - pil_image.show() - time.sleep(10) def get_action(self): """ @@ -157,13 +130,11 @@ def get_action(self): """ if self.model: return self.predict_action() - return self.env.action_space.sample() def predict_action(self): # Make predictions using targeted dataframe and current trained model predictions, _ = self.model.predict(self.target_df) - print("Predictions\n", predictions) # Get a prediction (action with the highest reward) prediction_id = predictions['Reward_predictions'].idxmax() @@ -179,64 +150,26 @@ def read_config_file(self): return config_content @staticmethod - def reward(image_before: Image, image_after: Image, sequence_steps): + def reward(image_before: Image, image_after: Image, sequence_steps: List[str]) -> int: """Reward function used for each performed step. Currently, a naive aproach is used.""" # How about a Class with Rewards, for different purposes? # Convert images from PIL to numpy.ndarray - print('Calculate the step reward') image_before = np.array(image_before) image_after = np.array(image_after) penalty = 0 if len(sequence_steps) > 2: # Check if the last 2 pairs of sequence steps are the same if sequence_steps[0:2] == sequence_steps[-3:-1]: - print("We need to establish a penalty") + # We need to establish a penalty penalty = 100000 if image_before.size != image_after.size: - # We are not in same window anymore - print("Images don't have same size - reward 0") + # We are not in same window anymore, reward 0 return 0 else: non_zero = np.count_nonzero(image_after - image_before) return non_zero - penalty - @staticmethod - def debug_print(my_str_var): - try: - print(f'{my_str_var}:', {eval(my_str_var)}) - except: - print(f'Printing {my_str_var} failed') - - def update_model_df_debug(self, available_actions: List, element_to_use: str, step_reward: int, sequence_steps: List): - print("==== Update model dataframe ====") - # print("Model DF before the update:\n", self.model_df) - - model_df_for_action = pd.DataFrame( - [ - [ - None, - element_to_use, - None, - None, - str(sequence_steps), # previous steps - None, - None, - None, - step_reward - ] - ], - columns=ExplorerAgent.MODEL_DF_COLUMNS - ) - - # self.model_df = self.model_df.append(model_df_for_action, ignore_index=True) - # We need to have the PreviousSteps as string, - # otherwise the list will get appended too and we will have only last sequence everywhere - self.model_df.loc[:, "PreviousSteps"] = self.model_df.loc[:, "PreviousSteps"].astype("str") - self.model_df = pd.concat([self.model_df, model_df_for_action], ignore_index=True) - print("==== Model DF updated ====") - # print("Model DF after the update:\n", self.model_df) - - def update_model_df(self, available_actions: list, element_to_use: str, step_reward: int, sequence_steps: List): + def update_model_df(self, element_to_use: str, step_reward: int, sequence_steps: List): """Create model_df for performed action The Dataframe consists of: - 'ActionImage': the image of the performed action (for instance the image of a button) @@ -251,21 +184,12 @@ def update_model_df(self, available_actions: list, element_to_use: str, step_rew - 'Reward': the reward received by performing the action (step_reward) """ - # self.debug_print('self.page_source_before') - # self.debug_print('self.active_screen') - # self.debug_print('self.image_before') - # Get ActionImage # First we need the coordinates for the action image, from self.page_source_before action_coordinates = self.get_action_coords(self.page_source_before, element_to_use) # Use coordinates to extract the action image from previous screen, self.image_before - # Originally action_img was numpy.ndarray. - # Due to migration of Ludwig to 0.5, it has to be converted to a tensor action_img = self.get_action_image(self.image_before, action_coordinates) - # print('Action img num of channels:', action_img.shape, len(action_img.shape)) - # action_img_tensor = torch.from_numpy(self.get_action_image(self.image_before, - # action_coordinates)).permute(2, 0, 1) # Get DistanceFromCenter distance_from_center = self.get_distance_from_center(action_coordinates, self.image_before) @@ -273,9 +197,6 @@ def update_model_df(self, available_actions: list, element_to_use: str, step_rew # GetDistanceFromTopLeftCorner distance_from_top_left_corner = self.get_distance_from_top_left_corner(action_coordinates) - # Get ActiveScreen - # active_screen_text = self.sequence_steps[-1] # The screen the action originates from - # Get RGB from main color of the action_image r, g, b = self.get_main_color_from_action_image(action_img) @@ -286,7 +207,6 @@ def update_model_df(self, available_actions: list, element_to_use: str, step_rew element_to_use, distance_from_center, distance_from_top_left_corner, - # active_screen_text, str(sequence_steps), # previous steps r, g, @@ -303,7 +223,6 @@ def update_model_df(self, available_actions: list, element_to_use: str, step_rew @staticmethod def get_main_color_from_action_image(action_image): - # print('Get main color') # Convert from numpy array to PIL image action_image = Image.fromarray(action_image) max_colors = action_image.width * action_image.height @@ -311,14 +230,13 @@ def get_main_color_from_action_image(action_image): return colors[0][1] @staticmethod - def get_action_image(screen_img: Image, action_coordinates: Tuple): + def get_action_image(screen_img: Image, action_coordinates: Tuple) -> np.ndarray: """Get action image from given screen image, based on coordinates""" x = action_coordinates[0] y = action_coordinates[1] h = action_coordinates[2] w = action_coordinates[3] action_img = screen_img.crop((x, y, x + w, y + h)) - # Let's return a numpy array return np.asarray(action_img) @staticmethod @@ -341,7 +259,7 @@ def get_action_coords(page_source: str, action: str): return x, y, h, w @staticmethod - def get_distance_from_center(action_coordinates, screen_img): + def get_distance_from_center(action_coordinates, screen_img) -> int: """The distance is calculated between the center of the action image and the center of the screen image x1, y1 are the coordinates within screen image of action image's center x2, y2 are the coordinates of the screen image's center @@ -356,7 +274,7 @@ def get_distance_from_center(action_coordinates, screen_img): return int(math.sqrt(math.pow(x1 - x2, 2) + math.pow(y1 - y2, 2))) @staticmethod - def get_distance_from_top_left_corner(action_coordinates): + def get_distance_from_top_left_corner(action_coordinates) -> int: """The distance is calculated between the center of the action image and the top left corner of the screen image x1, y1 are the coordinates within screen image of action image's center x2, y2 are the coordinates of the screen image's top left corner which in this case are 0 and 0 @@ -380,11 +298,7 @@ def get_max_dims(images): return max_h, max_w def update_config(self): - print("Update model config") # Get all action images from model_df as PIL images - # Actions images are now torch tensors and here are converted to numpy arrays - # Permute tensor channels back from 2,0,1 to 1,2,0 - # action_images = [Image.fromarray(action_image.permute(1, 2, 0).numpy()) for action_image in self.model_df['ActionImage'].tolist()] action_images = [Image.fromarray(action_image) for action_image in self.model_df['ActionImage'].tolist()] # Find the maximum height and width from images @@ -408,19 +322,9 @@ def update_target_df(self, available_actions, seq_steps): - 'ActionImageMainColorG': green from main color of the 'ActionImage' - 'ActionImageMainColorB: blue from main color of the 'ActionImage' """ - print("Create target_df for Ludwig model") - - # Get future screens from AvailableActions -> in func param # We need to convert all available_actions from 'click on ' to '' available_actions = [str(action).replace('click on ', '') for action in available_actions] - # print('Available actions:', available_actions) - # available_actions = current_action_df['AvailableActions'].iloc[0] - - # Get ActiveScreen -> self.active_screen - # print("Get ActiveScreen:", self.active_screen) - # active_screen = current_action_df['Screen'].iloc[0] - # Get distances distances_from_center = [] distances_from_top_left_center = [] @@ -431,33 +335,23 @@ def update_target_df(self, available_actions, seq_steps): distances_from_center.append(self.get_distance_from_center(action_coords, self.image_after)) distances_from_top_left_center.append(self.get_distance_from_top_left_corner(action_coords)) - # print(f"Distances\nFrom center: {distances_from_center} \nFrom top-left: {distances_from_top_left_center}") - action_images = self.get_actions_images_from_screen(self.image_after, self.page_source_after, available_actions) - # print(action_images) main_color_rs = [] main_color_gs = [] main_color_bs = [] for action_image in action_images: - # r, g, b = self.get_main_color_from_action_image(Image.fromarray(action_image)) - # action_image is converted to Image.fromarray (numpy.ndarray) within get_main_color function r, g, b = self.get_main_color_from_action_image(action_image) main_color_rs.append(r) main_color_gs.append(g) main_color_bs.append(b) - # print(f"Main RGBs for action images \nRs: {main_color_rs} \nBs: {main_color_bs} \nGs: {main_color_gs}") - - # Create data for target_df target_data = { 'ActionImage': action_images, 'ActionText': available_actions, 'DistanceFromCenter': distances_from_center, 'DistanceFromTopLeftCorner': distances_from_top_left_center, - # 'ActiveScreen': [self.active_screen for a in available_actions], - 'PreviousSteps': [', '.join(map(str, seq_steps)) for a in available_actions], - # 'PreviousSteps': [', '.join(map(str, self.sequence_steps)) for a in available_actions], + 'PreviousSteps': [', '.join(map(str, seq_steps)) for _ in available_actions], 'ActionImageMainColorR': main_color_rs, 'ActionImageMainColorG': main_color_gs, 'ActionImageMainColorB': main_color_bs, @@ -465,7 +359,10 @@ def update_target_df(self, available_actions, seq_steps): self.target_df = pd.DataFrame(target_data) - def get_actions_images_from_screen(self, screen_image, page_source, actions): + def get_actions_images_from_screen(self, + screen_image: Image, + page_source: str, + actions: List[str]) -> List[np.ndarray]: """Get all actions images (PIL -> numpy array) from given screen as a list""" images = [] for action in actions: @@ -475,15 +372,10 @@ def get_actions_images_from_screen(self, screen_image, page_source, actions): return images def init_model(self): - print("Initialize Ludwig model") self.model = LudwigModel(self.config) - print('Model initialized') def train_model(self): - # print("Model Dataframe", self.model_df) - print('Train model') train_stats, _, _ = self.model.train(self.model_df) - print('Model trained') @staticmethod def create_string_in_form_of_numpy_array(cell) -> str: @@ -492,8 +384,7 @@ def create_string_in_form_of_numpy_array(cell) -> str: def save_model_dataframe(self) -> None: """Save model dataframe, to be used later on in future models""" csv_file = os.path.join(self.latest_recordings_folder, 'model_df.csv') - print("ActionImage sample:\n", self.model_df['ActionImage'].iloc[0]) - print("Convert column 'ActionImage' to string in form of numpy arrays") + + # Convert column 'ActionImage' to string in form of numpy arrays self.model_df['ActionImage'] = self.model_df['ActionImage'].apply(self.create_string_in_form_of_numpy_array) - print("Save the model dataframe") self.model_df.to_csv(csv_file, index=False)