From 165efeddc5b553fa94e95c1dfc33be0d9e4d0fe0 Mon Sep 17 00:00:00 2001 From: Laura Schanno Date: Tue, 6 Aug 2024 01:42:43 -0400 Subject: [PATCH] Replace usage of deprecated AccumuloInputFormat Replace usages of org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat with the recommended org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat. Part of #2443 --- .../metrics/analytic/FileByteSummaryLoader.java | 16 ++++++++-------- .../analytic/IngestMetricsSummaryLoader.java | 14 ++++++-------- .../analytic/QueryMetricsSummaryLoader.java | 17 +++++++---------- .../datawave/mapreduce/MapReduceJobs.xml | 2 +- .../datawave/webservice/mr/MapReduceBean.java | 1 + 5 files changed, 23 insertions(+), 27 deletions(-) diff --git a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/FileByteSummaryLoader.java b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/FileByteSummaryLoader.java index 3a58363d3da..dfba8cc9206 100644 --- a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/FileByteSummaryLoader.java +++ b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/FileByteSummaryLoader.java @@ -3,19 +3,19 @@ import java.io.IOException; import java.util.Collections; import java.util.Date; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import 
org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; @@ -105,11 +105,11 @@ public int run(String[] args) throws Exception { job.setMapOutputKeyClass(Key.class); job.setMapOutputValueClass(Value.class); job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password)); - AccumuloInputFormat.setInputTableName(job, inputTable); - AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY); - AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance.trim()).withZkHosts(zookeepers.trim())); - AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange)); + + Properties clientProperties = Accumulo.newClientProperties().to(instance.trim(), zookeepers.trim()).as(userName, password).build(); + AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(Authorizations.EMPTY) + .ranges(Collections.singletonList(dayRange)).store(job); + // Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo RowPartitioner.configureJob(job); diff --git a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/IngestMetricsSummaryLoader.java b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/IngestMetricsSummaryLoader.java index 27e6b5692d5..f4ad35e5550 100644 --- a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/IngestMetricsSummaryLoader.java +++ b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/IngestMetricsSummaryLoader.java @@ -7,6 +7,7 @@ import java.util.Date; import java.util.HashSet; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -16,15 +17,14 @@ +import org.apache.accumulo.core.client.Accumulo; import 
org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; @@ -277,11 +277,10 @@ public int run(String[] args) throws Exception { job.setMapOutputKeyClass(Key.class); job.setMapOutputValueClass(Value.class); job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password)); - AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers)); - AccumuloInputFormat.setInputTableName(job, inputTable); - AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY); - AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange)); + + Properties clientProperties = Accumulo.newClientProperties().to(instance, zookeepers).as(userName, password).build(); + AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(Authorizations.EMPTY) + .ranges(Collections.singletonList(dayRange)).store(job); // Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo RowPartitioner.configureJob(job); diff --git 
a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/QueryMetricsSummaryLoader.java b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/QueryMetricsSummaryLoader.java index 746684e07d6..a6044f627a4 100644 --- a/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/QueryMetricsSummaryLoader.java +++ b/warehouse/metrics-core/src/main/java/datawave/metrics/analytic/QueryMetricsSummaryLoader.java @@ -8,20 +8,20 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; @@ -287,16 +287,13 @@ public int run(String[] args) throws Exception { job.setMapOutputValueClass(Value.class); job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password)); - AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers)); - AccumuloInputFormat.setRanges(job, dayRanges); - AccumuloInputFormat.setAutoAdjustRanges(job, false); 
- AccumuloInputFormat.setInputTableName(job, inputTable); - AccumuloInputFormat.setScanAuthorizations(job, auths); + Properties clientProperties = Accumulo.newClientProperties().to(instance, zookeepers).as(userName, password).build(); IteratorSetting regex = new IteratorSetting(50, RegExFilter.class); regex.addOption(RegExFilter.COLF_REGEX, QUERY_METRICS_REGEX); - AccumuloInputFormat.addIterator(job, regex); + + AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(auths).ranges(dayRanges).autoAdjustRanges(false) + .addIterator(regex).store(job); // Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo RowPartitioner.configureJob(job); diff --git a/web-services/deploy/configuration/src/main/resources/datawave/mapreduce/MapReduceJobs.xml b/web-services/deploy/configuration/src/main/resources/datawave/mapreduce/MapReduceJobs.xml index b3a04ec7bb6..3817a39f851 100644 --- a/web-services/deploy/configuration/src/main/resources/datawave/mapreduce/MapReduceJobs.xml +++ b/web-services/deploy/configuration/src/main/resources/datawave/mapreduce/MapReduceJobs.xml @@ -15,7 +15,7 @@ - org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat + org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat datawave.mr.bulk.BulkInputFormat diff --git a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java index df051b3d6a6..1c502bdbf5b 100644 --- a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java +++ b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java @@ -43,6 +43,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.StreamingOutput; +import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import 
org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.lang.StringUtils;