Kafka Connect Sink

Get Started


1️⃣ Obtaining the Connector

You can build the connector from source or download the pre-built JAR file from the GitHub releases. The GitHub repository includes a README with instructions for running the connector locally.

2️⃣ Configuring the Connector

This section explains the properties required to configure the FalkorDB Sink Connector for Apache Kafka.

Specify the configuration in properties file format, one key=value pair per line.

Properties Overview

| Property | Description |
| --- | --- |
| name | Specifies the unique name of the connector instance, e.g., falkordb-connector. This name identifies the connector in the Kafka Connect framework. |
| connector.class | Defines the Java class that implements the connector logic. Use com.falkordb.FalkorDBSinkConnector to write data from Kafka topics to FalkorDB. |
| tasks.max | Sets the maximum number of tasks for the connector. A value of 1 uses a single task. Increasing this can boost throughput but requires resources. |
| topics | Specifies the Kafka topic(s) to consume messages from. Set to falkordb-topic to read messages from this topic. |
| key.converter | Defines the converter class for message keys. StringConverter treats keys as simple strings. |
| value.converter | Specifies the converter for message values. StringConverter treats values as strings. |
| value.converter.schemas.enable | Indicates whether schemas should be included with message values. Setting to false excludes schema information. |
| falkor.url | Specifies the connection URL for FalkorDB. Example: redis://localhost:6379. Essential for connecting Kafka to FalkorDB. |

Together, these properties configure a sink connector that reads messages from the specified topic and writes them to FalkorDB, using string conversion for both keys and values. Adjust them to suit your application’s requirements.

Configuration Example

name=falkordb-connector
connector.class=com.falkordb.FalkorDBSinkConnector
tasks.max=1
topics=falkordb-topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
falkor.url=redis://localhost:6379
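
The properties file above is the form a standalone Connect worker reads. When a worker runs in distributed mode, connectors are normally registered by sending the same settings as JSON to the Kafka Connect REST API (`POST /connectors`). The sketch below converts the properties text into that JSON body. It assumes distributed mode, which this page does not cover; the helper names `parse_properties` and `to_connect_payload` are hypothetical, and the parser handles only simple key=value lines.

```python
import json

PROPERTIES = """\
name=falkordb-connector
connector.class=com.falkordb.FalkorDBSinkConnector
tasks.max=1
topics=falkordb-topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
falkor.url=redis://localhost:6379
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values such as URLs stay intact.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_connect_payload(config):
    """Build the {"name": ..., "config": {...}} body for POST /connectors."""
    config = dict(config)  # copy so the caller's dict is left untouched
    name = config.pop("name")
    return {"name": name, "config": config}


payload = to_connect_payload(parse_properties(PROPERTIES))
print(json.dumps(payload, indent=2))
```

The printed JSON can then be posted to the worker's REST endpoint with any HTTP client, using the `Content-Type: application/json` header.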

Kafka Message Format

JSON Structure Overview

Each message is a JSON array of objects, and each object represents one command to be executed on the graph database. The key components of each object are described below.

Example:

[
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 0",
      "age_param": 20,
      "name_param": "Person 0"
    }
  },
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 1",
      "age_param": 21,
      "name_param": "Person 1"
    }
  }
]
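
A message like the one above can be assembled programmatically before it is produced to the topic. The following is a minimal sketch using only the Python standard library; `build_command` is a hypothetical helper, and the field names are taken from the example above.

```python
import json


def build_command(graph_name, cypher, parameters):
    """Return one command object in the format shown above."""
    return {
        "graphName": graph_name,
        "command": "GRAPH_QUERY",
        "cypherCommand": cypher,
        "parameters": parameters,
    }


CREATE_PERSON = (
    "CREATE (p:Person {name: $name_param, age: $age_param, "
    "location: $location_param}) RETURN p"
)

# Two commands, matching the "Person 0" and "Person 1" objects in the example.
commands = [
    build_command(
        "falkordb",
        CREATE_PERSON,
        {
            "location_param": f"Location {i}",
            "age_param": 20 + i,
            "name_param": f"Person {i}",
        },
    )
    for i in range(2)
]

# The Kafka record value is the JSON text of the whole array.
message_value = json.dumps(commands)
print(message_value)
```

Because the connector is configured with StringConverter, `message_value` would be sent as a plain string value by whichever Kafka producer client you use. The producer call is omitted here to avoid assuming a particular client library.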

Key Components

The table below explains essential properties for executing commands in FalkorDB through Kafka messages.

| Property | Description | Example | Explainer |
| --- | --- | --- | --- |
| graphName | Specifies the name of the graph database where the command will be executed. | "falkordb" | Kafka messages can update multiple graphs. |
| command | Indicates the type of operation being performed. "GRAPH_QUERY" means a query will be executed against the graph database. | "GRAPH_QUERY" | |
| cypherCommand | Contains the actual Cypher query to be executed. Cypher is a query language for graph databases. | CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p | Creates a Person node with name, age, and location properties. |
| parameters | Holds key-value pairs for placeholders in the cypherCommand. | {"name_param": "Person 0", "age_param": 20, "location_param": "Location 0"} | Used to define properties for the new node. |
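
Since every `$placeholder` in cypherCommand needs a matching entry in parameters, it can help to check command objects on the producer side before sending them. The sketch below is one rough way to do that. `check_command` is a hypothetical helper rather than part of the connector, and the regular expression is a simple approximation that would also match a `$word` appearing inside a Cypher string literal.

```python
import re

REQUIRED_FIELDS = ("graphName", "command", "cypherCommand", "parameters")


def check_command(obj):
    """Return a list of problems found in one command object (empty if none)."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in obj]
    if problems:
        return problems
    # Placeholders look like $name_param in the Cypher text.
    placeholders = set(re.findall(r"\$(\w+)", obj["cypherCommand"]))
    supplied = set(obj["parameters"])
    for name in sorted(placeholders - supplied):
        problems.append(f"no value for ${name}")
    for name in sorted(supplied - placeholders):
        problems.append(f"unused parameter: {name}")
    return problems


command = {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param}) RETURN p",
    "parameters": {"name_param": "Person 0"},
}
print(check_command(command))
```

Here the check reports that `$age_param` has no value, which is cheaper to catch before the message reaches the topic than after the query fails.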

Frequently Asked Questions

How do I get the FalkorDB Kafka Connect sink connector?

You can either build it from source or download the pre-built uber JAR from the GitHub releases.

What message format does the connector expect?

Messages must be JSON arrays where each object contains graphName, command (e.g., GRAPH_QUERY), cypherCommand (the Cypher query), and parameters (key-value pairs for query parameters).

Can the connector write to multiple graphs from a single topic?

Yes, each message object includes a graphName field, so messages within the same Kafka topic can target different FalkorDB graphs.

What converters should I use for key and value serialization?

Use org.apache.kafka.connect.storage.StringConverter for both key.converter and value.converter. Set value.converter.schemas.enable=false to exclude schema information from message values.

How do I scale the connector for higher throughput?

Increase the tasks.max property to run multiple parallel tasks. Each task consumes from its assigned topic partition(s) independently, so tasks beyond the total number of partitions receive no work. Ensure your FalkorDB instance can handle the additional concurrent connections.
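
As a rough sizing aid, the number of tasks that can actually receive data is bounded by the partition count, because Kafka assigns each partition to exactly one task in the connector's consumer group. The helper below, with the hypothetical name `active_tasks`, expresses that upper bound. It is a sketch of the general Kafka Connect behaviour, not something specific to this connector.

```python
def active_tasks(tasks_max, partition_count):
    """Upper bound on the number of sink tasks that can be assigned partitions."""
    return min(tasks_max, partition_count)


# tasks.max=4 on a topic with 3 partitions: at most 3 tasks do work.
print(active_tasks(4, 3))
# tasks.max=2 on a topic with 6 partitions: both tasks share the 6 partitions.
print(active_tasks(2, 6))
```

If throughput is limited by the partition count, add partitions to the topic before raising tasks.max further.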