Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read project and collection from fabric metadata section #13

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>

<version>0.3.9</version>
<configuration>
<imageName>artifactory.corp.olacabs.com:5002/collector:${project.version}</imageName>
Expand Down Expand Up @@ -659,4 +660,4 @@
</repository>
</repositories>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ void read(T record, RecordData recordData)
// Mutable value holder filled in by DecoupleMessage.read(record, recordData);
// carries the routing information extracted from a single incoming record.
class RecordData
{
// Target collection (table) name for the record.
public String collection;
// Target project — presumably the tenant id read from the record's
// "metadata" section (this PR's purpose); TODO confirm against the
// KafkaDecoupleMessage.read implementation once merged.
public String project;
// Record event time reduced to an int (set from the configured time
// column via findData); exact encoding (e.g. day bucket vs epoch
// seconds) is determined by the reader — NOTE(review): confirm.
public int date;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -73,7 +74,7 @@
public class FabricJsonDeserializer
implements JsonDeserializer
{
private static final Set<String> EXCLUDED_COLUMNS = ImmutableSet.of("_project", "_collection","_shard_time");
private static final Set<String> EXCLUDED_COLUMNS = ImmutableSet.of("_project", "_collection", "_shard_time");
private static final JsonFactory READER = new ObjectMapper().getFactory();
private final DatabaseHandler databaseHandler;
private final FieldNameConfig fieldNameConfig;
Expand Down Expand Up @@ -128,52 +129,57 @@ public void deserialize(JsonPageReader pageReader)

JsonToken t = jp.nextToken();
if (t == JsonToken.START_OBJECT) {
t = jp.nextToken();
// t = jp.nextToken();
}
else {
throw new IllegalArgumentException("Invalid json");
}
for (; ; t = jp.nextToken()) {
for (t = jp.nextToken(); t == FIELD_NAME; t = jp.nextToken()) {

if (JsonToken.FIELD_NAME.equals(t)) {
if ("metadata".equals(jp.getCurrentName())) {
t = jp.nextToken();
boolean foundCollection = false;
boolean foundProject = false;

for (t = jp.nextToken(); t == FIELD_NAME; t = jp.nextToken()) {
String fieldData = jp.getCurrentName();

if (fieldData.equals("tenant")) {
jp.nextToken();
foundProject = true;
project = jp.getValueAsString();
if (foundCollection) {
jp.skipChildren();
}
}
else if (fieldData.equals("schema")) {
jp.nextToken();
foundCollection = true;
collection = checkCollectionValid(jp.getValueAsString());
if (foundProject) {
jp.skipChildren();
}
}
else {
jp.nextToken();
jp.skipChildren();
}
}
}

if (jp.getCurrentName().equals("data")) {
else if (jp.getCurrentName().equals("data")) {
t = jp.nextToken();
if (t != START_OBJECT) {
throw new IllegalArgumentException("data must be an object");
}
propertiesBuffer = jp.readValueAs(TokenBuffer.class);
break;
}
}
jp = propertiesBuffer.asParser(jp);

t = jp.nextToken();
//Extract project and collection
for (t = jp.nextToken(); t == JsonToken.FIELD_NAME; t = jp.nextToken()) {
if (project != null && collection != null) {
break;
}

t = jp.nextToken();
String fieldName = jp.getCurrentName();
switch (fieldName) {
case "_project":
project = jp.getValueAsString();
if (project == null || project.isEmpty()) {
throw new RuntimeException("Project can't be null");
}
project = project.toLowerCase();
break;
case "_collection":
collection = checkCollectionValid(jp.getValueAsString());
break;
default:
// TODO: what to do?
break;
else {
jp.nextToken();
}
}
jp = propertiesBuffer.asParser(jp);
}

private void parseProperties(PageReader pageReader)
Expand Down Expand Up @@ -597,4 +603,4 @@ private FieldType getTypeForUnknown(String fieldName, JsonParser jp)
throw new JsonMappingException(jp, format("The type is not supported: %s", jp.getValueAsString()));
}
}
}
}
80 changes: 48 additions & 32 deletions src/main/java/io/rakam/presto/kafka/KafkaDecoupleMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static java.lang.String.format;

public class KafkaDecoupleMessage
implements DecoupleMessage<ConsumerRecord<byte[], byte[]>>
Expand Down Expand Up @@ -51,46 +50,63 @@ public void read(ConsumerRecord<byte[], byte[]> record, RecordData recordData)
if (t != START_OBJECT) {
throw new JsonParseException(parser, "Must be an object");
}
boolean foundCollection = false;
boolean foundProject = false;
for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
String rootFieldName = parser.getCurrentName();
if (!"data".equals(rootFieldName)) {
parser.nextToken();
parser.skipChildren();
continue;
}
t = parser.nextToken();
if (t != START_OBJECT) {
throw new JsonParseException(parser, "Data object must be an object");
}

boolean foundCollection = false;
boolean foundDate = false;
for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
String fieldData = parser.getCurrentName();
if (fieldData.equals(timeColumn)) {
recordData.date = findData(parser);
if (foundCollection) {
return;
if ("metadata".equals(rootFieldName)) {
t = parser.nextToken();

for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
String fieldData = parser.getCurrentName();

if (fieldData.equals("schema")) {
parser.nextToken();
recordData.project = parser.getValueAsString();
foundProject = true;
if (foundCollection) {
parser.skipChildren();
}
}
foundDate = true;
}
else if (fieldData.equals("_collection")) {
parser.nextToken();
recordData.collection = parser.getValueAsString();
if (foundDate) {
return;
else if (fieldData.equals("tenant")) {
parser.nextToken();
recordData.collection = parser.getValueAsString();
foundCollection = true;
if (foundProject) {
parser.skipChildren();
}
}
else {
parser.nextToken();
parser.skipChildren();
}
foundCollection = true;
}
else {
parser.nextToken();
parser.skipChildren();
}
}
else if ("data".equals(rootFieldName)) {
t = parser.nextToken();
if (t != START_OBJECT) {
throw new JsonParseException(parser, "Data object must be an object");
}

throw new JsonParseException(parser, format("Event time property `%s` doesn't exist in JSON", timeColumn));
for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
String fieldData = parser.getCurrentName();
if (fieldData.equals(timeColumn)) {
recordData.date = findData(parser);
if (foundCollection) {
return;
}
}
else {
parser.nextToken();
parser.skipChildren();
}
}
}
else {
parser.nextToken();
}
}
throw new JsonParseException(parser, "data property doesn't exist in JSON");
}

public int findData(JsonParser parser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private Optional<Queue<List<MiddlewareBuffer.TableCheckpoint>>> flushDataSafe()
changeType(Status.FLUSHING_MIDDLEWARE);

long now = System.currentTimeMillis();
log.debug("Flushing records (%s) from stream buffer, it's been %s since last flush.",
log.info("Flushing records (%s) from stream buffer, it's been %s since last flush.",
DataSize.succinctBytes(buffer.getTotalBytes()).toString(),
Duration.succinctDuration(now - buffer.getPreviousFlushTimeMillisecond(), MILLISECONDS).toString());

Expand All @@ -244,7 +244,7 @@ private Optional<Queue<List<MiddlewareBuffer.TableCheckpoint>>> flushDataSafe()
}

long totalDataSize = data.entrySet().stream().mapToLong(e -> e.getValue().page.getRetainedSizeInBytes()).sum();
log.debug("Flushed records to middleware buffer in %s, the data size is %s",
log.info("Flushed records to middleware buffer in %s, the data size is %s",
Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString(),
DataSize.succinctBytes(totalDataSize));

Expand Down Expand Up @@ -334,4 +334,4 @@ private static class LongHolder
{
long value;
}
}
}
Loading