Created an ETL Job with AWS Glue sourcing data from an external AWS account
The following is an AWS Glue project I was given: build an ETL job that retrieves data from an external AWS account.
A cross-account role had to be created in the external account to establish the trust permissions that allow AWS Glue to crawl the source S3 bucket holding the exported database files, and that allow EventBridge to trigger AWS Glue whenever an SNS topic alerts that a new database file has arrived in that bucket.
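The actual policies from the engagement are not reproduced in this writeup. The sketch below shows roughly what the external account's side of that trust could look like in CDK Python; the account ID, bucket name, and construct names are placeholders I made up for illustration, not values from the project.
code
# Minimal sketch (assumed names and IDs) of a cross-account role the external account
# could create so the Glue crawler/job in the ETL account can read its exported DB files.
from aws_cdk import core
from aws_cdk import aws_iam as iam

class ExternalAccountAccessStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        etl_account_id = '111111111111'  # Assumed: the account running AWS Glue
        export_bucket_arn = 'arn:aws:s3:::your-external-account-data-bucket'  # Assumed bucket name

        # Role in the external account that trusts the ETL account and the Glue service
        cross_account_role = iam.Role(
            self, 'GlueCrossAccountRole',
            assumed_by=iam.CompositePrincipal(
                iam.AccountPrincipal(etl_account_id),
                iam.ServicePrincipal('glue.amazonaws.com'),
            ),
        )

        # Read-only access to the bucket holding the exported database files
        cross_account_role.add_to_policy(
            iam.PolicyStatement(
                actions=['s3:GetObject', 's3:ListBucket'],
                resources=[export_bucket_arn, f'{export_bucket_arn}/*'],
            )
        )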
The ETL job was also triggered from a Python Lambda script provided by a database engineer, which transforms the schema's data points and loads them into a target S3 bucket for later review by other teams for analytics. An AWS Glue Data Catalog database was also created as a data repository during the process.
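The engineer's Lambda script is not included here; a minimal sketch of what such a trigger handler could look like, using boto3's start_job_run and an assumed GLUE_JOB_NAME environment variable, follows.
code
# Sketch of a Lambda handler that starts the Glue ETL job when invoked by EventBridge
# or an SNS notification. GLUE_JOB_NAME is an assumed environment variable, not a value
# taken from the actual project.
import os
import boto3

glue = boto3.client('glue')

def handler(event, context):
    # Start a run of the ETL job and return the run ID for traceability
    response = glue.start_job_run(JobName=os.environ['GLUE_JOB_NAME'])
    return {'JobRunId': response['JobRunId']}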
CDK Python IaC for AWS Glue, EventBridge with Lambda Rule, S3 Cross-Account Access
code
#################### AWS Glue: ETL from External AWS Account Source S3 Bucket to Destination S3 Bucket ####################
# Ensure that the IAM roles and trust relationships between accounts are configured so the Glue crawler and job in
# your AWS account can access resources in the external AWS account.
# Replace placeholders like 'your-database-name', 'your-glue-role-arn', 'your-etl-job-name',
# 's3://your-script-bucket/your-etl-script.py', and 's3://your-temp-bucket/glue-temp/' with your actual resource
# names and ARNs. After this CDK stack is deployed, it automates extracting the external AWS account's database
# exports and running them through the AWS Glue ETL job.

from aws_cdk import core
from aws_cdk import aws_glue as glue
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_sns as sns
from aws_cdk import aws_sns_subscriptions as subscriptions
from aws_cdk import aws_iam


class GlueETLPipelineStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Constants
        source_bucket_name = 'your-source-bucket-name'
        target_bucket_name = 'your-target-bucket-name'
        external_account_id = 'your-external-account-id'
        external_account_s3_bucket = 's3://your-external-account-data-bucket/'  # External bucket holding the DB exports

        # S3 bucket for your source data
        source_bucket = s3.Bucket(
            self, 'SourceBucket',
            bucket_name=source_bucket_name,
            removal_policy=core.RemovalPolicy.RETAIN  # Use DESTROY for demos; set the retention policy as needed
        )

        # S3 bucket for your transformed data (target)
        target_bucket = s3.Bucket(
            self, 'TargetBucket',
            bucket_name=target_bucket_name,
            removal_policy=core.RemovalPolicy.RETAIN  # Use DESTROY for demos; set the retention policy as needed
        )

        # AWS Glue crawler over the source bucket and the external AWS account's data location.
        # Replace 's3://your-external-account-data-bucket/' with the actual S3 path where the DB data is stored.
        crawler = glue.CfnCrawler(
            self, 'GlueCrawler',
            database_name='your-database-name',
            role='your-glue-role-arn',  # Replace with your Glue role ARN
            targets=glue.CfnCrawler.TargetsProperty(
                s3_targets=[
                    glue.CfnCrawler.S3TargetProperty(path=f's3://{source_bucket.bucket_name}/'),  # Source bucket holding DB data
                    glue.CfnCrawler.S3TargetProperty(path=external_account_s3_bucket),            # External account's bucket holding DB data
                ]
            ),
        )

        # AWS Glue Data Catalog database.
        # Specify the external AWS account's ID as the catalog_id; replace 'your-external-account-id' accordingly.
        database = glue.CfnDatabase(
            self, 'GlueDatabase',
            catalog_id=external_account_id,
            database_input=glue.CfnDatabase.DatabaseInputProperty(name='your-database-name'),
        )

        # AWS Glue ETL job built from the ETL script file
        etl_job = glue.CfnJob(
            self, 'GlueETLJob',
            name='your-etl-job-name',
            role='your-glue-role-arn',  # Replace with your Glue role ARN
            command=glue.CfnJob.JobCommandProperty(
                name='glueetl',
                python_version='3',
                script_location='s3://your-script-bucket/your-etl-script.py',  # S3 location of your ETL script
            ),
            default_arguments={
                '--job-language': 'python',
                '--job-bookmark-option': 'job-bookmark-enable',
                '--TempDir': 's3://your-temp-bucket/glue-temp/',
                '--job_log_group_name': '/aws/glue/jobs',  # Replace with your CloudWatch Logs group
            },
            glue_version='2.0',
            worker_type='G.1X',    # Glue 2.0 uses worker type + number of workers instead of max capacity
            number_of_workers=2,
            max_retries=0,
            timeout=2880,
        )

        # SNS topic the external account publishes to when a new DB file lands in its bucket
        sns_topic = sns.Topic(
            self, 'ExternalSNSTopic',
            display_name='External SNSTopic',
            topic_name='external-sns-topic-name'
        )

        # Allow the external AWS account to publish to the SNS topic
        sns_topic.add_to_resource_policy(
            aws_iam.PolicyStatement(
                actions=['SNS:Publish'],
                effect=aws_iam.Effect.ALLOW,
                principals=[aws_iam.ArnPrincipal(f'arn:aws:iam::{external_account_id}:root')],
                resources=[sns_topic.topic_arn],
            )
        )

        # Lambda function (the database engineer's trigger script) that starts the Glue ETL job
        trigger_lambda = lambda_.Function(
            self, 'GlueTriggerLambda',
            runtime=lambda_.Runtime.PYTHON_3_8,
            handler='trigger_glue_job.handler',
            code=lambda_.Code.from_asset('lambda'),  # Directory containing the trigger script
            environment={'GLUE_JOB_NAME': 'your-etl-job-name'},
        )
        trigger_lambda.add_to_role_policy(
            aws_iam.PolicyStatement(
                actions=['glue:StartJobRun'],
                resources=['*'],
            )
        )

        # EventBridge rule: start the ETL job (via the trigger Lambda) when new DB data files
        # are added to the source S3 bucket
        s3_rule = events.Rule(
            self, 'S3ObjectCreatedRule',
            event_pattern=events.EventPattern(
                source=['aws.s3'],
                detail_type=['AWS API Call via CloudTrail'],
                detail={
                    'eventSource': ['s3.amazonaws.com'],
                    'eventName': ['PutObject', 'CopyObject'],  # Trigger on new object creation or copying
                    'requestParameters': {
                        'bucketName': [source_bucket.bucket_name],
                    },
                },
            ),
        )
        s3_rule.add_target(targets.LambdaFunction(trigger_lambda))

        # Also start the ETL job when the SNS topic alerts that a new DB file has been received.
        # (SNS messages are delivered to the Lambda through a topic subscription rather than an EventBridge rule.)
        sns_topic.add_subscription(subscriptions.LambdaSubscription(trigger_lambda))


app = core.App()
GlueETLPipelineStack(app, 'GlueETLPipelineStack')
app.synth()
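A note on the object-created rule: S3 object-level API calls such as PutObject only reach EventBridge if CloudTrail data events are enabled for the source bucket (or if the bucket's EventBridge notifications are turned on, which uses a different event pattern). With the placeholders filled in, the stack is synthesized and deployed with the standard cdk synth and cdk deploy workflow, and the SNS-to-Lambda subscription covers the alert path from the external account.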
ETL Job Python Script
code
#!/usr/bin/env python3
### ETL script for AWS Glue: loads the source database .csv export received from the external AWS account
### and writes the transformed data to the source S3 bucket.
### Create the AWS Glue ETL job using this script in the AWS Glue console, specify the source S3 bucket as the
### output location, then execute the job to run the ETL process.

import sys

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

### Replace 'your-database-name' and 'your-table-name' with the actual database and table names in your Glue Data Catalog.
### Replace 's3://your-external-account-bucket/path/to/exported/file.csv' with the actual S3 path to the .csv export
### file received from the external AWS account.
### Customize the transformation steps as needed to prepare the data for loading into your source S3 bucket;
### any PySpark/DynamicFrame operations can be applied.
### Constants are kept at the top of the script so they are easy to manage and customize. Simply replace the
### placeholder values with your actual database name, table name, S3 bucket paths, and any other configuration.

# Constants
database_name = 'your-database-name'
table_name = 'your-table-name'  # Catalog table created by the crawler over the exported DB data
external_account_s3_bucket = 's3://your-external-account-bucket/path/to/exported/file.csv'  # For reference; the read below goes through the Data Catalog
source_bucket_name = 's3://your-source-bucket/'  # Source S3 bucket where transformed data will be stored

# Get the GlueContext and SparkContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)

# Initialize the Glue job
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the .csv export (crawled from the external AWS account's S3 bucket) into a DynamicFrame
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name,
    transformation_ctx='read_csv_from_source'
)

### Perform transformations as needed. For example, you can rename columns, filter data,
### or apply any other transformations here
dynamic_frame = dynamic_frame.rename_field('old_column_name', 'new_column_name')

# Write the transformed data to the source S3 bucket
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type='s3',
    connection_options={'path': source_bucket_name},
    format='csv',
    format_options={'writeHeader': True},
    transformation_ctx='write_to_s3'
)

# Commit the job
job.commit()

### This script assumes that the necessary GlueContext, SparkContext, and Job objects are initialized as part of the
### Glue ETL job environment. Additionally, you may need to adjust security configurations or IAM roles to ensure
### that the Glue job has permission to read from the external S3 bucket and write to the source S3 bucket.
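One design note: instead of hard-coding the constants, the script could read them from Glue job arguments so the CDK stack (or an individual job run) supplies them. A small sketch follows; the parameter names SOURCE_DATABASE, SOURCE_TABLE, and TARGET_PATH are assumptions for illustration.
code
# Sketch: resolving database, table, and output path from job arguments rather than
# hard-coded constants. The argument names are assumed and must match the keys passed
# via default_arguments (e.g. '--SOURCE_DATABASE') or supplied when starting the run.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'SOURCE_DATABASE', 'SOURCE_TABLE', 'TARGET_PATH']
)

database_name = args['SOURCE_DATABASE']
table_name = args['SOURCE_TABLE']
source_bucket_name = args['TARGET_PATH']
These keys would then be passed as '--SOURCE_DATABASE', '--SOURCE_TABLE', and '--TARGET_PATH' in the job's default_arguments in the CDK stack, or as run-time arguments to start_job_run.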
Ralph Quick, Cloud Engineer
Ralph Quick is a professional Cloud Engineer specializing in the management, maintenance, and deployment of web service applications and infrastructure for operations. His experience ensures services run efficiently and securely, meeting the needs of your organization or clients.