Ex: Archive simulation

Hands-on Exercise

In this exercise you will set up a table with TTL and a DynamoDB stream enabled. We will then run a local Python reader for the stream; the reader simulates archival of expired items by writing their stream records to a local folder (temp).

(Diagram: archival-simulation.png)

1. Set up a table in local DynamoDB

  • Make sure that local DynamoDB is up

  • OR remove --endpoint-url and run against AWS

  • Delete the TTLTest table if it is already set up. This clears any earlier stream records.

aws dynamodb delete-table \
    --table-name TTLTest  \
    --endpoint-url http://localhost:8000
aws dynamodb create-table \
    --table-name TTLTest  \
    --attribute-definitions '[
       {
          "AttributeName": "PK",
          "AttributeType": "S"
       }
    ]' \
    --key-schema '[
        {
            "AttributeName": "PK",
            "KeyType": "HASH"
        }
    ]' \
    --billing-mode PAY_PER_REQUEST \
    --endpoint-url  http://localhost:8000
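
Optionally, you can confirm the table is ready from Python before moving on. A minimal boto3 sketch; it assumes the same local endpoint as the CLI commands above:

import boto3

# Assumes the same local endpoint used by the CLI commands above
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

# Block until the table reports ACTIVE, then print its status
client.get_waiter('table_exists').wait(TableName='TTLTest')
print(client.describe_table(TableName='TTLTest')['Table']['TableStatus'])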

2. Update table for TTL

  • TTL attribute name is ttl
aws dynamodb update-time-to-live \
    --table-name TTLTest \
    --time-to-live-specification 'Enabled=true,AttributeName=ttl' \
    --endpoint-url  http://localhost:8000
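
To double-check that TTL is actually enabled, a small boto3 sketch (same table and local endpoint assumed):

import boto3

client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

# Expect TimeToLiveStatus == 'ENABLED' and AttributeName == 'ttl'
ttl_desc = client.describe_time_to_live(TableName='TTLTest')['TimeToLiveDescription']
print(ttl_desc['TimeToLiveStatus'], ttl_desc['AttributeName'])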

3. Enable the DynamoDB stream

aws dynamodb update-table \
    --table-name TTLTest \
    --stream-specification StreamEnabled=true,StreamViewType=OLD_IMAGE \
    --endpoint-url http://localhost:8000
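
The archive simulator discovers the stream through DescribeTable, so you can confirm the stream is on the same way. A quick sketch, again assuming the local endpoint:

import boto3

client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

table = client.describe_table(TableName='TTLTest')['Table']
# StreamSpecification should report StreamEnabled=True with OLD_IMAGE,
# and LatestStreamArn is the ARN the simulator reads from
print(table['StreamSpecification'])
print(table['LatestStreamArn'])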

4. In a terminal, launch the archive simulator

  • Review the code
  • The code writes an event to a file under the temp folder only when both of the following hold (an illustrative record follows the command below)
    • eventName == REMOVE
    • stream-record['userIdentity']['PrincipalId'] == "dynamodb.amazonaws.com"
python ttl/archive-simulator.py 
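
For reference, a TTL expiry produces a REMOVE stream record that carries a service userIdentity. The dictionary below is only illustrative (keys and values are examples), but the two fields the simulator checks, eventName and userIdentity.PrincipalId, follow the shape DynamoDB documents for TTL deletions:

# Illustrative shape of a TTL-generated stream record (values are examples)
ttl_remove_record = {
    "eventName": "REMOVE",
    "userIdentity": {
        "Type": "Service",
        "PrincipalId": "dynamodb.amazonaws.com"
    },
    "dynamodb": {
        "Keys": {"PK": {"S": "expiring-item"}},
        "OldImage": {"PK": {"S": "expiring-item"}, "ttl": {"N": "1700000000"}},
        "SequenceNumber": "100000000000000000001",
        "StreamViewType": "OLD_IMAGE"
    }
}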

5. In a separate terminal, add items with TTL

  • Run the command from the project root folder
  • The code does a PutItem with a ttl set 10 seconds in the future and loops on GetItem until the item expires (sketched after the command below)
python ttl/put-and-wait-for-expiry.py
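
The script ttl/put-and-wait-for-expiry.py ships with the project; the sketch below only illustrates the same idea, assuming the TTLTest table, the ttl attribute, and the local endpoint used throughout this exercise:

import time
import boto3

client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

# Write an item whose ttl is ~10 seconds in the future (epoch seconds)
expiry = int(time.time()) + 10
client.put_item(TableName='TTLTest',
                Item={'PK': {'S': 'expiring-item'}, 'ttl': {'N': str(expiry)}})

# Poll with GetItem until the expired item disappears
while 'Item' in client.get_item(TableName='TTLTest',
                                Key={'PK': {'S': 'expiring-item'}}):
    print('Item still present, waiting...')
    time.sleep(2)
print('Item has expired and been removed')

Note that on AWS the actual deletion can lag the ttl timestamp, so the loop simply waits until the item is gone rather than assuming an exact expiry time.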

6. Check for files under the temp folder

  • Check the messages on the terminal running the archive simulator
  • INSERT events will be ignored
  • Every time an item expires, the archive simulator writes a file under the temp folder (see the sketch after this list for reading them back)
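
A small sketch for inspecting what was archived, assuming the simulator has written files into ./temp:

import json
from pathlib import Path

# List the archived stream records written by the simulator so far
for path in sorted(Path('./temp').glob('*.txt')):
    record = json.loads(path.read_text())
    print(path.name, record['eventName'], record['dynamodb']['Keys'])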

7. Add an item without TTL & delete it manually

  • The REMOVE stream record from the manual delete will be ignored
  • Check the message on the terminal running the archive simulator
  • The event is ignored because the item was deleted with the DeleteItem API, so its stream record carries no service userIdentity (see the check after the commands below)
aws dynamodb put-item \
    --table-name TTLTest \
    --item '{
        "PK": {"S": "test"},
        "text": {"S": "sample text"}
    }' \
    --endpoint-url http://localhost:8000
aws dynamodb delete-item \
    --table-name TTLTest \
    --key '{
        "PK": {"S": "test"}
    }' \
    --endpoint-url http://localhost:8000
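
Unlike a TTL expiry, the REMOVE record produced by DeleteItem carries no userIdentity, so the simulator's check fails and the record is skipped. The snippet below mirrors that check against an illustrative, manually deleted record (values are examples):

# Illustrative REMOVE record from a manual DeleteItem: no userIdentity field
manual_remove_record = {
    "eventName": "REMOVE",
    "dynamodb": {
        "Keys": {"PK": {"S": "test"}},
        "OldImage": {"PK": {"S": "test"}, "text": {"S": "sample text"}},
        "SequenceNumber": "100000000000000000002"
    }
}

principal = manual_remove_record.get('userIdentity', {}).get('PrincipalId')
is_ttl_expiry = (manual_remove_record['eventName'] == 'REMOVE'
                 and principal == 'dynamodb.amazonaws.com')
print(is_ttl_expiry)  # False -> the archive simulator ignores this record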

8. Clean up

  • Delete files under temp folder
  • Delete the table
aws dynamodb delete-table \
    --table-name TTLTest  \
    --endpoint-url http://localhost:8000

Python code for archival

# OBJECTIVE: demonstrate how a combination of DynamoDB Streams & TTL
# can be used to archive expired items automatically

import boto3
import json
import time

# Table whose stream we will read
TABLE_NAME='TTLTest'

# Get the stream arn needed for reading the stream
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000')
STREAM_ARN = client.describe_table(TableName=TABLE_NAME)['Table']['LatestStreamArn']

# Streams client used to read the stream & its shards
streams_client = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000')

# Reads stream records with sequence number > last_sequence_number
# from ONLY the 0th shard
def write_latest_records(last_sequence_number):

    # 1. Get the shards
    response = streams_client.describe_stream(StreamArn=STREAM_ARN)
    shards = response['StreamDescription']['Shards']

    # 2. Read stream records from the 0th shard
    shard = shards[0]

    # 3. Get the stream record iterator
    #https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html
    shard_iterator = streams_client.get_shard_iterator(
                StreamArn=STREAM_ARN,
                ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON'
            )
    iterator = shard_iterator['ShardIterator']

    # 4. Get the records
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html
    records_in_response = streams_client.get_records(ShardIterator=iterator, Limit=1000)
    records = records_in_response['Records']

    # 5. Loop through the records. Process only records whose
    #    sequence number is greater than last_sequence_number

    for record in records:        
        # print(record)
        if last_sequence_number=='' or record["dynamodb"]["SequenceNumber"] > last_sequence_number :
            # print("eventName = {}, Sequence# = {}".format(record["eventName"], record["dynamodb"]["SequenceNumber"]))
            # print(record["dynamodb"]["Keys"])
            last_sequence_number = record["dynamodb"]["SequenceNumber"]

            # Archive only REMOVE events that were generated by the TTL service
            if record['eventName'] == 'REMOVE':
                # TTL-expired items carry a service userIdentity; manual deletes
                # do not, so .get() avoids a KeyError for those records
                principal = record.get('userIdentity', {}).get('PrincipalId')
                if principal == 'dynamodb.amazonaws.com':
                    fname = str(record['dynamodb']['OldImage']['ttl']['N']) + ".txt"
                    with open("./temp/" + fname, "w") as write_file:
                        json.dump(record, write_file, indent=4, sort_keys=True, default=str)
                    print(f"Wrote file ./temp/{fname}")
                else:
                    print(f"Ignored {record['dynamodb']['OldImage']['PK']} as REMOVE was NOT service generated!!")
            else:
                print(f"Event Ignored : {record['eventName']}")

    # 6. Return the last sequence number that was processed
    return last_sequence_number

# Read existing & new stream records from the 0th shard
print('Reading DynamoDB Stream:')
last_sequence_number=''
while True:
    last_sequence_number = write_latest_records(last_sequence_number)
    time.sleep(5)
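
The simulator is deliberately simple: on every pass it re-reads only the 0th shard from TRIM_HORIZON and filters by sequence number. A more complete reader would walk every shard and follow NextShardIterator (or hand the stream to a Lambda trigger). A rough sketch of that loop, reusing the same local endpoint; read_all_shards and handle_record are illustrative names:

import boto3

streams_client = boto3.client('dynamodbstreams', endpoint_url='http://localhost:8000')

def read_all_shards(stream_arn, handle_record):
    # Walk every shard once, following NextShardIterator until the shard is drained
    shards = streams_client.describe_stream(StreamArn=stream_arn)['StreamDescription']['Shards']
    for shard in shards:
        iterator = streams_client.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard['ShardId'],
            ShardIteratorType='TRIM_HORIZON'
        )['ShardIterator']
        while iterator:
            response = streams_client.get_records(ShardIterator=iterator, Limit=1000)
            for record in response['Records']:
                handle_record(record)
            if not response['Records']:
                break  # caught up on an open shard; stop instead of polling forever
            iterator = response.get('NextShardIterator')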