
dynamodb_pyio


Amazon DynamoDB is a serverless, NoSQL database service that allows you to develop modern applications at any scale. The Apache Beam Python I/O connector for Amazon DynamoDB (dynamodb_pyio) aims to integrate with the database service by supporting source and sink connectors. Currently, the sink connector is available.

Installation

The connector can be installed from PyPI.

$ pip install dynamodb_pyio

Usage

Sink Connector

The main composite transform is WriteToDynamoDB, and it expects a PCollection whose elements are a list or a tuple. If an element is a tuple, the tuple's second element is taken. If the elements are not of the accepted types, you can apply the GroupIntoBatches or BatchElements transform beforehand. The records of each element are then written to a DynamoDB table with the help of the batch_writer of the boto3 package. Note that the batch writer automatically handles buffering, sends items in batches, and resends any unprocessed items as needed.
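The buffering behavior can be pictured with a minimal sketch that splits records into chunks of up to 25 items, the per-request limit of DynamoDB's BatchWriteItem API. This is an illustration of the idea only, not the actual boto3 implementation:

```python
from typing import Dict, Iterable, List

# DynamoDB's BatchWriteItem API accepts at most 25 items per request;
# boto3's batch_writer flushes its buffer at this threshold.
MAX_BATCH_SIZE = 25

def chunk_items(items: Iterable[Dict], batch_size: int = MAX_BATCH_SIZE) -> List[List[Dict]]:
    """Split records into request-sized batches (illustration only)."""
    batches, buffer = [], []
    for item in items:
        buffer.append(item)
        if len(buffer) == batch_size:
            batches.append(buffer)
            buffer = []
    if buffer:  # flush any remaining items in a final, smaller batch
        batches.append(buffer)
    return batches

records = [{"pk": str(i), "sk": i} for i in range(60)]
print([len(b) for b in chunk_items(records)])  # → [25, 25, 10]
```

Unprocessed-item retries are handled by the batch writer as well, so the connector does not need its own retry loop.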

The transform also has an option for handling duplicate records.

  • dedup_pkeys - a list of keys used to deduplicate items in the buffer.
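Conceptually, deduplication keeps one record per distinct combination of the dedup_pkeys values. A rough sketch of the idea in plain Python (the connector's actual logic may differ, e.g. in which duplicate it keeps):

```python
from typing import Dict, List

def dedup_items(items: List[Dict], dedup_pkeys: List[str]) -> List[Dict]:
    """Keep the first record seen for each distinct key combination (illustration only)."""
    seen = set()
    unique = []
    for item in items:
        key = tuple(item[k] for k in dedup_pkeys)  # e.g. ("1", 1) for pk/sk
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique

records = [{"pk": str(1), "sk": 1} for _ in range(20)]
print(len(dedup_items(records, ["pk", "sk"])))  # → 1
```

Without deduplication, sending the same primary key twice in one BatchWriteItem request is rejected by DynamoDB, which is why duplicates must be removed before writing.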

Sink Connector Example

The transform can process many records, thanks to the batch writer.

import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(i), "sk": i} for i in range(500)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])
        | WriteToDynamoDB(table_name="your-table-name")
    )

Duplicate records can be handled using the dedup_pkeys option.

import apache_beam as beam
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(1), "sk": 1} for _ in range(20)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create([records])
        | WriteToDynamoDB(table_name="your-table-name", dedup_pkeys=["pk", "sk"])
    )

Batches of elements can be controlled further with the BatchElements or GroupIntoBatches transform.

import apache_beam as beam
from apache_beam.transforms.util import BatchElements
from dynamodb_pyio.io import WriteToDynamoDB

records = [{"pk": str(i), "sk": i} for i in range(100)]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(records)
        | BatchElements(min_batch_size=50, max_batch_size=50)
        | WriteToDynamoDB(table_name="your-table-name")
    )

See Introduction to DynamoDB PyIO Sink Connector for more examples.

Contributing

Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

dynamodb_pyio was created as part of the Apache Beam Python I/O Connectors project. It is licensed under the terms of the Apache License 2.0.

Credits

dynamodb_pyio was created with cookiecutter and the pyio-cookiecutter template.