Kafka Sink Connector
Kafka Sink Connector integrates Kafka with GridGain 9, exporting topic data into GridGain tables.
Guarantees
GridGain Kafka sink provides the following guarantees:
- At-least-once delivery.
- Automatic failover and reconnection to the GridGain cluster.
- Message order within the same GridGain partition is preserved. Message order across different partitions is not defined.
Kafka Sink Configuration
Installation
Install the Kafka Sink connector (packaged together with the Kafka Source connector) following the recommended procedure.
Configuration
The GridGain Kafka sink connector uses the standard Kafka configuration file to define configuration properties.
Below is an example configuration file:
name=gridgain-kafka-connect-sink
topics=topic1,topic2,topic3
connector.class=org.gridgain.kafka.sink.GridGainSinkConnector
tasks.max=1
ignite.addresses=host1:10800,host2:10800
ignite.operationTimeout=30000
ignite.table.name.regex=^events$
ignite.table.name.regex.replacement=Events
ignite.table.name.regex.1=.*
ignite.table.name.regex.replacement.1=UserEvents
Parameter | Description | Default value |
---|---|---|
name | The name of the GridGain Kafka connector. | |
topics | The list of topics that will be copied to GridGain. | |
connector.class | The class name of the GridGain Sink connector. | |
tasks.max | The maximum number of sink tasks running in parallel. | |
retry.backoff | The period of time before retrying the request, in milliseconds. | 3000 |
nested.struct.mode | How nested STRUCT fields are handled. Possible values include CONCAT (separator defined in nested.struct.concat.separator). | |
nested.struct.concat.separator | The separator used to concatenate nested field names into the GridGain column name when nested.struct.mode is CONCAT. | Underscore symbol (_) |
flush.mode | Flush mode. | |
ignite.addresses | Required. Addresses of the GridGain nodes the data will be sent to. | |
ignite.authenticator.basic.password | Password for basic authentication to the GridGain cluster. | |
ignite.authenticator.basic.username | Username for basic authentication to the GridGain cluster. | |
ignite.background.reconnect.interval | Background reconnect interval, in milliseconds. Set to 0 to disable background reconnect. | |
ignite.connect.timeout | Socket connection timeout, in milliseconds. | 5000 |
ignite.error.handling.policy | Error handling policy. | |
ignite.heartbeat.interval | The interval at which the client sends heartbeat messages to the cluster, in milliseconds. 0 disables heartbeats. | 30000 |
ignite.heartbeat.timeout | Heartbeat response timeout, in milliseconds. The connection is closed if the response is not received before the timeout occurs. | 5000 |
ignite.reconnect.interval | Reconnect interval, in milliseconds. 0 disables background reconnects. | 30000 |
ignite.ssl.client.authentication.mode | Client authentication mode. | |
ignite.ssl.ciphers | Comma-separated list of ciphers to be used to set up the SSL connection. | |
ignite.ssl.enabled | If true, an SSL/TLS connection is established. | |
ignite.ssl.key.store.password | Keystore password to be used to set up the SSL connection. | |
ignite.ssl.key.store.path | Keystore path to be used to set up the SSL connection. | |
ignite.ssl.trust.store.password | Truststore password to be used to set up the SSL connection. | |
ignite.ssl.trust.store.path | Truststore path to be used to set up the SSL connection. | |
ignite.streamer.auto.flush.interval | The data streamer's auto-flush interval: the interval, in milliseconds, after which the data streamer automatically flushes data to the cluster. | 5000 |
ignite.streamer.page.size | The data streamer's page size: the number of entries sent to the cluster per network call. | 1000 |
ignite.streamer.parallel.ops | The data streamer's parallel operations: how many in-flight requests can be active per partition. | 1 |
ignite.streamer.retry.limit | The data streamer's retry limit: the number of retries in case of a connection issue. | 16 |
ignite.streamer.receiver.class.name | The fully qualified class name of the custom receiver to use with the streamer. The receiver must implement the streamer receiver interface. | |
ignite.streamer.receiver.deployment.units | A comma-separated list of deployment units in the unit:version format (for example, moduleOne:1.0). | |
ignite.table.name.regex | The regular expression pattern that is matched against the topic name and replaced. See the examples below for how to use regular expressions. | |
ignite.table.name.regex.replacement | The value the regular expression match will be replaced by. | |
ignite.table.name.regex.1 | Optional ordering for regular expressions. Expressions with a lower number are applied first. | |
ignite.table.name.regex.replacement.1 | The replacement value for the correspondingly numbered regular expression. | |
Incoming Data Transformation
To apply changes to incoming data, you can use Kafka Single Message Transformations (SMTs) that allow you to modify messages as they flow through the connector. They let you convert field types, rename or remove fields, flatten nested structures, filter values, and more.
For detailed options and examples, please refer to the official SMT documentation.
For more advanced transformations beyond SMTs, see the Streamer Receiver section.
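As a sketch, a hypothetical SMT chain that renames one field and removes another before the record reaches GridGain might look like this (the transform aliases and field names are illustrative; ReplaceField is a standard Kafka Connect SMT):

```properties
# Illustrative SMT chain; field names are hypothetical.
transforms=rename,dropInternal
# Rename "user_id" to "userId" in the record value.
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=user_id:userId
# Remove an internal field before the record is written to GridGain.
transforms.dropInternal.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropInternal.exclude=debug_info
```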
Streamer Receiver
The Streamer Receiver functionality allows you to process incoming Kafka data with your own custom logic, implemented in Java and deployed to the GridGain cluster.
You can specify the deployment unit containing your receiver via the ignite.streamer.receiver.deployment.units parameter, along with ignite.streamer.receiver.class.name. If you decide to use a receiver, add these parameters to your connector configuration and define your deployment units and receiver name:
ignite.streamer.receiver.class.name=com.mycompany.MyReceiver
ignite.streamer.receiver.deployment.units=moduleOne:1.0,moduleTwo:2.0
If no deployment unit is specified, the receiver class may not be found on the server. The connector forwards the configuration to the streamer regardless, but for a component to be deployed in the cluster, it must be included in a deployment unit so that the system knows where to look for the receiver class.
Keep in mind that a single receiver is used for the entire connector, which means the same receiver processes all topics defined in the connector. If you require different processing logic for different topics, you should either create separate connectors or implement topic-specific checks within your receiver’s code.
Kafka Topic Mapping
When applying regular expressions to map Kafka topics to GridGain tables, Kafka Sink applies only the first regular expression that returns matches.
In the example below, any Kafka topic name will be replaced by MyTable:
ignite.table.name.regex=.*
ignite.table.name.regex.replacement=MyTable
You can use standard regular expression substitution syntax. The example below replaces a topic-N topic name with a table-N table name while preserving the number:
ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1
Multiple regular expressions can be used to handle different topic names. The example below first tries to match topic-(\d*) and, if the topic name does not match, falls back to the customer-(\d) expression:
ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1
ignite.table.name.regex.1=customer-(\d)
ignite.table.name.regex.replacement.1=from-$1
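The substitution above follows standard java.util.regex semantics. The minimal standalone sketch below illustrates the first-match-wins behavior described in this section; the TopicMapper helper is hypothetical and not part of the connector, and the assumption that an unmatched topic name is kept as-is is illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper illustrating how ordered regex/replacement pairs
// map a Kafka topic name to a table name: the first pattern that matches wins.
public class TopicMapper {
    private static final Map<Pattern, String> RULES = new LinkedHashMap<>();
    static {
        // ignite.table.name.regex / ignite.table.name.regex.replacement
        RULES.put(Pattern.compile("topic-(\\d*)"), "table-$1");
        // ignite.table.name.regex.1 / ignite.table.name.regex.replacement.1
        RULES.put(Pattern.compile("customer-(\\d)"), "from-$1");
    }

    static String mapTopic(String topic) {
        for (Map.Entry<Pattern, String> rule : RULES.entrySet()) {
            Matcher m = rule.getKey().matcher(topic);
            if (m.find()) {
                return m.replaceAll(rule.getValue());
            }
        }
        return topic; // assumption: no rule matched, keep the topic name
    }

    public static void main(String[] args) {
        System.out.println(mapTopic("topic-42"));   // table-42
        System.out.println(mapTopic("customer-7")); // from-7
    }
}
```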
GridGain Configuration
On the receiving GridGain cluster, you need to manually create tables and schemas that match the data from your Kafka topics. Tables are not created automatically; if a required table does not exist, the connector fails with an error.
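For example, if a topic carries records with an integer key field and a string value field, a matching table could be created up front. The table and column names below are illustrative and must be adapted to your actual record schema:

```sql
-- Illustrative schema; column names and types must match the Kafka record fields.
CREATE TABLE Events (
    id INT PRIMARY KEY,
    name VARCHAR
);
```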
Type Conversion
The GridGain Kafka Sink Connector uses the schema of the GridGain table to convert Kafka topic messages. For example, when a GridGain column has the INT32 type, the Kafka data is expected to have the INT8, INT16, INT32, or INT64 data type.
Full list of supported conversions is provided below:
GridGain Type | Kafka Type |
---|---|
INT8 | INT8, INT16, INT32, INT64 |
INT16 | INT8, INT16, INT32, INT64 |
INT32 | INT8, INT16, INT32, INT64 |
INT64 | INT8, INT16, INT32, INT64 |
FLOAT | FLOAT, DOUBLE |
DOUBLE | FLOAT, DOUBLE |
DATE | STRING, ARRAY |
TIME | STRING, ARRAY |
DATETIME | STRING, ARRAY, LONG |
TIMESTAMP | STRING, ARRAY, DOUBLE |
UUID | STRING |
BYTE_ARRAY | STRING, ARRAY, BYTES |
DECIMAL | STRING, LONG, FLOAT, DOUBLE |
Those conversions have been tested for all standard serializers and deserializers:
- JSON
- JSON with schema
- Avro
- Protobuf
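For instance, with the JSON-with-schema converter, a record destined for a table with an INT32 id column and a VARCHAR name column would carry a schema-plus-payload envelope like the following (field names and values are illustrative):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id",   "type": "int32" },
      { "field": "name", "type": "string" }
    ]
  },
  "payload": { "id": 1, "name": "sample" }
}
```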