track each subquery field proc separately #92

Open · wants to merge 2 commits into base: master
73 changes: 66 additions & 7 deletions hiku/telemetry/prometheus.py
@@ -38,12 +38,6 @@ def wrapper(*args):
return wrapper


def _subquery_field_names(func):
def wrapper(fields, *args):
return func([f.name for _, f in fields], fields, *args)
return wrapper


class GraphMetricsBase(GraphTransformer):
root_name = 'Root'

@@ -150,6 +144,12 @@ def visit_link(self, obj):
return obj


def _subquery_field_names(func):
def wrapper(fields, *args):
return func([f.name for _, f in fields], fields, *args)
return wrapper


class _SubqueryMixin:

def subquery_wrapper(self, observe, subquery):
@@ -161,7 +161,42 @@ def proc_wrapper():
result = result_proc()
observe(start_time, field_names)
return result

return proc_wrapper

return wrapper


class _SubqueryMixinNew:

def subquery_wrapper(self, observe, subquery):
def wrap_proc(field_name, proc):
def _proc_wrapper(*args):
proc_start_time = time.perf_counter()
result = proc(*args)
Review comment (Collaborator, Author):
Passing start_time from above allows tracking subgraph execution plus proc execution time.

Review comment from @kindermax (Collaborator, Author), Feb 20, 2023:
Maybe it makes sense to place start_time = time.perf_counter() inside _proc_wrapper, but then we would (probably) lose the subgraph (low-level graph) data execution time.
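
For illustration only, not part of this diff: a minimal sketch contrasting the two placements discussed in this thread. It assumes observe(start_time, labels) records the elapsed time since start_time (as in _SubqueryMixinNew above); the function names and the explicit observe/start_time parameters are hypothetical, added so the sketch is self-contained.

import time


def wrap_proc_inner_start(field_name, proc, observe):
    # start_time taken inside _proc_wrapper: only the proc call itself is
    # measured, so the low-level (subgraph) execution time is not included.
    def _proc_wrapper(*args):
        start_time = time.perf_counter()
        result = proc(*args)
        observe(start_time, [f'{field_name}__define'])
        return result
    return _proc_wrapper


def wrap_proc_outer_start(field_name, proc, observe, start_time):
    # start_time passed in from the outer wrapper, captured before the
    # low-level subquery runs: the observation covers subgraph execution
    # plus proc execution.
    def _proc_wrapper(*args):
        result = proc(*args)
        observe(start_time, [f'{field_name}__define'])
        return result
    return _proc_wrapper

Both variants emit the same '<field>__define' label; only the window being measured differs.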

observe(proc_start_time, [f'{field_name}__define'])
return result

return _proc_wrapper

def wrapper(fields, *args):
start_time = time.perf_counter()
wrapped_fields = []
field_names = []
for gf, qf in fields:
gf.func.proc = wrap_proc(gf.name, gf.func.proc)
wrapped_fields.append((gf, qf))
field_names.append(gf.name)

result_proc = subquery(wrapped_fields, *args)

def result_proc_wrapper():
result = result_proc()
return result

observe(start_time, field_names)
return result_proc_wrapper

return wrapper


@@ -184,6 +219,30 @@ def wrapper(link_name, *args):
return wrapper


class GraphMetricsNew(_SubqueryMixinNew, GraphMetricsBase):
def _wrap_subquery(self, node_name, subquery):
observe = self._observe_fields(node_name)
wrapper = self.subquery_wrapper(observe, subquery)
wrapper.__subquery__ = lambda: wrapper
return wrapper

def field_wrapper(self, observe, func):
def wrapper(field_names, *args):
start_time = time.perf_counter()
result = func(*args)
observe(start_time, field_names)
return result
return wrapper

def link_wrapper(self, observe, func):
def wrapper(link_name, *args):
start_time = time.perf_counter()
result = func(*args)
observe(start_time, [link_name])
return result
return wrapper


class AsyncGraphMetrics(_SubqueryMixin, GraphMetricsBase):

def field_wrapper(self, observe, func):
@@ -200,4 +259,4 @@ async def wrapper(link_name, *args):
result = await func(*args)
observe(start_time, [link_name])
return result
return wrapper
return wrapper
168 changes: 167 additions & 1 deletion tests/test_telemetry_prometheus.py
@@ -1,16 +1,23 @@
import time

import faker
import pytest

from prometheus_client import REGISTRY

from hiku import query as q
from hiku.expr.core import S
from hiku.expr.core import define
from hiku.graph import Graph, Node, Field, Link, Root, apply
from hiku.telemetry.prometheus import GraphMetricsNew
from hiku.types import Any
from hiku.types import TypeRef
from hiku.engine import Engine, pass_context
from hiku.sources.graph import SubGraph
from hiku.executors.sync import SyncExecutor
from hiku.executors.asyncio import AsyncIOExecutor
from hiku.telemetry.prometheus import GraphMetrics, AsyncGraphMetrics
from hiku.utils import listify

from tests.base import check_result

@@ -26,13 +33,25 @@ def graph_name_fixture():
@pytest.fixture(name='sample_count')
def sample_count_fixture(graph_name):
def sample_count(node, field):
return REGISTRY.get_sample_value(
REGISTRY.get_sample_value(
'graph_field_time_count',
dict(graph=graph_name, node=node, field=field),
)
return sample_count


@pytest.fixture(name='sample_sum')
def sample_sum_fixture(graph_name):
    def sample_sum(node, field):
value = REGISTRY.get_sample_value(
'graph_field_time_sum',
dict(graph=graph_name, node=node, field=field),
)
print('{}.{}, value: {}'.format(node, field, value))
return value
    return sample_sum


def test_simple_sync(graph_name, sample_count):

def x_fields(fields, ids):
@@ -176,3 +195,150 @@ def root_fields2(ctx, fields):

assert sample_count('Root', 'a') == 1.0
assert sample_count('Root', 'b') == 1.0


@pytest.mark.parametrize('tracker', [
    # The old tracker correctly tracks time for y1 when only the low-level graph
    # is slow, but does not track time for x1 and x2 separately.
    # GraphMetrics,
    # The new tracker correctly tracks time for x1 and x2 separately, but does not
    # see y1's low-level slowness, because the y1 proc is just a simple return of a
    # value from the index (see the per-field '__define' printout sketched at the
    # end of this test).
    GraphMetricsNew
])
def test_track_time(tracker, graph_name, sample_sum):

x1 = 0.12 # 12 + 22 + 52 = 86, because all fields are from LL
# 12 + 32 + 52 = 96
x2 = 0.22 # 34
x3 = 0.32 # 66 # HL only field
y1 = 0.52 # 118, 1.18
y2 = 0.62 # 180

@listify
def x_fields(fields, ids):
"""LL"""
def get_field(f):
if f == 'x1':
return x1
elif f == 'x2':
return x2

for id_ in ids:
yield [get_field(f.name) for f in fields]

@listify
def y_fields(fields, ids):
"""LL"""
def get_field(f):
if f == 'y1':
time.sleep(y1)
return y1
elif f == 'y2':
return y2

for id_ in ids:
yield [get_field(f.name) for f in fields]

def root_fields(fields):
return [1 for _ in fields]

def x_link():
return 2

ll_graph = Graph([
Node('X', [
Field('x1', None, x_fields),
Field('x2', None, x_fields),
]),
Node('Y', [
Field('y1', None, y_fields),
Field('y2', None, y_fields),
]),
])

x_sg = SubGraph(ll_graph, 'X')
y_sg = SubGraph(ll_graph, 'Y')

@define(Any)
def x1_field(val):
"""HL"""
time.sleep(x1)
return val

@define(Any)
def x2_field(val):
"""HL"""
# time.sleep(x2)
return val

@listify
def x3_field(fields, ids):
"""HL"""
def get_field(f):
if f == 'x3_3':
# time.sleep(x3)
return x3

for id_ in ids:
yield [get_field(f.name) for f in fields]

@define(Any)
def y2_field(val):
"""HL"""
# time.sleep(y2)
return val

hl_graph = Graph([
Node('X_h', [
Field('x1_1', None, x_sg.c(x1_field(S.this.x1))),
Field('x2_2', None, x_sg.c(x2_field(S.this.x2))),
            # In the old tracker, x3_3 is the only field that is tracked correctly,
            # because it does not use a subgraph.
Field('x3_3', None, x3_field),
Field('y1_1', None, y_sg.c(S.this.y1)),
Field('y2_2', None, y_sg.c(y2_field(S.this.y2))),
]),
Root([
Field('a', None, root_fields),
Link('x', TypeRef['X_h'], x_link, requires=None),
]),
])

hl_graph = apply(hl_graph, [tracker(graph_name)])

result = Engine(SyncExecutor()).execute(hl_graph, q.Node([
# q.Field('a'),
q.Link('x', q.Node([
q.Field('x1_1'),
q.Field('x2_2'),
q.Field('x3_3'),
q.Field('y1_1'),
q.Field('y2_2'),
])),
]))
# check_result(result, {
# 'a': 1,
# 'x': {
# 'x1_1': x1,
# 'x2_2': x2,
# 'x3_3': x3,
# 'y1_1': y1,
# },
# })

print('')
print('Testing with', tracker.__name__)

got_x = sum([
sample_sum('X_h', 'x1_1'),
sample_sum('X_h', 'x2_2'),
sample_sum('X_h', 'x3_3'),
sample_sum('X_h', 'y1_1'),
sample_sum('X_h', 'y2_2')
])

    got_x_define = sum([
        sample_sum('X_h', 'x1_1__define'),
        sample_sum('X_h', 'x2_2__define'),
        sample_sum('X_h', 'y1_1__define'),
        sample_sum('X_h', 'y2_2__define'),
    ])
    print('x define got', got_x_define)
print('x total exp', x1 + x2 + x3 + y1 + y2)
print('x total got', got_x)
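
    # Hypothetical follow-up, not part of this diff, illustrating the parametrize
    # note above: print each per-proc sample that GraphMetricsNew records under
    # '<field>__define' (it reuses the sample_sum fixture and the label format
    # from _SubqueryMixinNew). The HL sleep inside x1_field should show up on
    # 'x1_1__define' rather than being spread across every field of the subquery,
    # while y1's low-level sleep stays invisible here because its proc is a plain
    # value lookup.
    for define_field in ('x1_1__define', 'x2_2__define',
                         'y1_1__define', 'y2_2__define'):
        print(define_field, sample_sum('X_h', define_field))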