Ex: DynamoDB Streams

Hands on Exercise

You need NoSQL Workbench and a local DynamoDB instance (DynamoDB Local) for this exercise. Refer to section #2, Tools, in the guide for installation instructions.

1. Import the model JSON into NoSQL Workbench and commit it to DynamoDB

  • acmebankmodel/modeling/example-order-model.json

  • This sample model is for customer orders

  • Commit the model to a local DynamoDB table

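2. Enable a DynamoDB stream on the table example-customer-order-table

  • Run the command from the root folder of the project
  • Note the StreamViewType: NEW_AND_OLD_IMAGES writes both the old and the new image of a changed item to each stream record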
aws dynamodb update-table \
    --table-name example-customer-order-table \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
    --endpoint-url http://localhost:8000
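
The same change can be made from Python. The following is a minimal sketch, not part of the sample project: the helper name enable_stream is hypothetical, and the client argument is assumed to be a boto3 DynamoDB client such as boto3.client('dynamodb', endpoint_url='http://localhost:8000'). It checks the view type against the four values DynamoDB accepts before calling update_table.

```python
# The four stream view types DynamoDB accepts.
VALID_VIEW_TYPES = {"KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"}


def enable_stream(client, table_name, view_type="NEW_AND_OLD_IMAGES"):
    """Enable a stream on table_name (hypothetical helper).

    client is assumed to be a boto3 DynamoDB client.
    """
    if view_type not in VALID_VIEW_TYPES:
        raise ValueError("Unknown StreamViewType: {}".format(view_type))
    # Same request the CLI command sends with --stream-specification.
    return client.update_table(
        TableName=table_name,
        StreamSpecification={
            "StreamEnabled": True,
            "StreamViewType": view_type,
        },
    )
```

With a client created as in read-stream.py, the call would be enable_stream(client, 'example-customer-order-table').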

3. Launch the Python code in a terminal

The sample Python code is for reference only; it is not production ready. It reads from only one shard, the first one returned by describe_stream. Its purpose is to demonstrate the Streams API and show how a stream works.

  • Review the Python code under streams/read-stream.py
  • Every time an item is added, updated, or deleted, the Python code receives a stream record on its next poll
  • The code raises an exception if the stream is disabled
python streams/read-stream.py
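
The exception mentioned above can be replaced by an explicit check with a readable message. A minimal sketch follows; the helper name get_stream_arn is hypothetical, and it assumes the input is the 'Table' dict returned by describe_table, in which StreamSpecification and LatestStreamArn may be absent when no stream is enabled.

```python
def get_stream_arn(table_description):
    """Return the stream ARN, or raise a readable error (hypothetical helper).

    table_description is assumed to be the 'Table' dict from
    client.describe_table(TableName=...).
    """
    spec = table_description.get("StreamSpecification", {})
    arn = table_description.get("LatestStreamArn")
    # Treat a missing specification, a disabled flag, or a missing ARN
    # all as "no stream to read".
    if not spec.get("StreamEnabled", False) or not arn:
        raise RuntimeError(
            "Stream is not enabled on table {}".format(
                table_description.get("TableName", "<unknown>")
            )
        )
    return arn
```

In read-stream.py it could wrap the describe_table call: STREAM_ARN = get_stream_arn(client.describe_table(TableName=TABLE_NAME)['Table']).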

4. Add, update, and delete items

  • You may use NoSQL Workbench to manipulate the items, or run the commands below
  • Watch the terminal for the records read by the Python code
aws dynamodb put-item \
    --table-name example-customer-order-table \
    --item '{
              "email": {"S":"anil@example.com"}, 
              "order_number": {"S":"2023/01/01#1344"}, 
              "order_amount": {"N":"234"}, 
              "order_status": {"S":"pending"}
           }'   \
    --endpoint-url http://localhost:8000
aws dynamodb update-item \
   --table-name example-customer-order-table \
   --key '{"email": {"S":"anil@example.com"},"order_number": {"S":"2023/01/01#1344"}}' \
   --update-expression  'SET order_status = :shipped' \
   --expression-attribute-values '{":shipped": {"S": "shipped"}}' \
   --endpoint-url http://localhost:8000
aws dynamodb delete-item \
   --table-name example-customer-order-table \
   --key '{"email": {"S":"anil@example.com"},"order_number": {"S":"2023/01/01#1344"}}' \
   --endpoint-url http://localhost:8000
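
With StreamViewType=NEW_AND_OLD_IMAGES, and assuming the item did not exist beforehand, the three commands above should produce INSERT, MODIFY, and REMOVE records in that order. An INSERT record carries only NewImage, a MODIFY record carries both OldImage and NewImage, and a REMOVE record carries only OldImage. The sketch below is not part of the sample project; the function name changed_attributes and the sample record are illustrative assumptions. It lists the attributes that differ between the two images.

```python
def changed_attributes(record):
    """Return {attribute: (old, new)} for attributes that differ.

    record is assumed to be one entry of the 'Records' list returned by
    get_records, in DynamoDB JSON form. A missing image is treated as empty,
    so INSERT shows every attribute as new and REMOVE shows every one as gone.
    """
    old = record["dynamodb"].get("OldImage", {})
    new = record["dynamodb"].get("NewImage", {})
    changes = {}
    for name in sorted(set(old) | set(new)):
        if old.get(name) != new.get(name):
            changes[name] = (old.get(name), new.get(name))
    return changes


# Illustrative MODIFY record, shaped like the result of the update-item command.
sample = {
    "eventName": "MODIFY",
    "dynamodb": {
        "Keys": {
            "email": {"S": "anil@example.com"},
            "order_number": {"S": "2023/01/01#1344"},
        },
        "OldImage": {
            "email": {"S": "anil@example.com"},
            "order_number": {"S": "2023/01/01#1344"},
            "order_status": {"S": "pending"},
        },
        "NewImage": {
            "email": {"S": "anil@example.com"},
            "order_number": {"S": "2023/01/01#1344"},
            "order_status": {"S": "shipped"},
        },
    },
}

print(changed_attributes(sample))
# {'order_status': ({'S': 'pending'}, {'S': 'shipped'})}
```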

5. Delete sample table

aws dynamodb delete-table \
    --table-name example-customer-order-table \
    --endpoint-url http://localhost:8000

Python code [read-stream.py]

  • Uses the local DynamoDB table
  • To connect to a table on AWS, remove the endpoint_url argument when creating the clients
import boto3
import json
import time

# Table whose stream is read
TABLE_NAME = 'example-customer-order-table'

# Get the stream arn needed for reading the stream
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')
STREAM_ARN = client.describe_table(TableName=TABLE_NAME)['Table']['LatestStreamArn']

# Get info on the stream & shards
streams_client = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000')

# Reads stream records with sequence number > last_sequence_number
# from ONLY the 0th shard
def get_latest_records(last_sequence_number):

    # 1. Get the shards
    response = streams_client.describe_stream(StreamArn=STREAM_ARN)
    shards = response['StreamDescription']['Shards']

    # 2. Read stream records from the 0th shard
    shard = shards[0]

    # 3. Get the stream record iterator
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html
    shard_iterator = streams_client.get_shard_iterator(
                StreamArn=STREAM_ARN,
                ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON'
            )
    iterator = shard_iterator['ShardIterator']

    # 4. Get the records
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html
    records_in_response = streams_client.get_records(ShardIterator=iterator, Limit=1000)
    records = records_in_response['Records']

    # 5. Loop through the records. Print only if seq# of record > last_sequence_number.
    # Sequence numbers are numeric strings, so compare them as integers;
    # a plain string comparison misorders numbers of different lengths.
    for record in records:
        # print(record)
        sequence_number = record["dynamodb"]["SequenceNumber"]
        if last_sequence_number == '' or int(sequence_number) > int(last_sequence_number):
            print("eventName = {}, Sequence# = {}".format(record["eventName"], sequence_number))
            print(record["dynamodb"]["Keys"])
            last_sequence_number = sequence_number

    # 6. Return the last seq# that was printed
    return last_sequence_number

# Read existing & new stream records from the 0th shard
print('Reading DynamoDB Stream:')
last_sequence_number = ''
while True:
    last_sequence_number = get_latest_records(last_sequence_number)
    time.sleep(5)
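
The script above reads only the first shard and requests a new TRIM_HORIZON iterator on every poll. A sketch of one pass over every shard follows, assuming a boto3 dynamodbstreams client is passed in; the function name read_all_shards and the page_limit parameter are hypothetical. It follows NextShardIterator within each shard and stops at the first empty page. That is a simplification for a demo: on an open shard, an empty page does not mean that no more records will arrive.

```python
def read_all_shards(streams_client, stream_arn, page_limit=100):
    """Return the records currently readable from every shard (demo sketch).

    streams_client is assumed to be boto3.client('dynamodbstreams').
    Shard-list pagination (LastEvaluatedShardId) is not handled here.
    """
    description = streams_client.describe_stream(StreamArn=stream_arn)
    records = []
    for shard in description["StreamDescription"]["Shards"]:
        iterator = streams_client.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        # A fully read closed shard returns no NextShardIterator.
        while iterator:
            page = streams_client.get_records(
                ShardIterator=iterator, Limit=page_limit
            )
            if not page["Records"]:
                break  # nothing more to read right now
            records.extend(page["Records"])
            iterator = page.get("NextShardIterator")
    return records
```

Records are ordered within a shard only. This sketch returns shards in the order describe_stream lists them and does not attempt to process parent shards before their children.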