import sys
import boto3
from datetime import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit, current_timestamp
# Get job parameters
args = getResolvedOptions(sys.argv, [
"JOB_NAME", "driver", "url", "user", "password",
"bucket_name", "staging_table", "merge_query"
])
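# Required parameters are supplied as Glue job arguments, e.g. --bucket_name my-bucket
# --staging_table staging_orders (the example values here are illustrative)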
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Parameters from job configuration
driver = args["driver"]
url = args["url"]
user = args["user"]
password = args["password"]
bucket_name = args["bucket_name"]
staging_table = args["staging_table"]
merge_query = args["merge_query"]
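# Illustrative shape of merge_query (the real SQL depends on your target schema); the
# ${batch_id} placeholder, if used, is substituted per file in Step 6 below:
#   INSERT INTO target_table (id, name, email, updated_at)
#   SELECT id, name, email, updated_at FROM staging_table
#   WHERE batch_id = '${batch_id}' AND operation IN ('I', 'U')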
# Optional parameters: getResolvedOptions only returns the keys it is asked for,
# so resolve these separately and only when they are supplied as job arguments
prefix = getResolvedOptions(sys.argv, ["prefix"])["prefix"] if "--prefix" in sys.argv else ""      # S3 folder prefix
suffix = getResolvedOptions(sys.argv, ["suffix"])["suffix"] if "--suffix" in sys.argv else ".csv"  # File extension filter
# Step 1: List all matching files in the S3 bucket (paginate past the 1,000-key page limit)
s3 = boto3.client('s3')
pages = s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix)
all_files = [obj['Key'] for page in pages for obj in page.get('Contents', []) if obj['Key'].endswith(suffix)]
# Step 2: Read already processed files from Glue catalog
try:
processed_df = glueContext.create_dynamic_frame.from_catalog(
database="processed_files_db",
table_name="processed_files_table",
transformation_ctx="load_processed"
).toDF()
processed_files = [row['file_path'] for row in processed_df.collect()]
except Exception as e:
print(f"No processed files found or error reading catalog: {e}")
processed_files = []
# Step 3: Determine files to process
to_process = list(set(all_files) - set(processed_files))
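# Note: set difference does not preserve order; if CDC files must be applied in
# arrival order, consider sorting here (for example, to_process.sort()).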
print(f"Found {len(to_process)} files to process")
# Step 4: Process each file
for file_path in to_process:
try:
print(f"Processing file: {file_path}")
s3_path = f"s3://{bucket_name}/{file_path}"
# Read CSV file from S3
df = spark.read \
.option("header", "false") \
.option("inferSchema", "true") \
.csv(s3_path)
# Define column names based on your DMS output format
# Adjust these column names according to your actual DMS configuration
column_names = [
"operation", # I, U, D for Insert, Update, Delete
"id",
"name",
"email",
"updated_at",
# Add more columns as needed
]
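        # Illustrative raw row from a DMS CDC CSV (operation flag first, no header row):
        #   U,101,Jane Doe,jane@example.com,2024-01-15 10:32:05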
df = df.toDF(*column_names)
# Add metadata columns
df = df.withColumn("batch_id", lit(file_path)) \
.withColumn("load_timestamp", current_timestamp())
        # Step 5: Write to the Firebolt staging table (tune batchsize for your data volume)
        df.write \
            .format("jdbc") \
            .option("url", url) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .option("dbtable", staging_table) \
            .option("batchsize", 10000) \
            .mode("append") \
            .save()
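        # Assumes the staging table already exists in Firebolt with columns matching the
        # DataFrame above (including batch_id and load_timestamp); append mode only adds rows.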
        # Step 6: Execute the merge query via JDBC to move this batch from staging into the target
        print(f"Executing merge query for batch: {file_path}")
        jvm = sc._jvm
        jvm.java.lang.Class.forName(driver)
        conn = jvm.java.sql.DriverManager.getConnection(url, user, password)
        try:
            stmt = conn.createStatement()
            # Substitute the batch identifier if the merge query uses a ${batch_id} placeholder
            batch_merge_query = merge_query.replace("${batch_id}", file_path)
            stmt.execute(batch_merge_query)
            stmt.close()
        finally:
            # Always release the JDBC connection, even if the merge fails
            conn.close()
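        # The JDBC driver class is expected to come from a JAR attached to the job
        # (for example via the --extra-jars job parameter or the "Dependent JARs path" setting).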
# Step 7: Record processed file
schema = StructType([
StructField("file_path", StringType(), True),
StructField("processed_at", TimestampType(), True)
])
now = datetime.utcnow()
row = [(file_path, now)]
processed_file_df = spark.createDataFrame(row, schema=schema)
processed_file_dyf = DynamicFrame.fromDF(processed_file_df, glueContext, "new_files")
# Write to processed files table
glueContext.write_dynamic_frame.from_options(
frame=processed_file_dyf,
connection_type="s3",
connection_options={"path": "s3://glue-table-catalog-bucket/processed_files_table/"},
format="parquet"
)
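        # The Parquet output path is assumed to match the S3 location of
        # processed_files_db.processed_files_table read in Step 2; alternatively,
        # glueContext.write_dynamic_frame.from_catalog could write to that table directly.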
print(f"Successfully processed file: {file_path}")
except Exception as e:
print(f"Error processing file {file_path}: {str(e)}")
# Optionally, continue with next file or fail the job
continue
job.commit()