Skip to content

Commit

Permalink
feat: Impl the file upload interface for FLEX dataloading (#3557)
Browse files Browse the repository at this point in the history
- Upgrade the OpenAPI Generator to the latest version (7.3.0)
- Return the graph related info for Portal
- Impl the file upload interface for FLEX dataloading
  • Loading branch information
lidongze0629 authored Feb 19, 2024
1 parent 448f3ac commit 738f645
Show file tree
Hide file tree
Showing 74 changed files with 1,563 additions and 702 deletions.
1 change: 1 addition & 0 deletions flex/coordinator/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gs_flex_coordinator/models/column_mapping.py
gs_flex_coordinator/models/connection.py
gs_flex_coordinator/models/connection_status.py
gs_flex_coordinator/models/deployment_info.py
gs_flex_coordinator/models/deployment_info_graphs_info_value.py
gs_flex_coordinator/models/deployment_status.py
gs_flex_coordinator/models/edge_mapping.py
gs_flex_coordinator/models/edge_mapping_destination_vertex_mappings_inner.py
Expand Down
2 changes: 1 addition & 1 deletion flex/coordinator/.openapi-generator/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7.2.0
7.3.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator import util


@handle_api_exception()
def upload_file(filestorage=None):  # noqa: E501
    """Forward an uploaded file to the coordinator's client wrapper.

    :param filestorage: the uploaded file, handed unchanged to
        ``client_wrapper.upload_file``.
    :type filestorage: presumably a file-storage object rather than a plain
        ``str`` -- TODO confirm against the OpenAPI spec

    :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
    """
    upload_result = client_wrapper.upload_file(filestorage)
    return upload_result
73 changes: 68 additions & 5 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
import datetime
import itertools
import logging
import os
import pickle
import socket
import threading
from typing import List, Union

import psutil
from gs_flex_coordinator.core.config import (CLUSTER_TYPE, INSTANCE_NAME,
SOLUTION)
from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
COORDINATOR_STARTING_TIME,
DATASET_WORKSPACE, INSTANCE_NAME,
SOLUTION, WORKSPACE)
from gs_flex_coordinator.core.interactive import init_hqps_client
from gs_flex_coordinator.core.utils import encode_datetime
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import (GraphInfo, decode_datetimestr,
encode_datetime, get_current_time)
from gs_flex_coordinator.models import (DeploymentInfo, Graph, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
Expand All @@ -45,6 +51,34 @@ def __init__(self):
self._lock = threading.RLock()
# initialize specific client
self._client = self._initialize_client()
# graphs info
self._graphs_info = {}
# pickle path
self._pickle_path = os.path.join(WORKSPACE, "graphs_info.pickle")
# recover
self._try_to_recover_from_disk()

def _try_to_recover_from_disk(self):
    """Restore ``self._graphs_info`` from the pickle file, then fill the gaps.

    Recovery is best-effort: any failure while reading the pickle file is
    logged and otherwise ignored. Afterwards, every graph returned by
    ``self.list_graphs()`` that has no entry yet receives a default
    ``GraphInfo`` whose creation time is ``COORDINATOR_STARTING_TIME``.
    """
    try:
        if os.path.exists(self._pickle_path):
            logger.info("Recover graphs info from file %s", self._pickle_path)
            # SECURITY: pickle.load can execute arbitrary code embedded in the
            # file, so the pickle path must only be writable by the
            # coordinator itself.
            with open(self._pickle_path, "rb") as f:
                recovered = pickle.load(f)
            # Reject anything that is not a mapping so a damaged file cannot
            # replace the graphs info with an unusable object.
            if not isinstance(recovered, dict):
                raise TypeError(
                    "unexpected graphs info type: %s" % type(recovered).__name__
                )
            self._graphs_info = recovered
    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("Failed to recover graphs info: %s", str(e))
    # set default graph info
    for g in self.list_graphs():
        if g.name not in self._graphs_info:
            self._graphs_info[g.name] = GraphInfo(
                name=g.name, creation_time=COORDINATOR_STARTING_TIME
            )

def _pickle_graphs_info_impl(self):
    """Persist ``self._graphs_info`` to ``self._pickle_path`` (best-effort).

    The data is first dumped to a sibling temporary file and then moved over
    the target with ``os.replace``, so an interrupted dump cannot leave a
    truncated pickle behind. Any failure is logged and swallowed.
    """
    tmp_path = "%s.tmp" % self._pickle_path
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(self._graphs_info, f)
        os.replace(tmp_path, self._pickle_path)
    except Exception as e:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("Failed to dump graphs info: %s", str(e))

def _initialize_client(self):
service_initializer = {"INTERACTIVE": init_hqps_client}
Expand Down Expand Up @@ -73,10 +107,18 @@ def create_graph(self, graph: Graph) -> str:
graph_dict = graph.to_dict()
if "_schema" in graph_dict:
graph_dict["schema"] = graph_dict.pop("_schema")
return self._client.create_graph(graph_dict)
rlt = self._client.create_graph(graph_dict)
self._graphs_info[graph.name] = GraphInfo(
name=graph.name, creation_time=get_current_time()
)
self._pickle_graphs_info_impl()
return rlt

def delete_graph_by_name(self, graph_name: str) -> str:
    """Delete a graph through the underlying client and drop its cached info.

    :param graph_name: name of the graph to delete.
    :return: whatever ``self._client.delete_graph_by_name`` returns.
    """
    rlt = self._client.delete_graph_by_name(graph_name)
    # pop() instead of del: the graph may have no cached entry, and the
    # client has already deleted it at this point, so a KeyError here would
    # report a successful deletion as a failure.
    self._graphs_info.pop(graph_name, None)
    self._pickle_graphs_info_impl()
    return rlt

def create_procedure(self, graph_name: str, procedure: Procedure) -> str:
procedure_dict = procedure.to_dict()
Expand Down Expand Up @@ -111,10 +153,25 @@ def get_node_status(self) -> List[NodeStatus]:
return rlt

def get_deployment_info(self) -> DeploymentInfo:
    """Build the deployment description, including per-graph information.

    Before assembling the result, the last data-loading time of every
    tracked graph is refreshed from the jobs returned by ``self.list_jobs()``
    that carry an end time, and the graphs info is persisted.
    """
    # update graphs info
    for job in self.list_jobs():
        graph_name = job.detail["graph_name"]
        if graph_name not in self._graphs_info or job.end_time is None:
            continue
        self._graphs_info[graph_name].last_dataloading_time = decode_datetimestr(
            job.end_time
        )
    self._pickle_graphs_info_impl()

    graphs_info = {
        graph_name: graph_info.to_dict()
        for graph_name, graph_info in self._graphs_info.items()
    }
    deployment = {
        "name": INSTANCE_NAME,
        "cluster_type": CLUSTER_TYPE,
        "version": __version__,
        "solution": SOLUTION,
        "graphs_info": graphs_info,
    }
    return DeploymentInfo.from_dict(deployment)

Expand Down Expand Up @@ -159,5 +216,11 @@ def create_dataloading_job(
job_id = self._client.create_dataloading_job(graph_name, schema_mapping_dict)
return job_id

def upload_file(self, filestorage) -> str:
    """Save an uploaded file into the dataset workspace.

    :param filestorage: uploaded file object exposing ``filename`` and
        ``save(path)``.
    :return: path of the stored file, as a string.
    :raises ValueError: if the upload carries no usable file name.

    NOTE(review): for cluster types other than "HOSTS" nothing is saved and
    ``None`` is returned implicitly, despite the ``str`` annotation.
    """
    if CLUSTER_TYPE == "HOSTS":
        # The file name is supplied by the client. Keep only its final path
        # component so that "../" segments or an absolute path cannot make
        # the file land outside DATASET_WORKSPACE.
        raw_name = filestorage.filename or ""
        filename = os.path.basename(raw_name.replace("\\", "/"))
        if filename in ("", ".", ".."):
            raise ValueError(f"Invalid file name: {raw_name!r}")
        filepath = os.path.join(DATASET_WORKSPACE, filename)
        filestorage.save(filepath)
        return str(filepath)


client_wrapper = ClientWrapper()
10 changes: 10 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
#

import datetime
import logging
import os
import tempfile
Expand Down Expand Up @@ -63,6 +64,11 @@ def config_logging(log_level: str):
os.makedirs(ALERT_WORKSPACE, exist_ok=True)


# dataset workspace: the "dataset" directory under WORKSPACE, created here if
# it does not already exist
DATASET_WORKSPACE = os.path.join(WORKSPACE, "dataset")
os.makedirs(DATASET_WORKSPACE, exist_ok=True)


# "solution" denotes one of the product's applications or use cases across
# different industries and business scenarios, e.g. "INTERACTIVE",
# "GRAPHSCOPE INSIGHT".
Expand All @@ -79,3 +85,7 @@ def config_logging(log_level: str):

# interactive configuration
# NOTE(review): os.environ.get returns a str when HIACTOR_ADMIN_SERVICE_PORT is
# set, but the int 7777 otherwise, so consumers may see two different types.
HQPS_ADMIN_SERVICE_PORT = os.environ.get("HIACTOR_ADMIN_SERVICE_PORT", 7777)


# coordinator starting time: captured once, when this assignment runs.
# datetime.now() without a tz argument yields a naive local-time value.
COORDINATOR_STARTING_TIME = datetime.datetime.now()
35 changes: 25 additions & 10 deletions flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@
from typing import List, Union

import hqps_client
from hqps_client import (Graph, JobResponse, JobStatus, ModelSchema, Procedure,
SchemaMapping, Service)

from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE)
from gs_flex_coordinator.core.utils import encode_datetime, get_internal_ip
from hqps_client import (
Graph,
JobResponse,
JobStatus,
ModelSchema,
Procedure,
SchemaMapping,
Service,
)

from gs_flex_coordinator.core.config import (
CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE,
)
from gs_flex_coordinator.core.utils import (
encode_datetime,
get_internal_ip,
get_public_ip,
)
from gs_flex_coordinator.models import StartServiceRequest

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -130,13 +143,15 @@ def get_service_status(self) -> dict:
response = api_instance.get_service_status()
# transfer
if CLUSTER_TYPE == "HOSTS":
internal_ip = get_internal_ip()
host = get_public_ip()
if host is None:
host = get_internal_ip()
return {
"status": response.status,
"graph_name": response.graph_name,
"sdk_endpoints": {
"cypher": f"neo4j://{internal_ip}:{response.bolt_port}",
"hqps": f"http://{internal_ip}:{response.hqps_port}",
"cypher": f"neo4j://{host}:{response.bolt_port}",
"hqps": f"http://{host}:{response.hqps_port}",
},
}

Expand Down
67 changes: 67 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import random
import socket
import string
from typing import Union

import requests

logger = logging.getLogger("graphscope")

Expand Down Expand Up @@ -72,6 +75,10 @@ def random_string(nlen):
return "".join([random.choice(string.ascii_lowercase) for _ in range(nlen)])


def get_current_time() -> datetime.datetime:
    """Return the current local date and time as a naive ``datetime``."""
    now = datetime.datetime.now()
    return now


def str_to_bool(s):
if isinstance(s, bool):
return s
Expand All @@ -82,3 +89,63 @@ def get_internal_ip() -> str:
hostname = socket.gethostname()
internal_ip = socket.gethostbyname(hostname)
return internal_ip


def get_public_ip(timeout: float = 5.0) -> Union[str, None]:
    """Look up this host's public IP address via the ipify web service.

    :param timeout: seconds to wait for the service before giving up.
        Without a timeout, ``requests.get`` may block indefinitely.
    :return: the address reported by the service, or ``None`` when the
        request fails, the status code is not 200, or the payload is
        malformed.
    """
    try:
        response = requests.get(
            "https://api.ipify.org?format=json", timeout=timeout
        )
        if response.status_code != 200:
            return None
        return response.json()["ip"]
    except (
        requests.exceptions.RequestException,
        # A body that is not JSON, or JSON without an "ip" entry, must not
        # escape this best-effort lookup.
        ValueError,
        KeyError,
        TypeError,
    ) as e:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("Failed to get public ip: %s", str(e))
        return None


class GraphInfo:
    """Bookkeeping record for one graph: its name and three timestamps.

    ``update_time`` falls back to ``creation_time`` when it is not given.
    ``last_dataloading_time`` only ever moves forward: assigning an older
    value than the stored one is ignored.
    """

    def __init__(
        self, name, creation_time, update_time=None, last_dataloading_time=None
    ):
        self._name = name
        self._creation_time = creation_time
        self._update_time = creation_time if update_time is None else update_time
        self._last_dataloading_time = last_dataloading_time

    @property
    def name(self):
        """Name of the graph."""
        return self._name

    @property
    def creation_time(self):
        """Time the graph was created."""
        return self._creation_time

    @property
    def update_time(self):
        """Time of the most recent update."""
        return self._update_time

    @update_time.setter
    def update_time(self, new_time):
        self._update_time = new_time

    @property
    def last_dataloading_time(self):
        """Most recent data-loading time, or None if none was recorded."""
        return self._last_dataloading_time

    @last_dataloading_time.setter
    def last_dataloading_time(self, new_time):
        # Keep the stored value unless nothing is stored yet or the new
        # value is strictly later.
        current = self._last_dataloading_time
        if current is None or new_time > current:
            self._last_dataloading_time = new_time

    def to_dict(self):
        """Return the record as a dict with timestamps run through encode_datetime."""
        encoded = {"name": self._name}
        for key in ("creation_time", "update_time", "last_dataloading_time"):
            encoded[key] = encode_datetime(getattr(self, "_" + key))
        return encoded
1 change: 1 addition & 0 deletions flex/coordinator/gs_flex_coordinator/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gs_flex_coordinator.models.connection import Connection
from gs_flex_coordinator.models.connection_status import ConnectionStatus
from gs_flex_coordinator.models.deployment_info import DeploymentInfo
from gs_flex_coordinator.models.deployment_info_graphs_info_value import DeploymentInfoGraphsInfoValue
from gs_flex_coordinator.models.deployment_status import DeploymentStatus
from gs_flex_coordinator.models.edge_mapping import EdgeMapping
from gs_flex_coordinator.models.edge_mapping_destination_vertex_mappings_inner import EdgeMappingDestinationVertexMappingsInner
Expand Down
Loading

0 comments on commit 738f645

Please sign in to comment.