diff --git a/neetbox/cli/parse.py b/neetbox/cli/parse.py index ea5759ed..8863d60c 100644 --- a/neetbox/cli/parse.py +++ b/neetbox/cli/parse.py @@ -3,7 +3,12 @@ import click import neetbox -from neetbox.daemon._apis import get_status_of +<<<<<<< Updated upstream +======= +from neetbox.daemon._client_apis import get_status_of +from neetbox.logging.formatting import LogStyle +>>>>>>> Stashed changes +from neetbox.logging.logger import Logger from neetbox.logging.formatting import LogStyle from neetbox.logging.logger import Logger diff --git a/neetbox/config/__init__.py b/neetbox/config/__init__.py index e56ce325..7ba1f7e0 100644 --- a/neetbox/config/__init__.py +++ b/neetbox/config/__init__.py @@ -4,10 +4,9 @@ # URL: https://gong.host # Date: 20230413 -import inspect -from typing import Optional, Union -from neetbox.config._config import DEFAULT_CONFIG as default +import inspect +from neetbox.config._config import DEFAULT_WORKSPACE_CONFIG as default from neetbox.config._config import get_current from neetbox.utils.framing import * diff --git a/neetbox/config/_config.py b/neetbox/config/_config.py index d494832c..0f61d9c4 100644 --- a/neetbox/config/_config.py +++ b/neetbox/config/_config.py @@ -7,9 +7,19 @@ import collections from multiprocessing import current_process -from neetbox.utils.mvc import patch +DEFAULT_GLOBAL_CONFIG = { + "daemon": { + "enable": True, + "allowIpython": False, + "servers": [{"host": "localhost", "port": "20202"},], + "mode": "detached", + "displayName": None, + "uploadInterval": 10, + "mute": True, + }, +} -DEFAULT_CONFIG = { +DEFAULT_WORKSPACE_CONFIG = { "name": None, "version": None, "logging": {"logdir": None}, @@ -36,7 +46,7 @@ }, }, } -WORKSPACE_CONFIG: dict = DEFAULT_CONFIG.copy() +WORKSPACE_CONFIG: dict = DEFAULT_WORKSPACE_CONFIG.copy() def update_with(cfg: dict): diff --git a/neetbox/daemon/__init__.py b/neetbox/daemon/__init__.py index f9dee65b..eebcbd79 100644 --- a/neetbox/daemon/__init__.py +++ b/neetbox/daemon/__init__.py @@ -16,6 +16,9 @@ from neetbox.pipeline import listen, watch from neetbox.utils import pkg +pkg.is_installed("flask", try_install_if_not=True) +pkg.is_installed("setproctitle", try_install_if_not=True) + def __attach_daemon(daemon_config): if not daemon_config["allowIpython"]: @@ -32,7 +35,14 @@ def __attach_daemon(daemon_config): _online_status = connect_daemon(daemon_config) # try to connect daemon logger.log("daemon connection status: " + str(_online_status)) if not _online_status: # if no daemon online - logger.log( + if daemon_config["server"] not in ["localhost", "127.0.0.1", "0.0.0.0"]: + # daemon not running on localhost + logger.err( + f"No daemon running at {daemon_config['server']}:{daemon_config['port']}, daemon will not be attached. Continue anyway." + ) + return False + + logger.warn( f"No daemon running at {daemon_config['server']}:{daemon_config['port']}, trying to create daemon..." ) diff --git a/neetbox/daemon/_client_apis.py b/neetbox/daemon/_client_apis.py new file mode 100644 index 00000000..26e00a99 --- /dev/null +++ b/neetbox/daemon/_client_apis.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# +# Author: GavinGong aka VisualDust +# URL: https://gong.host +# Date: 20230414 + + +from neetbox.daemon._local_http_client import _local_http_client +from neetbox.utils import pkg +from neetbox.utils.framing import get_frame_module_traceback + +module_name = get_frame_module_traceback().__name__ +assert pkg.is_installed( + "httpx", try_install_if_not=True +), f"{module_name} requires httpx which is not installed" +import json +import time + +import httpx + +from neetbox.config import get_module_level_config +from neetbox.logging import logger + +logger = logger("NEETBOX DAEMON API") + +__cfg = get_module_level_config() +daemon_address = f"{__cfg['server']}:{__cfg['port']}" +base_addr = f"http://{daemon_address}" + + +def get_status_of(name=None): + name = name or "" + api_addr = f"{base_addr}/status" + logger.info(f"Fetching from {api_addr}") + r = _local_http_client.get(api_addr) + _data = r.json() + return _data diff --git a/neetbox/daemon/_daemon_client.py b/neetbox/daemon/_daemon_client.py index 42f79e93..41a8cedf 100644 --- a/neetbox/daemon/_daemon_client.py +++ b/neetbox/daemon/_daemon_client.py @@ -31,22 +31,23 @@ def _upload_thread(daemon_config, base_addr, display_name): time.sleep(__TIME_UNIT_SEC) if _ctr % _upload_interval: # not zero continue - # upload data + # dump status as json _data = json.dumps(_update_value_dict, default=str) _headers = {"Content-Type": "application/json"} try: + # upload data resp = _local_http_client.post(_api_addr, data=_data, headers=_headers) - if resp.is_error: + if resp.is_error: # upload failed raise IOError(f"Failed to upload data to daemon. ({resp.status_code})") except Exception as e: - if _disconnect_flag: + if _disconnect_flag: # already in retries _disconnect_retries -= 1 - if not _disconnect_retries: + if not _disconnect_retries: # retry count down exceeded logger.err( "Failed to reconnect to daemon after {10} retries, Trying to launch new daemon..." ) from neetbox.daemon import _try_attach_daemon - + _try_attach_daemon() time.sleep(__TIME_UNIT_SEC) continue @@ -62,7 +63,7 @@ def _upload_thread(daemon_config, base_addr, display_name): _disconnect_retries = 10 -def connect_daemon(daemon_config): +def connect_daemon(daemon_config, launch_upload_thread=True): _display_name = get_module_level_config()["displayName"] _launch_config = get_module_level_config("@") _display_name = _display_name or _launch_config["name"] @@ -71,30 +72,31 @@ def connect_daemon(daemon_config): f"Connecting to daemon at {daemon_config['server']}:{daemon_config['port']} ..." ) _daemon_address = f"{daemon_config['server']}:{daemon_config['port']}" - base_addr = f"http://{_daemon_address}" + _base_addr = f"http://{_daemon_address}" # check if daemon is alive - def _check_daemon_alive(): + def _check_daemon_alive(_api_addr): _api_name = "hello" - _api_addr = f"{base_addr}/{_api_name}" + _api_addr = f"{_api_addr}/{_api_name}" r = _local_http_client.get(_api_addr) if r.is_error: raise IOError(f"Daemon at {_api_addr} is not alive. ({r.status_code})") - logger.log(f"daemon response from {_api_addr} is {r} ({r.status_code})") try: - _check_daemon_alive() + _check_daemon_alive(_base_addr) + logger.ok(f"daemon alive at {_base_addr}") except Exception as e: logger.err(e) return False - global __upload_thread - if __upload_thread is None or not __upload_thread.is_alive(): - __upload_thread = Thread( - target=_upload_thread, - daemon=True, - args=[daemon_config, base_addr, _display_name], - ) - __upload_thread.start() + if launch_upload_thread: + global __upload_thread + if __upload_thread is None or not __upload_thread.is_alive(): + __upload_thread = Thread( + target=_upload_thread, + daemon=True, + args=[daemon_config, _base_addr, _display_name], + ) + __upload_thread.start() return True diff --git a/neetbox/daemon/_daemon_launcher.py b/neetbox/daemon/_daemon_launcher.py index af3aa361..4685687a 100644 --- a/neetbox/daemon/_daemon_launcher.py +++ b/neetbox/daemon/_daemon_launcher.py @@ -5,8 +5,7 @@ import os import sys -from neetbox.daemon._daemon import daemon_process - +from neetbox.daemon._flask_server import daemon_process # sys.stdout=open(r'D:\Projects\ML\neetbox\logdir\daemon.log', 'a+') diff --git a/neetbox/daemon/_flask_server.py b/neetbox/daemon/_flask_server.py new file mode 100644 index 00000000..15ad0ce4 --- /dev/null +++ b/neetbox/daemon/_flask_server.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# +# Author: GavinGong aka VisualDust +# URL: https://gong.host +# Date: 20230414 + +from neetbox.utils import pkg +from neetbox.utils.framing import get_frame_module_traceback + +module_name = get_frame_module_traceback().__name__ +assert pkg.is_installed( + "flask", try_install_if_not=True +), f"{module_name} requires flask which is not installed" +import sys +import time +from threading import Thread + +from flask import Flask, abort, json, request + +from neetbox.config import get_module_level_config + +_STAT_POOL = {} +__DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC = 60 * 60 * 12 # 12 Hours +__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC +__DAEMON_NAME = "NEETBOX DAEMON" + + +def daemon_process(daemon_config=None): + import setproctitle + + setproctitle.setproctitle(__DAEMON_NAME) + daemon_config = daemon_config or get_module_level_config() + api = Flask(__DAEMON_NAME) + + @api.route("/hello", methods=["GET"]) + def just_send_hello(): + return json.dumps({"hello": "hello"}) + + @api.route("/status", methods=["GET"], defaults={"name": None}) + @api.route("/status/", methods=["GET"]) + def return_status_of(name): + global __COUNT_DOWN + global _STAT_POOL + __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC + _returning_stat = dict(_STAT_POOL) + if not name: + pass # returning full dict + elif name in _returning_stat: + _returning_stat = _returning_stat[name] # returning specific status + else: + abort(404) + return _returning_stat + + @api.route("/status/list", methods=["GET"]) + def return_names_of_status(name): + global __COUNT_DOWN + global _STAT_POOL + __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC + _names = {_STAT_POOL.keys()} + return _names + + @api.route("/sync/", methods=["POST"]) + def sync_status_of(name): + global __COUNT_DOWN + global _STAT_POOL + __COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC + _json_data = request.get_json() + _STAT_POOL[name] = _json_data + return "ok" + + @api.route("/shutdown", methods=["POST"]) + def shutdown(): + global __COUNT_DOWN + __COUNT_DOWN = -1 + + def __sleep_and_shutdown(secs=3): + time.sleep(secs=secs) + sys.exit(0) + + Thread(target=__sleep_and_shutdown, args=(3)).start() # shutdown after 3 seconds + return "ok" + + def _count_down_thread(): + global __COUNT_DOWN + while True: + __COUNT_DOWN -= 1 + if not __COUNT_DOWN: + sys.exit(0) + time.sleep(1) + + count_down_thread = Thread(target=_count_down_thread, daemon=True) + count_down_thread.start() + + api.run(host="0.0.0.0", port=daemon_config["port"])