
Commit

1) Added logs.
2) Added a new config to control the number of committer threads.
3) Formatting changes.
satendra.sahu committed Jan 24, 2018
1 parent 3ee4fe5 commit 3935bbf
Showing 21 changed files with 115 additions and 52 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -16,6 +16,7 @@ RUN useradd -ms /bin/bash rakam
ARG CACHEBUST=1

COPY src/main/resources/config_* /home/rakam/
COPY log.properties /home/rakam
COPY *.sh /home/rakam/
COPY target/rakam-data-collector.jar /home/rakam

2 changes: 1 addition & 1 deletion log.properties
@@ -1 +1 @@
io.rakam=DEBUG

@@ -24,7 +24,6 @@
public final class InMemoryFileSystem
extends FileSystem
{

private final MemoryTracker memoryTracker;
Map<String, DynamicSliceOutput> files;

@@ -111,7 +110,6 @@ public void setWorkingDirectory(Path path)
public boolean mkdirs(Path path, FsPermission fsPermission)
throws IOException
{

throw new UnsupportedOperationException();
}

8 changes: 6 additions & 2 deletions src/main/java/io/rakam/presto/BasicMemoryBuffer.java
@@ -4,12 +4,16 @@

package io.rakam.presto;

import io.airlift.log.Logger;

import java.util.ArrayList;
import java.util.List;

public class BasicMemoryBuffer<T>
implements MemoryBuffer<T>
{
private static final Logger log = Logger.get(BasicMemoryBuffer.class);

private final long millisecondsToBuffer;
private final ArrayList<T> buffer;
private final ArrayList<T> bulkBuffer;
@@ -76,8 +80,8 @@ public void clear()

public boolean shouldFlush()
{
return ((System.currentTimeMillis() - previousFlushTimeMillisecond) >= getMillisecondsToBuffer())
|| memoryTracker.availableMemory() - (totalBytes * memoryMultiplier) < 0;
return ((System.currentTimeMillis() - previousFlushTimeMillisecond) >= getMillisecondsToBuffer()
|| (memoryTracker.availableMemory() - (totalBytes * memoryMultiplier) < 0));
}

public Records getRecords()
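For reference, a minimal standalone sketch of how the shouldFlush() predicate above behaves; the window, multiplier, byte count, and available-memory figures below are illustrative assumptions, not values from this commit.

public class FlushPredicateSketch
{
    public static void main(String[] args)
    {
        long millisecondsToBuffer = 10_000;        // assumed buffer window of 10s
        long sinceLastFlush = 12_000;              // 12s since the previous flush
        long totalBytes = 64L * 1024 * 1024;       // 64MB currently buffered
        long memoryMultiplier = 2;                 // assumed memory multiplier from StreamConfig
        long availableMemory = 100L * 1024 * 1024; // 100MB reported by MemoryTracker

        // Same shape as BasicMemoryBuffer.shouldFlush(): flush when the window has
        // elapsed or the projected memory need exceeds what is available.
        boolean shouldFlush = sinceLastFlush >= millisecondsToBuffer
                || availableMemory - (totalBytes * memoryMultiplier) < 0;

        // Both conditions hold here: 12s >= 10s, and the projected 128MB exceeds
        // the 100MB available, so the buffer would flush.
        System.out.println(shouldFlush);
    }
}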
23 changes: 23 additions & 0 deletions src/main/java/io/rakam/presto/CommitterConfig.java
@@ -0,0 +1,23 @@
/*
* Licensed under the Rakam Incorporation
*/

package io.rakam.presto;

import io.airlift.configuration.Config;

public class CommitterConfig
{
private int committerThreadCount = 2;

public int getCommitterThreadCount()
{
return committerThreadCount;
}

@Config("committer.thread.count")
public void setCommitterThreadCount(int committerThreadCount)
{
this.committerThreadCount = committerThreadCount;
}
}
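A minimal usage sketch for the new config class; the override value below is illustrative (the commit's default is 2), and the standalone wiring only mirrors how TargetConnectorCommitter sizes its pool further down.

import io.rakam.presto.CommitterConfig;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class CommitterConfigSketch
{
    public static void main(String[] args)
    {
        // In the deployed service, airlift's config binding maps the
        // committer.thread.count property onto this setter.
        CommitterConfig config = new CommitterConfig();
        config.setCommitterThreadCount(4); // e.g. committer.thread.count=4

        // The committer uses the value to size its scheduled thread pool.
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(config.getCommitterThreadCount());
        System.out.println("committer threads: " + config.getCommitterThreadCount());
        scheduler.shutdown();
    }
}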
4 changes: 3 additions & 1 deletion src/main/java/io/rakam/presto/MemoryTracker.java
@@ -4,14 +4,16 @@

package io.rakam.presto;

import io.airlift.log.Logger;
import org.weakref.jmx.Managed;

import java.util.concurrent.atomic.AtomicLong;

public class MemoryTracker
{
private static final Logger log = Logger.get(MemoryTracker.class);
private static final long HEAP_MAX_SIZE = (long) (Runtime.getRuntime().maxMemory() - Runtime.getRuntime().totalMemory());
private static final double AVAILABLE_RATIO = .4;
private static final double AVAILABLE_RATIO = .7;
private static final long AVAILABLE_HEAP_SIZE = (long) (HEAP_MAX_SIZE * AVAILABLE_RATIO);

private AtomicLong reservedMemory;
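As a rough illustration of the ratio bump (the heap figures are assumed, not from this commit): with Runtime.getRuntime().maxMemory() at 8GB and totalMemory() at 1GB, HEAP_MAX_SIZE works out to 7GB, so the tracker's budget grows from about 0.4 * 7GB ≈ 2.8GB to 0.7 * 7GB ≈ 4.9GB.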
4 changes: 4 additions & 0 deletions src/main/java/io/rakam/presto/MiddlewareBuffer.java
@@ -6,6 +6,7 @@

import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.rakam.presto.deserialization.TableData;

import java.util.ArrayList;
@@ -20,6 +21,8 @@

public class MiddlewareBuffer
{
private static final Logger log = Logger.get(MiddlewareBuffer.class);

private final Map<SchemaTableName, List<TableCheckpoint>> batches;
private final MiddlewareConfig config;
private final MemoryTracker memoryTracker;
@@ -71,6 +74,7 @@ public Map<SchemaTableName, List<TableCheckpoint>> getRecordsToBeFlushed()

Set<SchemaTableName> tablesToBeFlushed;
if (memoryNeedsToBeAvailable > availableMemory) {
log.debug("memory needed (%s) is less than available memory (%s) flushing data ", memoryNeedsToBeAvailable, availableMemory);
List<Map.Entry<SchemaTableName, Counter>> sortedTables = bufferSize.entrySet().stream()
.sorted(Comparator.comparingLong(o -> -o.getValue().value))
.collect(Collectors.toList());
2 changes: 2 additions & 0 deletions src/main/java/io/rakam/presto/ServiceStarter.java
@@ -19,6 +19,7 @@
import com.google.inject.multibindings.OptionalBinder;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.log.Level;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.log.LoggingConfiguration;
@@ -128,6 +129,7 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(BackupConfig.class);
configBinder(binder).bindConfig(FieldNameConfig.class);
configBinder(binder).bindConfig(S3MiddlewareConfig.class);
configBinder(binder).bindConfig(CommitterConfig.class);
configBinder(binder).bindConfig(MiddlewareConfig.class);
binder.bind(StreamWorkerContext.class).in(Scopes.SINGLETON);
binder.bind(TargetConnectorCommitter.class).in(Scopes.SINGLETON);
1 change: 0 additions & 1 deletion src/main/java/io/rakam/presto/StreamConfig.java
@@ -39,5 +39,4 @@ public void setMemoryMultiplier(String memoryMultiplier)
this.memoryMultiplier = Integer.parseInt(memoryMultiplier);
}
}

}
6 changes: 4 additions & 2 deletions src/main/java/io/rakam/presto/TargetConnectorCommitter.java
@@ -26,9 +26,11 @@ public class TargetConnectorCommitter
private final AsyncFailsafe<Void> executor;
private AtomicInteger activeFlushCount = new AtomicInteger();


@Inject
public TargetConnectorCommitter(DatabaseHandler databaseHandler)
public TargetConnectorCommitter(DatabaseHandler databaseHandler, CommitterConfig committerConfig)
{

this.databaseHandler = databaseHandler;

RetryPolicy retryPolicy = new RetryPolicy()
@@ -38,7 +40,7 @@ public TargetConnectorCommitter(DatabaseHandler databaseHandler)
.withMaxRetries(3);

// executorPoolSize = Runtime.getRuntime().availableProcessors() * 2;
executorPoolSize = 2;
executorPoolSize = committerConfig.getCommitterThreadCount();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(executorPoolSize,
new ThreadFactoryBuilder().setNameFormat("target-committer").build());

@@ -165,8 +165,8 @@ public Connector create(String connectorId, Map<String, String> config, Connecto
ImmutableMap.Builder<String, String> props = ImmutableMap.<String, String>builder()
.put("metadata.db.type", "mysql")
.put("metadata.db.url", config.getMetadataUrl())
.put("backup.threads", String.valueOf(Runtime.getRuntime().availableProcessors() * 3))
// No effect
.put("backup.threads", String.valueOf(Runtime.getRuntime().availableProcessors() * 3))
.put("storage.data-directory", Files.createTempDir().getAbsolutePath())
.put("metadata.db.connections.max", String.valueOf(config.getMaxConnection()))
.put("backup.timeout", "20m");
15 changes: 14 additions & 1 deletion src/main/java/io/rakam/presto/kafka/KafkaConfig.java
@@ -34,6 +34,7 @@ public class KafkaConfig
private String sessionTimeOut = "12000";
private String requestTimeOut = "15000";
private String historicalDataTopic;
private int outdatedDayIndex = 1;

private static HostAddress toKafkaHostAddress(String value)
{
@@ -60,10 +61,22 @@ public String getMaxPollRecords()
return maxPollRecords;
}

public int getOutdatedDayIndex()
{
return outdatedDayIndex;
}

@Config("outdated.day.index")
public KafkaConfig setOutdatedDayIndex(int outdatedDayIndex)
{
this.outdatedDayIndex = outdatedDayIndex;
return this;
}

@Config("max.poll.records")
public KafkaConfig setMaxPollRecords(String maxPollRecords)
{
if (Strings.isNullOrEmpty(maxPollRecords)) {
if (!Strings.isNullOrEmpty(maxPollRecords)) {
this.maxPollRecords = maxPollRecords;
}
return this;
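A minimal sketch of how the new outdated.day.index setting is consumed by KafkaRealTimeWorker further down; the record age used here is an illustrative assumption.

import java.time.LocalDate;

public class OutdatedCutoffSketch
{
    public static void main(String[] args)
    {
        int outdatedDayIndex = 1; // outdated.day.index (commit default)
        int today = (int) LocalDate.now().toEpochDay();
        int dayOfRecord = (int) LocalDate.now().minusDays(3).toEpochDay(); // assumed 3-day-old record

        // Same cutoff as KafkaRealTimeWorker.processRecords(): records dated within
        // outdatedDayIndex days of today stay on the real-time path, older ones go
        // to the historical path when a historical handler is configured.
        boolean realTime = dayOfRecord >= (today - outdatedDayIndex) && dayOfRecord <= today;
        System.out.println(realTime ? "real-time path" : "historical path"); // prints "historical path"
    }
}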
@@ -52,7 +52,7 @@ public CompletableFuture<Void> handle(Iterable<ConsumerRecord<byte[], byte[]>> t
log.error(exception);
}
if (latch.decrementAndGet() == 0) {
log.info("%d records are sent to Kafka historical topic in %s.", totalRecords.value,
log.debug("%d records are sent to Kafka historical topic in %s.", totalRecords.value,
Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString());
future.complete(null);
}
@@ -92,8 +92,8 @@ public void shutdown()
@PostConstruct
public void start()
{
if (config.getHistoricalDataTopic() == null || !config.getHistoricalWorkerEnabled()) {
log.warn("The config `kafka.historical-data-topic` is not set. Ignoring historical processing..");
if (!config.getHistoricalWorkerEnabled()) {
log.warn("Historical Worker is turned off. All the historical records will be pushed to topic: %s", config.getHistoricalDataTopic());
return;
}
workerThread = new Thread(this::execute);
49 changes: 28 additions & 21 deletions src/main/java/io/rakam/presto/kafka/KafkaRealTimeWorker.java
@@ -12,6 +12,7 @@
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.stats.DistributionStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.rakam.presto.BasicMemoryBuffer;
import io.rakam.presto.BatchRecords;
@@ -90,6 +91,7 @@ public class KafkaRealTimeWorker
private Map<Status, LongHolder> statusSpentTime = new HashMap<>();
private long lastStatusChangeTime;
private Status currentStatus;
private int outdatedRecordIndex;

@Inject
public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, FieldNameConfig fieldNameConfig, Optional<HistoricalDataHandler> historicalDataHandler, DecoupleMessage decoupleMessage, MiddlewareConfig middlewareConfig, StreamWorkerContext<ConsumerRecord<byte[], byte[]>> context, TargetConnectorCommitter committer)
@@ -99,7 +101,7 @@ public KafkaRealTimeWorker(KafkaConfig config, MemoryTracker memoryTracker, Fiel
this.decoupleMessage = decoupleMessage;
Set<String> whitelistedCollections = fieldNameConfig.getWhitelistedCollections();
this.whiteListCollections = whitelistedCollections == null ? input -> true : input -> whitelistedCollections.contains(input);

this.outdatedRecordIndex = config.getOutdatedDayIndex();
this.historicalDataHandler = historicalDataHandler.orNull();
this.committer = committer;
this.memoryTracker = memoryTracker;
@@ -173,21 +175,25 @@ public void execute()
buffer.consumeRecords(records);
realTimeRecordsStats.update(records.count());

changeType(Status.FLUSHING_STREAM);
flushDataSafe();

if (buffer.shouldFlush()) {
changeType(Status.FLUSHING_STREAM);
flushDataSafe();
}

checkpoint();

if (memoryTracker.availableMemoryInPercentage() < .3) {
if (memoryTracker.availableMemoryInPercentage() < .30) {
changeType(Status.FLUSHING_STREAM);
flushDataSafe();

while (memoryTracker.availableMemoryInPercentage() < .3) {
while (memoryTracker.availableMemoryInPercentage() < .30) {
changeType(Status.WAITING_FOR_MEMORY);

try {
log.info("Not enough memory (%s)to process records sleeping for 1s", memoryTracker.availableMemoryInPercentage());
SECONDS.sleep(1);
}
catch (InterruptedException e) {
@@ -239,8 +245,8 @@ private void flushDataSafe()
changeType(Status.FLUSHING_MIDDLEWARE);

long now = System.currentTimeMillis();
log.info("Flushing records (%s) from stream buffer, it's been %s since last flush.",
succinctBytes(buffer.getTotalBytes()).toString(),
log.debug("Flushing %s records of size (%s) from stream buffer, it's been %s since last flush.", buffer.getTotalRecords(),
DataSize.succinctBytes(buffer.getTotalBytes()).toString(),
Duration.succinctDuration(now - buffer.getPreviousFlushTimeMillisecond(), MILLISECONDS).toString());

Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extractedData = extract(records);
@@ -256,7 +262,7 @@ private void flushDataSafe()
middlewareBuffer.add(new BatchRecords(data, historicalDataAction, () -> commitSyncOffset(consumer, latestOffsets)));

long totalDataSize = data.entrySet().stream().mapToLong(e -> e.getValue().page.getRetainedSizeInBytes()).sum();
log.info("Flushed records to middleware buffer in %s, the data size is %s",
log.debug("Flushed records to middleware buffer in %s, the data size is %s",
Duration.succinctDuration(System.currentTimeMillis() - now, MILLISECONDS).toString(),
succinctBytes(totalDataSize));
}
@@ -283,10 +289,9 @@ private void flushDataSafe()
private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Void>> extract(BasicMemoryBuffer<ConsumerRecord<byte[], byte[]>>.Records records)
{
CompletableFuture<Void> historicalDataAction = BatchRecords.COMPLETED_FUTURE;

ProcessedRecords processedRecords = processRecords(records);
int totalRecords = records.buffer.size();

int historicalRecordCount = 0;
Iterable<ConsumerRecord<byte[], byte[]>> realTimeRecords;
if (processedRecords.recordsIndexedByDay.isEmpty()) {
realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));
@@ -298,7 +303,7 @@
else {
realTimeRecords = () -> Iterators.filter(records.buffer.iterator(), new BitMapRecordPredicate(processedRecords.bitmapForRecords));

int historicalRecordCount = totalRecords - processedRecords.realTimeRecordCount;
historicalRecordCount = totalRecords - processedRecords.realTimeRecordCount;

Iterable<ConsumerRecord<byte[], byte[]>> filter = () -> Iterators.filter(records.buffer.iterator(), new NegateBitMapRecordPredicate(processedRecords.bitmapForRecords));
changeType(Status.FLUSHING_HISTORICAL);
@@ -313,14 +318,14 @@ private Map.Entry<Iterable<ConsumerRecord<byte[], byte[]>>, CompletableFuture<Vo
historicalRecordsStats.update(historicalRecordCount);
}

log.info("realTimeRecords: " + processedRecords.realTimeRecordCount + " historicalRecordCount: " + historicalRecordCount);
return new SimpleImmutableEntry<>(realTimeRecords, historicalDataAction);
}

private ProcessedRecords processRecords(BasicMemoryBuffer<ConsumerRecord<byte[], byte[]>>.Records records)
{
Int2ObjectArrayMap<IntArrayList> recordsIndexedByDay = new Int2ObjectArrayMap<>();
int todayInDate = Ints.checkedCast(LocalDate.now().toEpochDay());
int previousDay = todayInDate - 1;
DecoupleMessage.RecordData recordData = new DecoupleMessage.RecordData();
int realtimeRecordCount = 0;
boolean[] bitmapForRecords = new boolean[records.buffer.size()];
@@ -343,7 +348,7 @@ private ProcessedRecords processRecords(BasicMemoryBuffer<ConsumerRecord<byte[],
continue;
}

if (dayOfRecord == previousDay || dayOfRecord == todayInDate || historicalDataHandler == null) {
if (historicalDataHandler == null || (dayOfRecord >= (todayInDate - outdatedRecordIndex) && dayOfRecord <= todayInDate)) {
bitmapForRecords[i] = true;
realtimeRecordCount++;
}
Expand All @@ -357,17 +362,19 @@ private ProcessedRecords processRecords(BasicMemoryBuffer<ConsumerRecord<byte[],
}
}

for (Int2ObjectMap.Entry<IntArrayList> entry : recordsIndexedByDay.int2ObjectEntrySet()) {
int day = entry.getIntKey();
IntArrayList recordIndexes = entry.getValue();
if (recordIndexes.size() > 1000 && (recordIndexes.size() * 1.0 / records.buffer.size()) > .25) {
IntListIterator iterator = recordIndexes.iterator();
while (iterator.hasNext()) {
int i = iterator.nextInt();
bitmapForRecords[i] = true;
realtimeRecordCount++;
if (config.getHistoricalWorkerEnabled()) {
for (Int2ObjectMap.Entry<IntArrayList> entry : recordsIndexedByDay.int2ObjectEntrySet()) {
int day = entry.getIntKey();
IntArrayList recordIndexes = entry.getValue();
if (recordIndexes.size() > 1000 && (recordIndexes.size() * 100.0 / records.buffer.size()) > 25) {
IntListIterator iterator = recordIndexes.iterator();
while (iterator.hasNext()) {
int i = iterator.nextInt();
bitmapForRecords[i] = true;
realtimeRecordCount++;
}
recordsIndexedByDay.remove(day);
}
recordsIndexedByDay.remove(day);
}
}

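A minimal sketch of the re-bucketing heuristic above (the batch and per-day counts are assumptions): when the historical worker is enabled, a day bucket holding more than 1000 records and more than 25% of the batch is promoted back to the real-time path instead of being routed to the historical topic.

public class HistoricalPromotionSketch
{
    public static void main(String[] args)
    {
        int batchSize = 5_000;     // assumed records in the polled batch
        int recordsForDay = 1_600; // assumed records that all fall on one outdated day

        // Same thresholds as processRecords() above.
        boolean promoteToRealTime = recordsForDay > 1000
                && (recordsForDay * 100.0 / batchSize) > 25;

        // 1600 > 1000 and 32% > 25%, so this day's records stay on the real-time path.
        System.out.println(promoteToRealTime);
    }
}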
@@ -26,6 +26,7 @@ public class KafkaStreamSourceModule
protected void setup(Binder binder)
{
KafkaConfig config = buildConfigObject(KafkaConfig.class);

configBinder(binder).bindConfig(JsonConfig.class);

binder.bind(KafkaRealTimeWorker.class).asEagerSingleton();