Modeling : Aggregations (v12)


ACME Bank Requirement (Iteration# 12)

  • ACME bank manages multiple dashboards
  • Some of these dashboards are near-real-time
  • Here is one such dashboard: a daily rollup of:
    • New customers
    • New Accounts (Customer has one-to-many relationship with Account)
    • Total of Credit transactions
    • Total of Debit transactions

acme-dashboard

Challenge

Unlike relational databases, aggregation on DynamoDB is not straightforward, since there is no join capability and no server-side aggregate functions.
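To make the gap concrete, here is a minimal sketch (with illustrative, hard-coded item data) of what aggregation looks like without SQL-style SUM/GROUP BY: every matching item must be read and tallied in application code, which is exactly what we want to avoid doing on every dashboard refresh.

```python
# Without server-side aggregation, the client must fetch items and tally them.
# Illustrative only: this list stands in for a Query/Scan result page.
items = [
    {"PK": "TXN#101#1", "txn_amount": 500},
    {"PK": "TXN#101#2", "txn_amount": -120},
    {"PK": "TXN#102#1", "txn_amount": 300},
]

# Positive amounts are credits, negative amounts are debits
credit_total = sum(i["txn_amount"] for i in items if i["txn_amount"] > 0)
debit_total = sum(i["txn_amount"] for i in items if i["txn_amount"] < 0)

print(credit_total, debit_total)  # 800 -120
```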

Ask

Put together a solution with following needs in mind:

  1. The dashboard reads the current date’s data at any time, so it should always show recent data
  2. The aggregation report can be generated for any date, i.e., historical stats/data need to be maintained

Daily statistics persistence

  • You may store the per-day-stats in the same table

daily-stats-aggregate-view
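The per-day stats can be modeled as four plain items sharing one partition key, `AGG#<date>`, mirroring the aggregate view above. This sketch shows the item shapes; the key and attribute names match the Lambda code later in this lesson.

```python
from datetime import date

dt = date.today().strftime("%Y/%m/%d")

# One aggregate item per dimension, all under the same AGG#<date> partition.
# Counts use agg_count, monetary totals use agg_sum.
daily_stats_items = [
    {"PK": f"AGG#{dt}", "SK": "CUSTOMERS",  "agg_count": 0},
    {"PK": f"AGG#{dt}", "SK": "ACCOUNTS",   "agg_count": 0},
    {"PK": f"AGG#{dt}", "SK": "TXN#CREDIT", "agg_sum": 0},
    {"PK": f"AGG#{dt}", "SK": "TXN#DEBIT",  "agg_sum": 0},
]

for item in daily_stats_items:
    print(item["PK"], item["SK"])
```

Because every day gets its own partition key, querying one date's stats is a single-partition Query, which satisfies the "historical stats" requirement.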

Solution

  • Use DynamoDB streams
    • Only INSERT events are needed, since we capture new CUST/ACCT/TXN items
    • Kinesis Data Streams is an alternative
  • Use a Lambda function
    • Although we could consume the stream with the SDK and the Kinesis Client Library, a Lambda trigger is the recommended approach
  • Daily aggregated/stats data persistence
    • Use the same table for storing the daily stats

Flow

  1. When CUST/ACCT/TXN items are inserted into the table, stream records are created
  2. The Lambda function gets triggered
    • It reads the stream records and rolls up the aggregates/stats
  3. The function updates the daily stats in the DynamoDB table (acme-bank-v12)

solution-flow

Setup steps

You may use these steps to try out the solution in your own AWS account.

1. Setup the table on AWS
  • Import the acmebank/acme-bank-modelv12 into NoSQL Workbench
    • Review the AGG#… items in the aggregate view
  • Commit the model to your AWS account
2. Enable Streams on the table

Enable DynamoDB stream on the table

  • Stream view type = NEW_IMAGE
  • We need txn_amount from the TXN items; otherwise KEYS_ONLY would have sufficed
aws dynamodb update-table \
    --table-name acme-bank-v12 \
    --stream-specification '{
        "StreamEnabled": true,
        "StreamViewType":"NEW_IMAGE" 
      }'
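If you prefer boto3 over the CLI, the equivalent update_table parameters look like this (a sketch; the actual call is commented out so the snippet can be reviewed without AWS credentials):

```python
# Parameters for dynamodb.update_table(), equivalent to the CLI call above.
params = {
    "TableName": "acme-bank-v12",
    "StreamSpecification": {
        "StreamEnabled": True,
        "StreamViewType": "NEW_IMAGE",
    },
}

# With credentials configured, the call would be:
#   import boto3
#   boto3.client("dynamodb").update_table(**params)
print(params["StreamSpecification"]["StreamViewType"])  # NEW_IMAGE
```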
3. Setup execution role for Lambda function
  • Function should be able to carry out CRUD on the table
  • Function should be able to access Streams

Run the following commands in the root folder of project repository.

  • Create the execution role for the Lambda function
aws iam create-role --role-name   ddb-stream-lambda-acmebank-role \
    --assume-role-policy-document  file://./streams/trust-lambda.json  
  • Create the policy with appropriate permissions
  • Review the required permissions
aws iam create-policy --policy-name ddb-stream-lambda-acmebank-policy \
   --policy-document    file://./streams/policies-lambda-stream.json 
  • Get the ARN for the policy; we need it in the next command
aws iam list-policies --query 'Policies[?PolicyName==`ddb-stream-lambda-acmebank-policy`].Arn' --output text
  • Attach the policy to the role
  • Replace the policy ARN in the next command
aws iam attach-role-policy   --role-name ddb-stream-lambda-acmebank-role  --policy-arn           REPLACE-WITH-ARN 
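The repository's policies-lambda-stream.json is the source of truth for the permissions. As a rough guide, a policy for this setup typically needs the stream-read actions plus item CRUD on the table; the sketch below is illustrative (the actual file in the repo may differ, and wildcard resources are placeholders):

```python
import json

# Illustrative policy document for a DynamoDB-stream-consuming Lambda.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # Read records from the table's stream
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/acme-bank-v12/stream/*",
        },
        {   # Read/write the AGG# aggregate items
            "Effect": "Allow",
            "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem"],
            "Resource": "arn:aws:dynamodb:*:*:table/acme-bank-v12",
        },
    ],
}

print(json.dumps(policy, indent=2)[:60])
```

The function also needs the usual CloudWatch Logs permissions (logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents) to write its print output.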
4. Create the Lambda function & setup trigger
  • Open Lambda management console

    • Name = ddb-acmebank-stream-reader
    • Runtime = Python 3.8
    • Change Execution Role > Use an existing role= ddb-stream-lambda-acmebank-role
  • Copy paste the code below to your Lambda function

  • DEPLOY the function

# Code for Lambda 

import json
import boto3
from botocore.exceptions import ClientError
from datetime import date

TABLE_NAME =  "acme-bank-v12"

# First call of the day will use PutItem to add the zeroed aggregate items
def is_first_call_of_day(dt):
    dynamodb_client = boto3.client("dynamodb")
    response = dynamodb_client.get_item(
        TableName=TABLE_NAME,
        Key={"PK": {"S": "AGG#" + dt}, "SK": {"S": "CUSTOMERS"}},
    )
    if "Item" in response:
        print("SUBSEQUENT call")
        return

    print("FIRST aggregation call of the day")
    # Seed one zeroed aggregate item per dimension
    seed_items = [
        ("CUSTOMERS", "agg_count"),
        ("ACCOUNTS", "agg_count"),
        ("TXN#CREDIT", "agg_sum"),
        ("TXN#DEBIT", "agg_sum"),
    ]
    for sk, attr in seed_items:
        dynamodb_client.put_item(
            TableName=TABLE_NAME,
            Item={
                "PK": {"S": "AGG#" + dt},
                "SK": {"S": sk},
                attr: {"N": "0"},
            },
        )
        

# Build the UpdateItem request and apply the increment
def update_aggregate_stats_in_table(dimension, dimension_attribute, incr_value, dt):

    input = {
        "TableName": TABLE_NAME,
        "Key": {
            "PK": {"S": "AGG#" + dt},
            "SK": {"S": dimension}
        },
        "UpdateExpression": "SET #agg_attr = #agg_attr + :agg_attr_val",
        "ExpressionAttributeNames": {"#agg_attr": dimension_attribute},
        "ExpressionAttributeValues": {":agg_attr_val": {"N": str(incr_value)}}
    }
    
    # print(f'Update input={input}')
    
    dynamodb_client = boto3.client("dynamodb")
    try:
        response = dynamodb_client.update_item(**input)
        print("Successfully updated item.")
        # Handle response
    except ClientError as error:
        print("Client error: " + error.response['Error']['Message'])
    except BaseException as error:
        # Non-ClientError exceptions have no .response attribute
        print("Unknown error while updating item: " + str(error))
        
        

# Update the table with the aggregates
def update_aggregate_stats(new_cust_count, new_acct_count, txn_credits_sum, txn_debit_sum):
    
    print(f'Agg  {new_cust_count} {new_acct_count} {txn_credits_sum} {txn_debit_sum}')
    
    dt = date.today().strftime("%Y/%m/%d")
    is_first_call_of_day(dt)
    
    # Optimize - replace with BatchWriteItem
    # Update customer count
    if new_cust_count != 0:
        update_aggregate_stats_in_table("CUSTOMERS", "agg_count", new_cust_count, dt)
        
    # Update account count
    if new_acct_count != 0:
        update_aggregate_stats_in_table("ACCOUNTS", "agg_count", new_acct_count, dt)
        
    # Update credit total
    if txn_credits_sum != 0:
        update_aggregate_stats_in_table("TXN#CREDIT", "agg_sum", txn_credits_sum, dt)
        
    # Update debit total
    if txn_debit_sum != 0:
        update_aggregate_stats_in_table("TXN#DEBIT", "agg_sum", txn_debit_sum, dt)
    

# Lambda handler
def lambda_handler(event, context):
    
    new_acct_count = 0
    new_cust_count = 0
    txn_credits_sum = 0
    txn_debit_sum = 0
    
    for record in event['Records']:
        
        PK = record['dynamodb']['NewImage']['PK']['S']
        SK = record['dynamodb']['NewImage']['SK']['S']
        
        if PK.startswith('TXN#'):
            
            # New Txn
            txn_amount = int(record['dynamodb']['NewImage']['txn_amount']['N'])
            if txn_amount > 0:
                txn_credits_sum = txn_credits_sum + txn_amount
            else:
                txn_debit_sum = txn_debit_sum + txn_amount
                
            # print('new txn')
            
        elif PK.startswith('CUST#') and SK.startswith('CUST#'):
            
            # New customer is added
            new_cust_count = new_cust_count + 1
            # print('new cust')
            
        elif PK.startswith('CUST#') and SK.startswith('ACCT#'):
            
            # New account
            new_acct_count = new_acct_count + 1
            # print('new acct')
        else:
            print('ignore')
            
    # update the aggregate data
    update_aggregate_stats(new_cust_count, new_acct_count, txn_credits_sum, txn_debit_sum)
    
    return {
        'statusCode': 200,
        'body': json.dumps('processed insert!')
    }
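The handler's classification logic can be exercised locally with a hand-built event, with no AWS calls. This sketch extracts just the tallying step from the handler above:

```python
def tally(records):
    """Classify stream records the same way the Lambda handler does."""
    new_cust = new_acct = credit_sum = debit_sum = 0
    for record in records:
        image = record["dynamodb"]["NewImage"]
        pk, sk = image["PK"]["S"], image["SK"]["S"]
        if pk.startswith("TXN#"):
            amount = int(image["txn_amount"]["N"])
            if amount > 0:
                credit_sum += amount
            else:
                debit_sum += amount
        elif pk.startswith("CUST#") and sk.startswith("CUST#"):
            new_cust += 1
        elif pk.startswith("CUST#") and sk.startswith("ACCT#"):
            new_acct += 1
    return new_cust, new_acct, credit_sum, debit_sum

# Hand-built records mimicking the DynamoDB stream event shape
sample = [
    {"dynamodb": {"NewImage": {"PK": {"S": "CUST#1"}, "SK": {"S": "CUST#1"}}}},
    {"dynamodb": {"NewImage": {"PK": {"S": "CUST#1"}, "SK": {"S": "ACCT#9"}}}},
    {"dynamodb": {"NewImage": {"PK": {"S": "TXN#9#1"}, "SK": {"S": "ACCT#9"},
                               "txn_amount": {"N": "-100"}}}},
]
print(tally(sample))  # (1, 1, 0, -100)
```

Note that debits are stored as negative amounts, so the debit sum stays negative; the dashboard can negate it for display.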
5. Setup trigger
  • Click on the + Add trigger button
    • Trigger source = DynamoDB
    • DynamoDB table = Select acme-bank-v12
  • Click on Additional settings » Add button and add the two filter criteria below
  • Filter criterion 1 — only INSERT events:
{"eventName": ["INSERT"]}
  • Filter criterion 2 — only items whose PK starts with CUST# or TXN#:
{
    "dynamodb":
    {
        "NewImage": {
            "PK": {"S":[{"prefix": "CUST#"}, {"prefix": "TXN#"}]}
        }
    }
}
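Lambda applies these filters before invoking the function. The prefix patterns behave roughly like this local check (simplified; the real event-filtering engine supports more operators):

```python
def passes_filter(event_name, pk):
    """Rough local model of the two filter criteria above."""
    # INSERT events whose NewImage.PK begins with CUST# or TXN#
    return event_name == "INSERT" and (pk.startswith("CUST#") or pk.startswith("TXN#"))

print(passes_filter("INSERT", "TXN#533#111"))    # True
print(passes_filter("MODIFY", "TXN#533#111"))    # False
print(passes_filter("INSERT", "AGG#2023/02/09")) # False
```

The AGG# exclusion matters: the Lambda's own writes to the aggregate items produce stream records too, and the PK prefix filter keeps those from re-triggering the function.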

Test it out

1. Add items to table

  • Run these commands in the project’s root folder
  • Review the scripts & check out the item data in the JSON file
  • This script deletes the test items (so each run starts fresh)
python acmebank/aggregate/delete-unit-test-transactions.py acme-bank-v12 acmebank/aggregate/bulk-transactions.json aws
  • This script adds the items
python acmebank/aggregate/unit-test-transactions.py acme-bank-v12 acmebank/aggregate/bulk-transactions.json aws

2. Check out the aggregate data

  • The aggregated daily data is added to the table with {"PK": "AGG#<<Date>>"}
  • Change the date to today’s date to pull the data
  • Note down the values
aws dynamodb query \
    --table-name acme-bank-v12 \
    --key-condition-expression 'PK = :agg ' \
    --expression-attribute-values '{
      ":agg": {"S": "AGG#2023/02/09"}
   }' \
   --projection-expression "SK,agg_count,agg_sum"

3. Add a TXN and check stats again

  • Change the TXN number for each run; re-inserting the same key produces a MODIFY stream event, which the filter ignores, so the stats will not change
aws dynamodb put-item \
    --table-name   acme-bank-v12  \
    --item '{    
        "PK": {"S": "TXN#533#111"},
        "SK": {"S": "ACCT#533"},
        "txn_amount": {"N": "-100"}  
    }' 
  • Check the stats again
  • Change the date in the command below to today’s date

aws dynamodb query \
    --table-name acme-bank-v12 \
    --key-condition-expression 'PK = :agg ' \
    --expression-attribute-values '{
      ":agg": {"S": "AGG#2023/02/09"}
   }' \
   --projection-expression "SK,agg_count,agg_sum"

Cleanup

1. Delete the Lambda function

aws lambda delete-function --function-name ddb-acmebank-stream-reader

2. Delete the Execution role

  • To delete the role, we must first detach the policy
aws iam list-policies --query 'Policies[?PolicyName==`ddb-stream-lambda-acmebank-policy`].Arn' --output text
aws iam detach-role-policy --role-name ddb-stream-lambda-acmebank-role --policy-arn <<<Policy ARN>>>
  • Now delete the role
aws iam delete-role --role-name ddb-stream-lambda-acmebank-role 

3. Delete the Policy

  • To delete the policy we need its ARN
aws iam list-policies --query 'Policies[?PolicyName==`ddb-stream-lambda-acmebank-policy`].Arn' --output text
aws iam delete-policy --policy-arn   <<<Policy ARN>>>

4. Delete the table

aws dynamodb delete-table --table-name acme-bank-v12