Finatra Kafka & Kafka Streams

Status: unmaintained

Finatra integration with Kafka Streams to easily build Kafka Streams applications on top of a TwitterServer.

Note: Versions of finatra-kafka and finatra-kafka-streams published against Scala 2.12 use Kafka 2.2; versions published against Scala 2.13 use Kafka 2.5. This simplified cross-version support is temporary until we can drop Kafka 2.2.

Announcement

Finatra-Kafka was migrated out of the Finatra core library into a stand-alone project in 2022. Twitter currently has no plans to develop, maintain, or support Finatra-Kafka in any form. If your organization is interested in maintaining this framework, please file an issue or open a discussion to engage the community.

Development version

The main branch on GitHub tracks the latest code. If you want to contribute a patch or fix, please base your Pull Request on this branch.

Features

Basics

With KafkaStreamsTwitterServer, a fully functional service can be written by simply configuring the Kafka Streams Builder via the configureKafkaStreams() lifecycle method. See the examples section.

Transformers

Implement custom transformers using FinatraTransformer.

Aggregations

Several aggregating transformers are included, which may be used when configuring a StreamsBuilder:

- aggregate
- sample
- sum

Stores

RocksDB

In addition to using state stores, you may also use a RocksDB-backed store. This affords all of the advantages of using RocksDB, including efficient range scans.

Queryable State

Finatra Kafka Streams supports directly querying the state from a store. This can be useful for creating a service that serves data aggregated within a local Topology. You can use static partitioning to query an instance deterministically known to hold a key.
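As a rough illustration of the static-partitioning idea above, the sketch below maps a key to a partition and then to an instance using plain Scala. Everything in it is an assumption made for illustration: `partitionFor` and `instanceFor` are hypothetical helpers, not Finatra or Kafka APIs, and the rule "partition p lives on instance p % numInstances" is only one possible static assignment. Kafka's default partitioner hashes the serialized key bytes with murmur2; Scala's `MurmurHash3` is used here as a stand-in, so the partition numbers will not match those of a real cluster.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.hashing.MurmurHash3

// Hypothetical sketch: not part of Finatra or Kafka.
object StaticPartitioningSketch {

  // Deterministically map a key to a partition in [0, numPartitions).
  // Stand-in hash: Kafka itself uses murmur2 over the serialized key bytes.
  def partitionFor(key: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val hash = MurmurHash3.bytesHash(key.getBytes(UTF_8))
    // Clear the sign bit so the remainder is never negative.
    (hash & 0x7fffffff) % numPartitions
  }

  // Assumed static assignment: partition p is always hosted by instance p % numInstances.
  def instanceFor(key: String, numPartitions: Int, numInstances: Int): Int = {
    require(numInstances > 0, "numInstances must be positive")
    partitionFor(key, numPartitions) % numInstances
  }
}
```

Because both functions are pure, any client can compute which instance to query for a given key without asking the cluster first, provided the client and the servers agree on the hash and the assignment rule.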

For usage, see the Queryable State example in the Examples section.

Queryable Stores

Examples

The integration tests serve as a good collection of example Finatra Kafka Streams servers.

Word Count Server

We can build a lightweight server that counts how many times each word appears in an input topic, storing the results in RocksDB.

class WordCountRocksDbServer extends KafkaStreamsTwitterServer {

  override val name = "wordcount"
  private val countStoreName = "CountsStore"

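  override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
    builder.asScala
      // Read each line of text; the record key is not used.
      .stream[Bytes, String]("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
      // Split each line into words on single spaces.
      .flatMapValues(_.split(' '))
      // Re-key by word so that equal words are grouped together.
      .groupBy((_, word) => word)(Serialized.`with`(Serdes.String, Serdes.String))
      // Count per word, materialized in the named state store.
      .count()(Materialized.as(countStoreName))
      .toStream
      // Publish each updated (word, count) pair.
      .to("WordsWithCountsTopic")(Produced.`with`(Serdes.String, ScalaSerdes.Long))
  }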
}
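To make the expected contents of WordsWithCountsTopic more concrete, here is a plain-Scala model of what the topology above computes, with no Kafka dependency. `WordCountModel` and `updates` are hypothetical names introduced only for this sketch. It assumes every count update is forwarded downstream; in a running Kafka Streams application, record caching may coalesce several updates for the same word into one.

```scala
import scala.collection.mutable

// Hypothetical model of the word-count topology; not part of Finatra.
object WordCountModel {

  // Returns every (word, runningCount) update in input order,
  // assuming no updates are coalesced by caching.
  def updates(lines: Seq[String]): Vector[(String, Long)] = {
    val counts = mutable.Map.empty[String, Long] // stands in for the state store
    val out = Vector.newBuilder[(String, Long)]  // stands in for the output topic
    for (line <- lines; word <- line.split(' ')) {
      val next = counts.getOrElse(word, 0L) + 1L
      counts.update(word, next)
      out += (word -> next)
    }
    out.result()
  }
}
```

For the single input line "hello world hello", `updates` returns (hello, 1), (world, 1), (hello, 2): the latest count for each word replaces the earlier one, which is the view a query against the count store would serve.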

Queryable State

We can then expose a Thrift endpoint enabling clients to directly query the state via interactive queries.

class WordCountRocksDbServer extends KafkaStreamsTwitterServer with QueryableState {

  ...

  final override def configureThrift(router: ThriftRouter): Unit = {
    router
      .add(
        new WordCountQueryService(
          queryableFinatraKeyValueStore[String, Long](
            storeName = countStoreName,
            primaryKeySerde = Serdes.String
          )
        )
      )
  }
}

In this example, WordCountQueryService is an underlying Thrift service.

Testing

Finatra Kafka Streams includes tooling that simplifies the process of writing highly testable services. See TopologyFeatureTest, which includes a FinatraTopologyTester that integrates Kafka Streams' TopologyTestDriver with a KafkaStreamsTwitterServer.

License

Licensed under the Apache License, Version 2.0: https://www.apache.org/licenses/LICENSE-2.0