The Kafka cloud—options and best practices

Kafka connectors

Apache Kafka® has a pluggable architecture that makes it easy to build complex pipelines with no code using Kafka Connect. The term “Kafka connectors” refers to various Kafka Connect plugins you can use to move large data sets in and out of Kafka. You can simply plug existing connectors into different databases or file systems and customize them to suit specific requirements. This simplifies the process of streaming data from various data sources into Kafka and from Kafka to a wide range of targets.

This chapter explores Kafka connector concepts, configurations, and use cases.

Summary of key Kafka connector concepts

Concept | Description
Kafka Connect | A framework that specializes in integrating other data systems with Kafka.
Kafka connectors | Kafka Connect plugins that interface between external systems and the Kafka Connect runtime. They encapsulate all logic specific to the external system, which lets the runtime stay generic and avoid implementing the technical details of individual integrations.
Source connectors | Plugins that ingest data from external systems into Kafka.
Sink connectors | Plugins that write data from Kafka topics to an external system.
Task | The unit of work within a connector that performs the actual data transfer. A connector can run multiple tasks, each handling a subset of the data.
Worker | A process that runs connectors and their tasks.
Converter | A component that serializes and deserializes messages between connectors and Kafka.
Transformer | A component that transforms messages one at a time as they move through Kafka Connect, performing lightweight modifications that would otherwise require a separate stream processing application.
Predicate | A condition that determines whether a transformation is applied to a given message.

Overview of Kafka connectors

At its core, Kafka Connect is a runtime that runs and manages data pipelines involving Kafka. Kafka connectors are the plugins you use in Kafka Connect to establish a connection to an external data source. You can customize the connectors to your specific needs.

There are two types of connectors:

  1. Source connectors are plugins that ingest data from external systems like MySQL into Kafka. They convert data into the required format and write it to a specified Kafka topic.
  2. Sink connectors are plugins that write data from the Kafka topic to an external system, such as Amazon Simple Storage Service (S3) or Elasticsearch, in the required format.

Example of a Kafka Connect pipeline with both a source and a sink connector

Kafka Connect provides a framework and a code execution runtime to implement and operate source and sink connectors.
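
Once a Kafka Connect worker is running, you can check which connector plugins it has installed through its REST API. The command below is a sketch that assumes the REST API is listening on localhost:8083, the default port.

# List the source and sink connector plugins installed on the worker
curl -s http://localhost:8083/connector-plugins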

Tasks

A task is the unit of work within a connector that performs the actual data transfer. Each task handles a subset of the connector's data, and multiple tasks can run in parallel.

Tasks do not retain their state internally. Instead, the state of each task is maintained within dedicated topics in Kafka.

  • config.storage.topic stores connector and task configurations.
  • offset.storage.topic stores the offsets that source connector tasks have processed.
  • status.storage.topic stores the current status of connectors and tasks.

The Kafka Connect runtime manages these topics for you. You can start, stop, or restart tasks as needed, giving you a robust and scalable data pipeline architecture.
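
You manage tasks through the same REST API. The commands below are a sketch that assumes a connector named jdbc-source-connector and a worker listening on localhost:8083 (the default).

# Show the connector's status, including the state of each task
curl -s http://localhost:8083/connectors/jdbc-source-connector/status

# Restart task 0 if it has failed
curl -s -X POST http://localhost:8083/connectors/jdbc-source-connector/tasks/0/restart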

Workers

Workers are the processes on which Kafka connectors run. You can configure Kafka Connect as a standalone or distributed system. In a standalone system, there is only one worker, while in a distributed system, there are multiple workers. The workers evenly distribute all connectors and their tasks among themselves, ensuring each worker handles a comparable workload.

Worker rebalancing also occurs when connectors adjust the number of tasks they require or when you update configurations. If one worker fails unexpectedly, another worker takes over the task of the failed worker.

A Kafka Connect cluster composed of three workers handling two pipelines.
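
A distributed Connect cluster is defined by its worker properties: workers that share the same group.id and internal topics form one cluster. The fragment below is a minimal sketch of a connect-distributed.properties file; the topic names and group ID are placeholders.

# Kafka cluster the workers connect to
bootstrap.servers=localhost:9092

# Workers with the same group.id join the same Connect cluster
group.id=connect-cluster

# Internal topics holding connector configurations, source offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status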

Converters

Connectors work with converter components to ensure compatibility between Kafka's internal data format and external systems. Messages must be serialized into byte arrays before they are written to Kafka and deserialized again for the target data system. For source connectors, converters serialize the message produced by the connector before it is written to the Kafka cluster. For sink connectors, converters deserialize the byte stream read from Kafka before passing the message to the connector.

Converters and connectors in the Kafka pipeline

Kafka Connect ships with built-in converters, such as the JSON and string converters. You can also create custom converters to accommodate other data formats.
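
Converters are set as defaults in the worker properties and can be overridden per connector. The fragment below sketches a connector-level override that uses the built-in JSON converter with embedded schemas disabled; whether you want schemas embedded depends on your pipeline.

{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}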

Transformers

Transformers, or Single Message Transformations (SMTs), are optional components that transform messages one at a time as they move through Kafka Connect. Different connectors running in the same Kafka Connect cluster can have different transformations. You can use transformations to change data types, add or rename fields, or modify topic names. It is also possible to chain multiple transformations in a specific order to perform several modifications.

For source connectors, transformations are invoked after the connector and before the converter. For sink connectors, transformations are invoked after the converter and before the connector.

Kafka Connect pipeline with one or more transformations
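
As an illustration of modifying topic names, the fragment below sketches a single RegexRouter transformation (shipped with Kafka Connect) that rewrites a topic prefix. The regex and replacement values are hypothetical.

{
  "transforms": "RenameTopic",
  "transforms.RenameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.RenameTopic.regex": "legacy-(.*)",
  "transforms.RenameTopic.replacement": "prod-$1"
}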

Predicates

Predicates allow you to apply transformations conditionally. They define rules that determine whether a transformation is applied to a given message, so records can either proceed through the pipeline unchanged or be modified or filtered out. Some transformations, such as Filter, are always intended to be used with a predicate.

Below is an example of a JDBC source connector that copies data from a MySQL database into Kafka. In this configuration:

  • InsertField adds a field called topic with the static value jdbc-mytopic to each record.
  • MaskField masks the password field in each record.
  • Filter, gated by the filterInActive predicate, drops records whose topic name matches the pattern “.*Inactive.*”.

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/mydatabase",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": 1000,
    "tasks.max": 1,
    "transforms": "InsertField,MaskField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "topic",
    "transforms.InsertField.static.value": "jdbc-mytopic",
    "transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.MaskField.fields": "password",
    "predicates": "filterInActive",
    "predicates.filterInActive.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.filterInActive.pattern": ".*Inactive.*"
  }
}

Configuring connectors

You need to define configurations to run connectors. Some configuration options are set in the worker properties you provide when you start Kafka Connect and apply by default to all connectors; others are specified in the individual connector configurations.

For example, to use the FileStreamSinkConnector to stream the contents of a topic called topic-to-export to a file called /tmp/sink.out, you create a file named sink-config.json that contains the desired configuration for the connector.

{
  "name": "file-sink",
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "tasks.max": "1",
  "topics": "topic-to-export",
  "file": "/tmp/sink.out",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}

If you want to add transformations, you also have to declare them in the connector configuration. The connector then applies the transformations to the messages it handles.

Once you have updated the configurations, start Kafka Connect by navigating to the directory containing your Kafka distribution and running the following command.

./bin/connect-distributed.sh ./config/connect-distributed.properties
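
Starting the worker does not start the connector itself; you register the connector configuration through the Connect REST API. The command below is a sketch that assumes the REST API listens on localhost:8083 (the default) and uses the sink-config.json file created above.

# Create or update the file-sink connector from the JSON configuration file
curl -s -X PUT -H "Content-Type: application/json" \
  --data @sink-config.json \
  http://localhost:8083/connectors/file-sink/config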

Kafka connector use cases

Capturing data changes

You can use connectors to capture data modifications in databases such as MySQL, PostgreSQL, or MongoDB in scenarios where maintaining up-to-date copies is essential for analytics. Connectors configured for specific databases ensure efficient and reliable change replication. You can also synchronize changes across systems and facilitate near-real-time data processing and analysis.
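
As a sketch, change data capture from MySQL is often set up with the Debezium MySQL source connector. The fragment below only illustrates the general shape of such a configuration; exact property names vary between Debezium versions, and the hostnames, credentials, and names used here are placeholders.

{
  "name": "mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.internal",
    "database.port": "3306",
    "database.user": "cdc-user",
    "database.password": "cdc-password",
    "database.server.id": "184054",
    "database.include.list": "inventory",
    "topic.prefix": "mysql",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}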

Building data lakes

Connectors can transfer data into specialized data lakes or archive it to cost-effective storage solutions. This is useful when dealing with large data volumes or when you want to retain data over extended periods for compliance or auditing. In machine learning/AI, you can avoid synthetic data generation and make actual event streams accessible for model training.
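
For example, archiving topics to object storage is commonly done with an S3 sink connector. The fragment below sketches a configuration in the style of Confluent's S3 sink connector; the bucket, region, and topic values are placeholders.

{
  "name": "s3-archive",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "events",
    "s3.bucket.name": "my-archive-bucket",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000",
    "tasks.max": "2"
  }
}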

Aggregating logs

Consolidating data such as logs, metrics, and events from all applications simplifies management. Without Kafka connectors, integrating observability data across multiple systems would require significant effort in development and deployment. It would necessitate updates across various points when modifying data structures or destinations. Kafka connectors reduce the complexity of data collection. You can leverage many pre-built connectors without custom coding.

Mirroring clusters with MirrorMaker and Kafka connectors

The action of copying data between two Kafka clusters is called mirroring. Kafka MirrorMaker 2 is a tool that enables data replication between Kafka clusters. It utilizes the Kafka Connect framework for efficient data mirroring.

MirrorSourceConnector

The MirrorSourceConnector in MirrorMaker 2 replicates topic data across Kafka clusters while preserving each record's original partition assignment. It mirrors topic configurations, including partition counts and ACLs, and periodically synchronizes them. Additionally, the connector establishes an offset-syncs topic that maps offsets between source and target clusters.

MirrorCheckpointConnector

The MirrorCheckpointConnector replicates consumer group offsets from a source Kafka cluster to a target cluster. It identifies consumer groups on the source cluster, retrieves their committed offsets, and uses the offset-syncs topic to accurately translate offsets for the target cluster. It stores the translated offsets as checkpoints in a compacted topic with a single partition on the target cluster.

MirrorHeartbeatConnector

The MirrorHeartbeatConnector regularly emits small heartbeat records. Each record contains the source and target cluster identifiers as its key and a timestamp as its value. The records are stored in a dedicated heartbeats topic with a single partition. The presence of heartbeat records serves as a reliable indicator that the mirroring process is operational, even in the absence of other data traffic.
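
A minimal sketch of how these connectors are driven: MirrorMaker 2 reads a properties file that names the clusters and the replication flows, then runs the connectors for you. The cluster aliases, bootstrap servers, and topic pattern below are placeholders.

# mm2.properties
clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092

# Replicate all topics from primary to backup
primary->backup.enabled = true
primary->backup.topics = .*

You can then start mirroring with the driver script shipped with Kafka:

./bin/connect-mirror-maker.sh mm2.properties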

Best practices when using connectors

Deploy Kafka Connect in distributed mode for scalability and fault tolerance. This allows you to scale out by adding more workers as needed.

Incorporate a schema registry in Kafka Connect for schema management across systems. This will maintain consistency and reduce the risk of data corruption or schema conflicts.

Set up monitoring and alerting for Kafka Connect clusters and connectors. To detect issues early, you can monitor metrics such as throughput, latency, error rates, and connector status. You can also implement a dead letter queue (DLQ) for messages that fail processing.
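
The schema registry and dead letter queue recommendations translate directly into configuration. The fragment below sketches both for a sink connector, using Confluent's Avro converter as one example of a schema-registry-aware converter; the URL and topic name are placeholders, and the errors.deadletterqueue.* settings apply to sink connectors only.

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-file-sink",
  "errors.deadletterqueue.context.headers.enable": "true"
}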

Conclusion

Kafka and its connectors are one of several options for building data pipelines. Kafka is a decade-old solution that comes with several complexities in configuration and management. As your data grows, configurations become more complex and require ongoing maintenance and fine-tuning. For example, if a task fails, you must restart it manually through the Kafka Connect REST API.

Redpanda Connect is an alternative solution for building data pipelines with Redpanda, a modern data ingestion platform built from the ground up with developers in mind. It provides several advanced features that utilize cloud infrastructure and cutting-edge technologies to drive cost efficiencies. Redpanda Connect features 200+ certified, enterprise-ready pre-built connectors. You can deploy it as a static Go binary without any external dependencies, and it is available in both enterprise and community editions.
