Companion guide course on DynamoDB Data Modeling
Unlike relational databases, aggregation on DynamoDB is not straightforward. This is due to the fact that there is no join capability.
Put together a solution with the following needs in mind:
You may use these steps to try out the solution in your own AWS account.
Enable DynamoDB stream on the table
aws dynamodb update-table \ --table-name acme-bank-v12 \ --stream-specification '{ "StreamEnabled": true, "StreamViewType":"NEW_IMAGE" }'
Run the following commands in the root folder of project repository.
aws iam create-role --role-name ddb-stream-lambda-acmebank-role \ --assume-role-policy-document file://./streams/trust-lambda.json
aws iam create-policy --policy-name ddb-stream-lambda-acmebank-policy \ --policy-document file://./streams/policies-lambda-stream.json
aws iam list-policies --query 'Policies[?PolicyName==`ddb-stream-lambda-acmebank-policy`].Arn' --output text
aws iam attach-role-policy --role-name ddb-stream-lambda-acmebank-role --policy-arn REPLACE-WITH-ARN
Open Lambda management console
Copy paste the code below to your Lambda function
DEPLOY the function
# Code for Lambda
import json
import boto3
from botocore.exceptions import ClientError
from datetime import date

TABLE_NAME = "acme-bank-v12"

# (sort key, counter attribute) of every aggregate item kept per day
AGGREGATE_ITEMS = (
    ("CUSTOMERS", "agg_count"),
    ("ACCOUNTS", "agg_count"),
    ("TXN#CREDIT", "agg_sum"),
    ("TXN#DEBIT", "agg_sum"),
)

# Created lazily and reused, so a client is not rebuilt on every call
_dynamodb_client = None


def _get_dynamodb_client():
    """Return the shared low-level DynamoDB client, creating it on first use."""
    global _dynamodb_client
    if _dynamodb_client is None:
        _dynamodb_client = boto3.client("dynamodb")
    return _dynamodb_client


# First call of day will use PutItem to add the aggregate items
def is_first_call_of_day(dt):
    """Seed the aggregate items for ``dt`` if they are not in the table yet.

    Args:
        dt: Date string used as the suffix of the ``AGG#`` partition key.

    Returns:
        True when the ``CUSTOMERS`` aggregate item was not found (the
        aggregate items were seeded by this call), False otherwise.

    Raises:
        ClientError: If GetItem fails, or PutItem fails for any reason other
            than the item already existing.
    """
    client = _get_dynamodb_client()
    lookup = {
        "TableName": TABLE_NAME,
        "Key": {
            "PK": {"S": "AGG#" + dt},
            "SK": {"S": "CUSTOMERS"},
        },
    }
    response = client.get_item(**lookup)

    if "Item" in response:
        print("SUBSEQUENT call")
        return False

    print("FIRST Aggregation call of the day")
    # add aggregate items
    for sort_key, attribute in AGGREGATE_ITEMS:
        try:
            client.put_item(
                TableName=TABLE_NAME,
                Item={
                    "PK": {"S": "AGG#" + dt},
                    "SK": {"S": sort_key},
                    attribute: {"N": "0"},
                },
                # Never overwrite an existing aggregate: another invocation
                # may have created (and already incremented) it after the
                # read above, and a plain put would reset it to 0.
                ConditionExpression="attribute_not_exists(PK)",
            )
        except ClientError as error:
            if error.response["Error"]["Code"] != "ConditionalCheckFailedException":
                raise
    return True


# create the UpdateItem json
def update_aggregate_stats_in_table(deminesion, dimension_attribute, incr_value, dt):
    """Add ``incr_value`` to one aggregate counter of the day ``dt``.

    Errors are reported with ``print`` and not re-raised (best effort).

    Args:
        deminesion: Sort key of the aggregate item, e.g. ``"CUSTOMERS"``.
            (The misspelled name is kept so existing keyword callers work.)
        dimension_attribute: Name of the numeric attribute to increment.
        incr_value: Amount to add; may be negative.
        dt: Date string used as the suffix of the ``AGG#`` partition key.
    """
    params = {
        "TableName": TABLE_NAME,
        "Key": {
            "PK": {"S": "AGG#" + dt},
            "SK": {"S": deminesion},
        },
        # if_not_exists lets the update succeed even when the counter (or
        # the whole item) is missing, instead of failing validation.
        "UpdateExpression": "SET #agg_attr = if_not_exists(#agg_attr, :zero) + :agg_attr_val",
        "ExpressionAttributeNames": {"#agg_attr": dimension_attribute},
        "ExpressionAttributeValues": {
            ":agg_attr_val": {"N": str(incr_value)},
            ":zero": {"N": "0"},
        },
    }
    # print(f'Update input={params}')

    try:
        _get_dynamodb_client().update_item(**params)
        print("Successfully updated item.")
    except ClientError as error:
        print("Client error " + error.response['Error']['Message'])
    except Exception as error:
        # Generic exceptions carry no ``response`` attribute; use str()
        print("Unknown error while updating item: " + str(error))


# Update table with aggregates
def update_aggregate_stats(new_cust_count, new_acct_count, txn_credits_sum, txn_debit_sum):
    """Apply the deltas of one batch to today's aggregate items.

    Args:
        new_cust_count: Number of new customers in the batch.
        new_acct_count: Number of new accounts in the batch.
        txn_credits_sum: Sum of the positive transaction amounts.
        txn_debit_sum: Sum of the non-positive transaction amounts.
    """
    print(f'Agg {new_cust_count} {new_acct_count} {txn_credits_sum} {txn_debit_sum}')

    dt = date.today().strftime("%Y/%m/%d")

    is_first_call_of_day(dt)

    # Optimize - the four updates could be combined into fewer requests
    deltas = (
        ("CUSTOMERS", "agg_count", new_cust_count),   # customer count
        ("ACCOUNTS", "agg_count", new_acct_count),    # account count
        ("TXN#CREDIT", "agg_sum", txn_credits_sum),   # credit total
        ("TXN#DEBIT", "agg_sum", txn_debit_sum),      # debit total
    )
    for sort_key, attribute, delta in deltas:
        # Skip the write when there is nothing to add
        if delta != 0:
            update_aggregate_stats_in_table(sort_key, attribute, delta, dt)


# Lambda handler
def lambda_handler(event, context):
    """Aggregate the inserted items of a DynamoDB stream batch.

    Counts new customers and accounts and sums credit/debit transaction
    amounts found in ``event['Records']``, then writes the totals to the
    daily aggregate items.

    Args:
        event: Stream event; each record is expected to carry
            ``dynamodb.NewImage`` with ``PK`` and ``SK`` string attributes.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded body.
    """
    new_acct_count = 0
    new_cust_count = 0
    txn_credits_sum = 0
    txn_debit_sum = 0

    for record in event['Records']:
        # Only inserts are aggregated; without this guard a MODIFY would be
        # counted as a new item when no event filter is configured.
        if record.get('eventName', 'INSERT') != 'INSERT':
            print('ignore')
            continue

        # REMOVE records (and some stream view types) carry no NewImage
        new_image = record.get('dynamodb', {}).get('NewImage')
        if not new_image:
            print('ignore')
            continue

        PK = new_image['PK']['S']
        SK = new_image['SK']['S']

        if PK.startswith('TXN#'):
            # New Txn
            # NOTE: int() accepts whole-number amounts only
            txn_amount = int(new_image['txn_amount']['N'])
            if txn_amount > 0:
                txn_credits_sum = txn_credits_sum + txn_amount
            else:
                txn_debit_sum = txn_debit_sum + txn_amount
            # print('new txn')
        elif PK.startswith('CUST#') and SK.startswith('CUST#'):
            # New customer is added
            new_cust_count = new_cust_count + 1
            # print('new cust')
        elif PK.startswith('CUST#') and SK.startswith('ACCT#'):
            # New account
            new_acct_count = new_acct_count + 1
            # print('new acct')
        else:
            print('ignore')

    # update the aggregate data
    update_aggregate_stats(new_cust_count, new_acct_count, txn_credits_sum, txn_debit_sum)

    return {
        'statusCode': 200,
        'body': json.dumps('processed insert!')
    }
{"eventName": ["INSERT"]}
{ "dynamodb": { "NewImage": { "PK": {"S":[{"prefix": "CUST#"}, {"prefix": "TXN#"}]} } } }
python acmebank/aggregate/delete-unit-test-transactions.py acme-bank-v12 acmebank/aggregate/bulk-transactions.json aws
python acmebank/aggregate/unit-test-transactions.py acme-bank-v12 acmebank/aggregate/bulk-transactions.json aws
{"PK": "AGG#<<Date>>"}
aws dynamodb query \ --table-name acme-bank-v12 \ --key-condition-expression 'PK = :agg ' \ --expression-attribute-values '{ ":agg": {"S": "AGG#2023/02/09"} }' \ --projection-expression "SK,agg_count,agg_sum"
aws dynamodb put-item \ --table-name acme-bank-v12 \ --item '{ "PK": {"S": "TXN#533#111"}, "SK": {"S": "ACCT#533"}, "txn_amount": {"N": "-100"} }'
aws lambda delete-function --function-name ddb-acmebank-stream-reader
aws iam detach-role-policy --role-name ddb-stream-lambda-acmebank-role --policy-arn <<<Policy ARN>>>
aws iam delete-role --role-name ddb-stream-lambda-acmebank-role
aws iam delete-policy --policy-arn <<<Policy ARN>>>
aws dynamodb delete-table --table-name acme-bank-v12