diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerService.java index 7b6f4f13ef..75c7c38f4a 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/balancer/BalancerService.java @@ -24,7 +24,7 @@ public BalancerService(BalancerConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-balancer service with parameters : {}", config); + log.info("Started pipelines-balancer service"); StepConfiguration stepConfig = config.stepConfig; // Prefetch is one, since this is a long-running process. diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/hdfs/HdfsViewService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/hdfs/HdfsViewService.java index 39b95a4f82..5c9ac9e9a0 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/hdfs/HdfsViewService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/hdfs/HdfsViewService.java @@ -30,8 +30,7 @@ public HdfsViewService(HdfsViewConfiguration config) { @Override protected void startUp() throws Exception { - log.info( - "Started pipelines-{}-hdfs-view service with parameters : {}", config.stepType, config); + log.info("Started pipelines-{}-hdfs-view service", config.stepType); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/indexing/EventsIndexingService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/indexing/EventsIndexingService.java index 304f8ff4e8..6a6894f080 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/indexing/EventsIndexingService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/indexing/EventsIndexingService.java @@ -29,7 +29,7 @@ public EventsIndexingService(EventsIndexingConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-event-indexing service with parameters : {}", config); + log.info("Started pipelines-event-indexing service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/interpretation/EventsInterpretationService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/interpretation/EventsInterpretationService.java index 42695409f8..8d1694a9e9 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/interpretation/EventsInterpretationService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/events/interpretation/EventsInterpretationService.java @@ -29,7 +29,7 @@ public EventsInterpretationService(EventsInterpretationConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-event-interpretation service with parameters : {}", config); + log.info("Started pipelines-event-interpretation service"); // Prefetch is one, since this is a long-running process. 
StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/hdfs/HdfsViewService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/hdfs/HdfsViewService.java index 043df6b3f5..990349fa31 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/hdfs/HdfsViewService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/hdfs/HdfsViewService.java @@ -32,7 +32,7 @@ public HdfsViewService(HdfsViewConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-hdfs-view service with parameters : {}", config); + log.info("Started pipelines-hdfs-view service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/identifier/IdentifierService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/identifier/IdentifierService.java index 106125f857..355d56c3e7 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/identifier/IdentifierService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/identifier/IdentifierService.java @@ -33,8 +33,7 @@ public IdentifierService(IdentifierConfiguration config) { @Override protected void startUp() throws Exception { - log.info( - "Started pipelines-occurrence-identifier dataset service with parameters : {}", config); + log.info("Started pipelines-occurrence-identifier dataset service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/indexing/IndexingService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/indexing/IndexingService.java index ff2f84f786..e8d8d33aad 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/indexing/IndexingService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/indexing/IndexingService.java @@ -36,7 +36,7 @@ public IndexingService(IndexingConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-occurrence-indexing service with parameters : {}", config); + log.info("Started pipelines-occurrence-indexing service"); // Prefetch is one, since this is a long-running process. 
StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/interpretation/InterpretationService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/interpretation/InterpretationService.java index 39515356af..559831e34e 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/interpretation/InterpretationService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/occurrences/interpretation/InterpretationService.java @@ -36,7 +36,7 @@ public InterpretationService(InterpreterConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-occurrence-interpretation service with parameters : {}", config); + log.info("Started pipelines-occurrence-interpretation service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/cleaner/CleanerService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/cleaner/CleanerService.java index 58706bb1c8..a82d55c7cc 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/cleaner/CleanerService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/cleaner/CleanerService.java @@ -25,7 +25,7 @@ public CleanerService(CleanerConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-validator-cleaner service with parameters : {}", config); + log.info("Started pipelines-validator-cleaner service"); // create the listener. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/metrics/MetricsCollectorService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/metrics/MetricsCollectorService.java index 4493f9e3f0..d7ef2ae58c 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/metrics/MetricsCollectorService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/metrics/MetricsCollectorService.java @@ -29,7 +29,7 @@ public MetricsCollectorService(MetricsCollectorConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-validator-metrics-collector with parameters : {}", config); + log.info("Started pipelines-validator-metrics-collector"); // Prefetch is one, since this is a long-running process. 
StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/validator/ArchiveValidatorService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/validator/ArchiveValidatorService.java index 2d4d65e412..84c4febcbf 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/validator/ArchiveValidatorService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/validators/validator/ArchiveValidatorService.java @@ -28,7 +28,7 @@ public ArchiveValidatorService(ArchiveValidatorConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-validator-archive-validator service with parameters : {}", config); + log.info("Started pipelines-validator-archive-validator service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/abcd/AbcdToAvroService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/abcd/AbcdToAvroService.java index 4cb87449d8..9ab0fc6cce 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/abcd/AbcdToAvroService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/abcd/AbcdToAvroService.java @@ -37,7 +37,7 @@ public AbcdToAvroService(XmlToAvroConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-verbatim-to-avro-from-abcd service with parameters : {}", config); + log.info("Started pipelines-verbatim-to-avro-from-abcd service"); // create the listener. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/dwca/DwcaToAvroService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/dwca/DwcaToAvroService.java index 368cc99338..d7ca238562 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/dwca/DwcaToAvroService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/dwca/DwcaToAvroService.java @@ -28,7 +28,7 @@ public DwcaToAvroService(DwcaToAvroConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-verbatim-to-avro-from-dwca service with parameters : {}", config); + log.info("Started pipelines-verbatim-to-avro-from-dwca service"); // Prefetch is one, since this is a long-running process. 
StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/fragmenter/FragmenterService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/fragmenter/FragmenterService.java index 1665750ede..99a3601985 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/fragmenter/FragmenterService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/fragmenter/FragmenterService.java @@ -37,7 +37,7 @@ public FragmenterService(FragmenterConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-verbatim-fragmenter service with parameters : {}", config); + log.info("Started pipelines-verbatim-fragmenter service"); // Prefetch is one, since this is a long-running process. StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/xml/XmlToAvroService.java b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/xml/XmlToAvroService.java index ab7ae34599..92b37b99a8 100644 --- a/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/xml/XmlToAvroService.java +++ b/gbif/coordinator/tasks/src/main/java/org/gbif/pipelines/tasks/verbatims/xml/XmlToAvroService.java @@ -35,7 +35,7 @@ public XmlToAvroService(XmlToAvroConfiguration config) { @Override protected void startUp() throws Exception { - log.info("Started pipelines-verbatim-to-avro-from-xml service with parameters : {}", config); + log.info("Started pipelines-verbatim-to-avro-from-xml service"); // create the listener. 
StepConfiguration c = config.stepConfig; listener = new MessageListener(c.messaging.getConnectionParameters(), 1); diff --git a/gbif/ingestion/clustering-gbif-oozie/README.md b/gbif/ingestion/clustering-gbif-oozie/README.md new file mode 100644 index 0000000000..f89cf9be6e --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/README.md @@ -0,0 +1,12 @@ +# Oozie workflow + +Install by building the shaded artifact and using the script: + +``` +cd ../clustering-gbif +mvn install -Pextra-artifacts + +cd ../clustering-gbif-oozie +./install-workflow.sh dev GITHUB_KEY +``` + diff --git a/gbif/ingestion/clustering-gbif-oozie/install-workflow.sh b/gbif/ingestion/clustering-gbif-oozie/install-workflow.sh new file mode 100755 index 0000000000..f5400fa3b7 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/install-workflow.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +set -e +set -o pipefail + +ENV=$1 +TOKEN=$2 + +echo "Installing clustering workflow for $ENV" + +echo "Get latest clustering config profiles from GitHub" +curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties + +START=$(date +%Y-%m-%d)T$(grep '^startHour=' clustering.properties | cut -d= -f 2)Z +FREQUENCY="$(grep '^frequency=' clustering.properties | cut -d= -f 2)" +OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2) + +# Gets the Oozie id of the current coordinator job if it exists +WID=$(oozie jobs -oozie $OOZIE -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}') +if [ -n "$WID" ]; then + echo "Killing current coordinator job" $WID + sudo -u hdfs oozie job -oozie $OOZIE -kill $WID +fi + +echo "Assembling jar for $ENV" +# Oozie uses timezone UTC +mvn -Dclustering.frequency="$FREQUENCY" -Dclustering.start="$START" -DskipTests -Duser.timezone=UTC clean install + +echo "Copy to Hadoop" +sudo -u hdfs hdfs dfs -rm -r /clustering-workflow/ +sudo -u hdfs hdfs dfs -copyFromLocal target/clustering-workflow / +sudo -u hdfs hdfs dfs -copyFromLocal /etc/hive/conf/hive-site.xml /clustering-workflow/lib/ + +echo "Start Oozie clustering datasets job" +sudo -u hdfs oozie job --oozie $OOZIE -config clustering.properties -run diff --git a/gbif/ingestion/clustering-gbif-oozie/pom.xml b/gbif/ingestion/clustering-gbif-oozie/pom.xml new file mode 100644 index 0000000000..53fbb62527 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/pom.xml @@ -0,0 +1,63 @@ + + + 4.0.0 + + + org.gbif.pipelines + ingestion + 2.18.0-SNAPSHOT + ../pom.xml + + + clustering-oozie-workflow + + Clustering :: Oozie workflow + + + + + src/main/resources + true + + **/coordinator.xml + + + + src/main/resources + false + + + + + maven-assembly-plugin + + false + clustering-workflow + + src/main/assembly/oozie.xml + + + + + make-oozie + package + + single + + + + + + + + + + org.gbif.pipelines + clustering-gbif + ${project.version} + + + + diff --git a/gbif/ingestion/clustering-gbif-oozie/resume-workflow.sh b/gbif/ingestion/clustering-gbif-oozie/resume-workflow.sh new file mode 100755 index 0000000000..5b9c3841ab --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/resume-workflow.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +set -e +set -o pipefail + +ENV=$1 +TOKEN=$2 + +echo "Resuming clustering workflow for $ENV" + +echo "Get latest clustering config profiles from GitHub" +curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L 
https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties + +OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2) + +# Gets the Oozie id of the current coordinator job if it exists +WID=$(oozie jobs -oozie $OOZIE -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}') +if [ -n "$WID" ]; then + echo "Resuming current coordinator job" $WID + sudo -u hdfs oozie job -oozie $OOZIE -resume $WID +fi diff --git a/gbif/ingestion/clustering-gbif-oozie/src/main/assembly/oozie.xml b/gbif/ingestion/clustering-gbif-oozie/src/main/assembly/oozie.xml new file mode 100644 index 0000000000..749ce29921 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/src/main/assembly/oozie.xml @@ -0,0 +1,28 @@ + + + + + clustering + + dir + + false + + + ${project.build.outputDirectory} + / + + + + + lib + 0644 + ${artifact.artifactId}.${artifact.extension} + + org.gbif.pipelines:clustering-gbif + + + + diff --git a/gbif/ingestion/clustering-gbif-oozie/src/main/resources/coordinator.xml b/gbif/ingestion/clustering-gbif-oozie/src/main/resources/coordinator.xml new file mode 100644 index 0000000000..71ff7dce66 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/src/main/resources/coordinator.xml @@ -0,0 +1,11 @@ + + + + + hdfs://ha-nn/clustering-workflow + + + diff --git a/gbif/ingestion/clustering-gbif-oozie/src/main/resources/workflow.xml b/gbif/ingestion/clustering-gbif-oozie/src/main/resources/workflow.xml new file mode 100644 index 0000000000..adfc50eb50 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/src/main/resources/workflow.xml @@ -0,0 +1,59 @@ + + + + + ${wf:conf("hadoop.jobtracker")} + ${wf:conf("hdfs.namenode")} + + + oozie.launcher.mapred.job.queue.name + ${wf:conf("hadoop.queuename")} + + + oozie.action.sharelib.for.spark + spark2 + + + + + + + + + ${wf:conf("hadoop.jobtracker")} + ${wf:conf("hdfs.namenode")} + yarn-cluster + Clustering + org.gbif.pipelines.clustering.Cluster + lib/clustering-gbif.jar + + ${wf:conf("gbif.clustering.spark.opts")} --conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native --conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native + --hive-db + ${wf:conf("gbif.clustering.hive.db")} + --source-table + ${wf:conf("gbif.clustering.source.table")} + --hive-table-prefix + ${wf:conf("gbif.clustering.hive.table.prefix")} + --hbase-table + ${wf:conf("gbif.clustering.hbase.table")} + --hbase-regions + ${wf:conf("gbif.clustering.hbase.regions")} + --hbase-zk + ${wf:conf("gbif.clustering.hbase.zk")} + --target-dir + ${wf:conf("gbif.clustering.target.dir")} + --hash-count-threshold + ${wf:conf("gbif.clustering.hash.count.threshold")} + + + + + + + + Clustering failed:[${wf:errorMessage(wf:lastErrorNode())}] + + + + + diff --git a/gbif/ingestion/clustering-gbif-oozie/suspend-workflow.sh b/gbif/ingestion/clustering-gbif-oozie/suspend-workflow.sh new file mode 100755 index 0000000000..acc689fd66 --- /dev/null +++ b/gbif/ingestion/clustering-gbif-oozie/suspend-workflow.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +set -e +set -o pipefail + +ENV=$1 +TOKEN=$2 + +echo "Suspending clustering workflow for $ENV" + +echo "Get latest clustering config profiles from GitHub" +curl -Ss -H "Authorization: token $TOKEN" -H 'Accept: application/vnd.github.v3.raw' -O -L https://api.github.com/repos/gbif/gbif-configuration/contents/clustering/$ENV/clustering.properties + +OOZIE=$(grep '^oozie.url=' clustering.properties | cut -d= -f 2) + +# Gets the Oozie id of the current 
coordinator job if it exists +WID=$(oozie jobs -oozie $OOZIE -jobtype coordinator -filter name=Clustering | awk 'NR==3 {print $1}') +if [ -n "$WID" ]; then + echo "Suspending current coordinator job" $WID + sudo -u hdfs oozie job -oozie $OOZIE -suspend $WID +fi diff --git a/gbif/ingestion/clustering-gbif/README.md b/gbif/ingestion/clustering-gbif/README.md index a3a4345d42..a69690920f 100644 --- a/gbif/ingestion/clustering-gbif/README.md +++ b/gbif/ingestion/clustering-gbif/README.md @@ -1,24 +1,13 @@ ## Occurrence Clustering -Provides utilities to cluster GBIF occurrence records. +Processes occurrence data and establishes links between similar records. -Status: Having explored approaches using Spark ML (no success due to data skew) and -[LinkedIn ScANNs](https://github.com/linkedin/scanns) (limited success, needs more investigation) -this uses a declarative rule-based approach making use of domain knowledge. A multi-blocking stage -groups candidate record pairs, followed by a pair-wise comparison to detect links within the group. +The output is a set of HFiles suitable for bulk loading into HBase which drives the pipeline lookup and +public related occurrence API. -The initial focus is on specimens to locate: - 1. Physical records (e.g. Isotypes and specimens split and deposited in multiple herbaria) - 2. Database duplicates across datasets (huge biases observed within datasets (e.g. gutworm datasets)) - 3. Strong links between sequence records, citations and specimens +Build the project: `mvn spotless:apply test package -Pextra-artifacts` -This is intended to be run regularly (e.g. daily) and therefore performance is of critical concern. - -The output is bulk loaded into HBase, suitable for an API to deployed on. - -Build the project: `mvn clean package` - -To run (while in exploration - will be made into Oozie workflow later): +To run this against a completely new table: Setup hbase: ``` @@ -40,34 +29,30 @@ create 'occurrence_relationships_experimental', ]} ``` -Remove hive tables from the target database: -``` -drop table occurrence_clustering_hashed; -drop table occurrence_clustering_hashed_all; -drop table occurrence_clustering_candidates; -drop table occurrence_relationships; -``` - -Run the job (In production this configuration takes 2.6 hours with ~2.3B records) +Run the job +(In production this configuration takes ~2hours with ~2.3B records) ``` hdfs dfs -rm -r /tmp/clustering nohup sudo -u hdfs spark2-submit --class org.gbif.pipelines.clustering.Cluster \ --master yarn --num-executors 100 \ - --executor-cores 6 \ + --executor-cores 4 \ --conf spark.dynamicAllocation.enabled=false \ - --conf spark.sql.shuffle.partitions=1000 \ + --conf spark.sql.shuffle.partitions=1200 \ --executor-memory 64G \ --driver-memory 4G \ - clustering-gbif-2.14.0-SNAPSHOT.jar \ - --hive-db prod_h \ - --hive-table-hashed occurrence_clustering_hashed \ - --hive-table-candidates occurrence_clustering_candidates \ - --hive-table-relationships occurrence_relationships \ + --conf spark.executor.memoryOverhead=4096 \ + --conf spark.debug.maxToStringFields=100000 \ + --conf spark.network.timeout=600s \ + clustering-gbif-2.18.0-SNAPSHOT.jar \ + --hive-db prod_TODO \ + --source-table occurrence \ + --hive-table-prefix clustering \ --hbase-table occurrence_relationships_experimental \ --hbase-regions 100 \ --hbase-zk c5zk1.gbif.org,c5zk2.gbif.org,c5zk3.gbif.org \ - --hfile-dir /tmp/clustering & + --target-dir /tmp/clustering \ + --hash-count-threshold 100 & ``` Load HBase diff --git 
a/gbif/ingestion/clustering-gbif/pom.xml b/gbif/ingestion/clustering-gbif/pom.xml index c77c5594df..73a6d37bd5 100644 --- a/gbif/ingestion/clustering-gbif/pom.xml +++ b/gbif/ingestion/clustering-gbif/pom.xml @@ -24,45 +24,21 @@ true - - - - net.alchim31.maven - scala-maven-plugin - ${maven-scala-plugin.version} - - - scala-compile-first - process-resources - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - /var/tmp/sbt - - - - - org.gbif.pipelines core + + + + * + * + + - org.apache.spark - spark-catalyst_2.11 - ${spark.version} + org.projectlombok + lombok provided @@ -77,18 +53,6 @@ ${spark.version} provided - - org.apache.spark - spark-yarn_2.11 - ${spark.version} - provided - - - org.apache.spark - spark-hive_2.11 - ${spark.version} - provided - org.apache.hbase hbase-client @@ -158,11 +122,13 @@ org.codehaus.janino commons-compiler ${commons-compiler.version} + test org.codehaus.janino janino ${janino.version} + test @@ -183,6 +149,8 @@ false false + + false *:* @@ -204,6 +172,13 @@ + + + + com.fasterxml.jackson + gbif.com.fasterxml.jackson + + diff --git a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/Cluster.java b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/Cluster.java new file mode 100644 index 0000000000..ba0b9c67fc --- /dev/null +++ b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/Cluster.java @@ -0,0 +1,376 @@ +package org.gbif.pipelines.clustering; + +import static org.gbif.pipelines.clustering.HashUtilities.recordHashes; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import lombok.Builder; +import lombok.Data; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships; +import org.gbif.pipelines.core.parsers.clustering.RelationshipAssertion; +import scala.Tuple2; + +/** + * Regenerates the HBase and Hive tables for a data clustering run. 
This will read the occurrence + * Hive table, generate HFiles for the relationships, and create intermediate tables in Hive for + * diagnostics, replacing any that exist. Finally, the HBase table is truncated, preserving its + * partitioning, and bulk loaded with the new data.
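Editorial aside, not part of this patch: the class javadoc that follows expects readers of the relationships table to retry across the brief outage while the table is truncated and reloaded, and generateHFiles() later in this file salts row keys as `<salt>:<id1>:<id2>` with `salt = Math.abs(id1.hashCode()) % hbaseRegions` so a reader can prefix-scan by occurrence id. A minimal sketch of such a reader is below, assuming the standard HBase 1.x client Scan API; the class name, retry count, and sleep interval are illustrative only.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical reader sketch; not part of this patch. */
class RelatedOccurrenceReader {

  /** Prefix-scans relationships for one occurrence, retrying across the brief reload outage. */
  static ResultScanner scanWithRetry(Table table, String gbifId, int hbaseRegions)
      throws IOException, InterruptedException {
    // Same salting scheme as generateHFiles(): the salt is derived from id1 only,
    // so all relationships of an occurrence share one row-key prefix.
    int salt = Math.abs(gbifId.hashCode()) % hbaseRegions;
    Scan scan =
        new Scan()
            .setRowPrefixFilter(Bytes.toBytes(salt + ":" + gbifId + ":"))
            .addFamily(Bytes.toBytes("o")); // column family written by the HFiles
    IOException last = null;
    for (int attempt = 0; attempt < 5; attempt++) { // retry count is illustrative
      try {
        return table.getScanner(scan);
      } catch (IOException e) { // e.g. thrown while the table is disabled during the swap
        last = e;
        Thread.sleep(10_000L);
      }
    }
    throw last;
  }
}
```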
+ * This process works on the deliberate design principle that readers of the table will have + * implemented a retry mechanism for the brief outage during the table swap which is expected to be + * on an e.g. weekly basis. + */ +@Builder +@Data +public class Cluster implements Serializable { + private String hiveDB; + private String sourceTable; + private String hiveTablePrefix; + private String hbaseTable; + private int hbaseRegions; + private String hbaseZK; + private String targetDir; + private int hashCountThreshold; + + private static final StructType HASH_ROW_SCHEMA = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("gbifId", DataTypes.StringType, false), + DataTypes.createStructField("datasetKey", DataTypes.StringType, false), + DataTypes.createStructField("hash", DataTypes.StringType, false) + }); + + private static final StructType RELATIONSHIP_SCHEMA = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id1", DataTypes.StringType, false), + DataTypes.createStructField("id2", DataTypes.StringType, false), + DataTypes.createStructField("reasons", DataTypes.StringType, false), + DataTypes.createStructField("dataset1", DataTypes.StringType, false), + DataTypes.createStructField("dataset2", DataTypes.StringType, false), + DataTypes.createStructField("o1", DataTypes.StringType, false), + DataTypes.createStructField("o2", DataTypes.StringType, false) + }); + + public static void main(String[] args) throws IOException { + CommandLineParser.parse(args).build().run(); + } + + /** Run the full process, generating relationships and refreshing the HBase table. */ + private void run() throws IOException { + removeTargetDir(); // fail fast if given bad config + + SparkSession spark = + SparkSession.builder() + .appName("Occurrence clustering") + .config("spark.sql.warehouse.dir", new File("spark-warehouse").getAbsolutePath()) + .enableHiveSupport() + .getOrCreate(); + spark.sql("use " + hiveDB); + + createCandidatePairs(spark); + Dataset relationships = generateRelationships(spark); + generateHFiles(relationships); + replaceHBaseTable(); + } + + /** + * Reads the input, and through a series of record hashing creates a table of candidate record + * pairs for proper comparison. + */ + private void createCandidatePairs(SparkSession spark) { + // Read the input fields needed for generating the hashes + Dataset occurrences = + spark.sql( + String.format( + "SELECT" + + " gbifId, datasetKey, basisOfRecord, typeStatus, " + + " CAST(taxonKey AS String) AS taxonKey, CAST(speciesKey AS String) AS speciesKey, " + + " decimalLatitude, decimalLongitude, year, month, day, recordedBy, " + + " recordNumber, fieldNumber, occurrenceID, otherCatalogNumbers, institutionCode, " + + " collectionCode, catalogNumber " + + "FROM %s", + sourceTable)); + Dataset hashes = + occurrences + .flatMap( + row -> recordHashes(new RowOccurrenceFeatures(row)), + RowEncoder.apply(HASH_ROW_SCHEMA)) + .dropDuplicates(); + spark.sql("DROP TABLE IF EXISTS " + hiveTablePrefix + "_hashes"); + hashes.write().format("parquet").saveAsTable(hiveTablePrefix + "_hashes"); + + // To avoid NxN, filter to a threshold to exclude e.g. 
gut worm analysis datasets + Dataset hashCounts = + spark.sql( + String.format( + "SELECT hash, count(*) AS c FROM %s_hashes GROUP BY hash", hiveTablePrefix)); + spark.sql("DROP TABLE IF EXISTS " + hiveTablePrefix + "_hash_counts"); + hashCounts.write().format("parquet").saveAsTable(hiveTablePrefix + "_hash_counts"); + Dataset filteredHashes = + spark.sql( + String.format( + "SELECT t1.gbifID, t1.datasetKey, t1.hash " + + "FROM %s_hashes t1" + + " JOIN %s_hash_counts t2 ON t1.hash=t2.hash " + + "WHERE t2.c <= %d", + hiveTablePrefix, hiveTablePrefix, hashCountThreshold)); + spark.sql("DROP TABLE IF EXISTS " + hiveTablePrefix + "_hashes_filtered"); + filteredHashes.write().format("parquet").saveAsTable(hiveTablePrefix + "_hashes_filtered"); + + // distinct cross join to generate the candidate pairs for comparison + Dataset candidates = + spark.sql( + String.format( + "SELECT " + + "t1.gbifId as id1, t1.datasetKey as ds1, t2.gbifId as id2, t2.datasetKey as ds2 " + + "FROM %s_hashes_filtered t1 JOIN %s_hashes_filtered t2 ON t1.hash = t2.hash " + + "WHERE " + + " t1.gbifId < t2.gbifId AND " + + " t1.datasetKey != t2.datasetKey " + + "GROUP BY t1.gbifId, t1.datasetKey, t2.gbifId, t2.datasetKey", + hiveTablePrefix, hiveTablePrefix)); + spark.sql("DROP TABLE IF EXISTS " + hiveTablePrefix + "_candidates"); + candidates.write().format("parquet").saveAsTable(hiveTablePrefix + "_candidates"); + } + + /** Reads the candidate pairs table, and runs the record to record comparison. */ + private Dataset generateRelationships(SparkSession spark) { + // Spark DF naming convention requires that we alias each term to avoid naming collision while + // still having named fields to access (i.e. not relying on the column number of the term). All + // taxa keys are converted to String to allow shared routines between GBIF and ALA + // (https://github.com/gbif/pipelines/issues/484) + Dataset pairs = + spark.sql( + String.format( + "SELECT " + + " t1.gbifId AS t1_gbifId, t1.datasetKey AS t1_datasetKey, t1.basisOfRecord AS t1_basisOfRecord, t1.publishingorgkey AS t1_publishingOrgKey, t1.datasetName AS t1_datasetName, t1.publisher AS t1_publishingOrgName, " + + " CAST(t1.kingdomKey AS String) AS t1_kingdomKey, CAST(t1.phylumKey AS String) AS t1_phylumKey, CAST(t1.classKey AS String) AS t1_classKey, CAST(t1.orderKey AS String) AS t1_orderKey, CAST(t1.familyKey AS String) AS t1_familyKey, CAST(t1.genusKey AS String) AS t1_genusKey, CAST(t1.speciesKey AS String) AS t1_speciesKey, CAST(t1.acceptedTaxonKey AS String) AS t1_acceptedTaxonKey, CAST(t1.taxonKey AS String) AS t1_taxonKey, " + + " t1.scientificName AS t1_scientificName, t1.acceptedScientificName AS t1_acceptedScientificName, t1.kingdom AS t1_kingdom, t1.phylum AS t1_phylum, t1.order_ AS t1_order, t1.family AS t1_family, t1.genus AS t1_genus, t1.species AS t1_species, t1.genericName AS t1_genericName, t1.specificEpithet AS t1_specificEpithet, t1.taxonRank AS t1_taxonRank, " + + " t1.typeStatus AS t1_typeStatus, t1.preparations AS t1_preparations, " + + " t1.decimalLatitude AS t1_decimalLatitude, t1.decimalLongitude AS t1_decimalLongitude, t1.countryCode AS t1_countryCode, " + + " t1.year AS t1_year, t1.month AS t1_month, t1.day AS t1_day, from_unixtime(floor(t1.eventDate/1000)) AS t1_eventDate, " + + " t1.recordNumber AS t1_recordNumber, t1.fieldNumber AS t1_fieldNumber, t1.occurrenceID AS t1_occurrenceID, t1.otherCatalogNumbers AS t1_otherCatalogNumbers, t1.institutionCode AS t1_institutionCode, t1.collectionCode AS t1_collectionCode, t1.catalogNumber AS 
t1_catalogNumber, " + + " t1.recordedBy AS t1_recordedBy, t1.recordedByID AS t1_recordedByID, " + + " t1.ext_multimedia AS t1_media, " + + "" + + " t2.gbifId AS t2_gbifId, t2.datasetKey AS t2_datasetKey, t2.basisOfRecord AS t2_basisOfRecord, t2.publishingorgkey AS t2_publishingOrgKey, t2.datasetName AS t2_datasetName, t2.publisher AS t2_publishingOrgName, " + + " CAST(t2.kingdomKey AS String) AS t2_kingdomKey, CAST(t2.phylumKey AS String) AS t2_phylumKey, CAST(t2.classKey AS String) AS t2_classKey, CAST(t2.orderKey AS String) AS t2_orderKey, CAST(t2.familyKey AS String) AS t2_familyKey, CAST(t2.genusKey AS String) AS t2_genusKey, CAST(t2.speciesKey AS String) AS t2_speciesKey, CAST(t2.acceptedTaxonKey AS String) AS t2_acceptedTaxonKey, CAST(t2.taxonKey AS String) AS t2_taxonKey, " + + " t2.scientificName AS t2_scientificName, t2.acceptedScientificName AS t2_acceptedScientificName, t2.kingdom AS t2_kingdom, t2.phylum AS t2_phylum, t2.order_ AS t2_order, t2.family AS t2_family, t2.genus AS t2_genus, t2.species AS t2_species, t2.genericName AS t2_genericName, t2.specificEpithet AS t2_specificEpithet, t2.taxonRank AS t2_taxonRank, " + + " t2.typeStatus AS t2_typeStatus, t2.preparations AS t2_preparations, " + + " t2.decimalLatitude AS t2_decimalLatitude, t2.decimalLongitude AS t2_decimalLongitude, t2.countryCode AS t2_countryCode, " + + " t2.year AS t2_year, t2.month AS t2_month, t2.day AS t2_day, from_unixtime(floor(t2.eventDate/1000)) AS t2_eventDate, " + + " t2.recordNumber AS t2_recordNumber, t2.fieldNumber AS t2_fieldNumber, t2.occurrenceID AS t2_occurrenceID, t2.otherCatalogNumbers AS t2_otherCatalogNumbers, t2.institutionCode AS t2_institutionCode, t2.collectionCode AS t2_collectionCode, t2.catalogNumber AS t2_catalogNumber, " + + " t2.recordedBy AS t2_recordedBy, t2.recordedByID AS t2_recordedByID, " + + " t2.ext_multimedia AS t2_media " + + "" + + "FROM %s h" + + " JOIN %s t1 ON h.id1 = t1.gbifID " + + " JOIN %s t2 ON h.id2 = t2.gbifID", + hiveTablePrefix + "_candidates", sourceTable, sourceTable)); + + // Compare all candidate pairs and generate the relationships + Dataset relationships = + pairs.flatMap(row -> relateRecords(row), RowEncoder.apply(RELATIONSHIP_SCHEMA)); + spark.sql("DROP TABLE IF EXISTS " + hiveTablePrefix + "_relationships"); + relationships.write().format("parquet").saveAsTable(hiveTablePrefix + "_relationships"); + return relationships; + } + + /** Partitions the relationships to match the target table layout and creates the HFiles. 
*/ + private void generateHFiles(Dataset relationships) throws IOException { + // convert to HFiles, prepared with modulo salted keys + JavaPairRDD, String> sortedRelationships = + relationships + .javaRDD() + .flatMapToPair( + row -> { + String id1 = row.getString(0); + String id2 = row.getString(1); + + // salt only on the id1 to enable prefix scanning using an occurrence ID + int salt = Math.abs(id1.hashCode()) % hbaseRegions; + String saltedRowKey = salt + ":" + id1 + ":" + id2; + + List, String>> cells = new ArrayList<>(); + cells.add(new Tuple2<>(new Tuple2<>(saltedRowKey, "id1"), id1)); + cells.add(new Tuple2<>(new Tuple2<>(saltedRowKey, "id2"), id2)); + cells.add(new Tuple2<>(new Tuple2<>(saltedRowKey, "reasons"), row.getString(2))); + cells.add(new Tuple2<>(new Tuple2<>(saltedRowKey, "dataset1"), row.getString(3))); + cells.add(new Tuple2<>(new Tuple2<>(saltedRowKey, "dataset2"), row.getString(4))); + cells.add( + new Tuple2<>(new Tuple2<>(saltedRowKey, "occurrence1"), row.getString(5))); + cells.add( + new Tuple2<>(new Tuple2<>(saltedRowKey, "occurrence2"), row.getString(6))); + + return cells.iterator(); + }) + .repartitionAndSortWithinPartitions( + new SaltPrefixPartitioner(hbaseRegions), new Tuple2StringComparator()); + + sortedRelationships + .mapToPair( + cell -> { + ImmutableBytesWritable k = new ImmutableBytesWritable(Bytes.toBytes(cell._1._1)); + Cell row = + new KeyValue( + Bytes.toBytes(cell._1._1), // key + Bytes.toBytes("o"), // column family + Bytes.toBytes(cell._1._2), // cell + Bytes.toBytes(cell._2) // cell value + ); + return new Tuple2<>(k, row); + }) + .saveAsNewAPIHadoopFile( + targetDir, + ImmutableBytesWritable.class, + KeyValue.class, + HFileOutputFormat2.class, + hadoopConf()); + } + + /** Truncates the target table preserving its layout and loads in the HFiles. */ + private void replaceHBaseTable() throws IOException { + Configuration conf = hadoopConf(); + try (Connection connection = ConnectionFactory.createConnection(conf); + HTable table = new HTable(conf, hbaseTable); + Admin admin = connection.getAdmin()) { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + + // bulkload requires files to be in hbase ownership + FsShell shell = new FsShell(conf); + try { + System.out.println("Executing chown -R hbase:hbase for " + targetDir); + shell.run(new String[] {"-chown", "-R", "hbase:hbase", targetDir}); + } catch (Exception e) { + throw new IOException("Unable to modify FS ownership to hbase", e); + } + + System.out.println(String.format("Truncating table[%s]", hbaseTable)); + Instant start = Instant.now(); + admin.disableTable(table.getName()); + admin.truncateTable(table.getName(), true); + System.out.println( + String.format( + "Table[%s] truncated in %d ms", + hbaseTable, Duration.between(start, Instant.now()).toMillis())); + System.out.println(String.format("Loading table[%s] from [%s]", hbaseTable, targetDir)); + loader.doBulkLoad(new Path(targetDir), table); + System.out.println( + String.format( + "Table [%s] truncated and reloaded in %d ms", + hbaseTable, Duration.between(start, Instant.now()).toMillis())); + removeTargetDir(); + + } catch (Exception e) { + throw new IOException(e); + } + } + + /** Removes the target directory provided it is in the /tmp location */ + private void removeTargetDir() throws IOException { + // defensive, cleaning only /tmp in hdfs (we assume people won't do /tmp/../... 
+ String regex = "/tmp/.+"; + if (targetDir.matches(regex)) { + FsShell shell = new FsShell(new Configuration()); + try { + System.out.println( + String.format( + "Deleting working directory [%s] which translates to [-rm -r -skipTrash %s ]", + targetDir, targetDir)); + + shell.run(new String[] {"-rm", "-r", "-skipTrash", targetDir}); + } catch (Exception e) { + throw new IOException("Unable to delete the working directory", e); + } + } else { + throw new IllegalArgumentIOException("Target directory must be within /tmp"); + } + } + + /** Runs the record to record comparison */ + private Iterator relateRecords(Row row) throws IOException { + Set relationships = new HashSet<>(); + + RowOccurrenceFeatures o1 = new RowOccurrenceFeatures(row, "t1_", "t1_media"); + RowOccurrenceFeatures o2 = new RowOccurrenceFeatures(row, "t2_", "t2_media"); + RelationshipAssertion assertions = + OccurrenceRelationships.generate(o1, o2); + + // store any relationship bidirectionally matching RELATIONSHIP_SCHEMA + if (assertions != null) { + relationships.add( + RowFactory.create( + assertions.getOcc1().get("gbifId"), + assertions.getOcc2().get("gbifId"), + assertions.getJustificationAsDelimited(), + assertions.getOcc1().get("datasetKey"), + assertions.getOcc2().get("datasetKey"), + assertions.getOcc1().asJson(), + assertions.getOcc2().asJson())); + + relationships.add( + RowFactory.create( + assertions.getOcc2().get("gbifId"), + assertions.getOcc1().get("gbifId"), + assertions.getJustificationAsDelimited(), + assertions.getOcc2().get("datasetKey"), + assertions.getOcc1().get("datasetKey"), + assertions.getOcc2().asJson(), + assertions.getOcc1().asJson())); + } + return relationships.iterator(); + } + + /** Creates the Hadoop configuration suitable for HDFS and HBase use. */ + private Configuration hadoopConf() throws IOException { + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", hbaseZK); + conf.set(FileOutputFormat.COMPRESS, "true"); + conf.setClass(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class); + Job job = new Job(conf, "Clustering"); // name not actually used + HTable table = new HTable(conf, hbaseTable); + HFileOutputFormat2.configureIncrementalLoad(job, table); + return job.getConfiguration(); // job created a copy of the conf + } + + /** Necessary as the Tuple2 does not implement a comparator in Java */ + static class Tuple2StringComparator implements Comparator>, Serializable { + @Override + public int compare(Tuple2 o1, Tuple2 o2) { + if (o1._1.equals(o2._1)) return o1._2.compareTo(o2._2); + else return o1._1.compareTo(o2._1); + } + } +} diff --git a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/CommandLineParser.java b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/CommandLineParser.java new file mode 100644 index 0000000000..91dedad2e7 --- /dev/null +++ b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/CommandLineParser.java @@ -0,0 +1,53 @@ +package org.gbif.pipelines.clustering; + +import java.util.Arrays; +import java.util.List; + +/** Utility to read the named arguments. */ +class CommandLineParser { + + static Cluster.ClusterBuilder parse(String[] args) { + if (args.length != 16) { + String provided = args.length == 0 ? 
"no args" : String.join(" ", args); + throw new IllegalArgumentException("Incorrect configuration provided: " + provided); + } + Cluster.ClusterBuilder builder = nextOption(new Cluster.ClusterBuilder(), Arrays.asList(args)); + System.out.println("Clustering started with configuration: " + builder); + return builder; + } + + private static Cluster.ClusterBuilder nextOption( + Cluster.ClusterBuilder builder, List list) { + if (list.isEmpty()) return builder; + + String option = list.get(0); + List tail = list.subList(1, list.size()); + if (option.startsWith("-")) { + switch (option) { + case "--hive-db": + return nextOption(builder.hiveDB(tail.get(0)), tail.subList(1, tail.size())); + case "--source-table": + return nextOption(builder.sourceTable(tail.get(0)), tail.subList(1, tail.size())); + case "--hive-table-prefix": + return nextOption(builder.hiveTablePrefix(tail.get(0)), tail.subList(1, tail.size())); + case "--hbase-table": + return nextOption(builder.hbaseTable(tail.get(0)), tail.subList(1, tail.size())); + case "--hbase-regions": + return nextOption( + builder.hbaseRegions(Integer.parseInt(tail.get(0))), tail.subList(1, tail.size())); + case "--hbase-zk": + return nextOption(builder.hbaseZK(tail.get(0)), tail.subList(1, tail.size())); + case "--target-dir": + return nextOption(builder.targetDir(tail.get(0)), tail.subList(1, tail.size())); + case "--hash-count-threshold": + return nextOption( + builder.hashCountThreshold(Integer.parseInt(tail.get(0))), + tail.subList(1, tail.size())); + default: + throw new IllegalArgumentException("Unknown option " + option); + } + } else { + throw new IllegalArgumentException("Unknown option " + option); + } + } +} diff --git a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/HashUtilities.java b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/HashUtilities.java index 8cb85d1bae..1fcdc8ac31 100644 --- a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/HashUtilities.java +++ b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/HashUtilities.java @@ -3,42 +3,136 @@ import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.concatIfEligible; import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.hashOrNull; import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.isEligibleCode; -import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.isNumeric; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.gbif.pipelines.core.parsers.clustering.OccurrenceFeatures; import org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships; -/** Utility functions for hashing records to pregroup. */ -public class HashUtilities { +/** Utility functions for hashing records to pre-group. 
*/ +class HashUtilities { + private static final Set SPECIMEN_BASIS_OF_RECORD_SET = + Stream.of( + "PRESERVED_SPECIMEN", + "MATERIAL_SAMPLE", + "LIVING_SPECIMEN", + "FOSSIL_SPECIMEN", + "MATERIAL_CITATION") + .collect(Collectors.toSet()); + + static Iterator recordHashes(OccurrenceFeatures o) { + Double lat = o.getDecimalLatitude(); + Double lng = o.getDecimalLongitude(); + Integer year = o.getYear(); + Integer month = o.getMonth(); + Integer day = o.getDay(); + String taxonKey = o.getTaxonKey(); + List typeStatus = o.getTypeStatus(); + List recordedBy = o.getRecordedBy(); + String speciesKey = o.getSpeciesKey(); + Set identifiers = hashCodesAndIDs(o, true); + + Set hashes = new HashSet<>(); + + // generic grouping on species, time and space + if (noNulls(lat, lng, year, month, day, speciesKey)) { + hashes.add( + RowFactory.create( + o.getId(), + o.getDatasetKey(), + String.join( + "|", + speciesKey, + String.valueOf(Math.round(lat * 1000)), + String.valueOf(Math.round(lng * 1000)), + year.toString(), + month.toString(), + day.toString()))); + } + + // identifiers overlap for the same species + if (noNulls(speciesKey)) { + for (String id : identifiers) { + hashes.add( + RowFactory.create(o.getId(), o.getDatasetKey(), String.join("|", speciesKey, id))); + } + } + + // anything claiming a type for the same name is of interest (regardless of type stated) + if (noNulls(taxonKey, typeStatus) && !typeStatus.isEmpty()) + hashes.add( + RowFactory.create(o.getId(), o.getDatasetKey(), String.join("|", taxonKey, "TYPE"))); + + // all similar species recorded by the same person within the same year are of interest + if (noNulls(taxonKey, year, recordedBy)) { + for (String r : recordedBy) { + hashes.add( + RowFactory.create(o.getId(), o.getDatasetKey(), String.join("|", year.toString(), r))); + } + } + + // append the specimen specific hashes + hashes.addAll(specimenHashes(o)); + + return hashes.iterator(); + } /** - * Hashes the occurrenceID, catalogCode and otherCatalogNumber into a Set of codes. The - * catalogNumber is constructed as it's code and also prefixed in the commonly used format of - * IC:CC:CN and IC:CN. No code in the resulting set will be numeric as the cardinality of e.g. - * catalogNumber=100 is too high to compute, and this is intended to be used to detect explicit - * relationships with otherCatalogNumber. - * - * @return a set of hashes suitable for grouping specimen related records without other filters. + * Generate hashes for specimens combining the various IDs and accepted species. Specimens often + * link by record identifiers, while other occurrence data skews here greatly for little benefit. 
*/ - public static Set hashCatalogNumbers(OccurrenceFeatures o) { - Stream cats = + private static Set specimenHashes(OccurrenceFeatures o) { + Set hashes = new HashSet<>(); + String bor = o.getBasisOfRecord(); + if (SPECIMEN_BASIS_OF_RECORD_SET.contains(bor)) { + + // non-numeric identifiers for specimens used across datasets + Set codes = hashCodesAndIDs(o, true); + for (String code : codes) { + hashes.add( + RowFactory.create( + o.getId(), + o.getDatasetKey(), + String.join("|", o.getSpeciesKey(), OccurrenceRelationships.normalizeID(code)))); + } + + // stricter code hashing (non-numeric) but without species + Set codesStrict = hashCodesAndIDs(o, false); + for (String code : codesStrict) { + hashes.add( + RowFactory.create( + o.getId(), + o.getDatasetKey(), + String.join("|", OccurrenceRelationships.normalizeID(code)))); + } + } + return hashes; + } + + /** Hashes all the various ways that record codes and identifiers are commonly used. */ + static Set hashCodesAndIDs(OccurrenceFeatures o, boolean allowNumerics) { + Stream ids = Stream.of( - hashOrNull(o.getCatalogNumber(), false), - hashOrNull(o.getOccurrenceID(), false), + hashOrNull(o.getOccurrenceID(), allowNumerics), + hashOrNull(o.getRecordNumber(), allowNumerics), + hashOrNull(o.getFieldNumber(), allowNumerics), + hashOrNull(o.getCatalogNumber(), allowNumerics), concatIfEligible( ":", o.getInstitutionCode(), o.getCollectionCode(), o.getCatalogNumber()), concatIfEligible(":", o.getInstitutionCode(), o.getCatalogNumber())); - if (o.getOtherCatalogNumbers() != null) { - cats = - Stream.concat(cats, o.getOtherCatalogNumbers().stream().map(c -> hashOrNull(c, false))); + ids = + Stream.concat( + ids, o.getOtherCatalogNumbers().stream().map(c -> hashOrNull(c, allowNumerics))); } + return ids.filter(c -> isEligibleCode(c)).collect(Collectors.toSet()); + } - return cats.map(OccurrenceRelationships::normalizeID) - .filter(c -> isEligibleCode(c) && !isNumeric(c)) - .collect(Collectors.toSet()); + /** Return true of no nulls or empty strings provided */ + static boolean noNulls(Object... o) { + return Arrays.stream(o).noneMatch(s -> s == null || "".equals(s)); } } diff --git a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/RowOccurrenceFeatures.java b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/RowOccurrenceFeatures.java index 861408645f..2e039b3c73 100644 --- a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/RowOccurrenceFeatures.java +++ b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/RowOccurrenceFeatures.java @@ -130,7 +130,7 @@ public String asJson() throws IOException { @Override public String getId() { - return get("id"); + return get("gbifId"); } @Override diff --git a/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/SaltPrefixPartitioner.java b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/SaltPrefixPartitioner.java new file mode 100644 index 0000000000..86d19b082d --- /dev/null +++ b/gbif/ingestion/clustering-gbif/src/main/java/org/gbif/pipelines/clustering/SaltPrefixPartitioner.java @@ -0,0 +1,22 @@ +package org.gbif.pipelines.clustering; + +import lombok.AllArgsConstructor; +import org.apache.spark.Partitioner; +import scala.Tuple2; + +/** Partitions by the prefix on the given key extracted from the given key. 
*/ +@AllArgsConstructor +class SaltPrefixPartitioner extends Partitioner { + private final int numPartitions; + + @Override + public int getPartition(Object key) { + String k = ((Tuple2) key)._1; + return Integer.parseInt(k.substring(0, k.indexOf(":"))); + } + + @Override + public int numPartitions() { + return numPartitions; + } +} diff --git a/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/Cluster.scala b/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/Cluster.scala deleted file mode 100755 index 24b8e87dfc..0000000000 --- a/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/Cluster.scala +++ /dev/null @@ -1,391 +0,0 @@ -package org.gbif.pipelines.clustering - -import java.io.File -import org.apache.hadoop.hbase.client.HTable -import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, HFileOutputFormat2} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.Partitioner -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} -import org.apache.spark.sql.{Row, SparkSession} -import org.gbif.pipelines.core.parsers.clustering.{OccurrenceRelationships, RelationshipAssertion} - -import scala.collection.JavaConversions._ - -object Cluster { - - /** - * Reads the salt from the encoded key structure. - */ - class SaltPartitioner(partitionCount: Int) extends Partitioner { - - def getPartition(key: Any): Int = { - // (salt:id1:id2:type, column) - val k = key.asInstanceOf[(String, String)]._1 - Integer.valueOf(k.substring(0, k.indexOf(":"))) - } - - override def numPartitions: Int = partitionCount - } - - // To aid running in Oozie, all properties are supplied as main arguments - val usage = """ - Usage: Cluster \ - [--hive-db database] \ - [--hive-table-hashed tableName] \ - [--hive-table-candidates tableName] \ - [--hive-table-relationships tableName] \ - [--hbase-table tableName] \ - [--hbase-regions numberOfRegions] \ - [--hbase-zk zookeeperEnsemble] \ - [--hfile-dir directoryForHFiles] - """ - - def main(args: Array[String]): Unit = { - val parsedArgs = checkArgs(args) // sanitize input - assert(parsedArgs.size==8, usage) - System.err.println("Configuration: " + parsedArgs) // Oozie friendly logging use - - val hiveDatabase = parsedArgs.get('hiveDatabase).get - val hiveTableHashed = parsedArgs.get('hiveTableHashed).get - val hiveTableCandidates = parsedArgs.get('hiveTableCandidates).get - val hiveTableRelationships = parsedArgs.get('hiveTableRelationships).get - val hbaseTable = parsedArgs.get('hbaseTable).get - val hbaseRegions = parsedArgs.get('hbaseRegions).get.toInt - val hbaseZK = parsedArgs.get('hbaseZK).get - val hfileDir = parsedArgs.get('hfileDir).get - - val warehouseLocation = new File("spark-warehouse").getAbsolutePath - - val spark = SparkSession - .builder() - .appName("Occurrence clustering") - .config("spark.sql.warehouse.dir", warehouseLocation) - .enableHiveSupport() - .getOrCreate() - import spark.sql - - spark.sql("use " + hiveDatabase) - - val runAll = true; // developers: set to false to short circuit to the clustering stage - - if (runAll) { - val occurrences = sql(SQL_OCCURRENCE) - - val schema = StructType( - StructField("gbifId", StringType, nullable = false) :: - StructField("datasetKey", StringType, nullable = false) :: - 
StructField("hash", StringType, nullable = false) :: Nil - ) - val hashEncoder = RowEncoder(schema) - - import spark.implicits._ - - // species+ids for specimens only - val hashSpecimenIds = occurrences.flatMap(r => { - val records = scala.collection.mutable.ListBuffer[Row]() - - // specimens often link by record identifiers, while occurrence data skews here greatly for little benefit - val bor = Option(r.getAs[String]("basisOfRecord")) - if (specimenBORs.contains(bor.getOrElse("ignore"))) { - var idsUsed = Set[Option[String]]( - Option(r.getAs[String]("occurrenceID")), - Option(r.getAs[String]("fieldNumber")), - Option(r.getAs[String]("recordNumber")), - Option(r.getAs[String]("catalogNumber")), - triplify(r), // ic:cc:cn format - scopeCatalogNumber(r) // ic:cn format like ENA uses - ) - - val other = Option(r.getAs[Seq[String]]("otherCatalogNumbers")) - if (!other.isEmpty && other.get.length>0) { - idsUsed ++= other.get.map(x => Option(x)) - } - - // clean IDs - val filteredIds = idsUsed.filter(s => { - s match { - case Some(s) => s!=null && s.length>0 && !omitIds.contains(s.toUpperCase()) - case _ => false - } - }) - - filteredIds.foreach(id => { - id match { - case Some(id) => - val speciesKey = Option(r.getAs[Int]("speciesKey")) - if (!speciesKey.isEmpty) { - records.append( - Row( - r.getAs[String]("gbifId"), - r.getAs[String]("datasetKey"), - speciesKey + "|" + OccurrenceRelationships.normalizeID(id) - )) - } - case None => // skip - } - }) - - // a special treatment for catalogNumber and otherCatalogNumber overlap - val features = new RowOccurrenceFeatures(r) - val hashedIDs = HashUtilities.hashCatalogNumbers(features) // normalised, non-numeric codes - hashedIDs.foreach(id => { - records.append( - Row( - r.getAs[String]("gbifId"), - r.getAs[String]("datasetKey"), - id // only the ID overlap is suffice here - )) - }) - } - records - })(hashEncoder).toDF() - - val hashAll = occurrences.flatMap(r => { - val records = scala.collection.mutable.ListBuffer[Row]() - - // all records of species at same location, time should be compared regardless of BOR - // TODO: consider improving this for null values (and when one side is null) - will the ID link above suffice? 
- val lat = Option(r.getAs[Double]("decimalLatitude")) - val lng = Option(r.getAs[Double]("decimalLongitude")) - val year = Option(r.getAs[Int]("year")) - val month = Option(r.getAs[Int]("month")) - val day = Option(r.getAs[Int]("day")) - val taxonKey = Option(r.getAs[Int]("taxonKey")) - val typeStatus = Option(r.getAs[Seq[String]]("typeStatus")) - val recordedBy = Option(r.getAs[Seq[String]]("recordedBy")) - val speciesKey = Option(r.getAs[Int]("speciesKey")) - - if (!lat.isEmpty && !lng.isEmpty && !year.isEmpty && !month.isEmpty && !day.isEmpty && !speciesKey.isEmpty) { - records.append( - Row( - r.getAs[String]("gbifId"), - r.getAs[String]("datasetKey"), - r.getAs[Integer]("speciesKey") + "|" + Math.round(lat.get*1000) + "|" + Math.round(lng.get*1000) + "|" + year.get + "|" + month.get + "|" + day.get - )) - } - // any type record of a taxon is of interest - if (!taxonKey.isEmpty && !typeStatus.isEmpty) { - typeStatus.get.foreach(t => { - records.append( - Row( - r.getAs[String]("gbifId"), - r.getAs[String]("datasetKey"), - taxonKey.get + "|" + t - )) - }) - } - - // all similar species recorded by the same person within the same year is of interest (misses recordings over new year) - if (!taxonKey.isEmpty && !year.isEmpty && !recordedBy.isEmpty) { - recordedBy.get.foreach(name => { - records.append( - Row( - r.getAs[String]("gbifId"), - r.getAs[String]("datasetKey"), - taxonKey.get + "|" + year.get + "|" + name - )) - }) - } - - records - })(hashEncoder).toDF() - - val deduplicatedHashedRecords = hashAll.union(hashSpecimenIds).dropDuplicates() - - // persist for debugging, enable for further processing in SQL - deduplicatedHashedRecords.write.saveAsTable(hiveTableHashed + "_all") // for diagnostics in hive - - //deduplicatedHashedRecords.createOrReplaceTempView("DF_hashed_all") - - // defend against NxN runtime issues by capping the number of records in a candidate group to 1000 - val hashCounts = deduplicatedHashedRecords.groupBy("hash").count().withColumnRenamed("count", "c"); - hashCounts.createOrReplaceTempView("DF_hash_counts") - val hashedFiltered = sql("SELECT t1.gbifID, t1.datasetKey, t1.hash " + - "FROM " + hiveTableHashed + "_all t1 JOIN DF_hash_counts t2 ON t1.hash=t2.hash " + - "WHERE t2.c <= 1000") - hashedFiltered.write.saveAsTable(hiveTableHashed) // for diagnostics in hive - - //hashedFiltered.createOrReplaceTempView("DF_hashed") - - // Cross join to distinct pairs of records spanning 2 datasets - val candidates = sql("SELECT t1.gbifId as id1, t1.datasetKey as ds1, t2.gbifId as id2, t2.datasetKey as ds2 " + - "FROM " + - hiveTableHashed + " t1 JOIN " + hiveTableHashed + " t2 ON t1.hash = t2.hash " + - "WHERE " + - " t1.gbifId < t2.gbifId AND " + - " t1.datasetKey != t2.datasetKey " + - "GROUP BY t1.gbifId, t1.datasetKey, t2.gbifId, t2.datasetKey"); - - candidates.write.saveAsTable(hiveTableCandidates) // for diagnostics in hive - } - - // Spark DF naming convention requires that we alias each term to avoid naming collision while still having - // named fields to access (i.e. not relying the column number of the term). 
All taxa keys are converted to String - // to allow shared routines between GBIF and ALA (https://github.com/gbif/pipelines/issues/484) - val pairs = sql(""" - SELECT - - t1.gbifId AS t1_gbifId, t1.datasetKey AS t1_datasetKey, t1.basisOfRecord AS t1_basisOfRecord, t1.publishingorgkey AS t1_publishingOrgKey, t1.datasetName AS t1_datasetName, t1.publisher AS t1_publishingOrgName, - CAST(t1.kingdomKey AS String) AS t1_kingdomKey, CAST(t1.phylumKey AS String) AS t1_phylumKey, CAST(t1.classKey AS String) AS t1_classKey, CAST(t1.orderKey AS String) AS t1_orderKey, CAST(t1.familyKey AS String) AS t1_familyKey, CAST(t1.genusKey AS String) AS t1_genusKey, CAST(t1.speciesKey AS String) AS t1_speciesKey, CAST(t1.acceptedTaxonKey AS String) AS t1_acceptedTaxonKey, CAST(t1.taxonKey AS String) AS t1_taxonKey, - t1.scientificName AS t1_scientificName, t1.acceptedScientificName AS t1_acceptedScientificName, t1.kingdom AS t1_kingdom, t1.phylum AS t1_phylum, t1.order_ AS t1_order, t1.family AS t1_family, t1.genus AS t1_genus, t1.species AS t1_species, t1.genericName AS t1_genericName, t1.specificEpithet AS t1_specificEpithet, t1.taxonRank AS t1_taxonRank, - t1.typeStatus AS t1_typeStatus, t1.preparations AS t1_preparations, - t1.decimalLatitude AS t1_decimalLatitude, t1.decimalLongitude AS t1_decimalLongitude, t1.countryCode AS t1_countryCode, - t1.year AS t1_year, t1.month AS t1_month, t1.day AS t1_day, from_unixtime(floor(t1.eventDate/1000)) AS t1_eventDate, - t1.recordNumber AS t1_recordNumber, t1.fieldNumber AS t1_fieldNumber, t1.occurrenceID AS t1_occurrenceID, t1.otherCatalogNumbers AS t1_otherCatalogNumbers, t1.institutionCode AS t1_institutionCode, t1.collectionCode AS t1_collectionCode, t1.catalogNumber AS t1_catalogNumber, - t1.recordedBy AS t1_recordedBy, t1.recordedByID AS t1_recordedByID, - t1.ext_multimedia AS t1_media, - - t2.gbifId AS t2_gbifId, t2.datasetKey AS t2_datasetKey, t2.basisOfRecord AS t2_basisOfRecord, t2.publishingorgkey AS t2_publishingOrgKey, t2.datasetName AS t2_datasetName, t2.publisher AS t2_publishingOrgName, - CAST(t2.kingdomKey AS String) AS t2_kingdomKey, CAST(t2.phylumKey AS String) AS t2_phylumKey, CAST(t2.classKey AS String) AS t2_classKey, CAST(t2.orderKey AS String) AS t2_orderKey, CAST(t2.familyKey AS String) AS t2_familyKey, CAST(t2.genusKey AS String) AS t2_genusKey, CAST(t2.speciesKey AS String) AS t2_speciesKey, CAST(t2.acceptedTaxonKey AS String) AS t2_acceptedTaxonKey, CAST(t2.taxonKey AS String) AS t2_taxonKey, - t2.scientificName AS t2_scientificName, t2.acceptedScientificName AS t2_acceptedScientificName, t2.kingdom AS t2_kingdom, t2.phylum AS t2_phylum, t2.order_ AS t2_order, t2.family AS t2_family, t2.genus AS t2_genus, t2.species AS t2_species, t2.genericName AS t2_genericName, t2.specificEpithet AS t2_specificEpithet, t2.taxonRank AS t2_taxonRank, - t2.typeStatus AS t2_typeStatus, t2.preparations AS t2_preparations, - t2.decimalLatitude AS t2_decimalLatitude, t2.decimalLongitude AS t2_decimalLongitude, t2.countryCode AS t2_countryCode, - t2.year AS t2_year, t2.month AS t2_month, t2.day AS t2_day, from_unixtime(floor(t2.eventDate/1000)) AS t2_eventDate, - t2.recordNumber AS t2_recordNumber, t2.fieldNumber AS t2_fieldNumber, t2.occurrenceID AS t2_occurrenceID, t2.otherCatalogNumbers AS t2_otherCatalogNumbers, t2.institutionCode AS t2_institutionCode, t2.collectionCode AS t2_collectionCode, t2.catalogNumber AS t2_catalogNumber, - t2.recordedBy AS t2_recordedBy, t2.recordedByID AS t2_recordedByID, - t2.ext_multimedia AS t2_media - - FROM """ + 
hiveTableCandidates + """ h - JOIN occurrence t1 ON h.id1 = t1.gbifID - JOIN occurrence t2 ON h.id2 = t2.gbifID - """); - - import org.apache.spark.sql.types._ - // schema holds redundant information, but aids diagnostics in Hive at low cost - val relationshipSchema = StructType( - StructField("id1", StringType, nullable = false) :: - StructField("id2", StringType, nullable = false) :: - StructField("reasons", StringType, nullable = false) :: - StructField("dataset1", StringType, nullable = false) :: - StructField("dataset2", StringType, nullable = false) :: - StructField("o1", StringType, nullable = false) :: - StructField("o2", StringType, nullable = false) :: Nil - - ) - val relationshipEncoder = RowEncoder(relationshipSchema) - - val relationships = pairs.flatMap(row => { - val records = scala.collection.mutable.ListBuffer[Row]() - - val o1 = new RowOccurrenceFeatures(row, "t1_", "t1_media") - val o2 = new RowOccurrenceFeatures(row, "t2_", "t2_media") - - val relationships: Option[RelationshipAssertion[RowOccurrenceFeatures]] = Option(OccurrenceRelationships.generate(o1,o2)) - relationships match { - case Some(r) => { - // store both ways - records.append(Row( - r.getOcc1.get("gbifId"), - r.getOcc2.get("gbifId"), - r.getJustificationAsDelimited, - r.getOcc1.get("datasetKey"), - r.getOcc2.get("datasetKey"), - r.getOcc1.asJson(), - r.getOcc2.asJson())) - - records.append(Row( - r.getOcc2.get("gbifId"), - r.getOcc1.get("gbifId"), - r.getJustificationAsDelimited, - r.getOcc2.get("datasetKey"), - r.getOcc1.get("datasetKey"), - r.getOcc2.asJson(), - r.getOcc1.asJson())) - } - case None => // skip - } - - records - - })(relationshipEncoder).toDF().dropDuplicates() - - relationships.write.saveAsTable(hiveTableRelationships) // for diagnostics in hive - - // convert to HBase, with modulo salted keys - val relationshipsSorted = relationships.rdd.flatMap(r => { - // index based access as cannot access by schema using flatMap and rdd - val id1 = r.getString(0) - val id2 = r.getString(1) - val relationshipType = r.getString(2) - val dataset1 = r.getString(3) - val dataset2 = r.getString(4) - val occurrence1 = r.getString(5) - val occurrence2 = r.getString(6) - - // we salt in HBase only on the id1 to enable prefix scanning using an occurrence ID - val salt = Math.abs(id1.hashCode) % hbaseRegions - - val saltedRowKey = salt + ":" + id1 + ":" + id2 - val cells = scala.collection.mutable.ListBuffer[((String, String), String)]() - - // while only occurrence2 is needed it is not expensive to store each which aids diagnostics - cells.append(((saltedRowKey, "id1"),id1)) - cells.append(((saltedRowKey, "id2"),id2)) - cells.append(((saltedRowKey, "reasons"),relationshipType)) - cells.append(((saltedRowKey, "dataset1"),dataset1)) - cells.append(((saltedRowKey, "dataset2"),dataset2)) - cells.append(((saltedRowKey, "occurrence1"),occurrence1)) - cells.append(((saltedRowKey, "occurrence2"),occurrence2)) - - cells - }).repartitionAndSortWithinPartitions(new SaltPartitioner(hbaseRegions)).map(cell => { - val k = new ImmutableBytesWritable(Bytes.toBytes(cell._1._1)) - val row = new KeyValue(Bytes.toBytes(cell._1._1), // key - Bytes.toBytes("o"), // column family - Bytes.toBytes(cell._1._2), // cell - Bytes.toBytes(cell._2) // cell value - ) - - (k, row) - }) - - val conf = HBaseConfiguration.create() - conf.set("hbase.zookeeper.quorum", hbaseZK); - // NOTE: job creates a copy of the conf - val job = new Job(conf,"Relationships") // name not actually used since we don't submit MR - job.setJarByClass(this.getClass) - 
val table = new HTable(conf, hbaseTable) - HFileOutputFormat2.configureIncrementalLoad(job, table); - val conf2 = job.getConfiguration // important - - relationshipsSorted.saveAsNewAPIHadoopFile(hfileDir, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf2) - } - - /** - * Sanitizes application arguments. - */ - private def checkArgs(args: Array[String]) : Map[Symbol, String] = { - assert(args != null && args.length==16, usage) - - def nextOption(map : Map[Symbol, String], list: List[String]) : Map[Symbol, String] = { - def isSwitch(s : String) = (s(0) == '-') - list match { - case Nil => map - case "--hive-db" :: value :: tail => - nextOption(map ++ Map('hiveDatabase -> value), tail) - case "--hive-table-hashed" :: value :: tail => - nextOption(map ++ Map('hiveTableHashed -> value), tail) - case "--hive-table-candidates" :: value :: tail => - nextOption(map ++ Map('hiveTableCandidates -> value), tail) - case "--hive-table-relationships" :: value :: tail => - nextOption(map ++ Map('hiveTableRelationships -> value), tail) - case "--hbase-table" :: value :: tail => - nextOption(map ++ Map('hbaseTable -> value), tail) - case "--hbase-regions" :: value :: tail => - nextOption(map ++ Map('hbaseRegions -> value), tail) - case "--hbase-zk" :: value :: tail => - nextOption(map ++ Map('hbaseZK -> value), tail) - case "--hfile-dir" :: value :: tail => - nextOption(map ++ Map('hfileDir -> value), tail) - case option :: tail => println("Unknown option "+option) - System.exit(1) - map - } - } - nextOption(Map(), args.toList) - } -} diff --git a/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/package.scala b/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/package.scala deleted file mode 100644 index c028cdb0fa..0000000000 --- a/gbif/ingestion/clustering-gbif/src/main/scala/org/gbif/pipelines/clustering/package.scala +++ /dev/null @@ -1,52 +0,0 @@ -package org.gbif.pipelines - -import org.apache.spark.sql.Row - -package object clustering { - // IDs to skip - val omitIds = List("NO APLICA", "NA", "[]", "NO DISPONIBLE", "NO DISPONIBL", "NO NUMBER", "--", "UNKNOWN") - - // SPECIMENS - val specimenBORs = List("PRESERVED_SPECIMEN", "MATERIAL_SAMPLE", "LIVING_SPECIMEN", "FOSSIL_SPECIMEN", "MATERIAL_CITATION") - - // SQL to extract fields necessary for grouping for candidate pairs - val SQL_OCCURRENCE = """ -SELECT - gbifId, datasetKey, basisOfRecord, publishingorgkey, datasetName, publisher, - kingdomKey, phylumKey, classKey, orderKey, familyKey, genusKey, speciesKey, acceptedTaxonKey, taxonKey, - scientificName, acceptedScientificName, kingdom, phylum, order_ AS order, family, genus, species, genericName, specificEpithet, taxonRank, - typeStatus, preparations, - decimalLatitude, decimalLongitude, countryCode, - year, month, day, from_unixtime(floor(eventDate/1000)) AS eventDate, - recordNumber, fieldNumber, occurrenceID, otherCatalogNumbers, institutionCode, collectionCode, catalogNumber, - recordedBy, recordedByID, - ext_multimedia -FROM occurrence -""" - - case class SimpleOccurrence(gbifID: String, decimalLatitude: Double) - - /** - * @return A triplified version of the codes if all present, otherwise None - */ - def triplify(r: Row) : Option[String] = { - val ic = Option(r.getAs[String]("institutionCode")); - val cc = Option(r.getAs[String]("collectionCode")); - val cn = Option(r.getAs[String]("catalogNumber")); - - if (!ic.isEmpty && !cc.isEmpty && !cn.isEmpty) Option(ic.get + ":" + cc.get + ":" + cn.get) - else 
None - } - - /** - * @return The catalogNumber prefixed by the institutionCode if both present, otherwise None - */ - def scopeCatalogNumber(r: Row) : Option[String] = { - // This format of catalog number used by e.g. ENA datasets - val ic = Option(r.getAs[String]("institutionCode")); - val cn = Option(r.getAs[String]("catalogNumber")); - - if (!ic.isEmpty && !cn.isEmpty) Option(ic.get + ":" + cn.get) - else None - } -} diff --git a/gbif/ingestion/pom.xml b/gbif/ingestion/pom.xml index 640422d74e..eaa406794a 100644 --- a/gbif/ingestion/pom.xml +++ b/gbif/ingestion/pom.xml @@ -17,6 +17,7 @@ clustering-gbif + clustering-gbif-oozie ingest-gbif-beam ingest-gbif-fragmenter ingest-gbif-java diff --git a/gbif/validator/validator-checklists/src/main/java/org/gbif/pipelines/validator/checklists/cli/ChecklistValidatorService.java b/gbif/validator/validator-checklists/src/main/java/org/gbif/pipelines/validator/checklists/cli/ChecklistValidatorService.java index 1048ca878d..40f1a73838 100644 --- a/gbif/validator/validator-checklists/src/main/java/org/gbif/pipelines/validator/checklists/cli/ChecklistValidatorService.java +++ b/gbif/validator/validator-checklists/src/main/java/org/gbif/pipelines/validator/checklists/cli/ChecklistValidatorService.java @@ -53,8 +53,7 @@ public static ValidationWsClient createValidationWsClient( @Override protected void startUp() throws Exception { - log.info( - "Started pipelines-validator-checklist-validator service with parameters : {}", config); + log.info("Started pipelines-validator-checklist-validator service"); // Prefetch is one, since this is a long-running process. listener = new MessageListener(config.messaging.getConnectionParameters(), 1); publisher = new DefaultMessagePublisher(config.messaging.getConnectionParameters()); diff --git a/livingatlas/pipelines/pom.xml b/livingatlas/pipelines/pom.xml index 3d966eae84..fc442a6186 100644 --- a/livingatlas/pipelines/pom.xml +++ b/livingatlas/pipelines/pom.xml @@ -302,6 +302,11 @@ org.gbif.registry registry-ws-client + + + + +
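
Editor's note: the removed Scala package object's triplify and scopeCatalogNumber helpers only concatenate the catalogue codes when every part is present. An equivalent Java sketch for reference (illustration only, names kept from the Scala original):

import java.util.Optional;

// Java rendering of the deleted Scala helpers (illustration only).
public class CatalogNumberHelpers {

  /** institutionCode:collectionCode:catalogNumber, only if all three are present. */
  static Optional<String> triplify(String ic, String cc, String cn) {
    if (ic != null && cc != null && cn != null) {
      return Optional.of(ic + ":" + cc + ":" + cn);
    }
    return Optional.empty();
  }

  /** institutionCode:catalogNumber, the format used by e.g. ENA datasets. */
  static Optional<String> scopeCatalogNumber(String ic, String cn) {
    if (ic != null && cn != null) {
      return Optional.of(ic + ":" + cn);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(triplify("NHMUK", "ENT", "12345"));      // Optional[NHMUK:ENT:12345]
    System.out.println(scopeCatalogNumber("ENA", "AB123456"));  // Optional[ENA:AB123456]
  }
}
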
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java index e88eda5361..b23bca7855 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexRecordTransform.java @@ -446,31 +446,13 @@ private static void applyMultimedia( if (!images.isEmpty()) { indexRecord.getStrings().put(IMAGE_ID, isr.getImageItems().get(0).getIdentifier()); - indexRecord - .getMultiValues() - .put( - IMAGE_IDS, - isr.getImageItems().stream() - .map(Image::getIdentifier) - .collect(Collectors.toList())); + indexRecord.getMultiValues().put(IMAGE_IDS, images); } if (!sounds.isEmpty()) { - indexRecord - .getMultiValues() - .put( - SOUND_IDS, - isr.getImageItems().stream() - .map(Image::getIdentifier) - .collect(Collectors.toList())); + indexRecord.getMultiValues().put(SOUND_IDS, sounds); } if (!videos.isEmpty()) { - indexRecord - .getMultiValues() - .put( - VIDEO_IDS, - isr.getImageItems().stream() - .map(Image::getIdentifier) - .collect(Collectors.toList())); + indexRecord.getMultiValues().put(VIDEO_IDS, videos); } List mir = @@ -886,6 +868,8 @@ public static Set getAddedValues() { .add(DwcTerm.associatedOccurrences.simpleName()) .add(DwcTerm.identifiedByID.simpleName()) .add(DwcTerm.recordedByID.simpleName()) + .add(STATE_CONSERVATION) + .add(COUNTRY_CONSERVATION) .build(); } @@ -907,12 +891,6 @@ private static void addSpeciesListInfo( for (ConservationStatus conservationStatus : conservationStatuses) { if (conservationStatus.getRegion() != null) { if (conservationStatus.getRegion().equalsIgnoreCase(stateProvince)) { - - if (isNotBlank(conservationStatus.getSourceStatus())) { - indexRecord - .getStrings() - .put(RAW_STATE_CONSERVATION, conservationStatus.getSourceStatus()); - } if (isNotBlank(conservationStatus.getStatus())) { indexRecord.getStrings().put(STATE_CONSERVATION, conservationStatus.getStatus()); } diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/beam/CompleteIngestPipelineTestIT.java b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/beam/CompleteIngestPipelineTestIT.java index c7b4323085..b70c4a6ee1 100644 --- a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/beam/CompleteIngestPipelineTestIT.java +++ b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/beam/CompleteIngestPipelineTestIT.java @@ -150,6 +150,10 @@ public static void checkSingleRecordContent(String currentIndexName) throws Exce assertEquals("id1|id2", record.get().get("raw_identifiedByID")); assertEquals("id1", ((List) record.get().get("identifiedByID")).get(0)); assertEquals("id2", ((List) record.get().get("identifiedByID")).get(1)); + + // raw state and raw country conservation + assertEquals("Extinct", record.get().get("raw_stateConservation")); + assertEquals("Vulnerable", record.get().get("raw_countryConservation")); } public void loadTestDataset(String datasetID, String inputPath) throws Exception { diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java index 3037e7c4e4..bfbc4ef70c 100644 --- a/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java +++ 
b/livingatlas/pipelines/src/test/java/au/org/ala/pipelines/converters/CoreTsvConverterTest.java @@ -826,26 +826,45 @@ public void converterTest() { .setFirstLoaded(6L) .build(); + Image im1 = + Image.newBuilder() + .setCreated("ir_Image") + .setAudience("ir_Audienc") + .setCreator("ir_Creator") + .setContributor("ir_Contributor") + .setDatasetId("ir_DatasetId") + .setLicense("ir_License") + .setLatitude(77d) + .setLongitude(777d) + .setSpatial("ir_Spatial") + .setTitle("ir_Title") + .setRightsHolder("ir_RightsHolder") + .setIdentifier("ir_Identifier1") + .setFormat("image") + .build(); + + Image im2 = + Image.newBuilder() + .setCreated("ir_Audio") + .setAudience("ir_Audienc") + .setCreator("ir_Creator") + .setContributor("ir_Contributor") + .setDatasetId("ir_DatasetId") + .setLicense("ir_License") + .setLatitude(77d) + .setLongitude(777d) + .setSpatial("ir_Spatial") + .setTitle("ir_Title") + .setRightsHolder("ir_RightsHolder") + .setIdentifier("ir_Identifier2") + .setFormat("audio") + .build(); + ImageRecord ir = ImageRecord.newBuilder() .setId(DwcTerm.occurrenceID.simpleName()) .setCreated(7L) - .setImageItems( - Collections.singletonList( - Image.newBuilder() - .setCreated("ir_Image") - .setAudience("ir_Audienc") - .setCreator("ir_Creator") - .setContributor("ir_Contributor") - .setDatasetId("ir_DatasetId") - .setLicense("ir_License") - .setLatitude(77d) - .setLongitude(777d) - .setSpatial("ir_Spatial") - .setTitle("ir_Title") - .setRightsHolder("ir_RightsHolder") - .setIdentifier("ir_Identifier") - .build())) + .setImageItems(Arrays.asList(im1, im2)) .build(); TaxonProfile tp = TaxonProfile.newBuilder().setId(DwcTerm.occurrenceID.simpleName()).build(); @@ -903,6 +922,10 @@ public void converterTest() { // Should Assert.assertEquals(String.join("\t", expected), result); + + Assert.assertEquals(1, source.getMultiValues().get("imageIDs").size()); + Assert.assertEquals(1, source.getMultiValues().get("soundIDs").size()); + Assert.assertNull(source.getMultiValues().get("videoIDs")); } @Test diff --git a/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/meta.xml b/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/meta.xml index 376fecdda4..7ff8ba40e2 100755 --- a/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/meta.xml +++ b/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/meta.xml @@ -52,5 +52,7 @@ + + diff --git a/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/occurrence.csv b/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/occurrence.csv index 934387d5fd..b9d94f985b 100644 --- a/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/occurrence.csv +++ b/livingatlas/pipelines/src/test/resources/complete-pipeline-java/dr893/occurrence.csv @@ -1,6 +1,6 @@ -not-an-uuid-1,,,,http://www.bowerbird.org.au/observations/49875,,Jenny Holmes,,,MORDELLIDAE,urn:lsid:biodiversity.org.au:afd.taxon:58c067e4-a55c-45bf-894c-e5edb7c771d8,family,Animalia,Arthropoda,Insecta,Coleoptera,Mordellidae,,,-37.1516626,142.8546633,EPSG:4326,,,,,,Australia,Victoria,,,2015,11,27,27/11/15,,,Pintail,Identification aligned to national taxonomy. 
Originally supplied as [[ Animalia | Arthropoda | Insecta | Coleoptera | Mordellidae | | ]],,https://biocache.ala.org.au/occurrences/fffb8bf8-caf2-47f5-8ed8-e0777570e870,nonDwcFieldSalinity,native,native,id1|id2, -not-an-uuid-2,,,,http://www.bowerbird.org.au/observations/48531,,Adam Edmonds,,,Scoparia crocospila,urn:lsid:biodiversity.org.au:afd.taxon:0b444e3f-22a2-4809-b102-a0ac65636425,species,Animalia,Arthropoda,Insecta,Lepidoptera,Crambidae,Scoparia,,-38.211987,144.322971,EPSG:4326,,,,,,Australia,Victoria,,,2015,10,2,2/10/15,,,Moth Grovedale 2-10-15,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Pyralidae | Scoparia | ]],,https://biocache.ala.org.au/occurrences/fffcb15c-2a7f-4c32-ad80-08b3c03a40f3,salty,captive,introduced,,id3|id4 -not-an-uuid-3,,,,http://www.bowerbird.org.au/observations/98064,,Janet Grevillea,,,Argiope keyserlingi,urn:lsid:biodiversity.org.au:afd.taxon:22b63a56-adb9-468b-b086-e6552f48cee3,species,Animalia,Arthropoda,Arachnida,Araneae,Araneidae,Argiope,,-33.06852769,151.6010284,EPSG:4326,,,,,,Australia,New South Wales,,,2017,11,2,2/11/17,,,St Andrews Cross spider,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Arachnida | Araneae | Araneidae | Argiope | ]],,https://biocache.ala.org.au/occurrences/fffdad7f-93df-4ea8-97f8-b4a78bab1021,fresh,native,introduced,id1,id3 -not-an-uuid-4,,,,http://www.bowerbird.org.au/observations/4810,,Reiner Richter,,,Rattus fuscipes,urn:lsid:biodiversity.org.au:afd.taxon:96e81ce0-ba02-4f33-b831-b598eece44a7,species,Animalia,Chordata,Mammalia,Rodentia,Muridae,Rattus,Bush Rat,-38.1369,145.2833,EPSG:4326,,,,,,Australia,Victoria,,,2013,7,31,31/7/13,,,Bush Rat,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Chordata | Mammalia | Rodentia | Muridae | Rattus | ]],,https://biocache.ala.org.au/occurrences/fffe4a61-a032-45e4-b5b4-53ae230e804a,fresh,captive,other,id1|id2,id3|id4 -not-an-uuid-5,,,,http://www.bowerbird.org.au/observations/76433,,Carol Page,,,Scioglyptis chionomera,urn:lsid:biodiversity.org.au:afd.taxon:d3c54055-181d-4135-87b7-97920e6bd73e,species,Animalia,Arthropoda,Insecta,Lepidoptera,Geometridae,Scioglyptis,,-37.9909881,145.1256931,EPSG:4326,,,,,,Australia,Victoria,,,2016,11,26,26/11/16,,,Tattered Geometrid,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Geometridae | Scioglyptis | ]],,https://biocache.ala.org.au/occurrences/ffff4965-d9bc-4baa-a7dd-84ba594fc298,salty,other,native,id1|id2,id3 -not-an-uuid-6,,,,http://www.bowerbird.org.au/observations/49878,,Jenny Holmes,,,Erythrotriorchis radiatus,urn:lsid:biodiversity.org.au:afd.taxon:1405a1d4-557c-40ac-9f44-a6d41e9136cd,species,Animalia,,,,,,,-35.260319,149.425934,EPSG:4326,,,,,,Australia,New South Wales,,,2015,11,27,27/11/15,,,,,,,,, +not-an-uuid-1,,,,http://www.bowerbird.org.au/observations/49875,,Jenny Holmes,,,MORDELLIDAE,urn:lsid:biodiversity.org.au:afd.taxon:58c067e4-a55c-45bf-894c-e5edb7c771d8,family,Animalia,Arthropoda,Insecta,Coleoptera,Mordellidae,,,-37.1516626,142.8546633,EPSG:4326,,,,,,Australia,Victoria,,,2015,11,27,27/11/15,,,Pintail,Identification aligned to national taxonomy. 
Originally supplied as [[ Animalia | Arthropoda | Insecta | Coleoptera | Mordellidae | | ]],,https://biocache.ala.org.au/occurrences/fffb8bf8-caf2-47f5-8ed8-e0777570e870,nonDwcFieldSalinity,native,native,id1|id2,,Endangered,Vulnerable +not-an-uuid-2,,,,http://www.bowerbird.org.au/observations/48531,,Adam Edmonds,,,Scoparia crocospila,urn:lsid:biodiversity.org.au:afd.taxon:0b444e3f-22a2-4809-b102-a0ac65636425,species,Animalia,Arthropoda,Insecta,Lepidoptera,Crambidae,Scoparia,,-38.211987,144.322971,EPSG:4326,,,,,,Australia,Victoria,,,2015,10,2,2/10/15,,,Moth Grovedale 2-10-15,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Pyralidae | Scoparia | ]],,https://biocache.ala.org.au/occurrences/fffcb15c-2a7f-4c32-ad80-08b3c03a40f3,salty,captive,introduced,,id3|id4,Endangered, +not-an-uuid-3,,,,http://www.bowerbird.org.au/observations/98064,,Janet Grevillea,,,Argiope keyserlingi,urn:lsid:biodiversity.org.au:afd.taxon:22b63a56-adb9-468b-b086-e6552f48cee3,species,Animalia,Arthropoda,Arachnida,Araneae,Araneidae,Argiope,,-33.06852769,151.6010284,EPSG:4326,,,,,,Australia,New South Wales,,,2017,11,2,2/11/17,,,St Andrews Cross spider,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Arachnida | Araneae | Araneidae | Argiope | ]],,https://biocache.ala.org.au/occurrences/fffdad7f-93df-4ea8-97f8-b4a78bab1021,fresh,native,introduced,id1,id3,,Vulnerable +not-an-uuid-4,,,,http://www.bowerbird.org.au/observations/4810,,Reiner Richter,,,Rattus fuscipes,urn:lsid:biodiversity.org.au:afd.taxon:96e81ce0-ba02-4f33-b831-b598eece44a7,species,Animalia,Chordata,Mammalia,Rodentia,Muridae,Rattus,Bush Rat,-38.1369,145.2833,EPSG:4326,,,,,,Australia,Victoria,,,2013,7,31,31/7/13,,,Bush Rat,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Chordata | Mammalia | Rodentia | Muridae | Rattus | ]],,https://biocache.ala.org.au/occurrences/fffe4a61-a032-45e4-b5b4-53ae230e804a,fresh,captive,other,id1|id2,id3|id4,, +not-an-uuid-5,,,,http://www.bowerbird.org.au/observations/76433,,Carol Page,,,Scioglyptis chionomera,urn:lsid:biodiversity.org.au:afd.taxon:d3c54055-181d-4135-87b7-97920e6bd73e,species,Animalia,Arthropoda,Insecta,Lepidoptera,Geometridae,Scioglyptis,,-37.9909881,145.1256931,EPSG:4326,,,,,,Australia,Victoria,,,2016,11,26,26/11/16,,,Tattered Geometrid,Identification aligned to national taxonomy. 
Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Geometridae | Scioglyptis | ]],,https://biocache.ala.org.au/occurrences/ffff4965-d9bc-4baa-a7dd-84ba594fc298,salty,other,native,id1|id2,id3,Extinct,Vulnerable +not-an-uuid-6,,,,http://www.bowerbird.org.au/observations/49878,,Jenny Holmes,,,Erythrotriorchis radiatus,urn:lsid:biodiversity.org.au:afd.taxon:1405a1d4-557c-40ac-9f44-a6d41e9136cd,species,Animalia,,,,,,,-35.260319,149.425934,EPSG:4326,,,,,,Australia,New South Wales,,,2015,11,27,27/11/15,,,,,,,,,,,,, diff --git a/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/meta.xml b/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/meta.xml index 376fecdda4..7ff8ba40e2 100755 --- a/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/meta.xml +++ b/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/meta.xml @@ -52,5 +52,7 @@ + + diff --git a/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/occurrence.csv b/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/occurrence.csv index 934387d5fd..2b5ffd6e82 100644 --- a/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/occurrence.csv +++ b/livingatlas/pipelines/src/test/resources/complete-pipeline/dr893/occurrence.csv @@ -1,6 +1,6 @@ -not-an-uuid-1,,,,http://www.bowerbird.org.au/observations/49875,,Jenny Holmes,,,MORDELLIDAE,urn:lsid:biodiversity.org.au:afd.taxon:58c067e4-a55c-45bf-894c-e5edb7c771d8,family,Animalia,Arthropoda,Insecta,Coleoptera,Mordellidae,,,-37.1516626,142.8546633,EPSG:4326,,,,,,Australia,Victoria,,,2015,11,27,27/11/15,,,Pintail,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Coleoptera | Mordellidae | | ]],,https://biocache.ala.org.au/occurrences/fffb8bf8-caf2-47f5-8ed8-e0777570e870,nonDwcFieldSalinity,native,native,id1|id2, -not-an-uuid-2,,,,http://www.bowerbird.org.au/observations/48531,,Adam Edmonds,,,Scoparia crocospila,urn:lsid:biodiversity.org.au:afd.taxon:0b444e3f-22a2-4809-b102-a0ac65636425,species,Animalia,Arthropoda,Insecta,Lepidoptera,Crambidae,Scoparia,,-38.211987,144.322971,EPSG:4326,,,,,,Australia,Victoria,,,2015,10,2,2/10/15,,,Moth Grovedale 2-10-15,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Pyralidae | Scoparia | ]],,https://biocache.ala.org.au/occurrences/fffcb15c-2a7f-4c32-ad80-08b3c03a40f3,salty,captive,introduced,,id3|id4 -not-an-uuid-3,,,,http://www.bowerbird.org.au/observations/98064,,Janet Grevillea,,,Argiope keyserlingi,urn:lsid:biodiversity.org.au:afd.taxon:22b63a56-adb9-468b-b086-e6552f48cee3,species,Animalia,Arthropoda,Arachnida,Araneae,Araneidae,Argiope,,-33.06852769,151.6010284,EPSG:4326,,,,,,Australia,New South Wales,,,2017,11,2,2/11/17,,,St Andrews Cross spider,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Arachnida | Araneae | Araneidae | Argiope | ]],,https://biocache.ala.org.au/occurrences/fffdad7f-93df-4ea8-97f8-b4a78bab1021,fresh,native,introduced,id1,id3 -not-an-uuid-4,,,,http://www.bowerbird.org.au/observations/4810,,Reiner Richter,,,Rattus fuscipes,urn:lsid:biodiversity.org.au:afd.taxon:96e81ce0-ba02-4f33-b831-b598eece44a7,species,Animalia,Chordata,Mammalia,Rodentia,Muridae,Rattus,Bush Rat,-38.1369,145.2833,EPSG:4326,,,,,,Australia,Victoria,,,2013,7,31,31/7/13,,,Bush Rat,Identification aligned to national taxonomy. 
Originally supplied as [[ Animalia | Chordata | Mammalia | Rodentia | Muridae | Rattus | ]],,https://biocache.ala.org.au/occurrences/fffe4a61-a032-45e4-b5b4-53ae230e804a,fresh,captive,other,id1|id2,id3|id4 -not-an-uuid-5,,,,http://www.bowerbird.org.au/observations/76433,,Carol Page,,,Scioglyptis chionomera,urn:lsid:biodiversity.org.au:afd.taxon:d3c54055-181d-4135-87b7-97920e6bd73e,species,Animalia,Arthropoda,Insecta,Lepidoptera,Geometridae,Scioglyptis,,-37.9909881,145.1256931,EPSG:4326,,,,,,Australia,Victoria,,,2016,11,26,26/11/16,,,Tattered Geometrid,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Geometridae | Scioglyptis | ]],,https://biocache.ala.org.au/occurrences/ffff4965-d9bc-4baa-a7dd-84ba594fc298,salty,other,native,id1|id2,id3 -not-an-uuid-6,,,,http://www.bowerbird.org.au/observations/49878,,Jenny Holmes,,,Erythrotriorchis radiatus,urn:lsid:biodiversity.org.au:afd.taxon:1405a1d4-557c-40ac-9f44-a6d41e9136cd,species,Animalia,,,,,,,-35.260319,149.425934,EPSG:4326,,,,,,Australia,New South Wales,,,2015,11,27,27/11/15,,,,,,,,, +not-an-uuid-1,,,,http://www.bowerbird.org.au/observations/49875,,Jenny Holmes,,,MORDELLIDAE,urn:lsid:biodiversity.org.au:afd.taxon:58c067e4-a55c-45bf-894c-e5edb7c771d8,family,Animalia,Arthropoda,Insecta,Coleoptera,Mordellidae,,,-37.1516626,142.8546633,EPSG:4326,,,,,,Australia,Victoria,,,2015,11,27,27/11/15,,,Pintail,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Coleoptera | Mordellidae | | ]],,https://biocache.ala.org.au/occurrences/fffb8bf8-caf2-47f5-8ed8-e0777570e870,nonDwcFieldSalinity,native,native,id1|id2,,Endangered,Vulnerable +not-an-uuid-2,,,,http://www.bowerbird.org.au/observations/48531,,Adam Edmonds,,,Scoparia crocospila,urn:lsid:biodiversity.org.au:afd.taxon:0b444e3f-22a2-4809-b102-a0ac65636425,species,Animalia,Arthropoda,Insecta,Lepidoptera,Crambidae,Scoparia,,-38.211987,144.322971,EPSG:4326,,,,,,Australia,Victoria,,,2015,10,2,2/10/15,,,Moth Grovedale 2-10-15,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Pyralidae | Scoparia | ]],,https://biocache.ala.org.au/occurrences/fffcb15c-2a7f-4c32-ad80-08b3c03a40f3,salty,captive,introduced,,id3|id4,Endangered, +not-an-uuid-3,,,,http://www.bowerbird.org.au/observations/98064,,Janet Grevillea,,,Argiope keyserlingi,urn:lsid:biodiversity.org.au:afd.taxon:22b63a56-adb9-468b-b086-e6552f48cee3,species,Animalia,Arthropoda,Arachnida,Araneae,Araneidae,Argiope,,-33.06852769,151.6010284,EPSG:4326,,,,,,Australia,New South Wales,,,2017,11,2,2/11/17,,,St Andrews Cross spider,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Arachnida | Araneae | Araneidae | Argiope | ]],,https://biocache.ala.org.au/occurrences/fffdad7f-93df-4ea8-97f8-b4a78bab1021,fresh,native,introduced,id1,id3,,Vulnerable +not-an-uuid-4,,,,http://www.bowerbird.org.au/observations/4810,,Reiner Richter,,,Rattus fuscipes,urn:lsid:biodiversity.org.au:afd.taxon:96e81ce0-ba02-4f33-b831-b598eece44a7,species,Animalia,Chordata,Mammalia,Rodentia,Muridae,Rattus,Bush Rat,-38.1369,145.2833,EPSG:4326,,,,,,Australia,Victoria,,,2013,7,31,31/7/13,,,Bush Rat,Identification aligned to national taxonomy. 
Originally supplied as [[ Animalia | Chordata | Mammalia | Rodentia | Muridae | Rattus | ]],,https://biocache.ala.org.au/occurrences/fffe4a61-a032-45e4-b5b4-53ae230e804a,fresh,captive,other,id1|id2,id3|id4,, +not-an-uuid-5,,,,http://www.bowerbird.org.au/observations/76433,,Carol Page,,,Scioglyptis chionomera,urn:lsid:biodiversity.org.au:afd.taxon:d3c54055-181d-4135-87b7-97920e6bd73e,species,Animalia,Arthropoda,Insecta,Lepidoptera,Geometridae,Scioglyptis,,-37.9909881,145.1256931,EPSG:4326,,,,,,Australia,Victoria,,,2016,11,26,26/11/16,,,Tattered Geometrid,Identification aligned to national taxonomy. Originally supplied as [[ Animalia | Arthropoda | Insecta | Lepidoptera | Geometridae | Scioglyptis | ]],,https://biocache.ala.org.au/occurrences/ffff4965-d9bc-4baa-a7dd-84ba594fc298,salty,other,native,id1|id2,id3,Extinct,Vulnerable +not-an-uuid-6,,,,http://www.bowerbird.org.au/observations/49878,,Jenny Holmes,,,Erythrotriorchis radiatus,urn:lsid:biodiversity.org.au:afd.taxon:1405a1d4-557c-40ac-9f44-a6d41e9136cd,species,Animalia,,,,,,,-35.260319,149.425934,EPSG:4326,,,,,,Australia,New South Wales,,,2015,11,27,27/11/15,,,,,,,,,,, diff --git a/livingatlas/solr/conf/managed-schema b/livingatlas/solr/conf/managed-schema index 109740a550..b51638686e 100644 --- a/livingatlas/solr/conf/managed-schema +++ b/livingatlas/solr/conf/managed-schema @@ -17,7 +17,7 @@ + @@ -538,6 +539,7 @@ + @@ -712,6 +714,7 @@ + diff --git a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreter.java b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreter.java index c547d7614b..528db783aa 100644 --- a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreter.java +++ b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreter.java @@ -7,12 +7,14 @@ import static org.gbif.api.vocabulary.OccurrenceIssue.OCCURRENCE_STATUS_INFERRED_FROM_INDIVIDUAL_COUNT; import static org.gbif.api.vocabulary.OccurrenceIssue.OCCURRENCE_STATUS_UNPARSABLE; import static org.gbif.api.vocabulary.OccurrenceIssue.TYPE_STATUS_INVALID; +import static org.gbif.pipelines.core.utils.ModelUtils.DEFAULT_SEPARATOR; import static org.gbif.pipelines.core.utils.ModelUtils.addIssue; -import static org.gbif.pipelines.core.utils.ModelUtils.extractOptListValue; +import static org.gbif.pipelines.core.utils.ModelUtils.extractListValue; import static org.gbif.pipelines.core.utils.ModelUtils.extractOptValue; import com.google.common.base.Strings; import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -287,27 +289,42 @@ public static BiConsumer interpretOccurrenceStatus( /** {@link DwcTerm#otherCatalogNumbers} interpretation. */ public static void interpretOtherCatalogNumbers(ExtendedRecord er, BasicRecord br) { - extractOptListValue(er, DwcTerm.otherCatalogNumbers).ifPresent(br::setOtherCatalogNumbers); + List list = extractListValue(DEFAULT_SEPARATOR + "|;", er, DwcTerm.otherCatalogNumbers); + if (!list.isEmpty()) { + br.setOtherCatalogNumbers(list); + } } /** {@link DwcTerm#recordedBy} interpretation. */ public static void interpretRecordedBy(ExtendedRecord er, BasicRecord br) { - extractOptListValue(er, DwcTerm.recordedBy).ifPresent(br::setRecordedBy); + List list = extractListValue(er, DwcTerm.recordedBy); + if (!list.isEmpty()) { + br.setRecordedBy(list); + } } /** {@link DwcTerm#identifiedBy} interpretation. 
*/ public static void interpretIdentifiedBy(ExtendedRecord er, BasicRecord br) { - extractOptListValue(er, DwcTerm.identifiedBy).ifPresent(br::setIdentifiedBy); + List list = extractListValue(er, DwcTerm.identifiedBy); + if (!list.isEmpty()) { + br.setIdentifiedBy(list); + } } /** {@link DwcTerm#preparations} interpretation. */ public static void interpretPreparations(ExtendedRecord er, BasicRecord br) { - extractOptListValue(er, DwcTerm.preparations).ifPresent(br::setPreparations); + List list = extractListValue(er, DwcTerm.preparations); + if (!list.isEmpty()) { + br.setPreparations(list); + } } /** {@link org.gbif.dwc.terms.GbifTerm#projectId} interpretation. */ public static void interpretProjectId(ExtendedRecord er, BasicRecord br) { - extractOptListValue(er, GbifTerm.projectId).ifPresent(br::setProjectId); + List list = extractListValue(er, GbifTerm.projectId); + if (!list.isEmpty()) { + br.setProjectId(list); + } } /** Sets the coreId field. */ diff --git a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/CoreInterpreter.java b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/CoreInterpreter.java index 9bd6853d92..fb56487b3c 100644 --- a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/CoreInterpreter.java +++ b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/CoreInterpreter.java @@ -2,8 +2,8 @@ import static org.gbif.api.vocabulary.OccurrenceIssue.REFERENCES_URI_INVALID; import static org.gbif.pipelines.core.utils.ModelUtils.addIssue; +import static org.gbif.pipelines.core.utils.ModelUtils.extractListValue; import static org.gbif.pipelines.core.utils.ModelUtils.extractNullAwareOptValue; -import static org.gbif.pipelines.core.utils.ModelUtils.extractOptListValue; import static org.gbif.pipelines.core.utils.ModelUtils.extractOptValue; import static org.gbif.pipelines.core.utils.ModelUtils.extractValue; @@ -78,12 +78,18 @@ public static void interpretLicense(ExtendedRecord er, Consumer consumer /** {@link DwcTerm#datasetID} interpretation. */ public static void interpretDatasetID(ExtendedRecord er, Consumer> consumer) { - extractOptListValue(er, DwcTerm.datasetID).ifPresent(consumer); + List list = extractListValue(er, DwcTerm.datasetID); + if (!list.isEmpty()) { + consumer.accept(list); + } } /** {@link DwcTerm#datasetName} interpretation. */ public static void interpretDatasetName(ExtendedRecord er, Consumer> consumer) { - extractOptListValue(er, DwcTerm.datasetName).ifPresent(consumer); + List list = extractListValue(er, DwcTerm.datasetName); + if (!list.isEmpty()) { + consumer.accept(list); + } } /** {@link DwcTerm#parentEventID} interpretation. */ @@ -134,7 +140,10 @@ public static BiConsumer interpretLineages( /** {@link DwcTerm#samplingProtocol} interpretation. */ public static void interpretSamplingProtocol(ExtendedRecord er, Consumer> consumer) { - extractOptListValue(er, DwcTerm.samplingProtocol).ifPresent(consumer); + List list = extractListValue(er, DwcTerm.samplingProtocol); + if (!list.isEmpty()) { + consumer.accept(list); + } } /** {@link DwcTerm#locationID} interpretation. 
*/ diff --git a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/GrscicollInterpreter.java b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/GrscicollInterpreter.java index 59fd7c1481..c6b2ef0f8f 100644 --- a/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/GrscicollInterpreter.java +++ b/sdks/core/src/main/java/org/gbif/pipelines/core/interpreters/core/GrscicollInterpreter.java @@ -134,7 +134,9 @@ private static boolean isSpecimenRecord(ExtendedRecord er) { return bor == BasisOfRecord.PRESERVED_SPECIMEN || bor == BasisOfRecord.FOSSIL_SPECIMEN - || bor == BasisOfRecord.LIVING_SPECIMEN; + || bor == BasisOfRecord.LIVING_SPECIMEN + || bor == BasisOfRecord.MATERIAL_SAMPLE + || bor == BasisOfRecord.MATERIAL_CITATION; } @VisibleForTesting diff --git a/sdks/core/src/main/java/org/gbif/pipelines/core/utils/ModelUtils.java b/sdks/core/src/main/java/org/gbif/pipelines/core/utils/ModelUtils.java index b1c1fc2084..c7a791e45e 100644 --- a/sdks/core/src/main/java/org/gbif/pipelines/core/utils/ModelUtils.java +++ b/sdks/core/src/main/java/org/gbif/pipelines/core/utils/ModelUtils.java @@ -2,13 +2,14 @@ import static org.gbif.pipelines.core.utils.IdentificationUtils.extractFromIdentificationExtension; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.gbif.api.vocabulary.Extension; @@ -68,15 +69,20 @@ public static Optional extractOptValue(ExtendedRecord er, Term term) { return Optional.ofNullable(extractValue(er, term)); } - public static Optional> extractOptListValue(ExtendedRecord er, Term term) { + public static List extractListValue(ExtendedRecord er, Term term) { + return extractListValue(DEFAULT_SEPARATOR, er, term); + } + + public static List extractListValue(String separatorRegex, ExtendedRecord er, Term term) { return extractOptValue(er, term) .filter(x -> !x.isEmpty()) .map( x -> - Stream.of(x.split(DEFAULT_SEPARATOR)) + Arrays.stream(x.split(separatorRegex)) .map(String::trim) .filter(v -> !v.isEmpty()) - .collect(Collectors.toList())); + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); } public static boolean hasValue(ExtendedRecord er, Term term) { diff --git a/sdks/core/src/test/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreterTest.java b/sdks/core/src/test/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreterTest.java index 6e20428902..da9798c649 100644 --- a/sdks/core/src/test/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreterTest.java +++ b/sdks/core/src/test/java/org/gbif/pipelines/core/interpreters/core/BasicInterpreterTest.java @@ -268,6 +268,28 @@ public void interpretOtherCatalogNumbersTest() { assertIssueSize(br, 0); } + @Test + public void interpretSemicolonOtherCatalogNumbersTest() { + final String number1 = "111"; + final String number2 = "22"; + + // State + Map coreMap = new HashMap<>(1); + coreMap.put(DwcTerm.otherCatalogNumbers.qualifiedName(), number1 + " ; " + number2 + " ; "); + ExtendedRecord er = ExtendedRecord.newBuilder().setId(ID).setCoreTerms(coreMap).build(); + + BasicRecord br = BasicRecord.newBuilder().setId(ID).build(); + + // When + BasicInterpreter.interpretOtherCatalogNumbers(er, br); + + // Should + Assert.assertEquals(2, br.getOtherCatalogNumbers().size()); + 
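
Editor's note: this test exercises the widened separator in interpretOtherCatalogNumbers, which now splits on the default separator or ';', trims each token, drops empties, and only sets the field when something survives. A standalone sketch of the splitting step, assuming DEFAULT_SEPARATOR is the escaped pipe "\\|" (the pipe-delimited test data suggests this, but it is not shown in the patch):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Standalone rendering of the new extractListValue splitting logic; the real
// method reads the term value from the ExtendedRecord, this inlines that step.
public class ListSplitDemo {

  static List<String> split(String raw, String separatorRegex) {
    if (raw == null || raw.isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.stream(raw.split(separatorRegex))
        .map(String::trim)
        .filter(v -> !v.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // otherCatalogNumbers additionally accepts ';', as in the test value "111 ; 22 ; "
    String otherCatalogNumbersSeparator = "\\|" + "|;";
    System.out.println(split("111 ; 22 ; ", otherCatalogNumbersSeparator)); // [111, 22]
    System.out.println(split("id1|id2", "\\|"));                            // [id1, id2]
  }
}
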
Assert.assertTrue(br.getOtherCatalogNumbers().contains(number1)); + Assert.assertTrue(br.getOtherCatalogNumbers().contains(number2)); + assertIssueSize(br, 0); + } + @Test public void interpretRecordedByTest() { final String person1 = "person 1";