Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] implement vald benchmark #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/clients/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader
from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader
from engine.clients.vald import ValdConfigurator, ValdSearcher, ValdUploader
from engine.clients.weaviate import (
WeaviateConfigurator,
WeaviateSearcher,
Expand All @@ -33,6 +34,7 @@
"elastic": ElasticConfigurator,
"opensearch": OpenSearchConfigurator,
"redis": RedisConfigurator,
"vald": ValdConfigurator,
}

ENGINE_UPLOADERS = {
Expand All @@ -42,6 +44,7 @@
"elastic": ElasticUploader,
"opensearch": OpenSearchUploader,
"redis": RedisUploader,
"vald": ValdUploader,
}

ENGINE_SEARCHERS = {
Expand All @@ -51,6 +54,7 @@
"elastic": ElasticSearcher,
"opensearch": OpenSearchSearcher,
"redis": RedisSearcher,
"vald": ValdSearcher,
}


Expand Down
3 changes: 3 additions & 0 deletions engine/clients/vald/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from engine.clients.vald.configure import ValdConfigurator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9% of developers fix this issue

F401: 'engine.clients.vald.configure.ValdConfigurator' imported but unused

❗❗ 3 similar findings have been found in this PR

🔎 Expand here to view all instances of this finding
File Path Line Number
engine/clients/vald/init.py 2
engine/clients/vald/init.py 3
engine/clients/vald/parser.py 1

Visit the Lift Web Console to find more details in your report.


ℹ️ Expand to see all @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.

from engine.clients.vald.search import ValdSearcher
from engine.clients.vald.upload import ValdUploader
Empty file added engine/clients/vald/config.py
Empty file.
42 changes: 42 additions & 0 deletions engine/clients/vald/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import datetime
import kubernetes as k8s
import yaml
from benchmark.dataset import Dataset
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance


class ValdConfigurator(BaseConfigurator):
DISTANCE_MAPPING = {
Distance.L2: "L2",
Distance.DOT: "COS",
Distance.COSINE: "COS",
}

def __init__(self, host, collection_params: dict, connection_params: dict):
super().__init__(host, collection_params, connection_params)

k8s.config.load_kube_config(connection_params["kubeconfig"])
with open(collection_params["base_config"]) as f:
self.base_config = yaml.safe_load(f)

def clean(self):
pass

def recreate(self, dataset: Dataset, collection_params):
api_client = k8s.client.ApiClient()

ngt_config = collection_params["ngt_config"] | {
"dimension": dataset.config.vector_size,
"distance_type": self.DISTANCE_MAPPING[dataset.config.distance]
}
core_api = k8s.client.CoreV1Api(api_client)
core_api.patch_namespaced_config_map("vald-agent-ngt-config", "default", body={"data":{"config.yaml":yaml.safe_dump(self.base_config|{'ngt': ngt_config})}})

apps_api = k8s.client.AppsV1Api(api_client)
apps_api.patch_namespaced_stateful_set("vald-agent-ngt", "default", body={"spec":{"template":{"metadata":{"annotations":{"reloaded-at":datetime.datetime.now().isoformat()}}}}})

w = k8s.watch.Watch()
for event in w.stream(apps_api.list_namespaced_stateful_set, namespace='default', label_selector='app=vald-agent-ngt', timeout_seconds=30):
if event['object'].status.available_replicas is not None and event['object'].status.available_replicas != 0:
w.stop()
17 changes: 17 additions & 0 deletions engine/clients/vald/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Any, List, Optional
from engine.base_client import IncompatibilityError
from engine.base_client.parser import BaseConditionParser, FieldValue


class ValdParser(BaseConditionParser):
def build_condition(self, and_subfilters: List[Any] | None, or_subfilters: List[Any] | None) -> Any | None:
raise IncompatibilityError

def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any:
raise IncompatibilityError

def build_range_filter(self, field_name: str, lt: FieldValue | None, gt: FieldValue | None, lte: FieldValue | None, gte: FieldValue | None) -> Any:
raise IncompatibilityError

def build_geo_filter(self, field_name: str, lat: float, lon: float, radius: float) -> Any:
raise IncompatibilityError
28 changes: 28 additions & 0 deletions engine/clients/vald/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import List, Tuple
import grpc
from engine.base_client.search import BaseSearcher
from vald.v1.vald import search_pb2_grpc
from vald.v1.payload import payload_pb2


class ValdSearcher(BaseSearcher):
cfg: payload_pb2.Search.Config = None
stub: search_pb2_grpc.SearchStub = None

@classmethod
def init_client(
cls, host: str, distance, connection_params: dict, search_params: dict
):
grpc_opts = map(lambda x: tuple(x), connection_params["grpc_opts"])
channel = grpc.insecure_channel(f"{host}:31081", grpc_opts)
cls.stub = search_pb2_grpc.SearchStub(channel)
cls.cfg = payload_pb2.Search.Config(**search_params['search_params'])

@classmethod
def search_one(
cls, vector: List[float], meta_conditions, top: int | None
) -> List[Tuple[int, float]]:
cfg = cls.cfg
cfg.num = top
res = cls.stub.Search(payload_pb2.Search.Request(vector=vector, config=cfg))
return [(int(r.id), r.distance) for r in res.results]
50 changes: 50 additions & 0 deletions engine/clients/vald/upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import grpc
import urllib
from typing import List
from engine.base_client.upload import BaseUploader
from vald.v1.agent.core import agent_pb2_grpc
from vald.v1.vald import insert_pb2_grpc
from vald.v1.payload import payload_pb2


class ValdUploader(BaseUploader):
@classmethod
def init_client(cls, host, distance, connection_params: dict, upload_params: dict):
grpc_opts = map(lambda x: tuple(x), connection_params["grpc_opts"])
cls.channel = grpc.insecure_channel(f"{host}:31081", grpc_opts)
cls.icfg = payload_pb2.Insert.Config(**upload_params["insert_config"])
cls.acfg = payload_pb2.Control.CreateIndexRequest(**upload_params["index_config"])

while True:
try:
with urllib.request.urlopen(f"http://{host}:31001/readiness") as response:
if response.getcode() == 200:
break
except (urllib.error.HTTPError, urllib.error.URLError):
pass

@classmethod
def upload_batch(
cls, ids: List[int], vectors: List[list], metadata: List[dict | None]
):
requests = [
payload_pb2.Insert.Request(
vector=payload_pb2.Object.Vector(id=str(i), vector=v), config=cls.icfg
)
for i, v in zip(ids, vectors)
]
istub = insert_pb2_grpc.InsertStub(cls.channel)
for _ in istub.StreamInsert(iter(requests)):
pass

@classmethod
def post_upload(cls, distance):
astub = agent_pb2_grpc.AgentStub(cls.channel)
astub.CreateIndex(cls.acfg)
return {}

@classmethod
def delete_client(cls):
if cls.channel != None:
cls.channel.close()
del cls.channel
1 change: 1 addition & 0 deletions engine/servers/vald-single-node/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kubeconfig.yaml
61 changes: 61 additions & 0 deletions engine/servers/vald-single-node/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
version: '3.7'

services:
k3s_server:
image: "rancher/k3s:${K3S_VERSION:-latest}"
command: server
tmpfs:
- /run
- /var/run
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
privileged: true
environment:
- K3S_TOKEN=${K3S_TOKEN:-vald}
- K3S_KUBECONFIG_OUTPUT=/output/kubeconfig.yaml
- K3S_KUBECONFIG_MODE=666
volumes:
- k3s-server:/var/lib/rancher/k3s
- .:/output
ports:
- 6443:6443
- 80:80
- 443:443
healthcheck:
test: [ "CMD-SHELL", "kubectl wait --for=condition=Ready nodes --all --timeout=30s" ]
timeout: 30s

k3s_agent:
image: "rancher/k3s:${K3S_VERSION:-latest}"
command: agent
tmpfs:
- /run
- /var/run
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
privileged: true
environment:
- K3S_URL=https://k3s_server:6443
- K3S_TOKEN=${K3S_TOKEN:-vald}
ports:
- 31081:31081
- 31001:31001

install:
image: "rancher/k3s:${K3S_VERSION:-latest}"
entrypoint: ["kubectl", "apply", "-f=/root/resources.yaml", "--kubeconfig=/root/kubeconfig.yaml"]
depends_on:
k3s_server:
condition: service_healthy
network_mode: "host"
volumes:
- ./:/root

volumes:
k3s-server: {}
Loading