Ingest Server-Sent Events (SSE) using Amazon Managed Service for Apache Flink (formerly Amazon Kinesis Data Analytics)


🚨 August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink.


When working with real-time data, you often need to send that data over the internet to various consumers. Technologies such as WebSockets and long polling have enabled this, and more recently server-sent events (SSE) have become a popular way to push updates to clients. Ingesting this type of data source into AWS requires a client that runs continuously to receive those events. This sample shows how to connect to an SSE endpoint from an Amazon Kinesis Data Analytics application that uses Apache Flink. As events arrive, they are published to an Amazon Kinesis Data Streams stream; in this sample, the event data is then simply stored in Amazon S3.

Architecture

  1. An Amazon Kinesis Data Analytics application using Apache Flink is used to create a connection to the server-sent events HTTP endpoint.
  2. Each ingested event is published to the Amazon Kinesis Data Streams stream. The application must be placed in a private subnet of a virtual private cloud (VPC) so that it can make outbound connections.
  3. Amazon Kinesis Data Firehose receives the event payload from the data stream.
  4. An Amazon Simple Storage Service (Amazon S3) bucket stores the events for future analysis.
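
Step 1 above depends on the SSE wire format: the endpoint sends text lines such as `event:` and `data:`, and a blank line marks the end of one event. The following is a minimal sketch of that parsing step, using only the Java standard library. The class `SseParserSketch` and its `Event` type are hypothetical names for illustration and are not taken from this repository; the sample's actual client may handle the stream differently, and this sketch ignores the `id` and `retry` fields.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper (not part of this sample): parses SSE text lines into events. */
final class SseParserSketch {

    /** One dispatched event: its type ("message" when no event: field was sent) and its data. */
    static final class Event {
        final String type;
        final String data;

        Event(String type, String data) {
            this.type = type;
            this.data = data;
        }
    }

    /** Consumes lines (without line terminators) and returns every complete event. */
    static List<Event> parse(Iterator<String> lines) {
        List<Event> events = new ArrayList<>();
        String type = "message";
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.isEmpty()) {
                // A blank line ends the current event; events with no data field are dropped.
                if (hasData) {
                    events.add(new Event(type, data.toString()));
                }
                type = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line, commonly used as a keep-alive; nothing to do.
            } else {
                int colon = line.indexOf(':');
                String field = colon < 0 ? line : line.substring(0, colon);
                String value = colon < 0 ? "" : line.substring(colon + 1);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space after the colon is not part of the value
                }
                if (field.equals("event")) {
                    type = value;
                } else if (field.equals("data")) {
                    if (hasData) {
                        data.append('\n'); // multiple data lines are joined with newlines
                    }
                    data.append(value);
                    hasData = true;
                }
                // Other fields (id, retry) are ignored in this sketch.
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Illustrative input only; not real output from any endpoint.
        List<String> lines = List.of(
                ": keep-alive",
                "event: message",
                "data: {\"title\":\"Example\"}",
                "",
                "data: first line",
                "data: second line",
                "");
        for (Event e : parse(lines.iterator())) {
            System.out.println(e.type + " -> " + e.data);
        }
    }
}
```

In a long-running client, each event would be forwarded to the data stream as soon as its terminating blank line is read instead of being collected into a list.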

Example SSE use cases:

  1. https://developers.facebook.com/docs/graph-api/server-sent-events/
    1. Facebook uses SSE to send out updates for live video comments and reactions
  2. https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams
    1. Wikimedia uses SSE to send all changes to wiki sites
  3. https://iexcloud.io/docs/api/#sse-streaming
    1. IEX uses SSE to stream realtime stock quotes
  4. https://www.mbta.com/developers/v3-api/streaming
    1. MBTA uses SSE to stream realtime transportation predictions

Requirements To Compile

  1. Apache Maven 3.5 or greater installed
  2. Java 11 or greater installed
  3. AWS Cloud Development Kit (CDK)

Compiling

  1. Clone this repository with Git
  2. Run the Maven package command "mvn package"

Setup

  1. Upload the compiled jar file ('amazon-kinesis-data-analytics-apache-flink-server-sent-events-{version}.jar') to an S3 bucket in the account where you plan to run the application.
  2. The CDK will produce two different CloudFormation Templates
    1. amazon-kinesis-data-analytics-apache-flink-server-sent-events-create-vpc.template
      1. This template will create a VPC for you with private and public subnets
    2. amazon-kinesis-data-analytics-apache-flink-server-sent-events-use-existing-vpc.template
      1. This template will allow you to select existing security group Ids and subnet Ids to use with the application
      2. The subnet Ids selected must be private to allow outbound connections from within an Amazon Kinesis Data Analytics Apache Flink application.
      3. The security group Ids selected must allow outbound connections on the HTTP port required. In this sample we create an outbound connection on the standard HTTP port 80.
  3. Use one of the generated CloudFormation Templates from the console or CLI
  4. Fill in the required parameters:
    1. S3Bucket - The S3 bucket where the Amazon Kinesis Data Analytics application gets your application's JAR file
    2. S3StorageBucket - The S3 bucket name used to store the server-sent events data
    3. S3StorageBucketPrefix - The prefix used when storing server-sent events data into the S3 bucket
    4. S3StorageBucketErrorPrefix - The prefix used when storing error events into the S3 bucket
    5. FlinkApplication - The Apache Flink application jar filename located in the S3 bucket
    6. *Subnets - The subnets used for the Amazon Kinesis Data Analytics application (When using an existing VPC template)
    7. *SecurityGroups - The security groups used for the Amazon Kinesis Data Analytics application (When using an existing VPC template)

Running

  1. The included sample sets up the Amazon Kinesis Data Analytics application to connect to the Wikimedia EventStreams recent changes SSE endpoint.
  2. Run the Kinesis Data Analytics application from the console or CLI
  3. Once the application is running, navigate to the S3 bucket supplied in the CloudFormation properties and view the event data records

SSE Endpoints

To connect to a different endpoint you can edit the Amazon Kinesis Data Analytics Runtime Properties. The following properties are available:

  1. Property Group: ProducerConfigProperties
    1. The key/value pairs in this group are passed directly to the FlinkKinesisProducer; refer to the FlinkKinesisProducer configuration for the supported properties.
    2. Set the key AggregationEnabled to false when sending data to consumers that cannot disaggregate records. For example, if AggregationEnabled is true and the data stream is consumed by an AWS Lambda function, you must disaggregate the records yourself inside the function.
  2. Property Group: OutputStreamProperties
    1. DefaultStream - The value of this property should be the Amazon Kinesis Data Streams stream that this application will output events to. Please be aware that the security role for the Amazon Kinesis Data Analytics application will require permissions to this stream.
  3. Property Group: SSESourceProperties
    1. url (required) - The SSE endpoint to connect to
    2. headers - A pipe delimited set of key/value pair headers. For example "x-api-key|demo|Accept|text/html" is broken into key: x-api-key with value: demo and key: Accept with value: text/html
    3. types - A pipe delimited set of SSE event types to send to the data stream.
    4. readTimeoutMS - The read timeout value in milliseconds. Note that in most cases, setting this to anything other than zero will prevent the application from connecting.
    5. reportMessagesReceivedMS - How often to log the number of messages received over a given period of time in milliseconds.
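
To make the pipe-delimited format of the headers and types properties concrete, here is a minimal sketch of how such values could be split, using only the Java standard library. The class `SseSourcePropertiesSketch` and both method names are hypothetical and are not taken from this repository. How the sample itself treats whitespace, an odd number of header tokens, or an empty types value is not stated above, so the choices below (trim, reject, and return an empty set) are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of this sample): splits pipe-delimited property values. */
final class SseSourcePropertiesSketch {

    /** Splits "key|value|key|value" into an ordered header map. */
    static Map<String, String> parseHeaders(String value) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return headers; // property not set: no extra headers
        }
        String[] parts = value.split("\\|", -1);
        if (parts.length % 2 != 0) {
            // Assumption: an unpaired token is treated as a configuration error.
            throw new IllegalArgumentException(
                    "headers must hold an even number of pipe-delimited tokens: " + value);
        }
        for (int i = 0; i < parts.length; i += 2) {
            headers.put(parts[i].trim(), parts[i + 1].trim());
        }
        return headers;
    }

    /** Splits "typeA|typeB" into an ordered set of SSE event types. */
    static Set<String> parseTypes(String value) {
        Set<String> types = new LinkedHashSet<>();
        if (value == null || value.trim().isEmpty()) {
            return types; // property not set: empty set
        }
        for (String token : value.split("\\|")) {
            String type = token.trim();
            if (!type.isEmpty()) {
                types.add(type);
            }
        }
        return types;
    }

    public static void main(String[] args) {
        // The headers value is the example given in the property description above.
        Map<String, String> headers = parseHeaders("x-api-key|demo|Accept|text/html");
        headers.forEach((key, val) -> System.out.println(key + ": " + val));

        // "message|edit" is an illustrative types value, not one documented by this sample.
        System.out.println(parseTypes("message|edit"));
    }
}
```

Because the pipe character is the delimiter, a header value that itself contains a pipe cannot be expressed in this format.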

Resource Cleanup

CloudFormation

  1. Delete the CloudFormation stack
  2. Remove the jar file from the S3 bucket

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.