Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Ingestion Service C4 #49

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions source/all.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ On the right is the Central E-SOH API Endpoint. In the middle are all the compon

=== Component Design

==== Data Ingestion Service

The Ingestion Service handles the incoming data and this component sends the message to the Notification Service.

[#img-component-diagram-ingest]
.C4 component diagram for the Ingestion Service.
plantuml::puml/c4-container-ingest.puml[format=png, alt="C4 Ingest Service coontainer diangram"]

The data processing chain

* Input handler: grab the data and send it to the Input decoder
* Input decoder: call the right decoder for the input, send message to the Message Handler
** BUFR decoder: decode data, map data to cf_variables, add missing metadata(platform, location) from OSCAR, generate message
** CSV decoder: not yet implemented
** NetCDF decoder: extraxt data, map data to cf_variables, generate message
* Message handler: validate messages and send to the db and to the Notification Service
** MQTT Meta: add/update the meta information of the message(id, pubtime, data_id, metadata_id)
** MQTT Validator: validate the messages
** DB Ingestion: ingest to the db, and trigger the message sending
** MQTT Message Sender: send message to Notification Service if stored

==== Notification Service

Based on investigations documented in the link:https://github.com/EURODEO/e-soh-poc-report[E-SOH PoC Report], RabbitMQ is the technology choice for the E-SOH Notification Service. The main drawback is that it does not support MQTT V5 but we expect that this is coming.
Expand Down
85 changes: 85 additions & 0 deletions source/puml/c4-container-ingest.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@

@startuml "c4-container-diagram"
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Container.puml


!define osaPuml https://raw.githubusercontent.com/Crashedmind/PlantUML-opensecurityarchitecture2-icons/master
!include osaPuml/Common.puml
!include osaPuml/User/all.puml

!include <office/Servers/database_server>
!include <office/Concepts/folder>

'LAYOUT_TOP_DOWN()
LAYOUT_LEFT_RIGHT()

AddContainerTag("db", $sprite="database_server", $legendText="database container")
AddContainerTag("File", $sprite="folder", $legendText="File")


SystemQueue_Ext(bufr,"BUFR", "NMHS Observation in BUFR format")
SystemQueue_Ext(netcdf,"NetCDF", "NMHS Observations in NetCDF format")
SystemQueue_Ext(csv,"CSV", "NMHS Observation in CSV format")
SystemDb_Ext(oscar, "WMO OSCAR", "WMO WIGOS metadata DB")

System_Boundary(esoh_system, "E-SOH System"){
System_Boundary(esoh_ingest, "E-SOH Ingestion"){
System_Boundary(local_oscar, "Local OSCAR", "?", "Descr"){
Container(oscar_db,"OSCAR db","json file",$tags=File)
Container(oscar_updater,"OSCAR Updater","weekly update")
}
Container(ingest_api, "Ingest API", "", "Handles input, and forwards incoming data to correct input decoder")
System_Boundary(input_decoder, "Input Decoder"){
Container(bufr_decoder, "BUFR decoder", "", "Decode BUFR, add missing metadata and generate E-SOH MQTT message")
Container(netcdf_decoder, "NetCDF decoder", "", "Decode NetCDF, add missing metadata and generate E-SOH MQTT message")
Container(csv_decoder, "CSV decoder", "", "Decode CSV, add missing metadata and generate E-SOH MQTT message")
}
System_Boundary(mqtt_handler, "MQTT Message Handler"){
Container(mqtt_meta, "MQTT Meta", "", "Set id, pubtime, data_id, metadata_id")
Container(validator, "MQTT Validator", "", "Check message content")
Container(MQTT, "MQTT Message Sender", "", "Sending Message")
Container(db_ingest, "DB Ingestion", "", "Sending measure data to store via API. If success, then trigger MQTT sending")
}
ContainerQueue(queue, "Notification Service", "RabbitMQ", "MQTT Event Queue. WIS2 real-time data sharing by a publication/subscription (PubSub) mechanism based on the Message Queuing Telemetry Transport Protocol.")
}
System_Boundary(db,"Database"){
Container(ingestion, "Data Ingestion API(s)", "", "Validate incoming, and forward to data and metadata store. API or object-store?")
ContainerDb(store, "Data and Metadata Store", "Storage of 24 hours of data and indexing.",$tags="db")
}
Container(lmar, "Logging, monitoring, alerting and reporting", "")
}

System_Ext(subscribers,"Ext Subscribers", "Ext subscriber")


Rel(csv,ingest_api,"File or bytestream")
Rel(bufr,ingest_api,"File or bytestream")
Rel(netcdf,ingest_api,"File or bytestream")
Rel(ingest_api, input_decoder, "File or bytestream")
Rel(csv_decoder,mqtt_meta,"MQTT json messages")
Rel(bufr_decoder,mqtt_meta,"MQTT json messages")
Rel(netcdf_decoder,mqtt_meta,"MQTT json messages")
Rel(mqtt_meta,validator,"MQTT json messages")
Rel_U(validator,db_ingest,"MQTT json messages")
Rel(validator,MQTT,"MQTT json messages")
BiRel(db_ingest,ingestion,"")
Rel(ingestion,store,"")
Rel(db_ingest,MQTT,"Db store verification")
Rel(MQTT,queue,"MQTT v5.x")

Rel(input_decoder,lmar,"")
Rel(mqtt_handler,lmar,"")

'BiRel(csv_decoder,oscar_db,"metadata query")
'BiRel(netcdf_decoder,oscar_db,"metadata query")
BiRel(bufr_decoder,oscar_db,"metadata query")
BiRel_U(oscar_updater, oscar, "update metadata")
BiRel(oscar_updater, oscar_db, "update metadata")
Rel(local_oscar,lmar,"")
Rel(queue,subscribers,"MQTT v5.x")



@enduml