Building a Data Ingestion System Using Apache NiFi

Pratik Barjatiya
14 min readJan 12, 2023

Collecting data from SQL and NoSQL systems and building a data ingestion pipeline can be a complex process, but it can be broken down into a few basic steps:

  1. Connect to the databases: Establish a connection to the SQL and NoSQL systems from which you want to collect data. You will need the appropriate credentials and connection details (e.g., hostname, port, username, password) to do this.
  2. Extract the data: Use SQL queries to extract the data from the SQL databases, and appropriate APIs and methods to extract data from NoSQL databases.
  3. Transform the data: After the data is extracted, it will likely need to be transformed to fit the format of your data warehouse or data lake. For example, you may need to flatten nested data structures, rename columns, or perform calculations.
  4. Load the data: Load the transformed data into the target data warehouse or data lake. This step may involve loading the data into temporary staging tables first, and then using SQL or other data integration tools to move the data into the final target tables.
  5. Monitor and troubleshoot: Monitor the data pipeline to ensure that it is running smoothly and troubleshoot any issues that arise. Keep logs of all errors, and implement alerting system to notify if anything goes wrong.
  6. Schedule and automate: Schedule the data pipeline to run at regular intervals (daily, hourly etc) and automate the process. This allows the pipeline to automatically collect and process data without manual intervention.
  7. Optimize: Optimize the pipeline as necessary by monitoring performance, and making changes to improve efficiency and scalability.

It is also important to consider data governance, data quality and data lineage during the process. This would include ensuring data lineage from source to target, data quality checks, and data lineage for auditing and compliance purposes.

This is a general overview, and the specific implementation will depend on the technologies and platforms you are using, the size and complexity of your data, and the requirements of your use case.

What is CDC (Change Data Capture)

CDC, or Change Data Capture, is a technique used in data engineering to track and capture changes in data as they occur. It is typically used to keep a data warehouse or data lake in sync with the source systems that feed it. By capturing only the changes in data rather than the entire data set, CDC can reduce the amount of data that needs to be processed and improve the performance of the data pipeline.

CDC can be implemented in a variety of ways, depending on the type of data source. For relational databases, it can be done by reading the database’s binary log (binlog) or write-ahead log (WAL). For NoSQL databases, it can be done by tracking changes to individual documents or keys. Some database systems also provide built-in CDC features, such as Oracle’s GoldenGate and MySQL’s Replication.

CDC can be implemented using open-source and commercial tools like Apache NiFi, Apache Kafka, AWS DMS, AWS Kinesis Data Streams, and others. The process typically involves reading the change data from the source system and then applying it to the target system. This can be done by using database triggers, log files, or other mechanisms. The process also typically includes filtering and transforming the data to match the schema of the target system and handling errors and conflicts that may arise during the replication process.

CDC is a fundamental technique in data engineering, it allows maintaining the data in a data warehouse or data lake up-to-date with the source systems, it also allows building real-time reporting and analytics, and it’s widely used in scenarios where data is being passed between systems.

Apache NiFi

Sample CDC ingestion pipeline using Apache NiFi

Apache NiFi is a powerful tool for creating a Change Data Capture (CDC) pipeline for a relational database.

Here’s an example of how you could use NiFi to create a CDC pipeline:

  1. Install the JDBC Drivers for your relational database.
  2. Configure a JDBC Connection Pool and JDBC Controller Service for your relational database, using the appropriate connection details (e.g., hostname, port, username, password).
  3. Use the GenerateTableFetch processor to query your database and retrieve a snapshot of the current data. You can use a SQL query to select the columns you want to track and the table you want to track it from.
  4. Use the ConvertRecord processor to convert the data from the relational table format to Avro, JSON, or other formats, and the Record Write to write the data to a local file system.
  5. Use the ListenHTTPRecord to listen for any updates on the relational table, and use the ExtractText processor to extract the relevant columns.
  6. Use the ConvertRecord processor to convert the data to Avro, JSON or other formats, and the PutFile processor to write the updated data to a local file system.
  7. Use the CompareRecord processor to compare the current and previous state of the data. You can use this processor to identify the rows that have been added, modified, or deleted.
  8. Use the RouteOnAttribute processor to route the updated data to the appropriate destination, depending on whether it represents an insert, update, or delete.
  9. Finally, you can use the PutDatabaseRecord processor to write the updated data back to the relational database, or use the PutS3Object processor to write the data to Amazon S3 or other storage platform.

This is just one example of how you could create a CDC pipeline using NiFi, but you can customize it according to your specific requirements. The key to creating a robust CDC pipeline is to plan and design it with your specific use case in mind, and to test and iterate as needed.

Apache NiFi Flow

Creating a Change Data Capture (CDC) pipeline using Apache NiFi and database binary logs (binlogs) or write-ahead logs (WAL) is a more efficient way to track data changes in a relational database. Here’s an example of how you could use NiFi to create a CDC pipeline using binlogs in MySQL and WAL in Postgres:

  1. Enable binlogs or WAL in your MySQL or Postgres database respectively.
  2. Use the ListenForBinLog or ListenForWAL processor in NiFi to listen for binlog or WAL events from the database.
  3. Use the ExtractText processor to extract relevant columns from the binlog or WAL events.
  4. Use the ConvertRecord processor to convert the data to Avro, JSON, or other formats.
  5. Use the RouteOnAttribute processor to route the updated data to the appropriate destination, depending on whether it represents an insert, update, or delete.
  6. Use the PutDatabaseRecord processor to write the updated data back to the relational database, or use the PutS3Object processor to write the data to Amazon S3 or other storage platform.
  7. Use the CaptureChangeMySQL or CaptureChangePostgresSQL processor to capture the changes
  8. Monitor and troubleshoot: Monitor the data pipeline to ensure that it is running smoothly and troubleshoot any issues that arise. Keep logs of all errors, and implement alerting system to notify if anything goes wrong.
  9. Schedule and automate: Schedule the data pipeline to run at regular intervals (daily, hourly etc) and automate the process. This allows the pipeline to automatically collect and process data without manual intervention.
  10. Optimize: Optimize the pipeline as necessary by monitoring performance, and making changes to improve efficiency and scalability.

Ensure this is just an example of how you could create a CDC pipeline using NiFi and binlogs or WAL, you can customize it according to your specific requirements and the specifics of the underlying database. The key to creating a robust CDC pipeline is to plan and design it with your specific use case in mind, and to test and iterate as needed.

Here is an example of a code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for MySQL binlogs and dumping into AWS S3

# Source MySql - NiFi Binlog Streaming - Sink AWS S3 
import mysql.connector
from mysql.connector import BinLogStreamReader
import boto3

# Connect to the MySQL server
cnx = mysql.connector.connect(user='<username>', password='<password>', host='<hostname>', database='<database>')

# Create a BinLogStreamReader object
stream = BinLogStreamReader(connection=cnx, server_id=<server_id>)

# Connect to S3
s3 = boto3.client('s3', aws_access_key_id='<access_key>', aws_secret_access_key='<secret_key>')

# Iterate through the binlog events
for binlog_event in stream:
event_type = binlog_event.event_type
table = binlog_event.table
primary_key = binlog_event.primary_key
values = binlog_event.values
print("Event type: " + event_type)
print("Table: " + table)
print("Primary key: " + primary_key)
print("Values: " + str(values))
# Serialize the extracted data
data = json.dumps({
'event_type': event_type,
'table': table,
'primary_key': primary_key,
'values': values
})
# Upload the data to S3
s3.put_object(Bucket='<s3_bucket>', Key='<s3_key>', Body=data)

# Close the connection
cnx.close()
  • It is important to note that this code snippet is only for reading the binlog events and uploading the data to AWS S3, it is not complete pipeline, you will have to implement the functionality for filtering out unnecessary events, also for implementing error handling and retries for S3 uploads.
  • It is also important to note that this code snippet uses the python mysql connector library which provides a low-level API for reading binlog events and it can put a heavy load on the MySQL server, so it’s recommended to use this code snippet on a separate machine from the MySQL server, and also have a good understanding of how binlog works, the different types of events and the configuration options that the library provides.
  • It also uses boto3 python library which provides an API for interacting with AWS services such as S3 and you have to have the AWS credentials setup in your environment, also note that the above

Here is an example of a code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for MySQL binlogs and dumping into Cassandra

# Source MySql - NiFi Binlog Streaming - Sink Cassandra
import mysql.connector
from mysql.connector import BinLogStreamReader
from cassandra.cluster import Cluster

# Connect to the MySQL server
cnx = mysql.connector.connect(user='<username>', password='<password>', host='<hostname>', database='<database>')

# Create a BinLogStreamReader object
stream = BinLogStreamReader(connection=cnx, server_id=<server_id>)

# Connect to Cassandra
cluster = Cluster(['<cassandra_host>'])
session = cluster.connect()

# Iterate through the binlog events
for binlog_event in stream:
event_type = binlog_event.event_type
table = binlog_event.table
primary_key = binlog_event.primary_key
values = binlog_event.values
print("Event type: " + event_type)
print("Table: " + table)
print("Primary key: " + primary_key)
print("Values: " + str(values))
# Insert the extracted data into Cassandra
session.execute("INSERT INTO <cassandra_table> (event_type, table, primary_key, values) VALUES (%s, %s, %s, %s)", (event_type, table, primary_key, values))

# Close the connections
session.shutdown()
cluster.shutdown()
cnx.close()
  • It is important to note that this code snippet is only for reading the binlog events and inserting the data into Cassandra, it is not complete pipeline, you will have to implement the functionality for filtering out unnecessary events, also for implementing error handling and retries for Cassandra insert.
  • It is also important to note that this code snippet uses the python mysql connector library which provides a low-level API for reading binlog events and it can put a heavy load on the MySQL server, so it’s recommended to use this code snippet on a separate machine from the MySQL server, and also have a good understanding of how binlog works, the different types of events and the configuration options that the library provides.
  • It also uses the python cassandra driver library, you need to have the Cassandra cluster and keyspace setup, also it’s important to note that the above code snippet is just an example, it is not production-ready and may require additional error handling, retries, and other features to make it robust and reliable.

This is a simple example, and the exact pipeline will depend on the specific requirements and architecture of your system. It is important to note that this pipeline should be implemented on a separate instance or cluster of NiFi, specifically for the task of reading the binlogs, as it can put a heavy load on the MySQL server. Also, it’s important to have a good understanding of how binlog works, the different types of events and the configuration options that NiFi offers to make the pipeline more robust and efficient.

Here is an example of a Python code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for Postgres Write Ahead Logs and dumping the data into AWS S3

# Source Postgres - NiFi WAL Replication - Sink AWS S3
import psycopg2
from psycopg2.extensions import LogicalReplicationConnection
import boto3
import json

# Connect to the Postgres server
conn = psycopg2.connect(dbname='<dbname>', user='<username>', password='<password>', host='<hostname>', port='<port>')

# Enable logical replication
replication_connection = conn.connection.connection
replication_connection.start_replication(slot_name='<slot_name>', options={'proto_version': 2, 'create_slot': True})

# Connect to S3
s3 = boto3.client('s3', aws_access_key_id='<access_key>', aws_secret_access_key='<secret_key>')

# Iterate through the Write Ahead Logs
while True:
message = replication_connection.receive_message()
if message:
# Extract the relevant information from the WAL message
table = message.relation
primary_key = message.key
values = message.data
event_type = message.change
print("Event type: " + event_type)
print("Table: " + table)
print("Primary key: " + primary_key)
print("Values: " + str(values))
# Serialize the extracted data
data = json.dumps({
'event_type': event_type,
'table': table,
'primary_key': primary_key,
'values': values
})
# Upload the data to S3
s3.put_object(Bucket='<s3_bucket>', Key='<s3_key>', Body=data)
else:
break

# Close the connection
conn.close()
  • This is a simple example for reading the write ahead logs and inserting the data into Cassandra, it is not complete pipeline, you will have to implement the functionality for filtering out unnecessary events, also for implementing error handling and retries for Cassandra insert.
  • It is also important to note that this code snippet uses the python psycopg

Here is an example of a Python code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for Postgres Write Ahead Logs and dumping the data into Cassandra:

# Source Postgres - NiFi WAL Replication - Sink Cassandra
import psycopg2
from psycopg2.extensions import LogicalReplicationConnection
from cassandra.cluster import Cluster

# Connect to the Postgres server
conn = psycopg2.connect(dbname='<dbname>', user='<username>', password='<password>', host='<hostname>', port='<port>')

# Enable logical replication
replication_connection = conn.connection.connection
replication_connection.start_replication(slot_name='<slot_name>', options={'proto_version': 2, 'create_slot': True})

# Connect to Cassandra
cluster = Cluster(['<cassandra_host>'])
session = cluster.connect()

# Iterate through the Write Ahead Logs
while True:
message = replication_connection.receive_message()
if message:
# Extract the relevant information from the WAL message
table = message.relation
primary_key = message.key
values = message.data
event_type = message.change
print("Event type: " + event_type)
print("Table: " + table)
print("Primary key: " + primary_key)
print("Values: " + str(values))
# Insert the extracted data into Cassandra
session.execute("INSERT INTO <cassandra_table> (event_type, table, primary_key, values) VALUES (%s, %s, %s, %s)", (event_type, table, primary_key, values))
else:
break

# Close the connections
session.shutdown()
cluster.shutdown()
conn.close()

An example of a Python code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for a NoSQL (MongoDB) data source and dumping the data into Cassandra:

from datetime import datetime
from pymongo import MongoClient
from cassandra.cluster import Cluster

# Connect to MongoDB
client = MongoClient("mongodb://<username>:<password>@<hostname>:<port>/<database>")
db = client.get_database()

# Connect to Cassandra
cluster = Cluster(['<cassandra_host>'])
session = cluster.connect()

# Get the time of the last document processed
last_processed_time = datetime.utcnow()

while True:
# Query MongoDB for new documents
new_docs = db.get_collection().find({"timestamp": {"$gte": last_processed_time}})

# Iterate through new documents
for doc in new_docs:
timestamp = doc['timestamp']
values = doc['values']
print("Timestamp: " + str(timestamp))
print("Values: " + str(values))

# Insert the extracted data into Cassandra
session.execute("INSERT INTO <cassandra_table> (timestamp, values) VALUES (%s, %s)", (timestamp, values))

# Update the time of the last processed document
last_processed_time = timestamp

# Sleep for some time before querying MongoDB again
time.sleep(<polling_interval>)

# Close the connections
session.shutdown()
cluster.shutdown()
client.close()
  • This is a simple example for reading the new documents from a NoSQL data source, in this case MongoDB, and inserting the data into Cassandra, it is not complete pipeline, you will have to implement the functionality for filtering out unnecessary events, also for implementing error handling and retries for Cassandra insert.
  • It is also important to note that this code snippet uses the python pymongo library which provides an API for interacting with MongoDB and the python cassandra driver library, you need to have the Cassandra cluster and keyspace setup, also it’s important to note that the above code snippet is just an example, it is not production-ready and may require additional error handling, retries, and other features to make it robust and reliable.
  • It also uses sleep function to set a polling interval for checking for new documents, you can set a time that suits your requirements, but it’s important to keep in mind that more frequent polls may put more load on the NoSQL data source.

An example of a Python code snippet for a CDC (Change Data Capture) pipeline using Apache NiFi for a NoSQL data source (in this case MongoDB) and dumping the data into HDFS or AWS S3:

from datetime import datetime
from pymongo import MongoClient
import boto3
import json

# Connect to MongoDB
client = MongoClient("mongodb://<username>:<password>@<hostname>:<port>/<database>")
db = client.get_database()

# Connect to S3
s3 = boto3.client('s3', aws_access_key_id='<access_key>', aws_secret_access_key='<secret_key>')

# Get the time of the last document processed
last_processed_time = datetime.utcnow()

while True:
# Query MongoDB for new documents
new_docs = db.get_collection().find({"timestamp": {"$gte": last_processed_time}})

# Iterate through new documents
for doc in new_docs:
timestamp = doc['timestamp']
values = doc['values']
print("Timestamp: " + str(timestamp))
print("Values: " + str(values))
# Serialize the extracted data
data = json.dumps({
'timestamp': timestamp,
'values': values
})
# Upload the data to S3
s3.put_object(Bucket='<s3_bucket>', Key='<s3_key>', Body=data)

# Update the time of the last processed document
last_processed_time = timestamp

# Sleep for some time before querying MongoDB again
time.sleep(<polling_interval>)

# Close the connection
client.close()
  • This example uses python pymongo library to connect to MongoDB and retrieve new documents based on a timestamp field. Then, it serializes the data and use boto3 python library to connect to AWS S3 and upload the serialized data.
  • It is important to note that this code snippet is only for reading the new documents from a NoSQL data source, in this case MongoDB, and uploading the data to AWS S3, it is not complete pipeline, you will have to implement the functionality for filtering out unnecessary events, also for implementing error handling and retries for S3 uploads.
  • It also uses sleep function to set a polling interval for checking for new documents, you can set a time that suits your requirements, but it’s important to keep in mind that more frequent polls may put more load on the NoSQL data source.
  • If you want to dump the data to HDFS, you can use the Hadoop File System (HDFS) API for python, such as pywebhdfs, to write the data to HDFS. This can replace the S3 upload code. Please keep in mind that this is just an example, it is not production-ready and may require additional error handling, retries, and other features to make it robust and reliable.

Webhooks: A webhook is a way for an app to provide other applications with real-time information. Third-party webhooks are webhooks that are created by a third-party service or application, rather than by the application receiving the webhook.

Webhooks are typically used to notify an application of a change in another application or service. For example, a payment processing service may send a webhook to an e-commerce application to notify it that a payment has been processed. A third-party webhook, on the other hand, is a webhook that is created by an application or service other than the one receiving the webhook. For example, a weather service may send a webhook to an application with current weather information.

Webhooks are typically implemented using HTTP POST requests, which include a JSON payload that contains the data being sent. The receiving application must have a way to handle the incoming webhook and extract the relevant information from the payload.

Some popular use cases for third-party webhooks include social media notifications, SMS notifications, email notifications, payment notifications, and data-ingestion for analytics and reporting. Third-party webhooks allow for real-time data syncing and can be a more efficient solution than traditional polling-based approaches.

Webhooks are widely used in modern application architectures, it allows building event-driven systems, and making the data flow more efficient and real-time. Webhooks are also widely used in IoT, mobile and web applications.

An example of a Python code snippet for ingesting 3rd party webhook data and dumping it into Cassandra or AWS S3:

import json
from flask import Flask, request
import boto3
from cassandra.cluster import Cluster

app = Flask(__name__)

# Connect to Cassandra
cluster = Cluster(['<cassandra_host>'])
session = cluster.connect()

# Connect to S3
s3 = boto3.client('s3', aws_access_key_id='<access_key>', aws_secret_access_key='<secret_key>')

@app.route('/webhook', methods=['POST'])
def webhook():
# Get the JSON data from the webhook request
data = request.get_json()

# Extract relevant information
timestamp = datetime.utcnow()
values = data['values']

# Insert the extracted data into Cassandra
session.execute("INSERT INTO <cassandra_table> (timestamp, values) VALUES (%s, %s)", (timestamp, values))

# Serialize the extracted data
data = json.dumps({
'timestamp': timestamp,
'values': values
})
# Upload the data to S3
s3.put_object(Bucket='<s3_bucket>', Key='<s3_key>', Body=data)

return '', 200

if __name__ == '__main__':
app.run(port=<port>)
  • This example uses python Flask library to create a simple web server that listens for webhook requests on a specified endpoint /webhook and then it extracts the data from the request and insert it into Cassandra or upload it to AWS S3. You can use this code snippet as a starting point for your project, but it will likely require additional error handling, security, and other features to make it robust and reliable.
  • It's also important to notice that this code snippet is just an example, and it's not production ready, you might need to consider security and scalability when you're implementing this in a production environment.
  • You can also use other libraries like FastAPI, Tornado, and others to build the webhook listener. It’s also important to notice that this code snippet is just an example, it is not production-ready and may require additional error handling, retries, and other features to make it robust and reliable.

--

--

Pratik Barjatiya

Data Engineer | Big Data Analytics | Data Science Practitioner | MLE | Disciplined Investor | Fitness & Traveller