Kafka Connector Configuration

Both the Source and Sink Kafka Connectors are self-documenting. The configuration reference below is captured from the output of the IgniteSourceConnectorConfig.conf().toRst() and IgniteSinkConnectorConfig.conf().toRst() methods.

Source Connector

failoverPolicy

The mode of handling Kafka Connect Worker failover and rebalancing. The options are:

NONE: the Ignite cache updates that happened during the Connector downtime due to failover or rebalancing are lost. This option provides maximum performance.

FULL_SNAPSHOT: pull all data from the Ignite caches each time the Connector starts. This option prevents data loss but is feasible only for small caches.

BACKLOG: resume from the last committed offset. The Connector creates a special Kafka Backlog cache in Ignite where data from all caches is replicated and assigned offsets. The data is pulled from the Kafka Backlog. This option prevents data loss but consumes additional Ignite resources to manage the Kafka Backlog cache and is less efficient due to extra data marshalling.

  • Type: string

  • Default: NONE

  • Importance: high
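As an illustration, a connector properties file enabling the BACKLOG policy might look like the following sketch. The connector name and class shown here are assumptions for the example, not taken from this reference:

```properties
# Hypothetical source connector configuration; name and connector.class
# are illustrative assumptions.
name=gridgain-source
connector.class=org.gridgain.kafka.source.IgniteSourceConnector
tasks.max=1
igniteCfg=/opt/gridgain/config/default-config.xml
# Resume from the last committed offset after failover or rebalancing.
failoverPolicy=BACKLOG
```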

igniteCfg

Path to the Ignite configuration file. $IGNITE_HOME/config/default-config.xml is used if this property is not set.

  • Type: string

  • Default: ""

  • Importance: high

shallLoadInitialData

Whether to load data already existing in Ignite caches at the time the Connector starts.

  • Type: boolean

  • Default: true

  • Importance: high

shallProcessRemovals

Set this property to true to make the Connector process removals. In that case, the Connector injects a record with a null value into Kafka to indicate that the key was removed.

  • Type: boolean

  • Default: false

  • Importance: medium

backlogCacheName

Name of the backlog cache created in Ignite, where data from all caches is replicated and assigned offsets. This setting is valid only when failoverPolicy is set to BACKLOG.

  • Type: string

  • Default: kafka-connect-backlog

  • Importance: low

backlogCacheBackups

The number of backlog cache backups. Setting this property to a value higher than 0 prevents partition loss if a node becomes unstable.

  • Type: int

  • Default: 1

  • Importance: low

backlogFlushFreq

Frequency, in milliseconds, at which the Backlog service flushes data to the Backlog cache. A value of 0 disables periodic flushing. This setting is valid only when failoverPolicy is set to BACKLOG.

  • Type: int

  • Default: 500

  • Importance: low

backlogMemoryRegionName

Name of a memory region used to store backlog cache in Ignite. This setting is valid only when failoverPolicy is set to BACKLOG.

  • Type: string

  • Default: kafka-connect

  • Importance: low

backlogServiceName

Name of a backlog service that manages backlog cache in Ignite. This setting is valid only when failoverPolicy is set to BACKLOG.

  • Type: string

  • Default: kafka-connect-backlog-service

  • Importance: low

batchSize

Maximum number of entries to send to Kafka in a single batch.

  • Type: int

  • Default: 10000

  • Importance: low

cacheBlacklist

List of regular expressions matched against cache names to exclude those caches from copying. If both cacheWhitelist and cacheBlacklist are specified, cacheWhitelist is evaluated first.

  • Type: list

  • Default: null

  • Importance: low

cacheFilter

Class name of a custom java.util.function.Predicate<org.gridgain.kafka.CacheEntry> implementation to filter data pulled from Ignite caches.

  • Type: class

  • Default: null

  • Importance: low

cacheListPollInterval

Frequency in milliseconds to poll for the latest list of caches existing in Ignite.

  • Type: long

  • Default: 5000

  • Importance: low

cacheWhitelist

List of regular expressions to match against names of caches to copy data from.

  • Type: list

  • Default: null

  • Importance: low
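For example, to copy all caches whose names start with person while excluding a staging cache, the two lists can be combined as in the sketch below. The cache names are hypothetical:

```properties
# Copy caches matching person.*, except the (hypothetical) personStaging cache.
cacheWhitelist=person.*
cacheBlacklist=personStaging
```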

isSchemaDynamic

By default, key and value schemas are created once and cached. Set this property to true to make the Connector detect schema changes.

  • Type: boolean

  • Default: false

  • Importance: low

isSchemaless

By default, the Source Connector generates cache key and value schemas. Set this property to true to disable schema generation, which improves performance but prevents non-Ignite sink connectors from understanding the data structure.

  • Type: boolean

  • Default: false

  • Importance: low

pollInterval

Frequency in milliseconds to poll for new data in each cache.

  • Type: long

  • Default: 2000

  • Importance: low

topicPrefix

The connector pulls data from Ignite caches into Kafka topics named by prefixing the cache names with this prefix.

  • Type: string

  • Default: ""

  • Importance: low
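Putting the common settings together, a minimal Source Connector properties file could look like the following sketch. The connector name and class are assumptions for illustration, and the file path is a placeholder:

```properties
# Illustrative source connector configuration; name and connector.class
# are assumptions, not part of this reference.
name=gridgain-source
connector.class=org.gridgain.kafka.source.IgniteSourceConnector
tasks.max=2
igniteCfg=/opt/gridgain/config/default-config.xml
# Cache "person" is published to topic "ignite.person", and so on.
topicPrefix=ignite.
shallLoadInitialData=true
shallProcessRemovals=true
failoverPolicy=NONE
```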

Sink Connector

igniteCfg

Path to the Ignite configuration file. $IGNITE_HOME/config/default-config.xml is used if this property is not set.

  • Type: string

  • Default: ""

  • Importance: high

shallProcessUpdates

Indicates whether overwriting or removing existing values in the sink cache is enabled. The Sink Connector performs better when this flag is disabled.

  • Type: boolean

  • Default: false

  • Importance: medium

cacheFilter

Class name of a custom java.util.function.Predicate<org.gridgain.kafka.CacheEntry> implementation to filter data pushed to Ignite caches.

  • Type: class

  • Default: null

  • Importance: low

cachePrefix

The sink cache name is built from this prefix and the Kafka topic name with topicPrefix removed. For example, if the topic is ignite.person, topicPrefix is ignite. and cachePrefix is ignite-, then the sink cache name is ignite-person.

  • Type: string

  • Default: ""

  • Importance: low
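The naming example from the description above can be expressed as a configuration sketch. The topic name comes from that example; the rest is illustrative:

```properties
topics=ignite.person
topicPrefix=ignite.
cachePrefix=ignite-
# Resulting sink cache name: ignite-person
```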

flushOnOffsetCommit

Set this property to true to make the Connector flush records to Ignite in batches on Kafka offset commit. By default, a flush is executed after each processed record.

  • Type: boolean

  • Default: false

  • Importance: low

keyFields

A comma-separated list of field names to use for the Ignite cache key. Not applicable if keyPolicy is set to kafka. All fields are used if the setting is not specified.

  • Type: list

  • Default: ""

  • Importance: low

keyPolicy

Specifies what data to use for the Ignite cache key. The options are:

key: Fields from the record key are used.

value: Fields from the record value are used.

kafka: An Ignite Binary Object with three fields - the Kafka record’s topic, partition, and offset - is used as the cache key.

  • Type: string

  • Default: key

  • Importance: low
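For instance, to build the cache key from two fields of the record value, keyPolicy and keyFields can be combined as below. The field names are hypothetical:

```properties
keyPolicy=value
# id and lastName are hypothetical field names of the record value.
keyFields=id,lastName
```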

pushInterval

Frequency in milliseconds to push data to Ignite.

  • Type: long

  • Default: 2000

  • Importance: low

topicPrefix

The Kafka topic name is built from this prefix and the cache name.

  • Type: string

  • Default: ignite-

  • Importance: low
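A minimal Sink Connector properties file combining these settings might look as follows. The connector name and class are assumptions for the sketch, and the topic names and file path are placeholders:

```properties
# Illustrative sink connector configuration; name and connector.class
# are assumptions, not part of this reference.
name=gridgain-sink
connector.class=org.gridgain.kafka.sink.IgniteSinkConnector
tasks.max=1
topics=ignite-person,ignite-city
igniteCfg=/opt/gridgain/config/default-config.xml
# Topic "ignite-person" maps to cache "person", and so on.
topicPrefix=ignite-
shallProcessUpdates=true
keyPolicy=key
```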