Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add integration tests for running remote invoke on kinesis service #6072

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions tests/integration/remote/invoke/remote_invoke_integ_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ def create_resources_and_boto_clients(cls):
cls.stepfunctions_client = boto_client_provider("stepfunctions")
cls.xray_client = boto_client_provider("xray")
cls.sqs_client = boto_client_provider("sqs")
cls.kinesis_client = boto_client_provider("kinesis")

def get_kinesis_records(self, shard_id, sequence_number, stream_name):
"""Helper function to get kinesis records using the provided shard_id and sequence_number

Parameters
----------
shard_id: string
Shard Id to fetch the record from
sequence_number: string
Sequence number to get the record for
stream_name: string
Name of the kinesis stream to get records from
Returns
-------
list
Returns a list of records received from the kinesis data stream
"""
response = self.kinesis_client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_SEQUENCE_NUMBER",
StartingSequenceNumber=sequence_number,
)
shard_iter = response["ShardIterator"]
response = self.kinesis_client.get_records(ShardIterator=shard_iter, Limit=1)
records = response["Records"]

return records

@staticmethod
def get_command_list(
Expand Down
227 changes: 226 additions & 1 deletion tests/integration/remote/invoke/test_remote_invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid
import base64
import time
import math

from parameterized import parameterized
from unittest import skip
Expand Down Expand Up @@ -319,6 +320,129 @@ def test_invoke_boto_parameters(self):
self.assertEqual(received_message.get("MessageAttributes"), message_attributes)


@skip("Skip remote invoke Kinesis integration tests")
@pytest.mark.xdist_group(name="sam_remote_invoke_kinesis_resource_priority")
class TestKinesisPriorityInvoke(RemoteInvokeIntegBase):
template = Path("template-kinesis-priority.yaml")

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.stack_name = f"{cls.__name__}-{uuid.uuid4().hex}"
cls.create_resources_and_boto_clients()
cls.stream_name = cls.stack_resource_summaries["KinesisStream"].physical_resource_id

def test_invoke_empty_event_provided(self):
command_list = self.get_command_list(stack_name=self.stack_name)
expected_message_body = "{}"

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, expected_message_body)

@parameterized.expand([('{"foo": "bar"}'), ('"Hello World"'), '{"hello": "world", "foo": 1, "bar": {}}'])
def test_invoke_with_event_provided(self, event):
command_list = self.get_command_list(
stack_name=self.stack_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_invoke_with_event_file_provided(self):
event_file_path = str(self.events_folder_path.joinpath("default_event.json"))
command_list = self.get_command_list(stack_name=self.stack_name, event_file=event_file_path)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
with open(event_file_path, "r") as f:
expected_message = f.read()
self.assertEqual(received_data, expected_message)

def test_invoke_with_physical_id_provided_as_resource_id(self):
event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=self.stream_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_invoke_boto_parameters(self):
event = '{"foo": "bar"}'
command_list = self.get_command_list(
stack_name=self.stack_name,
event=event,
parameter_list=[
(
"PartitionKey",
"override-partition-key",
),
("SequenceNumberForOrdering", "0"),
],
output="json",
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)
self.assertIn("ResponseMetadata", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)


@pytest.mark.xdist_group(name="sam_remote_invoke_multiple_resources")
class TestMultipleResourcesInvoke(RemoteInvokeIntegBase):
template = Path("template-multiple-resources.yaml")
Expand Down Expand Up @@ -557,7 +681,7 @@ def test_sqs_invoke_with_resource_id_provided_as_arn(self):
self.assertEqual(given_message, received_message.get("Body"))
self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"])

def test_invoke_boto_parameters_fifo_queue(self):
def test_sqs_invoke_boto_parameters_fifo_queue(self):
given_message = "Hello World"
resource_logical_id = "MyFIFOSQSQueue"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
Expand Down Expand Up @@ -600,6 +724,107 @@ def test_invoke_boto_parameters_fifo_queue(self):
# Message id will be the same as it got deduped and a new message was not created
self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"])

def test_kinesis_invoke_with_resource_id_and_stack_name(self):
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")
event = '{"foo": "bar"}'
stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id

command_list = self.get_command_list(
resource_id=resource_logical_id,
stack_name=self.stack_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_kinesis_invoke_with_resource_id_provided_as_arn(self):
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")

stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id
output = self.cfn_client.describe_stacks(StackName=self.stack_name)
kinesis_stream_arn = None
for detail in output["Stacks"][0]["Outputs"]:
if detail["OutputKey"] == "KinesisStreamArn":
kinesis_stream_arn = detail["OutputValue"]

event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=kinesis_stream_arn,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_kinesis_invoke_with_boto_parameters(self):
# ExplicitHashKey can be used to specify the shard to put the record to if the hashkey provided is
# between the start and end range of that shard's hash key range.
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")
stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id

describe_stream_response = self.kinesis_client.describe_stream(StreamName=stream_name)
shards = describe_stream_response["StreamDescription"]["Shards"]
specific_shard = shards[2]
start_hash_key = int(specific_shard["HashKeyRange"]["StartingHashKey"])
end_hash_key = int(specific_shard["HashKeyRange"]["EndingHashKey"])
explicit_hash_key = math.floor((start_hash_key + end_hash_key) / 2)

event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=resource_logical_id,
stack_name=self.stack_name,
event=event,
parameter_list=[
("ExplicitHashKey", str(explicit_hash_key)),
],
output="json",
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)
self.assertEqual(remote_invoke_result_stdout["ShardId"], specific_shard["ShardId"])
self.assertIn("ResponseMetadata", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)


@pytest.mark.xdist_group(name="sam_remote_invoke_nested_resources")
class TestNestedTemplateResourcesInvoke(RemoteInvokeIntegBase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: AWS Kinesis Data Stream

Resources:
# Define an AWS Kinesis Data Stream
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

Outputs:
# Kinesis Data Stream name
KinesisStream:
Description: Kinesis Data Stream name
Value: !Ref KinesisStream
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ Resources:
Type: AWS::SQS::Queue
Properties:
FifoQueue: true

# Define an AWS Kinesis Data Stream
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 3

Outputs:
MySQSQueueArn:
Value: !GetAtt MySQSQueue.Arn
Value: !GetAtt MySQSQueue.Arn
KinesisStreamArn:
Description: Kinesis Data Stream name
Value: !GetAtt KinesisStream.Arn
Loading