Skip to content

Commit

Permalink
adicionado exemplo de consumer por input
Browse files Browse the repository at this point in the history
  • Loading branch information
cleciusjm committed Oct 14, 2020
1 parent 78ecaf2 commit 06ba91c
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/main/kotlin/experttalk/kafka/sample/Kafka.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Kafka(
KafkaConsumer<String, String>(Properties().also {
it[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
it[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "true"
it[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = "1000"
it[ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG] = "100"
it[ConsumerConfig.GROUP_ID_CONFIG] = clientId
it[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = STR_DESERIALIZER
it[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = STR_DESERIALIZER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import org.apache.kafka.clients.producer.ProducerRecord

suspend fun main() {
with(Kafka().producer) {
for (i in (0..10)) {
send(ProducerRecord("experttalk.sample.topic", "msg $i"))
for (i in (0..1000)) {
send(ProducerRecord(DEFAULT_TOPIC, "msg $i"))
delay(500)
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/main/kotlin/experttalk/kafka/sample/mainConsumer.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package experttalk.kafka.sample

import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration

fun main() {
with(Kafka().consumer) {
subscribe(listOf(DEFAULT_TOPIC))
for (message in poll(Duration.ofMinutes(1))) {
println("Recebida: $message")
fun main() = with(Kafka(clientId = "kafka-client2").consumer) {
subscribe(listOf(DEFAULT_TOPIC))
while (true) {
for (message in poll(Duration.ofDays(1))) {
println("Recebida: ${message.value()}")
}
}
}
12 changes: 12 additions & 0 deletions src/main/kotlin/experttalk/kafka/sample/mainConsumer2.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package experttalk.kafka.sample

import java.time.Duration

fun main() = with(Kafka().consumer) {
subscribe(listOf(DEFAULT_TOPIC))
while (true) {
for (message in poll(Duration.ofDays(1))) {
println("Recebida: ${message.value()}")
}
}
}
14 changes: 14 additions & 0 deletions src/main/kotlin/experttalk/kafka/sample/mainInputProducer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package experttalk.kafka.sample

import org.apache.kafka.clients.producer.ProducerRecord

fun main() {
with(Kafka().producer) {

while (true) {
print("Mensagem: ")
val message = readLine() ?: continue
send(ProducerRecord(DEFAULT_TOPIC, message))
}
}
}

0 comments on commit 06ba91c

Please sign in to comment.