diff --git a/camus-api/pom.xml b/camus-api/pom.xml
index c41aecd9c..2bc4abd37 100644
--- a/camus-api/pom.xml
+++ b/camus-api/pom.xml
@@ -5,19 +5,23 @@
     <groupId>com.linkedin.camus</groupId>
     <artifactId>camus-parent</artifactId>
-    <version>0.1.0-SNAPSHOT</version>
+    <version>0.3.0-SNAPSHOT</version>
   <artifactId>camus-api</artifactId>
-  <name>Camus API.</name>
-  <packaging>jar</packaging>
+  <name>Camus API.</name>
+  <packaging>jar</packaging>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-    </dependency>
-  </dependencies>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+  </dependencies>
diff --git a/camus-etl-kafka/pom.xml b/camus-etl-kafka/pom.xml
index a5750aec1..098a705f0 100644
--- a/camus-etl-kafka/pom.xml
+++ b/camus-etl-kafka/pom.xml
@@ -1,95 +1,128 @@
-  <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>com.linkedin.camus</groupId>
-    <artifactId>camus-parent</artifactId>
-    <version>0.1.0-SNAPSHOT</version>
-  </parent>
+  <parent>
+    <groupId>com.linkedin.camus</groupId>
+    <artifactId>camus-parent</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
-  <artifactId>camus-etl-kafka</artifactId>
-  <name>Camus ETL to move data from Kafka to Hadoop.</name>
-  <packaging>jar</packaging>
-  <dependencies>
-    <dependency>
-      <groupId>com.linkedin.camus</groupId>
-      <artifactId>camus-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.linkedin.camus</groupId>
-      <artifactId>camus-schema-registry</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.linkedin.camus</groupId>
-      <artifactId>camus-kafka-coders</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>kafka</groupId>
-      <artifactId>kafka</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.github.sgroschupf</groupId>
-      <artifactId>zkclient</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>jline</groupId>
-      <artifactId>jline</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-      <version>1.2</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.8.1</version>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <version>1.7.0</version>
-        <executions>
-          <execution>
-            <id>schemas</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-              <goal>protocol</goal>
-              <goal>idl-protocol</goal>
-            </goals>
-            <configuration>
-              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
-              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
+  <artifactId>camus-etl-kafka</artifactId>
+  <name>Camus ETL to move data from Kafka to Hadoop.</name>
+  <packaging>jar</packaging>
+  <dependencies>
+    <dependency>
+      <groupId>com.linkedin.camus</groupId>
+      <artifactId>camus-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.linkedin.camus</groupId>
+      <artifactId>camus-schema-registry</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.linkedin.camus</groupId>
+      <artifactId>camus-kafka-coders</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.sgroschupf</groupId>
+      <artifactId>zkclient</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.7.0</version>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+              <goal>protocol</goal>
+              <goal>idl-protocol</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java
index c5e97549a..5d9b0688b 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/CamusJob.java
@@ -9,10 +9,12 @@
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.NumberFormat;
@@ -26,6 +28,8 @@
import java.util.Comparator;
import java.util.Arrays;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -88,8 +92,11 @@ public class CamusJob extends Configured implements Tool {
public static final String KAFKA_HOST_PORT = "kafka.host.port";
public static final String KAFKA_TIMEOUT_VALUE = "kafka.timeout.value";
public static final String LOG4J_CONFIGURATION = "log4j.configuration";
+ public static final String LOG4J_PATH = "log4j.path";
private static org.apache.log4j.Logger log;
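+  // Optional HDFS directory where the job counters are persisted as counters.json after each run (see run()).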
+ public static final String CAMUS_COUNTERS_PATH = "camus.counters.path";
+
private final Properties props;
public CamusJob() throws IOException {
@@ -99,7 +106,7 @@ public CamusJob() throws IOException {
public CamusJob(Properties props) throws IOException {
this(props, org.apache.log4j.Logger.getLogger(CamusJob.class));
}
-
+
public CamusJob(Properties props, Logger log) throws IOException {
this.props = props;
this.log = log;
@@ -124,19 +131,19 @@ public static void setTime(String name) {
(timingMap.get(name) == null ? 0 : timingMap.get(name))
+ System.currentTimeMillis());
}
-
+
private Job createJob(Properties props) throws IOException {
- Job job;
+ Job job;
if(getConf() == null)
{
- setConf(new Configuration());
+ setConf(new Configuration());
}
-
+
populateConf(props, getConf(), log);
-
+
job = new Job(getConf());
job.setJarByClass(CamusJob.class);
-
+
if(job.getConfiguration().get("camus.job.name") != null)
{
job.setJobName(job.getConfiguration().get("camus.job.name"));
@@ -145,7 +152,7 @@ private Job createJob(Properties props) throws IOException {
{
job.setJobName("Camus Job");
}
-
+
return job;
}
@@ -165,7 +172,7 @@ public static void populateConf(Properties props, Configuration conf, Logger log
if (status != null) {
for (int i = 0; i < status.length; ++i) {
if (!status[i].isDir()) {
- log.info("Adding Jar to Distributed Cache Archive File:"
+ System.out.println("Adding Jar to Distributed Cache Archive File:"
+ status[i].getPath());
DistributedCache
@@ -185,7 +192,7 @@ public static void populateConf(Properties props, Configuration conf, Logger log
if (externalJarList != null) {
String[] jarFiles = externalJarList.split(",");
for (String jarFile : jarFiles) {
- log.info("Adding external jar File:" + jarFile);
+ System.out.println("Adding external jar File:" + jarFile);
DistributedCache.addFileToClassPath(new Path(jarFile),
conf, fs);
}
@@ -198,11 +205,11 @@ public void run() throws Exception {
startTiming("total");
Job job = createJob(props);
if (getLog4jConfigure(job)) {
- DOMConfigurator.configure("log4j.xml");
+ DOMConfigurator.configure(getLog4jPath(job));
}
FileSystem fs = FileSystem.get(job.getConfiguration());
- log.info("Dir Destination set to: "
+ System.out.println("Dir Destination set to: "
+ EtlMultiOutputFormat.getDestinationPath(job));
Path execBasePath = new Path(props.getProperty(ETL_EXECUTION_BASE_PATH));
@@ -210,11 +217,11 @@ public void run() throws Exception {
props.getProperty(ETL_EXECUTION_HISTORY_PATH));
if (!fs.exists(execBasePath)) {
- log.info("The execution base path does not exist. Creating the directory");
+ System.out.println("The execution base path does not exist. Creating the directory");
fs.mkdirs(execBasePath);
}
if (!fs.exists(execHistory)) {
- log.info("The history base path does not exist. Creating the directory.");
+ System.out.println("The history base path does not exist. Creating the directory.");
fs.mkdirs(execHistory);
}
@@ -235,12 +242,12 @@ public int compare(FileStatus f1, FileStatus f2) {
return f1.getPath().getName().compareTo(f2.getPath().getName());
}
});
-
+
// removes oldest directory until we get under required % of count
// quota. Won't delete the most recent directory.
for (int i = 0; i < executions.length - 1 && limit < currentCount; i++) {
FileStatus stat = executions[i];
- log.info("removing old execution: " + stat.getPath().getName());
+ System.out.println("removing old execution: " + stat.getPath().getName());
ContentSummary execContent = fs.getContentSummary(stat.getPath());
currentCount -= execContent.getFileCount()
- execContent.getDirectoryCount();
@@ -252,7 +259,7 @@ public int compare(FileStatus f1, FileStatus f2) {
if (executions.length > 0) {
Path previous = executions[executions.length - 1].getPath();
FileInputFormat.setInputPaths(job, previous);
- log.info("Previous execution: " + previous.toString());
+ System.out.println("Previous execution: " + previous.toString());
} else {
System.out
.println("No previous execution, all topics pulled from earliest available offset");
@@ -267,11 +274,11 @@ public int compare(FileStatus f1, FileStatus f2) {
Path newExecutionOutput = new Path(execBasePath,
new DateTime().toString(dateFmt));
FileOutputFormat.setOutputPath(job, newExecutionOutput);
- log.info("New execution temp location: "
+ System.out.println("New execution temp location: "
+ newExecutionOutput.toString());
EtlInputFormat.setLogger(log);
-
+
job.setInputFormatClass(EtlInputFormat.class);
job.setOutputFormatClass(EtlMultiOutputFormat.class);
job.setNumReduceTasks(0);
@@ -282,14 +289,37 @@ public int compare(FileStatus f1, FileStatus f2) {
// dump all counters
Counters counters = job.getCounters();
+ JSONArray jsonData = new JSONArray();
+
for (String groupName : counters.getGroupNames()) {
CounterGroup group = counters.getGroup(groupName);
- log.info("Group: " + group.getDisplayName());
+ String groupDisplayName = group.getDisplayName();
+ System.out.println("Group: " + groupDisplayName);
for (Counter counter : group) {
- log.info(counter.getDisplayName() + ":\t" + counter.getValue());
+ System.out.println(counter.getDisplayName() + ":\t"
+ + counter.getValue());
+ JSONObject oneJsonNode = new JSONObject();
+ oneJsonNode.put("group", groupDisplayName);
+ oneJsonNode.put("countername", counter.getDisplayName());
+ oneJsonNode.put("countervalue", counter.getValue());
+ jsonData.add(oneJsonNode);
}
}
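+        // If camus.counters.path is set, persist the counters to HDFS as JSON so external tooling can consume them.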
+ String countersPathString = props.getProperty(CAMUS_COUNTERS_PATH);
+ if (countersPathString != null) {
+ Path countersDir = new Path(countersPathString);
+ if (!fs.exists(countersDir))
+ fs.mkdirs(countersDir);
+ Path countersPath = new Path(countersPathString, "counters.json");
+ fs.delete(countersPath, true);
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ fs.create(countersPath)));
+ writer.write(jsonData.toJSONString());
+ writer.close();
+ }
+
stopTiming("hadoop");
startTiming("commit");
@@ -301,7 +331,7 @@ public int compare(FileStatus f1, FileStatus f2) {
fs.rename(newExecutionOutput, execHistory);
- log.info("Job finished");
+ System.out.println("Job finished");
stopTiming("commit");
stopTiming("total");
createReport(job, timingMap);
@@ -434,7 +464,7 @@ public void sendTrackingCounts(JobContext job, FileSystem fs,
/**
* Creates a diagnostic report mostly focused on timing breakdowns. Useful
* for determining where to optimize.
- *
+ *
* @param job
* @param timingMap
* @throws IOException
@@ -546,7 +576,7 @@ private void createReport(Job job, Map timingMap)
sb.append(String.format("\n%16s %s\n", "Total MB read:",
mb / 1024 / 1024));
- log.info(sb.toString());
+ System.out.println(sb.toString());
}
/**
@@ -628,7 +658,7 @@ public static String getKafkaBrokers(JobContext job) {
if (brokers == null) {
brokers = job.getConfiguration().get(KAFKA_HOST_URL);
if (brokers != null) {
- log.warn("The configuration properties " + KAFKA_HOST_URL + " and " +
+ log.warn("The configuration properties " + KAFKA_HOST_URL + " and " +
KAFKA_HOST_PORT + " are deprecated. Please switch to using " + KAFKA_BROKERS);
return brokers + ":" + job.getConfiguration().getInt(KAFKA_HOST_PORT, 10251);
}
@@ -662,4 +692,9 @@ public static int getKafkaBufferSize(JobContext job) {
public static boolean getLog4jConfigure(JobContext job) {
return job.getConfiguration().getBoolean(LOG4J_CONFIGURATION, false);
}
+
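+  // Path to the log4j XML configuration used when log4j.configuration is enabled; defaults to "log4j.xml".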
+ public static String getLog4jPath(JobContext job) {
+ return job.getConfiguration().get(LOG4J_PATH, "log4j.xml");
+ }
+
}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/SnapshotDailyMetadata.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/SnapshotDailyMetadata.java
new file mode 100644
index 000000000..f80e8ab08
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/SnapshotDailyMetadata.java
@@ -0,0 +1,584 @@
+package com.linkedin.camus.etl.kafka;
+
+import com.linkedin.camus.etl.kafka.common.LeaderInfo;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * A simple consumer to consume metadata from kafka and write out the difference
+ * in the offsets of the partitions from the previous run. This is used for
+ * accounting purposes to figure out how many events are expected to be read
+ * from kafka between successive runs of this consumer.
+ *
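+ * Typical invocation (a sketch; the exact jar name depends on how the assembly is built):
+ *   hadoop jar camus-etl-kafka-0.3.0-SNAPSHOT-jar-with-dependencies.jar \
+ *       com.linkedin.camus.etl.kafka.SnapshotDailyMetadata -p camus.properties
+ *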
+ * @author krishna_puttaswamy
+ */
+
+class OffsetInfo
+        implements WritableComparable<OffsetInfo>
+{
+ private String topic;
+ private int partition;
+ private long latestOffset;
+
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ public long getLatestOffset()
+ {
+ return latestOffset;
+ }
+
+    public int getPartition()
+    {
+ return partition;
+ }
+
+ public String toString()
+ {
+ return String.format("Topic: %s Partition: %d LatestOffset: %d", topic,
+ partition, latestOffset);
+ }
+
+ OffsetInfo()
+ {
+ this.set("dummy", 0, 0);
+ }
+
+ OffsetInfo(String topic, int partition, long latestoffset)
+ {
+ this.set(topic, partition, latestoffset);
+ }
+
+ void set(String t, int p, long l)
+ {
+ this.topic = t;
+ this.partition = p;
+ this.latestOffset = l;
+ }
+
+ @Override
+ public void write(DataOutput out)
+ throws IOException
+ {
+ out.writeUTF(topic);
+ out.writeInt(partition);
+ out.writeLong(latestOffset);
+ }
+
+ @Override
+ public void readFields(DataInput in)
+ throws IOException
+ {
+ this.topic = in.readUTF();
+ this.partition = in.readInt();
+ this.latestOffset = in.readLong();
+ }
+
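+    // Orders by partition first, then by latestOffset; the topic is not part of the comparison.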
+ @Override
+ public int compareTo(OffsetInfo o)
+ {
+ if (partition != o.partition) {
+ return partition - o.partition;
+ }
+ else {
+ if (latestOffset > o.latestOffset) {
+ return 1;
+ }
+            else if (latestOffset < o.latestOffset) {
+ return -1;
+ }
+ else {
+ return 0;
+ }
+ }
+ }
+}
+
+public class SnapshotDailyMetadata
+{
+ public static final String KAFKA_BLACKLIST_TOPIC = "kafka.blacklist.topics";
+ public static final String KAFKA_WHITELIST_TOPIC = "kafka.whitelist.topics";
+ public static final String KAFKA_METADATA_CONSUMER_PATH = "kafka.metadata.consumer.path";
+ static Properties props = new Properties();
+ final static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SnapshotDailyMetadata.class);
+
+ public static void main(String[] args)
+ throws URISyntaxException,
+ IOException, ParseException
+ {
+ Options options = new Options();
+
+ options.addOption("p", true, "properties file for Camus");
+ options.addOption("h", false, "help");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (!(cmd.hasOption('p'))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("SnapshotDailyMetadata.java", options);
+ System.exit(1);
+ }
+
+ File file = new File(cmd.getOptionValue('p'));
+ FileInputStream fStream;
+ fStream = new FileInputStream(file);
+ props.load(fStream);
+
+ // Get topic metadata for all topics; this gives back the various
+ // topics, and for each topic the partitions and the corresponding
+ // leaders
+        List<TopicMetadata> topicMetadataList = getTopicMetadataFromKafka();
+ log.info("Got " + topicMetadataList.size()
+ + " topic metadata list: "
+ + StringUtils.join(",", topicMetadataList));
+
+ // fetch latest offsets for each partition of the topic we care and
+ // write them to hdfs
+        List<OffsetInfo> newOffsets = fetchLatestOffset(topicMetadataList);
+ writeLatestOffsetInfo(newOffsets);
+
+ // read latest offsets from previous run
+        Map<String, Long> oldOffsets = readLatestOffsetInfo();
+
+ // compute the diff in the latest offsets
+ JSONArray jsonData = new JSONArray();
+ for (OffsetInfo key : newOffsets) {
+ String pk = key.getTopic() + "_" + key.getPartition();
+ if (oldOffsets.containsKey(pk)) {
+ long diff = key.getLatestOffset() - oldOffsets.get(pk);
+
+ JSONObject oneJsonNode = new JSONObject();
+ oneJsonNode.put("topic", key.getTopic());
+ oneJsonNode.put("partition", key.getPartition());
+ oneJsonNode.put("difference", diff);
+ jsonData.add(oneJsonNode);
+
+ log.info(String.format("Topic: %s OffsetDifference: %d", pk, diff));
+ }
+ }
+
+ // write out the diff as a json file
+ writeToJsonFile(jsonData);
+
+ // move current latest to old latest file
+ renameCurrentFolderToPrevious();
+ }
+
+ public static void writeToJsonFile(JSONArray data)
+ throws IOException
+ {
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+
+ if (!fs.exists(getCurrentOffsetsDir())) {
+ fs.mkdirs(getCurrentOffsetsDir());
+ }
+
+ Path countersPath = new Path(new Path(getMetadataDir()),
+ "diff/difference.json");
+ fs.delete(countersPath, true);
+
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ fs.create(countersPath)));
+ writer.write(data.toJSONString());
+ writer.close();
+ }
+
+ public static void renameCurrentFolderToPrevious()
+ {
+ try {
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+
+ if (!fs.exists(getOldOffsetsDir())) {
+ fs.mkdirs(getOldOffsetsDir());
+ }
+
+ if (fs.exists(getOldOffsetsPath())) {
+ fs.delete(getOldOffsetsPath(), true);
+ }
+
+ if (fs.exists(getCurrentOffsetsPath())) {
+ fs.rename(getCurrentOffsetsPath(), getOldOffsetsPath());
+ }
+
+ log.info("Successfully renamed "
+ + getCurrentOffsetsPath() + " to " + getOldOffsetsPath());
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
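+    // Expected layout under kafka.metadata.consumer.path:
+    //   current/offsets.seq    - latest offsets written by this run
+    //   previous/offsets.seq   - offsets from the previous run
+    //   diff/difference.json   - per-partition offset deltas between the two runs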
+ public static String getMetadataDir()
+ throws IOException
+ {
+ String metadataDir = props.getProperty(KAFKA_METADATA_CONSUMER_PATH);
+ if (metadataDir == null) {
+ throw new IOException(KAFKA_METADATA_CONSUMER_PATH
+ + " is not specified.");
+ }
+ return metadataDir;
+ }
+
+ public static Path getOffsetsDiffPath()
+ throws IOException
+ {
+ return new Path(getMetadataDir(), "current/difference.seq");
+ }
+
+ public static Path getCurrentOffsetsDir()
+ throws IOException
+ {
+ return new Path(getMetadataDir(), "current");
+ }
+
+ public static Path getCurrentOffsetsPath()
+ throws IOException
+ {
+ return new Path(getCurrentOffsetsDir(), "offsets.seq");
+ }
+
+ public static Path getOldOffsetsDir()
+ throws IOException
+ {
+ return new Path(getMetadataDir(), "previous");
+ }
+
+ public static Path getOldOffsetsPath()
+ throws IOException
+ {
+ return new Path(getOldOffsetsDir(), "offsets.seq");
+ }
+
+    public static Map<String, Long> readLatestOffsetInfo()
+    {
+        Map<String, Long> partitionToOffsetMap = new HashMap<String, Long>();
+
+ try {
+ Path oldOutputPath = getOldOffsetsPath();
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ oldOutputPath, conf);
+
+ OffsetInfo key = new OffsetInfo();
+
+ while (reader.next(key, NullWritable.get())) {
+ partitionToOffsetMap.put(
+ key.getTopic() + "_" + key.getPartition(),
+ key.getLatestOffset());
+ }
+
+ reader.close();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return partitionToOffsetMap;
+ }
+
+    public static void writeLatestOffsetInfo(List<OffsetInfo> latestList)
+ throws IOException
+ {
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+
+ String metadataDir = getMetadataDir();
+
+ if (!fs.exists(new Path(metadataDir))) {
+ log.info("creating directory " + metadataDir);
+ fs.mkdirs(new Path(metadataDir));
+ }
+
+ Path newOutputPath = getCurrentOffsetsPath();
+
+ log.info("creating file " + newOutputPath.toString());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ newOutputPath, OffsetInfo.class, NullWritable.class);
+
+ for (OffsetInfo key : latestList) {
+ writer.append(key, NullWritable.get());
+ }
+
+ writer.close();
+ }
+
+    public static String createTopicRegEx(Set<String> topicsSet)
+ {
+ if (topicsSet.isEmpty()) {
+ return "";
+ }
+
+ String regex = "";
+ StringBuilder stringbuilder = new StringBuilder();
+ for (String whiteList : topicsSet) {
+ stringbuilder.append(whiteList);
+ stringbuilder.append("|");
+ }
+ regex = "(" + stringbuilder.substring(0, stringbuilder.length() - 1) + ")";
+ return regex;
+ }
+
+ public static String[] getKafkaBlacklistTopic()
+ {
+ if (props.get(KAFKA_BLACKLIST_TOPIC) != null
+ && !props.getProperty(KAFKA_BLACKLIST_TOPIC).isEmpty()) {
+ return StringUtils.getStrings(props
+ .getProperty(KAFKA_BLACKLIST_TOPIC));
+ }
+ else {
+ return new String[] {};
+ }
+ }
+
+ public static String[] getKafkaWhitelistTopic()
+ {
+ if (props.get(KAFKA_WHITELIST_TOPIC) != null
+ && !props.getProperty(KAFKA_WHITELIST_TOPIC).isEmpty()) {
+ return StringUtils.getStrings(props
+ .getProperty(KAFKA_WHITELIST_TOPIC));
+ }
+ else {
+ return new String[] {};
+ }
+ }
+
+    public static List<TopicMetadata> filterWhitelistTopics(
+            List<TopicMetadata> topicMetadataList,
+            Set<String> whiteListTopics)
+    {
+        ArrayList<TopicMetadata> filteredTopics = new ArrayList<TopicMetadata>();
+ String regex = createTopicRegEx(whiteListTopics);
+ for (TopicMetadata topicMetadata : topicMetadataList) {
+ if (Pattern.matches(regex, topicMetadata.topic())) {
+ filteredTopics.add(topicMetadata);
+ }
+ }
+ return filteredTopics;
+ }
+
+    public static List<OffsetInfo> fetchLatestOffset(
+            List<TopicMetadata> topicMetadataList)
+            throws URISyntaxException
+    {
+        // Filter any white list topics
+        Set<String> whiteListTopics = new HashSet<String>(
+                Arrays.asList(getKafkaWhitelistTopic()));
+ if (!whiteListTopics.isEmpty()) {
+ topicMetadataList = filterWhitelistTopics(topicMetadataList, whiteListTopics);
+ }
+
+ // Filter all blacklist topics
+        Set<String> blackListTopics = new HashSet<String>(
+                Arrays.asList(getKafkaBlacklistTopic()));
+ String regex = "";
+ if (!blackListTopics.isEmpty()) {
+ regex = createTopicRegEx(blackListTopics);
+ }
+
+        Map<LeaderInfo, ArrayList<TopicAndPartition>> offsetRequestInfo = new HashMap<LeaderInfo, ArrayList<TopicAndPartition>>();
+
+ // create the list of partition offset requests to the leaders
+ for (TopicMetadata mdata : topicMetadataList) {
+ if (Pattern.matches(regex, mdata.topic())) {
+ log.info("Discarding topic (blacklisted): "
+ + mdata.topic());
+ }
+ else {
+ for (PartitionMetadata pdata : mdata.partitionsMetadata()) {
+ LeaderInfo leader = new LeaderInfo(new URI("tcp://"
+ + pdata.leader().getConnectionString()), pdata
+ .leader().id());
+
+ if (offsetRequestInfo.containsKey(leader)) {
+                    ArrayList<TopicAndPartition> topicAndPartitions = offsetRequestInfo
+                            .get(leader);
+ topicAndPartitions.add(new TopicAndPartition(mdata
+ .topic(), pdata.partitionId()));
+ offsetRequestInfo.put(leader, topicAndPartitions);
+ }
+ else {
+                    ArrayList<TopicAndPartition> topicAndPartitions = new ArrayList<TopicAndPartition>();
+ topicAndPartitions.add(new TopicAndPartition(mdata
+ .topic(), pdata.partitionId()));
+ offsetRequestInfo.put(leader, topicAndPartitions);
+ }
+ }
+ }
+ }
+
+        List<OffsetInfo> offsetList = new ArrayList<OffsetInfo>();
+
+ for (LeaderInfo leader : offsetRequestInfo.keySet()) {
+ SimpleConsumer consumer = new SimpleConsumer(leader.getUri()
+ .getHost(), leader.getUri().getPort(),
+ Integer.parseInt(props.getProperty(
+ CamusJob.KAFKA_TIMEOUT_VALUE, "30000")),
+ Integer.parseInt(props.getProperty(
+ CamusJob.KAFKA_FETCH_BUFFER_SIZE, "83886080")),
+ props.getProperty(CamusJob.KAFKA_CLIENT_NAME,
+ "KafkaMetadatFetcher"));
+
+ // Latest Offset
+ PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo = new PartitionOffsetRequestInfo(
+ kafka.api.OffsetRequest.LatestTime(), 1);
+
+            Map<TopicAndPartition, PartitionOffsetRequestInfo> latestOffsetInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+            ArrayList<TopicAndPartition> topicAndPartitions = offsetRequestInfo
+                    .get(leader);
+ for (TopicAndPartition topicAndPartition : topicAndPartitions) {
+ latestOffsetInfo.put(topicAndPartition,
+ partitionLatestOffsetRequestInfo);
+ }
+
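+            // Retry the offset request up to 3 times, pausing 300 ms between attempts.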
+ int tries = 0;
+ OffsetResponse latestOffsetResponse = null;
+ while (tries < 3) {
+ latestOffsetResponse = consumer
+ .getOffsetsBefore(new OffsetRequest(latestOffsetInfo,
+ kafka.api.OffsetRequest.CurrentVersion(),
+ "KafkaMetadatFetcher"));
+ if (!latestOffsetResponse.hasError()) {
+ break;
+ }
+ try {
+ Thread.sleep(300);
+ }
+ catch (java.lang.InterruptedException e) {
+ // ...
+ }
+ tries++;
+ }
+
+ consumer.close();
+
+ if (latestOffsetResponse != null) {
+ for (TopicAndPartition topicAndPartition : topicAndPartitions) {
+ long latestOffset = 0;
+ long offsets[] = latestOffsetResponse.offsets(
+ topicAndPartition.topic(),
+ topicAndPartition.partition());
+ if (offsets.length > 0) {
+ latestOffset = offsets[0];
+ }
+
+ offsetList.add(new OffsetInfo(topicAndPartition.topic().toString(),
+ topicAndPartition.partition(), latestOffset));
+ }
+ }
+ }
+
+ return offsetList;
+ }
+
+ private static SimpleConsumer createConsumer(String broker)
+ {
+ if (!broker.matches(".+:\\d+")) {
+ throw new InvalidParameterException("The kakfa broker " + broker
+ + " must follow address:port pattern");
+ }
+ String[] hostPort = broker.split(":");
+ SimpleConsumer consumer = new SimpleConsumer(hostPort[0],
+ Integer.valueOf(hostPort[1]), Integer.parseInt(props
+ .getProperty(CamusJob.KAFKA_TIMEOUT_VALUE, "30000")),
+ Integer.parseInt(props.getProperty(
+ CamusJob.KAFKA_FETCH_BUFFER_SIZE, "83886080")),
+ props.getProperty(CamusJob.KAFKA_CLIENT_NAME,
+ "KafkaMetadatFetcher"));
+ return consumer;
+ }
+
+    public static List<TopicMetadata> getTopicMetadataFromKafka()
+ {
+        ArrayList<String> metaRequestTopics = new ArrayList<String>();
+ String brokerString = props.getProperty(CamusJob.KAFKA_BROKERS);
+ if (brokerString.isEmpty()) {
+ throw new InvalidParameterException(
+ "kafka.brokers must contain at least one node");
+ }
+        List<String> brokers = Arrays.asList(brokerString.split("\\s*,\\s*"));
+ Collections.shuffle(brokers);
+ boolean fetchMetaDataSucceeded = false;
+ int i = 0;
+        List<TopicMetadata> topicMetadataList = null;
+ Exception savedException = null;
+ while (i < brokers.size() && !fetchMetaDataSucceeded) {
+ SimpleConsumer consumer = createConsumer(brokers.get(i));
+ log.info(String.format("Fetching metadata from broker %s with client id %s for %d topic(s) %s",
+ brokers.get(i), consumer.clientId(),
+ metaRequestTopics.size(), metaRequestTopics));
+ try {
+ topicMetadataList = consumer.send(
+ new TopicMetadataRequest(metaRequestTopics))
+ .topicsMetadata();
+ fetchMetaDataSucceeded = true;
+ }
+ catch (Exception e) {
+ savedException = e;
+ e.printStackTrace();
+ }
+ finally {
+ consumer.close();
+ i++;
+ }
+ }
+ if (!fetchMetaDataSucceeded) {
+ throw new RuntimeException("Failed to obtain metadata!",
+ savedException);
+ }
+ return topicMetadataList;
+ }
+}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/EtlKey.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/EtlKey.java
index 240f573b4..f8078f2cf 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/EtlKey.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/EtlKey.java
@@ -20,7 +20,7 @@ public class EtlKey implements WritableComparable, IEtlKey {
public static final Text SERVER = new Text("server");
public static final Text SERVICE = new Text("service");
public static EtlKey DUMMY_KEY = new EtlKey();
-
+
private String leaderId = "";
private int partition = 0;
private long beginOffset = 0;
@@ -41,7 +41,7 @@ public EtlKey() {
}
public EtlKey(EtlKey other) {
-
+
this.partition = other.partition;
this.beginOffset = other.beginOffset;
this.offset = other.offset;
@@ -76,7 +76,7 @@ public void set(String topic, String leaderId, int partition, long beginOffset,
// this time will be used for
// debugging.
}
-
+
public void clear() {
leaderId = "";
partition = 0;
@@ -133,7 +133,7 @@ public long getBeginOffset() {
public void setOffset(long offset) {
this.offset = offset;
}
-
+
public long getOffset() {
return this.offset;
}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java
index dc0d97ee8..cc8af4611 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java
@@ -4,6 +4,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Map;
import java.util.Iterator;
import kafka.api.PartitionFetchInfo;
@@ -12,12 +13,14 @@
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.javaapi.OffsetRequest;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
+import org.apache.log4j.Logger;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
import com.linkedin.camus.etl.kafka.CamusJob;
@@ -55,8 +58,8 @@ public KafkaReader(TaskAttemptContext context, EtlRequest request,
this.fetchBufferSize = fetchBufferSize;
this.context = context;
- log.info("bufferSize=" + fetchBufferSize);
- log.info("timeout=" + clientTimeout);
+ System.out.println("bufferSize=" + fetchBufferSize);
+ System.out.println("timeout=" + clientTimeout);
// Create the kafka request from the json
@@ -75,7 +78,7 @@ public KafkaReader(TaskAttemptContext context, EtlRequest request,
CamusJob.getKafkaTimeoutValue(context),
CamusJob.getKafkaBufferSize(context),
CamusJob.getKafkaClientName(context));
- log.info("Connected to leader " + uri
+ System.out.println("Connected to leader " + uri
+ " beginning reading at offset " + beginOffset
+ " latest offset=" + lastOffset);
fetch();
@@ -141,9 +144,6 @@ public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) th
*/
public boolean fetch() throws IOException {
- if (currentOffset >= lastOffset) {
- return false;
- }
long tempTime = System.currentTimeMillis();
TopicAndPartition topicAndPartition = new TopicAndPartition(
kafkaRequest.getTopic(), kafkaRequest.getPartition());
@@ -164,11 +164,30 @@ public boolean fetch() throws IOException {
try {
fetchResponse = simpleConsumer.fetch(fetchRequest);
if (fetchResponse.hasError()) {
- log.info("Error encountered during a fetch request from Kafka");
- log.info("Error Code generated : "
- + fetchResponse.errorCode(kafkaRequest.getTopic(),
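+                      // Error code 1 (OffsetOutOfRange): reset to the earliest available offset (-2 = EarliestTime) and retry the fetch.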
+ if (fetchResponse.errorCode(kafkaRequest.getTopic(), kafkaRequest.getPartition()) == 1) {
+ // Offset is out of range
+ log.warn("Offset was out of range! topic=" + kafkaRequest.getTopic() + " partition=" + kafkaRequest.getPartition() + " offset=" + currentOffset);
+                      Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-2, 1));
+ OffsetRequest request = new OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
+
+ long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(kafkaRequest.getTopic(), kafkaRequest.getPartition());
+ if (offsets.length > 0) {
+ currentOffset = offsets[0];
+ System.out.println("Reset current offset, currentOffset=" + currentOffset);
+ return fetch();
+ } else {
+ log.warn("Unable to reset current offset!");
+ return false;
+ }
+ } else {
+ System.out.println("Error encountered during a fetch request from Kafka");
+ System.out.println("Error Code generated : "
+ + fetchResponse.errorCode(kafkaRequest.getTopic(),
kafkaRequest.getPartition()));
- return false;
+ return false;
+ }
} else {
ByteBufferMessageSet messageBuffer = fetchResponse.messageSet(
kafkaRequest.getTopic(), kafkaRequest.getPartition());
@@ -195,7 +214,7 @@ public boolean fetch() throws IOException {
}
}
log.debug("Number of offsets to be skipped: " + skipped);
- while(skipped !=0 )
+ while (skipped !=0)
{
MessageAndOffset skippedMessage = messageIter.next();
log.debug("Skipping offset : " + skippedMessage.offset());
@@ -204,7 +223,7 @@ public boolean fetch() throws IOException {
if (!messageIter.hasNext()) {
System.out
- .println("No more data left to process. Returning false");
+ .println("No more data left to process for topic=" + kafkaRequest.getTopic() + " partition=" + kafkaRequest.getPartition() + ". Returning false");
messageIter = null;
return false;
}
@@ -212,7 +231,7 @@ public boolean fetch() throws IOException {
return true;
}
} catch (Exception e) {
- log.info("Exception generated during fetch");
+ System.out.println("Exception generated during fetch");
e.printStackTrace();
return false;
}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java~ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java~
deleted file mode 100644
index dc0d97ee8..000000000
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/KafkaReader.java~
+++ /dev/null
@@ -1,278 +0,0 @@
-package com.linkedin.camus.etl.kafka.common;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import kafka.api.PartitionFetchInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchRequest;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import com.linkedin.camus.etl.kafka.CamusJob;
-
-/**
- * Poorly named class that handles kafka pull events within each
- * KafkaRecordReader.
- *
- * @author Richard Park
- */
-public class KafkaReader {
- // index of context
- private static Logger log = Logger.getLogger(KafkaReader.class);
- private EtlRequest kafkaRequest = null;
- private SimpleConsumer simpleConsumer = null;
-
- private long beginOffset;
- private long currentOffset;
- private long lastOffset;
- private long currentCount;
-
- private TaskAttemptContext context;
-
- private Iterator messageIter = null;
-
- private long totalFetchTime = 0;
- private long lastFetchTime = 0;
-
- private int fetchBufferSize;
-
- /**
- * Construct using the json representation of the kafka request
- */
- public KafkaReader(TaskAttemptContext context, EtlRequest request,
- int clientTimeout, int fetchBufferSize) throws Exception {
- this.fetchBufferSize = fetchBufferSize;
- this.context = context;
-
- log.info("bufferSize=" + fetchBufferSize);
- log.info("timeout=" + clientTimeout);
-
- // Create the kafka request from the json
-
- kafkaRequest = request;
-
- beginOffset = request.getOffset();
- currentOffset = request.getOffset();
- lastOffset = request.getLastOffset();
- currentCount = 0;
- totalFetchTime = 0;
-
- // read data from queue
-
- URI uri = kafkaRequest.getURI();
- simpleConsumer = new SimpleConsumer(uri.getHost(), uri.getPort(),
- CamusJob.getKafkaTimeoutValue(context),
- CamusJob.getKafkaBufferSize(context),
- CamusJob.getKafkaClientName(context));
- log.info("Connected to leader " + uri
- + " beginning reading at offset " + beginOffset
- + " latest offset=" + lastOffset);
- fetch();
- }
-
- public boolean hasNext() throws IOException {
- if (messageIter != null && messageIter.hasNext())
- return true;
- else
- return fetch();
-
- }
-
- /**
- * Fetches the next Kafka message and stuffs the results into the key and
- * value
- *
- * @param key
- * @param payload
- * @param pKey
- * @return true if there exists more events
- * @throws IOException
- */
- public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) throws IOException {
- if (hasNext()) {
-
- MessageAndOffset msgAndOffset = messageIter.next();
- Message message = msgAndOffset.message();
-
- ByteBuffer buf = message.payload();
- int origSize = buf.remaining();
- byte[] bytes = new byte[origSize];
- buf.get(bytes, buf.position(), origSize);
- payload.set(bytes, 0, origSize);
-
- buf = message.key();
- if(buf != null){
- origSize = buf.remaining();
- bytes = new byte[origSize];
- buf.get(bytes, buf.position(), origSize);
- pKey.set(bytes, 0, origSize);
- }
-
- key.clear();
- key.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(),
- kafkaRequest.getPartition(), currentOffset,
- msgAndOffset.offset() + 1, message.checksum());
-
- currentOffset = msgAndOffset.offset() + 1; // increase offset
- currentCount++; // increase count
-
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Creates a fetch request.
- *
- * @return false if there's no more fetches
- * @throws IOException
- */
-
- public boolean fetch() throws IOException {
- if (currentOffset >= lastOffset) {
- return false;
- }
- long tempTime = System.currentTimeMillis();
- TopicAndPartition topicAndPartition = new TopicAndPartition(
- kafkaRequest.getTopic(), kafkaRequest.getPartition());
- log.debug("\nAsking for offset : " + (currentOffset));
- PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(
- currentOffset, fetchBufferSize);
-
- HashMap fetchInfo = new HashMap();
- fetchInfo.put(topicAndPartition, partitionFetchInfo);
-
- FetchRequest fetchRequest = new FetchRequest(
- CamusJob.getKafkaFetchRequestCorrelationId(context),
- CamusJob.getKafkaClientName(context),
- CamusJob.getKafkaFetchRequestMaxWait(context),
- CamusJob.getKafkaFetchRequestMinBytes(context), fetchInfo);
-
- FetchResponse fetchResponse = null;
- try {
- fetchResponse = simpleConsumer.fetch(fetchRequest);
- if (fetchResponse.hasError()) {
- log.info("Error encountered during a fetch request from Kafka");
- log.info("Error Code generated : "
- + fetchResponse.errorCode(kafkaRequest.getTopic(),
- kafkaRequest.getPartition()));
- return false;
- } else {
- ByteBufferMessageSet messageBuffer = fetchResponse.messageSet(
- kafkaRequest.getTopic(), kafkaRequest.getPartition());
- lastFetchTime = (System.currentTimeMillis() - tempTime);
- log.debug("Time taken to fetch : "
- + (lastFetchTime / 1000) + " seconds");
- log.debug("The size of the ByteBufferMessageSet returned is : " + messageBuffer.sizeInBytes());
- int skipped = 0;
- totalFetchTime += lastFetchTime;
- messageIter = messageBuffer.iterator();
- //boolean flag = false;
- Iterator messageIter2 = messageBuffer
- .iterator();
- MessageAndOffset message = null;
- while (messageIter2.hasNext()) {
- message = messageIter2.next();
- if (message.offset() < currentOffset) {
- //flag = true;
- skipped++;
- } else {
- log.debug("Skipped offsets till : "
- + message.offset());
- break;
- }
- }
- log.debug("Number of offsets to be skipped: " + skipped);
- while(skipped !=0 )
- {
- MessageAndOffset skippedMessage = messageIter.next();
- log.debug("Skipping offset : " + skippedMessage.offset());
- skipped --;
- }
-
- if (!messageIter.hasNext()) {
- System.out
- .println("No more data left to process. Returning false");
- messageIter = null;
- return false;
- }
-
- return true;
- }
- } catch (Exception e) {
- log.info("Exception generated during fetch");
- e.printStackTrace();
- return false;
- }
-
- }
-
- /**
- * Closes this context
- *
- * @throws IOException
- */
- public void close() throws IOException {
- if (simpleConsumer != null) {
- simpleConsumer.close();
- }
- }
-
- /**
- * Returns the total bytes that will be fetched. This is calculated by
- * taking the diffs of the offsets
- *
- * @return
- */
- public long getTotalBytes() {
- return (lastOffset > beginOffset) ? lastOffset - beginOffset : 0;
- }
-
- /**
- * Returns the total bytes that have been fetched so far
- *
- * @return
- */
- public long getReadBytes() {
- return currentOffset - beginOffset;
- }
-
- /**
- * Returns the number of events that have been read r
- *
- * @return
- */
- public long getCount() {
- return currentCount;
- }
-
- /**
- * Returns the fetch time of the last fetch in ms
- *
- * @return
- */
- public long getFetchTime() {
- return lastFetchTime;
- }
-
- /**
- * Returns the totalFetchTime in ms
- *
- * @return
- */
- public long getTotalFetchTime() {
- return totalFetchTime;
- }
-}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriter.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriter.java
new file mode 100644
index 000000000..a71460b7e
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriter.java
@@ -0,0 +1,37 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+
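+// Writes each record to the supplied output stream as UTF-8 bytes followed by the configured delimiter;
+// the stream may already be wrapped for compression (see StringRecordWriterProvider).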
+@SuppressWarnings("rawtypes")
+public class StringRecordWriter extends RecordWriter {
+ private final OutputStream output;
+ private final String recordDelimiter;
+
+ public StringRecordWriter(OutputStream compressedOutput,
+ String recordDelimiter) {
+ this.output = compressedOutput;
+ this.recordDelimiter = recordDelimiter;
+ }
+
+ @Override
+ public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
+ String record = (String) data.getRecord() + recordDelimiter;
+ // Need to specify Charset because the default might not be UTF-8.
+ // Bug fix for https://jira.airbnb.com:8443/browse/PRODUCT-5551.
+ output.write(record.getBytes(Charset.forName("UTF-8")));
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ output.close();
+ }
+}
\ No newline at end of file
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterProvider.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterProvider.java
index 9780e28fd..161e94553 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterProvider.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterProvider.java
@@ -1,17 +1,20 @@
package com.linkedin.camus.etl.kafka.common;
-import com.linkedin.camus.coders.CamusWrapper;
-import com.linkedin.camus.etl.IEtlKey;
-import com.linkedin.camus.etl.RecordWriterProvider;
-import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.log4j.Logger;
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+import com.linkedin.camus.etl.RecordWriterProvider;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
/**
@@ -19,6 +22,8 @@
* a String record as bytes to HDFS without any reformatting or compession.
*/
public class StringRecordWriterProvider implements RecordWriterProvider {
+
+
public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
public static final String DEFAULT_RECORD_DELIMITER = "";
@@ -29,9 +34,10 @@ public class StringRecordWriterProvider implements RecordWriterProvider {
// init(JobContext context) method signature that EtlMultiOutputFormat would always call.
@Override
public String getFilenameExtension() {
- return "";
+ return ".gz";
}
+ @SuppressWarnings("rawtypes")
@Override
public RecordWriter getDataRecordWriter(
TaskAttemptContext context,
@@ -56,21 +62,12 @@ context, fileName, getFilenameExtension()
);
// Create a FSDataOutputStream stream that will write to path.
- final FSDataOutputStream writer = path.getFileSystem(context.getConfiguration()).create(path);
+ FSDataOutputStream writer = path.getFileSystem(context.getConfiguration()).create(path);
- // Return a new anonymous RecordWriter that uses the
- // FSDataOutputStream writer to write bytes straight into path.
- return new RecordWriter() {
- @Override
- public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
- String record = (String)data.getRecord() + recordDelimiter;
- writer.write(record.getBytes());
- }
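+        // Pick the codec from the file extension (".gz" resolves to GzipCodec) and wrap the raw stream
+        // so records are compressed as they are written.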
+ CompressionCodecFactory codecFactory = new CompressionCodecFactory(context.getConfiguration());
+ CompressionCodec codec = codecFactory.getCodec(path);
+ final CompressionOutputStream compressedOutput = codec.createOutputStream(writer);
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- writer.close();
- }
- };
+ return new StringRecordWriter(compressedOutput, recordDelimiter);
}
}
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java
index afed9091b..d3e6a7e9c 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlInputFormat.java
@@ -68,13 +68,13 @@ public class EtlInputFormat extends InputFormat {
public static final String ETL_AUDIT_IGNORE_SERVICE_TOPIC_LIST = "etl.audit.ignore.service.topic.list";
private static Logger log = null;
-
+
public EtlInputFormat()
{
if (log == null)
log = Logger.getLogger(getClass());
}
-
+
public static void setLogger(Logger log){
EtlInputFormat.log = log;
}
@@ -88,7 +88,7 @@ public RecordReader createRecordReader(
/**
* Gets the metadata from Kafka
- *
+ *
* @param context
* @return
*/
@@ -106,7 +106,7 @@ public List getKafkaMetadata(JobContext context) {
Exception savedException = null;
while (i < brokers.size() && !fetchMetaDataSucceeded) {
SimpleConsumer consumer = createConsumer(context, brokers.get(i));
- log.info(String.format("Fetching metadata from broker %s with client id %s for %d topic(s) %s",
+ System.out.println(String.format("Fetching metadata from broker %s with client id %s for %d topic(s) %s",
brokers.get(i), consumer.clientId(), metaRequestTopics.size(), metaRequestTopics));
try {
topicMetadataList = consumer.send(new TopicMetadataRequest(metaRequestTopics)).topicsMetadata();
@@ -126,7 +126,7 @@ public List getKafkaMetadata(JobContext context) {
CamusJob.stopTiming("kafkaSetupTime");
return topicMetadataList;
}
-
+
private SimpleConsumer createConsumer(JobContext context, String broker) {
if (!broker.matches(".+:\\d+"))
throw new InvalidParameterException("The kakfa broker " + broker + " must follow address:port pattern");
@@ -142,7 +142,7 @@ private SimpleConsumer createConsumer(JobContext context, String broker) {
/**
* Gets the latest offsets and create the requests as needed
- *
+ *
* @param context
* @param offsetRequestInfo
* @return
@@ -174,25 +174,47 @@ public ArrayList fetchLatestOffsetAndCreateEtlRequests(
partitionEarliestOffsetRequestInfo);
}
- OffsetResponse latestOffsetResponse = consumer
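+                      // Retry both offset lookups up to 3 times, sleeping 300 ms between attempts.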
+ int tries = 0;
+ OffsetResponse latestOffsetResponse = null;
+ OffsetResponse earliestOffsetResponse = null;
+ while (tries < 3) {
+ latestOffsetResponse = consumer
.getOffsetsBefore(new OffsetRequest(latestOffsetInfo,
- kafka.api.OffsetRequest.CurrentVersion(), CamusJob
- .getKafkaClientName(context)));
- OffsetResponse earliestOffsetResponse = consumer
+ kafka.api.OffsetRequest.CurrentVersion(), CamusJob
+ .getKafkaClientName(context)));
+ earliestOffsetResponse = consumer
.getOffsetsBefore(new OffsetRequest(earliestOffsetInfo,
- kafka.api.OffsetRequest.CurrentVersion(), CamusJob
- .getKafkaClientName(context)));
+ kafka.api.OffsetRequest.CurrentVersion(), CamusJob
+ .getKafkaClientName(context)));
+ if (!latestOffsetResponse.hasError() && !earliestOffsetResponse.hasError()) {
+ break;
+ }
+ try {
+ Thread.sleep(300);
+ } catch (java.lang.InterruptedException e) {
+ // ...
+ }
+ ++tries;
+ }
consumer.close();
for (TopicAndPartition topicAndPartition : topicAndPartitions) {
- long latestOffset = latestOffsetResponse.offsets(
+ long latestOffset = 0;
+ long offsets[] = latestOffsetResponse.offsets(
topicAndPartition.topic(),
- topicAndPartition.partition())[0];
- long earliestOffset = earliestOffsetResponse.offsets(
+ topicAndPartition.partition());
+ if (offsets.length > 0) {
+ latestOffset = offsets[0];
+ }
+ long earliestOffset = 0;
+ offsets = earliestOffsetResponse.offsets(
topicAndPartition.topic(),
- topicAndPartition.partition())[0];
+ topicAndPartition.partition());
+ if (offsets.length > 0) {
+ earliestOffset = offsets[0];
+ }
EtlRequest etlRequest = new EtlRequest(context,
topicAndPartition.topic(), Integer.toString(leader
- .getLeaderId()), topicAndPartition.partition(),
+ .getLeaderId()), topicAndPartition.partition(),
leader.getUri());
etlRequest.setLatestOffset(latestOffset);
etlRequest.setEarliestOffset(earliestOffset);
@@ -224,7 +246,7 @@ public List filterWhitelistTopics(
if (Pattern.matches(regex, topicMetadata.topic())) {
filteredTopics.add(topicMetadata);
} else {
- log.info("Discarding topic : " + topicMetadata.topic());
+ System.out.println("Discarding topic : " + topicMetadata.topic());
}
}
return filteredTopics;
@@ -259,17 +281,17 @@ public List getSplits(JobContext context) throws IOException,
for (TopicMetadata topicMetadata : topicMetadataList) {
if (Pattern.matches(regex, topicMetadata.topic())) {
- log.info("Discarding topic (blacklisted): "
+ System.out.println("Discarding topic (blacklisted): "
+ topicMetadata.topic());
} else if (!createMessageDecoder(context, topicMetadata.topic())) {
- log.info("Discarding topic (Decoder generation failed) : "
+ System.out.println("Discarding topic (Decoder generation failed) : "
+ topicMetadata.topic());
} else {
for (PartitionMetadata partitionMetadata : topicMetadata
.partitionsMetadata()) {
if (partitionMetadata.errorCode() != ErrorMapping
.NoError()) {
- log.info("Skipping the creation of ETL request for Topic : "
+ System.out.println("Skipping the creation of ETL request for Topic : "
+ topicMetadata.topic()
+ " and Partition : "
+ partitionMetadata.partitionId()
@@ -360,7 +382,7 @@ public int compare(EtlRequest r1, EtlRequest r2) {
request.getPartition(), 0, request
.getLastOffset()));
}
- log.info(request);
+ System.out.println(request);
}
writePrevious(offsetKeys.values(), context);
@@ -497,7 +519,7 @@ private Map getPreviousOffsets(Path[] inputs,
for (Path input : inputs) {
FileSystem fs = input.getFileSystem(context.getConfiguration());
for (FileStatus f : fs.listStatus(input, new OffsetFileFilter())) {
- log.info("previous offset file:" + f.getPath().toString());
+ System.out.println("previous offset file:" + f.getPath().toString());
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
f.getPath(), context.getConfiguration());
EtlKey key = new EtlKey();
diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java
index ed8e36fde..d3be650b1 100644
--- a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlRecordReader.java
@@ -4,6 +4,7 @@
import com.linkedin.camus.coders.MessageDecoder;
import com.linkedin.camus.etl.kafka.CamusJob;
import com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory;
+import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.EtlRequest;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
@@ -25,6 +26,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
public class EtlRecordReader extends RecordReader {
private static final String PRINT_MAX_DECODER_EXCEPTIONS = "max.decoder.exceptions.to.print";
@@ -102,7 +104,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
} else {
beginTimeStamp = 0;
}
-
+
ignoreServerServiceList = new HashSet();
for(String ignoreServerServiceTopic : EtlInputFormat.getEtlAuditIgnoreServiceTopicList(context))
{
@@ -187,7 +189,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
// rescheduled in the next
// run.
if (System.currentTimeMillis() > maxPullTime) {
- log.info("Max pull time reached");
+ System.out.println("Max pull time reached");
if (reader != null) {
closeReader();
}
@@ -209,7 +211,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
key.set(request.getTopic(), request.getLeaderId(), request.getPartition(),
request.getOffset(), request.getOffset(), 0);
value = null;
- log.info("\n\ntopic:" + request.getTopic() + " partition:"
+ System.out.println("\n\ntopic:" + request.getTopic() + " partition:"
+ request.getPartition() + " beginOffset:" + request.getOffset()
+ " estimatedLastOffset:" + request.getLastOffset());
@@ -234,7 +236,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
mapperContext.getCounter("total", "data-read").increment(msgValue.getLength());
mapperContext.getCounter("total", "event-count").increment(1);
byte[] bytes = getBytes(msgValue);
- byte[] keyBytes = getBytes(msgKey);
+ /*byte[] keyBytes = getBytes(msgKey);
// check the checksum of message.
// If message has partition key, need to construct it with Key for checkSum to match
if (keyBytes.length == 0) {
@@ -247,7 +249,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
throw new ChecksumException("Invalid message checksum "
+ message.checksum() + ". Expected " + key.getChecksum(),
key.getOffset());
- }
+ }*/
long tempTime = System.currentTimeMillis();
CamusWrapper wrapper;
@@ -259,7 +261,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
exceptionCount++;
} else if (exceptionCount == getMaximumDecoderExceptionsToPrint(context)) {
exceptionCount = Integer.MAX_VALUE; //Any random value
- log.info("The same exception has occured for more than " + getMaximumDecoderExceptionsToPrint(context) + " records. All further exceptions will not be printed");
+ System.out.println("The same exception has occured for more than " + getMaximumDecoderExceptionsToPrint(context) + " records. All further exceptions will not be printed");
}
continue;
}
@@ -286,16 +288,16 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
DateTime time = new DateTime(timeStamp);
statusMsg += " begin read at " + time.toString();
context.setStatus(statusMsg);
- log.info(key.getTopic() + " begin read at " + time.toString());
+ System.out.println(key.getTopic() + " begin read at " + time.toString());
endTimeStamp = (time.plusHours(this.maxPullHours)).getMillis();
} else if (timeStamp > endTimeStamp || System.currentTimeMillis() > maxPullTime) {
if (timeStamp > endTimeStamp)
- log.info("Kafka Max history hours reached");
+ System.out.println("Kafka Max history hours reached");
if (System.currentTimeMillis() > maxPullTime)
- log.info("Kafka pull time limit reached");
+ System.out.println("Kafka pull time limit reached");
statusMsg += " max read at " + new DateTime(timeStamp).toString();
context.setStatus(statusMsg);
- log.info(key.getTopic() + " max read at "
+ System.out.println(key.getTopic() + " max read at "
+ new DateTime(timeStamp).toString());
mapperContext.getCounter("total", "request-time(ms)").increment(
reader.getFetchTime());
@@ -308,13 +310,20 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
mapperContext.getCounter("total", "decode-time(ms)").increment(decodeTime);
+ // timestamp of the nearest hour in our configuration
+ long datePartition = DateUtils.getPartition(EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(context) * 60000L, timeStamp);
+ // more readable form
+ String datePartitionString = new DateTime(datePartition, DateTimeZone.UTC).toString("YYYY/MM/dd/HH");
+
+ mapperContext.getCounter("total", datePartition + "_" + datePartitionString).increment(1);
+
if (reader != null) {
mapperContext.getCounter("total", "request-time(ms)").increment(
reader.getFetchTime());
}
return true;
}
- log.info("Records read : " + count);
+ System.out.println("Records read : " + count);
count = 0;
reader = null;
} catch (Throwable t) {
@@ -339,7 +348,7 @@ private void closeReader() throws IOException {
}
}
}
-
+
public void setServerService()
{
if(ignoreServerServiceList.contains(key.getTopic()) || ignoreServerServiceList.contains("all"))
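For reference, a minimal standalone sketch (not part of the patch) of the arithmetic behind the new per-partition counter added in this hunk. It assumes DateUtils.getPartition(granularityMs, ts) simply floors the timestamp to the configured granularity; the class name and sample values below are illustrative only.

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class DatePartitionSketch {
        public static void main(String[] args) {
            // Assumption: DateUtils.getPartition(granularityMs, ts) == ts - (ts % granularityMs).
            long granularityMs = 60 * 60000L;        // 60-minute output partitions
            long timeStamp = 1428000472668L;         // sample event time (ms since epoch)
            long datePartition = timeStamp - (timeStamp % granularityMs);
            String readable = new DateTime(datePartition, DateTimeZone.UTC).toString("YYYY/MM/dd/HH");
            // Counter name as built in the hunk above, e.g. "1427997600000_2015/04/02/18"
            System.out.println(datePartition + "_" + readable);
        }
    }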
diff --git a/camus-etl-kafka/src/test/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterTest.java b/camus-etl-kafka/src/test/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterTest.java
new file mode 100644
index 000000000..4f6269ade
--- /dev/null
+++ b/camus-etl-kafka/src/test/java/com/linkedin/camus/etl/kafka/common/StringRecordWriterTest.java
@@ -0,0 +1,59 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+
+public class StringRecordWriterTest {
+
+ /**
+ * Test for Bug https://jira.airbnb.com:8443/browse/PRODUCT-5551
+ * StringRecordWriter needs to handle Unicode correctly when the default
+ * Charset is not UTF-8. This test should fail if -Dfile.encoding=US-ASCII
+ * is set for the JVM.
+ */
+ @Test
+ public void testUnicode() throws IOException, InterruptedException {
+ String recordDelimiter = "";
+ OutputStream outputStream = mock(OutputStream.class);
+ ArgumentCaptor argument = ArgumentCaptor.forClass(byte[].class);
+ @SuppressWarnings("rawtypes")
+ RecordWriter writer = new StringRecordWriter(
+ outputStream, recordDelimiter);
+ String record = "中"; // a unicode character
+ CamusWrapper value = new CamusWrapper(record);
+ writer.write(null, value);
+ verify(outputStream).write(argument.capture());
+ byte[] outputBytes = argument.getAllValues().get(0);
+ assertArrayEquals(record.getBytes(Charset.forName("UTF-8")), outputBytes);
+ }
+
+ @Test
+ public void testDelimiter() throws IOException, InterruptedException {
+ String recordDelimiter = ";";
+ OutputStream outputStream = mock(OutputStream.class);
+ ArgumentCaptor argument = ArgumentCaptor.forClass(byte[].class);
+ @SuppressWarnings("rawtypes")
+ RecordWriter writer = new StringRecordWriter(
+ outputStream, recordDelimiter);
+ String record = "abc";
+ CamusWrapper value = new CamusWrapper(record);
+ writer.write(null, value);
+ verify(outputStream).write(argument.capture());
+ byte[] outputBytes = argument.getAllValues().get(0);
+ byte[] inputBytes = (record + recordDelimiter).getBytes(Charset
+ .forName("UTF-8"));
+ assertArrayEquals(inputBytes, outputBytes);
+ }
+}
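The two tests above pin down the writer's encoding contract: the record plus the configured delimiter is written as UTF-8 bytes, regardless of the JVM's default charset. A minimal sketch of a writer satisfying that contract; the real StringRecordWriter is not shown in this diff, so the class below is an illustrative stand-in rather than the actual implementation.

    package com.linkedin.camus.etl.kafka.common;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.Charset;

    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    import com.linkedin.camus.coders.CamusWrapper;
    import com.linkedin.camus.etl.IEtlKey;

    // Illustrative stand-in for StringRecordWriter; class name is hypothetical.
    public class Utf8StringRecordWriterSketch extends RecordWriter<IEtlKey, CamusWrapper> {
        private final OutputStream out;
        private final String recordDelimiter;

        public Utf8StringRecordWriterSketch(OutputStream out, String recordDelimiter) {
            this.out = out;
            this.recordDelimiter = recordDelimiter;
        }

        @Override
        public void write(IEtlKey key, CamusWrapper value) throws IOException {
            // Always encode as UTF-8 so the output does not depend on -Dfile.encoding.
            String record = (String) value.getRecord() + recordDelimiter;
            out.write(record.getBytes(Charset.forName("UTF-8")));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }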
diff --git a/camus-example/pom.xml b/camus-example/pom.xml
index bee4e695f..48efe83d3 100644
--- a/camus-example/pom.xml
+++ b/camus-example/pom.xml
@@ -5,7 +5,7 @@
com.linkedin.camus
camus-parent
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
camus-example
@@ -32,6 +32,12 @@
org.apache.hadoop
hadoop-client
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ provided
diff --git a/camus-example/src/main/resources/org-xerial-snappy.properties b/camus-example/src/main/resources/org-xerial-snappy.properties
new file mode 100644
index 000000000..111c6c046
--- /dev/null
+++ b/camus-example/src/main/resources/org-xerial-snappy.properties
@@ -0,0 +1,2 @@
+org.xerial.snappy.use.systemlib=false
+org.xerial.snappy.disable.bundled.libs=false
diff --git a/camus-kafka-coders/pom.xml b/camus-kafka-coders/pom.xml
index 7c97e916b..3a5d9fddf 100644
--- a/camus-kafka-coders/pom.xml
+++ b/camus-kafka-coders/pom.xml
@@ -1,32 +1,39 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
-
- com.linkedin.camus
- camus-parent
- 0.1.0-SNAPSHOT
-
+
+ com.linkedin.camus
+ camus-parent
+ 0.3.0-SNAPSHOT
+
- camus-kafka-coders
- Camus Kafka producer
- jar
-
-
- com.linkedin.camus
- camus-schema-registry
-
-
- kafka
- kafka
-
-
-
- com.google.code.gson
- gson
- 2.2.4
- compile
-
-
+
+ 2.0-RC3
+
+ camus-kafka-coders
+ Camus Kafka producer
+ jar
+
+
+ com.linkedin.camus
+ camus-schema-registry
+
+
+ org.apache.kafka
+ kafka_2.10
+
+
+ net.minidev
+ json-smart
+ ${json-smart.version}
+
+
+ junit
+ junit
+ 4.8.1
+ test
+
+
diff --git a/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JitneyTimestampDecoder.java b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JitneyTimestampDecoder.java
new file mode 100644
index 000000000..0c2b2dffb
--- /dev/null
+++ b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JitneyTimestampDecoder.java
@@ -0,0 +1,119 @@
+package com.linkedin.camus.etl.kafka.coders;
+
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Properties;
+
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import net.minidev.json.JSONValue;
+
+import org.apache.log4j.Logger;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.coders.MessageDecoder;
+
+
+/**
+ * MessageDecoder class that works with the Airbnb Jitney message format, which is a JSON array
+ * with two elements: the first element is the Jitney message metadata and the second element is
+ * the payload, as shown below:
+ * [{"eventType":"example_event_type",
+ * "id":"example_id",
+ * "createdAt":1428000472668,
+ * "sequenceOrigin":1428000397254,
+ * "sequenceNumber":3,
+ * "sourceName":"example_source",
+ * "sourceIpAddress":"127.0.0.1",
+ * "sourceHostname":"localhost",
+ * "version":1},
+ * {"example_payload":
+ * {"userId":1,
+ * "activityType":"test_activity",
+ * ...
+ * },
+ * "eventType":"example_event_type"
+ * }]
+ *
+ * This decoder converts the payload into a JSON array, looks for a field named 'createdAt' in
+ * each of the JSON objects, and sets the CamusWrapper's timestamp property to the record's
+ * timestamp. If the JSON does not have a timestamp, System.currentTimeMillis() will be used.
+ * The timestamp is used to place the message into the correct time-partitioned output.
+ * This MessageDecoder returns a CamusWrapper that works with String payloads.
+ */
+public class JitneyTimestampDecoder extends MessageDecoder {
+ private static org.apache.log4j.Logger log = Logger.getLogger(JitneyTimestampDecoder.class);
+
+ // Property for format of timestamp in JSON timestamp field.
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FORMAT = "camus.message.timestamp.format";
+ public static final String DEFAULT_TIMESTAMP_FORMAT = "[dd/MMM/yyyy:HH:mm:ss Z]";
+
+ // Property for the JSON field name of the timestamp.
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FIELD = "camus.message.timestamp.field";
+ public static final String DEFAULT_TIMESTAMP_FIELD = "createdAt";
+
+ private String timestampFormat;
+ private String timestampField;
+
+ @Override
+ public void init(Properties props, String topicName) {
+ this.props = props;
+ this.topicName = topicName;
+
+ timestampFormat = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
+ timestampField = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FIELD, DEFAULT_TIMESTAMP_FIELD);
+ }
+
+ @Override
+ public CamusWrapper decode(byte[] payload) {
+ long timestamp = 0;
+ String payloadString;
+ // Need to specify Charset because the default might not be UTF-8.
+ // Bug fix for https://jira.airbnb.com:8443/browse/PRODUCT-5551.
+ payloadString = new String(payload, Charset.forName("UTF-8"));
+
+ // Parse the payload into a JsonObject.
+ try {
+ Object obj = JSONValue.parse(payloadString);
+ if (obj instanceof JSONObject) {
+ timestamp = getTimestamp((JSONObject) obj);
+ } else if (obj instanceof JSONArray) {
+ for (Object elem : (JSONArray) obj) {
+ timestamp = getTimestamp((JSONObject) elem);
+ if (timestamp != 0L) {
+ break;
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ log.error("Caught exception while parsing JSON string '" + payloadString + "'.");
+ throw new RuntimeException(e);
+ }
+
+ // If timestamp wasn't set in the above block,
+ // then set it to current time.
+ if (timestamp == 0) {
+ log.warn("Couldn't find or parse timestamp field '" + timestampField + "' in JSON message, defaulting to current time.");
+ timestamp = System.currentTimeMillis();
+ }
+
+ return new CamusWrapper(payloadString, timestamp);
+ }
+
+ private long getTimestamp(JSONObject jsonObject) {
+ // Attempt to read and parse the timestamp element into a long.
+ if (jsonObject.get(timestampField) != null) {
+ Object ts = jsonObject.get(timestampField);
+ if (ts instanceof String) {
+ try {
+ return new SimpleDateFormat(timestampFormat).parse((String)ts).getTime();
+ } catch (Exception e) {
+ log.error("Could not parse timestamp '" + ts + "' while decoding JSON message.");
+ }
+ } else if (ts instanceof Long) {
+ return (Long)ts;
+ }
+ }
+ return 0L;
+ }
+}
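A rough usage sketch for the new decoder (illustrative only, not part of the patch; it assumes the decoder's package and that CamusWrapper exposes the timestamp via getTimestamp()):

    package com.linkedin.camus.etl.kafka.coders;

    import java.nio.charset.Charset;
    import java.util.Properties;

    import com.linkedin.camus.coders.CamusWrapper;

    public class JitneyTimestampDecoderExample {
        public static void main(String[] args) {
            JitneyTimestampDecoder decoder = new JitneyTimestampDecoder();
            Properties props = new Properties();
            // The default already targets "createdAt"; set explicitly here only for illustration.
            props.setProperty(JitneyTimestampDecoder.CAMUS_MESSAGE_TIMESTAMP_FIELD, "createdAt");
            decoder.init(props, "example_topic");

            // Two-element Jitney-style array: metadata first, payload second.
            String payload = "[{\"eventType\":\"example_event_type\",\"createdAt\":1428000472668},"
                    + "{\"example_payload\":{\"userId\":1},\"eventType\":\"example_event_type\"}]";
            CamusWrapper wrapper = decoder.decode(payload.getBytes(Charset.forName("UTF-8")));
            System.out.println(wrapper.getTimestamp());  // 1428000472668, taken from createdAt
        }
    }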
diff --git a/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java
index ba72f9cd1..af1caa115 100644
--- a/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java
+++ b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java
@@ -1,18 +1,16 @@
package com.linkedin.camus.etl.kafka.coders;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Properties;
+import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
+import java.util.Properties;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonObject;
+import net.minidev.json.JSONObject;
+import net.minidev.json.JSONValue;
+
+import org.apache.log4j.Logger;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
-import com.linkedin.camus.coders.MessageDecoderException;
-
-import org.apache.log4j.Logger;
/**
@@ -50,33 +48,39 @@ public void init(Properties props, String topicName) {
public CamusWrapper decode(byte[] payload) {
long timestamp = 0;
String payloadString;
- JsonObject jsonObject;
-
- payloadString = new String(payload);
+ JSONObject jobj;
+ // Need to specify Charset because the default might not be UTF-8.
+ // Bug fix for https://jira.airbnb.com:8443/browse/PRODUCT-5551.
+ payloadString = new String(payload, Charset.forName("UTF-8"));
// Parse the payload into a JsonObject.
try {
- jsonObject = new JsonParser().parse(payloadString).getAsJsonObject();
+ jobj = (JSONObject) JSONValue.parse(payloadString);
} catch (RuntimeException e) {
log.error("Caught exception while parsing JSON string '" + payloadString + "'.");
throw new RuntimeException(e);
}
// Attempt to read and parse the timestamp element into a long.
- if (jsonObject.has(timestampField)) {
- String timestampString = jsonObject.get(timestampField).getAsString();
- try {
- timestamp = new SimpleDateFormat(timestampFormat).parse(timestampString).getTime();
- } catch (Exception e) {
- log.error("Could not parse timestamp '" + timestampString + "' while decoding JSON message.");
+ if (jobj.get(timestampField) != null) {
+ Object ts = jobj.get(timestampField);
+ if (ts instanceof String) {
+ try {
+ timestamp = new SimpleDateFormat(timestampFormat).parse((String)ts).getTime();
+ } catch (Exception e) {
+ log.error("Could not parse timestamp '" + ts + "' while decoding JSON message.");
+ }
+ } else if (ts instanceof Long) {
+ timestamp = (Long)ts;
}
}
// If timestamp wasn't set in the above block,
// then set it to current time.
+ final long now = System.currentTimeMillis();
if (timestamp == 0) {
log.warn("Couldn't find or parse timestamp field '" + timestampField + "' in JSON message, defaulting to current time.");
- timestamp = System.currentTimeMillis();
+ timestamp = now;
}
return new CamusWrapper(payloadString, timestamp);
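The rewritten timestamp handling above accepts either a numeric epoch value or a string parsed with camus.message.timestamp.format. A small standalone illustration of both paths, using the default pattern "[dd/MMM/yyyy:HH:mm:ss Z]"; Locale.US is an assumption added here so "Apr" parses predictably.

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Locale;

    public class TimestampFieldSketch {
        public static void main(String[] args) throws ParseException {
            // String values go through SimpleDateFormat with the configured pattern.
            String formatted = "[02/Apr/2015:18:47:52 +0000]";
            long fromString = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)
                    .parse(formatted).getTime();

            // Numeric values are taken directly as epoch milliseconds.
            Object tsField = 1428000472668L;
            long fromLong = (tsField instanceof Long) ? (Long) tsField : 0L;

            System.out.println(fromString);  // 1428000472000
            System.out.println(fromLong);    // 1428000472668
        }
    }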
diff --git a/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/StringMessageDecoder.java b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/StringMessageDecoder.java
new file mode 100644
index 000000000..f06789548
--- /dev/null
+++ b/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/StringMessageDecoder.java
@@ -0,0 +1,55 @@
+package com.linkedin.camus.etl.kafka.coders;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.text.SimpleDateFormat;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.coders.MessageDecoder;
+import com.linkedin.camus.coders.MessageDecoderException;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * MessageDecoder class that treats the payload as a plain String and sets the
+ * CamusWrapper's timestamp property to System.currentTimeMillis() at decode time.
+ * The timestamp format and field properties are accepted for configuration
+ * compatibility but are not used when decoding.
+ * This MessageDecoder returns a CamusWrapper that works with String payloads.
+ */
+public class StringMessageDecoder extends MessageDecoder {
+ private static org.apache.log4j.Logger log = Logger.getLogger(StringMessageDecoder.class);
+
+ // Property for format of timestamp in JSON timestamp field.
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FORMAT = "camus.message.timestamp.format";
+ public static final String DEFAULT_TIMESTAMP_FORMAT = "[dd/MMM/yyyy:HH:mm:ss Z]";
+
+ // Property for the JSON field name of the timestamp.
+ public static final String CAMUS_MESSAGE_TIMESTAMP_FIELD = "camus.message.timestamp.field";
+ public static final String DEFAULT_TIMESTAMP_FIELD = "timestamp";
+
+ private String timestampFormat;
+ private String timestampField;
+
+ @Override
+ public void init(Properties props, String topicName) {
+ this.props = props;
+ this.topicName = topicName;
+
+ timestampFormat = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
+ timestampField = props.getProperty(CAMUS_MESSAGE_TIMESTAMP_FIELD, DEFAULT_TIMESTAMP_FIELD);
+ }
+
+ @Override
+ public CamusWrapper decode(byte[] payload) {
+ long timestamp = System.currentTimeMillis();
+ String payloadString;
+
+ // Need to specify Charset because the default might not be UTF-8.
+ payloadString = new String(payload, Charset.forName("UTF-8"));
+
+ return new CamusWrapper(payloadString, timestamp);
+ }
+}
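A tiny usage sketch for the pass-through decoder (illustrative only; assumes the decoder's package and that CamusWrapper exposes getTimestamp()):

    package com.linkedin.camus.etl.kafka.coders;

    import java.nio.charset.Charset;
    import java.util.Properties;

    import com.linkedin.camus.coders.CamusWrapper;

    public class StringMessageDecoderExample {
        public static void main(String[] args) {
            StringMessageDecoder decoder = new StringMessageDecoder();
            decoder.init(new Properties(), "example_topic");

            CamusWrapper wrapper = decoder.decode("raw line from kafka".getBytes(Charset.forName("UTF-8")));
            System.out.println(wrapper.getRecord());     // "raw line from kafka"
            System.out.println(wrapper.getTimestamp());  // wall-clock time at decode()
        }
    }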
diff --git a/camus-kafka-coders/src/test/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoderTest.java b/camus-kafka-coders/src/test/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoderTest.java
new file mode 100644
index 000000000..a9c9bdfd5
--- /dev/null
+++ b/camus-kafka-coders/src/test/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoderTest.java
@@ -0,0 +1,26 @@
+package com.linkedin.camus.etl.kafka.coders;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.nio.charset.Charset;
+
+import org.junit.Test;
+
+import com.linkedin.camus.coders.CamusWrapper;
+
+public class JsonStringMessageDecoderTest {
+
+ /**
+ * Test for Bug https://jira.airbnb.com:8443/browse/PRODUCT-5551
+ * JsonStringMessageDecoder needs to handle Unicode correctly when the default Charset is not UTF-8.
+ * This test should fail if -Dfile.encoding=US-ASCII is set for the JVM.
+ */
+ @Test
+ public void testUnicodeDecoding() {
+ JsonStringMessageDecoder decoder = new JsonStringMessageDecoder();
+ byte[] payload = "{data:中}".getBytes(Charset.forName("UTF-8"));
+ CamusWrapper camusWrapper = decoder.decode(payload);
+ byte[] decoded = camusWrapper.getRecord().getBytes(Charset.forName("UTF-8"));
+ assertArrayEquals(payload, decoded);
+ }
+}
diff --git a/camus-schema-registry-avro/pom.xml b/camus-schema-registry-avro/pom.xml
index 111c135d4..83d4c2f5b 100644
--- a/camus-schema-registry-avro/pom.xml
+++ b/camus-schema-registry-avro/pom.xml
@@ -5,7 +5,7 @@
com.linkedin.camus
camus-parent
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
camus-schema-registry-avro
diff --git a/camus-schema-registry/pom.xml b/camus-schema-registry/pom.xml
index 1696362cd..0ae8ca59d 100644
--- a/camus-schema-registry/pom.xml
+++ b/camus-schema-registry/pom.xml
@@ -5,7 +5,7 @@
com.linkedin.camus
camus-parent
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
camus-schema-registry
diff --git a/camus-sweeper/dependency-reduced-pom.xml b/camus-sweeper/dependency-reduced-pom.xml
index 6c0b843ab..46b3d0b16 100644
--- a/camus-sweeper/dependency-reduced-pom.xml
+++ b/camus-sweeper/dependency-reduced-pom.xml
@@ -3,13 +3,13 @@
camus-parent
com.linkedin.camus
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
4.0.0
com.linkedin.camus
camus-sweeper
Camus compaction code small files produced Camus
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
${project.artifactId}-${project.version}
@@ -53,6 +53,12 @@
1.0.4
compile
+
+ log4j
+ log4j
+ 1.2.17
+ provided
+
diff --git a/camus-sweeper/pom.xml b/camus-sweeper/pom.xml
index b95da5644..19485606e 100644
--- a/camus-sweeper/pom.xml
+++ b/camus-sweeper/pom.xml
@@ -4,12 +4,12 @@
com.linkedin.camus
camus-sweeper
Camus compaction code small files produced Camus
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
com.linkedin.camus
camus-parent
- 0.1.0-SNAPSHOT
+ 0.3.0-SNAPSHOT
@@ -92,4 +92,4 @@
-
\ No newline at end of file
+
diff --git a/lib/kafka-0.8-SNAPSHOT.jar b/lib/kafka-0.8-SNAPSHOT.jar
index 1e6190a16..7956b7310 100644
Binary files a/lib/kafka-0.8-SNAPSHOT.jar and b/lib/kafka-0.8-SNAPSHOT.jar differ
diff --git a/lib/kafka-0.8-SNAPSHOT.xml b/lib/kafka-0.8-SNAPSHOT.xml
index 321c9548d..7495be227 100644
--- a/lib/kafka-0.8-SNAPSHOT.xml
+++ b/lib/kafka-0.8-SNAPSHOT.xml
@@ -1,82 +1,129 @@
-
-
- 4.0.0
-
- kafka
- kafka
- 0.8-SNAPSHOT
-
-
-
- org.scala-lang
- scala-library
- 2.8.0
-
-
- com.github.sgroschupf
- zkclient
- 0.1
-
-
- org.apache.zookeeper
- zookeeper
- 3.3.4
-
-
- log4j
- log4j
-
-
- jline
- jline
-
-
-
-
- log4j
- log4j
- 1.2.15
-
-
- com.sun.jmx
- jmxri
-
-
- com.sun.jdmk
- jmxtools
-
-
- javax.jms
- jms
-
-
- javax.jms
- mail
-
-
-
-
- org.xerial.snappy
- snappy-java
- 1.0.4.1
-
-
- com.yammer.metrics
- metrics-core
- 2.2.0
-
-
- com.yammer.metrics
- metrics-annotation
- 2.2.0
-
-
- org.scala-lang
- scala-library
- 2.8.0
-
-
-
+
+
+ 4.0.0
+ org.apache.kafka
+ kafka_2.10
+ jar
+ kafka
+ 0.8.1
+ kafka
+
+ org.apache.kafka
+
+
+ org.apache
+ apache
+ 10
+
+
+
+ Apache 2
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.3.4
+
+
+ log4j
+ log4j
+
+
+ jline
+ jline
+
+
+
+
+ org.scala-lang
+ scala-library
+ 2.10.3
+
+
+ log4j
+ log4j
+ 1.2.15
+
+
+ javax.jms
+ jms
+
+
+
+
+ net.sf.jopt-simple
+ jopt-simple
+ 3.2
+
+
+ org.slf4j
+ slf4j-simple
+ 1.6.4
+
+
+ org.scala-lang
+ scala-compiler
+ 2.10.3
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.3.4
+
+
+ com.101tec
+ zkclient
+ 0.3
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.0.4.1
+
+
+ com.yammer.metrics
+ metrics-core
+ 2.2.0
+
+
+ com.yammer.metrics
+ metrics-annotation
+ 2.2.0
+
+
+ com.yammer.metrics
+ metrics-graphite
+ 2.2.0
+
+
+ org.easymock
+ easymock
+ 3.0
+ test
+
+
+ junit
+ junit
+ 4.1
+ test
+
+
+ org.scalatest
+ scalatest_2.10
+ 1.9.1
+ test
+
+
+
+
+ SonaTypeScalaTestrepo
+ SonaType ScalaTest repo
+ https://oss.sonatype.org/content/groups/public/org/scalatest/
+ default
+
+
diff --git a/pom.xml b/pom.xml
index f133cecce..2dfe3b376 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,207 +1,243 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
-
- 3.0.0
-
+
+ 3.0.0
+
- com.linkedin.camus
- camus-parent
- 0.1.0-SNAPSHOT
- pom
- Camus Parent
-
- Camus is LinkedIn's Kafka->HDFS pipeline. It is a mapreduce job that does distributed data loads out of Kafka.
-
- https://github.com/linkedin/camus
+ com.linkedin.camus
+ camus-parent
+ 0.3.0-SNAPSHOT
+ pom
+ Camus Parent
+
+ Camus is LinkedIn's Kafka->HDFS pipeline. It is a mapreduce job that does distributed data loads out of Kafka.
+
+ https://github.com/linkedin/camus
-
- UTF-8
-
+
+ UTF-8
+
+ 1.6
+ true
+ true
+
-
-
-
- com.linkedin.camus
- camus-api
- 0.1.0-SNAPSHOT
-
-
- com.linkedin.camus
- camus-etl-kafka
- 0.1.0-SNAPSHOT
-
-
- com.linkedin.camus
- camus-kafka-coders
- 0.1.0-SNAPSHOT
-
-
- com.linkedin.camus
- camus-schema-registry
- 0.1.0-SNAPSHOT
-
-
- com.linkedin.camus
- camus-schema-registry
- 0.1.0-SNAPSHOT
- test-jar
- test
-
-
- log4j
- log4j
- 1.2.17
-
-
- org.apache.avro
- avro
- 1.7.3
-
-
- org.apache.avro
- avro-mapred
- 1.7.3
-
-
- org.apache.avro
- 1.7.4-SNAPSHOT
- avro-repo-bundle
-
-
- joda-time
- joda-time
- 1.6
-
-
- org.apache.hadoop
- hadoop-client
- 1.0.3
-
-
- kafka
- kafka
- 0.8-SNAPSHOT
-
-
- com.github.sgroschupf
- zkclient
- 0.1
-
-
- org.apache.zookeeper
- zookeeper
- 3.3.4
-
-
- junit
- junit
- 4.8.1
-
-
- org.scala-lang
- scala-library
- 2.8.0
-
-
- org.xerial.snappy
- snappy-java
- 1.0.4.1
-
+
+
+
+ com.linkedin.camus
+ camus-api
+ 0.3.0-SNAPSHOT
+
+
+ com.linkedin.camus
+ camus-etl-kafka
+ 0.3.0-SNAPSHOT
+
+
+ com.linkedin.camus
+ camus-kafka-coders
+ 0.3.0-SNAPSHOT
+
+
+ net.minidev
+ json-smart
+ 2.0-RC3
+
+
+ com.linkedin.camus
+ camus-schema-registry
+ 0.3.0-SNAPSHOT
+
+
+ com.linkedin.camus
+ camus-schema-registry
+ 0.3.0-SNAPSHOT
+ test-jar
+ test
+
+
+ log4j
+ log4j
+ 1.2.17
+ provided
+
+
+ org.apache.avro
+ avro
+ 1.7.3
+
+
+ org.apache.avro
+ avro-mapred
+ 1.7.3
+
+
+ org.apache.avro
+ 1.7.4-SNAPSHOT
+ avro-repo-bundle
+
+
+ joda-time
+ joda-time
+ 1.6
+
+
+ org.apache.hadoop
+ hadoop-client
+ 2.5.0-mr1-cdh5.3.3
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.5.0-cdh5.3.3
+
+
+ org.apache.kafka
+ kafka_2.10
+ 0.8.0
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+ log4j
+ log4j
+
+
+ com.sun.jmx
+ jmxri
+
+
+ com.sun.jdmk
+ jmxtools
+
+
+
+
+ com.github.sgroschupf
+ zkclient
+ 0.1
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.3.4
+ provided
+
+
+ junit
+ junit
+ 4.8.1
+ test
+
+
+ org.scala-lang
+ scala-library
+ 2.10.3
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.0.4.1
+
-
-
+
+
-
- camus-api
- camus-kafka-coders
- camus-etl-kafka
- camus-schema-registry
- camus-schema-registry-avro
- camus-example
- camus-sweeper
-
+
+ camus-api
+ camus-kafka-coders
+ camus-etl-kafka
+ camus-schema-registry
+ camus-schema-registry-avro
+ camus-example
+ camus-sweeper
+
-
-
- Apache License 2.0
- http://www.apache.org/licenses/LICENSE-2.0.html
- repo
-
-
+
+
+ Apache License 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.html
+ repo
+
+
-
- scm:git:git://github.com/linkedin/camus.git
- scm:git:git@github.com:linkedin/camus.git
- http://github.com/linkedin/camus/
-
+
+ scm:git:git://github.com/linkedin/camus.git
+ scm:git:git@github.com:linkedin/camus.git
+ http://github.com/linkedin/camus/
+
-
- github
- http://github.com/linkedin/camus/issues
-
+
+ github
+ http://github.com/linkedin/camus/issues
+
-
-
- apache-releases
- https://repository.apache.org/content/groups/public
-
-
+
+
+ apache-releases
+ https://repository.apache.org/content/groups/public
+
+
+ cloudera
+ https://repository.cloudera.com/artifactory/cloudera-repos/
+
+
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
- 2.2
-
-
-
- test-jar
-
-
-
-
-
- maven-install-plugin
- 2.3.1
- false
-
-
- install-kafka
- validate
-
- install-file
-
-
- kafka
- kafka
- 0.8-SNAPSHOT
- jar
- ${basedir}/lib/kafka-0.8-SNAPSHOT.jar
- ${basedir}/lib/kafka-0.8-SNAPSHOT.xml
-
-
-
- install-avro-repo-client
- validate
-
- install-file
-
-
- org.apache.avro
- avro-repo-bundle
- 1.7.4-SNAPSHOT
- jar
- ${basedir}/lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.jar
- ${basedir}/lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.xml
-
-
-
-
-
-
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.2
+
+
+
+ test-jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ ${java.abi}
+ true
+ true
+
+
+
+
+ maven-install-plugin
+ 2.3.1
+ false
+
+
+ install-avro-repo-client
+ validate
+
+ install-file
+
+
+ org.apache.avro
+ avro-repo-bundle
+ 1.7.4-SNAPSHOT
+ jar
+ ${basedir}/lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.jar
+ ${basedir}/lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.xml
+
+
+
+
+
+