Skip to content

Commit

Permalink
Add integration tests for running remote invoke on kinesis service (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hnnasit authored Oct 12, 2023
1 parent 6ccd872 commit ad9cf6f
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 2 deletions.
29 changes: 29 additions & 0 deletions tests/integration/remote/invoke/remote_invoke_integ_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ def create_resources_and_boto_clients(cls):
cls.stepfunctions_client = boto_client_provider("stepfunctions")
cls.xray_client = boto_client_provider("xray")
cls.sqs_client = boto_client_provider("sqs")
cls.kinesis_client = boto_client_provider("kinesis")

def get_kinesis_records(self, shard_id, sequence_number, stream_name):
"""Helper function to get kinesis records using the provided shard_id and sequence_number
Parameters
----------
shard_id: string
Shard Id to fetch the record from
sequence_number: string
Sequence number to get the record for
stream_name: string
Name of the kinesis stream to get records from
Returns
-------
list
Returns a list of records received from the kinesis data stream
"""
response = self.kinesis_client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_SEQUENCE_NUMBER",
StartingSequenceNumber=sequence_number,
)
shard_iter = response["ShardIterator"]
response = self.kinesis_client.get_records(ShardIterator=shard_iter, Limit=1)
records = response["Records"]

return records

@staticmethod
def get_command_list(
Expand Down
227 changes: 226 additions & 1 deletion tests/integration/remote/invoke/test_remote_invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid
import base64
import time
import math

from parameterized import parameterized
from unittest import skip
Expand Down Expand Up @@ -319,6 +320,129 @@ def test_invoke_boto_parameters(self):
self.assertEqual(received_message.get("MessageAttributes"), message_attributes)


@skip("Skip remote invoke Kinesis integration tests")
@pytest.mark.xdist_group(name="sam_remote_invoke_kinesis_resource_priority")
class TestKinesisPriorityInvoke(RemoteInvokeIntegBase):
template = Path("template-kinesis-priority.yaml")

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.stack_name = f"{cls.__name__}-{uuid.uuid4().hex}"
cls.create_resources_and_boto_clients()
cls.stream_name = cls.stack_resource_summaries["KinesisStream"].physical_resource_id

def test_invoke_empty_event_provided(self):
command_list = self.get_command_list(stack_name=self.stack_name)
expected_message_body = "{}"

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, expected_message_body)

@parameterized.expand([('{"foo": "bar"}'), ('"Hello World"'), '{"hello": "world", "foo": 1, "bar": {}}'])
def test_invoke_with_event_provided(self, event):
command_list = self.get_command_list(
stack_name=self.stack_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_invoke_with_event_file_provided(self):
event_file_path = str(self.events_folder_path.joinpath("default_event.json"))
command_list = self.get_command_list(stack_name=self.stack_name, event_file=event_file_path)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
with open(event_file_path, "r") as f:
expected_message = f.read()
self.assertEqual(received_data, expected_message)

def test_invoke_with_physical_id_provided_as_resource_id(self):
event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=self.stream_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_invoke_boto_parameters(self):
event = '{"foo": "bar"}'
command_list = self.get_command_list(
stack_name=self.stack_name,
event=event,
parameter_list=[
(
"PartitionKey",
"override-partition-key",
),
("SequenceNumberForOrdering", "0"),
],
output="json",
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)
self.assertIn("ResponseMetadata", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)


@pytest.mark.xdist_group(name="sam_remote_invoke_multiple_resources")
class TestMultipleResourcesInvoke(RemoteInvokeIntegBase):
template = Path("template-multiple-resources.yaml")
Expand Down Expand Up @@ -557,7 +681,7 @@ def test_sqs_invoke_with_resource_id_provided_as_arn(self):
self.assertEqual(given_message, received_message.get("Body"))
self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"])

def test_invoke_boto_parameters_fifo_queue(self):
def test_sqs_invoke_boto_parameters_fifo_queue(self):
given_message = "Hello World"
resource_logical_id = "MyFIFOSQSQueue"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
Expand Down Expand Up @@ -600,6 +724,107 @@ def test_invoke_boto_parameters_fifo_queue(self):
# Message id will be the same as it got deduped and a new message was not created
self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"])

def test_kinesis_invoke_with_resource_id_and_stack_name(self):
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")
event = '{"foo": "bar"}'
stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id

command_list = self.get_command_list(
resource_id=resource_logical_id,
stack_name=self.stack_name,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_kinesis_invoke_with_resource_id_provided_as_arn(self):
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")

stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id
output = self.cfn_client.describe_stacks(StackName=self.stack_name)
kinesis_stream_arn = None
for detail in output["Stacks"][0]["Outputs"]:
if detail["OutputKey"] == "KinesisStreamArn":
kinesis_stream_arn = detail["OutputValue"]

event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=kinesis_stream_arn,
event=event,
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)

def test_kinesis_invoke_with_boto_parameters(self):
# ExplicitHashKey can be used to specify the shard to put the record to if the hashkey provided is
# between the start and end range of that shard's hash key range.
resource_logical_id = "KinesisStream"
if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources:
pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported")
stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id

describe_stream_response = self.kinesis_client.describe_stream(StreamName=stream_name)
shards = describe_stream_response["StreamDescription"]["Shards"]
specific_shard = shards[2]
start_hash_key = int(specific_shard["HashKeyRange"]["StartingHashKey"])
end_hash_key = int(specific_shard["HashKeyRange"]["EndingHashKey"])
explicit_hash_key = math.floor((start_hash_key + end_hash_key) / 2)

event = '{"foo": "bar"}'
command_list = self.get_command_list(
resource_id=resource_logical_id,
stack_name=self.stack_name,
event=event,
parameter_list=[
("ExplicitHashKey", str(explicit_hash_key)),
],
output="json",
)

remote_invoke_result = run_command(command_list)
self.assertEqual(0, remote_invoke_result.process.returncode)
remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode())

self.assertIn("SequenceNumber", remote_invoke_result_stdout)
self.assertIn("ShardId", remote_invoke_result_stdout)
self.assertEqual(remote_invoke_result_stdout["ShardId"], specific_shard["ShardId"])
self.assertIn("ResponseMetadata", remote_invoke_result_stdout)

response_record = self.get_kinesis_records(
remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name
)[0]

received_data = response_record["Data"].decode()
self.assertEqual(received_data, event)


@pytest.mark.xdist_group(name="sam_remote_invoke_nested_resources")
class TestNestedTemplateResourcesInvoke(RemoteInvokeIntegBase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: AWS Kinesis Data Stream

Resources:
# Define an AWS Kinesis Data Stream
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1

Outputs:
# Kinesis Data Stream name
KinesisStream:
Description: Kinesis Data Stream name
Value: !Ref KinesisStream
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ Resources:
Type: AWS::SQS::Queue
Properties:
FifoQueue: true

# Define an AWS Kinesis Data Stream
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 3

Outputs:
MySQSQueueArn:
Value: !GetAtt MySQSQueue.Arn
Value: !GetAtt MySQSQueue.Arn
KinesisStreamArn:
Description: Kinesis Data Stream name
Value: !GetAtt KinesisStream.Arn

0 comments on commit ad9cf6f

Please sign in to comment.