Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ping scenario #115

Merged
merged 12 commits into from
Sep 20, 2024
111 changes: 109 additions & 2 deletions tests/mindwm_bdd/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,27 @@
from messages import DataTable
from kubetest import utils as kubetest_utils
from kubetest import condition
import json
import requests
import time
from opentelemetry.proto.trace.v1 import trace_pb2
from google.protobuf.json_format import ParseDict
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
from functools import wraps
import inspect
import nats_reader
from queue import Empty

@pytest.fixture
def ctx():
return {}
@pytest.fixture
def cloudevent():
return {}
@pytest.fixture
def trace_data():
return {}

@scenario('lifecycle.feature','Validate Mindwm custom resource definitions')
def test_scenario():
Expand Down Expand Up @@ -55,7 +72,7 @@ def mindwm_environment(kube):


for plural in ["xcontexts", "xhosts", "xusers"]:
utils.custom_object_wait_for(kube, 'mindwm.io', 'v1beta1', plural)
utils.custom_object_plural_wait_for(kube, 'mindwm.io', 'v1beta1', plural)
kube.get_custom_objects(group = 'mindwm.io', version = 'v1beta1', plural = plural, all_namespaces = True)
with allure.step(f"Mindwm crd '{plural}' is exists"):
pass
Expand Down Expand Up @@ -316,6 +333,97 @@ def nats_stream_exists(kube, nats_stream_name, namespace):
pass
assert(is_ready == 'True')

@when("God creates a new cloudevent")
def cloudevent_new(cloudevent):
cloudevent = {}

@when("sets cloudevent \"{key}\" to \"{value}\"")
def cloudevent_header_set(cloudevent, key, value):
cloudevent[key] = value


@when("sends cloudevent to \"{broker_name}\" in \"{namespace}\" namespace")
def event_send_ping(kube, cloudevent, broker_name, namespace):

ingress_host = utils.get_lb(kube)
url = f"http://{ingress_host}/{namespace}/{broker_name}"

headers = {
"Host": "broker-ingress.knative-eventing.svc.cluster.local",
"Content-Type": "application/json",
"traceparent": cloudevent['traceparent'],
"Ce-specversion": "1.0",
"Ce-id": cloudevent['ce-id'],
"ce-source": cloudevent['ce-source'],
"ce-subject": cloudevent['ce-subject'],
"ce-type": cloudevent['ce-type']
}
payload = {
"input": cloudevent["ce-source"],
"output": "",
"ps1": "❯",
"type": cloudevent['ce-type']
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
assert response.status_code == 202, f"Unexpected status code: {response.status_code}"

pass

@then("the trace with \"{traceparent}\" should appear in TraceQL")
def tracesql_get_trace(kube, traceparent, trace_data):
ingress_host = utils.get_lb(kube)
trace_id = utils.extract_trace_id(traceparent)
url = f"http://{ingress_host}/api/traces/{trace_id}"
headers = {
"Host": "tempo.mindwm.local"
}
time.sleep(5)
response = requests.get(url, headers = headers)
assert response.status_code == 200, f"Response code {response.status_code} != 200"
tempo_reply = json.loads(response.text)
trace_resourceSpans = {
"resourceSpans": tempo_reply['batches']
}
trace_data['data'] = ParseDict(trace_resourceSpans, trace_pb2.TracesData())

@then("the trace should contains")
def trace_should_contains(step, trace_data):
#pprint.pprint(f"TRACE DATA = {trace_data['data']}")
title_row, *rows = step.data_table.rows
for row in rows:
service_name = row.cells[0].value
#http_code = row.cells[1].value
#http_path = row.cells[2].value
#pprint.pprint(f"{service_name} {http_code} {http_path}")
scope_span = utils.span_by_service_name(trace_data['data'], service_name)
assert(scope_span is not None)
span = utils.parse_resourceSpan(scope_span)
assert(span is not None)
assert(span['service_name'] == service_name)
# assert(span['http_code'] == http_code)
# assert(span['http_path'] == http_path)
pass

@then("a cloudevent with type == \"{cloudevent_type}\" should have been received from the NATS topic")
def cloudevent_check(cloudevent_type):
time.sleep(5)
message_queue = nats_reader.message_queue
while True:
try:
message = message_queue.get(timeout=1)
event = json.loads(message)
if (event['type'] == cloudevent_type):
return True
message_queue.task_done()
except Empty:
break

assert False, f"no pong in nats"

@when("God starts reading message from NATS topic \"{nats_topic_name}\"")
def nats_message_receive(kube, nats_topic_name):
ingress_host = utils.get_lb(kube)
nats_reader.main(f"nats://root:r00tpass@{ingress_host}:4222", nats_topic_name)

@then("following deployments is in ready state in \"{namespace}\" namespace")
def deployment_ready(kube, step, namespace):
Expand All @@ -332,4 +440,3 @@ def pytest_collection_modifyitems(config: pytest.Config, items: List[pytest.Item
for item in items:
item.add_marker(pytest.mark.namespace(create = False, name = "default"))


53 changes: 53 additions & 0 deletions tests/mindwm_bdd/features/3_mindwm_ping_pong.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
@ping_pong
Feature: MindWM Ping-pong EDA test
Background:
Given A MindWM environment
Then all nodes in Kubernetes are ready

Scenario: Ping <context>

When God creates a MindWM context with the name "<context>"
Then the context should be ready and operable
Then following knative service is in ready state in "context-<context>" namespace
| Knative service name |
| pong |

When God creates a MindWM user resource with the name "<username>" and connects it to the context "<context>"
Then the user resource should be ready and operable

When God creates a MindWM host resource with the name "<host>" and connects it to the user "<username>"
Then the host resource should be ready and operable

When God starts reading message from NATS topic "user-<username>.<host>-host-broker-kne-trigger._knative"
When God creates a new cloudevent
And sets cloudevent "ce-id" to "<cloudevent_id>"
And sets cloudevent "traceparent" to "<traceparent>"
And sets cloudevent "ce-subject" to "#ping"
And sets cloudevent "ce-source" to "org.mindwm.<username>.<host>.L3RtcC90bXV4LTEwMDAvZGVmYXVsdA==.09fb195c-c419-6d62-15e0-51b6ee990922.23.36.iodocument"
And sets cloudevent "ce-type" to "org.mindwm.v1.iodocument"
And sends cloudevent to "context-broker" in "context-<context>" namespace
Then the trace with "<traceparent>" should appear in TraceQL
And the trace should contains
| service name |
| broker-ingress.knative-eventing |
| unknown_service |
| jetstream-ch-dispatcher |
And a cloudevent with type == "org.mindwm.v1.pong" should have been received from the NATS topic

Examples:
| context | username | host | cloudevent_id | traceparent |
| green4 | amanda4 | pi6-host | 442af213-c860-4535-b639-355f13b2d443 | 00-5df92f3577b34da6a3ce929d0e0e4734-00f067aa0ba902b7-00 |


Scenario: Cleanup <username>@<host> in <context>
When God deletes the MindWM host resource "<host>"
Then the host "<host>" should be deleted

When God deletes the MindWM user resource "<username>"

When God deletes the MindWM context resource "<context>"

Examples:
| context | username | host |
| green4 | amanda4 | pi6-host |

63 changes: 63 additions & 0 deletions tests/mindwm_bdd/nats_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
import threading
import sys
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from queue import Queue, Empty

# Configuration
message_queue = Queue()

async def subscribe_to_nats(nats_url, topic: str):
nc = NATS()

try:
# Attempt to connect to the NATS server
await nc.connect(servers=[nats_url])
print(f"Connected to NATS server at {nats_url}")

except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
return

async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
message_queue.put(data)
print(f"Received a message on '{subject}': {data}")

# Subscribe to the specified topic with the message handler
try:
print(f"Subscribed to topic '{topic}'")
await nc.subscribe(topic, cb=message_handler)
except Exception as e:
print(f"Failed to subscribe to topic '{topic}': {e}")
await nc.close()
return

# Keep the connection alive to listen for messages
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Subscription task cancelled.")
finally:
await nc.close()
print("NATS connection closed.")

def run_nats_subscriber(nats_url, topic):

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
loop.run_until_complete(subscribe_to_nats(nats_url, topic))
finally:
loop.close()

def main(nats_url: str, topic: str):
subscriber_thread = threading.Thread(target=run_nats_subscriber, daemon=True, args=(nats_url, topic,))
subscriber_thread.start()


1 change: 1 addition & 0 deletions tests/mindwm_bdd/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[pytest]
asyncio_default_fixture_loop_scope = "module"
pythonpath = . custom_objects
markers =
crd
Loading
Loading