Example: Ignite Data Replication with Kafka Connector
This example demonstrates how to configure and run both the Source and Sink connectors.

Prerequisites
- GridGain Enterprise or Ultimate version 8.4.9 or later is installed. The IGNITE_HOME environment variable points to the GridGain installation directory on every GridGain node.
- Kafka 2.0 is installed. The KAFKA_HOME environment variable points to the Kafka installation directory on every node.
Step 1: Install GridGain Kafka Connector
1.1. Prepare GridGain Connector Package
The connector is in the $IGNITE_HOME/integration/gridgain-kafka-connect
directory. Execute this script on one of the GridGain nodes to pull missing connector dependencies into the package:
cd $IGNITE_HOME/integration/gridgain-kafka-connect
./copy-dependencies.sh
1.2. Register GridGain Connector with Kafka
In this example we assume /opt/kafka/connect is the Kafka connectors installation directory.
For every Kafka Connect worker:
- Copy the GridGain Connector package directory you prepared in the previous step from the GridGain node to /opt/kafka/connect on the Kafka Connect worker.
- Edit the Kafka Connect worker configuration ($KAFKA_HOME/config/connect-standalone.properties for a single-worker Kafka Connect cluster, or $KAFKA_HOME/config/connect-distributed.properties for a multi-node Kafka Connect cluster) to register the connector on the plugin path:
plugin.path=/opt/kafka/connect/gridgain-kafka-connect
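For reference, the relevant part of a single-worker configuration might then look as follows; every value except plugin.path below is the stock default shipped in connect-standalone.properties:

```properties
# Kafka broker the Connect worker talks to
bootstrap.servers=localhost:9092

# Default Kafka Connect converters
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Where the standalone worker persists source offsets
offset.storage.file.filename=/tmp/connect.offsets

# Register the GridGain connector package
plugin.path=/opt/kafka/connect/gridgain-kafka-connect
```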
Step 2: Start Ignite Clusters and Load Data
Create the source and sink Ignite cluster configurations, saving them as ignite-server-source.xml and ignite-server-sink.xml. The following files allow running both clusters on the same host: the sink cluster uses its own discovery port range (47510..47519) and a non-default thin client port (10900), so it does not clash with the source cluster.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="source" />
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="sink" />
<property name="clientConnectorConfiguration">
<bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
<property name="port" value="10900"/>
</bean>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="localPort" value="47510" />
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:47510..47519</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Start the source Ignite cluster (assuming the Ignite configuration file is in the current directory):
$IGNITE_HOME/bin/ignite.sh ignite-server-source.xml
[15:47:30] Ignite node started OK (id=aae42b8b, instance name=source)
[15:47:30] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]
> ignite.bat ignite-server-source.xml
[15:47:30] Ignite node started OK (id=aae42b8b, instance name=source)
[15:47:30] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]
Start the sink Ignite cluster (the -J-DIGNITE_JETTY_PORT=8081 JVM option moves the sink node's Jetty REST port off the default, so it does not conflict with the source node):
$IGNITE_HOME/bin/ignite.sh -J-DIGNITE_JETTY_PORT=8081 ignite-server-sink.xml
[15:48:16] Ignite node started OK (id=b8dcc29b, instance name=sink)
[15:48:16] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]
> ignite.bat -J-DIGNITE_JETTY_PORT=8081 ignite-server-sink.xml
[15:48:16] Ignite node started OK (id=b8dcc29b, instance name=sink)
[15:48:16] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]
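Optionally, verify that the sink node's REST endpoint answers on the overridden Jetty port. This assumes the ignite-rest-http module is enabled on the node, which the Web Agent in the next step also relies on:

```shell
curl "http://localhost:8081/ignite?cmd=version"
```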
Run the Ignite Web Agent for the sink Ignite cluster (assuming the GridGain Web Agent's home is the current directory):
ignite-web-agent.sh -n 'http://localhost:8081'
[2018-05-30 20:08:41,672][INFO ][pool-1-thread-1][ClusterListener] Connection successfully established to cluster with nodes: [445A2B54]
Load some data into the source Ignite cluster:
$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1
jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS Person (id int, city_id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (2, 'John Smith', 4);
> sqlline.bat --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1
jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS Person (id int, city_id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (2, 'John Smith', 4);
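Before wiring up replication, you can verify the inserted rows from the same sqlline session:

```sql
SELECT * FROM Person;
-- returns the two rows inserted above: (1, 3, 'John Doe') and (2, 4, 'John Smith')
```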
Open the GridGain Web Console Monitoring Dashboard and verify that there are no caches in the sink Ignite cluster yet.
Step 3: Start Kafka and GridGain Source and Sink Connectors
Start the Kafka cluster (ZooKeeper first, then the Kafka broker):
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
> zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
> kafka-server-start.bat %KAFKA_HOME%\config\server.properties
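To confirm the broker is up, list the existing topics. In Kafka 2.0 the kafka-topics tool still connects through ZooKeeper; at this point the list is empty or contains only internal topics:

```shell
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list
```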
Create the GridGain Source and Sink Connector configuration files, connect-gridgain-source.properties and connect-gridgain-sink.properties (replace IGNITE_CONFIG_PATH with the absolute path to the directory containing the Ignite configurations created above):
name=gridgain-quickstart-source
tasks.max=1
connector.class=org.gridgain.kafka.source.IgniteSourceConnector
igniteCfg=IGNITE_CONFIG_PATH/ignite-server-source.xml
topicPrefix=quickstart-
name=gridgain-quickstart-sink
topics=quickstart-SQL_PUBLIC_PERSON,quickstart-SQL_PUBLIC_CITY
tasks.max=1
connector.class=org.gridgain.kafka.sink.IgniteSinkConnector
igniteCfg=IGNITE_CONFIG_PATH/ignite-server-sink.xml
topicPrefix=quickstart-
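The topics listed in the sink configuration follow the Source Connector's naming convention: each cache is published to a topic named topicPrefix plus the cache name. A minimal sketch of this convention (the topic_for_cache helper is illustrative, not part of the connector API):

```python
def topic_for_cache(topic_prefix: str, cache_name: str) -> str:
    """Topic the Source Connector publishes a given cache to:
    the configured prefix followed by the cache name."""
    return topic_prefix + cache_name

# Caches backing the SQL tables used in this example
caches = ["SQL_PUBLIC_PERSON", "SQL_PUBLIC_CITY"]
topics = [topic_for_cache("quickstart-", c) for c in caches]

print(",".join(topics))
# -> quickstart-SQL_PUBLIC_PERSON,quickstart-SQL_PUBLIC_CITY
# exactly the 'topics' value in the sink configuration above
```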
Start connectors (assuming the connector configuration files are in the current directory):
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties connect-gridgain-source.properties connect-gridgain-sink.properties
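Once the worker is up, you can confirm both connectors are running through Kafka Connect's REST interface (port 8083 by default):

```shell
curl http://localhost:8083/connectors
# should list both "gridgain-quickstart-source" and "gridgain-quickstart-sink"

curl http://localhost:8083/connectors/gridgain-quickstart-source/status
# the connector and its task should report state "RUNNING"
```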
Step 4: Observe Initial Data Replication
Open the GridGain Web Console Monitoring Dashboard and see that the Sink Connector has created two caches in the sink Ignite cluster, and that the SQL_PUBLIC_PERSON cache already contains 2 entries:

In the Web Console, create a Query workbook and run a Scan query against the SQL_PUBLIC_PERSON cache. Press the Export button and check the generated CSV file to confirm that the Person data is exactly the same as the data added to the source Ignite cluster.

Step 5: Observe Runtime Data Replication
Add more person data to the source cluster:
$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (3, 'Mike', 5);
In the Web Console, run the Scan query again and see that the new record for Mike has been replicated to the sink Ignite cluster:

Step 6: Observe Dynamic Cache Replication
Create a new City table (backed by the SQL_PUBLIC_CITY cache) in the source cluster and load some data into it:
$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1
jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS City (id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO City (id, name) VALUES (3, 'San-Francisco');
jdbc:ignite:thin://127.0.0.1/> INSERT INTO City (id, name) VALUES (4, 'Oakland');
In the Web Console, run a Scan query against the SQL_PUBLIC_CITY cache and see that the new cache and its data have been replicated to the sink Ignite cluster:
