Skip to content

Commit

Permalink
[resotocore][feat] Provide possible values for attributes and values (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Nov 3, 2023
1 parent 69b12a0 commit b8c516c
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 9 deletions.
78 changes: 77 additions & 1 deletion resotocore/resotocore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
from collections import defaultdict
from textwrap import dedent
from typing import Union, List, Tuple, Any, Optional, Dict, Set
from typing import Union, List, Tuple, Any, Optional, Dict, Set, Literal

from arango.typings import Json
from attrs import evolve
Expand Down Expand Up @@ -751,6 +751,82 @@ def ft_term(cursor: str, ab_term: Term) -> str:
return resulting_cursor, query_str


def possible_values(
    db: Any,
    query: QueryModel,
    path_or_predicate: Union[str, Predicate],
    detail: Literal["attributes", "values"],
    limit: Optional[int] = None,
    skip: Optional[int] = None,
) -> Tuple[str, Json]:
    """
    Create an AQL query that lists the distinct attribute names or values found under a property path.

    The search defined by `query` is rendered first. The generated statement then iterates over the
    search result, descends into the given path (unrolling every array marker in the path),
    collects either the attribute names or the values at that position, drops null entries,
    optionally filters by the given predicate and returns the distinct entries in ascending order.

    :param db: the graph database; only `db.vertex_name` is read here, the object is also handed to `query_string`.
    :param query: the query model that defines the search to perform.
    :param path_or_predicate: either a plain property path, or a predicate. In case of a predicate,
           the name of the predicate is used as path and its operation and value filter the collected entries.
    :param detail: "attributes" to collect attribute names of objects, "values" to collect the values.
    :param limit: maximum number of entries to return. None or 0 means: no limit.
    :param skip: number of entries to skip. None or 0 means: skip nothing.
    :return: the AQL query string and the related bind variables.
    :raises AttributeError: if detail is neither "attributes" nor "values".
    """
    # Fail fast: no need to render the search if the requested detail is not supported.
    if detail not in ("attributes", "values"):
        raise AttributeError(f"Unknown detail: {detail}")

    path = path_or_predicate if isinstance(path_or_predicate, str) else path_or_predicate.name
    # The counters are shared with query_string, so cursor and bind variable names stay unique.
    counters: Dict[str, int] = defaultdict(int)

    def next_counter(name: str) -> int:
        count = counters[name]
        counters[name] = count + 1
        return count

    def next_crs(name: str = "m") -> str:
        return f"{name}{next_counter(name)}"

    bind_vars: Json = {}
    start = f"`{db.vertex_name}`"
    cursor, query_str = query_string(db, query.query, query, start, False, bind_vars, counters, id_column="_key")

    # iterate over the result
    let_cursor = next_crs()
    query_str += f" LET {let_cursor} = ("
    next_cursor = next_crs()
    query_str += f" FOR {next_cursor} in {cursor}"
    cursor = next_cursor

    # expand array paths: every array marker in the path becomes a nested FOR loop
    ars = [a.lstrip(".") for a in array_marker_in_path_regexp.split(path)]
    # a path ending with an array marker addresses the array elements itself: there is no trailing property
    prop_name = None if path.endswith(("[]", "[*]")) else ars.pop()
    for ar in ars:
        nxt_crs = next_crs()
        query_str += f" FOR {nxt_crs} IN TO_ARRAY({cursor}.{ar})"
        cursor = nxt_crs
    access_path = f"{cursor}.{prop_name}" if prop_name is not None else cursor

    # access the detail
    if detail == "attributes":
        # only objects have attributes: ignore everything else
        cursor = next_crs()
        query_str += (
            f" FILTER IS_OBJECT({access_path}) FOR {cursor} IN ATTRIBUTES({access_path}, true) RETURN {cursor})"
        )
    else:  # detail == "values": validated at the beginning of this function
        query_str += f" RETURN {access_path})"

    # result stream of matching entries: filter and sort
    sorted_let = next_crs()
    next_cursor = next_crs()
    query_str += f" LET {sorted_let} = (FOR {next_cursor} IN {let_cursor} FILTER {next_cursor}!=null"
    cursor = next_cursor
    if isinstance(path_or_predicate, Predicate):
        p: Predicate = path_or_predicate
        bvn = f'b{next_counter("bind_vars")}'
        prop = query.model.property_by_path(Section.without_section(p.name))
        pk = prop.kind
        # NOTE(review): lgt_ops presumably maps an ordering operator to its mirrored counterpart
        # for kinds with reversed ordering - confirm against its definition.
        op = lgt_ops[p.op] if prop.simple_kind.reverse_order and p.op in lgt_ops else p.op
        # coerce the predicate value(s) to the kind of the property before binding
        bind_vars[bvn] = [pk.coerce(a) for a in p.value] if isinstance(p.value, list) else pk.coerce(p.value)
        if op == "=~":  # use regex_test to do case-insensitive matching
            query_str += f" FILTER REGEX_TEST({cursor}, @{bvn}, true)"
        else:
            query_str += f" FILTER {cursor} {op} @{bvn}"
    query_str += f" RETURN DISTINCT {cursor})"
    cursor = sorted_let
    next_cursor = next_crs()
    query_str += f" FOR {next_cursor} IN {cursor} SORT {next_cursor} ASC"
    if limit or skip:
        # AQL only knows "LIMIT offset, count": there is no offset-only form.
        # A skip without limit is expressed with a count that is large enough to never cut the result.
        no_limit = 2_147_483_647
        query_str += f" LIMIT {skip if skip else 0}, {limit if limit else no_limit}"
    query_str += f" RETURN {next_cursor}"
    return query_str, bind_vars


async def query_cost(graph_db: Any, model: QueryModel, with_edges: bool) -> EstimatedSearchCost:
q_string, bind = to_query(graph_db, model, with_edges=with_edges)
nr_nodes = await graph_db.db.count(graph_db.vertex_name)
Expand Down
48 changes: 47 additions & 1 deletion resotocore/resotocore/db/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
TypeVar,
cast,
AsyncIterator,
Literal,
Union,
)

from aiostream import stream
Expand Down Expand Up @@ -51,7 +53,7 @@
synthetic_metadata_kinds,
)
from resotocore.model.resolve_in_graph import NodePath, GraphResolver
from resotocore.query.model import Query, FulltextTerm, MergeTerm, P
from resotocore.query.model import Query, FulltextTerm, MergeTerm, P, Predicate
from resotocore.report import ReportSeverity
from resotocore.types import JsonElement, EdgeType
from resotocore.util import first, value_in_path_get, utc_str, uuid_str, value_in_path, json_hash, set_value_in_path
Expand Down Expand Up @@ -160,6 +162,19 @@ async def search_history(
) -> AsyncCursorContext:
pass

@abstractmethod
async def list_possible_values(
    self,
    query: QueryModel,
    path_or_predicate: Union[str, Predicate],
    part: Literal["attributes", "values"],
    limit: Optional[int] = None,
    skip: Optional[int] = None,
    with_count: bool = False,
    timeout: Optional[timedelta] = None,
) -> AsyncCursorContext:
    """
    List the possible attribute names or values of a property path for all nodes matching the query.

    :param query: the query model that defines the search to perform.
    :param path_or_predicate: a plain property path or a predicate on a property path.
    :param part: "attributes" to list attribute names, "values" to list values.
    :param limit: optional maximum number of entries to return.
    :param skip: optional number of entries to skip.
    :param with_count: flag to request the element count together with the result.
    :param timeout: optional timeout for this operation.
    :return: a cursor context over the result entries.
    """
    pass

@abstractmethod
async def search_graph_gen(
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None
Expand Down Expand Up @@ -528,6 +543,25 @@ async def by_id_with(self, db: AsyncArangoDBBase, node_id: NodeId) -> Optional[J
with await db.aql(query=self.query_node_by_id(), bind_vars={"rid": node_id}) as cursor:
return cursor.next() if not cursor.empty() else None

async def list_possible_values(
    self,
    query: QueryModel,
    path_or_predicate: Union[str, Predicate],
    part: Literal["attributes", "values"],
    limit: Optional[int] = None,
    skip: Optional[int] = None,
    with_count: bool = False,
    timeout: Optional[timedelta] = None,
) -> AsyncCursorContext:
    """
    Render the possible-values query for the given search and open a database cursor on it.

    :param query: the query model that defines the search to perform.
    :param path_or_predicate: a plain property path or a predicate on a property path.
    :param part: "attributes" to list attribute names, "values" to list values.
    :param limit: optional maximum number of entries, handed to the query builder.
    :param skip: optional number of entries to skip, handed to the query builder.
    :param with_count: passed as `count` flag to the cursor.
    :param timeout: if defined, the whole seconds of this duration are used as ttl of the cursor.
    :return: the cursor context created by the database.
    """
    aql, aql_bind_vars = arango_query.possible_values(self, query, path_or_predicate, part, limit, skip)
    # the ttl is only defined, if a timeout is given
    ttl = None
    if timeout:
        ttl = cast(Number, int(timeout.total_seconds()))
    return await self.db.aql_cursor(
        query=aql,
        count=with_count,
        bind_vars=aql_bind_vars,
        batch_size=10000,
        ttl=ttl,
    )

async def search_list(
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
) -> AsyncCursorContext:
Expand Down Expand Up @@ -1496,6 +1530,18 @@ async def abort_update(self, batch_id: str) -> None:
await self.real.abort_update(batch_id)
await self.event_sender.core_event(CoreEvent.BatchUpdateAborted, {"graph": self.graph_name, "batch": info})

async def list_possible_values(
    self,
    query: QueryModel,
    path_or_predicate: Union[str, Predicate],
    part: Literal["attributes", "values"],
    limit: Optional[int] = None,
    skip: Optional[int] = None,
    with_count: bool = False,
    timeout: Optional[timedelta] = None,
) -> AsyncCursorContext:
    """
    Delegate to the wrapped graph database (`self.real`) without any additional handling.

    All arguments are handed over unchanged, the cursor context of the wrapped database is returned.
    """
    return await self.real.list_possible_values(
        query=query,
        path_or_predicate=path_or_predicate,
        part=part,
        limit=limit,
        skip=skip,
        with_count=with_count,
        timeout=timeout,
    )

async def search_list(
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
) -> AsyncCursorContext:
Expand Down
123 changes: 123 additions & 0 deletions resotocore/resotocore/static/api-doc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,129 @@ paths:
application/x-ndjson:
schema:
$ref: "#/components/schemas/Aggregated"
/graph/{graph_id}/property/attributes:
post:
summary: "Search the graph and return all possible attribute names for given property path."
tags:
- graph_search
parameters:
- name: graph_id
in: path
example: resoto
description: "The identifier of the graph"
required: true
schema:
type: string
- name: prop
in: query
example: |
tags
description: "The property path to search for with an optional predicate"
required: true
schema:
type: string
- name: section
in: query
description: "The name of the section used for all property paths. If not defined root is assumed."
required: false
schema:
type: string
enum:
- reported
- desired
- metadata
- name: count
in: query
description: "Optional parameter to get a Ck-Element-Count header which returns the number of returned json elements"
required: false
schema:
type: boolean
default: true
requestBody:
description: "The search to perform"
content:
text/plain:
schema:
type: string
example: is(graph_root) and reported.name=="root" -->
responses:
"200":
description: "The result of this search in the defined format"
content:
"application/json":
schema:
type: array
example: |
[
"owner",
"checksum/secret",
"prometheus.io/path"
]
items:
type: string
/graph/{graph_id}/property/values:
post:
summary: "Search the graph and return all possible attribute values for given property path."
tags:
- graph_search
parameters:
- name: graph_id
in: path
example: resoto
description: "The identifier of the graph"
required: true
schema:
type: string
- name: prop
in: query
example: |
tags
description: "The property path to search for with an optional predicate"
required: true
schema:
type: string
- name: section
in: query
description: "The name of the section used for all property paths. If not defined root is assumed."
required: false
schema:
type: string
enum:
- reported
- desired
- metadata
- name: count
in: query
description: "Optional parameter to get a Ck-Element-Count header which returns the number of returned json elements"
required: false
schema:
type: boolean
default: true
requestBody:
description: "The search to perform"
content:
text/plain:
schema:
type: string
example: is(graph_root) and reported.name=="root" -->
responses:
"200":
description: "The result of this search in the defined format"
content:
"application/json":
schema:
type: array
example: |
[
"owner",
"checksum/secret",
"prometheus.io/path"
]
items:
type: string



# endregion

# region events
Expand Down
28 changes: 26 additions & 2 deletions resotocore/resotocore/web/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Callable,
Awaitable,
Iterable,
Literal,
)
from urllib.parse import urlencode, urlparse, parse_qs, urlunparse

Expand Down Expand Up @@ -58,7 +59,6 @@

from resotocore.analytics import AnalyticsEvent
from resotocore.cli.command import alias_names
from resotocore.dependencies import Dependencies, TenantDependencies
from resotocore.cli.model import (
ParsedCommandLine,
CLIContext,
Expand All @@ -73,6 +73,8 @@
from resotocore.console_renderer import ConsoleColorSystem, ConsoleRenderer
from resotocore.db.graphdb import GraphDB, HistoryChange
from resotocore.db.model import QueryModel
from resotocore.dependencies import Dependencies, TenantDependencies
from resotocore.dependencies import TenantDependencyProvider
from resotocore.error import NotFoundError, NotEnoughPermissions
from resotocore.ids import (
TaskId,
Expand All @@ -91,6 +93,8 @@
from resotocore.model.json_schema import json_schema
from resotocore.model.model import Kind, Model
from resotocore.model.typed_model import to_json, from_js, to_js_str, to_js
from resotocore.query.model import Predicate, PathRoot, variable_to_absolute
from resotocore.query.query_parser import predicate_term
from resotocore.report import Benchmark, ReportCheck
from resotocore.service import Service
from resotocore.task.model import Subscription
Expand All @@ -99,7 +103,6 @@
from resotocore.util import uuid_str, force_gen, rnd_str, if_set, duration, utc_str, parse_utc, async_noop, utc
from resotocore.web.auth import raw_jwt_from_auth_message, LoginWithCode, AuthHandler
from resotocore.web.content_renderer import result_binary_gen, single_result
from resotocore.dependencies import TenantDependencyProvider
from resotocore.web.directives import (
metrics_handler,
error_handler,
Expand Down Expand Up @@ -226,6 +229,8 @@ def __add_routes(self, prefix: str) -> None:
web.post(prefix + "/graph/{graph_id}/search/aggregate", require(self.query_aggregation, r)),
web.post(prefix + "/graph/{graph_id}/search/history/list", require(self.query_history, r)),
web.post(prefix + "/graph/{graph_id}/search/history/aggregate", require(self.query_history, r)),
web.post(prefix + "/graph/{graph_id}/property/attributes", require(self.possible_values, r)),
web.post(prefix + "/graph/{graph_id}/property/values", require(self.possible_values, r)),
# maintain the graph
web.patch(prefix + "/graph/{graph_id}/nodes", require(self.update_nodes, a)),
web.post(prefix + "/graph/{graph_id}/merge", require(self.merge_graph, a)),
Expand Down Expand Up @@ -1016,6 +1021,25 @@ async def explain(self, request: Request, deps: TenantDependencies) -> StreamRes
result = await graph_db.explain(query_model)
return web.json_response(to_js(result))

async def possible_values(self, request: Request, deps: TenantDependencies) -> StreamResponse:
    """
    Handle the property/attributes and property/values routes.

    The search is taken from the request, the property path (with an optional predicate) from
    the required query parameter `prop`. The optional query parameters `limit`, `skip` and `count`
    are handed to the graph database. `count` is enabled unless it is explicitly set to "false".
    """
    graph_db, query_model = await self.graph_query_model_from_request(request, deps)
    section = section_of(request)

    # both routes share this handler: the end of the request path selects the detail to list
    detail: Literal["attributes", "values"] = "values"
    if request.path.endswith("attributes"):
        detail = "attributes"

    # property paths are relative to the section, unless no section or the root section is selected
    if section is None or section == PathRoot:
        to_absolute = partial(variable_to_absolute, None)
    else:
        to_absolute = partial(variable_to_absolute, section)

    raw_prop = request.query["prop"]  # fail if not provided
    limit = if_set(request.query.get("limit"), int)
    skip = if_set(request.query.get("skip"), int)
    with_count = request.query.get("count", "true").lower() != "false"

    # the parameter is either a predicate or a plain path:
    # everything that can not be parsed as predicate is treated as path
    prop_or_predicate: Union[Predicate, str]
    try:
        prop_or_predicate = predicate_term.parse(raw_prop).change_variable(to_absolute)
    except Exception:
        prop_or_predicate = to_absolute(raw_prop)

    cursor_context = await graph_db.list_possible_values(
        query_model, prop_or_predicate, detail, limit, skip, with_count
    )
    async with cursor_context as cursor:
        return await self.stream_response_from_gen(request, cursor, cursor.count())

async def query_structure(self, request: Request, deps: TenantDependencies) -> StreamResponse:
    """Return the structure of the query defined in the request as json response."""
    # only the query model is of interest here: the graph database is not used
    _graph_db, model_of_query = await self.graph_query_model_from_request(request, deps)
    structure = model_of_query.query.structure()
    return web.json_response(structure)
Expand Down
Loading

0 comments on commit b8c516c

Please sign in to comment.