diff --git a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala index d2898a558..16eb8dbb2 100644 --- a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala +++ b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala @@ -119,7 +119,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed) /** * A Flink function that is responsible for converting an array of pre-aggregates (aka a tile) to a form * that can be written out to the KV store (PutRequest object). - * + * * @param groupByServingInfoParsed The GroupBy we are working with * @tparam T The input data type */ @@ -146,7 +146,7 @@ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParse eventProcessingErrorCounter.inc() avroConversionErrorCounter.inc() } - + def avroConvertTileToPutRequest(in: TimestampedTile): PutRequest = { val tsMills = in.latestTsMillis @@ -157,13 +157,13 @@ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParse val valueBytes = in.tileBytes logger.debug( - s""" + s""" |Avro converting tile to PutRequest - tile=${in} |groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys - |keyBytes=${java.util. Base64.getEncoder.encodeToString(keyBytes)} + |keyBytes=${java.util.Base64.getEncoder.encodeToString(keyBytes)} |valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)} |streamingDataset=$streamingDataset""".stripMargin - ) + ) PutRequest(keyBytes, valueBytes, streamingDataset, Some(tsMills)) } diff --git a/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala b/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala index 4f48a2ced..090772362 100644 --- a/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala +++ b/flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala @@ -72,15 +72,15 @@ class FlinkRowAggregationFunction( // Given that the rowAggregator is transient, it may be null when a job is restored from a checkpoint if (rowAggregator == null) { logger.debug( - f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills" - ) + f"The Flink RowAggregator was null for groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills" + ) initializeRowAggregator() } logger.debug( - f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir - .mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" - ) + f"Flink pre-aggregates BEFORE adding new element: accumulatorIr=[${accumulatorIr.ir + .mkString(", ")}] groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" + ) val partialAggregates = Try { rowAggregator.update(accumulatorIr.ir, row) @@ -89,9 +89,9 @@ class FlinkRowAggregationFunction( partialAggregates match { case Success(v) => { logger.debug( - f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " + - f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" - ) + f"Flink pre-aggregates AFTER adding new element [${v.mkString(", ")}] " + + f"groupBy=${groupBy.getMetaData.getName} tsMills=$tsMills element=$element" + ) TimestampedIR(v, Some(tsMills)) } case Failure(e) => @@ -186,12 +186,12 @@ class FlinkRowAggProcessFunction( tileBytes match { case Success(v) => { logger.debug( - s""" + s""" |Flink aggregator processed element irEntry=$irEntry |tileBytes=${java.util.Base64.getEncoder.encodeToString(v)} |windowEnd=$windowEnd groupBy=${groupBy.getMetaData.getName} |keys=$keys isComplete=$isComplete tileAvroSchema=${tileCodec.tileAvroSchema}""" - ) + ) // The timestamp should never be None here. out.collect(TimestampedTile(keys, v, irEntry.latestTsMillis.get)) }