You need NoSQL Workbench and DynamoDB Local for this exercise. Refer to section #2 (Tools) in the guide for installation instructions.
acmebankmodel/modeling/example-order-model.json
This sample model is for customer orders.
Commit the model to a DynamoDB Local table.
aws dynamodb update-table \
--table-name example-customer-order-table \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
--endpoint-url http://localhost:8000
The sample Python code is for reference only; it is not production-ready. It reads from ONLY one shard! Its purpose is to demonstrate the use of the DynamoDB Streams API and to show you how a stream works.
python streams/read-stream.py
aws dynamodb put-item \
--table-name example-customer-order-table \
--item '{
"email": {"S":"anil@example.com"},
"order_number": {"S":"2023/01/01#1344"},
"order_amount": {"N":"234"},
"order_status": {"S":"pending"}
}' \
--endpoint-url http://localhost:8000
aws dynamodb update-item \
--table-name example-customer-order-table \
--key '{"email": {"S":"anil@example.com"},"order_number": {"S":"2023/01/01#1344"}}' \
--update-expression 'SET order_status = :shipped' \
--expression-attribute-values '{":shipped": {"S": "shipped"}}' \
--endpoint-url http://localhost:8000
aws dynamodb delete-item \
--table-name example-customer-order-table \
--key '{"email": {"S":"anil@example.com"},"order_number": {"S":"2023/01/01#1344"}}' \
--endpoint-url http://localhost:8000
aws dynamodb delete-table \
--table-name example-customer-order-table \
--endpoint-url http://localhost:8000
import boto3
import json
import time
# Table whose stream is read by this script
TABLE_NAME = 'example-customer-order-table'

# DynamoDB client pointed at the local endpoint; used here to look up the
# ARN of the table's latest stream, which the stream calls below require.
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')
_table_description = client.describe_table(TableName=TABLE_NAME)
STREAM_ARN = _table_description['Table']['LatestStreamArn']

# Separate client for the DynamoDB Streams API (stream & shard info, records)
streams_client = boto3.client(
    'dynamodbstreams',
    endpoint_url='http://localhost:8000',
)
def get_latest_records(last_sequence_number, client=None, stream_arn=None):
    """Print stream records newer than ``last_sequence_number`` from ONE shard.

    Reference-only helper: it reads ONLY the 0th shard returned by
    ``describe_stream``, starts from ``TRIM_HORIZON`` on every call, and
    inspects a single ``get_records`` page (``Limit=1000``).

    Args:
        last_sequence_number: Sequence number of the last record already
            printed, or ``''`` to print every record that is read.
        client: Optional DynamoDB Streams client. Defaults to the
            module-level ``streams_client``.
        stream_arn: Optional stream ARN. Defaults to the module-level
            ``STREAM_ARN``.

    Returns:
        The sequence number of the last record printed, or
        ``last_sequence_number`` unchanged when nothing new was printed.
    """
    if client is None:
        client = streams_client
    if stream_arn is None:
        stream_arn = STREAM_ARN

    # 1. Get the shards
    description = client.describe_stream(StreamArn=stream_arn)
    shards = description['StreamDescription']['Shards']
    if not shards:
        # No shard to read from; avoid an IndexError and report no progress.
        return last_sequence_number

    # 2. Read stream records from the 0th shard
    shard = shards[0]

    # 3. Get the stream record iterator
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html
    iterator = client.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard['ShardId'],
        ShardIteratorType='TRIM_HORIZON',
    )['ShardIterator']

    # 4. Get the records
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html
    records = client.get_records(ShardIterator=iterator, Limit=1000)['Records']

    # 5. Loop through the records. Print only if seq# of record > last_sequence_number
    for record in records:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        # Sequence numbers are numeric strings: compare them as integers,
        # because a plain string comparison is lexicographic and gets the
        # order wrong when the numbers differ in length ("100" < "99").
        if not last_sequence_number or int(sequence_number) > int(last_sequence_number):
            print("eventName = {}, Sequence# = {}".format(record["eventName"], sequence_number))
            print(record["dynamodb"]["Keys"])
            last_sequence_number = sequence_number

    # 6. Return the last seq# that was printed
    return last_sequence_number
# Read existing & new stream records from 0th shard
POLL_INTERVAL_SECONDS = 5  # pause between two consecutive polls of the stream

print('Reading DynamoDB Stream:')
last_sequence_number = ''
try:
    while True:
        # Each poll returns the last sequence number printed so far; feeding
        # it back in keeps already-printed records from being printed again.
        last_sequence_number = get_latest_records(last_sequence_number)
        time.sleep(POLL_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # Ctrl-C is the only way to stop this poller; exit without a traceback.
    print('Stopped reading DynamoDB Stream.')