Load test for writing observations into the datastore #2

Merged
merged 29 commits into from Feb 1, 2024
ca358d0
Load test for writing observations into the datastore
rosinaderks Dec 21, 2023
e00c55c
Fix pre-commit complaint and improve naming
rosinaderks Dec 21, 2023
b7de325
Simplify data generation for load testing and expand README
rosinaderks Dec 22, 2023
2d838f2
Simplify data generation for load testing and expand README
rosinaderks Dec 22, 2023
09d49cb
Update the requirements so that I can run the load tests.
Jeffrey-Vervoort-KNMI Jan 5, 2024
86acae6
Update the Locust command in github ci to test read and write.
Jeffrey-Vervoort-KNMI Jan 5, 2024
3a7f49e
Don't use same value for all time points.
lukas-phaf Jan 9, 2024
c2c7d7f
Load test support for setting up message "per variable".
lukas-phaf Jan 9, 2024
4dd9320
Moved load-test images
jo-asplin-met-no Jan 10, 2024
92c75c2
Fix data path.
lukas-phaf Jan 16, 2024
8108b9b
Debugging...
lukas-phaf Jan 16, 2024
d2a711b
Try to fix path. Also regenerate requirements.txt with Python 3.11.
lukas-phaf Jan 16, 2024
7b47e28
Fix path (again).
lukas-phaf Jan 16, 2024
0275360
Add scheduler based data writer (load test).
lukas-phaf Jan 16, 2024
b6d1880
Add name to jobs.
lukas-phaf Jan 17, 2024
8622b5c
Remove locustfile.py (something went wrong when renaming it to locust…
lukas-phaf Jan 17, 2024
4230ad3
Set up separate load tests, one that only reads, and one that reads w…
lukas-phaf Jan 17, 2024
147f7f3
Less output...
lukas-phaf Jan 18, 2024
93ebaac
Flush output?
lukas-phaf Jan 18, 2024
2fe0626
Flush output 2.
lukas-phaf Jan 18, 2024
a4f033a
Add to README.
lukas-phaf Jan 18, 2024
07cf8f3
Long, high load test run.
lukas-phaf Jan 18, 2024
5a8ce29
Long, high load test run.
lukas-phaf Jan 23, 2024
3d8ecb0
Drop old writing load test. Clarify TODO.
lukas-phaf Jan 31, 2024
cb61709
Cleanup of readme after dropping the locust load test.
Jeffrey-Vervoort-KNMI Jan 31, 2024
13fb1c0
Updates to README.
lukas-phaf Feb 1, 2024
2ca219d
Small fix README.
lukas-phaf Feb 1, 2024
5dff489
Disable broken test-ingest test.
lukas-phaf Feb 1, 2024
68dacb8
Clean up requirements for load-test.
lukas-phaf Feb 1, 2024
86 changes: 52 additions & 34 deletions .github/workflows/ci.yml
@@ -58,55 +58,73 @@ jobs:
- name: Test client runs without errors
run: DYNAMICTIME=false LOTIME=1000-01-01T00:00:00Z HITIME=9999-12-31T23:59:59Z docker compose run --rm client

- name: Run load test
- name: Run load test (read only)
run: |
python --version
pip install -r datastore/load-test/requirements.txt
python -m grpc_tools.protoc --proto_path=./protobuf datastore.proto --python_out=datastore/load-test --grpc_python_out=datastore/load-test
cd datastore/load-test
locust --headless -u 5 -r 1 --run-time 60 --only-summary --csv store
locust -f locustfile_read.py --headless -u 5 -r 10 --run-time 60 --only-summary --csv store_read

- name: Run load test (write + read)
run: |
pip install -r datastore/load-test/requirements.txt
python -m grpc_tools.protoc --proto_path=./protobuf datastore.proto --python_out=datastore/load-test --grpc_python_out=datastore/load-test
cd datastore/load-test
python schedule_write.py > schedule_write.log 2>&1 &
locust -f locustfile_read.py --headless -u 5 -r 10 --run-time 60 --only-summary --csv store_rw
kill %1
echo Catting schedule_write output...
cat schedule_write.log
echo Done catting

- name: Archive load test artifacts
uses: actions/upload-artifact@v3
with:
name: performance
path: datastore/load-test/store_*.csv
path: |
datastore/load-test/store_read_*.csv
datastore/load-test/store_rw_*.csv

- name: Print results
run: |
pip install csvkit
echo "## Stats" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_stats.csv >> $GITHUB_STEP_SUMMARY
echo "## Stats history" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_stats_history.csv >> $GITHUB_STEP_SUMMARY
echo "## Failures" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_failures.csv >> $GITHUB_STEP_SUMMARY
echo "## Stats (READ ONLY)" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_read_stats.csv >> $GITHUB_STEP_SUMMARY
echo "## Failures (READ ONLY)" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_read_failures.csv >> $GITHUB_STEP_SUMMARY
echo "## Stats (WRITE + READ)" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_rw_stats.csv >> $GITHUB_STEP_SUMMARY
echo "## Failures (WRITE + READ)" >> $GITHUB_STEP_SUMMARY
csvlook datastore/load-test/store_rw_failures.csv >> $GITHUB_STEP_SUMMARY

- name: Cleanup
if: always()
run: docker compose down --volumes
test-ingest:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"] # Add 3.11 back pybind11 bug is fixed
steps:
- name: Checkout the repo
uses: actions/checkout@v3
- name: Ubuntu setup
run: sudo apt update && sudo apt install libeccodes-data rapidjson-dev pybind11-dev libssl-dev
- name: Python Setup
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
- name: Checkout Source
uses: actions/checkout@v3
- name: Install Dependencies
run: |
pip install --upgrade pip
pip install pytest-timeout
pip install pytest-cov
pip install ./ingest
- name: Run Tests
run: python -m pytest -v --timeout=60 ./ingest

# TODO: These tests don't currently work. Uncomment once this is resolved.
# test-ingest:
# runs-on: ubuntu-latest
# strategy:
# matrix:
# python-version: ["3.10"] # Add 3.11 back pybind11 bug is fixed
# steps:
# - name: Checkout the repo
# uses: actions/checkout@v3
# - name: Ubuntu setup
# run: sudo apt update && sudo apt install libeccodes-data rapidjson-dev pybind11-dev libssl-dev
# - name: Python Setup
# uses: actions/setup-python@v4
# with:
# python-version: ${{ matrix.python-version }}
# architecture: x64
# - name: Checkout Source
# uses: actions/checkout@v3
# - name: Install Dependencies
# run: |
# pip install --upgrade pip
# pip install pytest-timeout
# pip install pytest-cov
# pip install ./ingest
# - name: Run Tests
# run: python -m pytest -v --timeout=60 ./ingest
42 changes: 42 additions & 0 deletions datastore/load-test/README.md
@@ -0,0 +1,42 @@
# Load test datastore


## Read test
Locust is used for read performance testing of the datastore.

Two tasks are defined: 1) `get_data_for_single_timeserie` and 2) `get_data_single_station_through_bbox`.
Each simulated user runs one task at a time; as soon as a query completes, it starts the next one.
We found that maximum total throughput is reached with about 5 users.
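The ~5-user plateau can be sanity-checked with Little's law (closed-loop throughput ≈ users / latency, capped by server capacity). A minimal sketch — the latency and capacity numbers below are illustrative assumptions, not measurements from this load test:

```python
# Sketch: why throughput plateaus as the user count grows (Little's law).
# The 50 ms latency and 100 req/s capacity are assumed, not measured.

def offered_throughput(users: int, latency_s: float) -> float:
    """Closed-loop request rate if the server were infinitely fast."""
    return users / latency_s

def achieved_throughput(users: int, latency_s: float, capacity_rps: float) -> float:
    """Offered rate, capped by server capacity."""
    return min(offered_throughput(users, latency_s), capacity_rps)

if __name__ == "__main__":
    latency = 0.050   # assumed mean query latency, seconds
    capacity = 100.0  # assumed server capacity, requests/sec
    for users in (1, 2, 5, 10):
        print(users, achieved_throughput(users, latency, capacity))
```

With these assumed numbers throughput grows linearly up to 5 users (5 / 0.05 = 100 req/s) and is flat beyond that, matching the shape of the observed plateau.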

### Locust Commands
#### Run locust via web
```shell
locust -f load-test/locustfile_read.py
```

#### Run locust only via command line
```shell
locust -f load-test/locustfile_read.py --headless -u <USERS> -r <SPAWN_RATE> --run-time <RUNTIME> --only-summary --csv store_read
```

## Write test

### Load Estimation
The expected load on the E-SOH system is one message every 5 minutes from each of 5000 stations,
which gives a rate of about 17 requests/sec (5000 stations × 12 messages/hour ÷ 3600 s).
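The estimate can be reproduced in a couple of lines (all numbers taken from the text above):

```python
# Expected E-SOH ingest rate: 5000 stations, one message per 5 minutes each.
stations = 5000
messages_per_hour = 60 // 5                  # 12 messages per station per hour
rate = stations * messages_per_hour / 3600   # requests per second
print(round(rate, 1))                        # ~16.7 requests/sec
```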

### Write data using apscheduler
[Advanced Python Scheduler](https://apscheduler.readthedocs.io/en/3.x/) is a package that can be
used to schedule a large number of jobs.

We represent each station by an apscheduler job,
which is scheduled to send data for all variables of that station once every 1, 5 or 10 minutes (randomly chosen).
This roughly represents the expected load of the E-SOH system of all EU partners.
The timestamps in the data correspond to actual clock time, and the data values are randomly chosen.
The writer keeps producing data until stopped,
which also allows testing the cleanup functionality of the datastore.
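As a cross-check of the load this produces: `schedule_write.py` assigns each station a 1-, 5- or 10-minute period, picked uniformly from a list of 16 cron slots (1 one-minute slot, 5 five-minute slots, 10 ten-minute slots). The expected aggregate write rate follows directly — a small sketch, not part of the PR itself:

```python
# Expected aggregate write rate for 5000 stations with the cron mix from
# schedule_write.py: 1 slot firing every minute, 5 slots every 5 minutes,
# 10 slots every 10 minutes, each slot equally likely per station.
crons = [60] * 1 + [300] * 5 + [600] * 10  # slot periods in seconds

per_station = sum(1 / p for p in crons) / len(crons)  # expected req/s per station
total = 5000 * per_station
print(total)  # 15.625 requests/sec
```

This lands close to the ~17 requests/sec estimated above, so the scheduled writer is a reasonable stand-in for the expected production load.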

To manually run the data writer, do the following:
```shell
python schedule_write.py
```
@@ -12,6 +12,7 @@
from shapely import buffer
from shapely import wkt


parameters = ["ff", "dd", "rh", "pp", "tn"]
# fmt: off
stations = [
5 changes: 5 additions & 0 deletions datastore/load-test/requirements.in
@@ -6,4 +6,9 @@
grpcio-tools~=1.56
grpc-interceptor~=0.15.3
locust~=2.16
netCDF4~=1.6
shapely~=2.0
pandas~=2.1
psycopg2-binary~=2.9
xarray~=2023.12
apscheduler~=3.10
75 changes: 54 additions & 21 deletions datastore/load-test/requirements.txt
@@ -4,15 +4,20 @@
#
# pip-compile --no-emit-index-url
#
blinker==1.6.3
apscheduler==3.10.4
# via -r requirements.in
blinker==1.7.0
# via flask
brotli==1.1.0
# via geventhttpclient
certifi==2023.7.22
certifi==2023.11.17
# via
# geventhttpclient
# netcdf4
# requests
charset-normalizer==3.3.0
cftime==1.6.3
# via netcdf4
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via flask
@@ -33,54 +38,82 @@ gevent==23.9.1
# locust
geventhttpclient==2.0.11
# via locust
greenlet==3.0.0
greenlet==3.0.3
# via gevent
grpc-interceptor==0.15.3
grpc-interceptor==0.15.4
# via -r requirements.in
grpcio==1.59.0
grpcio==1.60.0
# via
# grpc-interceptor
# grpcio-tools
grpcio-tools==1.59.0
grpcio-tools==1.60.0
# via -r requirements.in
idna==3.4
idna==3.6
# via requests
itsdangerous==2.1.2
# via flask
jinja2==3.1.2
jinja2==3.1.3
# via flask
locust==2.17.0
locust==2.20.1
# via -r requirements.in
markupsafe==2.1.3
# via
# jinja2
# werkzeug
msgpack==1.0.7
# via locust
numpy==1.26.0
# via shapely
protobuf==4.24.4
netcdf4==1.6.5
# via -r requirements.in
numpy==1.26.3
# via
# cftime
# netcdf4
# pandas
# shapely
# xarray
packaging==23.2
# via xarray
pandas==2.1.4
# via
# -r requirements.in
# xarray
protobuf==4.25.2
# via grpcio-tools
psutil==5.9.5
psutil==5.9.7
# via locust
pyzmq==25.1.1
psycopg2-binary==2.9.9
# via -r requirements.in
python-dateutil==2.8.2
# via pandas
pytz==2023.3.post1
# via
# apscheduler
# pandas
pyzmq==25.1.2
# via locust
requests==2.31.0
# via locust
roundrobin==0.0.4
# via locust
shapely==2.0.1
shapely==2.0.2
# via -r requirements.in
six==1.16.0
# via geventhttpclient
typing-extensions==4.8.0
# via locust
urllib3==2.0.6
# via
# apscheduler
# geventhttpclient
# python-dateutil
tzdata==2023.4
# via pandas
tzlocal==5.2
# via apscheduler
urllib3==2.1.0
# via requests
werkzeug==3.0.0
werkzeug==3.0.1
# via
# flask
# locust
xarray==2023.12.0
# via -r requirements.in
zope-event==5.0
# via gevent
zope-interface==6.1
102 changes: 102 additions & 0 deletions datastore/load-test/schedule_write.py
@@ -0,0 +1,102 @@
import math
import os
import random
import time
import uuid
from collections import namedtuple
from datetime import datetime
from datetime import UTC

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from google.protobuf.timestamp_pb2 import Timestamp
from variables import variable_info

# from apscheduler.executors.pool import ProcessPoolExecutor


# TODO: We use one channel & client per process (not per thread!). Check if this limits performance!
# Want to try with one channel per metoffice (so split over 20 channels)
# See also the fourth bullet here: https://grpc.io/docs/guides/performance/
channel = grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}")
client = dstore_grpc.DatastoreStub(channel=channel)


crons = [
(1, "*"), # every minute
(5, "*/5"), # every 5 minutes, on the five
(5, "1-59/5"), # every 5 minutes, one minute after the five
(5, "2-59/5"),
(5, "3-59/5"),
(5, "4-59/5"),
(10, "*/10"), # every 10 minutes, on the 10
(10, "1-59/10"), # every 10 minutes, one minute after the ten
(10, "2-59/10"),
(10, "3-59/10"),
(10, "4-59/10"),
(10, "5-59/10"),
(10, "6-59/10"),
(10, "7-59/10"),
(10, "8-59/10"),
(10, "9-59/10"),
]

vars_per_station = 40 # Should be <=44

Station = namedtuple("Station", "id lat lon period")


def write_data(station):
pub_time = datetime.now(UTC)
# Round observation time to nearest 1, 5, 10 minutes
obs_time = pub_time.replace(minute=int(pub_time.minute / station.period) * station.period, second=0, microsecond=0)
pub_ts = Timestamp()
obs_ts = Timestamp()
pub_ts.FromDatetime(pub_time)
obs_ts.FromDatetime(obs_time)
observations = []
for var in range(0, vars_per_station):
(param_id, long_name, standard_name, unit) = variable_info[var]
ts_mdata = dstore.TSMetadata(
platform=station.id,
instrument=param_id,
title=long_name,
standard_name=standard_name,
unit=unit,
)
obs_mdata = dstore.ObsMetadata(
id=str(uuid.uuid4()),
pubtime=pub_ts,
geo_point=dstore.Point(lat=station.lat, lon=station.lon), # One random per station
obstime_instant=obs_ts,
value=str(math.sin(time.mktime(obs_time.timetuple()) / 36000.0) + 2 * var), # TODO: Make dependent on station
)
observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata))

request_messages = dstore.PutObsRequest(observations=observations)
response = client.PutObservations(request_messages)
assert response.status == -1


if __name__ == "__main__":
scheduler = BlockingScheduler()
# scheduler.add_executor(ProcessPoolExecutor())
scheduler.add_executor(ThreadPoolExecutor())
print(f"Now: {datetime.now()}", flush=True)
for i in range(0, 5000):
(period, cron) = random.choice(crons)
station_id = f"station{i:04d}"
station = Station(station_id, random.uniform(50.0, 55.0), random.uniform(4.0, 8.0), period)
# print(station_id, cron, period)
# TODO: Spread less well over time, for example, all use same second, but add jitter < 60
trigger = CronTrigger(minute=cron, second=random.randint(0, 59), jitter=1)
scheduler.add_job(write_data, args=(station,), id=station_id, name=station_id, trigger=trigger)

try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
print("Shutting down...", flush=True)