Skip to content

Commit

Permalink
Add scheduler based data writer (load test).
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas-phaf committed Jan 16, 2024
1 parent 7fe535b commit 25466eb
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 2 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ jobs:
pip install -r datastore/load-test/requirements.txt
python -m grpc_tools.protoc --proto_path=./protobuf datastore.proto --python_out=datastore/load-test --grpc_python_out=datastore/load-test
cd datastore/load-test
pwd
locust -f locustfile_write.py,locustfile_read.py --headless -u 5 -r 1 --run-time 300 --only-summary --csv store
- name: Archive load test artifacts
Expand Down
1 change: 1 addition & 0 deletions datastore/load-test/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ shapely~=2.0
pandas~=2.1
psycopg2-binary~=2.9
xarray~=2023.12
apscheduler~=3.10
9 changes: 8 additions & 1 deletion datastore/load-test/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# pip-compile --no-emit-index-url
#
apscheduler==3.10.4
# via -r requirements.in
blinker==1.7.0
# via flask
brotli==1.1.0
Expand Down Expand Up @@ -84,7 +86,9 @@ psycopg2-binary==2.9.9
python-dateutil==2.8.2
# via pandas
pytz==2023.3.post1
# via pandas
# via
# apscheduler
# pandas
pyzmq==25.1.2
# via locust
requests==2.31.0
Expand All @@ -95,10 +99,13 @@ shapely==2.0.2
# via -r requirements.in
six==1.16.0
# via
# apscheduler
# geventhttpclient
# python-dateutil
tzdata==2023.4
# via pandas
tzlocal==5.2
# via apscheduler
urllib3==2.1.0
# via requests
werkzeug==3.0.1
Expand Down
102 changes: 102 additions & 0 deletions datastore/load-test/schedule_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import math
import os
import random
import time
import uuid
from collections import namedtuple
from datetime import datetime
from datetime import UTC

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from google.protobuf.timestamp_pb2 import Timestamp
from variables import variable_info

# from apscheduler.executors.pool import ProcessPoolExecutor


# One channel & client per process (not per thread!)
# TODO: Does this affect load on the database? Seems load here is lower then in Rosina's code
channel = grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}")
client = dstore_grpc.DatastoreStub(channel=channel)


crons = [
(1, "*"), # every minutes
(5, "*/5"), # every 5 minutes, on the five
(5, "1-59/5"), # every 5 minutes, on minute after the five
(5, "2-59/5"),
(5, "3-59/5"),
(5, "4-59/5"),
(10, "*/10"), # every 10 minutes, on the 10
(10, "1-59/10"), # every 10 minutes, on minute after the five
(10, "2-59/10"),
(10, "3-59/10"),
(10, "4-59/10"),
(10, "5-59/10"),
(10, "6-59/10"),
(10, "7-59/10"),
(10, "8-59/10"),
(10, "9-59/10"),
]

vars_per_station = 40 # Should be <=44

Station = namedtuple("Station", "id lat lon period")


def write_data(station):
pub_time = datetime.now(UTC)
# Round observation time to nearest 1, 5, 10 minutes
obs_time = pub_time.replace(minute=int(pub_time.minute / station.period) * station.period, second=0, microsecond=0)
pub_ts = Timestamp()
obs_ts = Timestamp()
pub_ts.FromDatetime(pub_time)
obs_ts.FromDatetime(obs_time)
observations = []
for var in range(0, vars_per_station):
(param_id, long_name, standard_name, unit) = variable_info[var]
ts_mdata = dstore.TSMetadata(
platform=station.id,
instrument=param_id,
title=long_name,
standard_name=standard_name,
unit=unit,
)
obs_mdata = dstore.ObsMetadata(
id=str(uuid.uuid4()),
pubtime=pub_ts,
geo_point=dstore.Point(lat=station.lat, lon=station.lon), # One random per station
obstime_instant=obs_ts,
value=str(math.sin(time.mktime(obs_time.timetuple()) / 36000.0) + 2 * var), # TODO: Make dependent station
)
observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata))

request_messages = dstore.PutObsRequest(observations=observations)
response = client.PutObservations(request_messages)
assert response.status == -1


if __name__ == "__main__":
scheduler = BlockingScheduler()
# scheduler.add_executor(ProcessPoolExecutor())
scheduler.add_executor(ThreadPoolExecutor())
print(datetime.now())
for i in range(0, 5000):
(period, cron) = random.choice(crons)
station_id = f"station{i:04d}"
station = Station(station_id, random.uniform(50.0, 55.0), random.uniform(4.0, 8.0), period)
print(station_id, cron, period)
# TODO: Spread less well over time, for example, all use same second, but add jitter < 60
trigger = CronTrigger(minute=cron, second=random.randint(0, 59), jitter=1)
scheduler.add_job(write_data, args=(station,), id=station_id, trigger=trigger)
print("Press Ctrl+{0} to exit".format("Break" if os.name == "nt" else "C"))

try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
46 changes: 46 additions & 0 deletions datastore/load-test/variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
variable_info = [
("hc3", "Cloud Base Third Layer", "cloud_base_altitude", "ft"),
("nc2", "Cloud Amount Second Layer", "cloud_cover", "octa"),
("zm", "Meteorological Optical Range 10 Min Average", "visibility_in_air", "m"),
("R1H", "Rainfall in last Hour", "rainfall_amount", "mm"),
("hc", "Cloud Base", "cloud_base_altitude", "ft"),
("tgn", "Grass Temperature 10cm 10 Min Minimum", "air_temperature", "degrees Celsius"),
("Tn12", "Air Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"),
("pr", "Precipitation Duration (PWS), 10 Min Sum", "precipitation_duration", "sec"),
("pg", "Precipitation Intensity (PWS), 10 Min Average", "lwe_precipitation_rate", "mm/h"),
("tn", "Ambient Temperature 1.5m 10 Min Minimum", "air_temperature", "degrees Celsius"),
("rg", "Precipitation Intensity (Rain Gauge), 10 Min Average", "precipitation_rate", "mm/h"),
("hc1", "Cloud Base First Layer", "cloud_base_altitude", "ft"),
("nc1", "Cloud Amount First Layer", "cloud_cover", "octa"),
("ts1", "Number of Lightning Discharges at Station", "Lightning on-site", "Number"),
("nc3", "Cloud Amount Third Layer", "cloud_cover", "octa"),
("ts2", "Number of Lightning Discharges near Station", "Lightning nearby", "Number"),
("qg", "Global Solar Radiation 10 Min Average", "total_downwelling_shortwave_flux_in_air", "W m-2"),
("ff", "Wind Speed at 10m 10 Min Average", "wind_speed", "m s-1"),
("ww", "wawa Weather Code", "None", "code"),
("gff", "Wind Gust at 10m 10 Min Maximum", "wind_speed_of_gust", "m s-1"),
("dd", "Wind Direction 10 Min Average", "wind_from_direction", "degree"),
("td", "Dew Point Temperature 1.5m 1 Min Average", "dew_point_temperature", "degrees Celsius"),
("ww-10", "wawa Weather Code for Previous 10 Min Interval", "None", "code"),
("Tgn12", "Grass Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"),
("ss", "Sunshine Duration", "duration_of_sunshine", "min"),
("Tn6", "Air Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"),
("dr", "Precipitation Duration (Rain Gauge), 10 Min Sum", "precipitation_duration", "sec"),
("rh", "Relative Humidity 1 Min Average", "relative_humidity", "%"),
("hc2", "Cloud Base Second Layer", "cloud_base_altitude", "ft"),
("Tgn6", "Grass Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"),
("R12H", "Rainfall in last 12 Hours", "rainfall_amount", "mm"),
("R24H", "Rainfall in last 24 Hours", "rainfall_amount", "mm"),
("Tx6", "Air Temperature Maximum last 6 Hours", "air_temperature", "degrees Celsius"),
("Tx24", "Air Temperature Maximum last 24 Hours", "air_temperature", "degrees Celsius"),
("Tx12", "Air Temperature Maximum last 12 Hours", "air_temperature", "degrees Celsius"),
("Tgn14", "Grass Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"),
("D1H", "Rainfall Duration in last Hour", "rainfall_duration", "min"),
("R6H", "Rainfall in last 6 Hours", "rainfall_amount", "mm"),
("pwc", "Present Weather", "None", "code"),
("tx", "Ambient Temperature 1.5m 10 Min Maximum", "air_temperature", "degrees Celsius"),
("nc", "Total cloud cover", "cloud_cover", "octa"),
("pp", "Air Pressure at Sea Level 1 Min Average", "air_pressure_at_sea_level", "hPa"),
("Tn14", "Air Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"),
("ta", "Air Temperature 1 Min Average", "air_temperature", "degrees Celsius"),
]

0 comments on commit 25466eb

Please sign in to comment.