CSV data to DynamoDb (Verification)

A CSV file is uploaded to an S3 bucket and processed through SQS and Lambda; the data is written to DynamoDB, and an email reports which users are new and which already exist in DynamoDB.

→ S3 Bucket - The CSV file is uploaded here.

→ DynamoDB - All the data ends up in our DynamoDB table.

→ SQS - The flow passes through SQS, which is connected to the S3 bucket.

→ Lambda - Holds the processing logic and is triggered by SQS.

→ SES - The report email is sent through SES.

→ SSM - Systems Manager is used to store the parameters.

Steps:-

  1. Create S3 bucket

  2. Create Lambda

  3. Create SQS

  4. Create SES

  5. Create SSM

  6. Create DynamoDB

Walkthrough:-

  1. Create S3 Bucket

    After creating the bucket, create an event notification -

    Properties → Create event notification → Destination → choose your SQS queue (read the SQS step first) → without giving the access policy on SQS you will get an error. A scripted version of this step is sketched below.
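
    The same configuration can also be set from code. A minimal boto3 sketch, assuming hypothetical bucket and queue names; note that put_bucket_notification_configuration replaces the bucket's whole notification configuration:

     import boto3

     s3 = boto3.client('s3')

     # Hypothetical names - replace with your bucket and queue ARN
     s3.put_bucket_notification_configuration(
         Bucket='my-csv-bucket',
         NotificationConfiguration={
             'QueueConfigurations': [{
                 'QueueArn': 'arn:aws:sqs:us-east-1:123456789012:csv-upload-queue',
                 'Events': ['s3:ObjectCreated:*'],
                 # Only fire for .csv uploads
                 'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.csv'}]}},
             }],
         },
     )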

  2. Create Lambda

    Write the logic and grant the Lambda role all the permissions it needs (S3, SQS, DynamoDB, SES, and SSM):

     import boto3
     import csv
     import os
     import json
     import urllib.parse
    
     # Initialize AWS clients
     s3 = boto3.client('s3')
     dynamodb = boto3.resource('dynamodb')
     ses = boto3.client('ses')
     ssm = boto3.client('ssm')
     table_name = 'EmailsTable'  # Replace with your DynamoDB table name
     table = dynamodb.Table(table_name)
    
     def fetch_existing_email_ids():
         existing_emails = set()
         scan_kwargs = {}
         done = False
         start_key = None
    
         while not done:
             if start_key:
                 scan_kwargs['ExclusiveStartKey'] = start_key
             response = table.scan(**scan_kwargs)
             for item in response.get('Items', []):
                 existing_emails.add(item['emailId'])
             start_key = response.get('LastEvaluatedKey', None)
             done = start_key is None
    
         return existing_emails
    
     def fetch_email_ids_from_ssm():
         source_email_param = 'sourceMail'  # Replace with your SSM parameter name for source email
         recipient_email_param = 'recipientEmail'  # Replace with your SSM parameter name for recipient email
    
         source_email = ssm.get_parameter(Name=source_email_param)['Parameter']['Value']
         recipient_email = ssm.get_parameter(Name=recipient_email_param)['Parameter']['Value']
    
         return source_email, recipient_email
    
     def send_email(subject, body, source_email, recipient_email):
         try:
             response = ses.send_email(
                 Source=source_email,
                 Destination={
                     'ToAddresses': [
                         recipient_email,
                     ],
                 },
                 Message={
                     'Subject': {
                         'Data': subject,
                     },
                     'Body': {
                         'Text': {
                             'Data': body,
                         },
                     },
                 }
             )
             print(f"Email sent! Message ID: {response['MessageId']}")
         except Exception as e:
             print(f"Error sending email: {e}")
    
     def lambda_handler(event, context):
         try:
             # Fetch email IDs from SSM Parameter Store
             source_email, recipient_email = fetch_email_ids_from_ssm()
             print(f"Source Email: {source_email}, Recipient Email: {recipient_email}")
    
             # Check if the event is from SQS
             if 'Records' in event and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:sqs':
                 # Extract the S3 event from the SQS message body
                 sqs_body = json.loads(event['Records'][0]['body'])
                 event = sqs_body  # Replace the event with the actual S3 event
    
             # Fetch existing email IDs from DynamoDB
             existing_email_ids = fetch_existing_email_ids()
             print(f"Existing email IDs in DynamoDB: {existing_email_ids}")
    
             # Track totals across all records for the final response
             total_new = 0
             total_duplicates = 0

             # Iterate through each record in the event
             for record in event['Records']:
                 # Get the bucket name and key for the uploaded file
                 s3_bucket = record['s3']['bucket']['name']
                 # S3 event keys are URL-encoded (e.g. spaces arrive as '+')
                 s3_key = urllib.parse.unquote_plus(record['s3']['object']['key'])
                 print(f"Processing file from bucket: {s3_bucket}, key: {s3_key}")
    
                 # Download the file from S3 to /tmp directory
                 local_file_path = os.path.join('/tmp', os.path.basename(s3_key))
                 s3.download_file(s3_bucket, s3_key, local_file_path)
                 print(f"Downloaded file to: {local_file_path}")
    
                 # Read the CSV file and collect email IDs
                 new_email_ids = []
                 duplicate_email_ids = []
                 with open(local_file_path, mode='r') as file:
                     csv_reader = csv.DictReader(file)
                     for row in csv_reader:
                         email = row['email']
                         if email in existing_email_ids:
                             duplicate_email_ids.append(email)
                         else:
                             new_email_ids.append(email)
                             existing_email_ids.add(email)  # Add to existing to prevent duplicates in the same batch
                 print(f"New email IDs: {new_email_ids}")
                 print(f"Duplicate email IDs: {duplicate_email_ids}")
    
                 # Store new email IDs in DynamoDB
                 with table.batch_writer() as batch:
                     for email_id in new_email_ids:
                         print(f"Writing new email ID to DynamoDB: {email_id}")
                         batch.put_item(Item={'emailId': email_id})
                 print("Finished writing new email IDs to DynamoDB")
    
                 # Prepare email content
                 new_emails_str = "\n".join(new_email_ids)
                 duplicate_emails_str = "\n".join(duplicate_email_ids)
                 email_body = f"New Email IDs:\n{new_emails_str}\n\nDuplicate Email IDs:\n{duplicate_emails_str}"
    
                 # Send email with new and duplicate email IDs
                 send_email(subject="CSV Processing Report", body=email_body, source_email=source_email, recipient_email=recipient_email)
                 print("Email sent with new and duplicate email IDs")
    
             return {
                 'statusCode': 200,
                 'body': f"Processed {total_new + total_duplicates} email IDs. {total_new} new email IDs stored in DynamoDB, {total_duplicates} duplicate email IDs."
             }
    
         except Exception as e:
             print(f"Error processing event: {e}")
             raise e
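
    The queue also has to be connected to the function as a trigger (in the console: Lambda → Add trigger → SQS). A boto3 sketch of that wiring, with hypothetical queue ARN and function name; the Lambda execution role also needs sqs:ReceiveMessage, sqs:DeleteMessage and sqs:GetQueueAttributes on the queue:

     import boto3

     lambda_client = boto3.client('lambda')

     # Hypothetical names - replace with your queue ARN and function name
     lambda_client.create_event_source_mapping(
         EventSourceArn='arn:aws:sqs:us-east-1:123456789012:csv-upload-queue',
         FunctionName='csv-to-dynamodb',
         BatchSize=1,  # one message per invocation keeps the handler simple
     )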
    
  3. Create SQS

    Write an access policy for the S3 bucket (very important: without this policy you cannot attach the S3 bucket, i.e. the event notification cannot be created). A sketch of the policy follows below.
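
    A sketch of that policy set through boto3, scoped so that only your bucket may send messages; queue name, ARNs and account ID are placeholders:

     import boto3
     import json

     sqs = boto3.client('sqs')
     queue_url = sqs.get_queue_url(QueueName='csv-upload-queue')['QueueUrl']

     policy = {
         "Version": "2012-10-17",
         "Statement": [{
             "Effect": "Allow",
             "Principal": {"Service": "s3.amazonaws.com"},
             "Action": "sqs:SendMessage",
             "Resource": "arn:aws:sqs:us-east-1:123456789012:csv-upload-queue",
             # Restrict the sender to events coming from your bucket
             "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::my-csv-bucket"}},
         }],
     }

     sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={'Policy': json.dumps(policy)})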

  4. Create SES

    Verify the email addresses in your account (sender and receiver); while the account is in the SES sandbox, both must be verified before mail can be delivered.
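
    Verification can also be started from code; AWS then emails a confirmation link to each address. A sketch with placeholder addresses:

     import boto3

     ses = boto3.client('ses')

     # Each address receives a verification link that must be clicked
     for address in ['sender@example.com', 'receiver@example.com']:
         ses.verify_email_identity(EmailAddress=address)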

  5. Create DynamoDB

    Create the table and give it a partition key; the same key name is used in the Lambda logic. See the sketch below.
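
    A sketch of the table the handler above expects: name EmailsTable with string partition key emailId (matching batch.put_item(Item={'emailId': ...})); on-demand billing is an assumption:

     import boto3

     dynamodb = boto3.client('dynamodb')

     dynamodb.create_table(
         TableName='EmailsTable',
         AttributeDefinitions=[{'AttributeName': 'emailId', 'AttributeType': 'S'}],
         KeySchema=[{'AttributeName': 'emailId', 'KeyType': 'HASH'}],  # partition key
         BillingMode='PAY_PER_REQUEST',  # no capacity planning needed for a demo
     )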

  6. Create SSM

    We don't want the email addresses hard-coded, so we store them in SSM Parameter Store and fetch them in our code; this also keeps them more private. A sketch follows below.
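
    A sketch that creates the two parameters fetch_email_ids_from_ssm reads; the values are placeholders. For real secrecy, Type='SecureString' plus WithDecryption=True in get_parameter would be the better fit:

     import boto3

     ssm = boto3.client('ssm')

     # Parameter names must match the ones read in the Lambda code
     ssm.put_parameter(Name='sourceMail', Value='sender@example.com', Type='String', Overwrite=True)
     ssm.put_parameter(Name='recipientEmail', Value='receiver@example.com', Type='String', Overwrite=True)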

Conclusion → Upload your CSV file and the data is stored directly in our DynamoDB table, and we get a mail telling us which addresses already exist in the database and which are new.
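
To try the whole flow end to end, a quick test sketch (bucket name is a placeholder): write a two-row CSV with the email header the handler expects, upload it, then check the table and your inbox:

     import boto3

     # The handler reads row['email'], so the header must be 'email'
     with open('users.csv', 'w') as f:
         f.write('email\nalice@example.com\nbob@example.com\n')

     # Uploading triggers S3 -> SQS -> Lambda -> DynamoDB + SES report
     boto3.client('s3').upload_file('users.csv', 'my-csv-bucket', 'users.csv')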