diff --git a/.gitignore b/.gitignore
index 604612d..2c9027d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 # Byte-compiled / optimized / DLL files
+.idea/
 __pycache__/
 *.py[cod]
 *$py.class
@@ -16,6 +17,7 @@ downloads/
 eggs/
 .eggs/
 lib/
+!actions/lib/
 lib64/
 parts/
 sdist/
diff --git a/actions/__init__.py b/actions/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/actions/consumer.py b/actions/consumer.py
new file mode 100644
index 0000000..9494d1e
--- /dev/null
+++ b/actions/consumer.py
@@ -0,0 +1,174 @@
+import datetime
+from datetime import timezone
+
+import pytz
+
+from kafka import KafkaConsumer
+from kafka.errors import CommitFailedError
+from kafka.structs import TopicPartition, OffsetAndMetadata
+
+from lib.base_admin_action import BaseAdminAction
+
+
+class ConsumerGroupAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        topic,
+        group_id,
+        to_datetime=None,
+        by_duration=None,
+        shift_by=None,
+        to_earliest=None,
+        to_latest=None,
+        to_offset=None,
+        client_options=None,  # to support misc auth methods
+    ):
+        try:
+            consumer = KafkaConsumer(
+                group_id=group_id,
+                bootstrap_servers=f"{kafka_broker}:{kafka_broker_port}",
+                **(client_options or {}),
+            )
+            if ':' in topic:
+                topic, input_partitions = topic.split(":")
+                input_partitions = input_partitions.split(',') if input_partitions else None
+            else:
+                input_partitions = None
+
+            partitions = []
+            offset_dict = {}
+            if not input_partitions:
+                # no explicit partitions given, so use all partitions of the topic
+                partition_info = consumer.partitions_for_topic(topic)
+                input_partitions = list(partition_info) if partition_info else None
+
+            if input_partitions:
+                for input_partition in input_partitions:
+                    partitions.append(TopicPartition(topic, int(input_partition)))
+
+            if not partitions:
+                return "Invalid topic: no partitions exist"
+
+            # reset to specific offset
+            if to_offset:
+                for topic_partition in partitions:
+                    offset_dict[topic_partition] = to_offset
+            # shift by n (positive or negative)
+            elif shift_by:
+                for topic_partition in partitions:
+                    offset_int = consumer.committed(topic_partition)
+                    offset_int += int(shift_by)
+                    offset_dict[topic_partition] = offset_int
+            # reset to beginning offsets
+            elif to_earliest:
+                offset_dict = consumer.beginning_offsets(partitions)
+            # reset to timestamp offsets, n days back from now
+            elif by_duration:
+                dt = (datetime.datetime.now() - datetime.timedelta(days=by_duration)).astimezone(pytz.utc)
+                timestamp = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
+                partition_timestamp = {}
+                for topic_partition in partitions:
+                    partition_timestamp[topic_partition] = timestamp
+                offset_timestamps = consumer.offsets_for_times(partition_timestamp)
+                for topic_partition, offset_timestamp in offset_timestamps.items():
+                    offset_dict[topic_partition] = offset_timestamp.offset
+            # reset to a specific timestamp, e.g. '2021-07-28T06:00:00.000Z'
+            elif to_datetime:
+                dt = datetime.datetime.strptime(to_datetime, '%Y-%m-%dT%H:%M:%S.%fZ')
+                timestamp = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
+                partition_timestamp = {}
+                for topic_partition in partitions:
+                    partition_timestamp[topic_partition] = timestamp
+                offset_timestamps = consumer.offsets_for_times(partition_timestamp)
+                for topic_partition, offset_timestamp in offset_timestamps.items():
+                    offset_dict[topic_partition] = offset_timestamp.offset
+            # reset to latest offsets
+            elif to_latest:
+                offset_dict = consumer.end_offsets(partitions)
+
+            consumer.assign(partitions)
+            end_offsets = consumer.end_offsets(partitions)
+            offsets = {}
+            resp_output = []
+            if offset_dict:
+                for topic_partition, offset_int in offset_dict.items():
+                    offsets[topic_partition] = OffsetAndMetadata(offset_int, '')
+                    old_offset = consumer.committed(topic_partition)
+                    end_offset = end_offsets[topic_partition]
+                    resp_output.append({"TOPIC": topic_partition.topic,
+                                        "PARTITION": topic_partition.partition,
+                                        "OLD_OFFSET": old_offset,
+                                        "NEW_OFFSET": offset_int,
+                                        "END_OFFSET": end_offset,
+                                        "LAG": self.human_format(end_offset - offset_int)})
+                consumer.commit(offsets)
+            return self.format_array(resp_output)
+        except CommitFailedError:
+            return "Cannot commit offsets: the consumer group is still active"
+
+    def format_array(self, obj):
+        columns = ""
+        rows = ""
+        for index, line in enumerate(obj):
+            if not columns:
+                columns = "\nINDEX \t" + ' \t'.join(map(str, line.keys()))
+                columns = columns + "\n" + "-" * len(columns)
+            rows = rows + '\n' + str(index + 1) + ' \t' + ' \t'.join(map(str, line.values()))
+        return columns + rows
+
+    def human_format(self, num):
+        num = float('{:.3g}'.format(num))
+        magnitude = 0
+        while abs(num) >= 1000:
+            magnitude += 1
+            num /= 1000.0
+        return '{}{}'.format('{:f}'.format(num).rstrip('0').rstrip('.'), ['', 'K', 'M', 'B', 'T'][magnitude])
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
+    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT", 9092)
+    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
+    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")
+    TOPIC = os.environ.get("TOPIC", "qa4_lots:2,3")
+
+    CLIENT_OPTIONS = {
+        "security_protocol": "SASL_SSL",
+        "sasl_mechanism": "SCRAM-SHA-256",
+        "sasl_plain_username": KAFKA_USERNAME,
+        "sasl_plain_password": KAFKA_PASSWORD,
+        "ssl_check_hostname": False,
+    }
+
+    action = ConsumerGroupAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        kafka_broker_port=KAFKA_BROKER_PORT,
+        topic=TOPIC,
+        group_id="test-consumer",
+        to_datetime="2021-11-12T15:41:00.000Z",
+        # to_latest=True,
+        client_options=CLIENT_OPTIONS,
+    )
+    print(res)
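Editor's note on the timestamp-based resets above: `KafkaConsumer.offsets_for_times` takes a mapping of `TopicPartition` to a millisecond epoch timestamp and returns, per partition, the earliest offset whose message timestamp is at or after that value. A minimal standalone sketch of the lookup this action performs (broker address and topic name are placeholders, not values from this pack):

import datetime
from datetime import timezone

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")  # hypothetical broker
dt = datetime.datetime.strptime("2021-11-12T15:41:00.000Z", '%Y-%m-%dT%H:%M:%S.%fZ')
timestamp_ms = dt.replace(tzinfo=timezone.utc).timestamp() * 1000

partitions = [TopicPartition("my-topic", p) for p in (0, 1)]  # hypothetical topic
# returns {TopicPartition: OffsetAndTimestamp(offset, timestamp)}; the value is
# None for a partition with no message at/after the timestamp
result = consumer.offsets_for_times({tp: timestamp_ms for tp in partitions})
for tp, ot in result.items():
    print(tp.partition, ot.offset if ot else None)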
os.environ.get("KAFKA_USERNAME") + KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD") + TOPIC = os.environ.get("TOPIC", "qa4_lots:2,3") + + CLIENT_OPTIONS = { + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-256", + "sasl_plain_username": KAFKA_USERNAME, + "sasl_plain_password": KAFKA_PASSWORD, + "ssl_check_hostname": False + } + + action = ConsumerGroupAction() + + res = action.run( + kafka_broker=KAFKA_BROKER, + kafka_broker_port=KAFKA_BROKER_PORT, + topic=TOPIC, + group_id="test-consumer", + to_datetime="2021-11-12T15:41:00.000Z", + #to_latest=True, + client_options=CLIENT_OPTIONS, + ) + import pprint + + pp = pprint.PrettyPrinter(indent=4) + print(res) diff --git a/actions/consumer_group.yaml b/actions/consumer_group.yaml new file mode 100644 index 0000000..bec0eb6 --- /dev/null +++ b/actions/consumer_group.yaml @@ -0,0 +1,56 @@ +--- +name: consumer_group +description: Consumer groups offset resets using the kafka api on a kafka broker server +enabled: true +runner_type: python-script +entry_point: consumer.py + +# doc strings come from the apache licensed confluent-kafka python package +parameters: + topic: + description: consumer topic with partition as topic:2,3 + type: string + required: true + group_id: + description: consumer group id or name + type: string + required: true + to_datetime: + description: reset offset to specific UTC format timestamp + type: string + required: false + by_duration: + description: reset offset to specific days duration + type: integer + required: false + shift_by: + description: shift offset by n + type: integer + required: false + to_earliest: + description: reset offsets to beginning + type: boolean + required: false + to_latest: + description: reset offsets to latest + type: boolean + required: false + default: true + to_offset: + description: offset number + type: integer + required: false + kafka_broker: + description: A kafka broker (server) to talk to + type: string + required: true + kafka_broker_port: + description: The port that the kafka broker is listening on + type: integer + default: 9092 + client_options: + description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc. + type: object + required: true + default: {} + additionalProperties: true diff --git a/actions/create_rolebindings.py b/actions/create_rolebindings.py new file mode 100644 index 0000000..5656ad5 --- /dev/null +++ b/actions/create_rolebindings.py @@ -0,0 +1,58 @@ +from rbac.mds import MetaDataService + +from lib.base_admin_action import BaseAdminAction + + +class CreateRoleBindingAction(BaseAdminAction): + def run( + self, + kafka_broker, + mds_port, + principal, + role, + resource, + pattern_type, + mds_username, + mds_password, + ): + mds = MetaDataService(mds_username, mds_password, kafka_broker, port=mds_port) + + list_resources = mds.list_rolebinding_resources(principal, role, format=False) + for resources in list_resources: + exist_resource = resources['resourceType'] + ':' + resources['name'] + if exist_resource == resource and resources['patternType'] == pattern_type: + return f"Role bindings already exists: {exist_resource}|{principal}|{pattern_type}|{role}" + elif resource.startswith(exist_resource) and resources['patternType'] == pattern_type: + return f"Role bindings already prefixed. 
diff --git a/actions/create_rolebindings.yaml b/actions/create_rolebindings.yaml
new file mode 100644
index 0000000..6a27fe0
--- /dev/null
+++ b/actions/create_rolebindings.yaml
@@ -0,0 +1,42 @@
+---
+name: create_rolebindings
+description: Create role binding resources using the kafka MDS api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: create_rolebindings.py
+
+# doc strings come from the apache licensed confluent-kafka python package
+parameters:
+  principal:
+    description: Principal name, e.g. User:test-qa-consumer
+    type: string
+    required: true
+  role:
+    description: Role name, e.g. DeveloperRead
+    type: string
+    required: true
+  resource:
+    description: Resource as resourceType:name, e.g. Topic:testktable
+    type: string
+    required: true
+  pattern_type:
+    description: Pattern type, e.g. LITERAL or PREFIXED
+    type: string
+    required: true
+
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  mds_port:
+    description: The port that the kafka MDS is listening on
+    type: integer
+    default: 8090
+  mds_username:
+    description: LDAP username
+    type: string
+    required: true
+  mds_password:
+    description: LDAP password
+    type: string
+    required: true
diff --git a/actions/create_topic.py b/actions/create_topic.py
new file mode 100644
index 0000000..febdfc5
--- /dev/null
+++ b/actions/create_topic.py
@@ -0,0 +1,72 @@
+from typing import Dict, Optional
+
+from kafka.admin import NewTopic
+from kafka.errors import TopicAlreadyExistsError
+
+from lib.base_admin_action import BaseAdminAction
+
+
+class CreateTopicAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        topic,
+        partitions,
+        replication_factor,
+        replica_assignment=None,  # type: Optional[Dict[str, int]]
+        topic_config=None,
+        client_options=None,  # to support misc auth methods
+    ):
+        if replica_assignment:
+            replica_assignment = {int(k): int(v) for k, v in replica_assignment.items()}
+        client = self.admin_client(kafka_broker, kafka_broker_port, client_options)
+
+        new_topic = NewTopic(
+            topic,
+            num_partitions=partitions,
+            replication_factor=replication_factor,
+            replica_assignments=replica_assignment,
+            topic_configs=topic_config,
+        )
+        try:
+            client.create_topics([new_topic])
+            return "Topic %s created" % topic
+        except TopicAlreadyExistsError:
+            return "Topic %s already exists" % topic
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER")
+    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT")
+    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
+    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")
+    TOPIC = os.environ.get("TOPIC")
+
+    CLIENT_OPTIONS = {
+        "security_protocol": "SASL_SSL",
+        "sasl_mechanism": "SCRAM-SHA-256",
+        "sasl_plain_username": KAFKA_USERNAME,
+        "sasl_plain_password": KAFKA_PASSWORD,
+    }
+
+    action = CreateTopicAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        kafka_broker_port=KAFKA_BROKER_PORT,
+        topic=TOPIC,
+        partitions=1,
+        replication_factor=1,
+        replica_assignment=None,
+        topic_config=None,
+        client_options=CLIENT_OPTIONS,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
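Editor's note: `topic_config` is passed straight through to NewTopic's `topic_configs`, so it takes standard Kafka topic-level configs as a str:str mapping. A hedged sketch of what a caller might pass (the broker, topic, and config values are illustrative, not defaults of this pack):

# hypothetical invocation showing the topic_config shape
action.run(
    kafka_broker="localhost",          # placeholder broker
    kafka_broker_port=9092,
    topic="example-topic",             # placeholder topic
    partitions=3,
    replication_factor=2,
    topic_config={
        "cleanup.policy": "compact",   # log compaction instead of deletion
        "retention.ms": "86400000",    # 1 day
    },
    client_options=CLIENT_OPTIONS,
)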
os.environ.get("KAFKA_USERNAME") + KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD") + TOPIC = os.environ.get("TOPIC") + + CLIENT_OPTIONS = { + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-256", + "sasl_plain_username": KAFKA_USERNAME, + "sasl_plain_password": KAFKA_PASSWORD, + } + + action = CreateTopicAction() + + res = action.run( + kafka_broker=KAFKA_BROKER, + kafka_broker_port=KAFKA_BROKER_PORT, + topic=TOPIC, + partitions=1, + replication_factor=1, + replica_assignment=None, + topic_config=None, + client_options=CLIENT_OPTIONS, + ) + import pprint + + pp = pprint.PrettyPrinter(indent=4) + pp.pprint(res) diff --git a/actions/create_topic.yaml b/actions/create_topic.yaml new file mode 100644 index 0000000..832ded1 --- /dev/null +++ b/actions/create_topic.yaml @@ -0,0 +1,48 @@ +--- +name: create_topic +description: Create a topic using the kafka api on a kafka broker server +enabled: true +runner_type: python-script +entry_point: create_topic.py + +# doc strings come from the apache licensed confluent-kafka python package +parameters: + topic: + description: The topic to create + type: string + required: true + partitions: + description: Number of partitions to create + type: integer + default: 1 + replication_factor: + description: Replication factor of partitions, or -1 if replica_assignment is used. + type: integer + default: 2 + required: false + replica_assignment: + description: mapping from partition id to the assigned replica + type: object + additionalProperties: + type: integer + topic_config: + description: Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs + type: object + additionalProperties: + type: string + required: false + + kafka_broker: + description: A kafka broker (server) to talk to + type: string + required: true + kafka_broker_port: + description: The port that the kafka broker is listening on + type: integer + default: 9092 + client_options: + description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc. 
diff --git a/actions/delete_topic.py b/actions/delete_topic.py
new file mode 100644
index 0000000..38114a8
--- /dev/null
+++ b/actions/delete_topic.py
@@ -0,0 +1,49 @@
+from kafka.errors import UnknownTopicOrPartitionError
+
+from lib.base_admin_action import BaseAdminAction
+
+
+class DeleteTopicAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        topic,
+        client_options=None,  # to support misc auth methods
+    ):
+        client = self.admin_client(kafka_broker, kafka_broker_port, client_options)
+        try:
+            client.delete_topics([topic])
+            return "Topic %s deleted" % topic
+        except UnknownTopicOrPartitionError:
+            return "Unknown topic %s" % topic
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER")
+    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT")
+    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
+    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")
+    TOPIC = os.environ.get("TOPIC", "test")
+
+    CLIENT_OPTIONS = {
+        "security_protocol": "SASL_SSL",
+        "sasl_mechanism": "SCRAM-SHA-256",
+        "sasl_plain_username": KAFKA_USERNAME,
+        "sasl_plain_password": KAFKA_PASSWORD,
+    }
+
+    action = DeleteTopicAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        kafka_broker_port=KAFKA_BROKER_PORT,
+        topic=TOPIC,
+        client_options=CLIENT_OPTIONS,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
diff --git a/actions/delete_topic.yaml b/actions/delete_topic.yaml
new file mode 100644
index 0000000..9a3887b
--- /dev/null
+++ b/actions/delete_topic.yaml
@@ -0,0 +1,28 @@
+---
+name: delete_topic
+description: Delete a topic using the kafka api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: delete_topic.py
+
+# doc strings come from the apache licensed confluent-kafka python package
+parameters:
+  topic:
+    description: The topic to delete
+    type: string
+    required: true
+
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  kafka_broker_port:
+    description: The port that the kafka broker is listening on
+    type: integer
+    default: 9092
+  client_options:
+    description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc.
+    type: object
+    required: true
+    default: {}
+    additionalProperties: true
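Editor's note: topic deletion in Kafka is asynchronous; `delete_topics` returns once the controller accepts the request, not once the data is gone. A hedged sketch of polling for completion with the same admin client (function name and timings are made up for illustration):

import time

def wait_for_deletion(client, topic, attempts=10, delay=1.0):
    # poll list_topics until the topic disappears or we give up
    for _ in range(attempts):
        if topic not in client.list_topics():
            return True
        time.sleep(delay)
    return False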
diff --git a/actions/lib/__init__.py b/actions/lib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/actions/lib/base_admin_action.py b/actions/lib/base_admin_action.py
new file mode 100644
index 0000000..409ed5d
--- /dev/null
+++ b/actions/lib/base_admin_action.py
@@ -0,0 +1,27 @@
+import abc
+
+import six
+
+from kafka.admin import KafkaAdminClient
+from st2common.runners.base_action import Action
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseAdminAction(Action):
+    DEFAULT_CLIENT_ID = "st2-kafka-admin"
+
+    def admin_client(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        client_options=None,  # to support misc auth methods
+    ):
+        if not client_options:
+            client_options = {}
+        client = KafkaAdminClient(
+            bootstrap_servers=["%s:%s" % (kafka_broker, kafka_broker_port)],
+            client_id=self.config.get("client_id") or self.DEFAULT_CLIENT_ID,
+            **client_options
+        )
+        return client
diff --git a/actions/list_consumer_groups.py b/actions/list_consumer_groups.py
new file mode 100644
index 0000000..22b8ee1
--- /dev/null
+++ b/actions/list_consumer_groups.py
@@ -0,0 +1,50 @@
+from lib.base_admin_action import BaseAdminAction
+
+
+class ListConsumerGroupAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        broker_ids=None,
+        client_options=None,  # to support misc auth methods
+    ):
+        client = self.admin_client(kafka_broker, kafka_broker_port, client_options)
+        # broker_ids=None queries all brokers in the cluster
+        return client.list_consumer_groups(broker_ids)
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
+    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT", 9092)
+    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME", 'admin')
+    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD", 'admin-secret')
+
+    CLIENT_OPTIONS = {
+        "security_protocol": "SASL_SSL",
+        "sasl_mechanism": "SCRAM-SHA-256",
+        "sasl_plain_username": KAFKA_USERNAME,
+        "sasl_plain_password": KAFKA_PASSWORD,
+        "ssl_check_hostname": False,
+    }
+
+    action = ListConsumerGroupAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        kafka_broker_port=KAFKA_BROKER_PORT,
+        broker_ids=None,
+        client_options=CLIENT_OPTIONS,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
diff --git a/actions/list_consumer_groups.yaml b/actions/list_consumer_groups.yaml
new file mode 100644
index 0000000..c821848
--- /dev/null
+++ b/actions/list_consumer_groups.yaml
@@ -0,0 +1,23 @@
+---
+name: list_consumer_groups
+description: List consumer groups using the kafka api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: list_consumer_groups.py
+
+# doc strings come from the apache licensed confluent-kafka python package
+parameters:
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  kafka_broker_port:
+    description: The port that the kafka broker is listening on
+    type: integer
+    default: 9092
+  client_options:
+    description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc.
+    type: object
+    required: true
+    default: {}
+    additionalProperties: true
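Editor's note: in kafka-python 2.x, `list_consumer_groups` returns a list of (group_id, protocol_type) tuples rather than a response object, so callers typically post-process it. A hedged sketch of filtering the result (the group-name prefix is made up):

groups = client.list_consumer_groups()  # e.g. [('test-consumer', 'consumer'), ...]
# keep only groups using the standard 'consumer' protocol that match a naming prefix
qa_groups = [g for g, proto in groups if proto == 'consumer' and g.startswith('qa4_')]
print(qa_groups)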
diff --git a/actions/list_rolebindings.py b/actions/list_rolebindings.py
new file mode 100644
index 0000000..6270025
--- /dev/null
+++ b/actions/list_rolebindings.py
@@ -0,0 +1,45 @@
+from rbac.mds import MetaDataService
+
+from lib.base_admin_action import BaseAdminAction
+
+
+class ListRoleBindingAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        mds_port,
+        principal,
+        mds_username,
+        mds_password,
+        role=None,
+    ):
+        mds = MetaDataService(mds_username, mds_password, kafka_broker, port=mds_port)
+        return mds.list_rolebinding_resources(principal, role)
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
+    MDS_PORT = os.environ.get("MDS_PORT", 8090)
+    MDS_USERNAME = os.environ.get("MDS_USERNAME")
+    MDS_PASSWORD = os.environ.get("MDS_PASSWORD")
+    PRINCIPAL = os.environ.get("PRINCIPAL", "User:test-qa-consumer")
+    ROLE = os.environ.get("ROLE", "DeveloperRead")
+
+    action = ListRoleBindingAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        mds_port=MDS_PORT,
+        principal=PRINCIPAL,
+        mds_username=MDS_USERNAME,
+        mds_password=MDS_PASSWORD,
+        role=ROLE,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
diff --git a/actions/list_rolebindings.yaml b/actions/list_rolebindings.yaml
new file mode 100644
index 0000000..bf8dcaf
--- /dev/null
+++ b/actions/list_rolebindings.yaml
@@ -0,0 +1,34 @@
+---
+name: list_rolebindings
+description: List role binding resources using the kafka MDS api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: list_rolebindings.py
+parameters:
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  mds_port:
+    description: The port that the kafka MDS is listening on
+    type: integer
+    default: 8090
+    required: false
+  principal:
+    description: Principal name, e.g. User:test-qa-consumer
+    type: string
+    required: true
+  role:
+    description: Role name, e.g. DeveloperRead
+    type: string
+    required: false
+
+  mds_username:
+    description: MDS username (LDAP)
+    type: string
+    required: true
+
+  mds_password:
+    description: MDS password (LDAP)
+    type: string
+    required: true
diff --git a/actions/list_topic.py b/actions/list_topic.py
new file mode 100644
index 0000000..974fb98
--- /dev/null
+++ b/actions/list_topic.py
@@ -0,0 +1,42 @@
+from lib.base_admin_action import BaseAdminAction
+
+
+class ListTopicAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        kafka_broker_port,
+        client_options=None,  # to support misc auth methods
+    ):
+        client = self.admin_client(kafka_broker, kafka_broker_port, client_options)
+        return client.list_topics()
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER")
+    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT")
+    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
+    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")
+
+    CLIENT_OPTIONS = {
+        "security_protocol": "SASL_SSL",
+        "sasl_mechanism": "SCRAM-SHA-256",
+        "sasl_plain_username": KAFKA_USERNAME,
+        "sasl_plain_password": KAFKA_PASSWORD,
+    }
+
+    action = ListTopicAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        kafka_broker_port=KAFKA_BROKER_PORT,
+        client_options=CLIENT_OPTIONS,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
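Editor's note: `list_topics` returns names only; when partition-level detail is wanted, kafka-python's admin client also exposes `describe_topics`. A hedged sketch (the topic name is a placeholder, and the metadata dict keys follow kafka-python's topic-metadata shape as I recall it, so verify before relying on them):

# names only
names = client.list_topics()
# fuller metadata per topic: partitions, leaders, replicas
meta = client.describe_topics(["example-topic"])
for t in meta:
    print(t["topic"], len(t["partitions"]))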
diff --git a/actions/list_topic.yaml b/actions/list_topic.yaml
new file mode 100644
index 0000000..e300966
--- /dev/null
+++ b/actions/list_topic.yaml
@@ -0,0 +1,23 @@
+---
+name: list_topic
+description: List topics using the kafka api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: list_topic.py
+
+# doc strings come from the apache licensed confluent-kafka python package
+parameters:
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  kafka_broker_port:
+    description: The port that the kafka broker is listening on
+    type: integer
+    default: 9092
+  client_options:
+    description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc.
+    type: object
+    required: true
+    default: {}
+    additionalProperties: true
diff --git a/actions/produce.py b/actions/produce.py
index ad45397..0758ce5 100644
--- a/actions/produce.py
+++ b/actions/produce.py
@@ -1,6 +1,21 @@
+import six
+
 from st2common.runners.base_action import Action
-from kafka import SimpleProducer, KafkaClient
-from kafka.util import kafka_bytestring
+from kafka import KafkaProducer
+from kafka.producer.future import RecordMetadata
+
+
+# copied from kafka-python <1.0
+def kafka_bytestring(s):
+    """
+    Takes a string or bytes instance.
+    Returns bytes, encoding strings in utf-8 as necessary.
+    """
+    if isinstance(s, six.binary_type):
+        return s
+    if isinstance(s, six.string_types):
+        return s.encode('utf-8')
+    raise TypeError(s)
 
 
 class ProduceMessageAction(Action):
@@ -36,10 +51,13 @@ def run(self, topic, message, hosts=None):
         # set default for empty value
         _client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID
 
-        client = KafkaClient(_hosts, client_id=_client_id)
-        client.ensure_topic_exists(topic)
-        producer = SimpleProducer(client)
-        result = producer.send_messages(topic, kafka_bytestring(message))
-
-        if result[0]:
-            return result[0]._asdict()
+        producer = KafkaProducer(
+            bootstrap_servers=_hosts.split(','),
+            client_id=_client_id,
+            # TODO: Support security_protocol + sasl_mechanism + sasl_*
+        )
+        future = producer.send(topic=topic, value=kafka_bytestring(message))
+        producer.flush()
+        if not future.failed():
+            result = future.get()  # type: RecordMetadata
+            return result._asdict()
diff --git a/actions/rbac/__init__.py b/actions/rbac/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/actions/rbac/mds.py b/actions/rbac/mds.py
new file mode 100644
index 0000000..e4635f0
--- /dev/null
+++ b/actions/rbac/mds.py
@@ -0,0 +1,162 @@
+import base64
+import json
+
+import requests
+
+
+class MetaDataService:
+    def __init__(self, user, password, env, port=8090):
+        self.base_url = f'http://{env}:{port}/security/1.0'
+        self.base_decode = f"{user}:{password}"
+        self.basic = base64.b64encode(self.base_decode.encode("ascii")).decode("ascii")
+        self.bearer_token = self.authenticate()
+        self.headers = {
+            "Authorization": f"Bearer {self.bearer_token}",
+            "content-type": "application/json",
+        }
+        self.cluster_id = self.get_cluster_id()
+        self.cluster_data = {"clusters": {"kafka-cluster": self.cluster_id}}
+        self.pattern_types = ['PREFIXED', 'CONFLUENT_ONLY_TENANT_MATCH', 'MATCH', 'CONFLUENT_ALL_TENANT_ANY',
+                              'CONFLUENT_ALL_TENANT_PREFIXED', 'CONFLUENT_ALL_TENANT_LITERAL', 'UNKNOWN', 'ANY', 'LITERAL']
+
+    def authenticate(self):
+        headers = {
+            "Authorization": f"Basic {self.basic}",
+            "content-type": "application/json",
+        }
+        try:
+            r = requests.get(f"{self.base_url}/authenticate", headers=headers)
+            return json.loads(r.content)['auth_token']
+        except json.decoder.JSONDecodeError:
+            return "authentication issue"
+
+    def get_cluster_id(self):
+        try:
+            r = requests.get(f"{self.base_url}/metadataClusterId", headers=self.headers)
+            return r.content.decode('ascii')
+        except requests.RequestException:
+            return None
+
+    def get_rolenames(self):
+        try:
+            r = requests.get(f"{self.base_url}/roleNames", headers=self.headers)
+            return json.loads(r.content)
+        except json.decoder.JSONDecodeError:
+            return None
+
+    def list_rolebinding_resources(self, principal, rolename=None, format=True):
+        if rolename:
+            role_names = self.get_rolenames()
+            if role_names and rolename not in role_names:
+                return "Invalid roleName"
+
+            r = requests.post(f"{self.base_url}/principals/{principal}/roles/{rolename}/resources",
+                              headers=self.headers, data=json.dumps(self.cluster_data))
+            resp = json.loads(r.content)
+            update_resource = []
+            for dict_obj in resp:
+                dict_obj['Principal'] = principal
+                dict_obj['Role'] = rolename
+                update_resource.append(dict_obj)
+            if format:
+                resource = self.format_array(update_resource)
+            else:
+                resource = resp
+        else:
+            r = requests.post(f"{self.base_url}/lookup/principal/{principal}/resources",
+                              headers=self.headers, data=json.dumps(self.cluster_data))
+            resource = json.loads(r.content)
+
+            if format:
+                resource = self.format_array(self.format_dict(resource))
+
+        if not len(resource):
+            return "either the User/Group or the role bindings do not exist"
+        return resource
+
+    def create_rolebinding_resources(self, principal, rolename, resource, pattern_type='LITERAL'):
+        if pattern_type not in self.pattern_types:
+            return "Invalid pattern type"
+        role_names = self.get_rolenames()
+        if role_names and rolename not in role_names:
+            return "Invalid roleName"
+
+        resource_type, name = resource.split(":")
+        data = {"scope": self.cluster_data,
+                "resourcePatterns": [
+                    {
+                        "resourceType": resource_type,
+                        "name": name,
+                        "patternType": pattern_type
+                    }]
+                }
+
+        r = requests.post(f"{self.base_url}/principals/{principal}/roles/{rolename}/bindings",
+                          headers=self.headers, data=json.dumps(data))
+        if r.status_code == 204:
+            return "role binding created"
+        else:
+            return r.content
+
+    def delete_rolebinding_resources(self, principal, rolename, resource, pattern_type='LITERAL'):
+        if pattern_type not in self.pattern_types:
+            return "Invalid pattern type"
+        role_names = self.get_rolenames()
+        if role_names and rolename not in role_names:
+            return "Invalid roleName"
+
+        resource_type, name = resource.split(":")
+        data = {"scope": self.cluster_data,
+                "resourcePatterns": [
+                    {
+                        "resourceType": resource_type,
+                        "name": name,
+                        "patternType": pattern_type
+                    }]
+                }
+
+        r = requests.delete(f"{self.base_url}/principals/{principal}/roles/{rolename}/bindings",
+                            headers=self.headers, data=json.dumps(data))
+        if r.status_code == 204:
+            return "role binding deleted"
+        else:
+            return r.content
+
+    def format_array(self, obj):
+        columns = ""
+        rows = ""
+        for index, line in enumerate(obj):
+            if not columns:
+                columns = "\nIndex\t" + '\t'.join(map(str, line.keys()))
+                columns = columns + "\n" + "-" * 100
+            rows = rows + '\n' + str(index + 1) + '\t' + '\t'.join(map(str, line.values()))
+        return columns + rows
+
+    def format_dict(self, obj, resp=None, new_key=None):
+        # mutable default arguments persist across calls, so create fresh ones instead
+        if resp is None:
+            resp = []
+        if new_key is None:
+            new_key = {}
+        for key, value in obj.items():
+            if key.startswith('User:') or key.startswith('Group:'):
+                new_key['Principal'] = key
+            if key.startswith('Developer'):
+                new_key['Role'] = key
+            if type(value).__name__ == "dict":
+                self.format_dict(value, resp, new_key)
+            elif type(value).__name__ == "list":
+                for dict_obj in value:
+                    resp.append({**dict_obj, **new_key})
+        return resp
+
+
+if __name__ == "__main__":
+    mds = MetaDataService('xx', 'xx', "c-kafka-qa4.copart.com")
+    print(mds.list_rolebinding_resources("User:test-qa-consumer"))
+    # print(mds.create_rolebinding_resources("User:test-qa-consumer", "DeveloperRead", 'Topic:testktable', "PREFIXED"))
diff --git a/actions/remote_create_topic.yaml b/actions/remote_create_topic.yaml
new file mode 100644
index 0000000..41ec407
--- /dev/null
+++ b/actions/remote_create_topic.yaml
@@ -0,0 +1,23 @@
+---
+name: remote_create_topic
+pack: kafka
+description: Create a topic using the kafka cli on a kafka broker server
+enabled: true
+runner_type: remote-shell-cmd
+entry_point: ""
+parameters:
+  topic:
+    description: The topic to create
+    type: string
+    required: true
+  host:
+    description: The kafka broker host with the kafka cli installed
+    type: string
+    required: true
+  zookeeper:
+    description: The zookeeper connection string (like "host:port,host:port") for old style consumers
+    type: string
+    default: ""
+  cmd:
+    default: "kafka-topics {% if zookeeper %}--zookeeper {{ zookeeper }}{% endif %} --create --topic '{{ topic }}'"
+    immutable: true
diff --git a/actions/remove_rolebindings.py b/actions/remove_rolebindings.py
new file mode 100644
index 0000000..3af3246
--- /dev/null
+++ b/actions/remove_rolebindings.py
@@ -0,0 +1,49 @@
+from rbac.mds import MetaDataService
+
+from lib.base_admin_action import BaseAdminAction
+
+
+class RemoveRoleBindingAction(BaseAdminAction):
+    def run(
+        self,
+        kafka_broker,
+        mds_port,
+        principal,
+        role,
+        resource,
+        pattern_type,
+        mds_username,
+        mds_password,
+    ):
+        mds = MetaDataService(mds_username, mds_password, kafka_broker, port=mds_port)
+        return mds.delete_rolebinding_resources(principal, role, resource, pattern_type)
+
+
+if __name__ == "__main__":
+    import os
+
+    KAFKA_BROKER = os.environ.get("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
+    MDS_PORT = os.environ.get("MDS_PORT", 8090)
+    MDS_USERNAME = os.environ.get("MDS_USERNAME")
+    MDS_PASSWORD = os.environ.get("MDS_PASSWORD")
+    PRINCIPAL = os.environ.get("PRINCIPAL", "User:test-qa-consumer")
+    RESOURCE = os.environ.get("RESOURCE", "Topic:testktable")
+    ROLE = os.environ.get("ROLE", "DeveloperRead")
+    PATTERN_TYPE = os.environ.get("PATTERN_TYPE", "LITERAL")
+
+    action = RemoveRoleBindingAction()
+
+    res = action.run(
+        kafka_broker=KAFKA_BROKER,
+        mds_port=MDS_PORT,
+        principal=PRINCIPAL,
+        role=ROLE,
+        resource=RESOURCE,
+        pattern_type=PATTERN_TYPE,
+        mds_username=MDS_USERNAME,
+        mds_password=MDS_PASSWORD,
+    )
+    import pprint
+
+    pp = pprint.PrettyPrinter(indent=4)
+    pp.pprint(res)
diff --git a/actions/remove_rolebindings.yaml b/actions/remove_rolebindings.yaml
new file mode 100644
index 0000000..4c98277
--- /dev/null
+++ b/actions/remove_rolebindings.yaml
@@ -0,0 +1,42 @@
+---
+name: remove_rolebindings
+description: Remove role binding resources using the kafka MDS api on a kafka broker server
+enabled: true
+runner_type: python-script
+entry_point: remove_rolebindings.py
+
+# doc strings come from the apache licensed confluent-kafka python package
+parameters:
+  principal:
+    description: Principal name, e.g. User:test-qa-consumer
+    type: string
+    required: true
+  role:
+    description: Role name, e.g. DeveloperRead
+    type: string
+    required: true
+  resource:
+    description: Resource as resourceType:name, e.g. Topic:testktable
+    type: string
+    required: true
+  pattern_type:
+    description: Pattern type, e.g. LITERAL or PREFIXED
+    type: string
+    required: true
+
+  kafka_broker:
+    description: A kafka broker (server) to talk to
+    type: string
+    required: true
+  mds_port:
+    description: The port that the kafka MDS is listening on
+    type: integer
+    default: 8090
+  mds_username:
+    description: LDAP username
+    type: string
+    required: true
+  mds_password:
+    description: LDAP password
+    type: string
+    required: true
diff --git a/requirements.txt b/requirements.txt
index e2dbc6e..81e0640 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-kafka-python>=0.9.4,<1.0
+kafka-python>=2.0,<2.1
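Editor's note for a follow-up: produce.py still carries a TODO for SASL support, while every new action threads client_options through to its client. A hedged sketch of how the same pattern could close that gap; the idea of sourcing these from pack config is an assumption, not something produce.py reads today, and the credentials are placeholders:

# hypothetical: forward connection options to KafkaProducer, mirroring client_options
producer_options = {
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-256",
    "sasl_plain_username": "admin",        # placeholder credentials
    "sasl_plain_password": "admin-secret",
}
producer = KafkaProducer(
    bootstrap_servers=_hosts.split(','),
    client_id=_client_id,
    **producer_options,  # kafka-python accepts these as keyword configs
)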