Load test for writing observations into the datastore
rosinaderks committed Dec 21, 2023
1 parent 2480e94 commit 0fb17f8
Showing 7 changed files with 323 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -61,7 +61,7 @@ jobs:
pip install -r load-test/requirements.txt
python -m grpc_tools.protoc --proto_path=datastore/protobuf datastore.proto --python_out=load-test --grpc_python_out=load-test
cd load-test
- locust --headless -u 5 -r 1 --run-time 60 --only-summary --csv store
+ locust -f locustfile_read.py --headless -u 5 -r 1 --run-time 60 --only-summary --csv store
- name: Archive load test artifacts
uses: actions/upload-artifact@v3
72 changes: 72 additions & 0 deletions load-test/README.md
@@ -0,0 +1,72 @@
# Load test datastore

Locust is used for performance testing of the datastore.

## Read test
Two tasks are defined: 1) `get_data_for_single_timeserie` and 2) `get_data_single_station_through_bbox`. As it is unclear
how many users the datastore is expected to serve, the test is run with 5 users for 60 seconds.

## Write test

### Setup & Data preparation
To resemble the load from all EU partners, we need to multiply the KNMI data and increase the input time resolution. For this we:
* Generate dummy data from the KNMI input data by expanding the 10-minute observations to 5-second observations.
* Insert the data at this higher temporal resolution.

Given that the load test should represent 5-minute data for 5000 stations, a rate of about 17 requests/sec is needed
(5000 / (5 * 60) ≈ 16.7). A request contains the observations for all parameters for one station and one timestamp.

Test requirements:
* Test runtime = 15 min (900 s)
* Expected number of requests in 15 min = 15300 (900 * 17)
* wait_time between tasks = between 1.5 and 2.5 s, resulting in 1 request per 2 s per user on average.
* 35 users should therefore lead to a rate of about 17 requests/sec, resembling EU coverage (see the check below).
* User spawn rate = 1 user per second
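
A quick sanity check of these numbers (a standalone snippet, not part of the test code; variable names are for illustration only):

```python
stations = 5000
interval_s = 5 * 60                      # 5-minute data
required_rate = stations / interval_s    # ~16.7 requests/sec for EU-wide coverage

avg_wait_s = (1.5 + 2.5) / 2             # wait_time = between(1.5, 2.5)
rate_per_user = 1 / avg_wait_s           # 0.5 requests/sec per user
print(35 * rate_per_user)                # 17.5 requests/sec with 35 users

runtime_s = 15 * 60                      # 900 s
print(runtime_s * 17)                    # 15300 expected requests in 15 min
```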

### Run locust via web
```text
locust -f load-test/locustfile_write.py
```

### Run locust only via command line
```text
locust -f load-test/locustfile_write.py --headless -u <USERS> -r <SPAWN_RATE> --run-time <RUNTIME> --only-summary --csv store_write
```

### Results
In the tables below, r/s is the number of completed requests per second.

| Users | r/s  | Total requests | Failures | Med request time (ms) | Avg request time (ms) |
|-------|------|----------------|----------|-----------------------|-----------------------|
| 1     | 0.5  | 428            | 0        | 110                   | 110                   |
| 35    | 15.2 | 13 640         | 0        | 180                   | 263                   |
| 55    | 17.2 | 15 439         | 0        | 790                   | 1104                  |



### Read & Write Test
Run the read and write tests together to test the combined load, with the write test getting 7 times as many users as the
read test. This ratio is enforced by the `weight` attribute of the two user classes.
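
The split of the total `-u <USERS>` count between the two classes follows their `weight` attributes (1 for the read user and 7 for the write user, as set in the locustfiles of this commit). A minimal sketch of the resulting distribution:

```python
# Sketch only: how a total user count is divided over the two classes by weight.
read_weight, write_weight = 1, 7
total_weight = read_weight + write_weight  # 8

total_users = 40
write_users = total_users * write_weight // total_weight  # 35
read_users = total_users - write_users                    # 5
print(write_users, read_users)
```

With 60 total users the same 7:1 ratio gives roughly 53 write and 7 read users, which is the second pair of rows in the table below.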

### Run multiple locust files via web

```text
locust -f load-test/locustfile_write.py,load-test/locustfile_read.py
```

### Run multiple locust files only via command line

```text
locust -f load-test/locustfile_write.py,load-test/locustfile_read.py --headless -u <USERS> -r <SPAWN_RATE> --run-time <RUNTIME> --only-summary --csv store_write_read
```

| Test  | Users | r/s  | Total requests | Failures | Med request time (ms) | Avg request time (ms) |
|-------|-------|------|----------------|----------|-----------------------|-----------------------|
| Write | 35    | 12.7 | 11423          | 0        | 660                   | 696                   |
| Read  | 5     | 69.0 | 62091          | 0        | 20                    | 70                    |
|       |       |      |                |          |                       |                       |
| Write | 53    | 14.5 | 13087          | 0        | 1500                  | 1522                  |
| Read  | 7     | 36.4 | 32769          | 0        | 54                    | 185                   |
1 change: 1 addition & 0 deletions load-test/locustfile.py → load-test/locustfile_read.py
@@ -34,6 +34,7 @@
class StoreGrpcUser(grpc_user.GrpcUser):
host = "localhost:50050"
stub_class = dstore_grpc.DatastoreStub
weight = 1

@task
def get_data_for_single_timeserie(self):
111 changes: 111 additions & 0 deletions load-test/locustfile_write.py
@@ -0,0 +1,111 @@
from pathlib import Path

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc_user
import psycopg2
from locust import between
from locust import events
from locust import task
from netcdf_file_to_requests import generate_dummy_requests_from_netcdf_per_station_per_timestamp


file_path = Path(Path(__file__).parents[1] / "test-data" / "KNMI" / "20230101.nc")

stations = [
"06201",
"06203",
"06204",
"06205",
"06207",
"06208",
"06211",
"06214",
"06215",
"06225",
"06229",
"06235",
"06239",
"06240",
"06242",
"06248",
"06249",
"06251",
"06252",
"06257",
"06258",
"06260",
"06267",
"06269",
"06270",
"06273",
"06275",
"06277",
"06278",
"06279",
"06280",
"06283",
"06286",
"06290",
"06310",
"06317",
"06319",
"06320",
"06321",
"06323",
"06330",
"06340",
"06343",
"06344",
"06348",
"06350",
"06356",
"06370",
"06375",
"06377",
"06380",
"06391",
"78871",
"78873",
"78990",
]


class IngestionGrpcUser(grpc_user.GrpcUser):
host = "localhost:50050"
stub_class = dstore_grpc.DatastoreStub
wait_time = between(1.5, 2.5)
user_nr = 0
dummy_observations_all_stations = generate_dummy_requests_from_netcdf_per_station_per_timestamp(file_path)
weight = 7

def on_start(self):
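# Each spawned user claims the pre-generated observations of the next station via the
# shared class-level counter, so no two users write data for the same station.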
print(f"User {IngestionGrpcUser.user_nr}")
self.dummy_observations_per_station = IngestionGrpcUser.dummy_observations_all_stations[
IngestionGrpcUser.user_nr
]
IngestionGrpcUser.user_nr += 1
self.index = 0

@task
def ingest_data_per_observation(self):
# 44 observations per task
observations = self.dummy_observations_per_station[self.index]["observations"]
request_messages = dstore.PutObsRequest(observations=observations)
response = self.stub.PutObservations(request_messages)
assert response.status == -1
self.index += 1

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("Cleaning up test data")
conn = psycopg2.connect(
database="data", user="postgres", password="mysecretpassword", host="localhost", port="5433"
)
cursor = conn.cursor()
# delete all observations inserted by the load test (the generated test data all falls in 2023)
sql = """ DELETE FROM observation WHERE extract(YEAR from obstime_instant)::int = 2023 """
cursor.execute(sql)
# Commit your changes in the database
conn.commit()
conn.close()
118 changes: 118 additions & 0 deletions load-test/netcdf_file_to_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import math
import uuid
from datetime import datetime
from datetime import timedelta
from pathlib import Path
from time import perf_counter
from typing import List

import datastore_pb2 as dstore
import pandas as pd
import xarray as xr
from google.protobuf.timestamp_pb2 import Timestamp


knmi_parameter_names = (
"hc3",
"nc2",
"zm",
"R1H",
"hc",
"tgn",
"Tn12",
"pr",
"pg",
"tn",
"rg",
"hc1",
"nc1",
"ts1",
"nc3",
"ts2",
"qg",
"ff",
"ww",
"gff",
"dd",
"td",
"ww-10",
"Tgn12",
"ss",
"Tn6",
"dr",
"rh",
"hc2",
"Tgn6",
"R12H",
"R24H",
"Tx6",
"Tx24",
"Tx12",
"Tgn14",
"D1H",
"R6H",
"pwc",
"tx",
"nc",
"pp",
"Tn14",
"ta",
)


def timerange(start_time, end_time, interval_minutes):
current_time = start_time
while current_time < end_time:
yield current_time
current_time += timedelta(minutes=interval_minutes)
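# Example: list(timerange(datetime(2023, 1, 1, 0, 0), datetime(2023, 1, 1, 0, 30), 10))
# yields 00:00, 00:10 and 00:20; the end time itself is excluded.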


def generate_dummy_requests_from_netcdf_per_station_per_timestamp(file_path: Path | str) -> List:
print("Starting with creating the time series and observations requests.")
create_requests_start = perf_counter()
obs_per_station = []
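# obs_per_station[i] becomes a list of dicts for station i, one per generated timestamp:
# {"time": <datetime>, "observations": [<dstore.Metadata1>, ...]}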

with xr.open_dataset(file_path, engine="netcdf4", chunks=None) as file: # chunks=None to disable dask
# Slice enough timestamps to generate data for the test to run 15 min
time_slice = file.sel(time=slice(datetime(2023, 1, 1, 0, 0, 0), datetime(2023, 1, 1, 1, 1, 0)))
for station_id, latitude, longitude, height in zip(
time_slice["station"].values,
time_slice["lat"].values[0],
time_slice["lon"].values[0],
time_slice["height"].values[0],
):
station_slice = time_slice.sel(station=station_id)
obs_per_timestamp = []
for time in pd.to_datetime(station_slice["time"].data).to_pydatetime():
# Generate 5-sec data from each 10-min observation
for i in range(0, 600, 5): # 5-sec data
obs_per_parameter = []
generated_timestamp = time + timedelta(seconds=i)
ts = Timestamp()
ts.FromDatetime(generated_timestamp)
for param_id in knmi_parameter_names:
param = station_slice[param_id]
obs_value = station_slice[param_id].data[0]
obs_value = 0 if math.isnan(obs_value) else obs_value # dummy data so obs_value doesn't matter
ts_mdata = dstore.TSMetadata(
platform=station_id,
instrument=param_id,
title=param.long_name,
standard_name=param.standard_name if "standard_name" in param.attrs else None,
unit=param.units if "units" in param.attrs else None,
)
obs_mdata = dstore.ObsMetadata(
id=str(uuid.uuid4()),
geo_point=dstore.Point(lat=latitude, lon=longitude),
obstime_instant=ts,
value=str(obs_value),
)
observation = dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata)
obs_per_parameter.append(observation)
obs_per_timestamp.append({"time": generated_timestamp, "observations": obs_per_parameter})
obs_per_station.append(obs_per_timestamp)

print("Finished creating the time series and observation requests " f"{perf_counter() - create_requests_start}.")
print(f"Total number of obs generated per station is {len(obs_per_parameter)*len(obs_per_timestamp)}")
return obs_per_station
1 change: 1 addition & 0 deletions load-test/requirements.in
@@ -7,3 +7,4 @@ grpcio-tools~=1.56
grpc-interceptor~=0.15.3
locust~=2.16
shapely~=2.0
psycopg2~=2.9
