Replace usage of deprecated AccumuloInputFormat
Replace usages of
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat with the
recommended org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat.

Part of #2443
lbschanno committed Aug 6, 2024
1 parent 5617f02 commit 165efed
Showing 5 changed files with 23 additions and 27 deletions.
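For context, each affected job setup follows the same migration pattern: the deprecated static setters on the old input format are replaced by the builder-style configure() API of the new one. Below is a minimal sketch of that pattern, assuming the surrounding variables (job, instance, zookeepers, userName, password, inputTable, dayRange) are already defined as in the touched classes:

// Old (deprecated org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat):
// credentials, instance, table, authorizations, and ranges are set through separate static calls.
AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

// New (org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat):
// connection details live in client Properties, and everything is stored on the job in one fluent chain.
Properties clientProperties = Accumulo.newClientProperties()
        .to(instance, zookeepers)
        .as(userName, password)
        .build();
AccumuloInputFormat.configure()
        .clientProperties(clientProperties)
        .table(inputTable)
        .auths(Authorizations.EMPTY)
        .ranges(Collections.singletonList(dayRange))
        .store(job);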
@@ -3,19 +3,19 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
@@ -105,11 +105,11 @@ public int run(String[] args) throws Exception {
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance.trim()).withZkHosts(zookeepers.trim()));
AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

Properties clientProperties = Accumulo.newClientProperties().to(instance.trim(), zookeepers.trim()).as(userName, password).build();
AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(Authorizations.EMPTY)
.ranges(Collections.singletonList(dayRange)).store(job);

// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);

@@ -7,6 +7,7 @@
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
@@ -16,15 +17,13 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
@@ -277,11 +276,10 @@ public int run(String[] args) throws Exception {
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

Properties clientProperties = Accumulo.newClientProperties().to(instance, zookeepers).as(userName, password).build();
AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(Authorizations.EMPTY)
.ranges(Collections.singletonList(dayRange)).store(job);

// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);
@@ -8,20 +8,20 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
@@ -287,16 +287,13 @@ public int run(String[] args) throws Exception {
job.setMapOutputValueClass(Value.class);
job.setInputFormatClass(AccumuloInputFormat.class);

AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
AccumuloInputFormat.setRanges(job, dayRanges);
AccumuloInputFormat.setAutoAdjustRanges(job, false);
AccumuloInputFormat.setInputTableName(job, inputTable);
AccumuloInputFormat.setScanAuthorizations(job, auths);
Properties clientProperties = Accumulo.newClientProperties().to(instance, zookeepers).as(userName, password).build();

IteratorSetting regex = new IteratorSetting(50, RegExFilter.class);
regex.addOption(RegExFilter.COLF_REGEX, QUERY_METRICS_REGEX);
AccumuloInputFormat.addIterator(job, regex);

AccumuloInputFormat.configure().clientProperties(clientProperties).table(inputTable).auths(auths).ranges(dayRanges).autoAdjustRanges(false)
.addIterator(regex).store(job);

// Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
RowPartitioner.configureJob(job);
@@ -15,7 +15,7 @@
<property name="restrictInputFormats" value="${mapReduce.inputFormat.restrict}" />
<property name="validInputFormats">
<list value-type="java.lang.Class">
<value>org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat</value>
<value>org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat</value>
<value>datawave.mr.bulk.BulkInputFormat</value>
</list>
</property>
@@ -43,6 +43,7 @@
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.StreamingOutput;

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang.StringUtils;