diff --git a/pom.xml b/pom.xml
index 384c509..2673066 100644
--- a/pom.xml
+++ b/pom.xml
@@ -614,6 +614,7 @@
                 <groupId>com.spotify</groupId>
                 <artifactId>docker-maven-plugin</artifactId>
+                <version>0.3.9</version>
                 <configuration>
                     <imageName>artifactory.corp.olacabs.com:5002/collector:${project.version}</imageName>
@@ -659,4 +660,4 @@
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/io/rakam/presto/deserialization/DecoupleMessage.java b/src/main/java/io/rakam/presto/deserialization/DecoupleMessage.java
index 80aff72..1aaa960 100644
--- a/src/main/java/io/rakam/presto/deserialization/DecoupleMessage.java
+++ b/src/main/java/io/rakam/presto/deserialization/DecoupleMessage.java
@@ -14,6 +14,7 @@ void read(T record, RecordData recordData)
     class RecordData
     {
         public String collection;
+        public String project;
         public int date;
     }
 }
diff --git a/src/main/java/io/rakam/presto/deserialization/json/FabricJsonDeserializer.java b/src/main/java/io/rakam/presto/deserialization/json/FabricJsonDeserializer.java
index 44d4270..89e6093 100644
--- a/src/main/java/io/rakam/presto/deserialization/json/FabricJsonDeserializer.java
+++ b/src/main/java/io/rakam/presto/deserialization/json/FabricJsonDeserializer.java
@@ -48,6 +48,7 @@
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
 import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
 import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
 import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -73,7 +74,7 @@
 public class FabricJsonDeserializer
         implements JsonDeserializer
 {
-    private static final Set<String> EXCLUDED_COLUMNS = ImmutableSet.of("_project", "_collection","_shard_time");
+    private static final Set<String> EXCLUDED_COLUMNS = ImmutableSet.of("_project", "_collection", "_shard_time");
     private static final JsonFactory READER = new ObjectMapper().getFactory();
     private final DatabaseHandler databaseHandler;
     private final FieldNameConfig fieldNameConfig;
@@ -128,52 +129,56 @@ public void deserialize(JsonPageReader pageReader)
         JsonToken t = jp.nextToken();
         if (t == JsonToken.START_OBJECT) {
-            t = jp.nextToken();
+//            t = jp.nextToken();
         }
         else {
             throw new IllegalArgumentException("Invalid json");
         }

-        for (; ; t = jp.nextToken()) {
+        for (t = jp.nextToken(); t == FIELD_NAME; t = jp.nextToken()) {

-            if (JsonToken.FIELD_NAME.equals(t)) {
+            if ("metadata".equals(jp.getCurrentName())) {
                 t = jp.nextToken();
+                boolean foundCollection = false;
+                boolean foundProject = false;
+
+                for (t = jp.nextToken(); t == FIELD_NAME; t = jp.nextToken()) {
+                    String fieldData = jp.getCurrentName();
+
+                    if (fieldData.equals("tenant")) {
+                        jp.nextToken();
+                        foundProject = true;
+                        project = jp.getValueAsString();
+                        if (foundCollection) {
+                            jp.skipChildren();
+                        }
+                    }
+                    else if (fieldData.equals("schema")) {
+                        jp.nextToken();
+                        foundCollection = true;
+                        collection = checkCollectionValid(jp.getValueAsString());
+                        if (foundProject) {
+                            jp.skipChildren();
+                        }
+                    }
+                    else {
+                        jp.nextToken();
+                        jp.skipChildren();
+                    }
+                }
             }
-            if (jp.getCurrentName().equals("data")) {
+            else if (jp.getCurrentName().equals("data")) {
+                t = jp.nextToken();
                 if (t != START_OBJECT) {
                     throw new IllegalArgumentException("data must be an object");
                 }
                 propertiesBuffer = jp.readValueAs(TokenBuffer.class);
-                break;
             }
-        }
-        jp = propertiesBuffer.asParser(jp);
-
-        t = jp.nextToken();
-        //Extract project and collection
-        for (t = jp.nextToken(); t == JsonToken.FIELD_NAME; t = jp.nextToken()) {
-            if (project != null && collection != null) {
-                break;
-            }
-
-            t = jp.nextToken();
-            String fieldName = jp.getCurrentName();
-            switch (fieldName) {
-                case "_project":
-                    project = jp.getValueAsString();
-                    if (project == null || project.isEmpty()) {
-                        throw new RuntimeException("Project can't be null");
-                    }
-                    project = project.toLowerCase();
-                    break;
-                case "_collection":
-                    collection = checkCollectionValid(jp.getValueAsString());
-                    break;
-                default:
-                    // TODO: what to do?
-                    break;
+            else {
+                jp.nextToken();
             }
         }
+        jp = propertiesBuffer.asParser(jp);
     }

     private void parseProperties(PageReader pageReader)
@@ -597,4 +602,4 @@ private FieldType getTypeForUnknown(String fieldName, JsonParser jp)
         throw new JsonMappingException(jp, format("The type is not supported: %s", jp.getValueAsString()));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaDecoupleMessage.java b/src/main/java/io/rakam/presto/kafka/KafkaDecoupleMessage.java
index 743d2c3..fa42322 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaDecoupleMessage.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaDecoupleMessage.java
@@ -23,7 +23,6 @@
 import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
 import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
-import static java.lang.String.format;

 public class KafkaDecoupleMessage
         implements DecoupleMessage<ConsumerRecord<byte[], byte[]>>
@@ -51,46 +50,63 @@ public void read(ConsumerRecord<byte[], byte[]> record, RecordData recordData)
         if (t != START_OBJECT) {
             throw new JsonParseException(parser, "Must be an object");
         }
+        boolean foundCollection = false;
+        boolean foundProject = false;
         for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
             String rootFieldName = parser.getCurrentName();
-            if (!"data".equals(rootFieldName)) {
-                parser.nextToken();
-                parser.skipChildren();
-                continue;
-            }
-            t = parser.nextToken();
-            if (t != START_OBJECT) {
-                throw new JsonParseException(parser, "Data object must be an object");
-            }
-            boolean foundCollection = false;
-            boolean foundDate = false;
-            for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
-                String fieldData = parser.getCurrentName();
-                if (fieldData.equals(timeColumn)) {
-                    recordData.date = findData(parser);
-                    if (foundCollection) {
-                        return;
-                    }
-                    foundDate = true;
-                }
-                else if (fieldData.equals("_collection")) {
-                    parser.nextToken();
-                    recordData.collection = parser.getValueAsString();
-                    if (foundDate) {
-                        return;
-                    }
-                    foundCollection = true;
-                }
-                else {
-                    parser.nextToken();
-                    parser.skipChildren();
-                }
-            }
-
-            throw new JsonParseException(parser, format("Event time property `%s` doesn't exist in JSON", timeColumn));
+            if ("metadata".equals(rootFieldName)) {
+                t = parser.nextToken();
+
+                for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
+                    String fieldData = parser.getCurrentName();
+
+                    if (fieldData.equals("schema")) {
+                        parser.nextToken();
+                        recordData.collection = parser.getValueAsString();
+                        foundCollection = true;
+                        if (foundProject) {
+                            parser.skipChildren();
+                        }
+                    }
+                    else if (fieldData.equals("tenant")) {
+                        parser.nextToken();
+                        recordData.project = parser.getValueAsString();
+                        foundProject = true;
+                        if (foundCollection) {
+                            parser.skipChildren();
+                        }
+                    }
+                    else {
+                        parser.nextToken();
+                        parser.skipChildren();
+                    }
+                }
+            }
+            else if ("data".equals(rootFieldName)) {
+                t = parser.nextToken();
+                if (t != START_OBJECT) {
+                    throw new JsonParseException(parser, "Data object must be an object");
+                }
+
+                for (t = parser.nextToken(); t == FIELD_NAME; t = parser.nextToken()) {
+                    String fieldData = parser.getCurrentName();
+                    if (fieldData.equals(timeColumn)) {
+                        recordData.date = findData(parser);
+                        if (foundCollection) {
+                            return;
+                        }
+                    }
+                    else {
+                        parser.nextToken();
+                        parser.skipChildren();
+                    }
+                }
+            }
+            else {
+                parser.nextToken();
+            }
         }
-        throw new JsonParseException(parser, "data property doesn't exist in JSON");
     }

     public int findData(JsonParser parser)
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
index b6c0dfd..69cb919 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
@@ -334,4 +334,4 @@ private static class LongHolder
     {
         long value;
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java b/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
index 88b7718..77a0df7 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
@@ -94,13 +94,17 @@ public class KafkaRealTimeWorker
     private int outdatedRecordIndex;

     @Inject
-    public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, FieldNameConfig fieldNameConfig, Optional<HistoricalDataHandler> historicalDataHandler, DecoupleMessage<ConsumerRecord<byte[], byte[]>> decoupleMessage, MiddlewareConfig middlewareConfig, StreamWorkerContext<ConsumerRecord<byte[], byte[]>> context, TargetConnectorCommitter committer)
+    public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, FieldNameConfig fieldNameConfig,
+            Optional<HistoricalDataHandler> historicalDataHandler, DecoupleMessage<ConsumerRecord<byte[], byte[]>> decoupleMessage,
+            MiddlewareConfig middlewareConfig, StreamWorkerContext<ConsumerRecord<byte[], byte[]>> context,
+            TargetConnectorCommitter committer)
     {
         this.config = config;
         this.context = context;
         this.decoupleMessage = decoupleMessage;
         Set<String> whitelistedCollections = fieldNameConfig.getWhitelistedCollections();
-        this.whiteListCollections = whitelistedCollections == null ? input -> true : input -> whitelistedCollections.contains(input);
+        this.whiteListCollections =
+                whitelistedCollections == null ? input -> true : input -> whitelistedCollections.contains(input);
         this.outdatedRecordIndex = config.getOutdatedDayIndex();
         this.historicalDataHandler = historicalDataHandler.orNull();
         this.committer = committer;
@@ -127,12 +131,14 @@ public void start()

         if (log.isDebugEnabled()) {
             Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
-                String message = statusSpentTime.entrySet().stream().sorted(Comparator.comparingLong(o -> -o.getValue().value))
-                        .map(entry -> entry.getKey().name() + ":" + Duration.succinctDuration(entry.getValue().value, MILLISECONDS).toString())
-                        .collect(Collectors.joining(", "));
-                message += format(" %s (%s%%) memory available",
-                        succinctBytes(memoryTracker.availableMemory()).toString(),
-                        memoryTracker.availableMemoryInPercentage() * 100);
+                String message =
+                        statusSpentTime.entrySet().stream().sorted(Comparator.comparingLong(o -> -o.getValue().value))
+                                .map(entry -> entry.getKey().name() + ":" + Duration
+                                        .succinctDuration(entry.getValue().value, MILLISECONDS).toString())
+                                .collect(Collectors.joining(", "));
+                message +=
+                        format(" %s (%s%%) memory available", succinctBytes(memoryTracker.availableMemory()).toString(),
+                                memoryTracker.availableMemoryInPercentage() * 100);
                 log.debug(message);
             }, 5, 5, SECONDS);
         }
@@ -193,7 +199,8 @@ public void execute()
             changeType(Status.WAITING_FOR_MEMORY);

             try {
-                log.info("Not enough memory (%s)to process records sleeping for 1s", memoryTracker.availableMemoryInPercentage());
+                log.info("Not enough memory (%s)to process records sleeping for 1s",
+                        memoryTracker.availableMemoryInPercentage());
                 SECONDS.sleep(1);
             }
             catch (InterruptedException e) {
@@ -245,11 +252,13 @@ private void flushDataSafe()
             changeType(Status.FLUSHING_MIDDLEWARE);

             long now = System.currentTimeMillis();
-            log.debug("Flushing %s records of size (%s) from stream buffer, it's been %s since last flush.", buffer.getTotalRecords(),
-                    DataSize.succinctBytes(buffer.getTotalBytes()).toString(),
-                    Duration.succinctDuration(now - buffer.getPreviousFlushTimeMillisecond(), MILLISECONDS).toString());
+            log.debug("Flushing %s records of size (%s) from stream buffer, it's been %s since last flush.",
+                    buffer.getTotalRecords(), DataSize.succinctBytes(buffer.getTotalBytes()).toString(),
+                    Duration.succinctDuration(now - buffer.getPreviousFlushTimeMillisecond(), MILLISECONDS)
+                            .toString());

-            Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extractedData = extract(records);
+            Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extractedData =
+                    extract(records);
             Iterable<ConsumerRecord<byte[], byte[]>> realTimeRecords = extractedData.getKey();
             CompletableFuture<Void> historicalDataAction = extractedData.getValue();
@@ -259,9 +268,11 @@ private void flushDataSafe()
                 buffer.clear();

-                middlewareBuffer.add(new BatchRecords(data, historicalDataAction, () -> commitSyncOffset(consumer, latestOffsets)));
+                middlewareBuffer.add(new BatchRecords(data, historicalDataAction,
+                        () -> commitSyncOffset(consumer, latestOffsets)));

-                long totalDataSize = data.entrySet().stream().mapToLong(e -> e.getValue().page.getRetainedSizeInBytes()).sum();
+                long totalDataSize =
+                        data.entrySet().stream().mapToLong(e -> e.getValue().page.getRetainedSizeInBytes()).sum();
                 log.debug("Flushed records to middleware buffer in %s, the data size is %s",
                         Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString(),
                         succinctBytes(totalDataSize));
@@ -271,8 +282,8 @@ private void flushDataSafe()
         Map<SchemaTableName, List<TableCheckpoint>> map = middlewareBuffer.getRecordsToBeFlushed();
         if (!map.isEmpty()) {
             changeType(Status.FLUSHING_MIDDLEWARE);
-            KafkaUtil.flush(map, committer, checkpointQueue, memoryTracker,
-                    log, databaseFlushStats, databaseFlushDistribution, realTimeRecordsStats, errorStats);
+            KafkaUtil.flush(map, committer, checkpointQueue, memoryTracker, log, databaseFlushStats,
+                    databaseFlushDistribution, realTimeRecordsStats, errorStats);
         }
     }
@@ -286,7 +297,8 @@ private void flushDataSafe()
         }
     }

-    private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extract(BasicMemoryBuffer<ConsumerRecord<byte[], byte[]>>.Records records)
+    private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extract(
+            BasicMemoryBuffer<ConsumerRecord<byte[], byte[]>>.Records records)
     {
         CompletableFuture<Void> historicalDataAction = BatchRecords.COMPLETED_FUTURE;
         ProcessedRecords processedRecords = processRecords(records);
@@ -294,18 +306,21 @@ private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extract(
         Iterable<ConsumerRecord<byte[], byte[]>> realTimeRecords;

         if (processedRecords.recordsIndexedByDay.isEmpty()) {
-            realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
+            realTimeRecords = () -> Iterators
+                    .filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
             totalRecords = processedRecords.realTimeRecordCount;
             historicalDataAction = BatchRecords.COMPLETED_FUTURE;
             realTimeRecordsStats.update(totalRecords);
             historicalRecordsStats.update(0);
         }
         else {
-            realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
+            realTimeRecords = () -> Iterators
+                    .filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));

             historicalRecordCount = totalRecords - processedRecords.realTimeRecordCount;
-            Iterable<ConsumerRecord<byte[], byte[]>> filter = () -> Iterators.filter(records.buffer.iterator(), new NegateBitMapRecordPredicate(processedRecords.bitmapForRecords));
+            Iterable<ConsumerRecord<byte[], byte[]>> filter = () -> Iterators.filter(records.buffer.iterator(),
+                    new NegateBitMapRecordPredicate(processedRecords.bitmapForRecords));

             changeType(Status.FLUSHING_HISTORICAL);
             if (historicalDataHandler != null) {
@@ -318,7 +333,8 @@ private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extract(
-        return new AbstractMap.SimpleImmutableEntry<>(realTimeRecords, historicalDataAction);
+        return new AbstractMap.SimpleImmutableEntry<>(realTimeRecords,
+                historicalDataAction);
     }
@@ -334,21 +350,24 @@ private ProcessedRecords processRecords(BasicMemoryBuffer<ConsumerRecord<byte[], byte[]>>.Records records)
-            if (historicalDataHandler == null || (dayOfRecord >= (todayInDate - outdatedRecordIndex) && dayOfRecord <= todayInDate)) {
+            if (historicalDataHandler == null || (dayOfRecord >= (todayInDate - outdatedRecordIndex)
+                    && dayOfRecord <= todayInDate)) {
                 bitmapForRecords[i] = true;
                 realtimeRecordCount++;
             }
@@ -447,7 +466,12 @@ public Map<Status, LongHolder> getStatusSpentTime()
     private enum Status
     {
-        POLLING, FLUSHING_STREAM, FLUSHING_MIDDLEWARE, CHECKPOINTING, FLUSHING_HISTORICAL, WAITING_FOR_MEMORY;
+        POLLING,
+        FLUSHING_STREAM,
+        FLUSHING_MIDDLEWARE,
+        CHECKPOINTING,
+        FLUSHING_HISTORICAL,
+        WAITING_FOR_MEMORY;
     }

     private static class ProcessedRecords
@@ -456,7 +480,8 @@ private static class ProcessedRecords
         public final boolean[] bitmapForRecords;
         public final int realTimeRecordCount;

-        public ProcessedRecords(Int2ObjectArrayMap recordsIndexedByDay, boolean[] bitmapForRecords, int realTimeRecordCount)
+        public ProcessedRecords(Int2ObjectArrayMap recordsIndexedByDay, boolean[] bitmapForRecords,
+                int realTimeRecordCount)
         {
             this.recordsIndexedByDay = recordsIndexedByDay;
             this.bitmapForRecords = bitmapForRecords;
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaUtil.java b/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
index 0cafd15..44b2f12 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
@@ -166,7 +166,7 @@ public static void flush(
             }
         }
         else {
-            log.debug("Saved data in buffer (%s - %d records) for collection %s in %s.",
+            log.info("Saved data in buffer (%s - %d records) for collection %s in %s.",
                     succinctBytes(totalDataSize).toString(), totalRecordCount,
                     entry.getKey().toString(),
                     totalDuration.toString());
diff --git a/src/test/java/io/rakam/presto/KafkaProducerTest.java b/src/test/java/io/rakam/presto/KafkaProducerTest.java
index 6280a18..d18785c 100644
--- a/src/test/java/io/rakam/presto/KafkaProducerTest.java
+++ b/src/test/java/io/rakam/presto/KafkaProducerTest.java
@@ -23,7 +23,7 @@ public static void main(String[] args)
             throws Exception
     {
         String timestampFormat = "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
-        int topicCount = 100;
+        int topicCount = 10;
         DateTime beginTime = new DateTime().minusDays(60);
         DateTime endTime = new DateTime();
         Random random = new Random(10000);
@@ -46,8 +46,6 @@ public static void main(String[] args)
                 "    \"sender\": null\n" +
                 "  },\n" +
                 "  \"data\": {\n" +
-                "    \"_collection\": \"dapi_pushmessage_event_from_app_rakam\",\n" +
-                "    \"_project\": \"dapi\",\n" +
                 "    \"event\": \"PushMessage\",\n" +
                 "    \"_actor\": \"8609050301499\",\n" +
                 "    \"imei\": \"8609050301499\",\n" +
@@ -70,8 +68,7 @@ public static void main(String[] args)
         ObjectNode node = (ObjectNode) mapper.readTree(fabricEvent);
         ObjectNode data = (ObjectNode) node.get("data");

-        //Assign topicName to string variable
-        String topicName = "presto_test_11";
+        String topicName = "presto_test_9";

         // create instance for properties to access producer configs
         Properties props = new Properties();
@@ -104,20 +101,23 @@ public static void main(String[] args)
         boolean flag = true;

-        for (int i = 0; i >= 0; i++) {
+        for (int i = 0; i <= 100; i++) {
             int randInt = random.nextInt();
             int index = Math.abs(randInt) % topicCount;
             Timestamp randomDate = new Timestamp(ThreadLocalRandom.current().nextLong(beginTime.getMillis(), endTime.getMillis()));

             data.put("_actor", String.valueOf(randInt));
-            data.put("_collection", collection + "_" + index);
-            data.put("_time", randomDate.toString());
+            data.put("_collection", collection + "_" + Math.abs(random.nextInt(20) + 1));
+            data.put("_time", endTime.toString());
             data.put("_shard_time", randomDate.toString());
             node.put("data", data);

             producer.send(new ProducerRecord<>(topicName, Integer.toString(randInt), mapper.writeValueAsString(node)));
-            Thread.sleep(1);
+//            Thread.sleep(1);
+            if (i % 100 == 0) {
+                System.out.println("produced:" + i);
+            }
         }
         System.out.println("Message sent successfully");
diff --git a/src/test/java/io/rakam/presto/TestKafkaFabricJsonDeserializer.java b/src/test/java/io/rakam/presto/TestKafkaFabricJsonDeserializer.java
index 6888ee4..ce4d59d 100644
--- a/src/test/java/io/rakam/presto/TestKafkaFabricJsonDeserializer.java
+++ b/src/test/java/io/rakam/presto/TestKafkaFabricJsonDeserializer.java
@@ -43,11 +43,15 @@ protected ImmutableList<ConsumerRecord<byte[], byte[]>> getSampleData()
     {
         ImmutableList.Builder<ConsumerRecord<byte[], byte[]>> builder = ImmutableList.builder();
         for (int i = 0; i < ITERATION_COUNT; i++) {
-            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, JsonHelper.encodeAsBytes(ImmutableMap.of("data", ImmutableMap.of(
-                    "_project", "testproject",
-                    "_collection", "testcollection",
+            ImmutableMap<String, String> metadata = ImmutableMap.of(
+                    "schema", "testcollection",
+                    "tenant", "testproject");
+            ImmutableMap<String, String> data = ImmutableMap.of(
                     "newcolumn1", "test1",
-                    "newcolumn2", "test2"))));
+                    "newcolumn2", "test2");
+            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, JsonHelper.encodeAsBytes(ImmutableMap.of(
+                    "metadata", metadata,
+                    "data", data)));
             builder.add(record);
         }
         return builder.build();
@@ -56,7 +60,8 @@ protected ImmutableList<ConsumerRecord<byte[], byte[]>> getSampleData()
     @Override
     protected ConsumerRecord<byte[], byte[]> getDuplicateFieldRecord()
     {
-        byte[] data = "{\"data\": {\"_project\": \"testproject\", \"_collection\": \"testcollection\", \"testcolumn\": \"1\", \"testcolumn\": \"2\"}}".getBytes(StandardCharsets.UTF_8);
+
+        byte[] data = "{\"metadata\":{\"schema\":\"testcollection\",\"tenant\":\"testproject\"},\"data\":{\"testcolumn\": \"1\", \"testcolumn\": \"2\"}}".getBytes(StandardCharsets.UTF_8);
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, data);
         return record;
     }
@@ -65,7 +70,7 @@ protected ConsumerRecord<byte[], byte[]> getDuplicateFieldRecord()
     public void testOrdering()
             throws IOException
     {
-        byte[] data = "{\"data\": {\"testcolumn\": \"1\", \"testcolumn\": \"2\", \"_project\": \"testproject\", \"_collection\": \"testcollection\"}}".getBytes(StandardCharsets.UTF_8);
+        byte[] data = "{\"data\":{\"testcolumn\": \"1\", \"testcolumn\": \"2\"},\"metadata\":{\"schema\":\"testcollection\",\"tenant\":\"testproject\"}}".getBytes(StandardCharsets.UTF_8);
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, data);

         TestDatabaseHandler databaseHandler = new TestDatabaseHandler();
@@ -100,17 +105,18 @@ public List<ConsumerRecord<byte[], byte[]>> getRecordsForEvents(String project,
                     }
                 }
             }
-
-            ImmutableMap.Builder<String, Object> data = ImmutableMap.builder()
-                    .put("_project", project)
-                    .put("_collection", collection);
-
+            ImmutableMap.Builder<String, Object> data = ImmutableMap.builder();
+            ImmutableMap<String, String> metadata = ImmutableMap.of(
+                    "schema", collection,
+                    "tenant", project);
             event.forEach((s, o) -> data.put(s, o));

-            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, JsonHelper.encodeAsBytes(ImmutableMap.of("data", data.build())));
+            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test", -1, -1, new byte[] {}, JsonHelper.encodeAsBytes(ImmutableMap.of(
+                    "metadata", metadata,
+                    "data", data.build())));
             builder.add(record);
         }
-
+        //{"metadata":{"schema":"testcollection2","tenant":"testproject"},"data":{"colstring":"0"}}
         return builder.build();
     }
-}
\ No newline at end of file
+}
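
Note on the new message layout (not part of the patch itself): after this change both FabricJsonDeserializer and KafkaDecoupleMessage resolve the target project and collection from a top-level "metadata" object ("tenant" and "schema") instead of the "_project"/"_collection" fields inside "data", as the updated tests show. A minimal producer-side sketch using Jackson (the class name EnvelopeExample and the sample values are illustrative only) would build such an envelope like this:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class EnvelopeExample
    {
        public static void main(String[] args)
                throws Exception
        {
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode envelope = mapper.createObjectNode();
            // "metadata" carries the routing information: "tenant" -> project, "schema" -> collection
            ObjectNode metadata = envelope.putObject("metadata");
            metadata.put("tenant", "testproject");
            metadata.put("schema", "testcollection");
            // "data" carries only the event properties
            ObjectNode data = envelope.putObject("data");
            data.put("_actor", "8609050301499");
            data.put("_time", "2018-04-11T10:15:30.1Z");
            // Prints: {"metadata":{"tenant":"testproject","schema":"testcollection"},"data":{...}}
            System.out.println(mapper.writeValueAsString(envelope));
        }
    }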