From ed1015af3a0214b76bb8275a6028187ea6b1688e Mon Sep 17 00:00:00 2001 From: Lukas Phaf Date: Fri, 12 Jan 2024 12:02:02 +0100 Subject: [PATCH] (NOT IN NEW REPO) Add scheduler based writer. --- .../storagebackend/postgresql/postgresql.go | 4 +- datastore/load-test/schedule_write.py | 104 ++++++++++++++++++ datastore/load-test/variables.py | 46 ++++++++ 3 files changed, 152 insertions(+), 2 deletions(-) create mode 100644 datastore/load-test/schedule_write.py create mode 100644 datastore/load-test/variables.py diff --git a/datastore/datastore/storagebackend/postgresql/postgresql.go b/datastore/datastore/storagebackend/postgresql/postgresql.go index 3dbb4d3..facbe0b 100644 --- a/datastore/datastore/storagebackend/postgresql/postgresql.go +++ b/datastore/datastore/storagebackend/postgresql/postgresql.go @@ -30,8 +30,8 @@ func initCleanupInterval() { name, val0, defaultVal) val = defaultVal } - - cleanupInterval = time.Duration(val) + //cleanupInterval = time.Duration(val) + cleanupInterval = time.Duration(val) * time.Second } func init() { // automatically called once on program startup (on first import of this package) diff --git a/datastore/load-test/schedule_write.py b/datastore/load-test/schedule_write.py new file mode 100644 index 0000000..a6b59e4 --- /dev/null +++ b/datastore/load-test/schedule_write.py @@ -0,0 +1,104 @@ +import math +import os +import random +import time +import uuid +from collections import namedtuple +from datetime import datetime +from datetime import UTC + +import datastore_pb2 as dstore +import datastore_pb2_grpc as dstore_grpc +import grpc +from apscheduler.executors.pool import ThreadPoolExecutor +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger +from google.protobuf.timestamp_pb2 import Timestamp +from variables import variable_info + +# from apscheduler.executors.pool import ProcessPoolExecutor + + +# One channel & client per process (not per thread!) +# TODO: Does this affect load on the databse? Seems load here is lower then in Rosina's code +channel = grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") +client = dstore_grpc.DatastoreStub(channel=channel) + + +crons = [ + (1, "*"), # every minutes + (5, "*/5"), # every 5 minutes, on the five + (5, "1-59/5"), # every 5 minutes, on minute after the five + (5, "2-59/5"), + (5, "3-59/5"), + (5, "4-59/5"), + (10, "*/10"), # every 5 minutes, on the five + (10, "1-59/10"), # every 10 minutes, on minute after the five + (10, "2-59/10"), + (10, "3-59/10"), + (10, "4-59/10"), + (10, "5-59/10"), + (10, "6-59/10"), + (10, "7-59/10"), + (10, "8-59/10"), + (10, "9-59/10"), +] + +# count = 0 + +vars_per_station = 40 # Should be <=44 + +Station = namedtuple("Station", "id lat lon period") + + +def write_data(station): + pub_time = datetime.now(UTC) + # Round observation time to nearest 1, 5, 10 minutes + obs_time = pub_time.replace(minute=int(pub_time.minute / station.period) * station.period, second=0, microsecond=0) + pub_ts = Timestamp() + obs_ts = Timestamp() + pub_ts.FromDatetime(pub_time) + obs_ts.FromDatetime(obs_time) + observations = [] + for var in range(0, vars_per_station): + (param_id, long_name, standard_name, unit) = variable_info[var] + ts_mdata = dstore.TSMetadata( + platform=station.id, + instrument=param_id, + title=long_name, + standard_name=standard_name, + unit=unit, + ) + obs_mdata = dstore.ObsMetadata( + id=str(uuid.uuid4()), + pubtime=pub_ts, + geo_point=dstore.Point(lat=station.lat, lon=station.lon), # One random per station + obstime_instant=obs_ts, + value=str(math.sin(time.mktime(obs_time.timetuple()) / 36000.0) + 2 * var), # TODO: Make dependent station + ) + observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata)) + + request_messages = dstore.PutObsRequest(observations=observations) + response = client.PutObservations(request_messages) + assert response.status == -1 + + +if __name__ == "__main__": + scheduler = BlockingScheduler() + # scheduler.add_executor(ProcessPoolExecutor()) + scheduler.add_executor(ThreadPoolExecutor()) + print(datetime.now()) + for i in range(0, 5000): + (period, cron) = random.choice(crons) + station_id = f"station{i:04d}" + station = Station(station_id, random.uniform(50.0, 55.0), random.uniform(4.0, 8.0), period) + print(station_id, cron, period) + # TODO: Spread less well over time, for example, all use same second, but add jitter < 60 + trigger = CronTrigger(minute=cron, second=random.randint(0, 59), jitter=1) + scheduler.add_job(write_data, args=(station,), id=station_id, trigger=trigger) + print("Press Ctrl+{0} to exit".format("Break" if os.name == "nt" else "C")) + + try: + scheduler.start() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/datastore/load-test/variables.py b/datastore/load-test/variables.py new file mode 100644 index 0000000..1659968 --- /dev/null +++ b/datastore/load-test/variables.py @@ -0,0 +1,46 @@ +variable_info = [ + ("hc3", "Cloud Base Third Layer", "cloud_base_altitude", "ft"), + ("nc2", "Cloud Amount Second Layer", "cloud_cover", "octa"), + ("zm", "Meteorological Optical Range 10 Min Average", "visibility_in_air", "m"), + ("R1H", "Rainfall in last Hour", "rainfall_amount", "mm"), + ("hc", "Cloud Base", "cloud_base_altitude", "ft"), + ("tgn", "Grass Temperature 10cm 10 Min Minimum", "air_temperature", "degrees Celsius"), + ("Tn12", "Air Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"), + ("pr", "Precipitation Duration (PWS), 10 Min Sum", "precipitation_duration", "sec"), + ("pg", "Precipitation Intensity (PWS), 10 Min Average", "lwe_precipitation_rate", "mm/h"), + ("tn", "Ambient Temperature 1.5m 10 Min Minimum", "air_temperature", "degrees Celsius"), + ("rg", "Precipitation Intensity (Rain Gauge), 10 Min Average", "precipitation_rate", "mm/h"), + ("hc1", "Cloud Base First Layer", "cloud_base_altitude", "ft"), + ("nc1", "Cloud Amount First Layer", "cloud_cover", "octa"), + ("ts1", "Number of Lightning Discharges at Station", "Lightning on-site", "Number"), + ("nc3", "Cloud Amount Third Layer", "cloud_cover", "octa"), + ("ts2", "Number of Lightning Discharges near Station", "Lightning nearby", "Number"), + ("qg", "Global Solar Radiation 10 Min Average", "total_downwelling_shortwave_flux_in_air", "W m-2"), + ("ff", "Wind Speed at 10m 10 Min Average", "wind_speed", "m s-1"), + ("ww", "wawa Weather Code", "None", "code"), + ("gff", "Wind Gust at 10m 10 Min Maximum", "wind_speed_of_gust", "m s-1"), + ("dd", "Wind Direction 10 Min Average", "wind_from_direction", "degree"), + ("td", "Dew Point Temperature 1.5m 1 Min Average", "dew_point_temperature", "degrees Celsius"), + ("ww-10", "wawa Weather Code for Previous 10 Min Interval", "None", "code"), + ("Tgn12", "Grass Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"), + ("ss", "Sunshine Duration", "duration_of_sunshine", "min"), + ("Tn6", "Air Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"), + ("dr", "Precipitation Duration (Rain Gauge), 10 Min Sum", "precipitation_duration", "sec"), + ("rh", "Relative Humidity 1 Min Average", "relative_humidity", "%"), + ("hc2", "Cloud Base Second Layer", "cloud_base_altitude", "ft"), + ("Tgn6", "Grass Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"), + ("R12H", "Rainfall in last 12 Hours", "rainfall_amount", "mm"), + ("R24H", "Rainfall in last 24 Hours", "rainfall_amount", "mm"), + ("Tx6", "Air Temperature Maximum last 6 Hours", "air_temperature", "degrees Celsius"), + ("Tx24", "Air Temperature Maximum last 24 Hours", "air_temperature", "degrees Celsius"), + ("Tx12", "Air Temperature Maximum last 12 Hours", "air_temperature", "degrees Celsius"), + ("Tgn14", "Grass Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"), + ("D1H", "Rainfall Duration in last Hour", "rainfall_duration", "min"), + ("R6H", "Rainfall in last 6 Hours", "rainfall_amount", "mm"), + ("pwc", "Present Weather", "None", "code"), + ("tx", "Ambient Temperature 1.5m 10 Min Maximum", "air_temperature", "degrees Celsius"), + ("nc", "Total cloud cover", "cloud_cover", "octa"), + ("pp", "Air Pressure at Sea Level 1 Min Average", "air_pressure_at_sea_level", "hPa"), + ("Tn14", "Air Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"), + ("ta", "Air Temperature 1 Min Average", "air_temperature", "degrees Celsius"), +]