Skip to content

Commit

Permalink
Merge branch 'dev' into la-no-processed-recordedby
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-collins authored Dec 7, 2023
2 parents 39dec79 + ddd3afc commit 6e927e1
Show file tree
Hide file tree
Showing 48 changed files with 1,015 additions and 635 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public BalancerService(BalancerConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-balancer service with parameters : {}", config);
log.info("Started pipelines-balancer service");
StepConfiguration stepConfig = config.stepConfig;

// Prefetch is one, since this is a long-running process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public HdfsViewService(HdfsViewConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info(
"Started pipelines-{}-hdfs-view service with parameters : {}", config.stepType, config);
log.info("Started pipelines-{}-hdfs-view service", config.stepType);
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public EventsIndexingService(EventsIndexingConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-event-indexing service with parameters : {}", config);
log.info("Started pipelines-event-indexing service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public EventsInterpretationService(EventsInterpretationConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-event-interpretation service with parameters : {}", config);
log.info("Started pipelines-event-interpretation service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public HdfsViewService(HdfsViewConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-hdfs-view service with parameters : {}", config);
log.info("Started pipelines-hdfs-view service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public IdentifierService(IdentifierConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info(
"Started pipelines-occurrence-identifier dataset service with parameters : {}", config);
log.info("Started pipelines-occurrence-identifier dataset service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public IndexingService(IndexingConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-occurrence-indexing service with parameters : {}", config);
log.info("Started pipelines-occurrence-indexing service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public InterpretationService(InterpreterConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-occurrence-interpretation service with parameters : {}", config);
log.info("Started pipelines-occurrence-interpretation service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public CleanerService(CleanerConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-validator-cleaner service with parameters : {}", config);
log.info("Started pipelines-validator-cleaner service");
// create the listener.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public MetricsCollectorService(MetricsCollectorConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-validator-metrics-collector with parameters : {}", config);
log.info("Started pipelines-validator-metrics-collector");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ArchiveValidatorService(ArchiveValidatorConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-validator-archive-validator service with parameters : {}", config);
log.info("Started pipelines-validator-archive-validator service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AbcdToAvroService(XmlToAvroConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-verbatim-to-avro-from-abcd service with parameters : {}", config);
log.info("Started pipelines-verbatim-to-avro-from-abcd service");
// create the listener.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public DwcaToAvroService(DwcaToAvroConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-verbatim-to-avro-from-dwca service with parameters : {}", config);
log.info("Started pipelines-verbatim-to-avro-from-dwca service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public FragmenterService(FragmenterConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-verbatim-fragmenter service with parameters : {}", config);
log.info("Started pipelines-verbatim-fragmenter service");
// Prefetch is one, since this is a long-running process.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public XmlToAvroService(XmlToAvroConfiguration config) {

@Override
protected void startUp() throws Exception {
log.info("Started pipelines-verbatim-to-avro-from-xml service with parameters : {}", config);
log.info("Started pipelines-verbatim-to-avro-from-xml service");
// create the listener.
StepConfiguration c = config.stepConfig;
listener = new MessageListener(c.messaging.getConnectionParameters(), 1);
Expand Down
12 changes: 12 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Oozie workflow

Install by building the shaded artifact and then running the install script, where `dev` is the target environment and `GITHUB_KEY` is a GitHub token with read access to the `gbif-configuration` repository:

```
cd ../clustering-gbif
mvn install -Pextra-artifacts
cd ../clustering-gbif-oozie
./install-workflow.sh dev GITHUB_KEY
```

34 changes: 34 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/install-workflow.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env bash
# Installs (or reinstalls) the Oozie coordinator job that runs the clustering
# workflow on a schedule. Fetches the environment-specific configuration from
# the gbif-configuration repository, kills any running coordinator, builds the
# workflow artifact, deploys it to HDFS and starts the coordinator.
#
# Usage: ./install-workflow.sh <env> <github-token>
set -e
set -o pipefail

ENV=$1
TOKEN=$2

if [ -z "$ENV" ] || [ -z "$TOKEN" ]; then
  echo "Usage: $0 <env> <github-token>" >&2
  exit 1
fi

echo "Installing clustering workflow for $ENV"

echo "Get latest clustering config profiles from GitHub"
curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L "https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties"

# Coordinator start time is today's date plus the configured hour, in UTC.
START=$(date +%Y-%m-%d)T$(grep '^startHour=' clustering.properties | cut -d= -f 2)Z
FREQUENCY="$(grep '^frequency=' clustering.properties | cut -d= -f 2)"
OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2)

# Gets the Oozie id of the current coordinator job if it exists
WID=$(oozie jobs -oozie "$OOZIE" -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}')
if [ -n "$WID" ]; then
  echo "Killing current coordinator job $WID"
  sudo -u hdfs oozie job -oozie "$OOZIE" -kill "$WID"
fi

echo "Assembling jar for $ENV"
# Oozie uses timezone UTC
mvn -Dclustering.frequency="$FREQUENCY" -Dclustering.start="$START" -DskipTests -Duser.timezone=UTC clean install

echo "Copy to Hadoop"
# -f: do not fail (set -e) on a first-time install when the path does not exist yet.
sudo -u hdfs hdfs dfs -rm -r -f /clustering-workflow/
sudo -u hdfs hdfs dfs -copyFromLocal target/clustering-workflow /
sudo -u hdfs hdfs dfs -copyFromLocal /etc/hive/conf/hive-site.xml /clustering-workflow/lib/

echo "Start Oozie clustering datasets job"
sudo -u hdfs oozie job -oozie "$OOZIE" -config clustering.properties -run
63 changes: 63 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>ingestion</artifactId>
<version>2.18.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>clustering-oozie-workflow</artifactId>

<name>Clustering :: Oozie workflow</name>

<build>
<resources>
<!-- coordinator.xml is filtered so Maven substitutes ${clustering.frequency} and
     ${clustering.start}, which install-workflow.sh supplies on the command line. -->
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/coordinator.xml</include>
</includes>
</resource>
<!-- All other resources are copied verbatim, with no property substitution. -->
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<!-- Assembles the exploded workflow directory (target/clustering-workflow) that
     install-workflow.sh copies to HDFS; see src/main/assembly/oozie.xml. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>clustering-workflow</finalName>
<descriptors>
<descriptor>src/main/assembly/oozie.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-oozie</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<!-- The clustering job jar placed under lib/ by the assembly and run by the
     workflow's Spark action. -->
<dependency>
<groupId>org.gbif.pipelines</groupId>
<artifactId>clustering-gbif</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
20 changes: 20 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/resume-workflow.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Resumes a previously suspended Clustering coordinator job in Oozie, using the
# environment-specific configuration fetched from the gbif-configuration repo.
#
# Usage: ./resume-workflow.sh <env> <github-token>
set -e
set -o pipefail

ENV=$1
TOKEN=$2

if [ -z "$ENV" ] || [ -z "$TOKEN" ]; then
  echo "Usage: $0 <env> <github-token>" >&2
  exit 1
fi

echo "Resuming clustering workflow for $ENV"

echo "Get latest clustering config profiles from GitHub"
curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L "https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties"

OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2)

# Gets the Oozie id of the current coordinator job if it exists
WID=$(oozie jobs -oozie "$OOZIE" -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}')
if [ -n "$WID" ]; then
  echo "Resuming current coordinator job $WID"
  sudo -u hdfs oozie job -oozie "$OOZIE" -resume "$WID"
fi
28 changes: 28 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/src/main/assembly/oozie.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="utf-8"?>

<!-- Builds the exploded Oozie workflow directory: the workflow definition files at
     the root and the clustering job jar under lib/, ready for copying to HDFS. -->
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">

<id>clustering</id>
<!-- "dir" produces a plain directory (no archive); install-workflow.sh copies it to
     HDFS with hdfs dfs -copyFromLocal. -->
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<!-- Processed resources (coordinator.xml, workflow files) go to the root. -->
<fileSet>
<directory>${project.build.outputDirectory}</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<fileMode>0644</fileMode>
<!-- Strips the version from the jar name so the workflow can reference the stable
     path lib/clustering-gbif.jar regardless of the project version. -->
<outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping>
<includes>
<include>org.gbif.pipelines:clustering-gbif</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!--
Rebuilds clustering table on a schedule.

${clustering.frequency} and ${clustering.start} are Maven properties substituted
at build time (this file is resource-filtered); install-workflow.sh derives them
from clustering.properties. All times are interpreted in UTC.
-->
<coordinator-app name="Clustering" frequency="${clustering.frequency}"
start="${clustering.start}" end="2050-05-28T00:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
<action>
<!-- Each materialized action runs the clustering workflow deployed on HDFS. -->
<workflow>
<app-path>hdfs://ha-nn/clustering-workflow</app-path>
</workflow>
</action>
</coordinator-app>
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Oozie workflow with a single Spark action running the clustering job.
     All hadoop.* and gbif.clustering.* values are read via wf:conf() from the job
     configuration (the clustering.properties file submitted with -config). -->
<workflow-app xmlns="uri:oozie:workflow:0.4.5" name="clustering">

<global>
<job-tracker>${wf:conf("hadoop.jobtracker")}</job-tracker>
<name-node>${wf:conf("hdfs.namenode")}</name-node>
<configuration>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${wf:conf("hadoop.queuename")}</value>
</property>
<!-- Run the Spark action with the spark2 sharelib. -->
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>
</global>

<start to="clustering" />

<action name="clustering">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${wf:conf("hadoop.jobtracker")}</job-tracker>
<name-node>${wf:conf("hdfs.namenode")}</name-node>
<master>yarn-cluster</master>
<name>Clustering</name>
<class>org.gbif.pipelines.clustering.Cluster</class>
<!-- Version-less jar name produced by the assembly's outputFileNameMapping. -->
<jar>lib/clustering-gbif.jar</jar>
<!-- Following enabling static service pools (cgroups) we found the native libraries would not load. The only way we found to pass this through was using extraLibraryPath -->
<spark-opts>${wf:conf("gbif.clustering.spark.opts")} --conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native --conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native</spark-opts>
<!-- Command-line arguments for org.gbif.pipelines.clustering.Cluster, all taken
     from the job configuration. -->
<arg>--hive-db</arg>
<arg>${wf:conf("gbif.clustering.hive.db")}</arg>
<arg>--source-table</arg>
<arg>${wf:conf("gbif.clustering.source.table")}</arg>
<arg>--hive-table-prefix</arg>
<arg>${wf:conf("gbif.clustering.hive.table.prefix")}</arg>
<arg>--hbase-table</arg>
<arg>${wf:conf("gbif.clustering.hbase.table")}</arg>
<arg>--hbase-regions</arg>
<arg>${wf:conf("gbif.clustering.hbase.regions")}</arg>
<arg>--hbase-zk</arg>
<arg>${wf:conf("gbif.clustering.hbase.zk")}</arg>
<arg>--target-dir</arg>
<arg>${wf:conf("gbif.clustering.target.dir")}</arg>
<arg>--hash-count-threshold</arg>
<arg>${wf:conf("gbif.clustering.hash.count.threshold")}</arg>
</spark>

<ok to="end" />
<error to="kill" />
</action>

<kill name="kill">
<message>Clustering failed:[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name="end" />

</workflow-app>
20 changes: 20 additions & 0 deletions gbif/ingestion/clustering-gbif-oozie/suspend-workflow.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Suspends the running Clustering coordinator job in Oozie, using the
# environment-specific configuration fetched from the gbif-configuration repo.
#
# Usage: ./suspend-workflow.sh <env> <github-token>
set -e
set -o pipefail

ENV=$1
TOKEN=$2

if [ -z "$ENV" ] || [ -z "$TOKEN" ]; then
  echo "Usage: $0 <env> <github-token>" >&2
  exit 1
fi

echo "Suspending clustering workflow for $ENV"

echo "Get latest clustering config profiles from GitHub"
curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L "https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties"

OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2)

# Gets the Oozie id of the current coordinator job if it exists
WID=$(oozie jobs -oozie "$OOZIE" -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}')
if [ -n "$WID" ]; then
  echo "Suspending current coordinator job $WID"
  sudo -u hdfs oozie job -oozie "$OOZIE" -suspend "$WID"
fi
Loading

0 comments on commit 6e927e1

Please sign in to comment.