Skip to content

Commit

Permalink
Replace usage of deprecated AccumuloInputFormat (#2496)
Browse files Browse the repository at this point in the history
* Replace usage of deprecated AccumuloInputFormat

Replace usages of
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat with the
recommended org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat.

Part of #2443

* Format calls to builders

* Fix formatter tag typo

---------

Co-authored-by: alerman <[email protected]>
Co-authored-by: Daniel Roberts <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent 45968ee commit 279c8a0
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -105,11 +105,21 @@ public int run(String[] args) throws Exception {
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance.trim()).withZkHosts(zookeepers.trim()));
AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

// @formatter:off
Properties clientProperties = Accumulo.newClientProperties()
.to(instance.trim(), zookeepers.trim())
.as(userName, password)
.build();

AccumuloInputFormat.configure()
.clientProperties(clientProperties)
.table(inputTable)
.auths(Authorizations.EMPTY)
.ranges(Collections.singletonList(dayRange))
.store(job);
// @formatter:on

// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
Expand All @@ -16,15 +17,13 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -277,12 +276,20 @@ public int run(String[] args) throws Exception {
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

// @formatter:off
Properties clientProperties = Accumulo.newClientProperties()
.to(instance, zookeepers)
.as(userName, password)
.build();

AccumuloInputFormat.configure()
.clientProperties(clientProperties)
.table(inputTable)
.auths(Authorizations.EMPTY)
.ranges(Collections.singletonList(dayRange))
.store(job);
// @formatter:on
// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -287,17 +287,26 @@ public int run(String[] args) throws Exception {
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);

AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloInputFormat.setRanges(job, dayRanges);
AccumuloInputFormat.setAutoAdjustRanges(job, false);
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, auths);
// @formatter:off
Properties clientProperties = Accumulo.newClientProperties()
.to(instance, zookeepers)
.as(userName, password)
.build();
// @formatter:on

IteratorSetting regex = new IteratorSetting(50, RegExFilter.class);
regex.addOption(RegExFilter.COLF_REGEX, QUERY_METRICS_REGEX);
AccumuloInputFormat.addIterator(job, regex);

// @formatter:off
AccumuloInputFormat.configure()
.clientProperties(clientProperties)
.table(inputTable)
.auths(auths)
.ranges(dayRanges)
.autoAdjustRanges(false)
.addIterator(regex)
.store(job);
// @formatter:on
// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<property name="restrictInputFormats" value="${mapReduce.inputFormat.restrict}" />
<property name="validInputFormats">
<list value-type="java.lang.Class">
<value>org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat</value>
<value>org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat</value>
<value>datawave.mr.bulk.BulkInputFormat</value>
</list>
</property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.StreamingOutput;

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang.StringUtils;
Expand Down

0 comments on commit 279c8a0

Please sign in to comment.