[KIP-848] Added support for testing with new 'consumer' group protocol.
* [KIP-848] Added support for testing with new 'consumer' group protocol.

* Build fixes

* updated trivup

* Updated trivup install path

* Fixed failing test

* Style fix

* Added more tests to be run with the new protocol

* Fixed failing tests

* Added tests/common module for shared test functionality

* Enabling SR again

* Style fixes

* Some refactoring

* Added consumer protocol integration tests in Semaphore CI

* Ignoring failing admin tests

* Fix typo

* Fixed failing test case

* Added new fixture for single broker and using this fixture for test_serializer tests

* Build fixes

* Fixed transient test failures for proto

* Fixed another test

* Added Test*Consumer classes instead of functions

* Build issue

* Added common TestUtils

* Using specific commit for trivup

* Removed trivup 0.12.5

* PR comments

* Style check

* Skipping one list offsets assert for ZooKeeper

* 1) Moved sleep after result and assert.
2) Added a function to create a topic and wait for propagation (a sketch follows the changed-files summary below).

* Using create_topic_and_wait_propogation instead of the create_topic function

* Internally using create_topic in create_topic_and_wait_propogation

* Removed the Kafka single-broker cluster fixture

* Removed unnecessary import time

* Using broker version 3.8.0 for the classic protocol and enabled a test that was failing on 3.7.0

* Changed fixture scope to session

* Style fixes
pranavrth committed Oct 14, 2024
1 parent f354a7a commit 7dda52f
Showing 31 changed files with 368 additions and 196 deletions.
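
Several call sites in this commit switch from create_topic to a new create_topic_and_wait_propogation helper on the trivup cluster fixture. The helper's own implementation is not among the rendered hunks, so the following is only a minimal sketch of the idea, with the name and signature assumed from the call sites:

import time

def create_topic_and_wait_propogation(cluster, topic_prefix, conf=None, **kwargs):
    """Hypothetical sketch (the real helper is a method on the cluster
    fixture): create the topic via the existing create_topic, then poll
    cluster metadata until the topic is visible, so tests do not race
    against metadata propagation across brokers."""
    topic = cluster.create_topic(topic_prefix, conf, **kwargs)
    admin = cluster.admin()
    deadline = time.time() + 30
    while time.time() < deadline:
        # list_topics() returns ClusterMetadata; .topics is keyed by topic name
        if topic in admin.list_topics(timeout=10).topics:
            return topic
        time.sleep(0.5)
    raise RuntimeError("topic %s did not propagate in time" % topic)
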
10 changes: 9 additions & 1 deletion .semaphore/semaphore.yml
@@ -186,12 +186,20 @@ blocks:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: Build
- name: Build and Tests with 'classic' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- tools/source-package-verification.sh
- name: Build and Tests with 'consumer' group protocol
commands:
- sem-version python 3.8
# use a virtualenv
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- tools/source-package-verification.sh
- name: "Source package verification with Python 3 (Linux arm64)"
dependencies: []
61 changes: 61 additions & 0 deletions tests/common/__init__.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
from confluent_kafka import Consumer, DeserializingConsumer
from confluent_kafka.avro import AvroConsumer

_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL'
_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE'


def _update_conf_group_protocol(conf=None):
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
conf['group.protocol'] = 'consumer'


def _trivup_cluster_type_kraft():
return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft'


class TestUtils:
@staticmethod
def use_kraft():
return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft()

@staticmethod
def use_group_protocol_consumer():
return _GROUP_PROTOCOL_ENV in os.environ and os.environ[_GROUP_PROTOCOL_ENV] == 'consumer'


class TestConsumer(Consumer):
def __init__(self, conf=None, **kwargs):
_update_conf_group_protocol(conf)
super(TestConsumer, self).__init__(conf, **kwargs)


class TestDeserializingConsumer(DeserializingConsumer):
def __init__(self, conf=None, **kwargs):
_update_conf_group_protocol(conf)
super(TestDeserializingConsumer, self).__init__(conf, **kwargs)


class TestAvroConsumer(AvroConsumer):
def __init__(self, conf=None, **kwargs):
_update_conf_group_protocol(conf)
super(TestAvroConsumer, self).__init__(conf, **kwargs)
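
A usage sketch for the wrappers above, assuming a broker at localhost:9092: the CI job selects the new protocol by exporting TEST_CONSUMER_GROUP_PROTOCOL=consumer before running tools/source-package-verification.sh, and the same variable drives a local run. group.protocol is injected only when the variable is set and the config carries a group.id, so group-less configs pass through unchanged.

import os
from tests.common import TestConsumer, TestUtils

os.environ['TEST_CONSUMER_GROUP_PROTOCOL'] = 'consumer'  # as the CI job does

conf = {'bootstrap.servers': 'localhost:9092',  # assumed broker address
        'group.id': 'test-group',
        'auto.offset.reset': 'earliest'}
consumer = TestConsumer(conf)  # conf now also carries 'group.protocol': 'consumer'

assert TestUtils.use_group_protocol_consumer()
assert TestUtils.use_kraft()  # KIP-848 is only supported on KRaft clusters
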
31 changes: 20 additions & 11 deletions tests/integration/admin/test_basic_operations.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import confluent_kafka
import struct
import time
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState

from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError
from confluent_kafka.admin import (NewPartitions, ConfigResource,
AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType)
@@ -58,6 +58,8 @@ def verify_admin_acls(admin_client,
for acl_binding, f in fs.items():
f.result() # trigger exception if there was an error

time.sleep(1)

acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY,
None, None, AclOperation.ANY, AclPermissionType.ANY)
acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.PREFIXED,
@@ -83,6 +85,8 @@
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
expected_acl_bindings)

time.sleep(1)

#
# Delete the ACLs with TOPIC and GROUP
#
@@ -94,6 +98,9 @@
assert deleted_acl_bindings == expected, \
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
expected)

time.sleep(1)

#
# All the ACLs should have been deleted
#
@@ -201,14 +208,14 @@ def test_basic_operations(kafka_cluster):
# Second iteration: create topic.
#
for validate in (True, False):
our_topic = kafka_cluster.create_topic(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
},
validate_only=validate
)
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
},
validate_only=validate
)

admin_client = kafka_cluster.admin()

@@ -270,7 +277,7 @@ def consume_messages(group_id, num_messages=None):
print('Read all the required messages: exiting')
break
except ConsumeError as e:
if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF:
if msg is not None and e.code == KafkaError._PARTITION_EOF:
print('Reached end of %s [%d] at offset %d' % (
msg.topic(), msg.partition(), msg.offset()))
eof_reached[(msg.topic(), msg.partition())] = True
@@ -345,6 +352,8 @@ def verify_config(expconfig, configs):
fs = admin_client.alter_configs([resource])
fs[resource].result() # will raise exception on failure

time.sleep(1)

#
# Read the config back again and verify.
#
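The one-second sleeps added in this file give ACL and config changes time to propagate to every broker before the next read-back assertion. A bounded polling helper would be an alternative (a sketch only, not what this commit does):

import time

def eventually(check, timeout=10.0, interval=0.5):
    """Retry a callable that raises AssertionError until it passes or the
    timeout elapses. Hypothetical alternative to the fixed sleeps."""
    deadline = time.time() + timeout
    while True:
        try:
            return check()
        except AssertionError:
            if time.time() >= deadline:
                raise
            time.sleep(interval)
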
31 changes: 16 additions & 15 deletions tests/integration/admin/test_delete_records.py
@@ -25,11 +25,11 @@ def test_delete_records(kafka_cluster):
admin_client = kafka_cluster.admin()

# Create a topic with a single partition
topic = kafka_cluster.create_topic("test-del-records",
{
"num_partitions": 1,
"replication_factor": 1,
})
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
{
"num_partitions": 1,
"replication_factor": 1,
})

# Create Producer instance
p = kafka_cluster.producer()
@@ -73,16 +73,17 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
admin_client = kafka_cluster.admin()
num_partitions = 3
# Create two topics with a single partition
topic = kafka_cluster.create_topic("test-del-records",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topic2 = kafka_cluster.create_topic("test-del-records2",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})

topics = [topic, topic2]
partitions = list(range(num_partitions))
# Create Producer instance
30 changes: 21 additions & 9 deletions tests/integration/admin/test_describe_operations.py
@@ -13,12 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import pytest

from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType)
from confluent_kafka.error import ConsumeError
from confluent_kafka import ConsumerGroupState, TopicCollection

from tests.common import TestUtils

topic_prefix = "test-topic"


@@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs):

def create_acls(admin_client, acl_bindings):
perform_admin_operation_sync(admin_client.create_acls, acl_bindings)
time.sleep(1)


def delete_acls(admin_client, acl_binding_filters):
perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters)
time.sleep(1)


def verify_provided_describe_for_authorized_operations(
@@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations(
acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
"User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
create_acls(admin_client, [acl_binding])
time.sleep(1)

# Check with updated authorized operations
desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs)
@@ -126,6 +133,7 @@
acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
None, None, AclOperation.ANY, AclPermissionType.ANY)
delete_acls(admin_client, [acl_binding_filter])
time.sleep(1)
return desc


@@ -196,20 +204,24 @@ def test_describe_operations(sasl_cluster):

# Create Topic
topic_config = {"compression.type": "gzip"}
our_topic = sasl_cluster.create_topic(topic_prefix,
{
"num_partitions": 1,
"config": topic_config,
"replication_factor": 1,
},
validate_only=False
)
our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix,
{
"num_partitions": 1,
"config": topic_config,
"replication_factor": 1,
},
validate_only=False
)

# Verify Authorized Operations in Describe Topics
verify_describe_topics(admin_client, our_topic)

# Verify Authorized Operations in Describe Groups
verify_describe_groups(sasl_cluster, admin_client, our_topic)
# Skip this check when using group protocol `consumer`,
# as describe_groups() relies on a new RPC in that case.
if not TestUtils.use_group_protocol_consumer():
verify_describe_groups(sasl_cluster, admin_client, our_topic)

# Delete Topic
perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)
30 changes: 18 additions & 12 deletions tests/integration/admin/test_incremental_alter_configs.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from confluent_kafka.admin import ConfigResource, \
ConfigEntry, ResourceType, \
AlterConfigOpType
@@ -52,18 +54,18 @@ def test_incremental_alter_configs(kafka_cluster):
num_partitions = 2
topic_config = {"compression.type": "gzip"}

our_topic = kafka_cluster.create_topic(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
})
our_topic2 = kafka_cluster.create_topic(topic_prefix2,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
})
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
})
our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
})

admin_client = kafka_cluster.admin()

@@ -103,6 +105,8 @@

assert_operation_succeeded(fs, 2)

time.sleep(1)

#
# Get current topic config
#
@@ -134,6 +138,8 @@

assert_operation_succeeded(fs, 1)

time.sleep(1)

#
# Get current topic config
#
12 changes: 6 additions & 6 deletions tests/integration/admin/test_list_offsets.py
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
from confluent_kafka import TopicPartition, IsolationLevel
from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec


def test_list_offsets(kafka_cluster):
@@ -27,11 +27,11 @@
admin_client = kafka_cluster.admin()

# Create a topic with a single partition
topic = kafka_cluster.create_topic("test-topic-verify-list-offsets",
{
"num_partitions": 1,
"replication_factor": 1,
})
topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets",
{
"num_partitions": 1,
"replication_factor": 1,
})

# Create Producer instance
p = kafka_cluster.producer()