Skip to content

Commit

Permalink
(NOT IN NEW REPO) Add scheduler based writer.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas-phaf committed Jan 12, 2024
1 parent b48cd12 commit ed1015a
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 2 deletions.
4 changes: 2 additions & 2 deletions datastore/datastore/storagebackend/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func initCleanupInterval() {
name, val0, defaultVal)
val = defaultVal
}

cleanupInterval = time.Duration(val)
//cleanupInterval = time.Duration(val)
cleanupInterval = time.Duration(val) * time.Second
}

func init() { // automatically called once on program startup (on first import of this package)
Expand Down
104 changes: 104 additions & 0 deletions datastore/load-test/schedule_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import math
import os
import random
import time
import uuid
from collections import namedtuple
from datetime import datetime
from datetime import UTC

import datastore_pb2 as dstore
import datastore_pb2_grpc as dstore_grpc
import grpc
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from google.protobuf.timestamp_pb2 import Timestamp
from variables import variable_info

# from apscheduler.executors.pool import ProcessPoolExecutor


# One gRPC channel and client stub per process (not per thread!); the
# module-level `client` is what write_data() uses for every request.
# The target address is read from the DSHOST / DSPORT environment variables,
# falling back to localhost:50050 when they are unset.
# TODO: Does this affect load on the database? Seems load here is lower than in Rosina's code
channel = grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}")
client = dstore_grpc.DatastoreStub(channel=channel)


# (period in minutes, cron "minute" expression) pairs a station can be given.
# There is one entry that fires every minute, followed by one entry per minute
# offset within the 5-minute and the 10-minute period: offset 0 is written as
# "*/5" / "*/10" (on the five / on the ten), offset k as "k-59/5" / "k-59/10"
# (k minutes after the five / the ten).
crons = [(1, "*")] + [
    (period, f"*/{period}" if offset == 0 else f"{offset}-59/{period}")
    for period in (5, 10)
    for offset in range(period)
]

# count = 0

# Number of variables written per station on every job run (should be <= 44).
vars_per_station = 40

# Immutable description of one simulated station: identifier, position and
# the reporting period that goes with the cron schedule chosen for it.
Station = namedtuple("Station", ["id", "lat", "lon", "period"])


def write_data(station):
    """Send one batch of synthetic observations for ``station``.

    Builds ``vars_per_station`` observations (one per entry of
    ``variable_info``) that share the same publication time (now, in UTC) and
    the same observation time (the publication time floored to the start of
    the station's reporting period), and submits them to the datastore in a
    single ``PutObservations`` request through the module-level ``client``.

    Args:
        station: A ``Station`` tuple; its ``id``, ``lat``, ``lon`` and
            ``period`` (in minutes) fields are read.

    Raises:
        RuntimeError: If the response status is not the expected ``-1``.
    """
    pub_time = datetime.now(UTC)
    # Floor (not round) the observation time to the start of the current
    # 1, 5 or 10 minute period; integer division avoids a float round-trip.
    floored_minute = pub_time.minute // station.period * station.period
    obs_time = pub_time.replace(minute=floored_minute, second=0, microsecond=0)

    pub_ts = Timestamp()
    pub_ts.FromDatetime(pub_time)
    obs_ts = Timestamp()
    obs_ts.FromDatetime(obs_time)

    # The time-dependent part of the value is identical for every variable, so
    # compute it once. obs_time is timezone-aware, so timestamp() gives the
    # true epoch seconds; time.mktime(obs_time.timetuple()) would reinterpret
    # the UTC fields as local time and make the value depend on the host's
    # timezone.
    base_value = math.sin(obs_time.timestamp() / 36000.0)

    observations = []
    for var in range(vars_per_station):
        (param_id, long_name, standard_name, unit) = variable_info[var]
        ts_mdata = dstore.TSMetadata(
            platform=station.id,
            instrument=param_id,
            title=long_name,
            standard_name=standard_name,
            unit=unit,
        )
        obs_mdata = dstore.ObsMetadata(
            id=str(uuid.uuid4()),
            pubtime=pub_ts,
            geo_point=dstore.Point(lat=station.lat, lon=station.lon),  # One random per station
            obstime_instant=obs_ts,
            value=str(base_value + 2 * var),  # TODO: Make dependent station
        )
        observations.append(dstore.Metadata1(ts_mdata=ts_mdata, obs_mdata=obs_mdata))

    request_messages = dstore.PutObsRequest(observations=observations)
    response = client.PutObservations(request_messages)
    # Explicit check instead of `assert`, which is stripped when Python runs
    # with -O and would then let failed writes pass silently.
    if response.status != -1:
        raise RuntimeError(f"PutObservations for {station.id} returned unexpected status {response.status}")


if __name__ == "__main__":
    # Blocking scheduler with a thread pool executor added to it.
    # (Alternative that was tried: scheduler.add_executor(ProcessPoolExecutor()))
    scheduler = BlockingScheduler()
    scheduler.add_executor(ThreadPoolExecutor())
    print(datetime.now())

    # Register one job per simulated station. Each station gets a random
    # schedule from `crons` and a random position in the 50-55 / 4-8 box.
    for i in range(5000):
        period, cron = random.choice(crons)
        station_id = f"station{i:04d}"
        latitude = random.uniform(50.0, 55.0)
        longitude = random.uniform(4.0, 8.0)
        station = Station(id=station_id, lat=latitude, lon=longitude, period=period)
        print(station_id, cron, period)

        # Every station fires on its own random second of the minute.
        # TODO: Spread less well over time, for example, all use same second, but add jitter < 60
        start_second = random.randint(0, 59)
        scheduler.add_job(
            write_data,
            args=(station,),
            id=station_id,
            trigger=CronTrigger(minute=cron, second=start_second, jitter=1),
        )

    interrupt_key = "Break" if os.name == "nt" else "C"
    print("Press Ctrl+{0} to exit".format(interrupt_key))

    # start() blocks; a keyboard interrupt or interpreter exit ends the run quietly.
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
46 changes: 46 additions & 0 deletions datastore/load-test/variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Table of the 44 variables the load test can write for a station.
# Each row is a 4-tuple that the writer unpacks as
# (param_id, long_name, standard_name, unit).
# Note that a missing standard name is stored as the string "None", not the
# None object.
variable_info = [
    ("hc3", "Cloud Base Third Layer", "cloud_base_altitude", "ft"),
    ("nc2", "Cloud Amount Second Layer", "cloud_cover", "octa"),
    ("zm", "Meteorological Optical Range 10 Min Average", "visibility_in_air", "m"),
    ("R1H", "Rainfall in last Hour", "rainfall_amount", "mm"),
    ("hc", "Cloud Base", "cloud_base_altitude", "ft"),
    ("tgn", "Grass Temperature 10cm 10 Min Minimum", "air_temperature", "degrees Celsius"),
    ("Tn12", "Air Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"),
    ("pr", "Precipitation Duration (PWS), 10 Min Sum", "precipitation_duration", "sec"),
    ("pg", "Precipitation Intensity (PWS), 10 Min Average", "lwe_precipitation_rate", "mm/h"),
    ("tn", "Ambient Temperature 1.5m 10 Min Minimum", "air_temperature", "degrees Celsius"),
    ("rg", "Precipitation Intensity (Rain Gauge), 10 Min Average", "precipitation_rate", "mm/h"),
    ("hc1", "Cloud Base First Layer", "cloud_base_altitude", "ft"),
    ("nc1", "Cloud Amount First Layer", "cloud_cover", "octa"),
    ("ts1", "Number of Lightning Discharges at Station", "Lightning on-site", "Number"),
    ("nc3", "Cloud Amount Third Layer", "cloud_cover", "octa"),
    ("ts2", "Number of Lightning Discharges near Station", "Lightning nearby", "Number"),
    ("qg", "Global Solar Radiation 10 Min Average", "total_downwelling_shortwave_flux_in_air", "W m-2"),
    ("ff", "Wind Speed at 10m 10 Min Average", "wind_speed", "m s-1"),
    ("ww", "wawa Weather Code", "None", "code"),
    ("gff", "Wind Gust at 10m 10 Min Maximum", "wind_speed_of_gust", "m s-1"),
    ("dd", "Wind Direction 10 Min Average", "wind_from_direction", "degree"),
    ("td", "Dew Point Temperature 1.5m 1 Min Average", "dew_point_temperature", "degrees Celsius"),
    ("ww-10", "wawa Weather Code for Previous 10 Min Interval", "None", "code"),
    ("Tgn12", "Grass Temperature Minimum last 12 Hours", "air_temperature", "degrees Celsius"),
    ("ss", "Sunshine Duration", "duration_of_sunshine", "min"),
    ("Tn6", "Air Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"),
    ("dr", "Precipitation Duration (Rain Gauge), 10 Min Sum", "precipitation_duration", "sec"),
    ("rh", "Relative Humidity 1 Min Average", "relative_humidity", "%"),
    ("hc2", "Cloud Base Second Layer", "cloud_base_altitude", "ft"),
    ("Tgn6", "Grass Temperature Minimum last 6 Hours", "air_temperature", "degrees Celsius"),
    ("R12H", "Rainfall in last 12 Hours", "rainfall_amount", "mm"),
    ("R24H", "Rainfall in last 24 Hours", "rainfall_amount", "mm"),
    ("Tx6", "Air Temperature Maximum last 6 Hours", "air_temperature", "degrees Celsius"),
    ("Tx24", "Air Temperature Maximum last 24 Hours", "air_temperature", "degrees Celsius"),
    ("Tx12", "Air Temperature Maximum last 12 Hours", "air_temperature", "degrees Celsius"),
    ("Tgn14", "Grass Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"),
    ("D1H", "Rainfall Duration in last Hour", "rainfall_duration", "min"),
    ("R6H", "Rainfall in last 6 Hours", "rainfall_amount", "mm"),
    ("pwc", "Present Weather", "None", "code"),
    ("tx", "Ambient Temperature 1.5m 10 Min Maximum", "air_temperature", "degrees Celsius"),
    ("nc", "Total cloud cover", "cloud_cover", "octa"),
    ("pp", "Air Pressure at Sea Level 1 Min Average", "air_pressure_at_sea_level", "hPa"),
    ("Tn14", "Air Temperature Minimum last 14 Hours", "air_temperature", "degrees Celsius"),
    ("ta", "Air Temperature 1 Min Average", "air_temperature", "degrees Celsius"),
]

0 comments on commit ed1015a

Please sign in to comment.