Adding 'chimpstorm'
set of examples and utils for Infochimps
Storm+Trident flows.
dieterichlawson committed Oct 9, 2013
1 parent a3bb6db commit 5690dce
Showing 8 changed files with 544 additions and 0 deletions.
69 changes: 69 additions & 0 deletions chimpstorm/logback/cluster.xml
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>

<configuration scan="true" scanPeriod="5 seconds">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder><pattern>%7r %m [%30c{1}]%n</pattern></encoder>
</appender>

<appender name="A1" class="ch.qos.logback.core.ConsoleAppender">
<encoder><pattern>%7r %m [%30c{1}]%n</pattern></encoder>
</appender>

<!-- <appender name="A1" class="ch.qos.logback.core.FileAppender"> -->
<!-- <file>/tmp/chimpstorm-logs/storm.log</file> -->
<!-- <encoder><pattern>%d %-8r %m [%30c{1}]%n</pattern></encoder> -->
<!-- </appender> -->

<root level="INFO">
<appender-ref ref="A1"/>
</root>

<appender name="METRICS" class="ch.qos.logback.core.FileAppender">
<file>/tmp/chimpstorm-logs/metrics.log</file>
<encoder>
<pattern>%d %-8r %m%n</pattern>
</encoder>
</appender>

<logger name="backtype.storm.metric.LoggingMetricsConsumer" additivity="false" >
<level value="INFO"/>
<appender-ref ref="METRICS"/>
</logger>

<logger name="com.infochimps" additivity="false" >
<level value="DEBUG"/>
<appender-ref ref="CONSOLE"/> <!-- output this to the console and the log -->
</logger>

<logger name="chimpstorm" additivity="false" >
<level value="DEBUG"/>
<appender-ref ref="CONSOLE"/> <!-- output this to the console and the log -->
</logger>

<logger name="backtype.storm.daemon" additivity="false" >
<level value="INFO"/>
</logger>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="backtype.storm.zookeeper" additivity="false" ><level value="WARN"/></logger>
<logger name="com.netflix.curator" additivity="false" ><level value="WARN"/></logger>
<logger name="backtype.storm.utils.InstrumentedDisruptorQueue" additivity="false" >
<level value="INFO"/>
</logger>
<logger name="storm.trident.topology.MasterBatchCoordinator" additivity="false" >
<level value="INFO"/>
</logger>
<logger name="storm.trident.topology.TridentBoltExecutor" additivity="false" >
<level value="INFO"/>
</logger>
<logger name="storm.trident.spout.TridentSpoutCoordinator" additivity="false" >
<level value="INFO"/>
</logger>
<logger name="storm.trident.testing.VisibleMemoryMapState" additivity="false">
<level value="DEBUG"/>
<appender-ref ref="CONSOLE"/> <!-- output this to the console and the log -->
</logger>
</configuration>
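
For a local run, logback will pick this file up if the logback.configurationFile system property points at it before the first logger is created. A minimal sketch, assuming the file is reachable at this relative path from the working directory:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogConfigDemo {
    public static void main(String[] args) {
        // Must be set before any logger is created, or logback falls back
        // to its default configuration lookup.
        System.setProperty("logback.configurationFile", "chimpstorm/logback/cluster.xml");
        Logger log = LoggerFactory.getLogger("chimpstorm.demo");
        // DEBUG is enabled for the "chimpstorm" logger (additivity off, CONSOLE appender)
        log.debug("goes to CONSOLE");
        // loggers outside the configured names fall through to root (INFO, A1)
        LoggerFactory.getLogger("some.other.pkg").info("goes to A1");
    }
}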
34 changes: 34 additions & 0 deletions chimpstorm/project.clj
@@ -0,0 +1,34 @@
(defproject chimpstorm "0.9.0-wip21-ics"
:java-source-paths ["src/java"]
:resource-paths ["src/java"]
:aot :all
:repositories {
"central-1" "http://repo1.maven.org/maven1"
"central-2" "http://repo1.maven.org/maven2"
"clojars" "http://clojars.org/repo/"
"infochimps-releases" "https://s3.amazonaws.com/artifacts.chimpy.us/maven-s3p/releases"
"infochimps-snapshots" "https://s3.amazonaws.com/artifacts.chimpy.us/maven-s3p/snapshots"
"cloudera" "https://repository.cloudera.com/artifactory/cloudera-repos/"
"github-releases" "http://oss.sonatype.org/content/repositories/github-releases/"
"twitter4j" "http://twitter4j.org/maven2"
}

:dependencies [
[commons-collections/commons-collections "3.2.1"]
[com.fasterxml.jackson.core/jackson-databind "2.2.0"]
[org.apache.kafka/kafka-core "0.7.1-incubating"]
[com.amazonaws/aws-java-sdk "1.3.27"]
]

:profiles {:dev
{ :resource-paths ["conf"]
:dependencies [[storm/storm-core "0.9.0-wip21-ics"]
[com.infochimps/storm-util "1.7.0-SNAPSHOT" :exclusions [[org.scala-lang/scala-library]]]
[junit/junit "3.8.1" :scope "test" ]
[org.testng/testng "6.8" ]
[org.mockito/mockito-all "1.9.0"]
[org.easytesting/fest-assert-core "2.0M8"]
[org.clojure/clojure "1.4.0"]
]}}
:min-lein-version "2.0.0"
)
119 changes: 119 additions & 0 deletions chimpstorm/src/java/chimpstorm/examples/github/GithubTopology.java
@@ -0,0 +1,119 @@
package chimpstorm.examples.github;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.*;
import storm.trident.operation.builtin.*;
import storm.trident.testing.VisibleMemoryMapState;
import storm.trident.testing.Tap;

import chimpstorm.storm.trident.operations.JsonParse;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import com.infochimps.storm.trident.spout.FileBlobStore;
import com.infochimps.storm.trident.spout.IBlobStore;
import com.infochimps.storm.trident.spout.OpaqueTransactionalBlobSpout;
import com.infochimps.storm.trident.spout.StartPolicy;

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.databind.*;

import java.util.*;
import java.io.IOException;

import com.infochimps.wukong.state.WuEsState;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class GithubTopology {
public static class ExtractLanguageCommits extends BaseFunction {
private static final Logger LOG = LoggerFactory.getLogger(ExtractLanguageCommits.class);

public void execute(TridentTuple tuple, TridentCollector collector){
JsonNode node = (JsonNode) tuple.getValue(0);
if(!"PushEvent".equals(node.get("type").asText())) return;
List<Object> values = new ArrayList<Object>(2);
//grab the language and the action
values.add(node.get("repository").get("language").asText());
values.add(node.get("payload").get("size").asLong());
collector.emit(values);
return;
}
}

/*
public static class Sum extends BaseAggregator<Accumulator> {
static class Accumulator {
long val = 0;
}
public Accumulator init(Object batchId, TridentCollector collector){
return new Accumulator();
}
public void aggregate(Accumulator acc, TridentTuple tuple, TridentCollector collector){
acc.val += tuple.getLong(0);
}
public void complete(Accumulator acc, TridentCollector collector){
collector.emit(new Values(acc.val));
}
}*/

/*
* Create and run the Github topology
* The topology:
* 1) reads the event stream from the github spout
* 2) parses the JSON
* 3) extracts the language and commit count from the JSON
* 4) groups by language
* 5) sums commits per language into Elasticsearch
*/
public static void main(String[] args) throws Exception, InvalidTopologyException {
IBlobStore bs = new FileBlobStore("/Users/dlaw/dev/github-data/test-data");
OpaqueTransactionalBlobSpout spout = new OpaqueTransactionalBlobSpout(bs, StartPolicy.EARLIEST, null);

WuEsState.Options esOptions = new WuEsState.Options();
esOptions.clusterName = "elasticsearch_dlaw";

TridentTopology topology = new TridentTopology();
topology.newStream("github-activities", spout)
.each(new Fields("line"), new JsonParse(), new Fields("parsed-json"))
// .each(new Fields("parsed-json"), new Tap())
.each(new Fields("parsed-json"), new ExtractLanguageCommits(), new Fields("language", "commits"))
// .each(new Fields("language","commits"), new Tap())
.groupBy(new Fields("language"))
// .persistentAggregate(new VisibleMemoryMapState.Factory(), new Fields("commits"), new Sum(), new Fields("commit-sum"));
.persistentAggregate(
new WuEsState.OpqFactory(Arrays.asList(new InetSocketTransportAddress("localhost", 9300)), esOptions),
new Fields("commits"), new Sum(), new Fields("commit-sum")
);
// .newValuesStream()
// .each(new Fields("language","commit-sum"), new Tap());

Config conf = new Config();
// 3 second message timeout; the batch pacing settings below are left commented out
//conf.setMaxSpoutPending(4);
conf.setMessageTimeoutSecs(3);
//conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
// conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 2000);
System.out.println("Topology created");
//if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("lang-counter", conf, topology.build());
//} else {
// conf.setNumWorkers(3);
// StormSubmitter.submitTopology(args[0], conf, topology.build());
//}
}
}
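
The JSON shape ExtractLanguageCommits expects follows from the fields it reads: a top-level "type", "repository.language", and "payload.size". A standalone sketch of the same extraction against a hand-written PushEvent fragment (the sample values are assumptions):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ExtractDemo {
    public static void main(String[] args) throws Exception {
        // hand-written fragment shaped after the fields the function reads
        String event = "{\"type\":\"PushEvent\","
                     + "\"repository\":{\"language\":\"Java\"},"
                     + "\"payload\":{\"size\":3}}";
        JsonNode node = new ObjectMapper().readTree(event);
        if ("PushEvent".equals(node.get("type").asText())) {
            System.out.println(node.get("repository").get("language").asText()); // Java
            System.out.println(node.get("payload").get("size").asLong());        // 3
        }
    }
}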
130 changes: 130 additions & 0 deletions chimpstorm/src/java/chimpstorm/examples/github/GithubTopology.java~
@@ -0,0 +1,130 @@
package chimpstorm.examples.github;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.*;
import storm.trident.operation.builtin.*;
import storm.trident.testing.VisibleMemoryMapState;
//import storm.starter.trident.InstrumentedMemoryMapState;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.infochimps.storm.trident.spout.FileBlobStore;
import com.infochimps.storm.trident.spout.IBlobStore;
import com.infochimps.storm.trident.spout.OpaqueTransactionalBlobSpout;
import com.infochimps.storm.trident.spout.StartPolicy;

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.databind.*;

import java.util.*;
import java.io.IOException;

public class GithubTopology {
/*
* Identity filter that logs a tuple.
* Useful for logging every tuple in a stream.
*/
@SuppressWarnings({ "serial", "rawtypes" })
public static class LogTuple extends BaseFilter {
private static final Logger LOG = LoggerFactory.getLogger(LogTuple.class);
// log the tuple and keep it in the stream
@Override
public boolean isKeep(TridentTuple tuple){
LOG.info(tuple.toString());
return true;
}
}
public static class PrintAndPass extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector){
System.out.println(tuple);
collector.emit(new Values()); // append no fields; the input tuple passes through unchanged
}
}
public static class GithubJsonExtract extends BaseFunction {
//Jackson JSON parser
private final static ObjectMapper jsonMapper = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(GithubJsonExtract.class);

public void execute(TridentTuple tuple, TridentCollector collector){
try {
JsonNode node = jsonMapper.readTree(tuple.getString(0));
if(!"PushEvent".equals(node.get("type").asText())) return;
List<Object> values = new ArrayList<Object>(2);
//grab the language and the action
values.add(node.get("repository").get("language"));
values.add(node.get("payload").get("size"));
collector.emit(values);
} catch(JsonProcessingException jpEx){
LOG.error("Error parsing JSON",jpEx);
} catch (IOException ioEx) {
LOG.error("IO Error",ioEx);
}
return;
}
}
/*
public static class Sum extends BaseAggregator<Accumulator> {
static class Accumulator {
long val = 0;
}

public Accumulator init(Object batchId, TridentCollector collector){
return new Accumulator();
}

public void aggregate(Accumulator acc, TridentTuple tuple, TridentCollector collector){
acc.val += tuple.getLong(0);
}

public void complete(Accumulator acc, TridentCollector collector){
collector.emit(new Values(acc.val));
}
}*/

/*
* Create and run the Github topology
* The topology:
* 1) reads the event stream from the github spout
* 2) extracts the language and commit count from the JSON
* 3) groups by language
* 4) counts events per language into an in-memory map state and logs the totals
*/
public static void main(String[] args) throws Exception, InvalidTopologyException {
IBlobStore bs = new FileBlobStore("/Users/dlaw/dev/github-data/test-data");
OpaqueTransactionalBlobSpout spout = new OpaqueTransactionalBlobSpout(bs, StartPolicy.EARLIEST, null);

TridentTopology topology = new TridentTopology();
topology.newStream("github-activities", spout)
.parallelismHint(1)
.each(new Fields("line"), new GithubJsonExtract(), new Fields("language","commits"))
// .each(new Fields("language","commits"), new LogTuple())
.groupBy(new Fields("language"))
.persistentAggregate(new VisibleMemoryMapState.Factory(),
new Count(), new Fields("commit-sum"))
.newValuesStream()
.each(new Fields("language","commit-sum"), new LogTuple());

// .partitionAggregate(new Fields("commits"), new Count(), new Fields("commit-sum"))

Config conf = new Config();
conf.setMessageTimeoutSecs(10);
System.out.println("Topology created");
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("lang-counter", conf, topology.build());
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, topology.build());
}
}
}
37 changes: 37 additions & 0 deletions chimpstorm/src/java/chimpstorm/storm/trident/operations/JsonParse.java
@@ -0,0 +1,37 @@
package chimpstorm.storm.trident.operations;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.infochimps.storm.trident.util.JsonUtil.*;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonParse extends BaseFunction {

private final static Logger LOG = LoggerFactory.getLogger(JsonParse.class);
private final static ObjectMapper jsonMapper = new ObjectMapper();

public void execute(TridentTuple tuple, TridentCollector collector){
try {
JsonNode node = jsonMapper.readTree(tuple.getString(0));
List<Object> values = new ArrayList<Object>(1);
values.add(node);
collector.emit(values);
} catch(IOException ioEx) {
LOG.error("Error parsing json", ioEx);
}
return;
}
}
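
As wired in GithubTopology, JsonParse takes a single raw-text input field and appends a single field holding the parsed JsonNode. A sketch of that wiring, with the wrapper class and method names here being placeholders:

import backtype.storm.tuple.Fields;
import storm.trident.Stream;
import chimpstorm.storm.trident.operations.JsonParse;

public class JsonParseWiring {
    // Appends a "parsed-json" JsonNode field to any stream whose tuples
    // carry a raw-JSON "line" field; the field names match GithubTopology.
    public static Stream addParsedJson(Stream lines) {
        return lines.each(new Fields("line"), new JsonParse(), new Fields("parsed-json"));
    }
}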