diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 7675093b2..56c99406d 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -586,6 +586,12 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
     test_cases::cluster_test_suite(&connection_builder).await;
 
+    #[allow(irrefutable_let_patterns)]
+    if let KafkaDriver::Java = driver {
+        // describeLogDirs is only on java driver
+        test_cases::describe_log_dirs(&connection_builder).await;
+    }
+
     for shotover in shotovers {
         tokio::time::timeout(
             Duration::from_secs(10),
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index aad87edcb..b4fc70d5b 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,11 +3,11 @@ use std::{collections::HashMap, time::Duration};
 use test_helpers::{
     connection::kafka::{
         Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
-        ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
-        KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
-        NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
-        RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
-        TransactionDescription,
+        ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel,
+        KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
+        ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
+        OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
+        TopicPartition, TopicPartitionReplica, TransactionDescription,
     },
     docker_compose::DockerCompose,
 };
@@ -1801,6 +1801,107 @@ async fn create_and_list_partition_reassignments(connection_builder: &KafkaConne
     );
 }
 
+// Due to `AdminClient.describeLogDirs` querying specified brokers directly, this test is specialized to a 2 shotover node, 6 kafka node cluster.
+// So we call it directly from such a test, instead of including it in the standard test suite.
+pub async fn describe_log_dirs(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
+
+    // Create topics that are replicated to every node in the cluster
+    admin
+        .create_topics_and_wait(&[
+            NewTopic {
+                name: "describe_logs_test",
+                num_partitions: 1,
+                replication_factor: 6,
+            },
+            NewTopic {
+                name: "describe_logs_test2",
+                num_partitions: 1,
+                replication_factor: 6,
+            },
+        ])
+        .await;
+    let producer = connection_builder.connect_producer("all", 100).await;
+    producer
+        .assert_produce(
+            Record {
+                payload: "initial",
+                topic_name: "describe_logs_test",
+                key: None,
+            },
+            Some(0),
+        )
+        .await;
+
+    // Describe the replicas and assert that each returned path is valid
+    let result = admin
+        .describe_replica_log_dirs(&[
+            TopicPartitionReplica {
+                topic_name: "describe_logs_test".to_owned(),
+                partition: 0,
+                broker_id: 0,
+            },
+            TopicPartitionReplica {
+                topic_name: "describe_logs_test".to_owned(),
+                partition: 0,
+                broker_id: 1,
+            },
+            TopicPartitionReplica {
+                topic_name: "describe_logs_test2".to_owned(),
+                partition: 0,
+                broker_id: 0,
+            },
+        ])
+        .await;
+
+    /// Assert that the path in the DescribeLogDirs response matches the custom format used by shotover.
+    /// This format looks like: actual-kafka-broker-id3:/original/kafka/path/here
+    fn assert_valid_path(info: &DescribeReplicaLogDirInfo) {
+        let id = info
+            .path
+            .as_ref()
+            .unwrap()
+            .strip_prefix("actual-kafka-broker-id")
+            .unwrap()
+            .strip_suffix(":/bitnami/kafka/data")
+            .unwrap();
+        let id: i32 = id.parse().unwrap();
+        assert!(
+            id < 6,
+            "There are only 6 brokers so the broker id must be between 0-5 inclusive but was instead {id}"
+        );
+    }
+
+    assert_eq!(result.len(), 3);
+    assert_valid_path(
+        result
+            .get(&TopicPartitionReplica {
+                topic_name: "describe_logs_test".to_owned(),
+                partition: 0,
+                broker_id: 0,
+            })
+            .unwrap(),
+    );
+    assert_valid_path(
+        result
+            .get(&TopicPartitionReplica {
+                topic_name: "describe_logs_test".to_owned(),
+                partition: 0,
+                broker_id: 1,
+            })
+            .unwrap(),
+    );
+    assert_valid_path(
+        result
+            .get(&TopicPartitionReplica {
+                topic_name: "describe_logs_test2".to_owned(),
+                partition: 0,
+                broker_id: 0,
+            })
+            .unwrap(),
+    );
+}
+
 async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
     let admin = connection_builder.connect_admin().await;
     admin
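The transform changes below fan a DescribeLogDirs request out to every broker and later merge the per-broker responses into one. The body of `split_request_by_routing_to_all_brokers` is not shown in this diff; the sketch below only illustrates the shape such a fan-out is assumed to take (clone the request once per known broker), using stand-in types rather than the real `kafka_protocol` ones.

```rust
use std::collections::HashMap;

// Stand-ins for kafka_protocol::messages::BrokerId and DescribeLogDirsRequest.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BrokerId(i32);
#[derive(Clone, Debug)]
struct DescribeLogDirsRequest;

// Assumed shape of the fan-out: one cloned request per broker, keyed by broker id.
fn split_to_all_brokers(
    brokers: &[BrokerId],
    request: &DescribeLogDirsRequest,
) -> HashMap<BrokerId, DescribeLogDirsRequest> {
    brokers
        .iter()
        .map(|broker| (broker.clone(), request.clone()))
        .collect()
}

fn main() {
    let brokers = [BrokerId(0), BrokerId(1), BrokerId(2)];
    let split = split_to_all_brokers(&brokers, &DescribeLogDirsRequest);
    assert_eq!(split.len(), 3); // each broker reports its own log dirs
}
```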
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 6a4c72a71..9d29fa88b 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -34,9 +34,9 @@ use kafka_protocol::messages::{
     AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
     BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
     DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
-    DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
-    DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
-    FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
+    DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse,
+    DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
+    FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
     InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
     ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
     MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
@@ -59,10 +59,10 @@ use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use split::{
     AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
     DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
-    DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter,
-    ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
-    OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
-    ProduceRequestSplitAndRouter, RequestSplitAndRouter,
+    DescribeLogDirsSplitAndRouter, DescribeProducersRequestSplitAndRouter,
+    DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
+    ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
+    OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
 };
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::hash::Hasher;
@@ -1139,6 +1139,10 @@ The connection to the client has been closed."
                 body: RequestBody::ListTransactions(_),
                 ..
             })) => self.split_and_route_request::<ListTransactionsSplitAndRouter>(request)?,
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DescribeLogDirs(_),
+                ..
+            })) => self.split_and_route_request::<DescribeLogDirsSplitAndRouter>(request)?,
 
             // route to random broker
             Some(Frame::Kafka(KafkaFrame::Request {
@@ -2225,11 +2229,19 @@ The connection to the client has been closed."
         } else {
             let drain = self.pending_requests.drain(..combine_responses).map(|x| {
                 if let PendingRequest {
-                    state: PendingRequestState::Received { response, .. },
+                    state:
+                        PendingRequestState::Received {
+                            response,
+                            destination,
+                            ..
+                        },
                     ..
                 } = x
                 {
-                    response
+                    ResponseToBeCombined {
+                        response,
+                        destination,
+                    }
                 } else {
                     unreachable!("Guaranteed by all_combined_received")
                 }
@@ -2293,12 +2305,12 @@ The connection to the client has been closed."
         result
     }
 
-    fn combine_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
+    fn combine_responses(mut drain: impl Iterator<Item = ResponseToBeCombined>) -> Result<Message> {
         // Take this response as base.
         // Then iterate over all remaining combined responses and integrate them into the base.
         let mut base = drain.next().unwrap();
 
-        match base.frame() {
+        match base.response.frame() {
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Fetch(base),
                 ..
@@ -2347,6 +2359,10 @@ The connection to the client has been closed."
                 body: ResponseBody::DescribeGroups(base),
                 ..
             })) => Self::combine_describe_groups(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DescribeLogDirs(base_body),
+                ..
+            })) => Self::combine_describe_log_dirs(base.destination, base_body, drain)?,
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::AddPartitionsToTxn(base),
                 version,
@@ -2362,20 +2378,20 @@ The connection to the client has been closed."
             }
         }
 
-        base.invalidate_cache();
+        base.response.invalidate_cache();
 
-        Ok(base)
+        Ok(base.response)
     }
 
     fn combine_fetch_responses(
         base_fetch: &mut FetchResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Fetch(next_fetch),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_response in std::mem::take(&mut next_fetch.responses) {
                     if let Some(base_response) = base_fetch.responses.iter_mut().find(|response| {
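The `ResponseToBeCombined` pairing above keeps the `destination` (the broker a buffered response came from) alongside the response itself, because two brokers can legitimately report the same on-disk log dir path and the combined DescribeLogDirs response must keep them apart. A minimal sketch of that merge, using simplified stand-in types rather than the transform's real ones, mirroring `prepend_destination_to_log_dir` and `combine_describe_log_dirs` further below:

```rust
// Simplified stand-in for one broker's DescribeLogDirs response plus its origin.
struct SubResponse {
    broker_id: i32,
    log_dirs: Vec<String>,
}

// Tag each log dir with the broker it came from, then concatenate the results.
fn combine(responses: Vec<SubResponse>) -> Vec<String> {
    responses
        .into_iter()
        .flat_map(|SubResponse { broker_id, log_dirs }| {
            log_dirs
                .into_iter()
                .map(move |dir| format!("actual-kafka-broker-id{broker_id}:{dir}"))
        })
        .collect()
}

fn main() {
    let combined = combine(vec![
        SubResponse { broker_id: 0, log_dirs: vec!["/bitnami/kafka/data".into()] },
        SubResponse { broker_id: 3, log_dirs: vec!["/bitnami/kafka/data".into()] },
    ]);
    // Identical paths from different brokers stay distinguishable after the rewrite.
    assert_eq!(combined, [
        "actual-kafka-broker-id0:/bitnami/kafka/data",
        "actual-kafka-broker-id3:/bitnami/kafka/data",
    ]);
}
```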
@@ -2408,13 +2424,13 @@ The connection to the client has been closed."
 
     fn combine_list_offsets_responses(
         base_list_offsets: &mut ListOffsetsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::ListOffsets(next_list_offsets),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_topic in std::mem::take(&mut next_list_offsets.topics) {
                     if let Some(base_topic) = base_list_offsets
@@ -2448,13 +2464,13 @@ The connection to the client has been closed."
 
     fn combine_offset_for_leader_epoch_responses(
         base_list_offsets: &mut OffsetForLeaderEpochResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::OffsetForLeaderEpoch(next_body),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_topic in std::mem::take(&mut next_body.topics) {
                     if let Some(base_topic) = base_list_offsets
@@ -2487,7 +2503,7 @@ The connection to the client has been closed."
 
     fn combine_produce_responses(
         base_produce: &mut ProduceResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         let mut base_responses: HashMap =
             std::mem::take(&mut base_produce.responses)
@@ -2498,7 +2514,7 @@ The connection to the client has been closed."
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Produce(next_produce),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_response in std::mem::take(&mut next_produce.responses) {
                     if let Some(base_response) = base_responses.get_mut(&next_response.name) {
@@ -2531,7 +2547,7 @@ The connection to the client has been closed."
 
     fn combine_delete_records(
         base_body: &mut DeleteRecordsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         let mut base_topics: HashMap =
             std::mem::take(&mut base_body.topics)
@@ -2542,7 +2558,7 @@ The connection to the client has been closed."
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DeleteRecords(next_body),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_response in std::mem::take(&mut next_body.topics) {
                     if let Some(base_response) = base_topics.get_mut(&next_response.name) {
@@ -2574,13 +2590,13 @@ The connection to the client has been closed."
 
     fn combine_delete_groups_responses(
         base_delete_groups: &mut DeleteGroupsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
    ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DeleteGroups(next_delete_groups),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base_delete_groups
                     .results
@@ -2593,13 +2609,13 @@ The connection to the client has been closed."
 
     fn combine_offset_fetch(
         base_offset_fetch: &mut OffsetFetchResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::OffsetFetch(next_offset_fetch),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base_offset_fetch
                     .groups
@@ -2612,13 +2628,13 @@ The connection to the client has been closed."
 
     fn combine_list_groups(
         base_list_groups: &mut ListGroupsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::ListGroups(next_list_groups),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base_list_groups
                     .groups
@@ -2631,7 +2647,7 @@ The connection to the client has been closed."
 
     fn combine_describe_producers(
         base: &mut DescribeProducersResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         let mut base_responses: HashMap =
             std::mem::take(&mut base.topics)
@@ -2642,7 +2658,7 @@ The connection to the client has been closed."
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DescribeProducers(next),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 for next_response in std::mem::take(&mut next.topics) {
                     if let Some(base_response) = base_responses.get_mut(&next_response.name) {
@@ -2674,13 +2690,13 @@ The connection to the client has been closed."
 
     fn combine_describe_transactions(
         base: &mut DescribeTransactionsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DescribeTransactions(next),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base.transaction_states
                     .extend(std::mem::take(&mut next.transaction_states));
@@ -2692,13 +2708,13 @@ The connection to the client has been closed."
 
     fn combine_list_transactions(
         base_list_transactions: &mut ListTransactionsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::ListTransactions(next_list_transactions),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base_list_transactions
                     .transaction_states
@@ -2711,15 +2727,70 @@ The connection to the client has been closed."
         Ok(())
     }
 
+    fn combine_describe_log_dirs(
+        base_destination: Destination,
+        base_body: &mut DescribeLogDirsResponse,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
+    ) -> Result<()> {
+        Self::prepend_destination_to_log_dir(base_destination, base_body);
+
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DescribeLogDirs(next_body),
+                ..
+            })) = next.response.frame()
+            {
+                Self::prepend_destination_to_log_dir(next.destination, next_body);
+                base_body
+                    .results
+                    .extend(std::mem::take(&mut next_body.results));
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Rewrite the log dir paths to a custom format to allow results to be disambiguated.
+    /// Usually only 1 file system path can exist on a single broker machine.
+    /// However, since shotover represents many different brokers, there can be different log dirs with identical paths associated with a single shotover instance.
+    /// This would be extremely confusing to a user and the client driver could assume that paths are unique.
+    /// So we need to alter the path to include details of which broker the path resides on.
+    ///
+    /// The downsides of this are:
+    /// * This leaks details of the actual kafka cluster to the user
+    /// * The path is no longer a valid path
+    ///
+    /// The only other possible solution I see would be to have shotover error on this request type instead.
+    /// But I think it's reasonable to instead take these downsides and provide most of the value of this message type to the user.
+    ///
+    /// For example:
+    /// If the path starts as: /original/log/dir/path
+    /// It will become something like: actual-kafka-broker-id3:/original/log/dir/path
+    fn prepend_destination_to_log_dir(
+        destination: Destination,
+        body: &mut DescribeLogDirsResponse,
+    ) {
+        for result in &mut body.results {
+            let log_dir = result.log_dir.as_str();
+            let altered_log_dir = match destination {
+                Destination::Id(id) => format!("actual-kafka-broker-id{id:?}:{log_dir}"),
+                Destination::ControlConnection => {
+                    unreachable!("DescribeLogDirs are not sent as control connections")
+                }
+            };
+            result.log_dir = StrBytes::from_string(altered_log_dir);
+        }
+    }
+
     fn combine_describe_groups(
         base: &mut DescribeGroupsResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DescribeGroups(next),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base.groups.extend(std::mem::take(&mut next.groups));
             }
@@ -2730,13 +2801,13 @@ The connection to the client has been closed."
 
     fn combine_add_partitions_to_txn(
         base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
-        drain: impl Iterator<Item = Message>,
+        drain: impl Iterator<Item = ResponseToBeCombined>,
     ) -> Result<()> {
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::AddPartitionsToTxn(next_add_partitions_to_txn),
                 ..
-            })) = next.frame()
+            })) = next.response.frame()
             {
                 base_add_partitions_to_txn
                     .results_by_transaction
@@ -3879,3 +3950,8 @@ fn collect_broker_ids(shotover_nodes_by_rack: ShotoverNodesByRack) -> Vec<Broker
+
+struct ResponseToBeCombined {
+    response: Message,
+    destination: Destination,
+}
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
--- a/shotover/src/transforms/kafka/sink_cluster/split.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
+pub struct DescribeLogDirsSplitAndRouter;
+
+impl RequestSplitAndRouter for DescribeLogDirsSplitAndRouter {
+    type Request = DescribeLogDirsRequest;
+    type SubRequests = DescribeLogDirsRequest;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        _request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_request_by_routing_to_all_brokers()
+    }
+
+    fn get_request_frame(request: &mut Message) -> &mut Self::Request {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DescribeLogDirs(request),
+                ..
+            })) => request,
+            _ => unreachable!(),
+        }
+    }
+
+    fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
+        // No need to reassemble, each DescribeLogDirs is an exact clone of the original
+    }
+}
+
 pub struct DescribeTransactionsSplitAndRouter;
 
 impl RequestSplitAndRouter for DescribeTransactionsSplitAndRouter {
diff --git a/test-helpers/src/connection/java.rs b/test-helpers/src/connection/java.rs
index 25ac455c9..870bf08d0 100644
--- a/test-helpers/src/connection/java.rs
+++ b/test-helpers/src/connection/java.rs
@@ -363,6 +363,7 @@ impl Value {
     }
 
     /// Convert this java value into a native rust type
+    /// When T is an Option, a java null is converted to None, otherwise Some.
     pub(crate) fn into_rust<T>(self) -> T
     where
         T: DeserializeOwned + Any,
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 996010b92..7a9460905 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -1,9 +1,10 @@
 use super::{
     Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ConsumerGroupDescription,
-    ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
-    OffsetAndMetadata, OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record,
-    RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
-    TopicPartition, TopicPartitionInfo, TransactionDescription,
+    DescribeReplicaLogDirInfo, ExpectedResponse, ListOffsetsResultInfo, NewPartition,
+    NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, PartitionReassignment,
+    ProduceResult, ProducerState, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier,
+    ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
+    TransactionDescription,
 };
 use crate::connection::java::{map_iterator, Jvm, Value};
 use anyhow::Result;
@@ -871,6 +872,39 @@ impl KafkaAdminJava {
             .await;
     }
 
+    pub async fn describe_replica_log_dirs(
+        &self,
+        topic_partitions: &[TopicPartitionReplica],
+    ) -> HashMap<TopicPartitionReplica, DescribeReplicaLogDirInfo> {
+        let topic_partitions_java = self.jvm.new_set(
+            "org.apache.kafka.common.TopicPartitionReplica",
+            topic_partitions
+                .iter()
+                .map(|topic_partition| topic_partition_replica_to_java(&self.jvm, topic_partition))
+                .collect(),
+        );
+
+        let results = self
+            .admin
+            .call("describeReplicaLogDirs", vec![topic_partitions_java])
+            .call_async("all", vec![])
+            .await;
+
+        map_iterator(results)
+            .map(|(topic_partition, log_dir_info)| {
+                (
+                    topic_partition_replica_to_rust(topic_partition),
+                    DescribeReplicaLogDirInfo {
+                        path: log_dir_info
+                            .cast("org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult$ReplicaLogDirInfo")
+                            .call("getCurrentReplicaLogDir", vec!())
+                            .into_rust()
+                    },
+                )
+            })
+            .collect()
+    }
+
     pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> {
         let election_type = self
             .jvm
@@ -1052,6 +1086,26 @@ fn topic_partition_to_rust(tp: Value) -> TopicPartition {
     }
 }
 
+fn topic_partition_replica_to_java(jvm: &Jvm, tp: &TopicPartitionReplica) -> Value {
+    jvm.construct(
+        "org.apache.kafka.common.TopicPartitionReplica",
+        vec![
+            jvm.new_string(&tp.topic_name),
+            jvm.new_int(tp.partition),
+            jvm.new_int(tp.broker_id),
+        ],
+    )
+}
+
+fn topic_partition_replica_to_rust(tp: Value) -> TopicPartitionReplica {
+    let tp = tp.cast("org.apache.kafka.common.TopicPartitionReplica");
+    TopicPartitionReplica {
+        topic_name: tp.call("topic", vec![]).into_rust(),
+        partition: tp.call("partition", vec![]).into_rust(),
+        broker_id: tp.call("brokerId", vec![]).into_rust(),
+    }
+}
+
 fn offset_and_metadata_to_rust(offset_and_metadata: Value) -> OffsetAndMetadata {
     let offset_and_metadata =
         offset_and_metadata.cast("org.apache.kafka.clients.consumer.OffsetAndMetadata");
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 635609f24..e901433b0 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -539,6 +539,18 @@ impl KafkaAdmin {
             }
         }
     }
+    pub async fn describe_replica_log_dirs(
+        &self,
+        topic_partitions: &[TopicPartitionReplica],
+    ) -> HashMap<TopicPartitionReplica, DescribeReplicaLogDirInfo> {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
"kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support describe_replica_log_dirs") + } + Self::Java(java) => java.describe_replica_log_dirs(topic_partitions).await, + } + } pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> { match self { @@ -630,6 +642,18 @@ pub struct TopicPartition { pub partition: i32, } +#[derive(Clone, Eq, PartialEq, Hash, Debug)] +pub struct TopicPartitionReplica { + pub topic_name: String, + pub partition: i32, + pub broker_id: i32, +} + +#[derive(Clone, Eq, PartialEq, Hash, Debug)] +pub struct DescribeReplicaLogDirInfo { + pub path: Option, +} + pub enum ResourceSpecifier<'a> { Topic(&'a str), }