Kafka Connect Sink
Get Started
1️⃣ Obtaining the Connector
You can build the connector from source or download the pre-built JAR file from the releases. The GitHub repository includes a README with instructions for running the connector locally.
2️⃣ Configuring the Connector
This section explains the properties required to configure the FalkorDB Sink Connector for Apache Kafka.
Specify the configuration in a properties file, with one key=value pair per line.
Properties Overview
| Property | Description |
|---|---|
| `name` | Specifies the unique name of the connector instance, e.g., `falkordb-connector`. This name identifies the connector in the Kafka Connect framework. |
| `connector.class` | Defines the Java class that implements the connector logic. Use `com.falkordb.FalkorDBSinkConnector` to write data from Kafka topics to FalkorDB. |
| `tasks.max` | Sets the maximum number of tasks for the connector. A value of `1` uses a single task. Increasing this can boost throughput but requires resources. |
| `topics` | Specifies the Kafka topic(s) to consume messages from. Set to `falkordb-topic` to read messages from this topic. |
| `key.converter` | Defines the converter class for message keys. `StringConverter` treats keys as simple strings. |
| `value.converter` | Specifies the converter for message values. `StringConverter` treats values as strings. |
| `value.converter.schemas.enable` | Indicates whether schemas should be included with message values. Setting to `false` excludes schema information. |
| `falkor.url` | Specifies the connection URL for FalkorDB. Example: `redis://localhost:6379`. Essential for connecting Kafka to FalkorDB. |
The above properties configure a Kafka Sink Connector that reads messages from a specified topic and writes them into FalkorDB using string conversion for both keys and values. Adjusting these properties allows you to tailor the connector’s behavior according to your application’s requirements.
Configuration Example
name=falkordb-connector
connector.class=com.falkordb.FalkorDBSinkConnector
tasks.max=1
topics=falkordb-topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
falkor.url=redis://localhost:6379
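When Kafka Connect runs in distributed mode, connectors are usually registered through its REST API as JSON (a `name` field plus a `config` object) rather than from a properties file. The sketch below converts the properties above into that shape. It only builds the payload and sends nothing; the helper names `parse_properties` and `to_connect_payload` are illustrative and not part of the connector.

```python
import json

# The example configuration from this page, as properties-file text.
PROPERTIES_TEXT = """\
name=falkordb-connector
connector.class=com.falkordb.FalkorDBSinkConnector
tasks.max=1
topics=falkordb-topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
falkor.url=redis://localhost:6379
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so URLs in values stay intact.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def to_connect_payload(props):
    """Separate the connector name from the remaining settings."""
    config = dict(props)
    name = config.pop("name")
    return {"name": name, "config": config}


payload = to_connect_payload(parse_properties(PROPERTIES_TEXT))
print(json.dumps(payload, indent=2))
```

The resulting JSON could then be posted to the `/connectors` endpoint of a Connect worker. The host and port depend on your deployment and are not specified on this page.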
Kafka Message Format
JSON Structure Overview
Each message value is a JSON array of objects, and each object represents one command to be executed on the graph database. Below is a breakdown of the key components of each message object.
Example:
[
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 0",
      "age_param": 20,
      "name_param": "Person 0"
    }
  },
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 1",
      "age_param": 21,
      "name_param": "Person 1"
    }
  }
]
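A message like the one above can be generated programmatically. The following is a minimal sketch that builds the same two command objects and serializes them as a single JSON array string, which is the form a producer would send as the record value when `StringConverter` is used. The helper names `person_command` and `build_message` are hypothetical, and the producer client itself is left out.

```python
import json

# The Cypher statement used in the example message.
CREATE_PERSON = (
    "CREATE (p:Person {name: $name_param, age: $age_param, "
    "location: $location_param}) RETURN p"
)


def person_command(index, graph_name="falkordb"):
    """Build one command object in the shape shown in the example."""
    return {
        "graphName": graph_name,
        "command": "GRAPH_QUERY",
        "cypherCommand": CREATE_PERSON,
        "parameters": {
            "location_param": f"Location {index}",
            "age_param": 20 + index,
            "name_param": f"Person {index}",
        },
    }


def build_message(count):
    """Serialize `count` command objects as one JSON array string."""
    return json.dumps([person_command(i) for i in range(count)])


message_value = build_message(2)
print(message_value)
```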
Key Components
The table below explains essential properties for executing commands in FalkorDB through Kafka messages.
| Property | Description | Example | Explainer |
|---|---|---|---|
| `graphName` | Specifies the name of the graph database where the command will be executed. | `"falkordb"` | Kafka messages can update multiple graphs. |
| `command` | Indicates the type of operation being performed. | `"GRAPH_QUERY"` | `"GRAPH_QUERY"` means a query will be executed against the graph database. |
| `cypherCommand` | Contains the actual Cypher query to be executed. Cypher is a query language for graph databases. | `CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p` | Creates a Person node with name, age, and location properties. |
| `parameters` | Holds key-value pairs for placeholders in the cypherCommand. | `{"name_param": "Person 0", "age_param": 20, "location_param": "Location 0"}` | Used to define properties for the new node. |
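Checking a message on the producer side before it is sent can catch malformed payloads early. The sketch below is one possible check, not the connector's own validation: it verifies that the value is a JSON array, that each object carries the four properties from the table, and that every `$placeholder` in `cypherCommand` has a matching entry in `parameters`. The placeholder match is a simple regular-expression heuristic and would also flag a `$word` that appears inside a Cypher string literal. The name `validate_message` is hypothetical.

```python
import json
import re

REQUIRED_FIELDS = ("graphName", "command", "cypherCommand", "parameters")
PLACEHOLDER = re.compile(r"\$(\w+)")


def validate_message(raw):
    """Return a list of problems found in a JSON message string."""
    try:
        commands = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(commands, list):
        return ["top-level value must be a JSON array"]

    problems = []
    for i, cmd in enumerate(commands):
        if not isinstance(cmd, dict):
            problems.append(f"item {i}: expected an object")
            continue
        missing = [field for field in REQUIRED_FIELDS if field not in cmd]
        if missing:
            problems.append(f"item {i}: missing {', '.join(missing)}")
            continue
        params = cmd["parameters"]
        if not isinstance(params, dict):
            problems.append(f"item {i}: parameters must be an object")
            continue
        # Every $name used in the query needs a value in parameters.
        used = set(PLACEHOLDER.findall(str(cmd["cypherCommand"])))
        unbound = sorted(used - set(params))
        if unbound:
            problems.append(f"item {i}: no value for ${', $'.join(unbound)}")
    return problems


base = {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param}) RETURN p",
}
good = json.dumps([{**base, "parameters": {"name_param": "Person 0"}}])
bad = json.dumps([{**base, "parameters": {}}])

print(validate_message(good))
print(validate_message(bad))
```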
Frequently Asked Questions
How do I get the FalkorDB Kafka Connect sink connector?
You can either build it from source or download the pre-built uber JAR from the GitHub releases.
What message format does the connector expect?
Messages must be JSON arrays where each object contains graphName, command (e.g., GRAPH_QUERY), cypherCommand (the Cypher query), and parameters (key-value pairs for query parameters).
Can the connector write to multiple graphs from a single topic?
Yes, each message object includes a graphName field, so messages within the same Kafka topic can target different FalkorDB graphs.
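As an illustration, the sketch below builds one message whose objects target two different graphs and then groups the commands by `graphName` to show how they are distributed. The graph names `people` and `places`, the Cypher statements, and the helper names are made up for this example, and the grouping is only for inspection; it is not a description of how the connector dispatches commands internally.

```python
import json
from collections import defaultdict


def command(graph_name, cypher, parameters):
    """Build one command object in the documented message shape."""
    return {
        "graphName": graph_name,
        "command": "GRAPH_QUERY",
        "cypherCommand": cypher,
        "parameters": parameters,
    }


# One message, two hypothetical target graphs.
message = [
    command("people", "CREATE (p:Person {name: $name_param})", {"name_param": "Person 0"}),
    command("places", "CREATE (c:City {name: $name_param})", {"name_param": "Location 0"}),
    command("people", "CREATE (p:Person {name: $name_param})", {"name_param": "Person 1"}),
]


def group_by_graph(commands):
    """Collect commands under the graph each one targets."""
    grouped = defaultdict(list)
    for cmd in commands:
        grouped[cmd["graphName"]].append(cmd)
    return dict(grouped)


# Round-trip through JSON to mimic what a consumer of the topic would see.
by_graph = group_by_graph(json.loads(json.dumps(message)))
for graph, cmds in by_graph.items():
    print(graph, len(cmds))
```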
What converters should I use for key and value serialization?
Use org.apache.kafka.connect.storage.StringConverter for both key.converter and value.converter. Set value.converter.schemas.enable=false to exclude schema information from message values.
How do I scale the connector for higher throughput?
Increase the tasks.max property to run multiple parallel tasks. Each task consumes from its assigned topic partition(s) independently, so tasks beyond the number of partitions in the topic sit idle. Ensure your FalkorDB instance can handle the additional concurrent connections.