From 190e31467db06b45a7cd6c36dce6add0c18b1457 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 21 Aug 2024 10:56:34 +0800 Subject: [PATCH] feat: support custom kafka group id prefix (#18115) Signed-off-by: xxchan --- .../source_inline/kafka/consumer_group.slt | 40 ++++ src/connector/src/sink/kafka.rs | 1 + src/connector/src/source/kafka/mod.rs | 14 ++ .../src/source/kafka/source/reader.rs | 6 +- src/connector/src/with_options_test.rs | 6 +- src/connector/with_options_sink.yaml | 123 +++++++++--- src/connector/with_options_source.yaml | 175 +++++++++++++++--- 7 files changed, 311 insertions(+), 54 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index a71329e46509d..3b432637a0ab7 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -20,9 +20,23 @@ WITH( scan.startup.mode = 'earliest', ) FORMAT PLAIN ENCODE JSON; +# custom group id prefix +statement ok +CREATE SOURCE s2(x varchar) +WITH( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_consumer_group', + scan.startup.mode = 'earliest', + group.id.prefix = 'my_group' +) FORMAT PLAIN ENCODE JSON; + + statement ok CREATE MATERIALIZED VIEW mv AS SELECT * from s; +statement ok +CREATE MATERIALIZED VIEW mv2 AS SELECT * from s2; + query ? SELECT * FROM s order by x; ---- @@ -87,9 +101,35 @@ d e f + +query ? +SELECT * FROM mv2 order by x; +---- +a +b +c +d +e +f + + statement ok DROP SOURCE s CASCADE; +statement ok +DROP SOURCE s2 CASCADE; + +## fragment id is not deterministic so comment out +# system ok +# rpk group list +# --- +# BROKER GROUP STATE +# 0 my_group-8 Empty +# 0 rw-consumer-3 Empty +# 0 rw-consumer-4294967295 Empty +# 0 rw-consumer-7 Empty + + system ok pkill rpk diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 3a6914d2249c5..588c5d99ae955 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -280,6 +280,7 @@ impl From for KafkaProperties { rdkafka_properties_consumer: Default::default(), privatelink_common: val.privatelink_common, aws_auth_props: val.aws_auth_props, + group_id_prefix: None, unknown_fields: Default::default(), } } diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 2360f9fb8a337..125bf73a3529f 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -120,6 +120,20 @@ pub struct KafkaProperties { )] pub time_offset: Option, + /// Specify a custom consumer group id prefix for the source. + /// Defaults to `rw-consumer`. + /// + /// Notes: + /// - Each job (materialized view) will have a separated consumer group and + /// contains a generated suffix in the group id. + /// The consumer group will be `{group_id_prefix}-{fragment_id}`. + /// - The consumer group is solely for monintoring progress in some external + /// Kafka tools, and for authorization. RisingWave does not rely on committed + /// offsets, and does not join the consumer group. It just reports offsets + /// to the group. + #[serde(rename = "group.id.prefix")] + pub group_id_prefix: Option, + /// This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which /// combine both key and value fields of the Kafka message. /// TODO: Currently, `Option` can not be parsed here. diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 798069ddb12f9..5ace1820b4249 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -73,9 +73,13 @@ impl SplitReader for KafkaSplitReader { properties.common.set_security_properties(&mut config); properties.set_client(&mut config); + let group_id_prefix = properties + .group_id_prefix + .as_deref() + .unwrap_or("rw-consumer"); config.set( "group.id", - format!("rw-consumer-{}", source_ctx.fragment_id), + format!("{}-{}", group_id_prefix, source_ctx.fragment_id), ); let ctx_common = KafkaContextCommon::new( diff --git a/src/connector/src/with_options_test.rs b/src/connector/src/with_options_test.rs index cd2adc11d0718..4eb4b52fe070d 100644 --- a/src/connector/src/with_options_test.rs +++ b/src/connector/src/with_options_test.rs @@ -170,7 +170,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String { // Generate the output format!( - "# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n\n{}", + "# THIS FILE IS AUTO_GENERATED. DO NOT EDIT\n# UPDATE WITH: ./risedev generate-with-options\n\n{}", serde_yaml::to_string(&struct_infos).unwrap() ) } @@ -238,14 +238,14 @@ fn extract_comments(attrs: &[Attribute]) -> String { if let Ok(Meta::NameValue(mnv)) = attr.parse_meta() { if mnv.path.is_ident("doc") { if let syn::Lit::Str(lit_str) = mnv.lit { - return Some(lit_str.value()); + return Some(lit_str.value().trim().to_string()); } } } None }) .collect::>() - .join(" ") + .join("\n") .trim() .to_string() } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 653acaadaaaf1..c33c1e79b1e7d 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -1,4 +1,5 @@ # THIS FILE IS AUTO_GENERATED. DO NOT EDIT +# UPDATE WITH: ./risedev generate-with-options BigQueryConfig: fields: @@ -224,11 +225,17 @@ GooglePubSubConfig: required: true - name: pubsub.emulator_host field_type: String - comments: use the connector with a pubsub emulator + comments: |- + use the connector with a pubsub emulator + required: false - name: pubsub.credentials field_type: String - comments: A JSON string containing the service account credentials for authorization, see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. The provided account credential must have the `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles) + comments: |- + A JSON string containing the service account credentials for authorization, + see the [service-account](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account) credentials guide. + The provided account credential must have the + `pubsub.publisher` [role](https://cloud.google.com/pubsub/docs/access-control#roles) required: false IcebergConfig: fields: @@ -301,7 +308,9 @@ KafkaConfig: default: 'Duration :: from_secs (5)' - name: properties.security.protocol field_type: String - comments: Security protocol used for RisingWave to communicate with Kafka brokers. Could be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. + comments: |- + Security protocol used for RisingWave to communicate with Kafka brokers. Could be + PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. required: false - name: properties.ssl.endpoint.identification.algorithm field_type: String @@ -368,15 +377,26 @@ KafkaConfig: default: 'Duration :: from_millis (100)' - name: primary_key field_type: String - comments: We have parsed the primary key for an upsert kafka sink into a `usize` vector representing the indices of the pk columns in the frontend, so we simply store the primary key here as a string. + comments: |- + We have parsed the primary key for an upsert kafka sink into a `usize` vector representing + the indices of the pk columns in the frontend, so we simply store the primary key here + as a string. required: false - name: properties.message.max.bytes field_type: usize - comments: Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's max.message.bytes limit + comments: |- + Maximum Kafka protocol request message size. Due to differing framing overhead between + protocol versions the producer is unable to reliably enforce a strict max message limit at + produce time and may exceed the maximum size by one message in protocol ProduceRequests, + the broker will enforce the the topic's max.message.bytes limit required: false - name: properties.receive.message.max.bytes field_type: usize - comments: Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. + comments: |- + Maximum Kafka protocol response message size. This serves as a safety precaution to avoid + memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + + 512 to allow for protocol overhead; the value is adjusted automatically unless the + configuration property is explicitly set. required: false - name: properties.statistics.interval.ms field_type: usize @@ -394,19 +414,33 @@ KafkaConfig: required: false - name: properties.queue.buffering.max.messages field_type: usize - comments: Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. A value of 0 disables this limit. + comments: |- + Maximum number of messages allowed on the producer queue. This queue is shared by all + topics and partitions. A value of 0 disables this limit. required: false - name: properties.queue.buffering.max.kbytes field_type: usize - comments: Maximum total message size sum allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than queue.buffering.max.messages. + comments: |- + Maximum total message size sum allowed on the producer queue. This queue is shared by all + topics and partitions. This property has higher priority than queue.buffering.max.messages. required: false - name: properties.queue.buffering.max.ms field_type: f64 - comments: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. + comments: |- + Delay in milliseconds to wait for messages in the producer queue to accumulate before + constructing message batches (MessageSets) to transmit to brokers. A higher value allows + larger and more effective (less overhead, improved compression) batches of messages to + accumulate at the expense of increased message delivery latency. required: false - name: properties.enable.idempotence field_type: bool - comments: 'When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer will fail if user-supplied configuration is incompatible.' + comments: |- + When set to true, the producer will ensure that messages are successfully produced exactly + once and in the original produce order. The following configuration properties are adjusted + automatically (if not modified by the user) when idempotence is enabled: + max.in.flight.requests.per.connection=5 (must be less than or equal to 5), + retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer + will fail if user-supplied configuration is incompatible. required: false - name: properties.message.send.max.retries field_type: usize @@ -422,7 +456,11 @@ KafkaConfig: required: false - name: properties.batch.size field_type: usize - comments: Maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead. This limit is applied after the first message has been added to the batch, regardless of the first message's size, this is to ensure that messages that exceed batch.size are produced. + comments: |- + Maximum size (in bytes) of all messages batched in one MessageSet, including protocol + framing overhead. This limit is applied after the first message has been added to the + batch, regardless of the first message's size, this is to ensure that messages that exceed + batch.size are produced. required: false - name: properties.compression.codec field_type: CompressionCodec @@ -430,7 +468,10 @@ KafkaConfig: required: false - name: properties.message.timeout.ms field_type: usize - comments: Produce message timeout. This value is used to limits the time a produced message waits for successful delivery (including retries). + comments: |- + Produce message timeout. + This value is used to limits the time a produced message waits for + successful delivery (including retries). required: false default: '5000' - name: properties.max.in.flight.requests.per.connection @@ -538,18 +579,27 @@ MongodbConfig: required: true - name: collection.name field_type: String - comments: The collection name where data should be written to or read from. For sinks, the format is `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field` for more information. + comments: |- + The collection name where data should be written to or read from. For sinks, the format is + `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field` + for more information. required: true - name: r#type field_type: String required: true - name: collection.name.field field_type: String - comments: The dynamic collection name where data should be sunk to. If specified, the field value will be used as the collection name. The collection name format is same as `collection.name`. If the field value is null or an empty string, then the `collection.name` will be used as a fallback destination. + comments: |- + The dynamic collection name where data should be sunk to. If specified, the field value will be used + as the collection name. The collection name format is same as `collection.name`. If the field value is + null or an empty string, then the `collection.name` will be used as a fallback destination. required: false - name: collection.name.field.drop field_type: bool - comments: Controls whether the field value of `collection.name.field` should be dropped when sinking. Set this option to true to avoid the duplicate values of `collection.name.field` being written to the result collection. + comments: |- + Controls whether the field value of `collection.name.field` should be dropped when sinking. + Set this option to true to avoid the duplicate values of `collection.name.field` being written to the + result collection. required: false default: Default::default - name: mongodb.bulk_write.max_entries @@ -561,11 +611,17 @@ MqttConfig: fields: - name: url field_type: String - comments: The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified + comments: |- + The url of the broker to connect to. e.g. tcp://localhost. + Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, + to denote the protocol for establishing a connection with the broker. + `mqtts://`, `ssl://` will use the native certificates if no ca is specified required: true - name: qos field_type: QualityOfService - comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once + comments: |- + The quality of service to use when publishing messages. Defaults to at_most_once. + Could be at_most_once, at_least_once or exactly_once required: false - name: username field_type: String @@ -577,11 +633,19 @@ MqttConfig: required: false - name: client_prefix field_type: String - comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave + comments: |- + Prefix for the mqtt client id. + The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave required: false - name: clean_start field_type: bool - comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.' + comments: |- + `clean_start = true` removes all the state from queues & instructs the broker + to clean all the client state when client disconnects. + + When set `false`, broker will hold the client state and performs pending + operations on the client when reconnection with same `client_id` + happens. Local queue state is also held to retransmit packets after reconnection. required: false default: Default::default - name: inflight_messages @@ -594,11 +658,15 @@ MqttConfig: required: false - name: tls.client_cert field_type: String - comments: Path to client's certificate file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the certificate content. + comments: |- + Path to client's certificate file (PEM). Required for client authentication. + Can be a file path under fs:// or a string with the certificate content. required: false - name: tls.client_key field_type: String - comments: Path to client's private key file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the private key content. + comments: |- + Path to client's private key file (PEM). Required for client authentication. + Can be a file path under fs:// or a string with the private key content. required: false - name: topic field_type: String @@ -757,7 +825,11 @@ SnowflakeConfig: - s3.bucket_name - name: snowflake.s3_path field_type: String - comments: The optional s3 path to be specified the actual file location would be `s3:////` if this field is specified by user(s) otherwise it would be `s3:///` + comments: |- + The optional s3 path to be specified + the actual file location would be `s3:////` + if this field is specified by user(s) + otherwise it would be `s3:///` required: false alias: - s3.path @@ -847,7 +919,12 @@ StarrocksConfig: default: 30 * 1000 - name: commit_checkpoint_interval field_type: u64 - comments: Set this option to a positive integer n, RisingWave will try to commit data to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 1 if commit_checkpoint_interval <= 0 + comments: |- + Set this option to a positive integer n, RisingWave will try to commit data + to Starrocks at every n checkpoints by leveraging the + [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), + also, in this time, the `sink_decouple` option should be enabled as well. + Defaults to 1 if commit_checkpoint_interval <= 0 required: false default: Default::default - name: starrocks.partial_update diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 8d808f526bf88..4a208465265e7 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -1,4 +1,5 @@ # THIS FILE IS AUTO_GENERATED. DO NOT EDIT +# UPDATE WITH: ./risedev generate-with-options DatagenProperties: fields: @@ -8,12 +9,23 @@ DatagenProperties: required: false - name: datagen.rows.per.second field_type: u64 - comments: default_rows_per_second =10 when the split_num = 3 and default_rows_per_second =10 there will be three readers that generate respectively 4,3,3 message per second + comments: |- + default_rows_per_second =10 + when the split_num = 3 and default_rows_per_second =10 + there will be three readers that generate respectively 4,3,3 message per second required: false default: '10' - name: fields field_type: HashMap - comments: 'Some connector options of the datagen source''s fields for example: create datagen source with column v1 int, v2 float ''fields.v1.kind''=''sequence'', ''fields.v1.start''=''1'', ''fields.v1.end''=''1000'', ''fields.v2.kind''=''random'', datagen will create v1 by self-incrementing from 1 to 1000 datagen will create v2 by randomly generating from default_min to default_max' + comments: |- + Some connector options of the datagen source's fields + for example: create datagen source with column v1 int, v2 float + 'fields.v1.kind'='sequence', + 'fields.v1.start'='1', + 'fields.v1.end'='1000', + 'fields.v2.kind'='random', + datagen will create v1 by self-incrementing from 1 to 1000 + datagen will create v2 by randomly generating from default_min to default_max required: false GcsProperties: fields: @@ -79,13 +91,21 @@ KafkaProperties: fields: - name: bytes.per.second field_type: String - comments: This parameter is not intended to be exposed to users. This parameter specifies only for one parallelism. The parallelism of kafka source is equal to the parallelism passed into compute nodes. So users need to calculate how many bytes will be consumed in total across all the parallelism by themselves. + comments: |- + This parameter is not intended to be exposed to users. + This parameter specifies only for one parallelism. The parallelism of kafka source + is equal to the parallelism passed into compute nodes. So users need to calculate + how many bytes will be consumed in total across all the parallelism by themselves. required: false alias: - kafka.bytes.per.second - name: max.num.messages field_type: String - comments: This parameter is not intended to be exposed to users. This parameter specifies only for one parallelism. The parallelism of kafka source is equal to the parallelism passed into compute nodes. So users need to calculate how many messages will be consumed in total across all the parallelism by themselves. + comments: |- + This parameter is not intended to be exposed to users. + This parameter specifies only for one parallelism. The parallelism of kafka source + is equal to the parallelism passed into compute nodes. So users need to calculate + how many messages will be consumed in total across all the parallelism by themselves. required: false alias: - kafka.max.num.messages @@ -100,9 +120,27 @@ KafkaProperties: alias: - kafka.time.offset - scan.startup.timestamp_millis + - name: group.id.prefix + field_type: String + comments: |- + Specify a custom consumer group id prefix for the source. + Defaults to `rw-consumer`. + + Notes: + - Each job (materialized view) will have a separated consumer group and + contains a generated suffix in the group id. + The consumer group will be `{group_id_prefix}-{fragment_id}`. + - The consumer group is solely for monintoring progress in some external + Kafka tools, and for authorization. RisingWave does not rely on committed + offsets, and does not join the consumer group. It just reports offsets + to the group. + required: false - name: upsert field_type: String - comments: 'This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which combine both key and value fields of the Kafka message. TODO: Currently, `Option` can not be parsed here.' + comments: |- + This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which + combine both key and value fields of the Kafka message. + TODO: Currently, `Option` can not be parsed here. required: false - name: properties.bootstrap.server field_type: String @@ -120,7 +158,9 @@ KafkaProperties: default: 'Duration :: from_secs (5)' - name: properties.security.protocol field_type: String - comments: Security protocol used for RisingWave to communicate with Kafka brokers. Could be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. + comments: |- + Security protocol used for RisingWave to communicate with Kafka brokers. Could be + PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. required: false - name: properties.ssl.endpoint.identification.algorithm field_type: String @@ -179,11 +219,19 @@ KafkaProperties: required: false - name: properties.message.max.bytes field_type: usize - comments: Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the the topic's max.message.bytes limit + comments: |- + Maximum Kafka protocol request message size. Due to differing framing overhead between + protocol versions the producer is unable to reliably enforce a strict max message limit at + produce time and may exceed the maximum size by one message in protocol ProduceRequests, + the broker will enforce the the topic's max.message.bytes limit required: false - name: properties.receive.message.max.bytes field_type: usize - comments: Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. + comments: |- + Maximum Kafka protocol response message size. This serves as a safety precaution to avoid + memory exhaustion in case of protocol hickups. This value must be at least fetch.max.bytes + + 512 to allow for protocol overhead; the value is adjusted automatically unless the + configuration property is explicitly set. required: false - name: properties.statistics.interval.ms field_type: usize @@ -197,26 +245,48 @@ KafkaProperties: required: false - name: properties.queued.min.messages field_type: usize - comments: Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue. + comments: |- + Minimum number of messages per topic+partition librdkafka tries to maintain in the local + consumer queue. required: false - name: properties.queued.max.messages.kbytes field_type: usize required: false - name: properties.fetch.wait.max.ms field_type: usize - comments: Maximum time the broker may wait to fill the Fetch response with `fetch.min.`bytes of messages. + comments: |- + Maximum time the broker may wait to fill the Fetch response with `fetch.min.`bytes of + messages. required: false - name: properties.fetch.queue.backoff.ms field_type: usize - comments: How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (`queued.min.messages` or `queued.max.messages.kbytes`) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization. + comments: |- + How long to postpone the next fetch request for a topic+partition in case the current fetch + queue thresholds (`queued.min.messages` or `queued.max.messages.kbytes`) have been + exceeded. This property may need to be decreased if the queue thresholds are set low + and the application is experiencing long (~1s) delays between messages. Low values may + increase CPU utilization. required: false - name: properties.fetch.max.bytes field_type: usize - comments: Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least `message.max.bytes` (consumer config). + comments: |- + Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in + batches by the consumer and if the first message batch in the first non-empty partition of + the Fetch request is larger than this value, then the message batch will still be returned + to ensure the consumer can make progress. The maximum message batch size accepted by the + broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker + topic config). `fetch.max.bytes` is automatically adjusted upwards to be at least + `message.max.bytes` (consumer config). required: false - name: properties.enable.auto.commit field_type: bool - comments: 'Whether to automatically and periodically commit offsets in the background. Note that RisingWave does NOT rely on committed offsets. Committing offset is only for exposing the progress for monitoring. Setting this to false can avoid creating consumer groups. default: true' + comments: |- + Whether to automatically and periodically commit offsets in the background. + + Note that RisingWave does NOT rely on committed offsets. Committing offset is only for exposing the + progress for monitoring. Setting this to false can avoid creating consumer groups. + + default: true required: false - name: broker.rewrite.endpoints field_type: BTreeMap @@ -323,17 +393,26 @@ MongodbCommon: required: true - name: collection.name field_type: String - comments: The collection name where data should be written to or read from. For sinks, the format is `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field` for more information. + comments: |- + The collection name where data should be written to or read from. For sinks, the format is + `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field` + for more information. required: true MqttProperties: fields: - name: url field_type: String - comments: The url of the broker to connect to. e.g. tcp://localhost. Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, to denote the protocol for establishing a connection with the broker. `mqtts://`, `ssl://` will use the native certificates if no ca is specified + comments: |- + The url of the broker to connect to. e.g. tcp://localhost. + Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, + to denote the protocol for establishing a connection with the broker. + `mqtts://`, `ssl://` will use the native certificates if no ca is specified required: true - name: qos field_type: QualityOfService - comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once + comments: |- + The quality of service to use when publishing messages. Defaults to at_most_once. + Could be at_most_once, at_least_once or exactly_once required: false - name: username field_type: String @@ -345,11 +424,19 @@ MqttProperties: required: false - name: client_prefix field_type: String - comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave + comments: |- + Prefix for the mqtt client id. + The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave required: false - name: clean_start field_type: bool - comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.' + comments: |- + `clean_start = true` removes all the state from queues & instructs the broker + to clean all the client state when client disconnects. + + When set `false`, broker will hold the client state and performs pending + operations on the client when reconnection with same `client_id` + happens. Local queue state is also held to retransmit packets after reconnection. required: false default: Default::default - name: inflight_messages @@ -362,11 +449,15 @@ MqttProperties: required: false - name: tls.client_cert field_type: String - comments: Path to client's certificate file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the certificate content. + comments: |- + Path to client's certificate file (PEM). Required for client authentication. + Can be a file path under fs:// or a string with the certificate content. required: false - name: tls.client_key field_type: String - comments: Path to client's private key file (PEM). Required for client authentication. Can be a file path under fs:// or a string with the private key content. + comments: |- + Path to client's private key file (PEM). Required for client authentication. + Can be a file path under fs:// or a string with the private key content. required: false - name: topic field_type: String @@ -374,7 +465,9 @@ MqttProperties: required: true - name: qos field_type: MqttQualityOfService - comments: The quality of service to use when publishing messages. Defaults to at_most_once. Could be at_most_once, at_least_once or exactly_once + comments: |- + The quality of service to use when publishing messages. Defaults to at_most_once. + Could be at_most_once, at_least_once or exactly_once required: false NatsProperties: fields: @@ -648,27 +741,55 @@ PubsubProperties: fields: - name: pubsub.subscription field_type: String - comments: 'Pub/Sub subscription to consume messages from. Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV (shared between the actors of its fragment). Otherwise, different MVs on the same Source will both receive part of the messages. TODO: check and enforce this on Meta.' + comments: |- + Pub/Sub subscription to consume messages from. + + Note that we rely on Pub/Sub to load-balance messages between all Readers pulling from + the same subscription. So one `subscription` (i.e., one `Source`) can only used for one MV + (shared between the actors of its fragment). + Otherwise, different MVs on the same Source will both receive part of the messages. + TODO: check and enforce this on Meta. required: true - name: pubsub.emulator_host field_type: String - comments: use the connector with a pubsub emulator + comments: |- + use the connector with a pubsub emulator + required: false - name: pubsub.credentials field_type: String - comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).' + comments: |- + `credentials` is a JSON string containing the service account credentials. + See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). + The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles). required: false - name: pubsub.start_offset.nanos field_type: String - comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: |- + `start_offset` is a numeric timestamp, ideally the publish timestamp of a message + in the subscription. If present, the connector will attempt to seek the subscription + to the timestamp and start consuming from there. Note that the seek operation is + subject to limitations around the message retention policy of the subscription. See + [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for + more details. required: false - name: pubsub.start_snapshot field_type: String - comments: '`start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism in pub/sub because they guarantee retention of: - All unacknowledged messages at the time of their creation. - All messages created after their creation. Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: |- + `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek + to the snapshot before starting consumption. Snapshots are the preferred seeking mechanism + in pub/sub because they guarantee retention of: + - All unacknowledged messages at the time of their creation. + - All messages created after their creation. + Besides retention guarantees, snapshots are also more precise than timestamp-based seeks. + See [Seeking to a snapshot](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for + more details. required: false - name: pubsub.parallelism field_type: u32 - comments: '`parallelism` is the number of parallel consumers to run for the subscription. TODO: use system parallelism if not set' + comments: |- + `parallelism` is the number of parallel consumers to run for the subscription. + TODO: use system parallelism if not set required: false PulsarProperties: fields: