DynamoDB to OpenSearch

source code

This project demonstrates how to automatically update indices in an OpenSearch domain in response to CRUD operations against the source-of-truth database, in this case DynamoDB. This keeps the OpenSearch indices in sync with the DynamoDB table in near real time.

Tech Stack

  • Terraform / Terragrunt
  • AWS Lambda, written in Python, deployed with Docker
  • Amazon API Gateway
  • Amazon DynamoDB
  • Amazon OpenSearch Service

The REST API and CRUD Operations

The scenario of a transactional application performing CRUD (Create, Read, Update and Delete) operations against a source-of-truth database is realized with a simple serverless application composed of a REST API for the CRUD functionality and a DynamoDB table as the database.

The REST API is implemented with Amazon API Gateway and OpenAPI. There is no “app” as such. The REST API is integrated directly with the DynamoDB table using API Gateway extensions. This reflects the agile way of starting out simple and only bringing in Lambdas when more complex processing logic is required between the API and the database.

For example, the POST method is integrated directly with the DynamoDB table using Velocity Template Language (VTL) and API Gateway extensions in the OpenAPI spec as follows:

/books:
    post: 
      x-amazon-apigateway-integration:
        type: aws
        uri: "arn:aws:apigateway:${aws_region}:dynamodb:action/PutItem"
        httpMethod: POST 
        passthroughBehavior: when_no_match
        credentials: ${apigw_role_arn}
        requestTemplates:
          application/json: |
            #set($inputRoot = $input.path('$'))
            {
              "TableName": "${books_table_name}",
              "Item": {
                "book_id": { "S": "$context.requestId" },
                "title": { "S": "$inputRoot.title" },
                "author": { "S": "$inputRoot.author" }
              }
            }
        responses:
          default:
            statusCode: 200 
            responseTemplates:
              application/json: |
                {
                  "message": "New book created.",
                  "book_id": "$context.requestId"
                }
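To make the request template easier to follow, here is a minimal Python sketch of the transformation it performs: the incoming JSON body is wrapped in DynamoDB's typed attribute format and the API Gateway request ID becomes the book_id. The function name build_put_item_request and the example table name are hypothetical and only stand in for what the VTL template does inside API Gateway. Unlike the template, this sketch raises a KeyError when a field is missing.

```python
import json


def build_put_item_request(body: dict, request_id: str, table_name: str) -> dict:
    """Mirror the VTL request template: wrap each field in a DynamoDB type descriptor."""
    return {
        "TableName": table_name,
        "Item": {
            "book_id": {"S": request_id},          # $context.requestId in the template
            "title": {"S": str(body["title"])},    # $inputRoot.title
            "author": {"S": str(body["author"])},  # $inputRoot.author
        },
    }


if __name__ == "__main__":
    payload = build_put_item_request(
        {"title": "Dune", "author": "Frank Herbert"},
        request_id="example-request-id",  # hypothetical value for illustration
        table_name="example-books",       # hypothetical stand-in for ${books_table_name}
    )
    print(json.dumps(payload, indent=2))
```

The resulting dictionary has the same shape as the PutItem payload that API Gateway sends to DynamoDB on behalf of the caller.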

Processing the Record Changes in DynamoDB

The Terraform resource for the DynamoDB table has DynamoDB Streams enabled. The changes that result from CRUD operations are propagated through this stream:

resource "aws_dynamodb_table" "books" {
  name         = "${local.app_prefix}${terraform.workspace}-books"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "book_id"

  attribute {
    name = "book_id"
    type = "S"
  }

  stream_enabled   = true
  stream_view_type = "NEW_IMAGE"
}

An aws_lambda_function resource and aws_lambda_event_source_mapping resource are defined in Terraform so that the DynamoDB change stream is the event source for the Lambda function:

resource "aws_lambda_function" "lambda_function" {
  function_name = aws_ecr_repository.image_repo.name
  role          = aws_iam_role.lambda_role.arn
  timeout       = var.lambda_timeout
  image_uri     = docker_registry_image.image.name
  package_type  = "Image"

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_endpoint
      INDEX_NAME          = var.index_name
    }
  }
}

resource "aws_lambda_event_source_mapping" "read_dynamodb_stream" {
  event_source_arn  = var.change_stream_arn
  function_name     = aws_lambda_function.lambda_function.function_name
  starting_position = "LATEST"
}
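For local testing it helps to know the shape of the batch that the event source mapping hands to the function. The sketch below is a trimmed, hand-written sample (real records carry more fields, and the IDs and values here are made up), together with a small hypothetical helper, summarize, that lists what each record contains. Because the table uses the NEW_IMAGE view type, a REMOVE record carries the Keys but no NewImage.

```python
# Trimmed sample of a DynamoDB Streams batch; values are invented for illustration.
SAMPLE_EVENT = {
    "Records": [
        {
            "eventName": "INSERT",
            "eventSource": "aws:dynamodb",
            "dynamodb": {
                "Keys": {"book_id": {"S": "abc-123"}},
                "NewImage": {
                    "book_id": {"S": "abc-123"},
                    "title": {"S": "Dune"},
                    "author": {"S": "Frank Herbert"},
                },
                "StreamViewType": "NEW_IMAGE",
            },
        },
        {
            "eventName": "REMOVE",
            "eventSource": "aws:dynamodb",
            "dynamodb": {
                "Keys": {"book_id": {"S": "abc-123"}},
                "StreamViewType": "NEW_IMAGE",
            },
        },
    ]
}


def summarize(event: dict) -> list:
    """Return (event name, book_id, has NewImage) for each record in the batch."""
    summary = []
    for record in event.get("Records", []):
        change = record["dynamodb"]
        book_id = change["Keys"]["book_id"]["S"]
        summary.append((record["eventName"], book_id, "NewImage" in change))
    return summary


if __name__ == "__main__":
    for row in summarize(SAMPLE_EVENT):
        print(row)
```

A sample like this can be passed straight to a handler in a unit test, so the branching on eventName can be exercised without a live stream.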

The Lambda function processes the change records and updates the OpenSearch index in near real time. REMOVE events delete the corresponding document from the index, while INSERT and MODIFY events are upserted.

...

def handler(event, context):
    logger.info(event)

    client = OpenSearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = awsauth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )

    count = 0
    if event.get("Records") is not None:
        for record in event['Records']:
            logger.info(f'DynamoDB Change Record: {record}')            

            if record['eventName'] == 'REMOVE':
                id = record['dynamodb']['Keys']['book_id']['S']
                logger.info(f'Removing document with id: {id}')

                response = client.delete(
                    index = index,
                    id = id,
                )
                count += 1
                logger.info(response)
            else:
                id = record['dynamodb']['Keys']['book_id']['S']
                document = record['dynamodb']['NewImage']       
                logger.info(f'Indexing document: {document}')

                response = client.index(
                    index = index,
                    body = document,
                    id = id, 
                    refresh = True
                )
                count += 1
                logger.info(response)

    return f'{count} records processed.'
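One thing to be aware of is that NewImage arrives in DynamoDB's typed attribute format (for example {"title": {"S": "Dune"}}), and the handler above indexes it as is. If plain field values are preferred in the index, one option is to flatten the image first. The following is a minimal sketch under that assumption; unmarshal_value and unmarshal_image are hypothetical helpers that are not part of the original function, and binary types are deliberately left unsupported.

```python
def _to_number(text: str):
    """DynamoDB sends numbers as strings; prefer int and fall back to float."""
    try:
        return int(text)
    except ValueError:
        return float(text)


def unmarshal_value(attr: dict):
    """Convert one typed attribute, e.g. {"S": "Dune"}, into a plain Python value."""
    (type_tag, value), = attr.items()  # each attribute has exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return _to_number(value)
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal_value(item) for item in value]
    if type_tag == "M":
        return {key: unmarshal_value(item) for key, item in value.items()}
    if type_tag == "SS":
        return list(value)
    if type_tag == "NS":
        return [_to_number(item) for item in value]
    raise ValueError(f"Unsupported DynamoDB type tag: {type_tag}")


def unmarshal_image(image: dict) -> dict:
    """Flatten a whole NewImage into a plain dictionary."""
    return {name: unmarshal_value(attr) for name, attr in image.items()}


if __name__ == "__main__":
    new_image = {
        "book_id": {"S": "abc-123"},
        "title": {"S": "Dune"},
        "pages": {"N": "412"},
    }
    print(unmarshal_image(new_image))
```

With a helper like this, the handler could pass unmarshal_image(record['dynamodb']['NewImage']) as the document body instead of the raw image. boto3 also provides a TypeDeserializer class in boto3.dynamodb.types that covers the same ground, which may be preferable when boto3 is already a dependency.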

Infrastructure as Code with Terraform and Terragrunt

The application and databases are deployed with Terraform and Terragrunt. The Terraform is organized so that each logical unit (usually corresponding to one AWS service) is its own Terraform module. For example, the Lambda module is organized in a file/folder structure as follows:

├── artifacts
│   └── update_indices
│       ├── Dockerfile
│       ├── lambda
│       │   └── function.py
│       └── requirements.txt
├── backend.tf
├── data.tf
├── locals.tf
├── main.tf
├── providers.tf
└── variables.tf

The Terraform modules are then composed to make a comprehensive solution with Terragrunt. Terragrunt manages the dependencies between the Terraform modules and feeds the outputs of dependencies to the dependent project(s) as Terraform inputs.

For example, the Terragrunt file for the Lambda module needs the outputs of the API and OpenSearch modules, which are declared as dependencies. The outputs of the API and OpenSearch modules are then fed as inputs to the Lambda module:

include {
  path = find_in_parent_folders()
}

terraform {
  source = "../../modules/lambda"
}

dependency "api" {
  config_path = "../api"

  mock_outputs = {
    table_arn         = "mock_table_arn_value"
    change_stream_arn = "mock_change_stream_arn_value"
  }
}

dependency "opensearch" {
  config_path = "../opensearch"

  mock_outputs = {
    opensearch_endpoint = "mock_opensearch_endpoint_value"
  }
}

inputs = {
  aws_region          = "us-west-2"
  table_arn           = dependency.api.outputs.table_arn
  change_stream_arn   = dependency.api.outputs.change_stream_arn
  image_tag           = 1
  lambda_timeout      = 30
  opensearch_endpoint = dependency.opensearch.outputs.opensearch_endpoint
  index_name          = "books"
}
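The effect of the dependency blocks is that the API and OpenSearch modules have to be applied before the Lambda module, since their outputs are needed as inputs. As a rough illustration of that ordering idea (not of how Terragrunt is implemented internally), the dependency graph can be sorted topologically with Python's standard library. The DEPENDENCIES dictionary and the apply_order function are hypothetical names for this sketch.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each module maps to the set of modules whose outputs it consumes.
DEPENDENCIES = {
    "api": set(),
    "opensearch": set(),
    "lambda": {"api", "opensearch"},
}


def apply_order(dependencies: dict) -> list:
    """Return the modules in an order where every dependency comes first."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    print(apply_order(DEPENDENCIES))
```

The lambda module always comes last in the result; the relative order of api and opensearch does not matter because neither depends on the other.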

The layout here is a good example of how I like to approach a problem like this: logical pieces that are composed to form a more complex solution. I also plan to use this approach as the foundation for updating OpenSearch indices for a DocumentDB song library in my Karaoke Project, which will definitely be a step up in difficulty from this one!

I arrived at this solution by piecing together the approaches from a couple of other resources:

  • The general approach and the code for the Lambda are taken from this article; I ported the code from TypeScript to Python.
  • The Terraform module organization with Terragrunt is an approach I learned on Pablo’s Spot YouTube channel, a standard Terraform / Terragrunt project structure that he favors on many of his projects.