A CSV file is uploaded to an S3 bucket and processed through SQS and Lambda; the data is written to DynamoDB, and an email reports which users are new and which already exist in DynamoDB.
→ S3 Bucket - The CSV file is uploaded here.
→ DynamoDB - All processed data ends up in our DynamoDB table.
→ SQS - The S3 event notifications flow through SQS; the queue is connected to the S3 bucket.
→ Lambda - Holds the processing logic; it is triggered by messages from the SQS queue.
→ SES - Sends the report email.
→ SSM - Systems Manager Parameter Store holds the email address parameters.
Steps:-
Create S3 bucket
Create Lambda
Create SQS
Create SES
Create SSM
Create DynamoDB
Create S3 Bucket
After creating the bucket, create an event notification -
Properties → Create event notification → Destination → choose your SQS queue (read the SQS step first) → without an access policy on the SQS queue you will get an error.
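If you prefer to wire this up from code instead of the console, a minimal boto3 sketch might look like the following (the bucket name, queue ARN, and .csv suffix filter are assumptions - use your own values). It only succeeds once the SQS access policy from the SQS step is in place.

import boto3

s3 = boto3.client('s3')

# Point S3 object-created events at the SQS queue.
# Bucket name and queue ARN below are placeholders.
s3.put_bucket_notification_configuration(
    Bucket='my-csv-upload-bucket',
    NotificationConfiguration={
        'QueueConfigurations': [
            {
                'QueueArn': 'arn:aws:sqs:us-east-1:123456789012:csv-upload-queue',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [
                            {'Name': 'suffix', 'Value': '.csv'}  # only react to CSV uploads
                        ]
                    }
                }
            }
        ]
    }
)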
Create Lambda
Write the processing logic and give the Lambda execution role the permissions it needs (read the S3 object, consume from SQS, read/write DynamoDB, send email with SES, and read the SSM parameters).
import boto3
import csv
import os
import json

# Initialize AWS clients
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
ses = boto3.client('ses')
ssm = boto3.client('ssm')

table_name = 'EmailsTable'  # Replace with your DynamoDB table name
table = dynamodb.Table(table_name)


def fetch_existing_email_ids():
    # Scan the whole table, following pagination, and collect every stored emailId
    existing_emails = set()
    scan_kwargs = {}
    done = False
    start_key = None
    while not done:
        if start_key:
            scan_kwargs['ExclusiveStartKey'] = start_key
        response = table.scan(**scan_kwargs)
        for item in response.get('Items', []):
            existing_emails.add(item['emailId'])
        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None
    return existing_emails


def fetch_email_ids_from_ssm():
    source_email_param = 'sourceMail'  # Replace with your SSM parameter name for the source email
    recipient_email_param = 'recipientEmail'  # Replace with your SSM parameter name for the recipient email
    source_email = ssm.get_parameter(Name=source_email_param)['Parameter']['Value']
    recipient_email = ssm.get_parameter(Name=recipient_email_param)['Parameter']['Value']
    return source_email, recipient_email


def send_email(subject, body, source_email, recipient_email):
    try:
        response = ses.send_email(
            Source=source_email,
            Destination={
                'ToAddresses': [
                    recipient_email,
                ],
            },
            Message={
                'Subject': {
                    'Data': subject,
                },
                'Body': {
                    'Text': {
                        'Data': body,
                    },
                },
            }
        )
        print(f"Email sent! Message ID: {response['MessageId']}")
    except Exception as e:
        print(f"Error sending email: {e}")


def lambda_handler(event, context):
    try:
        # Fetch email IDs from SSM Parameter Store
        source_email, recipient_email = fetch_email_ids_from_ssm()
        print(f"Source Email: {source_email}, Recipient Email: {recipient_email}")

        # Check if the event is from SQS
        if 'Records' in event and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:sqs':
            # Extract the S3 event from the SQS message body
            sqs_body = json.loads(event['Records'][0]['body'])
            event = sqs_body  # Replace the event with the actual S3 event

        # Fetch existing email IDs from DynamoDB
        existing_email_ids = fetch_existing_email_ids()
        print(f"Existing email IDs in DynamoDB: {existing_email_ids}")

        # Iterate through each record in the event
        for record in event['Records']:
            # Get the bucket name and key for the uploaded file
            s3_bucket = record['s3']['bucket']['name']
            s3_key = record['s3']['object']['key']
            print(f"Processing file from bucket: {s3_bucket}, key: {s3_key}")

            # Download the file from S3 to the /tmp directory
            local_file_path = os.path.join('/tmp', os.path.basename(s3_key))
            s3.download_file(s3_bucket, s3_key, local_file_path)
            print(f"Downloaded file to: {local_file_path}")

            # Read the CSV file and collect email IDs
            new_email_ids = []
            duplicate_email_ids = []
            with open(local_file_path, mode='r') as file:
                csv_reader = csv.DictReader(file)
                for row in csv_reader:
                    email = row['email']
                    if email in existing_email_ids:
                        duplicate_email_ids.append(email)
                    else:
                        new_email_ids.append(email)
                        existing_email_ids.add(email)  # Add to existing to prevent duplicates in the same batch

            print(f"New email IDs: {new_email_ids}")
            print(f"Duplicate email IDs: {duplicate_email_ids}")

            # Store new email IDs in DynamoDB
            with table.batch_writer() as batch:
                for email_id in new_email_ids:
                    print(f"Writing new email ID to DynamoDB: {email_id}")
                    batch.put_item(Item={'emailId': email_id})
            print("Finished writing new email IDs to DynamoDB")

            # Prepare email content
            new_emails_str = "\n".join(new_email_ids)
            duplicate_emails_str = "\n".join(duplicate_email_ids)
            email_body = f"New Email IDs:\n{new_emails_str}\n\nDuplicate Email IDs:\n{duplicate_emails_str}"

            # Send email with new and duplicate email IDs
            send_email(subject="CSV Processing Report", body=email_body, source_email=source_email, recipient_email=recipient_email)
            print("Email sent with new and duplicate email IDs")

        return {
            'statusCode': 200,
            'body': f"Processed {len(new_email_ids) + len(duplicate_email_ids)} email IDs. {len(new_email_ids)} new email IDs stored in DynamoDB, {len(duplicate_email_ids)} duplicate email IDs."
        }
    except Exception as e:
        print(f"Error processing event: {e}")
        raise e
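After deploying the function, the SQS queue still has to be attached as the Lambda trigger. You can do this from the Lambda console (Add trigger → SQS), or with a small boto3 sketch like the one below (the function name and queue ARN are placeholders). Since the handler above only parses the first SQS record of a batch, a batch size of 1 keeps the behaviour predictable.

import boto3

lambda_client = boto3.client('lambda')

# Attach the SQS queue as an event source so the function runs on each message.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:123456789012:csv-upload-queue',  # placeholder ARN
    FunctionName='csv-processor',  # placeholder function name
    BatchSize=1,  # one SQS message (one S3 event) per invocation
    Enabled=True
)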
Create SQS
Write an access policy that allows the S3 bucket to send messages to the queue (very important - without it you cannot attach the S3 bucket, i.e. the event notification cannot be created).
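A minimal sketch of attaching such a policy with boto3 (the queue URL/ARN, bucket name, and account ID are placeholders):

import boto3
import json

sqs = boto3.client('sqs')

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/csv-upload-queue'  # placeholder

# Allow the S3 service to send event-notification messages to this queue,
# but only from our bucket and our account.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": "arn:aws:sqs:us-east-1:123456789012:csv-upload-queue",
            "Condition": {
                "ArnLike": {"aws:SourceArn": "arn:aws:s3:::my-csv-upload-bucket"},
                "StringEquals": {"aws:SourceAccount": "123456789012"}
            }
        }
    ]
}

sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={'Policy': json.dumps(policy)})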
Create SES
Verify both the sender and receiver email addresses in your account - in the SES sandbox both identities must be verified before email can be sent.
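Verification can also be started from code; a small sketch with placeholder addresses (SES emails each address a confirmation link that must be clicked):

import boto3

ses = boto3.client('ses')

# Both addresses are placeholders. In the SES sandbox both the sender and
# the recipient must be verified before send_email will work.
for address in ['sender@example.com', 'recipient@example.com']:
    ses.verify_email_identity(EmailAddress=address)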
Create DynamoDB
Create the table and give it a partition key - this key is what the Lambda logic uses (the code above expects emailId).
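A minimal sketch of creating that table with boto3 (the table name EmailsTable and partition key emailId match the Lambda code above; on-demand billing is an assumption):

import boto3

dynamodb = boto3.client('dynamodb')

# Partition key 'emailId' matches the attribute the Lambda code scans and writes.
dynamodb.create_table(
    TableName='EmailsTable',
    AttributeDefinitions=[{'AttributeName': 'emailId', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'emailId', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'  # assumed; provisioned capacity also works
)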
Create SSM Parameter Store parameters
We don't want the email addresses hard-coded, so we store them in SSM Parameter Store and fetch them in the code; it keeps them out of the source and is better for privacy.
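A sketch of storing the two parameters the Lambda reads (the names sourceMail and recipientEmail match fetch_email_ids_from_ssm() in the code above; the addresses are placeholders):

import boto3

ssm = boto3.client('ssm')

# Parameter names must match what the Lambda looks up.
ssm.put_parameter(Name='sourceMail', Value='sender@example.com', Type='String', Overwrite=True)
ssm.put_parameter(Name='recipientEmail', Value='recipient@example.com', Type='String', Overwrite=True)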
Conclusion → Upload your CSV file and the data is stored directly in our DynamoDB table, and we get a mail reporting which email IDs already exist in the database and which ones are new.