Real-time transaction processing with Redpanda, Flink, and MongoDB

Learn how to use Redpanda, Flink, and MongoDB to seamlessly handle customer data for financial transactions

August 10, 2023

Real-time transaction processing has completely changed the way businesses handle data by enabling immediate analysis and response to incoming information. Unlike traditional batch processing, which involves processing data in large volumes at fixed intervals, real-time processing allows for instantaneous data ingestion, analysis, and action.

In today's fast-paced digital landscape, businesses need to stay competitive and make data-driven decisions in real time. Industries like finance and stock market trading use real-time processing for fraud detection and other scenarios where quick decision-making is paramount.

This post delves into a powerful real-time transaction processing solution combining three cutting-edge technologies: Redpanda, Apache Flink®, and MongoDB. Here’s a quick overview to make sure we’re all on the same page:

  • Redpanda is a streaming data platform built from the ground up with a native Kafka API, and engineered to eliminate complexity, maximize performance, and reduce costs. It offers low-latency event streaming and robust reliability, making it an excellent choice for handling high-volume data streams.
  • Apache Flink is an open source stream processing framework that excels at real-time data processing.
  • MongoDB is a flexible NoSQL database that’s adept at handling high-speed data ingestion and real-time analytics.

Together, these technologies form a robust stack for efficient real-time transaction processing in various scenarios. So in this tutorial, you'll learn how to set up and implement a demo project using these technologies by building a real-time transaction processing application. To follow along, you can find the demo code in this GitHub repo.

Let’s dig in.

How to create a real-time transaction system with Redpanda, Flink, and MongoDB

To create a real-time transaction system with Redpanda, Flink, and MongoDB, it's essential to understand each technology's role in the overall architecture.

Redpanda's architecture is designed to ensure message durability, fault tolerance, and seamless scalability, making it well-suited for real-time processing scenarios. Redpanda is a faster alternative to Apache Kafka®, making it an ideal drop-in choice for a Kafka-based technology stack.

Apache Flink, on the other hand, is a powerful stream processing framework that excels at real-time data processing and analytics. It can process and analyze large-scale data streams in real time, with features like event-time processing, fault tolerance, and exactly-once processing guarantees.

Flink offers connectors that act as bridges between Flink and external systems, enabling it to consume data from and write data to databases, message queues, storage systems, and more. Out-of-the-box connectors are available for many common sources and sinks, including Kafka.

MongoDB is a popular NoSQL database known for its flexibility and scalability. It provides a document-based data model that allows for dynamic schemas. Moreover, with its distributed architecture and horizontal scalability, MongoDB can handle high-speed data ingestion and provide near-real-time analytics capabilities.

Let's delve into a specific use case from the introduction: fraud detection in financial transactions. In a real-time transaction system, incoming financial transactions need to be processed and analyzed immediately to identify potential fraudulent activities.

Redpanda, with its low-latency event streaming capabilities, acts as the source of the incoming transaction data. Flink, using its Kafka connector, consumes the transaction data from Redpanda and can ingest it into MongoDB for further analysis (like fraudulent transaction detection) or retrieval. Designing a fraud detection algorithm is beyond the scope of this tutorial, but it does cover how to build a real-time streaming pipeline by injecting mock financial transactions into Redpanda, consuming them through Flink, and ingesting them into a MongoDB collection.

The following is a high-level architecture diagram of this demo tutorial:

Architecture diagram

The benefits of using Redpanda, Flink, and MongoDB for real-time transaction processing are numerous. Together, these three technologies provide a highly scalable and fault-tolerant architecture that can handle massive volumes of incoming transaction data, while real-time processing enables businesses to respond promptly to potential fraud, minimizing losses.

However, creating a real-time transaction system with these technologies also comes with challenges. The system needs to ensure data consistency and exactly-once processing, which requires careful design and configuration. Monitoring and managing the system's performance, scalability, and fault tolerance are crucial in order to maintain the desired level of real-time processing.

You can combine the strengths of Redpanda, Flink, and MongoDB to achieve the scalability, performance, and real-time processing capabilities needed to process and analyze transactions as they occur, allowing you to make timely decisions and deliver exceptional customer experiences.

Getting up and running with Redpanda, Flink, and MongoDB

Before you begin, make sure you have the following:

  • Docker installed on your machine, preferably Docker Desktop if you're on Windows/Mac (we're using Docker Desktop 4.15.0)
  • Exposure to Docker and Java programming
  • At least JDK 8 or above (we're using JDK 8, but it should work with the latest version as well)
  • Maven 3+ installed (we're using Maven 3.9.3)
  • An IDE of your choice for Flink application development in Java

For this tutorial, all three technologies will run locally as Docker containers managed by a self-defined docker-compose.yml file, which is an easy way to start and manage all these services on a single node. If you want to learn more about Docker Compose, check out the documentation.

Redpanda is available for self-hosted use or as a cloud service, and there are several ways to prepare a Redpanda instance. For information on installing or running Redpanda on a variety of platforms, refer to the documentation.

To start, create a project directory called realtime-processing-with-redpanda-flink-mongodb on your machine. Create a docker-compose.yml file and paste in the contents of this GitHub repository file.
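The full file is available in the repo, but for orientation, here's a condensed sketch of its shape. The image tags and container names match the containers you'll see running shortly; the Redpanda startup flags and other settings are assumptions, so defer to the repo file:

version: "3.7"
services:
  redpanda-0:
    container_name: redpanda-0
    image: docker.redpanda.com/redpandadata/redpanda:v23.1.11
    # Assumed flags: advertise the broker under the hostname other containers use
    command:
      - redpanda
      - start
      - --smp 1
      - --overprovisioned
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda-0:9092
    ports:
      - "9092:9092"
      - "9644:9644"
  flink-job-manager:
    container_name: flink-job-manager
    image: apache/flink:1.17.1
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-job-manager
    volumes:
      # Host directory where you'll later drop the application JAR
      - ./flink-job:/opt/flink/jobs
  flink-taskmanager:
    container_name: flink-taskmanager
    image: apache/flink:1.17.1
    command: taskmanager
    depends_on:
      - flink-job-manager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-job-manager
  mongo:
    container_name: mongo
    image: mongo:6.0.6
    ports:
      - "27017:27017"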

Here’s an explanation of each service created using this docker-compose.yml file:

  • redpanda-0 creates a single Redpanda node that listens on port 9092 for incoming requests.
  • flink-job-manager creates a Flink job manager, which is responsible for coordinating the execution of Flink jobs. For more details, refer to the official documentation. Note that the volume mount is pointed to a flink-job directory on the host machine. This is the place where you will place the JAR file of your Flink application once it is ready.
  • flink-taskmanager creates a Flink task manager. Task managers are responsible for executing the tasks of Flink jobs.
  • mongo creates a MongoDB database. This is where the data streamed from Flink jobs will be stored.

Now, to start all these services, execute the following command from the directory where the YAML file is located:

docker-compose -f docker-compose.yml up -d

You can check if the services are up and running by executing the following command:

docker ps

If the containerized services are running, you should get the following output:

CONTAINER ID   IMAGE                                                COMMAND                  CREATED          STATUS          PORTS                                                            NAMES
e9f4dc19ae40   apache/flink:1.17.1                                  "/docker-entrypoint.…"   14 seconds ago   Up 10 seconds   6123/tcp, 8081/tcp                                               flink-taskmanager
2ba7040dbc07   mongo:6.0.6                                          "docker-entrypoint.s…"   14 seconds ago   Up 11 seconds   0.0.0.0:27017->27017/tcp                                         mongo
11427f2cb34d   docker.redpanda.com/redpandadata/redpanda:v23.1.11   "/entrypoint.sh redp…"   14 seconds ago   Up 10 seconds   0.0.0.0:9092->9092/tcp, 8081-8082/tcp, 0.0.0.0:9644->9644/tcp    redpanda-0
fe2271de30b2   apache/flink:1.17.1                                  "/docker-entrypoint.…"   14 seconds ago   Up 11 seconds   6123/tcp, 0.0.0.0:8081->8081/tcp                                 flink-job-manager

Scenario: preparing financial transaction events for a streaming pipeline

The real-time streaming pipeline that you'll build in this tutorial streams transactions from various merchants in different locations, each tagged with a spending category. Every event carries the account ID, the transaction amount, and the timestamp at which the transaction was carried out. You'll stream these events into a Redpanda topic, where the Flink application processes them in real time and ingests them into MongoDB.

You'll use the following sample financial transactions in JSON format in the next steps:

{ "transactionId": "TXN001", "timestamp": "2023-06-28T10:15:00", "accountId": "ACC001", "amount": 250.75, "merchantId": "MERC001", "location": "New York", "category": "Shopping" }
{ "transactionId": "TXN002", "timestamp": "2023-06-28T12:30:45", "accountId": "ACC002", "amount": 1000.00, "merchantId": "MERC002", "location": "San Francisco", "category": "Travel"}
{ "transactionId": "TXN003", "timestamp": "2023-06-28T15:45:20", "accountId": "ACC001", "amount": 500.00, "merchantId": "MERC003", "location": "London", "category": "Shopping" }
{ "transactionId": "TXN004", "timestamp": "2023-06-28T18:20:10", "accountId": "ACC003", "amount": 150.50, "merchantId": "MERC004", "location": "Paris", "category": "Dining" }
{ "transactionId": "TXN005", "timestamp": "2023-06-28T20:05:30", "accountId": "ACC001", "amount": 1000.00, "merchantId": "MERC002", "location": "San Francisco", "category": "Travel"}

Save these sample JSON events in a file named events.json in the project directory realtime-processing-with-redpanda-flink-mongodb. For the purposes of this demo, you'll later push these events to a Redpanda topic using the rpk CLI utility.

Next, you'll move on to developing the Flink application in Java.

Creating and building the Flink application

Open a terminal and create a Maven project by executing the following Maven CLI command from the project directory:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.17.1

This command creates a new Maven project based on the Flink archetype with the specified group ID, artifact ID, and Flink version. Once executed, you'll see a prompt for a set of inputs in interactive mode. Feed the inputs as shown below for each prompt:

Define value for property 'groupId': com.example.flink
Define value for property 'artifactId': redpanda-flink-mongodb
Define value for property 'version' 1.0-SNAPSHOT: : 
Define value for property 'package' com.example.flink: : 
Confirm properties configuration:
groupId: com.example.flink
artifactId: redpanda-flink-mongodb
version: 1.0-SNAPSHOT
package: com.example.flink
 Y: :

Once complete, you should see a response similar to this:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  05:02 min

You should now see a new Java project source directory named redpanda-flink-mongodb. Open the Maven project in an IDE of your choice.

As mentioned, you need Flink connector libraries to interact with external systems. In this demo, you'll use the Flink Kafka connector to consume events from Redpanda and develop custom connector code for interacting with MongoDB. To do so, you need to add the dependencies to your project's pom.xml file. You can refer to this GitHub repository file to include the dependencies.
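At a minimum, the build needs the Flink Kafka connector and the MongoDB Java driver on top of the archetype's defaults. A sketch of the relevant dependency entries follows; the version numbers here are assumptions, so use the ones from the repo file:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
    <version>4.9.1</version>
</dependency>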

Open the main class file DataStreamJob.java, which is in the package com.example.flink, and replace the main method with the code below:

public static void main(String[] args) throws Exception {
  // Sets up the execution environment, which is the main entry point
  // to build Flink applications.
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // Configure the Redpanda source
  Properties kafkaProps = new Properties();
  kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "redpanda-0:9092");
  kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");

  // Create the Redpanda source connector
  FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("financial-transactions",
        new SimpleStringSchema(), kafkaProps);

  // Add the Redpanda source to the execution environment
  DataStream<String> transactionStream = env.addSource(kafkaConsumer);

  // Configure the MongoDB sink
  // Add the MongoDB sink to the transaction stream
  transactionStream.addSink(new MongoDBSink());

  // Execute the Flink job
  env.execute("Fraud Detection App");

}

For the purposes of this demo, the required property values, like BOOTSTRAP_SERVERS_CONFIG, the consumer's GROUP_ID_CONFIG, and the topic name, are supplied directly in the source code. In a production application, you'd typically load these values from a properties file or environment variables. The FlinkKafkaConsumer class is part of the Flink Kafka connector and enables your application to consume events from Redpanda.
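For example, a minimal way to externalize the broker address (a hypothetical refinement, with the hardcoded demo value kept as a fallback) would be:

// Hypothetical: read the broker address from the environment, falling back to the demo value
String bootstrapServers = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "redpanda-0:9092");
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);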

Next, create a new class named MongoDBSink in the same com.example.flink package and paste in the following code:

public class MongoDBSink extends RichSinkFunction<String> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
    private transient com.mongodb.client.MongoClient mongoClient;
    private transient MongoCollection<Document> collection;

    @Override
    public void open(Configuration parameters) {
        // Create the MongoDB client to establish connection
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyToClusterSettings(builder ->
                        builder.hosts(Collections.singletonList(new ServerAddress("mongo", 27017))))
                .codecRegistry(createCodecRegistry())
                .build();

        mongoClient = MongoClients.create(settings);

        // Access the MongoDB database and collection
        // At this stage, if the MongoDB database and collection do not exist, they would automatically be created
        MongoDatabase database = mongoClient.getDatabase("finance");
        collection = database.getCollection("financial_transactions");
    }

    @Override
    public void invoke(String value, Context context) {
        // Consume the event from the Redpanda topic
        LOG.info("Consumed event : " + value);
        Document transactionDocument = Document.parse(value);
        LOG.info("transactionDocument is : "+ transactionDocument);
        // Optionally you can add fraud detection logic as an additional exercise task from your end here
        // ...

        // Insert into MongoDB collection
        collection.insertOne(transactionDocument);
    }

    @Override
    public void close() {
        // Close the MongoDB client to release its connections
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    private CodecRegistry createCodecRegistry() {
        // The method createCodecRegistry is a helper method that is used to create a CodecRegistry object for MongoDB.
        // In MongoDB, a CodecRegistry is responsible for encoding and decoding Java objects to
        // BSON (Binary JSON) format, which is the native format used by MongoDB to store and retrieve data.
        return CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
        );
    }
}

The above code serves as your custom Flink sink connector for MongoDB to parse and ingest the consumed events into a MongoDB collection. Again, for this demo, the database name finance and the collection name financial_transactions are directly embedded as part of this connector code.
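If you'd like to experiment with the fraud detection hook in invoke(), a trivial rule-based check is one place to start. The sketch below flags unusually large transactions before inserting them; the threshold and the flagged field are purely illustrative, not part of the demo code:

// Illustrative only: flag transactions above a fixed amount threshold
Double amount = transactionDocument.getDouble("amount");
if (amount != null && amount > 900.0) {
    transactionDocument.append("flagged", true);
    LOG.warn("Potentially fraudulent transaction: {}", transactionDocument.getString("transactionId"));
}

A real fraud detector would look at patterns across events (transaction velocity, location changes, and so on) rather than single amounts, which is where Flink's windowing and state features come in.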

You can then build this Flink application using the following command:

mvn clean package

Once the build is successful, you'll see a JAR file called redpanda-flink-mongodb-1.0-SNAPSHOT.jar in the target directory of the Maven project. Copy this JAR and place it in the flink-job directory on the host machine, which you defined earlier as part of the volume mount of the flink-job-manager service.

Submitting the Flink job

Now that your Flink application JAR is ready and available, you can submit the job by executing the Flink CLI command. Open the terminal and execute the following command to connect to the Flink job manager container terminal:

docker exec -it flink-job-manager bash

Change your directory to /opt/flink/bin and execute the Flink CLI command ./flink run -c <Main class name> <Path to the Flink application JAR> to submit the job:

cd /opt/flink/bin && \
./flink run -c com.example.flink.DataStreamJob /opt/flink/jobs/redpanda-flink-mongodb-1.0-SNAPSHOT.jar

Your job will be in a running state; leave it running. You can verify this by opening another terminal session into the Flink job manager container and executing the following command:

cd /opt/flink/bin && \
./flink list -r
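When you're done with the demo later on, you can stop the job with the cancel command, passing the job ID that flink list prints:

./flink cancel <jobID>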

Producing financial transaction events into Redpanda

Now that the Flink job is running, open a terminal and start producing events by executing this command:

docker exec -it redpanda-0 bash

This connects you to the Redpanda container's shell session. For this demo, you'll produce events to the Redpanda topic financial-transactions using the CLI utility rpk. In production, you'd have a separate producer application for this purpose. From the container terminal, execute the following command:

rpk topic produce financial-transactions

This will open a producer shell session where you can copy and paste the events from the events.json file and press Enter. Your container terminal's console session should be similar to the one shown below:

redpanda@redpanda-0:/$ rpk topic produce financial-transactions

{ "transactionId": "TXN001", "timestamp": "2023-06-28T10:15:00", "accountId": "ACC001", "amount": 250.75, "merchantId": "MERC001", "location": "New York", "category": "Shopping" }
{ "transactionId": "TXN002", "timestamp": "2023-06-28T12:30:45", "accountId": "ACC002", "amount": 1000.00, "merchantId": "MERC002", "location": "San Francisco", "category": "Travel"}  
{ "transactionId": "TXN003", "timestamp": "2023-06-28T15:45:20", "accountId": "ACC001", "amount": 500.00, "merchantId": "MERC003", "location": "London", "category": "Shopping" }       
{ "transactionId": "TXN004", "timestamp": "2023-06-28T18:20:10", "accountId": "ACC003", "amount": 150.50, "merchantId": "MERC004", "location": "Paris", "category": "Dining" }
{ "transactionId": "TXN005", "timestamp": "2023-06-28T20:05:30", "accountId": "ACC001", "amount": 1000.00, "merchantId": "MERC002", "location": "San Francisco", "category": "Travel"}

Produced to partition 0 at offset 0 with timestamp 1688133908233.
Produced to partition 0 at offset 1 with timestamp 1688133908234.
Produced to partition 0 at offset 2 with timestamp 1688133908235.
Produced to partition 0 at offset 3 with timestamp 1688133908235.
Produced to partition 0 at offset 4 with timestamp 1688133909391.
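Alternatively, you can skip the interactive session and pipe the file straight from the host in one shot. rpk reads newline-delimited records from stdin, so assuming events.json is in your current directory:

docker exec -i redpanda-0 rpk topic produce financial-transactions < events.json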

At this point, the produced events will be consumed by the running Flink application, which should parse the events to ingest them into a MongoDB collection. Let's connect to MongoDB to explore the data.

Reading the ingested financial transactions from the MongoDB collection

Open a new terminal window and connect to the MongoDB container session:

docker exec -it mongo bash

Connect to the finance database to explore the data in the financial_transactions collection. To read from the MongoDB collection, first enter the MongoDB shell by executing the following command:

mongosh mongodb://localhost:27017/finance

Then, query financial_transactions using the find() function:

db.financial_transactions.find();

You should see that the produced events are now available to be read from the MongoDB collection:

[
  {
    _id: ObjectId("649db1886253715f6fecc4da"),
    transactionId: 'TXN001',
    timestamp: '2023-06-28T10:15:00',
    accountId: 'ACC001',
    amount: 250.75,
    merchantId: 'MERC001',
    location: 'New York',
    category: 'Shopping'
  },
  {
    _id: ObjectId("649db1c66253715f6fecc4db"),
    transactionId: 'TXN002',
    timestamp: '2023-06-28T12:30:45',
    accountId: 'ACC002',
    amount: 1000,
    merchantId: 'MERC002',
    location: 'San Francisco',
    category: 'Travel'
  }
…
…
]
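As a small step toward the fraud detection scenario, you can also run aggregations over the ingested data. For example, the following query (with a purely illustrative threshold of 1,000) totals high-value transactions per account; with the sample events, it groups TXN002 under ACC002 and TXN005 under ACC001:

db.financial_transactions.aggregate([
  { $match: { amount: { $gte: 1000 } } },
  { $group: { _id: "$accountId", count: { $sum: 1 }, total: { $sum: "$amount" } } }
]);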

Conclusion

Congratulations! You've successfully integrated these technologies to carry out real-time stream processing. This post explored real-time transaction processing using Redpanda, Flink, and MongoDB.

You learned how these technologies can be combined to create a real-time transaction system by streaming sample financial transactions carried out by merchants from different locations into a Redpanda topic, then processing those in real time through a Flink application to ingest them into MongoDB. You also saw some of the benefits of real-time processing, such as immediate fraud identification and swift decision-making, as well as challenges, such as ensuring data consistency and low-latency processing.

Overall, the combination of Redpanda, Flink, and MongoDB empowers businesses to process transactions as they occur, gain valuable insights, and deliver exceptional customer experiences in real time. Once again, the source code for this demo tutorial is available in this GitHub repository.

To learn more, browse the Redpanda blog for tutorials and dive into the free courses at Redpanda University. To try it for yourself, take Redpanda for a test drive! If you have any questions or just want to chat with the team and fellow Redpanda users, join the Redpanda Community on Slack.
