Skip to content

Commit

Permalink
Update registration harvester worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mario-winkler committed Oct 7, 2024
1 parent 4e3d624 commit fce4787
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 21 deletions.
42 changes: 37 additions & 5 deletions worker/worker/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from typing import get_type_hints, Union
from dotenv import load_dotenv
from worker.tasks.sentinel import sentinel_check_data, sentinel_log_data
from worker.tasks.sentinel import sentinel_discover_data, sentinel_download_data, sentinel_unzip, sentinel_check_integrity, sentinel_extract_metadata, sentinel_register_metadata

# Load configuration
# The value of a variable is the first of the values found in:
Expand All @@ -26,22 +26,54 @@ class HarvesterConfig:

# Name in lower case to skip mapping of env variables
default_subscriptions = {
"sentinel_check_data": {
"callback_handler": sentinel_check_data,
"sentinel_discover_data": {
"callback_handler": sentinel_discover_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
"sentinel_log_data": {
"callback_handler": sentinel_log_data,
"sentinel_download_data": {
"callback_handler": sentinel_download_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
"sentinel_unzip": {
"callback_handler": sentinel_unzip,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
"sentinel_check_integrity": {
"callback_handler": sentinel_check_integrity,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
"sentinel_extract_metadata": {
"callback_handler": sentinel_extract_metadata,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
"sentinel_register_metadata": {
"callback_handler": sentinel_register_metadata,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1
},
}

"""
Expand Down
8 changes: 8 additions & 0 deletions worker/worker/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ def configure_logging():
datefmt="%Y-%m-%dT%H:%M:%S"
)

def log_with_job(message, job=None, log_level='info', **kwargs):
log_function = __get_log_function(log_level)

if job is not None:
log_function(f"[BPMN_TASK: {job.element.id}] {message}", **kwargs)
else:
log_function(message, **kwargs)

def log_with_context(message, context=None, log_level='info', **kwargs):
context = context if context is not None else {}
log_function = __get_log_function(log_level)
Expand Down
41 changes: 25 additions & 16 deletions worker/worker/tasks/sentinel.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
from worker.log_utils import configure_logging, log_with_context
from worker.log_utils import configure_logging, log_with_context, log_with_job

configure_logging()

def sentinel_check_data(job, worker_result_builder):
log_context = {
"JOB": job.id,
"PROCESS_INSTANCE": job.process_instance_id,
"TASK": job.element_name,
}
log_with_context("sentinel_check_data", log_context)
def sentinel_discover_data(job, worker_result_builder):
log_with_job(message="Discovering new sentinel data ...", job=job)

# job variables
# for v in job.variables:
Expand All @@ -24,13 +19,27 @@ def sentinel_check_data(job, worker_result_builder):

return result

def sentinel_log_data(job, worker_result_builder):
log_context = {
"JOB": job.id,
"PROCESS_INSTANCE": job.process_instance_id,
"TASK": job.element_name,
}
log_with_context("sentinel_log_data", log_context)

def sentinel_download_data(job, worker_result_builder):
log_with_job(message="Downloading data ...", job=job)
result = worker_result_builder.success()
return result

def sentinel_unzip(job, worker_result_builder):
log_with_job(message="Unzipping ...", job=job)
result = worker_result_builder.success()
return result

def sentinel_check_integrity(job, worker_result_builder):
log_with_job(message="Checking integrity ...", job=job)
result = worker_result_builder.success()
return result

def sentinel_extract_metadata(job, worker_result_builder):
log_with_job(message="Extracting metadata and creating STAC item ...", job=job)
result = worker_result_builder.success()
return result

def sentinel_register_metadata(job, worker_result_builder):
log_with_job(message="Registering STAC item ...", job=job)
result = worker_result_builder.success()
return result
183 changes: 183 additions & 0 deletions workflows/sentinel-registration-hourly.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:flowable="http://flowable.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:design="http://flowable.org/design" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://flowable.org/test" design:palette="flowable-work-process-palette">
<collaboration id="Collaboration">
<participant id="sentinel-scene-registration" name="Sentinel Scene Registration" processRef="sentinelHourly"></participant>
</collaboration>
<process id="sentinelHourly" name="sentinel-hourly" isExecutable="true" flowable:candidateStarterGroups="flowableUser">
<extensionElements>
<design:stencilid><![CDATA[BPMNDiagram]]></design:stencilid>
<design:creationdate><![CDATA[2024-10-07T08:29:30.266Z]]></design:creationdate>
<design:modificationdate><![CDATA[2024-10-07T09:47:20.455Z]]></design:modificationdate>
</extensionElements>
<laneSet id="laneSet_sentinelHourly">
<lane id="bpmnLane_22">
<flowNodeRef>discoverSentinelScenes</flowNodeRef>
<flowNodeRef>bpmnTask_11</flowNodeRef>
<flowNodeRef>bpmnTask_13</flowNodeRef>
<flowNodeRef>bpmnTask_15</flowNodeRef>
<flowNodeRef>bpmnTask_17</flowNodeRef>
<flowNodeRef>bpmnTask_19</flowNodeRef>
<flowNodeRef>startEachHour</flowNodeRef>
<flowNodeRef>bpmnEndEvent_21</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_12</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_14</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_16</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_18</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_20</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_22</flowNodeRef>
<flowNodeRef>bpmnSequenceFlow_3</flowNodeRef>
</lane>
</laneSet>
<serviceTask id="discoverSentinelScenes" name="Discover Sentinel scenes" flowable:type="external-worker" flowable:topic="sentinel_discover_data" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<serviceTask id="bpmnTask_11" name="Download Scene" flowable:type="external-worker" flowable:topic="sentinel_download_data" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<serviceTask id="bpmnTask_13" name="Unzip" flowable:type="external-worker" flowable:topic="sentinel_unzip" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<serviceTask id="bpmnTask_15" name="Check file integrity" flowable:type="external-worker" flowable:topic="sentinel_check_integrity" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<serviceTask id="bpmnTask_17" name="Extract Metadata" flowable:type="external-worker" flowable:topic="sentinel_extract_metadata" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<serviceTask id="bpmnTask_19" name="Register Metadata" flowable:type="external-worker" flowable:topic="sentinel_register_metadata" flowable:exclusive="false">
<extensionElements>
<design:stencilid><![CDATA[ExternalWorkerTask]]></design:stencilid>
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
</extensionElements>
</serviceTask>
<startEvent id="startEachHour" name="Start Each Hour" flowable:initiator="initiator" isInterrupting="true">
<extensionElements>
<flowable:work-form-field-validation><![CDATA[false]]></flowable:work-form-field-validation>
<design:stencilid><![CDATA[StartTimerEvent]]></design:stencilid>
<design:display_ref_in_diagram><![CDATA[true]]></design:display_ref_in_diagram>
</extensionElements>
<timerEventDefinition>
<timeCycle>R/P0Y0M0DT1H0M0S</timeCycle>
</timerEventDefinition>
</startEvent>
<endEvent id="bpmnEndEvent_21">
<extensionElements>
<design:stencilid><![CDATA[EndNoneEvent]]></design:stencilid>
</extensionElements>
</endEvent>
<sequenceFlow id="bpmnSequenceFlow_12" sourceRef="discoverSentinelScenes" targetRef="bpmnTask_11">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_14" sourceRef="bpmnTask_11" targetRef="bpmnTask_13">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_16" sourceRef="bpmnTask_13" targetRef="bpmnTask_15">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_18" sourceRef="bpmnTask_15" targetRef="bpmnTask_17">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_20" sourceRef="bpmnTask_17" targetRef="bpmnTask_19">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_22" sourceRef="bpmnTask_19" targetRef="bpmnEndEvent_21">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
<sequenceFlow id="bpmnSequenceFlow_3" sourceRef="startEachHour" targetRef="discoverSentinelScenes">
<extensionElements>
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
</extensionElements>
</sequenceFlow>
</process>
<bpmndi:BPMNDiagram id="BPMNDiagram_Collaboration">
<bpmndi:BPMNPlane bpmnElement="Collaboration" id="BPMNPlane_Collaboration">
<bpmndi:BPMNShape bpmnElement="sentinel-scene-registration" id="BPMNShape_sentinel-scene-registration">
<omgdc:Bounds height="241.0" width="1095.0" x="153.0" y="199.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnLane_22" id="BPMNShape_bpmnLane_22">
<omgdc:Bounds height="241.0" width="1065.0" x="183.0" y="199.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="discoverSentinelScenes" id="BPMNShape_discoverSentinelScenes">
<omgdc:Bounds height="80.0" width="100.0" x="326.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_11" id="BPMNShape_bpmnTask_11">
<omgdc:Bounds height="80.0" width="100.0" x="469.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_13" id="BPMNShape_bpmnTask_13">
<omgdc:Bounds height="80.0" width="100.0" x="614.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_15" id="BPMNShape_bpmnTask_15">
<omgdc:Bounds height="80.0" width="100.0" x="756.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_17" id="BPMNShape_bpmnTask_17">
<omgdc:Bounds height="80.0" width="100.0" x="897.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnTask_19" id="BPMNShape_bpmnTask_19">
<omgdc:Bounds height="80.0" width="100.0" x="1047.0" y="276.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="startEachHour" id="BPMNShape_startEachHour">
<omgdc:Bounds height="30.0" width="30.0" x="236.0" y="301.0"></omgdc:Bounds>
<bpmndi:BPMNLabel>
<omgdc:Bounds height="18.0" width="95.0" x="212.0" y="340.0"></omgdc:Bounds>
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="bpmnEndEvent_21" id="BPMNShape_bpmnEndEvent_21">
<omgdc:Bounds height="28.0" width="28.0" x="1194.0" y="302.0"></omgdc:Bounds>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_22" id="BPMNEdge_bpmnSequenceFlow_22" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="14.0" flowable:targetDockerY="14.0">
<omgdi:waypoint x="1146.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="1193.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_12" id="BPMNEdge_bpmnSequenceFlow_12" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="425.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="468.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_14" id="BPMNEdge_bpmnSequenceFlow_14" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="568.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="613.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_16" id="BPMNEdge_bpmnSequenceFlow_16" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="713.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="755.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_3" id="BPMNEdge_bpmnSequenceFlow_3" flowable:sourceDockerX="15.0" flowable:sourceDockerY="15.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="265.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="325.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_18" id="BPMNEdge_bpmnSequenceFlow_18" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="855.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="896.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_20" id="BPMNEdge_bpmnSequenceFlow_20" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0" flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
<omgdi:waypoint x="996.0" y="316.0"></omgdi:waypoint>
<omgdi:waypoint x="1046.0" y="316.0"></omgdi:waypoint>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</definitions>

0 comments on commit fce4787

Please sign in to comment.