Building an End-to-End Data Pipeline on Azure Using ADF, Databricks, Synapse, and Power BI
In this blog, we’ll walk through creating a seamless, end-to-end data pipeline using Azure Data Factory (ADF), Azure Databricks, Azure Synapse Analytics, and Power BI. By integrating these tools, we can automate data ingestion, transformation, storage, and visualization.
S2 Data Systems crafts global Data, Cloud, and AI solutions, providing insights, business growth, architectural assessment, strategy development, and secure, scalable systems for diverse industries. Dedicated to innovation, we tailor dynamic platforms that overcome challenges and deliver cutting-edge solutions for an enhanced digital corporate experience.
This blog includes step-by-step instructions with images to help you follow along.
Step 1: Data Ingestion from PostgreSQL to Azure Blob Storage
1. Creating the Azure Data Factory Pipeline:
- Start by setting up a pipeline in Azure Data Factory (ADF).
- Lookup Activity: This activity fetches the table names from the PostgreSQL database. You can preview the schema and table names during this process.
SELECT table_schema AS SchemaName, table_name AS TableName FROM information_schema.tables WHERE table_schema = 'emp_database'
- For Each Loop: Connect the Lookup Activity to a For Each loop to iterate over the table names fetched.
@activity('Lookup1').output.value
- Copy Data Activity: Inside the For Each loop, use the Copy Data activity to move data from PostgreSQL to Azure Blob Storage (Bronze container).
@{concat('Select * from ', item().schemaname, '.', item().tablename)}
- Folder Structure in the Bronze Container: Store the data so that each schema has its own folder and each table has its own subfolder inside it, with each table’s data saved in `.parquet` format (see the sink path sketch below).
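To produce that folder layout, the sink dataset’s folder path and file name can be parameterized with the loop item. One possible pair of expressions (an assumption for illustration, not taken from the original pipeline definition) is:
@{concat(item().schemaname, '/', item().tablename)}
@{concat(item().tablename, '.parquet')}
The first expression builds the folder path relative to the Bronze container and the second names the output file after the table, which matches the paths the Databricks notebooks read later.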
Step 2: Data Transformation in Databricks (Bronze to Silver)
1. Setting Up Azure Databricks:
- Create a new notebook in Azure Databricks. This notebook will perform the first set of transformations.
- Transformation Logic:
- Read the data from the Bronze container.
- Convert timestamp columns such as ‘created_at’ and ‘updated_at’ (any column whose name contains ‘_at’) from timestamp to date format.
- Store the transformed data in Delta format in the Silver container.
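The notebook paths below assume the Bronze, Silver, and Gold containers are already mounted under /mnt. A minimal mount sketch is shown here; the storage account, tenant ID, secret scope, and secret names are placeholders (assumptions), not values from the original setup:
# One-time setup (assumed): mount the containers with a service principal via OAuth.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get("kv-scope", "sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("kv-scope", "sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}
for container in ["bronze", "silver", "gold"]:
    dbutils.fs.mount(
        source=f"abfss://{container}@<storage-account>.dfs.core.windows.net/",
        mount_point=f"/mnt/{container}",
        extra_configs=configs,
    )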
from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

# Collect the table folder names from the Bronze container
table_name = []
for i in dbutils.fs.ls('/mnt/bronze/emp_database/'):
    table_name.append(i.name.split('/')[0])

for i in table_name:
    path = '/mnt/bronze/emp_database/' + i + '/' + i + '.parquet'
    df = spark.read.format('parquet').load(path)
    columns = df.columns
    for col in columns:
        # Convert every timestamp column (e.g. created_at, updated_at) to a yyyy-MM-dd date string
        if "_at" in col:
            df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()), "UTC"), "yyyy-MM-dd"))
    output_path = '/mnt/silver/emp_database/' + i + '/'
    df.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(output_path)
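A quick optional check (an addition here, not part of the original walkthrough): read one Silver table back and inspect its schema to confirm the write succeeded and the ‘_at’ columns now hold 'yyyy-MM-dd' strings:
df_check = spark.read.format('delta').load('/mnt/silver/emp_database/' + table_name[0] + '/')
df_check.printSchema()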
Step 3: Further Transformation (Silver to Gold)
1. Second Transformation in Databricks:
- In another Databricks notebook, read the data from the Silver container.
- Apply further transformations, such as renaming columns so that each underscore-separated word starts with a capital letter and the rest is lowercase (for example, ‘created_at’ becomes ‘Created_At’).
- Store the final transformed data in the Gold container.
# Collect the table folder names from the Silver container
table_name = []
for i in dbutils.fs.ls('/mnt/silver/emp_database/'):
    table_name.append(i.name.split('/')[0])

for name in table_name:
    path = '/mnt/silver/emp_database/' + name
    print(path)
    df = spark.read.format('delta').load(path)
    column_names = df.columns
    for old_col_name in column_names:
        # Capitalize each underscore-separated word, e.g. created_at -> Created_At
        new_col_name = '_'.join([word.capitalize() for word in old_col_name.split('_')])
        df = df.withColumnRenamed(old_col_name, new_col_name)
    output_path = '/mnt/gold/emp_database/' + name + '/'
    df.write.format('delta').mode("overwrite").option("mergeSchema", "true").save(output_path)
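Optionally (an extra check, not in the original steps), list the Gold container to confirm that one Delta folder was written per table:
display(dbutils.fs.ls('/mnt/gold/emp_database/'))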
Step 4: Orchestrating the Process with Azure Data Factory
1. Automating the Process:
- Use Azure Key Vault to securely store the access token that ADF needs to connect to Databricks (see the linked-service sketch after this list).
- In the ADF pipeline, connect the notebooks where you performed the transformations:
1. First, run the notebook that transforms data from Bronze to Silver.
2. Next, run the notebook that transforms data from Silver to Gold.
- This ensures that the data flows through the entire pipeline automatically.
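As a rough sketch of how the Databricks linked service in ADF can pull that token from Key Vault (the linked-service name, secret name, workspace URL, and cluster ID below are placeholders, not values from the original pipeline):
{
  "name": "AzureDatabricks_LS",
  "properties": {
    "type": "AzureDatabricks",
    "typeProperties": {
      "domain": "https://<workspace-url>.azuredatabricks.net",
      "accessToken": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "AzureKeyVault_LS", "type": "LinkedServiceReference" },
        "secretName": "databricks-access-token"
      },
      "existingClusterId": "<cluster-id>"
    }
  }
}
With the secret referenced this way, no token is stored in the pipeline definition itself.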
Step 5: Reading Gold Data in Azure Synapse
1. Creating Views in Synapse:
- In Azure Synapse Analytics, create a new pipeline to read data from the Gold container.
- Use a Get Metadata activity to list the table folders in Gold, then connect it to a For Each loop to iterate over them.
@activity('Get TableNames').output.childItems
- Inside the loop, call a Stored Procedure activity that dynamically creates a view for each table in the Synapse serverless SQL database (gold_db).
USE gold_db
GO
CREATE OR ALTER PROC CreateSQLServerlessView_gold @ViewName NVARCHAR(100)
AS
BEGIN
DECLARE @statement NVARCHAR(MAX)
SET @statement = N'CREATE OR ALTER VIEW ' + QUOTENAME(@ViewName) + ' AS
SELECT * FROM
OPENROWSET(
BULK ''https://raw123.dfs.core.windows.net/golden/emp_database/' + @ViewName + '/'' ,
FORMAT = ''DELTA''
) AS [result]'
EXEC (@statement)
END
GO
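Inside the For Each loop, the Stored Procedure activity passes each folder name returned by Get Metadata as the @ViewName parameter; a typical value expression (assuming the child item name maps one-to-one to the view name) is:
@item().name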
Step 6: Visualizing Data in Power BI
1. Setting Up Power BI:
- Connect Power BI to the Synapse serverless SQL endpoint (the gold_db database) where the views are created.
- Build interactive dashboards and reports to get real-time insights from the transformed data.
- Power BI can be configured to refresh automatically, providing up-to-date information.
Step 7: Automating the Entire Process
1. Scheduling in Azure Data Factory:
- Finally, set up a Trigger in ADF to run the entire pipeline on a scheduled basis (e.g., daily). This ensures that the data ingestion, transformation, and visualization processes run automatically without manual intervention.
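For reference, a daily schedule trigger defined in ADF JSON looks roughly like the sketch below; the trigger name, start time, and pipeline name are placeholders (assumptions), not values from the original setup:
{
  "name": "DailyPipelineTrigger",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2024-01-01T02:00:00Z",
        "timeZone": "UTC"
      }
    },
    "pipelines": [
      { "pipelineReference": { "referenceName": "<pipeline-name>", "type": "PipelineReference" } }
    ]
  }
}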
Conclusion:
This end-to-end data pipeline on Azure integrates powerful services like Azure Data Factory, Databricks, Synapse, and Power BI to create a scalable and automated solution. By following the steps outlined, you can manage data ingestion, transformation, and visualization, all while ensuring a seamless flow from raw data to actionable insights.
Contributors: