diff --git a/tests/integration/remote/invoke/remote_invoke_integ_base.py b/tests/integration/remote/invoke/remote_invoke_integ_base.py index 7e3dc4cf3e..062223d53a 100644 --- a/tests/integration/remote/invoke/remote_invoke_integ_base.py +++ b/tests/integration/remote/invoke/remote_invoke_integ_base.py @@ -59,6 +59,35 @@ def create_resources_and_boto_clients(cls): cls.stepfunctions_client = boto_client_provider("stepfunctions") cls.xray_client = boto_client_provider("xray") cls.sqs_client = boto_client_provider("sqs") + cls.kinesis_client = boto_client_provider("kinesis") + + def get_kinesis_records(self, shard_id, sequence_number, stream_name): + """Helper function to get kinesis records using the provided shard_id and sequence_number + + Parameters + ---------- + shard_id: string + Shard Id to fetch the record from + sequence_number: string + Sequence number to get the record for + stream_name: string + Name of the kinesis stream to get records from + Returns + ------- + list + Returns a list of records received from the kinesis data stream + """ + response = self.kinesis_client.get_shard_iterator( + StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType="AT_SEQUENCE_NUMBER", + StartingSequenceNumber=sequence_number, + ) + shard_iter = response["ShardIterator"] + response = self.kinesis_client.get_records(ShardIterator=shard_iter, Limit=1) + records = response["Records"] + + return records @staticmethod def get_command_list( diff --git a/tests/integration/remote/invoke/test_remote_invoke.py b/tests/integration/remote/invoke/test_remote_invoke.py index 42f2ace9bb..d9af6fb4e3 100644 --- a/tests/integration/remote/invoke/test_remote_invoke.py +++ b/tests/integration/remote/invoke/test_remote_invoke.py @@ -2,6 +2,7 @@ import uuid import base64 import time +import math from parameterized import parameterized from unittest import skip @@ -319,6 +320,129 @@ def test_invoke_boto_parameters(self): self.assertEqual(received_message.get("MessageAttributes"), message_attributes) +@skip("Skip remote invoke Kinesis integration tests") +@pytest.mark.xdist_group(name="sam_remote_invoke_kinesis_resource_priority") +class TestKinesisPriorityInvoke(RemoteInvokeIntegBase): + template = Path("template-kinesis-priority.yaml") + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.stack_name = f"{cls.__name__}-{uuid.uuid4().hex}" + cls.create_resources_and_boto_clients() + cls.stream_name = cls.stack_resource_summaries["KinesisStream"].physical_resource_id + + def test_invoke_empty_event_provided(self): + command_list = self.get_command_list(stack_name=self.stack_name) + expected_message_body = "{}" + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, expected_message_body) + + @parameterized.expand([('{"foo": "bar"}'), ('"Hello World"'), '{"hello": "world", "foo": 1, "bar": {}}']) + def test_invoke_with_event_provided(self, event): + command_list = self.get_command_list( + stack_name=self.stack_name, + event=event, + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + + def test_invoke_with_event_file_provided(self): + event_file_path = str(self.events_folder_path.joinpath("default_event.json")) + command_list = self.get_command_list(stack_name=self.stack_name, event_file=event_file_path) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name + )[0] + + received_data = response_record["Data"].decode() + with open(event_file_path, "r") as f: + expected_message = f.read() + self.assertEqual(received_data, expected_message) + + def test_invoke_with_physical_id_provided_as_resource_id(self): + event = '{"foo": "bar"}' + command_list = self.get_command_list( + resource_id=self.stream_name, + event=event, + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + + def test_invoke_boto_parameters(self): + event = '{"foo": "bar"}' + command_list = self.get_command_list( + stack_name=self.stack_name, + event=event, + parameter_list=[ + ( + "PartitionKey", + "override-partition-key", + ), + ("SequenceNumberForOrdering", "0"), + ], + output="json", + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + self.assertIn("ResponseMetadata", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], self.stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + + @pytest.mark.xdist_group(name="sam_remote_invoke_multiple_resources") class TestMultipleResourcesInvoke(RemoteInvokeIntegBase): template = Path("template-multiple-resources.yaml") @@ -557,7 +681,7 @@ def test_sqs_invoke_with_resource_id_provided_as_arn(self): self.assertEqual(given_message, received_message.get("Body")) self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"]) - def test_invoke_boto_parameters_fifo_queue(self): + def test_sqs_invoke_boto_parameters_fifo_queue(self): given_message = "Hello World" resource_logical_id = "MyFIFOSQSQueue" if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources: @@ -600,6 +724,107 @@ def test_invoke_boto_parameters_fifo_queue(self): # Message id will be the same as it got deduped and a new message was not created self.assertEqual(received_message["MessageId"], remote_invoke_result_stdout["MessageId"]) + def test_kinesis_invoke_with_resource_id_and_stack_name(self): + resource_logical_id = "KinesisStream" + if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources: + pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported") + event = '{"foo": "bar"}' + stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id + + command_list = self.get_command_list( + resource_id=resource_logical_id, + stack_name=self.stack_name, + event=event, + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + + def test_kinesis_invoke_with_resource_id_provided_as_arn(self): + resource_logical_id = "KinesisStream" + if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources: + pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported") + + stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id + output = self.cfn_client.describe_stacks(StackName=self.stack_name) + kinesis_stream_arn = None + for detail in output["Stacks"][0]["Outputs"]: + if detail["OutputKey"] == "KinesisStreamArn": + kinesis_stream_arn = detail["OutputValue"] + + event = '{"foo": "bar"}' + command_list = self.get_command_list( + resource_id=kinesis_stream_arn, + event=event, + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + + def test_kinesis_invoke_with_boto_parameters(self): + # ExplicitHashKey can be used to specify the shard to put the record to if the hashkey provided is + # between the start and end range of that shard's hash key range. + resource_logical_id = "KinesisStream" + if self.stack_resource_summaries[resource_logical_id].resource_type not in self.supported_resources: + pytest.skip("Skip remote invoke Kinesis integration tests as resource is not supported") + stream_name = self.stack_resource_summaries[resource_logical_id].physical_resource_id + + describe_stream_response = self.kinesis_client.describe_stream(StreamName=stream_name) + shards = describe_stream_response["StreamDescription"]["Shards"] + specific_shard = shards[2] + start_hash_key = int(specific_shard["HashKeyRange"]["StartingHashKey"]) + end_hash_key = int(specific_shard["HashKeyRange"]["EndingHashKey"]) + explicit_hash_key = math.floor((start_hash_key + end_hash_key) / 2) + + event = '{"foo": "bar"}' + command_list = self.get_command_list( + resource_id=resource_logical_id, + stack_name=self.stack_name, + event=event, + parameter_list=[ + ("ExplicitHashKey", str(explicit_hash_key)), + ], + output="json", + ) + + remote_invoke_result = run_command(command_list) + self.assertEqual(0, remote_invoke_result.process.returncode) + remote_invoke_result_stdout = json.loads(remote_invoke_result.stdout.strip().decode()) + + self.assertIn("SequenceNumber", remote_invoke_result_stdout) + self.assertIn("ShardId", remote_invoke_result_stdout) + self.assertEqual(remote_invoke_result_stdout["ShardId"], specific_shard["ShardId"]) + self.assertIn("ResponseMetadata", remote_invoke_result_stdout) + + response_record = self.get_kinesis_records( + remote_invoke_result_stdout["ShardId"], remote_invoke_result_stdout["SequenceNumber"], stream_name + )[0] + + received_data = response_record["Data"].decode() + self.assertEqual(received_data, event) + @pytest.mark.xdist_group(name="sam_remote_invoke_nested_resources") class TestNestedTemplateResourcesInvoke(RemoteInvokeIntegBase): diff --git a/tests/integration/testdata/remote_invoke/template-kinesis-priority.yaml b/tests/integration/testdata/remote_invoke/template-kinesis-priority.yaml new file mode 100644 index 0000000000..143ab6e1d3 --- /dev/null +++ b/tests/integration/testdata/remote_invoke/template-kinesis-priority.yaml @@ -0,0 +1,16 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: AWS Kinesis Data Stream + +Resources: + # Define an AWS Kinesis Data Stream + KinesisStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 + +Outputs: + # Kinesis Data Stream name + KinesisStream: + Description: Kinesis Data Stream name + Value: !Ref KinesisStream \ No newline at end of file diff --git a/tests/integration/testdata/remote_invoke/template-multiple-resources.yaml b/tests/integration/testdata/remote_invoke/template-multiple-resources.yaml index 5399707e71..217ead522c 100644 --- a/tests/integration/testdata/remote_invoke/template-multiple-resources.yaml +++ b/tests/integration/testdata/remote_invoke/template-multiple-resources.yaml @@ -103,7 +103,16 @@ Resources: Type: AWS::SQS::Queue Properties: FifoQueue: true + + # Define an AWS Kinesis Data Stream + KinesisStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 3 Outputs: MySQSQueueArn: - Value: !GetAtt MySQSQueue.Arn \ No newline at end of file + Value: !GetAtt MySQSQueue.Arn + KinesisStreamArn: + Description: Kinesis Data Stream name + Value: !GetAtt KinesisStream.Arn \ No newline at end of file