diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4bc806a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM tensorflow/tensorflow + +COPY requirements.txt requirements.txt + +RUN pip install -r requirements.txt + +ENTRYPOINT [ "python" ] diff --git a/MCTS/__init__.py b/MCTS/__init__.py index 89c8672..442bab6 100644 --- a/MCTS/__init__.py +++ b/MCTS/__init__.py @@ -68,7 +68,7 @@ def simulate(self, state): self._Qsa[hash_][action] = (self._N(hash_, action) * self._Q(hash_, action) + value) / (self._N(hash_, action) + 1) self._Nsa[hash_][action] += 1 self._Ns[hash_] += 1 - return value + return -value def N(self, state, action=None): """Get number of visits during MCTS simulations diff --git a/Net/NNet.py b/Net/NNet.py index 1af95c9..3818cd3 100644 --- a/Net/NNet.py +++ b/Net/NNet.py @@ -4,6 +4,7 @@ """ import numpy as np import os +import tensorflow as tf from enum import Enum, auto @@ -20,7 +21,7 @@ class NeuralNets(Enum): """ class NNetWrapper: def __init__(self, board_size=(8,8), batch_size=32, epochs=10, - num_channels_1=128, num_channels_2=256, lr=0.001, dropout=0.3, network=NeuralNets.ONN): + num_channels_1=512, num_channels_2=256, lr=0.001, dropout=0.3, network=NeuralNets.ONN): ''' Inputs: board_size -> a Tuple with the size of the board (n,n) @@ -60,8 +61,11 @@ def train(self, examples, verbose=None): target_pis = np.asarray(target_pis) target_vs = np.asarray(target_vs) + log_dir = "logs/fit/" + f"onn-{self.board_size_x}" + tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1) + return self.nnet.model.fit(x=input_boards, y=[target_pis, target_vs], batch_size=self.batch_size, - epochs=self.epochs, verbose=verbose) + epochs=self.epochs, verbose=verbose, callbacks=[tensorboard_callback]) def predict(self, board): ''' @@ -84,22 +88,15 @@ def predict(self, board): # save weights def save_checkpoint(self, filepath): - if self.network_type == NeuralNets.ONN: - filepath += f'-onn-{self.board_size_x}.h5' - elif self.network_type == NeuralNets.BNN: - filepath += f'-bnn-{self.board_size_x}.h5' - self.nnet.model.save_weights(filepath) + self.nnet.model.save_weights(filepath, save_format='h5') # load saved weights def load_checkpoint(self, filepath): - if self.network_type == NeuralNets.ONN: - filepath += f'-onn-{self.board_size_x}.h5' - elif self.network_type == NeuralNets.BNN: - filepath += f'-bnn-{self.board_size_x}.h5' + assert filepath.endswith('.h5'), 'Expecting a file with .h5 as extension' self.nnet.model.load_weights(filepath) def copy(self): copy_wrapper = NNetWrapper((self.board_size_x, self.board_size_y), network=self.network_type) copy_wrapper.nnet.model.set_weights(self.nnet.model.get_weights()) return copy_wrapper - \ No newline at end of file + diff --git a/Net/OthelloNN.py b/Net/OthelloNN.py index adf2833..13b8cc0 100644 --- a/Net/OthelloNN.py +++ b/Net/OthelloNN.py @@ -35,31 +35,22 @@ def __init__(self, board_size, num_channels_1, num_channels_2, lr=0.001, dropout self.action_size = self.board_x * self.board_y self.learning_rate = lr self.dropout = dropout - self.num_channels_1 = num_channels_1 - self.num_channels_2 = num_channels_2 + self.num_channels = num_channels_1 + #self.num_channels_2 = num_channels_2 self.input_boards = Input(shape=(self.board_x, self.board_y, 2)) #shape (batch_size, board_x, board_y, 2) - - conv1 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels_1, 3, padding='same', use_bias=False)(self.input_boards))) #shape (batch_size, board_x, board_y, num_channels_1) - conv2 = 
Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels_1, 3, padding='same', use_bias=False)(conv1))) #shape (batch_size, board_x, board_y, num_channels_1) - special1 = MaxPooling2D((2, 2), strides=(2, 2), padding='valid')(conv2) #shape (batch_size, board_x/2, board_y/2, num_channels_1) - - conv3 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels_2, 3, padding='same', use_bias=False)(special1))) #shape (batch_size, board_x/2, board_y/2, num_channels_2) - conv4 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels_2, 3, padding='same', use_bias=False)(conv3))) #shape (batch_size, board_x/2, board_y/2, num_channels_2) - - flatten = Flatten()(conv4) #shape (batch_size, board_x/2 x board_y/2 x num_channels_2) - - #value side - v_dense1 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(512, use_bias=False)(flatten)))) # shape (batch_size x 512) - v_dense2 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(256, use_bias=False)(v_dense1)))) # shpe (batch_size x 256) - self.v = Dense(1, activation='tanh', name='v')(v_dense2) # shape (batch_size x 1) - - #pi side - pi_dense1 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(512, use_bias=False)(flatten)))) # shape (batch_size x 512) - pi_dense2 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(256, use_bias=False)(pi_dense1)))) # shape (batch_size x 256) - self.pi = Dense(self.action_size, activation='softmax', name='pi')(pi_dense2) # shape (batch_size x action_size) - self.pi = Reshape((self.board_x, self.board_y))(self.pi) + h_conv1 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels, 3, padding='same')(self.input_boards))) + h_conv2 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels, 3, padding='same')(h_conv1))) + h_conv3 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels, 3, padding='valid')(h_conv2))) + h_conv4 = Activation('relu')(BatchNormalization(axis=3)(Conv2D(self.num_channels, 3, padding='valid')(h_conv3))) + h_conv4_flat = Flatten()(h_conv4) + s_fc1 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(1024)(h_conv4_flat)))) + s_fc2 = Dropout(self.dropout)(Activation('relu')(BatchNormalization(axis=1)(Dense(512)(s_fc1)))) + pi = Dense(self.action_size, activation='softmax', name='pi')(s_fc2) + self.pi = Reshape((self.board_x, self.board_y), name='pi-reshaped')(pi) + self.v = Dense(1, activation='tanh', name='v')(s_fc2) self.model = Model(inputs=self.input_boards, outputs=[self.pi, self.v]) - self.model.compile(loss=['categorical_crossentropy','mean_squared_error'], optimizer=Adam(self.learning_rate)) + self.model.compile(loss=['categorical_crossentropy','mean_squared_error'], + optimizer=Adam(self.learning_rate, clipvalue=0.5)) diff --git a/Othello/random_agent.py b/Othello/random_agent.py deleted file mode 100644 index 2e507a4..0000000 --- a/Othello/random_agent.py +++ /dev/null @@ -1,13 +0,0 @@ -import random - -from Othello import OthelloGame, OthelloPlayer, BoardView - -def random_agent(game): - - ''' - Makes random moves - ''' - - possible_moves = game.get_valid_actions() - move = random.choice(tuple(possible_moves)) - game.play(move[0],move[1]) diff --git a/README.md b/README.md index 52b0bec..7e57c88 100644 --- a/README.md +++ b/README.md @@ -1 +1,9 @@ -# OthelloZero \ No newline at end of file 
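The rewritten `OthelloNN` above drops the max-pooling trunk and keeps four convolutions on a single channel count, with the last two using `padding='valid'`, so the flattened feature size now depends on the board size and the smallest workable board is 5×5. A minimal shape sketch, assuming the 3×3 kernels shown and the new 512-channel default (`num_channels_1`); this is illustrative, not project code:

```python
def trunk_flatten_size(board_size, num_channels=512, kernel=3):
    """Features produced by the new OthelloNN trunk before the Dense(1024) layer."""
    spatial = board_size          # the two 'same' convolutions keep height/width
    for _ in range(2):            # each 'valid' convolution shrinks height/width by kernel-1
        spatial -= kernel - 1
    assert spatial >= 1, 'board must be at least 5x5 to survive two valid convolutions'
    return spatial * spatial * num_channels

print(trunk_flatten_size(8))  # 8192 features for a full 8x8 board
print(trunk_flatten_size(6))  # 2048 for the default 6x6 training board
```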
+[![license](https://img.shields.io/badge/license-GPL%20v3.0-brightgreen.svg?style=flat-square)](https://github.com/Galtvam/OthelloZero/blob/main/LICENSE)
+![LastCommit](https://img.shields.io/github/last-commit/Galtvam/OthelloZero?style=flat-square)
+![CommitSinceLastversion](https://img.shields.io/github/commits-since/Galtvam/OthelloZero/0.0.1/master?label=Commits%20Since%20Lastest%20Version&style=flat-square)
+# OthelloZero
+
+## How to create an SSH private key:
+```ssh-keygen -t rsa -f ~/.ssh/othello-zero -C othello-zero```
+
+Add the private key to the Compute Engine instance metadata
diff --git a/agents.py b/agents.py
new file mode 100644
index 0000000..c158781
--- /dev/null
+++ b/agents.py
@@ -0,0 +1,84 @@
+import random
+import logging
+import numpy as np
+
+from Net.NNet import NeuralNets
+from Othello import OthelloGame, OthelloPlayer, BoardView
+
+from othelo_mcts import OthelloMCTS
+
+
+class OthelloAgent:
+    def __init__(self, game):
+        self.game = game
+
+    def play(self):
+        """Do an action on OthelloGame"""
+        raise NotImplementedError
+
+
+class RandomOthelloAgent(OthelloAgent):
+    def play(self):
+        possible_moves = tuple(self.game.get_valid_actions())
+        move = random.choice(possible_moves)
+        self.game.play(*move)
+
+
+class GreedyOthelloAgent(OthelloAgent):
+    def play(self):
+        move_points = {}
+        possible_moves = tuple(self.game.get_valid_actions())
+        points_before = self.game.get_players_points()[self.game.current_player]
+        board = self.game.board(BoardView)
+
+        for move in possible_moves:
+            state = np.copy(self.game.board(BoardView))
+            OthelloGame.flip_board_squares(state, self.game.current_player, *move)
+            points = OthelloGame.get_board_players_points(state)[OthelloPlayer.BLACK] - points_before
+            move_points[move] = points
+
+        greedy_move = max(move_points, key=move_points.get)
+        self.game.play(*greedy_move)
+
+
+class NeuralNetworkOthelloAgent(OthelloAgent):
+    def __init__(self, game, neural_network, num_simulations, degree_exploration, temperature=0):
+        self.temperature = temperature
+        self.neural_network = neural_network
+        self.num_simulations = num_simulations
+        self.mcts = OthelloMCTS(game.board_size, neural_network, degree_exploration)
+        super().__init__(game)
+
+    def play(self):
+        state = self.game.board(BoardView.TWO_CHANNELS)
+        for _ in range(self.num_simulations):
+            self.mcts.simulate(state, self.game.current_player)
+
+        if self.game.current_player == OthelloPlayer.WHITE:
+            state = OthelloGame.invert_board(state)
+
+        if self.neural_network.network_type is NeuralNets.ONN:
+            action_probabilities = self.mcts.get_policy_action_probabilities(state, self.temperature)
+        else:
+            action_probabilities = self.mcts.get_policy_action_probabilities(
+                self.game.board(), self.temperature)
+
+        valid_actions = self.game.get_valid_actions()
+        best_action = max(valid_actions, key=lambda position: action_probabilities[tuple(position)])
+        self.game.play(*best_action)
+
+
+def duel_between_agents(game, agent_1, agent_2):
+    players_agents = {
+        OthelloPlayer.BLACK: agent_1,
+        OthelloPlayer.WHITE: agent_2
+    }
+
+    logging.info(f'Duel - Started')
+    while not game.has_finished():
+        logging.info(f'Duel - Round: {game.round}')
+        agent = players_agents[game.current_player]
+        agent.play()
+
+    winner, points = game.get_winning_player()
+    return players_agents[winner], points
diff --git a/gcloud-startup-script.sh b/gcloud-startup-script.sh
new file mode 100644
index 0000000..aaafe53
--- /dev/null
+++ b/gcloud-startup-script.sh
@@ -0,0 +1,56 @@
+#!
/bin/bash + +LOGIN_USER=othello-zero +STARTUP_SUCCESS_FILE=/home/$LOGIN_USER/.ran-startup-script + +if test ! -f "$STARTUP_SUCCESS_FILE"; then + echo "$STARTUP_SUCCESS_FILE does not exist. running startup..." + + # add user + sudo useradd -m $LOGIN_USER + + # no more 'sudo docker' after this + sudo groupadd docker + sudo usermod -aG docker $LOGIN_USER + newgrp docker + + # make sure docker-credential-gcloud is in PATH + # https://stackoverflow.com/questions/54494386/gcloud-auth-configure-docker-on-gcp-vm-instance-with-ubuntu-not-setup-properly + sudo ln -s /snap/google-cloud-sdk/current/bin/docker-credential-gcloud /usr/local/bin + + # make gcloud docker's credential helper + sudo -u $LOGIN_USER bash -c 'gcloud auth configure-docker --quiet' + + # host machine requires nvidia drivers. tensorflow image should contain the rest required + wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-ubuntu1804.pin + sudo mv cuda-ubuntu1804.pin /etc/apt/preferences.d/cuda-repository-pin-600 + sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/7fa2af80.pub + sudo add-apt-repository "deb http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/ /" + sudo apt-get update && sudo apt-get install -y cuda-drivers + + # install docker + sudo apt-get update && apt-get install -y \ + apt-transport-https \ + ca-certificates \ + curl \ + gnupg-agent \ + software-properties-common + + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - + sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" + sudo apt-get update && sudo apt-get install -y docker-ce docker-ce-cli containerd.io + + # install nvidia docker support + distribution=$(. /etc/os-release;echo $ID$VERSION_ID) + curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add - + curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list + sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit + sudo systemctl restart docker + + docker pull igorxp5/othello-zero + + # create file which will be checked on next reboot + touch /home/$LOGIN_USER/.ran-startup-script +else + echo "$STARTUP_SUCCESS_FILE exists. not running startup script!" +fi \ No newline at end of file diff --git a/gcloud.py b/gcloud.py new file mode 100644 index 0000000..b337ee8 --- /dev/null +++ b/gcloud.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python + +# Copyright 2015 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Example of using the Compute Engine API to create and delete instances. +Creates a new compute engine instance and uses it to apply a caption to +an image. + https://cloud.google.com/compute/docs/tutorials/python-guide +For more information, see the README.md under /compute. 
+""" + +import os +import time +import tarfile +import logging +import pathlib +import argparse +import subprocess + +import paramiko +import googleapiclient.discovery + +from paramiko.client import SSHClient + + + +SSH_USER = 'othello-zero' +REMOTE_HOME = f'/home/{SSH_USER}' +DEFAULT_ZONE = 'southamerica-east1-b' +INSTANCE_LABEL = 'othello-zero', 'true' +INSTANCE_NAME = 'othello-zero-worker-{}' +STARTUP_SCRIPT = './gcloud-startup-script.sh' + +# [START list_instances] +def list_instances(compute, project, zone): + result = compute.instances().list(project=project, zone=zone).execute() + return result['items'] if 'items' in result else None +# [END list_instances] + + +# [START create_instance] +def create_instance(compute, project, zone, name): + # Get the latest Debian Jessie image. + image_response = compute.images().getFromFamily( + project='ubuntu-os-cloud', family='ubuntu-2004-lts').execute() + source_disk_image = image_response['selfLink'] + + # Configure the machine + machine_type = "zones/%s/machineTypes/n1-standard-1" % zone + startup_script = open( + os.path.join( + os.path.dirname(__file__), STARTUP_SCRIPT), 'r').read() + + config = { + 'name': name, + 'machineType': machine_type, + + # Specify the boot disk and the image to use as a source. + 'disks': [ + { + 'boot': True, + 'autoDelete': True, + 'initializeParams': { + 'sourceImage': source_disk_image, + } + } + ], + + 'labels': { + INSTANCE_LABEL[0]: INSTANCE_LABEL[1] + }, + + # Specify a network interface with NAT to access the public + # internet. + 'networkInterfaces': [{ + 'network': 'global/networks/default', + 'accessConfigs': [ + {'type': 'ONE_TO_ONE_NAT', 'name': 'External NAT'} + ] + }], + + # Allow the instance to access cloud storage and logging. + 'serviceAccounts': [{ + 'email': 'default', + 'scopes': [ + 'https://www.googleapis.com/auth/devstorage.read_write', + 'https://www.googleapis.com/auth/logging.write' + ] + }], + + # Metadata is readable from the instance and allows you to + # pass configuration from deployment scripts to instances. + 'metadata': { + 'items': [{ + # Startup script is automatically executed by the + # instance upon startup. 
+ 'key': 'startup-script', + 'value': startup_script + }] + } + } + + return compute.instances().insert( + project=project, + zone=zone, + body=config).execute() +# [END create_instance] + + +# [START delete_instance] +def delete_instance(compute, project, zone, name): + return compute.instances().delete( + project=project, + zone=zone, + instance=name).execute() +# [END delete_instance] + + +# [START search_instance] +def search_instances(compute, project, zone, label_key, label_value): + result = compute.instances().list( + project=project, + zone=zone, + filter=f'labels.{label_key}={label_value}').execute() + return result['items'] if 'items' in result else [] +# [END delete_instance] + + +# [START get_instance] +def get_instance(compute, project, zone, instance_name): + result = compute.instances().get( + project=project, + zone=zone, + instance=instance_name).execute() + return result +# [END get_instance] + + +# [START wait_for_operation] +def wait_for_operation(compute, project, zone, operation): + print('Waiting for operation to finish...') + while True: + result = compute.zoneOperations().get( + project=project, + zone=zone, + operation=operation).execute() + + if result['status'] == 'DONE': + print("done.") + if 'error' in result: + raise Exception(result['error']) + return result + + time.sleep(1) +# [END wait_for_operation] + + +def self_upload_to_instance(instance, key_filename): + print('uploading projects files to instance...') + + ip = instance['networkInterfaces'][0]['accessConfigs'][0]['natIP'] + + files = subprocess.check_output('git ls-files', shell=True, text=True).splitlines() + + client = ssh_connection(ip, key_filename) + sftp = client.open_sftp() + remote_filepath = os.path.join(REMOTE_HOME, 'files.tar.xz') + tar_fileobj = sftp.open(remote_filepath, mode='w') + tar_file = tarfile.open(fileobj=tar_fileobj, mode='w:xz') + for filepath in files: + local_path = os.path.join(os.getcwd(), filepath) + tar_file.add(local_path, arcname=filepath) + tar_file.close() + tar_fileobj.close() + sftp.close() + client.exec_command(f'tar -xf files.tar.xz') + client.exec_command(f'rm files.tar.xz') + client.close() + +def get_instance_external_ip(instance): + return instance['networkInterfaces'][0]['accessConfigs'][0]['natIP'] + + +def get_instance_internal_ip(instance): + return instance['networkInterfaces'][0]['networkIP'] + + +def wait_for_instance_startup_script(instance, key_filename): + ip = get_instance_external_ip(instance) + client = ssh_connection(ip, key_filename) + stdin, stdout, stderr = client.exec_command( + 'ps aux | grep -v grep | grep "/bin/bash /startup" | awk \'{print $2}\'') + lines = stdout.readlines() + if not lines: + return + pid = lines[0].strip() + stdin, stdout, stderr = client.exec_command(f'sudo cat /proc/{pid}/fd/1 > /dev/null') + stdout.channel.recv_exit_status() + + +def ssh_connection(ip, key_filename, timeout=120, auth_timeout=60): + client = SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(ip, username=SSH_USER, key_filename=key_filename, timeout=timeout, auth_timeout=auth_timeout) + return client + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('project', help='Google Cloud Platform project name') + parser.add_argument('credentials', help='Google Cloud API Credentials JSON file path') + parser.add_argument('-z', '--zone', default=DEFAULT_ZONE, help='Google Cloud Platform instances zone') + 
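The helpers above (`create_instance`, `wait_for_operation`, `get_instance`, `self_upload_to_instance`) can also be driven directly from Python instead of through the command-line interface being built here. A rough sketch of the create → wait → upload flow, where the project id and key path are placeholders rather than values taken from this repository:

```python
import googleapiclient.discovery

PROJECT = 'my-gcp-project'            # placeholder GCP project id
KEY = '/home/me/.ssh/othello-zero'    # SSH key created per the README

compute = googleapiclient.discovery.build('compute', 'v1')
op = create_instance(compute, PROJECT, DEFAULT_ZONE, INSTANCE_NAME.format(0))
wait_for_operation(compute, PROJECT, DEFAULT_ZONE, op['name'])

instance = get_instance(compute, PROJECT, DEFAULT_ZONE, INSTANCE_NAME.format(0))
self_upload_to_instance(instance, KEY)    # tars the git-tracked files over SFTP
```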
parser.add_argument('-k', '--key-filename', default=None, help='SSH Auth key file') + + subprasers = parser.add_subparsers(dest='command') + + create = subprasers.add_parser('create', help='Create new worker instances') + create.add_argument('instances_amount', nargs='?', default=1, type=int, help='Number of instances to create') + + delete = subprasers.add_parser('delete', help='Delete worker instances') + delete.add_argument('instances_amount', nargs='?', default=None, type=int, + help='Number of instances to delete. If None, delete all instances.') + + list_ = subprasers.add_parser('list', help='List worker instances') + + upload_env = subprasers.add_parser('upload', help='Uploads environment files (project files)') + + args = parser.parse_args() + + os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.credentials + + compute = googleapiclient.discovery.build('compute', 'v1') + + instances = search_instances(compute, args.project, args.zone, *INSTANCE_LABEL) + + if args.command == 'list': + print(' '.join(instance['name'] for instance in instances)) + elif args.command == 'create': + operations = [] + for i in range(len(instances), len(instances) + args.instances_amount): + operation = create_instance(compute, args.project, args.zone, INSTANCE_NAME.format(i)) + operations.append(operation) + for operation in operations: + wait_for_operation(compute, args.project, args.zone, operation['name']) + + assert args.key_filename, 'Cannot upload environment files without key file' + + print('waiting one minute to instances be accessible...') + time.sleep(60) + + old_instances = [instance['name'] for instance in instances] + new_instances = search_instances(compute, args.project, args.zone, *INSTANCE_LABEL) + new_instances = [instance for instance in new_instances if instance['name'] not in old_instances] + + for instance in new_instances: + self_upload_to_instance(instance, args.key_filename) + + for instance in new_instances: + print('waiting for startup script finish to run') + wait_for_instance_startup_script(instance, args.key_filename) + + elif args.command == 'delete': + instances.reverse() + amount = args.instances_amount or len(instances) + operations = [] + for instance, _ in zip(instances, range(amount)): + operation = delete_instance(compute, args.project, args.zone, instance['name']) + operations.append(operation) + + for operation in operations: + wait_for_operation(compute, args.project, args.zone, operation['name']) + + elif args.command == 'upload': + assert args.key_filename, 'Cannot upload environment files without key file' + + for instance in instances: + self_upload_to_instance(instance, args.key_filename) diff --git a/main.py b/main.py new file mode 100644 index 0000000..3a405e0 --- /dev/null +++ b/main.py @@ -0,0 +1,363 @@ +import os +import random +import logging +import argparse + +import gcloud +import googleapiclient.discovery + +from othelo_mcts import * +from Othello import * +from agents import * + +#os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' + +logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR) + +LOG_FORMAT = '[%(threadName)s] %(asctime)s %(levelname)s: %(message)s' +DEFAULT_CHECKPOINT_FILEPATH = './othelo_model_weights.h5' + + +class CircularArray: + def __init__(self, max_): + self._list = [] + self._max = max_ + self._index = 0 + + def append(self, item): + if len(self._list) < self._max: + return self._list.append(item) + self._list[self._index % len(self._list)] = item + self._index = (self._index % len(self._list)) + 1 + + def 
extend(self, items): + for item in items: + self.append(item) + + def __len__(self): + return len(self._list) + + def __getitem__(self, *args): + return self._list.__class__.__getitem__(self._list, *args) + + def __setitem__(self, *args): + return self._list.__class__.__setitem__(self._list, *args) + + def __iter__(self): + return iter(self._list) + + def __str__(self): + return str(self._list) + + def __repr__(self): + return '{}({})'.format(self.__class__.__name__, repr(len(self._list))) + + +def training(board_size, num_iterations, num_episodes, num_simulations, degree_exploration, temperature, neural_network, + e_greedy, evaluation_interval, evaluation_iterations, evaluation_agent_class, evaluation_agent_arguments, + temperature_threshold, self_play_training, self_play_interval, self_play_total_games, + self_play_threshold, checkpoint_filepath, worker_manager, training_buffer_size): + + if self_play_training: + assert self_play_threshold <= self_play_total_games, 'Self-play threshold must be less than self-play games' + + assert evaluation_iterations % worker_manager.total_workers() == 0, \ + 'Evaluation iterations must be divisible equally between the workers' + + historic = [] + total_episodes_done = 0 + training_examples = CircularArray(training_buffer_size) + old_neural_network = neural_network.copy() + for i in range(1, num_iterations + 1): + + logging.info(f'Iteration {i}/{num_iterations}: Starting iteration') + + if temperature_threshold and i >= temperature_threshold: + logging.info(f'Iteration {i}/{num_iterations}: Temperature threshold reached, ' + 'changing temperature to 0') + temperature = 0 + + logging.info(f'Iteration {i}/{num_iterations} - Generating episodes') + + logging.info(f'Iteration {i}/{num_iterations} - Waiting for episodes results') + worker_manager.run(WorkType.EXECUTE_EPISODE, num_episodes, board_size, + neural_network, degree_exploration, num_simulations, + temperature, e_greedy) + + episode_examples = worker_manager.get_results() + for training_example in episode_examples: + training_examples.extend(training_example) + + total_episodes_done += len(episode_examples) + + logging.info(f'Iteration {i}/{num_iterations}: All episodes finished') + + training_verbose = 2 if logging.root.level <= logging.DEBUG else None + + logging.info(f'Iteration {i}/{num_iterations}: Training model with episodes examples') + + random.shuffle(training_examples) + + history = neural_network.train(training_examples, verbose=training_verbose) + + if self_play_training and i % self_play_interval == 0: + logging.info(f'Iteration {i}/{num_iterations}: Self-play to evaluate the neural network training') + + self_play_results = [] + + logging.info(f'Iteration {i}/{num_iterations} - Generating BLACK x WHITE matches') + + logging.info(f'Iteration {i}/{num_iterations} - Waiting for BLACK x WHITE matches results') + worker_manager.run(WorkType.DUEL_BETWEEN_NEURAL_NETWORKS, self_play_total_games // 2, + board_size, neural_network, old_neural_network, + degree_exploration, num_simulations) + + for winner in worker_manager.get_results(): + if winner == 0: + self_play_results.append(neural_network) + else: + self_play_results.append(old_neural_network) + + logging.info(f'Iteration {i}/{num_iterations} - Generating WHITE x BLACK matches') + + logging.info(f'Iteration {i}/{num_iterations} - Waiting for WHITE x BLACK matches results') + worker_manager.run(WorkType.DUEL_BETWEEN_NEURAL_NETWORKS, self_play_total_games // 2 + self_play_total_games % 2, + board_size, old_neural_network, neural_network, 
+ degree_exploration, num_simulations) + + for winner in worker_manager.get_results(): + if winner == 0: + self_play_results.append(old_neural_network) + else: + self_play_results.append(neural_network) + + new_net_victories = len([1 for winner in self_play_results if winner is neural_network]) + + logging.info(f'Iteration {i}/{num_iterations} - Game results: {new_net_victories}/{self_play_total_games}: ') + + if new_net_victories >= self_play_threshold: + logging.info(f'Iteration {i}/{num_iterations}: New neural network has been promoted') + + neural_network.save_checkpoint(checkpoint_filepath) + logging.info(f'Iteration {i}/{num_iterations}: Saving trained model in "{checkpoint_filepath}"') + old_neural_network = neural_network + else: + neural_network = old_neural_network + logging.info(f'Iteration {i}/{num_iterations}: New neural network has not been promoted') + else: + neural_network.save_checkpoint(checkpoint_filepath) + + if i % evaluation_interval == 0: + net_wins = 0 + net_black_win = 0 + net_white_win = 0 + black_games = 0 + white_games = 0 + + old_net_wins = 0 + old_net_black_win = 0 + old_net_white_win = 0 + old_black_games = 0 + old_white_games = 0 + + logging.info(f'New Neural Network evaluation!') + + for k in range(evaluation_iterations): + game = OthelloGame(board_size, current_player = OthelloPlayer.BLACK) + nn_agent = NeuralNetworkOthelloAgent(game, neural_network, num_simulations, degree_exploration) + random_agent = RandomOthelloAgent(game) + + agents = [nn_agent, random_agent] + random.shuffle(agents) + + agent_winner, points = duel_between_agents(game, *agents) + + if agents[0] is agent_winner: + winner = OthelloPlayer.BLACK + else: + winner = OthelloPlayer.WHITE + + logging.info(f'The player {winner} won with {points} points') + + + if agent_winner is nn_agent: + net_wins += 1 + if winner == OthelloPlayer.BLACK: + black_games += 1 + net_black_win += 1 + else: + white_games += 1 + net_white_win += 1 + logging.info(f'Total Episodes Runned: {total_episodes_done} - Network won: {net_wins}/{k+1} => {round((net_wins/(k+1)), 2)} win rate, black: {net_black_win}/{black_games} , white: {net_white_win}/{white_games} ') + else: + if winner == OthelloPlayer.BLACK: + black_games += 1 + else: + white_games += 1 + logging.info(f'Total Episodes Runned: {total_episodes_done} - Network won: {net_wins}/{k+1} => {round((net_wins/(k+1)), 2)} win rate, black: {net_black_win}/{black_games} , white: {net_white_win}/{white_games} ') + + + logging.info(f'Old Neural Network evaluation!') + + for k in range(evaluation_iterations): + game = OthelloGame(board_size) + nn_agent = NeuralNetworkOthelloAgent(game, old_neural_network, num_simulations, degree_exploration) + random_agent = RandomOthelloAgent(game) + + agents = [nn_agent, random_agent] + random.shuffle(agents) + + agent_winner, points = duel_between_agents(game, *agents) + + if agents[0] is agent_winner: + winner = OthelloPlayer.BLACK + else: + winner = OthelloPlayer.WHITE + + logging.info(f'The player {winner} won with {points} points') + + + if agent_winner is nn_agent: + old_net_wins += 1 + if winner == OthelloPlayer.BLACK: + old_black_games += 1 + old_net_black_win += 1 + else: + old_white_games += 1 + old_net_white_win += 1 + logging.info(f'Total Episodes Runned: {total_episodes_done} - Old Network won: {old_net_wins}/{k+1} => {round((old_net_wins/(k+1)), 2)} win rate, black: {old_net_black_win}/{old_black_games} , white: {old_net_white_win}/{old_white_games} ') + else: + if winner == OthelloPlayer.BLACK: + old_black_games += 1 + 
else: + old_white_games += 1 + logging.info(f'Total Episodes Runned: {total_episodes_done} - Old Network won: {old_net_wins}/{k+1} => {round((old_net_wins/(k+1)), 2)} win rate, black: {old_net_black_win}/{old_black_games} , white: {old_net_white_win}/{old_white_games} ') + + + + if net_wins > (old_net_wins * 1.1): + logging.info("Saving new network!") + historic.append( (total_episodes_done, (net_wins/evaluation_iterations)) ) + logging.info(historic) + neural_network.save_checkpoint(checkpoint_filepath) + old_neural_network = neural_network + else: + logging.info("Saving old network!") + historic.append( (total_episodes_done, (old_net_wins/evaluation_iterations)) ) + logging.info(historic) + old_neural_network.save_checkpoint(checkpoint_filepath) + neural_network = old_neural_network + + logging.info(f'Total episodes done: {total_episodes_done}') + + with open(f'examples-{board_size}.txt', 'w') as output: + output.write(str(training_examples)) + + with open(f'historic-last-training-session-{board_size}.txt', 'w') as output: + output.write(str(historic)) + + return historic + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('-b', '--board-size', default=6, type=int, help='Othello board size') + parser.add_argument('-i', '--iterations', default=80, type=int, help='Number of training iterations') + parser.add_argument('-e', '--episodes', default=100, type=int, help='Number of episodes by iterations') + parser.add_argument('-s', '--simulations', default=25, type=int, help='Number of MCTS simulations by episode') + parser.add_argument('-c', '--constant-upper-confidence', default=1, type=int, help='MCTS upper confidence bound constant') + parser.add_argument('-g', '--e-greedy', default=0.9, type=float, help='e constant used in e-greedy') + + parser.add_argument('-n', '--network-type', default=1, choices=(1, 2), help='1- OthelloNN, 2- BaseNN') + + # Default 3x 100 iterations of 6x6 Othello + parser.add_argument('-bf', '--buffer-size', default=8 * 32 * 100 * 3, type=int, help='Training buffer size') + + parser.add_argument('-sp', '--self-play', default=False, action='store_true', help='Do self-play at end of each iteration') + parser.add_argument('-si', '--self-play-interval', default=1, type=int, help='Number of iterations between self-play games') + parser.add_argument('-sg', '--self-play-games', default=10, type=int, help='Number of games during self-play games') + parser.add_argument('-st', '--self-play-threshold', default=6, type=int, help='Number of victories to promote neural network') + + parser.add_argument('-ea', '--evaluation-agent', default='random', choices=['random'], help='Agent for neural network evaluation') + parser.add_argument('-ei', '--evaluation-interval', default=5, type=int, help='Number of iterations between evaluations') + parser.add_argument('-eg', '--evaluation-games', default=12, type=int, help='Number of matches against the evaluation agent') + + parser.add_argument('-ep', '--epochs', default=10, type=int, help='Number of epochs for neural network training') + parser.add_argument('-lr', '--learning-rate', default=0.001, type=float, help='Neural network training learning rate') + parser.add_argument('-dp', '--dropout', default=0.3, type=float, help='Neural network training dropout') + parser.add_argument('-bs', '--batch-size', default=32, type=int, help='Neural network training batch size') + + parser.add_argument('-tw', '--thread-workers', default=1, type=int, help='Number 
of Thread workers to do training tasks') + parser.add_argument('-gw', '--google-workers', default=False, action='store_true', help='Use Google Cloud workers') + parser.add_argument('-gt', '--google-workers-label', default=gcloud.INSTANCE_LABEL[0], + help='Tag of Google Cloud machines which will be as worker') + parser.add_argument('-gc', '--google-credentials', default=None, + help='Google Cloud API Credentials JSON file path') + parser.add_argument('-gp', '--google-project', default=None, help='Google Cloud Platform project name') + parser.add_argument('-gz', '--google-zone', default=gcloud.DEFAULT_ZONE, + help='Google Cloud Platform instances zone') + parser.add_argument('-gk', '--google-key-filename', default=None, + help='Google Cloud SSH Private key') + + parser.add_argument('-o', '--output-file', default=DEFAULT_CHECKPOINT_FILEPATH, help='File path to save neural network weights') + parser.add_argument('-w', '--weights-file', default=None, help='File path to load neural network weights') + parser.add_argument('-l', '--log-level', default='INFO', choices=('INFO', 'DEBUG', 'WARNING', 'ERROR'), help='Logging level') + parser.add_argument('-t', '--temperature', default=1, type=int, help='Policy temperature parameter') + parser.add_argument('-tt', '--temperature-threshold', default=25, type=int, help='Number of iterations using the temperature ' + 'parameter before changing to 0') + + parser.add_argument('-ug', '--use-gpu', default=False, action='store_true', help='Enable GPU for Tensorflow') + + args = parser.parse_args() + + if not args.use_gpu: + os.environ['CUDA_VISIBLE_DEVICES'] = '-1' + + from agents import RandomOthelloAgent + from Net.NNet import NNetWrapper, NeuralNets + from workers import WorkerManager, WorkType, ThreadWorker, GoogleCloudWorker + + logging.basicConfig(level=getattr(logging, args.log_level, None), format=LOG_FORMAT) + + net_type = NeuralNets.ONN if args.network_type == 1 else NeuralNets.BNN + + neural_network = NNetWrapper(board_size=(args.board_size, args.board_size), batch_size=args.batch_size, + epochs=args.epochs, lr=args.learning_rate, dropout=args.dropout, network=net_type) + if args.weights_file: + neural_network.load_checkpoint(args.weights_file) + + evaluation_agent_class = RandomOthelloAgent + evaluation_agent_arguments = dict() + + worker_manager = WorkerManager() + worker_manager.add_worker(ThreadWorker()) + + if args.google_workers: + assert args.google_credentials, 'Google Cloud Credentials required' + assert args.google_project, 'Google Cloud Project name required' + assert args.google_zone, 'Google Cloud instances zone required' + assert args.google_key_filename, 'Google Cloud SSH Private key required' + + os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = args.google_credentials + + compute = googleapiclient.discovery.build('compute', 'v1') + + instances = gcloud.search_instances(compute, args.google_project, args.google_zone, + args.google_workers_label, 'true') + + for instance in instances: + worker = GoogleCloudWorker(compute, args.google_project, args.google_zone, + instance['name'], args.google_key_filename) + worker_manager.add_worker(worker) + else: + for _ in range(args.thread_workers - 1): + worker_manager.add_worker(ThreadWorker()) + + training(board_size=args.board_size, num_iterations=args.iterations, num_episodes=args.episodes, num_simulations=args.simulations, + degree_exploration=args.constant_upper_confidence, temperature=args.temperature, neural_network=neural_network, + e_greedy=args.e_greedy, 
evaluation_interval=args.evaluation_interval, evaluation_iterations=args.evaluation_games, + evaluation_agent_class=evaluation_agent_class, evaluation_agent_arguments=evaluation_agent_arguments, + temperature_threshold=args.temperature_threshold, self_play_training=args.self_play, + self_play_interval=args.self_play_interval, self_play_total_games=args.self_play_games, + self_play_threshold=args.self_play_threshold, worker_manager=worker_manager, + checkpoint_filepath=args.output_file, training_buffer_size=args.buffer_size) diff --git a/othelo_mcts.py b/othelo_mcts.py new file mode 100644 index 0000000..d515d7e --- /dev/null +++ b/othelo_mcts.py @@ -0,0 +1,88 @@ +import random +import numpy as np + +from Net.NNet import NeuralNets +from MCTS import MCTS, hash_ndarray +from Othello import BoardView, OthelloPlayer, OthelloGame + + +class OthelloMCTS(MCTS): + def __init__(self, board_size, neural_network, degree_exploration): + self._board_size = board_size + self._neural_network = neural_network + self._predict_cache = {} + + if self._neural_network.network_type is NeuralNets.ONN: + self._board_view_type = BoardView.TWO_CHANNELS + elif self._neural_network.network_type is NeuralNets.BNN: + self._board_view_type = BoardView.ONE_CHANNEL + + super().__init__(degree_exploration) + + def simulate(self, state, player): + board = np.copy(state) + if player is OthelloPlayer.WHITE: + board = OthelloGame.invert_board(board) + return super().simulate(board) + + def is_terminal_state(self, state): + return OthelloGame.has_board_finished(state) + + def get_state_value(self, state): + return self._neural_network_predict(state)[1] + + def get_state_reward(self, state): + return OthelloGame.get_board_winning_player(state)[0].value + + def get_state_actions_propabilities(self, state): + return self._neural_network_predict(state)[0] + + def get_state_actions(self, state): + return [tuple(a) for a in OthelloGame.get_player_valid_actions(state, OthelloPlayer.BLACK)] + + def get_next_state(self, state, action): + board = np.copy(state) + OthelloGame.flip_board_squares(board, OthelloPlayer.BLACK, *action) + if OthelloGame.has_player_actions_on_board(board, OthelloPlayer.WHITE): + # Invert board to keep using BLACK perspective + board = OthelloGame.invert_board(board) + return board + + def get_policy_action_probabilities(self, state, temperature): + probabilities = np.zeros((self._board_size, self._board_size)) + + if temperature == 0: + for action in self._get_state_actions(state): + row, col = action + probabilities[row, col] = self.N(state, action) + bests = np.argwhere(probabilities == probabilities.max()) + row, col = random.choice(bests) + probabilities = np.zeros((self._board_size, self._board_size)) + probabilities[row, col] = 1 + return probabilities + + for action in self._get_state_actions(state): + row, col = action + probabilities[row, col] = self.N(state, action) ** (1 / temperature) + return probabilities / (np.sum(probabilities) or 1) + + def moves_scaled_by_valid_moves(self, state): + network_probabilities = self.get_state_actions_propabilities(state) + mask = self._mask_valid_moves(state) + probabilities = network_probabilities * mask + return probabilities + + def _mask_valid_moves(self, state): + board_mask = np.zeros((self._board_size, self._board_size)) + for action in self._get_state_actions(state): + row, col = action + board_mask[row, col] = 1 + return board_mask + + def _neural_network_predict(self, state): + hash_ = hash_ndarray(state) + if hash_ not in self._predict_cache: + if 
self._board_view_type == BoardView.ONE_CHANNEL: + state = OthelloGame.convert_to_one_channel_board(state) + self._predict_cache[hash_] = self._neural_network.predict(state) + return self._predict_cache[hash_] diff --git a/pickle_training.py b/pickle_training.py new file mode 100644 index 0000000..8752d47 --- /dev/null +++ b/pickle_training.py @@ -0,0 +1,79 @@ +import os +import base64 +import pickle +import inspect +import argparse + +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' + +from Net.NNet import NNetWrapper + +import training + +def convert_to_base64_pickle(obj): + return base64.b64encode(pickle.dumps(obj)).decode('ascii') + + +def unpack_base64_pickle(pickle_dump): + return pickle.loads(base64.b64decode(pickle_dump)) + + +def execute_episode(board_size, weights_file, degree_exploration, + num_simulations, policy_temperature, e_greedy): + neural_network = NNetWrapper(board_size=(board_size, board_size)) + neural_network.load_checkpoint(weights_file) + + return training.execute_episode(board_size, neural_network, degree_exploration, + num_simulations, policy_temperature, e_greedy) + +def duel_between_neural_networks(board_size, weights_1_file, weights_2_file, degree_exploration, num_simulations): + neural_network_1 = NNetWrapper(board_size=(board_size, board_size)) + neural_network_1.load_checkpoint(weights_1_file) + + neural_network_2 = NNetWrapper(board_size=(board_size, board_size)) + neural_network_2.load_checkpoint(weights_2_file) + + return training.duel_between_neural_networks(board_size, neural_network_1, neural_network_2, + degree_exploration, num_simulations) + + +def evaluate_neural_network(board_size, total_iterations, weights_1_file, num_simulations, degree_exploration, + agent_class, agent_arguments): + # FIXME This function doesn't work if agent_class is a NeuralNetworkOthelloAgent, + # agent_arguments must pass a neural network + neural_network = NNetWrapper(board_size=(board_size, board_size)) + neural_network.load_checkpoint(weights_file) + + return training.evaluate_neural_network(board_size, total_iterations, neural_network, num_simulations, degree_exploration, + agent_class, agent_arguments) + + +def pack_arguments_to_pickle(*args): + return [convert_to_base64_pickle(o) for o in args] + + +def unpack_pickle_arguments(*args): + return [unpack_base64_pickle(o) for o in args] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('command', choices=['execute_episode', + 'duel_between_neural_networks', + 'evaluate_neural_network'], help='Training method') + parser.add_argument('args', nargs='+', help='Method arguments in Base-64 Pickle format') + + args = parser.parse_args() + + if args.command == 'execute_episode': + method = execute_episode + elif args.command == 'duel_between_neural_networks': + method = duel_between_neural_networks + elif args.command == 'evaluate_neural_network': + method = evaluate_neural_network + + parameters = unpack_pickle_arguments(*args.args) + result = method(*parameters) + result = convert_to_base64_pickle(result) + + print(result) diff --git a/requirements.txt b/requirements.txt index a4a98c2..f5b83ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,18 +4,23 @@ cachetools==4.1.1 certifi==2020.11.8 chardet==3.0.4 gast==0.3.3 +google-api-python-client==1.12.8 google-auth==1.23.0 +google-auth-httplib2==0.0.4 google-auth-oauthlib==0.4.2 google-pasta==0.2.0 grpcio==1.33.2 h5py==2.10.0 +humanfriendly==8.2 idna==2.10 Keras==2.4.3 
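`pickle_training.py` above is the bridge used to run training work in a separate process or on a remote worker: each argument is pickled and base64-encoded onto the command line, and the script prints its result in the same encoding. A sketch of the caller-side round trip, assuming the weights file already exists at the default checkpoint path and that nothing else is written to stdout:

```python
import subprocess

from pickle_training import pack_arguments_to_pickle, unpack_base64_pickle

# board_size, weights_file, degree_exploration, num_simulations, policy_temperature, e_greedy
args = pack_arguments_to_pickle(6, './othelo_model_weights.h5', 1, 25, 1, 0.9)
output = subprocess.check_output(
    ['python', 'pickle_training.py', 'execute_episode', *args], text=True)
examples = unpack_base64_pickle(output.strip())   # list of (board, policy, reward) tuples
```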
Keras-Preprocessing==1.1.2 Markdown==3.3.3 numpy==1.18.5 oauthlib==3.1.0 +oauth2client==4.1.3 opt-einsum==3.3.0 +paramiko==2.7.2 protobuf==3.14.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 diff --git a/tensorboard.ipynb b/tensorboard.ipynb new file mode 100644 index 0000000..027c063 --- /dev/null +++ b/tensorboard.ipynb @@ -0,0 +1,77 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext tensorboard" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%tensorboard --logdir logs/fit" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/training.py b/training.py index 8a24ff5..03b769f 100644 --- a/training.py +++ b/training.py @@ -1,103 +1,30 @@ -import os import random import logging -import argparse import numpy as np -import concurrent.futures +from Net.NNet import NeuralNets -from Net.NNet import NNetWrapper, NeuralNets -from MCTS import MCTS, hash_ndarray -from Othello import OthelloGame, OthelloPlayer, BoardView -from Othello.random_agent import * - -LOG_FORMAT = '[%(threadName)s] %(asctime)s %(levelname)s: %(message)s' -DEFAULT_CHECKPOINT_FILEPATH = './othelo_model_weights' - +from othelo_mcts import OthelloMCTS +from agents import NeuralNetworkOthelloAgent, duel_between_agents -class OthelloMCTS(MCTS): - def __init__(self, board_size, neural_network, degree_exploration): - self._board_size = board_size - self._neural_network = neural_network - self._predict_cache = {} - - if self._neural_network.network_type is NeuralNets.ONN: - self._board_view_type = BoardView.TWO_CHANNELS - elif self._neural_network.network_type is NeuralNets.BNN: - self._board_view_type = BoardView.ONE_CHANNEL +from Othello import OthelloGame, OthelloPlayer, BoardView - super().__init__(degree_exploration) - - def simulate(self, state, player): - board = np.copy(state) - if player is OthelloPlayer.WHITE: - board = OthelloGame.invert_board(board) - return super().simulate(board) - - def is_terminal_state(self, state): - return OthelloGame.has_board_finished(state) - - def get_state_value(self, state): - return self._neural_network_predict(state)[1] - def get_state_reward(self, state): - return OthelloGame.get_board_winning_player(state)[0].value +def training_example_symmetries(board, policy): + symetric_examples = [] + for rotation in range(1, 5): + for flip in [True, False]: + new_board_example = np.rot90(board, k=rotation) + new_policy_example = np.rot90(policy, k=rotation) + if flip: + new_board_example = np.fliplr(new_board_example) + new_policy_example = np.fliplr(new_policy_example) + symetric_examples.append((new_board_example, new_policy_example)) + return symetric_examples - def get_state_actions_propabilities(self, state): - return self._neural_network_predict(state)[0] - - def get_state_actions(self, state): - return [tuple(a) for a 
in OthelloGame.get_player_valid_actions(state, OthelloPlayer.BLACK)] - - def get_next_state(self, state, action): - board = np.copy(state) - OthelloGame.flip_board_squares(board, OthelloPlayer.BLACK, *action) - if OthelloGame.has_player_actions_on_board(board, OthelloPlayer.WHITE): - # Invert board to keep using BLACK perspective - board = OthelloGame.invert_board(board) - return board - - def get_policy_action_probabilities(self, state, temperature): - probabilities = np.zeros((self._board_size, self._board_size)) - - if temperature == 0: - for action in self._get_state_actions(state): - row, col = action - probabilities[row, col] = self.N(state, action) - bests = np.argwhere(probabilities == probabilities.max()) - row, col = random.choice(bests) - probabilities = np.zeros((self._board_size, self._board_size)) - probabilities[row, col] = 1 - return probabilities - - for action in self._get_state_actions(state): - row, col = action - probabilities[row, col] = self.N(state, action) ** (1 / temperature) - return probabilities / (np.sum(probabilities) or 1) - - def moves_scaled_by_valid_moves(self, state): - network_probabilities = self.get_state_actions_propabilities(state) - mask = self._mask_valid_moves(state) - probabilities = network_probabilities * mask - return probabilities - - def _mask_valid_moves(self, state): - board_mask = np.zeros((self._board_size, self._board_size)) - for action in self._get_state_actions(state): - row, col = action - board_mask[row, col] = 1 - return board_mask - - def _neural_network_predict(self, state): - hash_ = hash_ndarray(state) - if hash_ not in self._predict_cache: - if self._board_view_type == BoardView.ONE_CHANNEL: - state = OthelloGame.convert_to_one_channel_board(state) - self._predict_cache[hash_] = self._neural_network.predict(state) - return self._predict_cache[hash_] - -def execute_episode(board_size, neural_network, degree_exploration, num_simulations, policy_temperature, e_greedy): +def execute_episode(board_size, neural_network, degree_exploration, + num_simulations, policy_temperature, e_greedy): examples = [] game = OthelloGame(board_size) @@ -111,6 +38,7 @@ def execute_episode(board_size, neural_network, degree_exploration, num_simulati while not game.has_finished(): state = game.board(BoardView.TWO_CHANNELS) + for _ in range(num_simulations): mcts.simulate(state, game.current_player) @@ -119,29 +47,27 @@ def execute_episode(board_size, neural_network, degree_exploration, num_simulati policy = mcts.get_policy_action_probabilities(state, policy_temperature) - #e-greedy + # e-greedy coin = random.random() + if coin <= e_greedy: action = np.argwhere(policy == policy.max())[0] else: action = mcts.get_state_actions(state)[np.random.choice(len(mcts.get_state_actions(state)))] - action_choosed = np.zeros((board_size, board_size)) action_choosed[action[0]][action[1]] = 1 - #save examples - if board_view_type == BoardView.ONE_CHANNEL: - example = game.board(BoardView.ONE_CHANNEL), action_choosed, game.current_player - else: - example = state, action_choosed, game.current_player - examples.append(example) + example = game.board(board_view_type), action_choosed, game.current_player + + for board_example, policy_example in training_example_symmetries(game.board(board_view_type), action_choosed): + example = board_example, policy_example, game.current_player + examples.append(example) game.play(*action) - logging.info(game.board(BoardView.ONE_CHANNEL)) winner, winner_points = game.get_winning_player() - logging.info(f'The Winner obtained: 
{winner_points} points.') + logging.info(f'Episode finished: The winner obtained {winner_points} points.') return [(state, policy, 1 if winner == player else -1) for state, policy, player in examples] @@ -149,239 +75,41 @@ def execute_episode(board_size, neural_network, degree_exploration, num_simulati def duel_between_neural_networks(board_size, neural_network_1, neural_network_2, degree_exploration, num_simulations): game = OthelloGame(board_size) - players_neural_networks = { - OthelloPlayer.BLACK: neural_network_1, - OthelloPlayer.WHITE: neural_network_2 - } + nn_1_agent = NeuralNetworkOthelloAgent(game, neural_network_1, num_simulations, degree_exploration) + nn_2_agent = NeuralNetworkOthelloAgent(game, neural_network_2, num_simulations, degree_exploration) - neural_networks_mcts = { - neural_network_1 : OthelloMCTS(board_size, neural_network_1, degree_exploration), - neural_network_2 : OthelloMCTS(board_size, neural_network_2, degree_exploration) + agents = { + nn_1_agent: neural_network_1, + nn_2_agent: neural_network_2 } - while not game.has_finished(): - nn = players_neural_networks[game.current_player] - state = game.board(BoardView.TWO_CHANNELS) + agent_winner = duel_between_agents(game, nn_1_agent, nn_2_agent) - logging.info(f'Round: {game.round}') - for _ in range(num_simulations): - neural_networks_mcts[nn].simulate(state, game.current_player) - - if game.current_player == OthelloPlayer.WHITE: - state = OthelloGame.invert_board(state) - - action_probabilities = neural_networks_mcts[nn].get_policy_action_probabilities(state, 0) - valid_actions = game.get_valid_actions() - best_action = max(valid_actions, key=lambda position: action_probabilities[tuple(position)]) - game.play(*best_action) + return 0 if agents[agent_winner] is neural_network_1 else 1 - return players_neural_networks[game.get_winning_player()[0]] +def evaluate_neural_network(board_size, total_iterations, neural_network, num_simulations, degree_exploration, + agent_class, agent_arguments): + net_wins = 0 -def machine_move(game, neural_networks_mcts, num_simulations): - ''' - Makes the movement of the machine according to the policy already known - ''' + logging.info(f'Neural Network Evaluation: Started') - state = game.board(BoardView.TWO_CHANNELS) - for _ in range(num_simulations): - neural_networks_mcts.simulate(state, game.current_player) + for iteration in range(1, total_iterations + 1): - if game.current_player == OthelloPlayer.WHITE: - state = OthelloGame.invert_board(state) + game = OthelloGame(board_size) - action_probabilities = neural_networks_mcts.get_policy_action_probabilities(state, 0) + nn_agent = NeuralNetworkOthelloAgent(game, neural_network, num_simulations, degree_exploration) + evaluation_agent = agent_class(game, *agent_arguments) - valid_actions = game.get_valid_actions() - best_action = max(valid_actions, key=lambda position: action_probabilities[tuple(position)]) - game.play(*best_action) + agents = [evaluation_agent, nn_agent] + random.shuffle(agents) - -def training(board_size, num_iterations, num_episodes, num_simulations, degree_exploration, temperature, - total_games, victory_threshold, neural_network, random_agent_interval, random_agent_fights, games_interval, - e_greedy, temperature_threshold=None, checkpoint_filepath=None, episode_thread_pool=1, - game_thread_pool=1, net_type=NeuralNets.ONN): - - total_episodes_done = 0 - historic = [] - training_examples = [] - for i in range(1, num_iterations + 1): - old_neural_network = neural_network.copy() - - logging.info(f'Iteration 
{i}/{num_iterations}: Starting iteration') - - if temperature_threshold and i >= temperature_threshold: - logging.info(f'Iteration {i}/{num_iterations}: Temperature threshold reached, ' - 'changing temperature to 0') - temperature = 0 - - logging.info(f'Iteration {i}/{num_iterations} - Generating episodes') - - with concurrent.futures.ThreadPoolExecutor(max_workers=episode_thread_pool) as executor: - future_results = {} - - for e in range(1, num_episodes + 1): - total_episodes_done += 1 - future_result = executor.submit(execute_episode, board_size, neural_network, degree_exploration, - num_simulations, temperature, e_greedy) - future_results[future_result] = e - - logging.info(f'Iteration {i}/{num_iterations} - Waiting for episodes results') - - for future in concurrent.futures.as_completed(future_results): - e = future_results[future] - logging.info(f'Iteration {i}/{num_iterations} - Episode {e}: Finished') - episode_examples = future.result() - training_examples.extend(episode_examples) - - logging.info(f'Iteration {i}/{num_iterations}: All episodes finished') - - training_verbose = 2 if logging.root.level <= logging.DEBUG else None - - logging.info(f'Iteration {i}/{num_iterations}: Training model with episodes examples') - - random.shuffle(training_examples) - history = neural_network.train(training_examples, verbose=training_verbose) - - - if games_interval > 0 and i % games_interval: - logging.info(f'Iteration {i}/{num_iterations}: Self-play to evaluate the neural network training') - - new_net_victories = 0 - - logging.info(f'Iteration {i}/{num_iterations} - Generating matches') - - with concurrent.futures.ThreadPoolExecutor(max_workers=game_thread_pool) as executor: - future_results = {} - neural_networks = [old_neural_network, neural_network] - - for g in range(1, total_games + 1): - random.shuffle(neural_networks) - future_result = executor.submit(duel_between_neural_networks, board_size, - neural_networks[0], neural_networks[1], - degree_exploration, num_simulations) - future_results[future_result] = g - - logging.info(f'Iteration {i}/{num_iterations} - Waiting for matches results') - - for future in concurrent.futures.as_completed(future_results): - g = future_results[future] - winner = future.result() - if winner is neural_network: - logging.info(f'Iteration {i}/{num_iterations} - Game {g}/{total_games}: New neural network has won') - new_net_victories += 1 - else: - logging.info(f'Iteration {i}/{num_iterations} - Game {g}/{total_games}: New neural network has lost') - - logging.info(f'Iteration {i}/{num_iterations} - Game {g}/{total_games}: ' - f'Promotion status ({new_net_victories}/{victory_threshold})') - - if new_net_victories >= victory_threshold: - logging.info(f'Iteration {i}/{num_iterations}: New neural network has been promoted') - - neural_network.save_checkpoint(checkpoint_filepath) - logging.info(f'Iteration {i}/{num_iterations}: Saving trained model in "{checkpoint_filepath}"') - else: - neural_network = old_neural_network + agent_winner = duel_between_agents(game, *agents) - # gambiarra - neural_network.save_checkpoint(checkpoint_filepath) - - if (i % random_agent_interval) == 0: - color = [OthelloPlayer.BLACK, OthelloPlayer.WHITE] - net_wins = 0 - - for k in range(random_agent_fights): - random.shuffle(color) - game = OthelloGame(board_size, current_player = OthelloPlayer.BLACK) - neural_networks_mcts = OthelloMCTS(board_size, neural_network, degree_exploration=1) - - while not game.has_finished(): - #neural network move - if game.current_player is color[0]: - 
machine_move(game, neural_networks_mcts, num_simulations) - - #random agent move - else: - random_agent(game) - - winner, points = game.get_winning_player() - logging.info(f'The player {winner} won with {points} points') - - if winner == color[0]: - net_wins += 1 - logging.info(f'Total Episodes Runned: {total_episodes_done} - Network won: {net_wins}/{k+1}') - else: - logging.info(f'Total Episodes Runned: {total_episodes_done} - Network lost: {net_wins}/{k+1}') - - historic.append( (total_episodes_done, (net_wins/random_agent_fights)) ) - logging.info(historic) - - with open(f'historic-last-training-session-{board_size}.txt', 'w') as output: - output.write(str(historic)) - with open(f'examples-{board_size}.txt', 'w') as output: - output.write(str(training_examples)) - - - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('-b', '--board-size', default=6, type=int, help='Othello board size') - parser.add_argument('-i', '--iterations', default=80, type=int, help='Number of training iterations') - parser.add_argument('-e', '--episodes', default=100, type=int, help='Number of episodes by iterations') - parser.add_argument('-s', '--simulations', default=25, type=int, help='Number of MCTS simulations by episode') - parser.add_argument('-g', '--total-games', default=10, type=int, help='Total of games to evaluate neural network training') - parser.add_argument('-v', '--victory-threshold', default=6, type=int, help='Number of victories to promote neural network training') - parser.add_argument('-c', '--constant-upper-confidence', default=1, type=int, help='MCTS upper confidence bound constant') - parser.add_argument('-eg', '--e-greedy', default=0.9, type=float, help='e constant used in e-greedy') - - parser.add_argument('-n', '--network-type', default=1, type=int, help='1- OthelloNN, 2- BaseNN') - - parser.add_argument('-ri', '--random-agent-interval', default=5, type=int, help='Number of iterations between evaluation against random agent') - parser.add_argument('-rf', '--random-agent-fights', default=10, type=int, help='Number of fights against random agent') - - parser.add_argument('-gi', '--games-interval', default=1, type=int, help='Number of iterations between self-play games') - - parser.add_argument('-ep', '--epochs', default=10, type=int, help='Number of epochs for neural network training') - parser.add_argument('-lr', '--learning-rate', default=0.001, type=float, help='Neural network training learning rate') - parser.add_argument('-dp', '--dropout', default=0.3, type=float, help='Neural network training dropout') - parser.add_argument('-bs', '--batch-size', default=32, type=int, help='Neural network training batch size') - - parser.add_argument('-et', '--episode-threads', default=1, type=int, help='Number of episodes to be executed asynchronously') - parser.add_argument('-gt', '--game-threads', default=1, type=int, help='Number of games to be executed asynchronously ' - 'during evaluation') - - parser.add_argument('-o', '--output-file', default=DEFAULT_CHECKPOINT_FILEPATH, help='File path to save neural network weights') - parser.add_argument('-w', '--weights-file', default=None, help='File path to load neural network weights') - parser.add_argument('-l', '--log-level', default='INFO', choices=('INFO', 'DEBUG', 'WARNING', 'ERROR'), help='Logging level') - parser.add_argument('-t', '--temperature', default=1, type=int, help='Policy temperature parameter') - parser.add_argument('-tt', 
'--temperature-threshold', default=25, type=int, help='Number of iterations using the temperature ' - 'parameter before changing to 0') - - parser.add_argument('-ug', '--use-gpu', default=False, action='store_true', help='Enable GPU for Tensorflow') - - args = parser.parse_args() - - if not args.use_gpu: - os.environ['CUDA_VISIBLE_DEVICES'] = '-1' - - from Net.NNet import NNetWrapper - - assert args.victory_threshold <= args.total_games, '"victory-threshold" must be less than "total-games"' - - logging.basicConfig(level=getattr(logging, args.log_level, None), format=LOG_FORMAT) - - net_type = NeuralNets.ONN if args.network_type == 1 else NeuralNets.BNN + if agent_winner is nn_agent: + net_wins += 1 + logging.info(f'Neural Network Evaluation: Network won') + else: + logging.info(f'Neural Network Evaluation: Network lost') - neural_network = NNetWrapper(board_size=(args.board_size, args.board_size), batch_size=args.batch_size, - epochs=args.epochs, lr=args.learning_rate, dropout=args.dropout, network=net_type) - if args.weights_file: - neural_network.load_checkpoint(args.weights_file) - - training(board_size=args.board_size, num_iterations=args.iterations, num_episodes=args.episodes, num_simulations=args.simulations, - degree_exploration=args.constant_upper_confidence, temperature=args.temperature, total_games=args.total_games, - victory_threshold=args.victory_threshold, neural_network=neural_network, temperature_threshold=args.temperature_threshold, - checkpoint_filepath=args.output_file, episode_thread_pool=args.episode_threads, game_thread_pool=args.game_threads, - net_type=args.network_type, random_agent_interval=args.random_agent_interval, random_agent_fights=args.random_agent_fights, - games_interval=args.games_interval, e_greedy=args.e_greedy) + return net_wins diff --git a/workers.py b/workers.py new file mode 100644 index 0000000..1d61868 --- /dev/null +++ b/workers.py @@ -0,0 +1,303 @@ +import os +import uuid +import logging +import tempfile +import contextlib +import humanfriendly + +from enum import Enum +from threading import Thread, Event + +from pickle_training import pack_arguments_to_pickle, unpack_base64_pickle +from gcloud import get_instance, ssh_connection, get_instance_external_ip, \ + get_instance_internal_ip, SSH_USER +from training import execute_episode, evaluate_neural_network, \ + duel_between_neural_networks + + +class WorkType: + EXECUTE_EPISODE = 'Execute Episode' + DUEL_BETWEEN_NEURAL_NETWORKS = 'Duel between Neural Networks' + EVALUATE_NEURAL_NETWORK = 'Evaluate Neural Network' + + +class Worker: + def __init__(self): + self._executor_thread = None + self._results = None + self._worker_manager = None + + def setup(self, work_type, iterations, *args, **kwargs): + pass + + def run(self, work_type, iterations, *args, **kwargs): + self._results = [] + self._executor_thread = Thread(name=self.get_executor_thread_name(), target=self._run, + args=(work_type, iterations, args, kwargs)) + self._executor_thread.start() + + def execute_episode(self, *args, **kwargs): + raise NotImplementedError + + def duel_between_neural_networks(self, *args, **kwargs): + raise NotImplementedError + + def evaluate_neural_network(self, *args, **kwargs): + raise NotImplementedError + + def teardown(self, work_type): + pass + + def wait(self): + return self._executor_thread.join() if self._executor_thread else None + + def get_results(self): + return self._results + + def _run(self, work_type, iterations, args, kwargs): + target = self.get_target(work_type) + self.setup(work_type, 
iterations, *args, **kwargs)
+        for i in range(1, iterations + 1):
+            logging.info(f'Task {work_type} ({i}/{iterations}): Starting...')
+            result = target(*args, **kwargs)
+            self._results.append(result)
+            logging.info(f'Task {work_type} ({i}/{iterations}): Finished!')
+        self.teardown(work_type)
+
+    def get_executor_thread_name(self):
+        id_ = uuid.uuid4()
+        id_ = str(id_).split('-', 1)[0]
+        return f'{self.__class__.__name__}-{id_}'
+
+    def get_target(self, work_type):
+        if work_type is WorkType.EXECUTE_EPISODE:
+            return self.execute_episode
+        elif work_type is WorkType.DUEL_BETWEEN_NEURAL_NETWORKS:
+            return self.duel_between_neural_networks
+        elif work_type is WorkType.EVALUATE_NEURAL_NETWORK:
+            return self.evaluate_neural_network
+        raise TypeError('expecting WorkType object')
+
+
+class ThreadWorker(Worker):
+    def execute_episode(self, *args, **kwargs):
+        return execute_episode(*args, **kwargs)
+
+    def evaluate_neural_network(self, *args, **kwargs):
+        return evaluate_neural_network(*args, **kwargs)
+
+    def duel_between_neural_networks(self, *args, **kwargs):
+        return duel_between_neural_networks(*args, **kwargs)
+
+
+class GoogleCloudWorker(Worker):
+    SSH_PRIV_KEY = f'/home/{SSH_USER}/.ssh/{SSH_USER}-internal'
+    SSH_PUB_KEY = f'{SSH_PRIV_KEY}.pub'
+
+    def __init__(self, compute, project, zone, instance_name, key_filename):
+        super().__init__()  # initialize base Worker state (_executor_thread, _results, _worker_manager)
+        instance = get_instance(compute, project, zone, instance_name)
+        if not instance:
+            raise RuntimeError(f'Instance {instance_name} not found')
+        self._instance = instance
+        self._key_filename = key_filename
+
+        self._internal_ssh_pub_key = None
+
+        self._neural_network_weights_file = []
+        self._ssh = None
+        self._sftp = None
+
+    def setup(self, work_type, iterations, *args, **kwargs):
+        pass
+
+    def execute_episode(self, board_size, neural_network, degree_exploration,
+                        num_simulations, policy_temperature, e_greedy):
+        args = [board_size, self._neural_network_weights_file[0], degree_exploration,
+                num_simulations, policy_temperature, e_greedy]
+        training_examples = self._remote_pickle_training_call('execute_episode', args)
+        return training_examples
+
+    def evaluate_neural_network(self, board_size, total_iterations, neural_network, num_simulations,
+                                degree_exploration, agent_class, agent_arguments):
+        args = [board_size, total_iterations, self._neural_network_weights_file[0],
+                num_simulations, degree_exploration, agent_class, agent_arguments]
+        net_wins = self._remote_pickle_training_call('evaluate_neural_network', args)
+        return net_wins
+
+    def duel_between_neural_networks(self, board_size, neural_network_1, neural_network_2,
+                                     degree_exploration, num_simulations):
+        args = [board_size, self._neural_network_weights_file[0],
+                self._neural_network_weights_file[1], degree_exploration, num_simulations]
+        net_wins = self._remote_pickle_training_call('duel_between_neural_networks', args)
+        return net_wins
+
+    def teardown(self, work_type):
+        logging.info(f'Task {work_type} Teardown: Deleting cache files...')
+        if self._neural_network_weights_file:
+            self._sftp = self._ssh.open_sftp()
+
+            for filepath in self._neural_network_weights_file:
+                self._sftp.remove(filepath)
+            self._ssh.close()
+
+            self._ssh = None
+            self._sftp = None
+            self._neural_network_weights_file = []
+
+    def _remote_pickle_training_call(self, command_name, args):
+        args = pack_arguments_to_pickle(*args)
+        command = 'docker run -v $PWD:/OthelloZero -v /tmp/:/tmp:ro igorxp5/othello-zero '
+        command += f'OthelloZero/pickle_training.py {command_name} {" ".join(args)}'
+
+        stdin, stdout, stderr = self._ssh.exec_command(command)
+        if stdout.channel.recv_exit_status() != 0:
+            error = stderr.read().decode()
+            logging.info(error)
+            raise RuntimeError(error)
+
+        return unpack_base64_pickle(stdout.readlines()[0].strip())
+
+
+class WorkerManager:
+    def __init__(self):
+        self._workers = []
+        self._waiter_thread = None
+        self._finished_event = Event()
+
+    def run(self, work_type, iterations, *args, **kwargs):
+        if work_type not in (WorkType.EXECUTE_EPISODE, WorkType.DUEL_BETWEEN_NEURAL_NETWORKS,
+                             WorkType.EVALUATE_NEURAL_NETWORK):
+            raise TypeError('expecting WorkType value')
+        self._finished_event.clear()
+        worker_iterations = WorkerManager.divide_iterations(iterations, len(self._workers))
+        self._setup(work_type, iterations, *args, **kwargs)
+        for worker, total_iterations in zip(self._workers, worker_iterations):
+            worker.run(work_type, total_iterations, *args, **kwargs)
+        self._waiter_thread = Thread(target=self._wait_workers)
+        self._waiter_thread.start()
+        self._finished_event.wait()
+
+    def get_results(self):
+        results = []
+        for worker in self._workers:
+            results.extend(worker.get_results())
+        return results
+
+    def add_worker(self, worker):
+        if not isinstance(worker, Worker):
+            raise TypeError('expecting Worker object')
+        worker._worker_manager = self
+        self._workers.append(worker)
+
+    def total_workers(self):
+        return len(self._workers)
+
+    def _wait_workers(self):
+        for worker in self._workers:
+            worker.wait()
+        self._finished_event.set()
+
+    def has_google_worker(self):
+        return any(isinstance(worker, GoogleCloudWorker) for worker in self._workers)
+
+    def _setup(self, work_type, iterations, *args, **kwargs):
+        files_to_send = []
+        if work_type is WorkType.EXECUTE_EPISODE and self.has_google_worker():
+            _, filepath = tempfile.mkstemp(suffix='.h5')
+            neural_network = args[1]
+            neural_network.save_checkpoint(filepath)
+            files_to_send.append(filepath)
+
+        elif work_type is WorkType.EVALUATE_NEURAL_NETWORK and self.has_google_worker():
+            _, filepath = tempfile.mkstemp(suffix='.h5')
+            neural_network = args[2]
+            neural_network.save_checkpoint(filepath)
+            files_to_send.append(filepath)
+
+        elif work_type is WorkType.DUEL_BETWEEN_NEURAL_NETWORKS and self.has_google_worker():
+            _, filepath = tempfile.mkstemp(suffix='.h5')
+            neural_network_1 = args[1]
+            neural_network_1.save_checkpoint(filepath)
+            files_to_send.append(filepath)
+
+            _, filepath = tempfile.mkstemp(suffix='.h5')
+            neural_network_2 = args[2]
+            neural_network_2.save_checkpoint(filepath)
+            files_to_send.append(filepath)
+
+        if self.has_google_worker():
+            for filepath in files_to_send:
+                file_size = humanfriendly.format_size(os.path.getsize(filepath))
+
+                scp_processes = []
+                uploaded_worker = None
+                for worker in self._workers:
+                    if isinstance(worker, GoogleCloudWorker):
+                        worker._neural_network_weights_file.append(filepath)
+                        ip = get_instance_external_ip(worker._instance)
+                        worker._ssh = ssh_connection(ip, worker._key_filename)
+                        if not uploaded_worker:
+                            worker._sftp = worker._ssh.open_sftp()
+                            logging.info(f'Uploading Neural network weights ({file_size})...')
+                            worker._sftp.put(filepath, filepath)
+                            logging.info('Neural network weights uploaded')
+                            os.remove(filepath)
+                            if not worker._internal_ssh_pub_key:
+                                try:
+                                    worker._sftp.stat(worker.SSH_PRIV_KEY)
+                                except IOError:
+                                    logging.info('Creating Internal SSH Key...')
+                                    command = f'ssh-keygen -q -N "" -t rsa -f {worker.SSH_PRIV_KEY} -C {SSH_USER}'
+                                    stdin, stdout, stderr = worker._ssh.exec_command(command)
+                                    if stdout.channel.recv_exit_status() != 0:
+                                        raise RuntimeError('cannot create internal ssh key')
+                                    logging.info('Internal SSH Key created successfully!')
+                                logging.info('Saving SSH Public Key...')
+                                with worker._sftp.open(worker.SSH_PUB_KEY) as file:
+                                    worker._internal_ssh_pub_key = file.read().decode('ascii')
+                                logging.info('SSH Public Key saved!')
+                            worker._sftp.close()
+                            uploaded_worker = worker
+
+                for worker in self._workers:
+                    if isinstance(worker, GoogleCloudWorker) and uploaded_worker and uploaded_worker is not worker:
+                        instance_name = worker._instance['name']
+                        if not worker._internal_ssh_pub_key:
+                            logging.info(f'Adding SSH Key to {instance_name}...')
+                            worker._sftp = worker._ssh.open_sftp()
+                            try:
+                                worker._sftp.stat(worker.SSH_PUB_KEY)
+                            except IOError:
+                                command = f'echo "{uploaded_worker._internal_ssh_pub_key}" > {worker.SSH_PUB_KEY}'
+                                stdin, stdout, stderr = worker._ssh.exec_command(command)
+                                if stdout.channel.recv_exit_status() != 0:
+                                    logging.error(stderr.read().decode())
+                                    raise RuntimeError(f'cannot write pub key into {instance_name}')
+                                command = f'cat {worker.SSH_PUB_KEY} >> /home/{SSH_USER}/.ssh/authorized_keys'
+                                stdin, stdout, stderr = worker._ssh.exec_command(command)
+                                if stdout.channel.recv_exit_status() != 0:
+                                    logging.error(stderr.read().decode())
+                                    raise RuntimeError('cannot add key to authorized_keys')
+                            finally:
+                                worker._sftp.close()
+                            worker._internal_ssh_pub_key = uploaded_worker._internal_ssh_pub_key
+                            logging.info(f'SSH Key added to {instance_name}!')
+
+                        ip = get_instance_internal_ip(worker._instance)
+                        logging.info(f'Sending Neural network weights to instance: {instance_name}')
+                        scp_process = uploaded_worker._ssh.exec_command(f'scp -i {uploaded_worker.SSH_PRIV_KEY} {filepath} {ip}:{filepath}')
+                        scp_processes.append(scp_process)
+
+                logging.info('Waiting for neural network weights to be transferred...')
+                for stdin, stdout, stderr in scp_processes:
+                    if stdout.channel.recv_exit_status() != 0:
+                        logging.error(stderr.read().decode())
+                        raise RuntimeError('something went wrong during file transfer')
+                logging.info('Neural network weights uploaded successfully')
+
+    @staticmethod
+    def divide_iterations(total_iterations, total_workers):
+        # one slot per worker, each holding how many iterations that worker runs
+        worker_total_iterations = [0] * total_workers
+        for i in range(total_iterations):
+            worker_total_iterations[i % total_workers] += 1
+        return worker_total_iterations
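
Editor's note: the sketch below is not part of the diff; it only illustrates how the refactored evaluate_neural_network helper in training.py might be driven. RandomOthelloAgent and the Othello.agents import path are assumptions (the agent classes live elsewhere in the repository); NNetWrapper, the argument order, and the default hyperparameters are taken from this diff.

# Hypothetical driver for evaluate_neural_network (names marked below are assumptions).
from Net.NNet import NNetWrapper
from training import evaluate_neural_network
from Othello.agents import RandomOthelloAgent  # assumed module path and class name

board_size = 6
net = NNetWrapper(board_size=(board_size, board_size))

# Play 10 evaluation games; evaluate_neural_network builds agent_class(game, *agent_arguments)
# for the opponent and returns how many games the network agent won.
wins = evaluate_neural_network(board_size, 10, net,
                               num_simulations=25, degree_exploration=1,
                               agent_class=RandomOthelloAgent, agent_arguments=[])
print(f'Network won {wins}/10 evaluation games')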
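
Editor's note: a second hedged sketch, also not part of the diff, showing how the new WorkerManager/ThreadWorker layer could distribute self-play episodes on local threads. It reuses net and board_size from the sketch above; the positional arguments mirror the execute_episode(board_size, neural_network, degree_exploration, num_simulations, policy_temperature, e_greedy) order used by GoogleCloudWorker.execute_episode, and the concrete values only echo the old argparse defaults.

# Hypothetical local-only usage of the worker layer introduced by workers.py.
from workers import WorkerManager, ThreadWorker, WorkType

manager = WorkerManager()
for _ in range(4):
    manager.add_worker(ThreadWorker())  # four concurrent local workers

# 100 episodes are split across the workers by divide_iterations (25 each here).
manager.run(WorkType.EXECUTE_EPISODE, 100, board_size, net, 1, 25, 1, 0.9)

# Each worker result is the example list returned by one execute_episode call.
training_examples = []
for episode_examples in manager.get_results():
    training_examples.extend(episode_examples)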