diff --git a/Dockerfile b/Dockerfile
index 4c9f50e..3734fb5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -16,6 +16,7 @@ RUN useradd -ms /bin/bash rakam
 ARG CACHEBUST=1
 COPY src/main/resources/config_* /home/rakam/
+COPY log.properties /home/rakam
 COPY *.sh /home/rakam/
 COPY target/rakam-data-collector.jar /home/rakam
diff --git a/log.properties b/log.properties
index 5ee22d0..8b13789 100644
--- a/log.properties
+++ b/log.properties
@@ -1 +1 @@
-io.rakam=DEBUG
+
diff --git a/src/main/java/com/facebook/presto/raptor/storage/InMemoryFileSystem.java b/src/main/java/com/facebook/presto/raptor/storage/InMemoryFileSystem.java
index 1da121f..14c1600 100644
--- a/src/main/java/com/facebook/presto/raptor/storage/InMemoryFileSystem.java
+++ b/src/main/java/com/facebook/presto/raptor/storage/InMemoryFileSystem.java
@@ -24,7 +24,6 @@ public final class InMemoryFileSystem
         extends FileSystem
 {
-    private final MemoryTracker memoryTracker;
     Map files;
@@ -111,7 +110,6 @@ public void setWorkingDirectory(Path path)
     public boolean mkdirs(Path path, FsPermission fsPermission)
             throws IOException
     {
-
         throw new UnsupportedOperationException();
     }
diff --git a/src/main/java/io/rakam/presto/BasicMemoryBuffer.java b/src/main/java/io/rakam/presto/BasicMemoryBuffer.java
index 76e7979..b1c523c 100644
--- a/src/main/java/io/rakam/presto/BasicMemoryBuffer.java
+++ b/src/main/java/io/rakam/presto/BasicMemoryBuffer.java
@@ -4,12 +4,16 @@
 package io.rakam.presto;
 
+import io.airlift.log.Logger;
+
 import java.util.ArrayList;
 import java.util.List;
 
 public class BasicMemoryBuffer
         implements MemoryBuffer
 {
+    private static final Logger log = Logger.get(BasicMemoryBuffer.class);
+
     private final long millisecondsToBuffer;
     private final ArrayList buffer;
     private final ArrayList bulkBuffer;
@@ -76,8 +80,8 @@ public void clear()
 
     public boolean shouldFlush()
     {
-        return ((System.currentTimeMillis() - previousFlushTimeMillisecond) >= getMillisecondsToBuffer())
-                || memoryTracker.availableMemory() - (totalBytes * memoryMultiplier) < 0;
+        return ((System.currentTimeMillis() - previousFlushTimeMillisecond) >= getMillisecondsToBuffer()
+                || (memoryTracker.availableMemory() - (totalBytes * memoryMultiplier) < 0));
     }
 
     public Records getRecords()
diff --git a/src/main/java/io/rakam/presto/CommitterConfig.java b/src/main/java/io/rakam/presto/CommitterConfig.java
new file mode 100644
index 0000000..6b7cedd
--- /dev/null
+++ b/src/main/java/io/rakam/presto/CommitterConfig.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Rakam Incorporation
+ */
+
+package io.rakam.presto;
+
+import io.airlift.configuration.Config;
+
+public class CommitterConfig
+{
+    private int committerThreadCount = 2;
+
+    public int getCommitterThreadCount()
+    {
+        return committerThreadCount;
+    }
+
+    @Config("committer.thread.count")
+    public void setCommitterThreadCount(int committerThreadCount)
+    {
+        this.committerThreadCount = committerThreadCount;
+    }
+}
diff --git a/src/main/java/io/rakam/presto/MemoryTracker.java b/src/main/java/io/rakam/presto/MemoryTracker.java
index f0cfe81..48d289a 100644
--- a/src/main/java/io/rakam/presto/MemoryTracker.java
+++ b/src/main/java/io/rakam/presto/MemoryTracker.java
@@ -4,14 +4,16 @@
 package io.rakam.presto;
 
+import io.airlift.log.Logger;
 import org.weakref.jmx.Managed;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 public class MemoryTracker
 {
+    private static final Logger log = Logger.get(MemoryTracker.class);
     private static final long HEAP_MAX_SIZE = (long) (Runtime.getRuntime().maxMemory() - Runtime.getRuntime().totalMemory());
-    private static final double AVAILABLE_RATIO = .4;
+    private static final double AVAILABLE_RATIO = .7;
     private static final long AVAILABLE_HEAP_SIZE = (long) (HEAP_MAX_SIZE * AVAILABLE_RATIO);
 
     private AtomicLong reservedMemory;
diff --git a/src/main/java/io/rakam/presto/MiddlewareBuffer.java b/src/main/java/io/rakam/presto/MiddlewareBuffer.java
index 8fba924..7a8c0f2 100644
--- a/src/main/java/io/rakam/presto/MiddlewareBuffer.java
+++ b/src/main/java/io/rakam/presto/MiddlewareBuffer.java
@@ -6,6 +6,7 @@
 import com.facebook.presto.spi.SchemaTableName;
 import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
 import io.rakam.presto.deserialization.TableData;
 
 import java.util.ArrayList;
@@ -20,6 +21,8 @@
 public class MiddlewareBuffer
 {
+    private static final Logger log = Logger.get(MiddlewareBuffer.class);
+
     private final Map> batches;
     private final MiddlewareConfig config;
     private final MemoryTracker memoryTracker;
@@ -71,6 +74,7 @@ public Map> getRecordsToBeFlushed()
         Set tablesToBeFlushed;
         if (memoryNeedsToBeAvailable > availableMemory) {
+            log.debug("Memory needed (%s) exceeds available memory (%s), flushing data", memoryNeedsToBeAvailable, availableMemory);
             List> sortedTables = bufferSize.entrySet().stream()
                     .sorted(Comparator.comparingLong(o -> -o.getValue().value))
                     .collect(Collectors.toList());
diff --git a/src/main/java/io/rakam/presto/ServiceStarter.java b/src/main/java/io/rakam/presto/ServiceStarter.java
index d7a9bf0..2de51c5 100644
--- a/src/main/java/io/rakam/presto/ServiceStarter.java
+++ b/src/main/java/io/rakam/presto/ServiceStarter.java
@@ -19,6 +19,7 @@
 import com.google.inject.multibindings.OptionalBinder;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.airlift.log.Level;
 import io.airlift.log.Logger;
 import io.airlift.log.Logging;
 import io.airlift.log.LoggingConfiguration;
@@ -128,6 +129,7 @@ protected void setup(Binder binder)
             configBinder(binder).bindConfig(BackupConfig.class);
             configBinder(binder).bindConfig(FieldNameConfig.class);
             configBinder(binder).bindConfig(S3MiddlewareConfig.class);
+            configBinder(binder).bindConfig(CommitterConfig.class);
             configBinder(binder).bindConfig(MiddlewareConfig.class);
             binder.bind(StreamWorkerContext.class).in(Scopes.SINGLETON);
             binder.bind(TargetConnectorCommitter.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/io/rakam/presto/StreamConfig.java b/src/main/java/io/rakam/presto/StreamConfig.java
index 3aba143..822206d 100644
--- a/src/main/java/io/rakam/presto/StreamConfig.java
+++ b/src/main/java/io/rakam/presto/StreamConfig.java
@@ -39,5 +39,4 @@ public void setMemoryMultiplier(String memoryMultiplier)
             this.memoryMultiplier = Integer.parseInt(memoryMultiplier);
         }
     }
-
 }
diff --git a/src/main/java/io/rakam/presto/TargetConnectorCommitter.java b/src/main/java/io/rakam/presto/TargetConnectorCommitter.java
index 599eb92..414be29 100644
--- a/src/main/java/io/rakam/presto/TargetConnectorCommitter.java
+++ b/src/main/java/io/rakam/presto/TargetConnectorCommitter.java
@@ -26,9 +26,11 @@ public class TargetConnectorCommitter
     private final AsyncFailsafe executor;
     private AtomicInteger activeFlushCount = new AtomicInteger();
+
     @Inject
-    public TargetConnectorCommitter(DatabaseHandler databaseHandler)
+    public TargetConnectorCommitter(DatabaseHandler databaseHandler, CommitterConfig committerConfig)
     {
+        this.databaseHandler = databaseHandler;
         RetryPolicy retryPolicy = new RetryPolicy()
@@ -38,7 +40,7 @@ public TargetConnectorCommitter(DatabaseHandler databaseHandler)
                 .withMaxRetries(3);
 
 //        executorPoolSize = Runtime.getRuntime().availableProcessors() * 2;
-        executorPoolSize = 2;
+        executorPoolSize = committerConfig.getCommitterThreadCount();
         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(executorPoolSize, new ThreadFactoryBuilder().setNameFormat("target-committer").build());
diff --git a/src/main/java/io/rakam/presto/connector/raptor/RaptorDatabaseHandler.java b/src/main/java/io/rakam/presto/connector/raptor/RaptorDatabaseHandler.java
index ccbf30b..f1a9fa3 100644
--- a/src/main/java/io/rakam/presto/connector/raptor/RaptorDatabaseHandler.java
+++ b/src/main/java/io/rakam/presto/connector/raptor/RaptorDatabaseHandler.java
@@ -165,8 +165,8 @@ public Connector create(String connectorId, Map config, Connecto
         ImmutableMap.Builder props = ImmutableMap.builder()
                 .put("metadata.db.type", "mysql")
                 .put("metadata.db.url", config.getMetadataUrl())
-                .put("backup.threads", String.valueOf(Runtime.getRuntime().availableProcessors() * 3)) // No effect
+                .put("backup.threads", String.valueOf(Runtime.getRuntime().availableProcessors() * 3))
                 .put("storage.data-directory", Files.createTempDir().getAbsolutePath())
                 .put("metadata.db.connections.max", String.valueOf(config.getMaxConnection()))
                 .put("backup.timeout", "20m");
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaConfig.java b/src/main/java/io/rakam/presto/kafka/KafkaConfig.java
index 32c3da3..785dd5a 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaConfig.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaConfig.java
@@ -34,6 +34,7 @@ public class KafkaConfig
     private String sessionTimeOut = "12000";
     private String requestTimeOut = "15000";
     private String historicalDataTopic;
+    private int outdatedDayIndex = 1;
 
     private static HostAddress toKafkaHostAddress(String value)
     {
@@ -60,10 +61,22 @@ public String getMaxPollRecords()
         return maxPollRecords;
     }
 
+    public int getOutdatedDayIndex()
+    {
+        return outdatedDayIndex;
+    }
+
+    @Config("outdated.day.index")
+    public KafkaConfig setOutdatedDayIndex(int outdatedDayIndex)
+    {
+        this.outdatedDayIndex = outdatedDayIndex;
+        return this;
+    }
+
     @Config("max.poll.records")
     public KafkaConfig setMaxPollRecords(String maxPollRecords)
     {
-        if (Strings.isNullOrEmpty(maxPollRecords)) {
+        if (!Strings.isNullOrEmpty(maxPollRecords)) {
             this.maxPollRecords = maxPollRecords;
         }
         return this;
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalDataHandler.java b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalDataHandler.java
index c48ed5b..e48b146 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalDataHandler.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalDataHandler.java
@@ -52,7 +52,7 @@ public CompletableFuture handle(Iterable> t
                 log.error(exception);
             }
             if (latch.decrementAndGet() == 0) {
-                log.info("%d records are sent to Kafka historical topic in %s.", totalRecords.value,
+                log.debug("%d records are sent to Kafka historical topic in %s.", totalRecords.value,
                         Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString());
                 future.complete(null);
             }
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
index 67d531f..b6c0dfd 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaHistoricalWorker.java
@@ -92,8 +92,8 @@ public void shutdown()
     @PostConstruct
     public void start()
     {
-        if (config.getHistoricalDataTopic() == null || !config.getHistoricalWorkerEnabled()) {
-            log.warn("The config `kafka.historical-data-topic` is not set. Ignoring historical processing..");
+        if (!config.getHistoricalWorkerEnabled()) {
+            log.warn("Historical worker is turned off. All historical records will be pushed to topic %s.", config.getHistoricalDataTopic());
             return;
         }
         workerThread = new Thread(this::execute);
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java b/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
index 17f841d..88b7718 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
@@ -12,6 +12,7 @@
 import io.airlift.log.Logger;
 import io.airlift.stats.CounterStat;
 import io.airlift.stats.DistributionStat;
+import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.rakam.presto.BasicMemoryBuffer;
 import io.rakam.presto.BatchRecords;
@@ -90,6 +91,7 @@ public class KafkaRealTimeWorker
     private Map statusSpentTime = new HashMap<>();
     private long lastStatusChangeTime;
     private Status currentStatus;
+    private int outdatedRecordIndex;
 
     @Inject
     public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, FieldNameConfig fieldNameConfig, Optional historicalDataHandler, DecoupleMessage decoupleMessage, MiddlewareConfig middlewareConfig, StreamWorkerContext> context, TargetConnectorCommitter committer)
@@ -99,7 +101,7 @@ public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, Fiel
         this.decoupleMessage = decoupleMessage;
         Set whitelistedCollections = fieldNameConfig.getWhitelistedCollections();
         this.whiteListCollections = whitelistedCollections == null ? input -> true : input -> whitelistedCollections.contains(input);
-
+        this.outdatedRecordIndex = config.getOutdatedDayIndex();
         this.historicalDataHandler = historicalDataHandler.orNull();
         this.committer = committer;
         this.memoryTracker = memoryTracker;
@@ -173,6 +175,9 @@ public void execute()
             buffer.consumeRecords(records);
             realTimeRecordsStats.update(records.count());
 
+            changeType(Status.FLUSHING_STREAM);
+            flushDataSafe();
+
             if (buffer.shouldFlush()) {
                 changeType(Status.FLUSHING_STREAM);
                 flushDataSafe();
@@ -180,14 +185,15 @@ public void execute()
 
             checkpoint();
 
-            if (memoryTracker.availableMemoryInPercentage() < .3) {
+            if (memoryTracker.availableMemoryInPercentage() < .30) {
                 changeType(Status.FLUSHING_STREAM);
                 flushDataSafe();
 
-                while (memoryTracker.availableMemoryInPercentage() < .3) {
+                while (memoryTracker.availableMemoryInPercentage() < .30) {
                     changeType(Status.WAITING_FOR_MEMORY);
                     try {
+                        log.info("Not enough memory (%s) to process records, sleeping for 1s", memoryTracker.availableMemoryInPercentage());
                         SECONDS.sleep(1);
                     }
                     catch (InterruptedException e) {
@@ -239,8 +245,8 @@ private void flushDataSafe()
             changeType(Status.FLUSHING_MIDDLEWARE);
             long now = System.currentTimeMillis();
 
-            log.info("Flushing records (%s) from stream buffer, it's been %s since last flush.",
-                    succinctBytes(buffer.getTotalBytes()).toString(),
+            log.debug("Flushing %s records of size (%s) from stream buffer, it's been %s since last flush.", buffer.getTotalRecords(),
+                    DataSize.succinctBytes(buffer.getTotalBytes()).toString(),
                     Duration.succinctDuration(now - buffer.getPreviousFlushTimeMillisecond(), MILLISECONDS).toString());
 
             Map.Entry>, CompletableFuture> extractedData = extract(records);
@@ -256,7 +262,7 @@ private void flushDataSafe()
             middlewareBuffer.add(new BatchRecords(data, historicalDataAction, () -> commitSyncOffset(consumer, latestOffsets)));
             long totalDataSize = data.entrySet().stream().mapToLong(e -> e.getValue().page.getRetainedSizeInBytes()).sum();
-            log.info("Flushed records to middleware buffer in %s, the data size is %s",
+            log.debug("Flushed records to middleware buffer in %s, the data size is %s",
                     Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString(), succinctBytes(totalDataSize));
         }
@@ -283,10 +289,9 @@ private void flushDataSafe()
     private Map.Entry>, CompletableFuture> extract(BasicMemoryBuffer>.Records records)
     {
         CompletableFuture historicalDataAction = BatchRecords.COMPLETED_FUTURE;
-
         ProcessedRecords processedRecords = processRecords(records);
         int totalRecords = records.buffer.size();
-
+        int historicalRecordCount = 0;
         Iterable> realTimeRecords;
         if (processedRecords.recordsIndexedByDay.isEmpty()) {
             realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
@@ -298,7 +303,7 @@ private Map.Entry>, CompletableFuture
             realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
-            int historicalRecordCount = totalRecords - processedRecords.realTimeRecordCount;
+            historicalRecordCount = totalRecords - processedRecords.realTimeRecordCount;
 
             Iterable> filter = () -> Iterators.filter(records.buffer.iterator(), new NegateBitMapRecordPredicate(processedRecords.bitmapForRecords));
             changeType(Status.FLUSHING_HISTORICAL);
@@ -313,6 +318,7 @@ private Map.Entry>, CompletableFuture(realTimeRecords, historicalDataAction);
     }
@@ -320,7 +326,6 @@ private ProcessedRecords processRecords(BasicMemoryBuffer recordsIndexedByDay = new Int2ObjectArrayMap<>();
         int todayInDate = Ints.checkedCast(LocalDate.now().toEpochDay());
-        int previousDay = todayInDate - 1;
         DecoupleMessage.RecordData recordData = new DecoupleMessage.RecordData();
         int realtimeRecordCount = 0;
         boolean[] bitmapForRecords = new boolean[records.buffer.size()];
@@ -343,7 +348,7 @@ private ProcessedRecords processRecords(BasicMemoryBuffer
+            if ((dayOfRecord >= (todayInDate - outdatedRecordIndex) && dayOfRecord <= todayInDate)) {
                 bitmapForRecords[i] = true;
                 realtimeRecordCount++;
             }
@@ -357,17 +362,19 @@ private ProcessedRecords processRecords(BasicMemoryBuffer
-        for (Int2ObjectMap.Entry entry : recordsIndexedByDay.int2ObjectEntrySet()) {
-            int day = entry.getIntKey();
-            IntArrayList recordIndexes = entry.getValue();
-            if (recordIndexes.size() > 1000 && (recordIndexes.size() * 1.0 / records.buffer.size()) > .25) {
-                IntListIterator iterator = recordIndexes.iterator();
-                while (iterator.hasNext()) {
-                    int i = iterator.nextInt();
-                    bitmapForRecords[i] = true;
-                    realtimeRecordCount++;
+        if (config.getHistoricalWorkerEnabled()) {
+            for (Int2ObjectMap.Entry entry : recordsIndexedByDay.int2ObjectEntrySet()) {
+                int day = entry.getIntKey();
+                IntArrayList recordIndexes = entry.getValue();
+                if (recordIndexes.size() > 1000 && (recordIndexes.size() * 100.0 / records.buffer.size()) > 25) {
+                    IntListIterator iterator = recordIndexes.iterator();
+                    while (iterator.hasNext()) {
+                        int i = iterator.nextInt();
+                        bitmapForRecords[i] = true;
+                        realtimeRecordCount++;
+                    }
+                    recordsIndexedByDay.remove(day);
                 }
-                recordsIndexedByDay.remove(day);
             }
         }
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaStreamSourceModule.java b/src/main/java/io/rakam/presto/kafka/KafkaStreamSourceModule.java
index dc097ae..1b0e625 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaStreamSourceModule.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaStreamSourceModule.java
@@ -26,6 +26,7 @@ public class KafkaStreamSourceModule
     protected void setup(Binder binder)
     {
         KafkaConfig config = buildConfigObject(KafkaConfig.class);
+        configBinder(binder).bindConfig(JsonConfig.class);
 
         binder.bind(KafkaRealTimeWorker.class).asEagerSingleton();
diff --git a/src/main/java/io/rakam/presto/kafka/KafkaUtil.java b/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
index 4582824..0cafd15 100644
--- a/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
+++ b/src/main/java/io/rakam/presto/kafka/KafkaUtil.java
@@ -10,6 +10,7 @@
 import io.airlift.stats.CounterStat;
 import io.airlift.stats.DistributionStat;
 import io.airlift.units.Duration;
+import io.rakam.presto.BasicMemoryBuffer;
 import io.rakam.presto.MemoryTracker;
 import io.rakam.presto.MiddlewareBuffer.TableCheckpoint;
 import io.rakam.presto.StreamWorkerContext;
@@ -36,6 +37,7 @@
 public class KafkaUtil
 {
+    private static final Logger log = Logger.get(KafkaUtil.class);
 
     public static Properties createConsumerConfig(KafkaConfig config)
     {
@@ -68,7 +70,7 @@ public static Properties createConfig(KafkaConfig config)
         props.put("bootstrap.servers", kafkaNodes);
         props.put("group.id", groupId);
         props.put("enable.auto.commit", "false");
-        props.put("auto.offset.reset", offset);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
         props.put("session.timeout.ms", sessionTimeOut);
         props.put("heartbeat.interval.ms", "1000");
         props.put("request.timeout.ms", requestTimeOut);
@@ -138,10 +140,11 @@ public static void flush(
         long totalRecords = map.entrySet().stream().mapToLong(e -> e.getValue().stream()
                 .mapToLong(v -> v.getTable().page.getPositionCount()).sum()).sum();
 
-        log.info("Saving data, %d collections and %d events in total.", map.size(), totalRecords);
-        long now = System.currentTimeMillis();
+        log.debug("Saving data, %d collections and %d events in total.", map.size(), totalRecords);
+
         for (Map.Entry> entry : map.entrySet()) {
+            long now = System.currentTimeMillis();
             CompletableFuture dbWriteWork = committer.process(entry.getKey(), entry.getValue());
             dbWriteWork.whenComplete((aVoid, throwable) -> {
                 long totalRecordCount = entry.getValue().stream()
@@ -174,7 +177,6 @@ public static void flush(
 
                 databaseFlushStats.update(totalRecordCount);
                 databaseFlushDistribution.add(totalDuration.toMillis());
-                memoryTracker.freeMemory(totalDataSize);
             });
         }
diff --git a/src/test/java/io/rakam/presto/KafkaProducerTest.java b/src/test/java/io/rakam/presto/KafkaProducerTest.java
index 78c73f0..6280a18 100644
--- a/src/test/java/io/rakam/presto/KafkaProducerTest.java
+++ b/src/test/java/io/rakam/presto/KafkaProducerTest.java
@@ -23,9 +23,8 @@ public static void main(String[] args)
             throws Exception
     {
         String timestampFormat = "yyyy-MM-dd'T'HH:mm:ss.S'Z'";
-        int topicCount = 10;
-        SimpleDateFormat dateFormat = new SimpleDateFormat(timestampFormat);
-        DateTime beginTime = new DateTime().minusDays(3);
+        int topicCount = 100;
+        DateTime beginTime = new DateTime().minusDays(60);
         DateTime endTime = new DateTime();
 
         Random random = new Random(10000);
@@ -72,7 +71,7 @@ public static void main(String[] args)
             ObjectNode data = (ObjectNode) node.get("data");
 
             //Assign topicName to string variable
-            String topicName = "presto_test_10";
+            String topicName = "presto_test_11";
 
             // create instance for properties to access producer configs
             Properties props = new Properties();
diff --git a/src/test/java/io/rakam/presto/StressTest.java b/src/test/java/io/rakam/presto/StressTest.java
index 31a1bc7..52aa532 100644
--- a/src/test/java/io/rakam/presto/StressTest.java
+++ b/src/test/java/io/rakam/presto/StressTest.java
@@ -46,6 +46,7 @@ public class StressTest
 {
     private static final Logger log = Logger.get(StressTest.class);
+
     public static void main(String[] args)
             throws Exception
     {
@@ -84,12 +85,12 @@ public static void main(String[] args)
         MiddlewareConfig middlewareConfig = new MiddlewareConfig()
                 // .setMaxFlushDuration(Duration.succinctNanos(0))
                 ;
-
+        CommitterConfig committerConfig = new CommitterConfig();
         JsonDeserializer deserializer = new FabricJsonDeserializer(databaseHandler, fieldNameConfig);
         KafkaJsonMessageTransformer transformer = new KafkaJsonMessageTransformer(fieldNameConfig, databaseHandler, deserializer);
         final MemoryTracker memoryTracker = new MemoryTracker();
         StreamWorkerContext context = new StreamWorkerContext(transformer, new KafkaRecordSizeCalculator(), memoryTracker, streamConfig);
-        TargetConnectorCommitter targetConnectorCommitter = new TargetConnectorCommitter(databaseHandler);
+        TargetConnectorCommitter targetConnectorCommitter = new TargetConnectorCommitter(databaseHandler, committerConfig);
 
         AtomicLong totalRecord = new AtomicLong(-1);
         AtomicLong lastPoll = new AtomicLong(System.currentTimeMillis());
diff --git a/src/test/java/io/rakam/presto/TestTargetConnectorCommitter.java b/src/test/java/io/rakam/presto/TestTargetConnectorCommitter.java
index 32ab215..6041903 100644
--- a/src/test/java/io/rakam/presto/TestTargetConnectorCommitter.java
+++ b/src/test/java/io/rakam/presto/TestTargetConnectorCommitter.java
@@ -103,7 +103,7 @@ public void testCommitter()
             throws Exception
     {
         CountDownLatch latch = new CountDownLatch(3);
-
+        CommitterConfig committerConfig = new CommitterConfig();
         TestingMetadata connectorMetadata = new TestingMetadata();
 
         catalogManager.registerCatalog(createTestingCatalog("testconnector", new ConnectorId("testconnector"),
@@ -115,7 +115,7 @@ public void testCommitter()
         TestDatabaseHandler databaseHandler = new TestDatabaseHandler("test", "test", ImmutableList.of());
         databaseHandler.setLatchForInsert(latch);
-        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler);
+        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler, committerConfig);
 
         SchemaTableName table = new SchemaTableName("test", "test");
         TableData tableData = new TableData(new Page(1), ImmutableList.of());
@@ -131,7 +131,7 @@ public void testMultipleCommitter()
             throws Exception
     {
         CountDownLatch latch = new CountDownLatch(4);
-
+        CommitterConfig committerConfig = new CommitterConfig();
         TestingMetadata connectorMetadata = new TestingMetadata();
         connectorMetadata.createTable(session.toConnectorSession(), new ConnectorTableMetadata(new SchemaTableName("test", "test0"), ImmutableList.of()), false);
@@ -144,7 +144,7 @@ public void testMultipleCommitter()
         TestDatabaseHandler databaseHandler = new TestDatabaseHandler("test", "test", ImmutableList.of());
         databaseHandler.setLatchForInsert(latch);
-        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler);
+        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler, committerConfig);
 
         SchemaTableName table0 = new SchemaTableName("test", "test0");
         SchemaTableName table1 = new SchemaTableName("test", "test1");
@@ -167,7 +167,7 @@ public void testSchemaChange()
     {
         CountDownLatch latch = new CountDownLatch(4);
         TestingMetadata connectorMetadata = new TestingMetadata();
-
+        CommitterConfig committerConfig = new CommitterConfig();
         catalogManager.registerCatalog(createTestingCatalog("testconnector", new ConnectorId("testconnector"),
                 new TestingConnector(connectorMetadata), testTransactionManager, testMetadataManager));
@@ -180,7 +180,7 @@ public void testSchemaChange()
         TestDatabaseHandler databaseHandler = new TestDatabaseHandler("test", "test", ImmutableList.of());
         databaseHandler.setLatchForInsert(latch);
-        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler);
+        TargetConnectorCommitter committer = new TargetConnectorCommitter(databaseHandler, committerConfig);
 
         TableData page1 = new TableData(new Page(createStringsBlock("test")), ImmutableList.of(new ColumnMetadata("test1", VarcharType.VARCHAR)));
         TableData page2 = new TableData(new Page(createStringsBlock("test"), createLongsBlock(1)), schema);
diff --git a/start.sh b/start.sh
index e8ec67a..1acae7b 100644
--- a/start.sh
+++ b/start.sh
@@ -10,6 +10,11 @@ echo kafka.historical-data-topic=${KAFKA_HISTORICAL_TOPIC} >> /home/rakam/config
 echo stream.max-flush-duration=${STREAM_FLUSH_DURATION} >> /home/rakam/config_${SERVICE_ENV}.properties
 echo middleware.max-flush-duration=${MIDDLEWARE_FLUSH_DURATION} >> /home/rakam/config_${SERVICE_ENV}.properties
 echo stream.memory-multiplier=${STREAM_MEMORY_MULTIPLIER} >> /home/rakam/config_${SERVICE_ENV}.properties
+echo max.poll.records=${MAX_POLL_RECORDS} >> /home/rakam/config_${SERVICE_ENV}.properties
+echo outdated.day.index=${OUTDATED_DAY_INDEX} >> /home/rakam/config_${SERVICE_ENV}.properties
+echo committer.thread.count=${COMMITTER_THREAD_COUNT} >> /home/rakam/config_${SERVICE_ENV}.properties
 
-java -jar -Xms${JAVA_PROCESS_MIN_HEAP} -Xmx${JAVA_PROCESS_MAX_HEAP} -XX:+${GC_ALGO} -XX:+HeapDumpOnOutOfMemoryError /home/rakam/rakam-data-collector.jar /home/rakam/config_${SERVICE_ENV}.properties server
+echo io.rakam=${LOG_LEVEL} >> /home/rakam/log.properties
+
+java -jar -Xms${JAVA_PROCESS_MIN_HEAP} -Xmx${JAVA_PROCESS_MAX_HEAP} -XX:+${GC_ALGO} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/mesos/sandbox -Dlog.levels-file=/home/rakam/log.properties /home/rakam/rakam-data-collector.jar /home/rakam/config_${SERVICE_ENV}.properties server