This document defines how to describe Kafka-specific information on AsyncAPI.
Current version is 0.5.0.
This object contains information about the server representation in Kafka.
servers:
  production:
    bindings:
      kafka:
        schemaRegistryUrl: 'https://my-schema-registry.com'
        schemaRegistryVendor: 'confluent'
        bindingVersion: '0.5.0'
This object contains information about the channel representation in Kafka (e.g. a Kafka topic).
Field Name | Type | Description | Applicability [default] | Constraints |
---|---|---|---|---|
topic | string | Kafka topic name if different from channel name. | OPTIONAL | - |
partitions | integer | Number of partitions configured on this topic (useful to know how many parallel consumers you may run). | OPTIONAL | MUST be positive |
replicas | integer | Number of replicas configured on this topic. | OPTIONAL | MUST be positive |
topicConfiguration | TopicConfiguration Object | Topic configuration properties that are relevant for the API. | OPTIONAL | - |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
channels:
  user-signedup:
    bindings:
      kafka:
        topic: 'my-specific-topic-name'
        partitions: 20
        replicas: 3
        topicConfiguration:
          cleanup.policy: ["delete", "compact"]
          retention.ms: 604800000
          retention.bytes: 1000000000
          delete.retention.ms: 86400000
          max.message.bytes: 1048588
        bindingVersion: '0.5.0'
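
The constraints in the table above can be checked mechanically. The following is a minimal sketch in Python, assuming the channel binding has already been parsed into a dictionary; `validate_channel_binding` and `ALLOWED_CHANNEL_FIELDS` are hypothetical names chosen for illustration and are not part of any AsyncAPI tooling.

```python
# Hypothetical helper: illustrative only, not part of any AsyncAPI library.
ALLOWED_CHANNEL_FIELDS = {
    "topic", "partitions", "replicas", "topicConfiguration", "bindingVersion",
}


def validate_channel_binding(binding: dict) -> dict:
    """Return a copy of the binding with defaults applied, or raise ValueError."""
    unknown = set(binding) - ALLOWED_CHANNEL_FIELDS
    if unknown:
        # The object MUST contain only the properties defined in the table.
        raise ValueError(f"unknown properties: {sorted(unknown)}")

    for field in ("partitions", "replicas"):
        value = binding.get(field)
        if value is None:
            continue  # both fields are OPTIONAL
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
            raise ValueError(f"{field} must be a positive integer, got {value!r}")

    result = dict(binding)
    # If bindingVersion is omitted, "latest" is assumed.
    result.setdefault("bindingVersion", "latest")
    return result
```

For instance, `validate_channel_binding({"partitions": 20, "replicas": 3})` returns the same fields plus `bindingVersion` set to `"latest"`, while a binding with `partitions: 0` is rejected.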
This object contains information about the API-relevant topic configuration in Kafka.
Field Name | Type | Description | Applicability [default] | Constraints |
---|---|---|---|---|
cleanup.policy | array | The cleanup.policy configuration option. | OPTIONAL | array may only contain delete and/or compact |
retention.ms | long | The retention.ms configuration option. | OPTIONAL | see Kafka documentation |
retention.bytes | long | The retention.bytes configuration option. | OPTIONAL | see Kafka documentation |
delete.retention.ms | long | The delete.retention.ms configuration option. | OPTIONAL | see Kafka documentation |
max.message.bytes | integer | The max.message.bytes configuration option. | OPTIONAL | see Kafka documentation |
confluent.key.schema.validation | boolean | Whether schema validation for the message key is enabled. Vendor-specific config. | OPTIONAL | - |
confluent.key.subject.name.strategy | string | The name of the schema lookup strategy for the message key. Vendor-specific config. | OPTIONAL | Clients should default to the vendor default if not supplied. |
confluent.value.schema.validation | boolean | Whether schema validation for the message value is enabled. Vendor-specific config. | OPTIONAL | - |
confluent.value.subject.name.strategy | string | The name of the schema lookup strategy for the message value. Vendor-specific config. | OPTIONAL | Clients should default to the vendor default if not supplied. |
This object MAY contain the properties defined above, as well as additional properties.
topicConfiguration:
  cleanup.policy: ["delete", "compact"]
  retention.ms: 604800000
  retention.bytes: 1000000000
  delete.retention.ms: 86400000
  max.message.bytes: 1048588
  confluent.key.schema.validation: true
  confluent.key.subject.name.strategy: "TopicNameStrategy"
  confluent.value.schema.validation: true
  confluent.value.subject.name.strategy: "TopicNameStrategy"
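
The one constraint on cleanup.policy stated in the table (the array may only contain delete and/or compact) could be checked as in the sketch below. It assumes the topicConfiguration object has been parsed into a Python dictionary; `check_cleanup_policy` is a hypothetical name. Because additional properties are allowed on this object, the sketch deliberately does not reject unknown keys.

```python
# Hypothetical helper: illustrative only.
ALLOWED_CLEANUP_POLICIES = ("delete", "compact")


def check_cleanup_policy(topic_configuration: dict) -> list:
    """Return the cleanup.policy list, or raise ValueError if it is invalid."""
    policy = topic_configuration.get("cleanup.policy")
    if policy is None:
        return []  # the option is OPTIONAL
    if not isinstance(policy, list):
        raise ValueError("cleanup.policy must be an array")
    invalid = [p for p in policy if p not in ALLOWED_CLEANUP_POLICIES]
    if invalid:
        raise ValueError(f"unsupported cleanup.policy values: {invalid}")
    return policy
```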
This object contains information about the operation representation in Kafka (e.g. the way to consume messages).
Field Name | Type | Description | Applicability [default] | Constraints |
---|---|---|---|---|
groupId | Schema Object \| Reference Object | Id of the consumer group. | OPTIONAL | - |
clientId | Schema Object \| Reference Object | Id of the consumer inside a consumer group. | OPTIONAL | - |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
channels:
  user-signedup:
operations:
  userSignup:
    action: receive
    bindings:
      kafka:
        groupId:
          type: string
          enum: ['myGroupId']
        clientId:
          type: string
          enum: ['myClientId']
        bindingVersion: '0.5.0'
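
Because groupId and clientId are Schema Objects rather than plain strings, a consumer's actual id has to be compared against the schema. The sketch below, in Python, handles only the `type: string` and `enum` keywords used in the example above. It is not a full JSON Schema validator, it does not resolve Reference Objects, and `value_allowed` is a hypothetical name.

```python
# Hypothetical helper covering only the "type: string" and "enum" keywords.
def value_allowed(schema: dict, value) -> bool:
    """Return True if value satisfies the (very small) schema subset handled here."""
    if "$ref" in schema:
        # Reference Objects would need to be resolved first; out of scope here.
        raise NotImplementedError("Reference Objects are not resolved in this sketch")
    if schema.get("type") == "string" and not isinstance(value, str):
        return False
    allowed = schema.get("enum")
    # Without an enum, any value of the right type is accepted.
    return allowed is None or value in allowed
```

With the groupId schema from the example, `value_allowed({"type": "string", "enum": ["myGroupId"]}, "myGroupId")` is true, and any other group id is rejected.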
This object contains information about the message representation in Kafka.
Field Name | Type | Description | Applicability [default] | Constraints |
---|---|---|---|---|
key | Schema Object \| Reference Object \| AVRO Schema Object | The message key. NOTE: You can also use the reference object way. | OPTIONAL | - |
schemaIdLocation | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. header or payload). | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
schemaIdPayloadEncoding | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g. confluent / apicurio-legacy / apicurio-new). | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
schemaLookupStrategy | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
This example is valid for any Confluent-compatible schema registry. It describes an implementation that stores the schema identifier in the first 4 bytes of the payload.
channels:
  test:
    address: test-topic
    messages:
      testMessage:
        bindings:
          kafka:
            key:
              type: string
              enum: ['myKey']
            schemaIdLocation: 'payload'
            schemaIdPayloadEncoding: '4'
            bindingVersion: '0.5.0'
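
To illustrate what a numeric schemaIdPayloadEncoding means for a consumer, the following Python sketch reads a schema identifier of the given size from the start of a payload. Big-endian byte order and the `offset` parameter are assumptions made for this sketch, and `read_schema_id` is a hypothetical name. Note that Confluent's own serializers write a one-byte magic marker before the 4-byte id, in which case `offset=1` would apply; the text above does not go into that detail.

```python
# Hypothetical helper: byte order and offset handling are assumptions.
def read_schema_id(payload: bytes, id_size: int = 4, offset: int = 0) -> int:
    """Read an unsigned big-endian schema id of id_size bytes starting at offset."""
    end = offset + id_size
    if len(payload) < end:
        raise ValueError("payload too short to contain a schema id")
    return int.from_bytes(payload[offset:end], byteorder="big", signed=False)
```

With the binding above, `id_size` would be derived from the string value, for example `read_schema_id(payload, id_size=int('4'))`.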
This is another example that describes the use of the Apicurio schema registry. It describes the apicurio-new way of serializing without detailing how it is implemented, and references a specific lookup strategy that may be used to retrieve the schema Id from the registry during serialization.
channels:
  test:
    address: test-topic
    messages:
      testMessage:
        bindings:
          kafka:
            key:
              type: string
              enum: ['myKey']
            schemaIdLocation: 'payload'
            schemaIdPayloadEncoding: 'apicurio-new'
            schemaLookupStrategy: 'TopicIdStrategy'
            bindingVersion: '0.5.0'
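
The message binding table states that schemaIdLocation, schemaIdPayloadEncoding and schemaLookupStrategy MUST NOT be specified unless schemaRegistryUrl is set at the Server level. A cross-check of that rule might look like the Python sketch below, which assumes both bindings are available as parsed dictionaries; `registry_field_violations` is a hypothetical name.

```python
# Hypothetical helper: illustrative only.
REGISTRY_DEPENDENT_FIELDS = (
    "schemaIdLocation",
    "schemaIdPayloadEncoding",
    "schemaLookupStrategy",
)


def registry_field_violations(server_binding: dict, message_binding: dict) -> list:
    """List message binding fields that require schemaRegistryUrl but lack it."""
    if server_binding.get("schemaRegistryUrl"):
        return []  # a registry is configured, so all three fields are allowed
    return [f for f in REGISTRY_DEPENDENT_FIELDS if f in message_binding]
```

An empty result means the message binding is consistent with the server binding; otherwise the returned names are the fields that should be removed or backed by a schemaRegistryUrl.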