Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create topic #5

Open
wants to merge 38 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f6faa6e
Add action to create a kafka topic
copart-jafloyd Feb 24, 2021
4a60775
Include the runner type
copart-jafloyd Feb 24, 2021
204600a
Update kafka-python to latest 2.0
copart-jafloyd Mar 4, 2021
5d951b3
Add create_topic action based on kafka-python
copart-jafloyd Mar 5, 2021
7ddd53e
Add the actions/lib dir
copart-jafloyd Mar 5, 2021
cbf3ee3
gitignore
thsubramani May 14, 2021
0d3ad4c
topic related action operation
thsubramani May 20, 2021
de13537
added meta data files
thsubramani Jun 30, 2021
65b61be
Merge branch 'master' of https://github.com/copartit/stackstorm-kafka…
thsubramani Jun 30, 2021
f97afa1
fix type string
thsubramani Jul 9, 2021
a6b9fe8
fix parameters
thsubramani Jul 10, 2021
9a7d7be
required parameter client options
thsubramani Jul 10, 2021
3eba98b
fix the entrypoint
thsubramani Jul 10, 2021
7f01c8b
fix the entrypoint
thsubramani Jul 10, 2021
6baa9c2
description fix
thsubramani Jul 10, 2021
b43d432
fix role binding
thsubramani Jul 15, 2021
e98e4f2
rename create actions
thsubramani Jul 15, 2021
fb9b2c5
role
thsubramani Jul 15, 2021
9a572cc
fix
thsubramani Jul 16, 2021
b650b63
fix
thsubramani Jul 16, 2021
51f7505
fix
thsubramani Jul 16, 2021
11765c4
fix
thsubramani Jul 16, 2021
112c57e
consumer and remove rolebinding
thsubramani Aug 3, 2021
0b80943
fix duration as optional params
thsubramani Aug 6, 2021
0edb926
fixed time based offset rest issue
thsubramani Nov 12, 2021
cb92b8f
fixed action name
thsubramani Nov 12, 2021
0abfbad
fix to list role bindings
thsubramani Nov 12, 2021
149a8f6
fixed list role
thsubramani Nov 12, 2021
53ef8f8
fix order of parameters
thsubramani Nov 13, 2021
3def3e1
fix order of parameters
thsubramani Nov 13, 2021
3b5eee5
fixed datetime parameter missing
thsubramani Nov 13, 2021
cfb758d
fixed for time duration
thsubramani Nov 18, 2021
4afbb7f
fixed role binding output
thsubramani Nov 18, 2021
19fca48
fixed format new line
thsubramani Nov 18, 2021
da9a3c3
single tab format
thsubramani Nov 18, 2021
e165cb8
fixed offset reset format
thsubramani Nov 19, 2021
e86e22b
fix --
thsubramani Nov 19, 2021
ca9542a
fixed to look for prefixed/existing rolebindings
thsubramani Jun 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Byte-compiled / optimized / DLL files
.idea/
__pycache__/
*.py[cod]
*$py.class
Expand All @@ -16,6 +17,7 @@ downloads/
eggs/
.eggs/
lib/
!actions/lib/
lib64/
parts/
sdist/
Expand Down
Empty file added actions/__init__.py
Empty file.
174 changes: 174 additions & 0 deletions actions/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from typing import Dict, Optional
from kafka.protocol.admin import Response

from kafka.errors import CommitFailedError

from lib.base_admin_action import BaseAdminAction

from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata

import datetime
from datetime import timezone
import pytz


class ConsumerGroupAction(BaseAdminAction):
    """Reset committed offsets for a Kafka consumer group on one topic.

    Exactly one reset strategy is applied per run, checked in this order:
    ``to_offset``, ``shift_by``, ``to_earliest``, ``by_duration``,
    ``to_datetime``, ``to_latest``.  Returns a formatted table of old/new
    offsets per partition, or an explanatory message string.
    """

    def run(
        self,
        kafka_broker,
        kafka_broker_port,
        topic,
        group_id,
        to_datetime=None,
        by_duration=None,
        shift_by=None,
        to_earliest=None,
        to_latest=None,
        to_offset=None,
        client_options=None,  # to support misc auth methods
    ):
        """Reset the group's offsets on ``topic`` and report the change.

        :param topic: either ``"name"`` (all partitions) or ``"name:2,3"``
            (only the listed partitions).
        :param to_datetime: timestamp string ``%Y-%m-%dT%H:%M:%S.%fZ``,
            interpreted as UTC.
        :param by_duration: reset to the offsets of ``by_duration`` days ago.
        :param shift_by: move each committed offset by +/- n.
        :param client_options: extra ``KafkaConsumer`` kwargs (auth, TLS, ...).
        """
        # FIX: client_options defaults to None but was unpacked with **,
        # so calling without it raised TypeError.
        client_options = client_options or {}
        consumer = KafkaConsumer(
            group_id=group_id,
            bootstrap_servers=f"{kafka_broker}:{kafka_broker_port}",
            **client_options,
        )
        try:
            # "topic:2,3" selects specific partitions; a bare name means all.
            if ':' in topic:
                topic, partition_csv = topic.split(":")
                input_partitions = partition_csv.split(',') if partition_csv else None
            else:
                input_partitions = None

            if not input_partitions:
                partition_info = consumer.partitions_for_topic(topic)
                input_partitions = list(partition_info) if partition_info else None

            partitions = [TopicPartition(topic, int(p)) for p in (input_partitions or [])]
            if not partitions:
                return "seems to be Invalid topic. so no partition exists"

            offset_dict = {}
            if to_offset:
                offset_dict = {tp: to_offset for tp in partitions}
            elif shift_by:
                for tp in partitions:
                    # FIX: committed() returns None when the group has no
                    # committed offset yet; treat that as 0 instead of crashing.
                    current = consumer.committed(tp) or 0
                    # The original's +n / -abs(n) branches collapse to addition.
                    offset_dict[tp] = current + int(shift_by)
            elif to_earliest:
                offset_dict = consumer.beginning_offsets(partitions)
            elif by_duration:
                target = datetime.datetime.now(timezone.utc) - datetime.timedelta(days=by_duration)
                offset_dict = self._offsets_at(consumer, partitions, target)
            elif to_datetime:
                # e.g. '2021-07-28T06:00:00.000Z'; the string is taken as UTC.
                naive = datetime.datetime.strptime(to_datetime, '%Y-%m-%dT%H:%M:%S.%fZ')
                offset_dict = self._offsets_at(consumer, partitions, naive.replace(tzinfo=timezone.utc))
            elif to_latest:
                offset_dict = consumer.end_offsets(partitions)

            consumer.assign(partitions)
            end_offsets = consumer.end_offsets(partitions)
            offsets = {}
            resp_output = []
            if offset_dict:
                for topic_partition, offset_int in offset_dict.items():
                    offsets[topic_partition] = OffsetAndMetadata(offset_int, '')
                    old_offset = consumer.committed(topic_partition)
                    end_offset = end_offsets[topic_partition]
                    resp_output.append({"TOPIC": topic_partition.topic,
                                        "PARTITION": topic_partition.partition,
                                        "OLD_OFFSET": old_offset,
                                        "NEW_OFFSET": offset_int,
                                        "END_OFFSET": end_offset,
                                        "LAG": self.human_format(end_offset - offset_int)})
                consumer.commit(offsets)  # type: Response
            return self.format_array(resp_output)
        except CommitFailedError:
            # Offsets cannot be committed while the group has live members.
            return "consumer group is still active"
        finally:
            # FIX: the consumer was never closed, leaking a connection per run.
            consumer.close()

    def _offsets_at(self, consumer, partitions, dt):
        """Map each partition to the first offset at/after aware datetime ``dt``.

        FIX: ``offsets_for_times`` returns None for partitions with no message
        at or after the timestamp; those are now skipped instead of raising
        AttributeError on ``.offset``.
        """
        timestamp_ms = dt.timestamp() * 1000
        found = consumer.offsets_for_times({tp: timestamp_ms for tp in partitions})
        return {tp: meta.offset for tp, meta in found.items() if meta is not None}

    def format_array(self, obj):
        """Render a list of row dicts as a tab-separated table string."""
        # FIX: an empty list previously raised NameError because the header
        # variable was only bound inside the loop (also fixes the 'coloumns'
        # typo in the local name).
        if not obj:
            return ""
        columns = "\nINDEX \t" + ' \t'.join(map(str, obj[0].keys()))
        columns = columns + "\n" + "-" * len(columns)
        rows = ""
        for index, line in enumerate(obj):
            rows = rows + '\n' + str(index + 1) + ' \t' + ' \t'.join(map(str, line.values()))
        return columns + rows

    def human_format(self, num):
        """Abbreviate ``num`` with K/M/B/T suffixes, e.g. 1500 -> '1.5K'."""
        num = float('{:.3g}'.format(num))
        magnitude = 0
        while abs(num) >= 1000:
            magnitude += 1
            num /= 1000.0
        return '{}{}'.format('{:f}'.format(num).rstrip('0').rstrip('.'),
                             ['', 'K', 'M', 'B', 'T'][magnitude])

if __name__ == "__main__":
    # Manual smoke test: resets the "test-consumer" group's offsets on a QA
    # broker.  Credentials are taken from the environment, never hard-coded.
    import os

    KAFKA_BROKER = os.environ.get("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
    KAFKA_BROKER_PORT = os.environ.get("KAFKA_BROKER_PORT", 9092)
    KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
    KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")
    TOPIC = os.environ.get("TOPIC", "qa4_lots:2,3")

    CLIENT_OPTIONS = {
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": KAFKA_USERNAME,
        "sasl_plain_password": KAFKA_PASSWORD,
        "ssl_check_hostname": False
    }

    action = ConsumerGroupAction()

    res = action.run(
        kafka_broker=KAFKA_BROKER,
        kafka_broker_port=KAFKA_BROKER_PORT,
        topic=TOPIC,
        group_id="test-consumer",
        to_datetime="2021-11-12T15:41:00.000Z",
        # to_latest=True,
        client_options=CLIENT_OPTIONS,
    )
    # FIX: dead code removed — a pprint.PrettyPrinter was constructed but
    # never used; run() returns a preformatted table string, so plain print
    # is all that is needed.
    print(res)
56 changes: 56 additions & 0 deletions actions/consumer_group.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
# StackStorm action metadata for ConsumerGroupAction.run() in consumer.py.
name: consumer_group
description: Consumer groups offset resets using the kafka api on a kafka broker server
enabled: true
runner_type: python-script
entry_point: consumer.py

# NOTE(review): this pack is implemented with kafka-python (consumer.py
# imports `kafka`), not confluent-kafka as a previous comment here claimed.
#
# Exactly one reset strategy is applied per run; consumer.py checks them in
# this order: to_offset, shift_by, to_earliest, by_duration, to_datetime,
# to_latest.  Because to_latest defaults to true and is checked last, it acts
# as the fallback when no other strategy is supplied.
parameters:
  topic:
    # bare topic name = all partitions; "topic:2,3" = only those partitions
    description: consumer topic with partition as topic:2,3
    type: string
    required: true
  group_id:
    description: consumer group id or name
    type: string
    required: true
  to_datetime:
    # consumer.py parses this with '%Y-%m-%dT%H:%M:%S.%fZ' and treats it as UTC,
    # e.g. 2021-07-28T06:00:00.000Z
    description: reset offset to specific UTC format timestamp
    type: string
    required: false
  by_duration:
    description: reset offset to specific days duration
    type: integer
    required: false
  shift_by:
    # may be negative to rewind
    description: shift offset by n
    type: integer
    required: false
  to_earliest:
    description: reset offsets to beginning
    type: boolean
    required: false
  to_latest:
    description: reset offsets to latest
    type: boolean
    required: false
    default: true
  to_offset:
    description: offset number
    type: integer
    required: false
  kafka_broker:
    description: A kafka broker (server) to talk to
    type: string
    required: true
  kafka_broker_port:
    description: The port that the kafka broker is listening on
    type: integer
    default: 9092
  client_options:
    # passed straight through as KafkaConsumer keyword arguments
    description: Connection options like `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, etc.
    type: object
    required: true
    default: {}
    additionalProperties: true
58 changes: 58 additions & 0 deletions actions/create_rolebindings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from rbac.mds import MetaDataService

from lib.base_admin_action import BaseAdminAction


class CreateRoleBindingAction(BaseAdminAction):
    """Create an MDS role binding unless an equivalent or covering one exists.

    Returns an explanatory message string when the binding (or a prefix of it
    with the same pattern type) already exists; otherwise returns the result
    of the MDS create call.
    """

    def run(
        self,
        kafka_broker,
        mds_port,
        principal,
        role,
        resource,
        pattern_type,
        mds_username,
        mds_password,
    ):
        mds = MetaDataService(mds_username, mds_password, kafka_broker, port=mds_port)

        # Scan the principal's existing bindings for this role before creating.
        for binding in mds.list_rolebinding_resources(principal, role, format=False):
            bound = binding['resourceType'] + ':' + binding['name']
            if binding['patternType'] != pattern_type:
                continue  # different pattern type never conflicts here
            if bound == resource:
                return f"Role bindings already exists: {bound}|{principal}|{pattern_type}|{role}"
            if resource.startswith(bound):
                return f"Role bindings already prefixed. check pattern type/name: {bound}|{principal}|{pattern_type}|{role}"

        return mds.create_rolebinding_resources(principal, role, resource, pattern_type)


if __name__ == "__main__":
    # Manual smoke test: attempt to create a single role binding against a QA
    # MDS endpoint, with credentials taken from the environment.
    import os
    import pprint

    getenv = os.environ.get
    kafka_broker = getenv("KAFKA_BROKER", 'c-kafka-qa4-rn.copart.com')
    mds_port = getenv("MDS_PORT", 8090)
    mds_username = getenv("MDS_USERNAME")
    mds_password = getenv("MDS_PASSWORD")
    principal = getenv("PRINCIPAL", "User:test-qa-consumer")
    resource = getenv("RESOURCE", "Topic:testktable")
    role = getenv("ROLE", "DeveloperRead")
    pattern_type = getenv("PATTERN_TYPE", "LITERAL")

    result = CreateRoleBindingAction().run(
        kafka_broker=kafka_broker,
        mds_port=mds_port,
        principal=principal,
        role=role,
        resource=resource,
        pattern_type=pattern_type,
        mds_username=mds_username,
        mds_password=mds_password
    )
    pprint.PrettyPrinter(indent=4).pprint(result)
42 changes: 42 additions & 0 deletions actions/create_rolebindings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
# StackStorm action metadata for CreateRoleBindingAction.run() in
# create_rolebindings.py.
name: create_rolebindings
# FIX: typo "resoures" -> "resources"
description: create role binding resources using the kafka MDS api on a kafka broker server
enabled: true
runner_type: python-script
entry_point: create_rolebindings.py

parameters:
  principal:
    # e.g. User:test-qa-consumer
    description: principal name
    type: string
    required: true
  role:
    # e.g. DeveloperRead
    description: role
    type: string
    required: true
  resource:
    # Type:name form, e.g. Topic:testktable
    description: resource
    type: string
    required: true
  pattern_type:
    # e.g. LITERAL; compared against the patternType of existing bindings
    description: type
    type: string
    required: true

  kafka_broker:
    description: A kafka broker (server) to talk to
    type: string
    required: true
  mds_port:
    description: The port that the kafka MDS is listening on
    type: integer
    # FIX: was 9092 (the broker port); the description says MDS, and
    # create_rolebindings.py defaults MDS_PORT to 8090.
    default: 8090
  mds_username:
    description: LDAP username
    type: string
    required: true
  mds_password:
    description: LDAP password
    type: string
    required: true
Loading