GridGain Developers Hub

Example: Ignite Data Replication with Kafka Connector

This example demonstrates how to configure and run both the Source and Sink connectors.


Prerequisites

  • GridGain Enterprise or Ultimate version 8.4.9 or later is installed. The IGNITE_HOME environment variable points to the GridGain installation directory on every GridGain node.

  • Kafka 2.0 is installed. The KAFKA_HOME environment variable points to the Kafka installation directory on every node.

Step 1: Install GridGain Kafka Connector

1.1. Prepare GridGain Connector Package

The connector is located in the $IGNITE_HOME/integration/gridgain-kafka-connect directory. Execute this script on one of the GridGain nodes to pull the missing connector dependencies into the package:

cd $IGNITE_HOME/integration/gridgain-kafka-connect
./copy-dependencies.sh
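
After the script finishes, the package directory should contain the connector jar together with the dependencies it pulled in. The layout below is an illustrative sketch only; the actual jar names and versions depend on your GridGain release:

```
gridgain-kafka-connect/
├── copy-dependencies.sh
└── lib/
    ├── gridgain-kafka-connect-<version>.jar
    ├── ignite-core-<version>.jar
    └── ... (other pulled-in dependencies)
```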

1.2. Register GridGain Connector with Kafka

In this example, we assume /opt/kafka/connect is the Kafka connector installation directory.

For every Kafka Connect Worker:

  1. Copy the GridGain Connector package directory you prepared in the previous step from the GridGain node to /opt/kafka/connect on the Kafka Connect worker.

  2. Edit the Kafka Connect Worker configuration ($KAFKA_HOME/config/connect-standalone.properties for a single-worker Kafka Connect cluster, or $KAFKA_HOME/config/connect-distributed.properties for a multi-node Kafka Connect cluster) to register the connector on the plugin path:

    plugin.path=/opt/kafka/connect/gridgain-kafka-connect
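
In a distributed cluster, plugin.path sits alongside the usual worker settings in connect-distributed.properties. The fragment below is an illustrative sketch using stock Kafka Connect defaults (not values taken from this example's environment); adjust bootstrap.servers and the storage topic names for your deployment:

```properties
# Illustrative connect-distributed.properties fragment; values other than
# plugin.path are stock Kafka Connect defaults.
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/opt/kafka/connect/gridgain-kafka-connect
```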

Step 2: Start Ignite Clusters and Load Data

Create the source and sink Ignite cluster configurations. The following configuration files allow running both clusters on the same host.

ignite-server-source.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="igniteInstanceName" value="source" />

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
ignite-server-sink.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="igniteInstanceName" value="sink" />

        <property name="clientConnectorConfiguration">
            <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
                <property name="port" value="10900"/>
            </bean>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="localPort" value="47510" />
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47510..47519</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Start the source Ignite cluster (assuming the Ignite configuration files are in the current directory):

$IGNITE_HOME/bin/ignite.sh ignite-server-source.xml

[15:47:30] Ignite node started OK (id=aae42b8b, instance name=source)
[15:47:30] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]

On Windows:

> ignite.bat ignite-server-source.xml

[15:47:30] Ignite node started OK (id=aae42b8b, instance name=source)
[15:47:30] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]

Start the sink Ignite cluster:

$IGNITE_HOME/bin/ignite.sh -J-DIGNITE_JETTY_PORT=8081 ignite-server-sink.xml

[15:48:16] Ignite node started OK (id=b8dcc29b, instance name=sink)
[15:48:16] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]

On Windows:

> ignite.bat -J-DIGNITE_JETTY_PORT=8081 ignite-server-sink.xml

[15:48:16] Ignite node started OK (id=b8dcc29b, instance name=sink)
[15:48:16] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, heap=1.0GB]

Run the Ignite Web Agent for the sink Ignite cluster (assuming the GridGain Web Agent home is the current directory):

ignite-web-agent.sh -n 'http://localhost:8081'

[2018-05-30 20:08:41,672][INFO ][pool-1-thread-1][ClusterListener] Connection successfully established to cluster with nodes: [445A2B54]

Load some data into the source Ignite cluster:

$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1

jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS Person (id int, city_id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (2, 'John Smith', 4);

On Windows:

> sqlline.bat --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1

jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS Person (id int, city_id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (2, 'John Smith', 4);
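
The topic names used later in the connector configuration follow from Ignite's SQL naming convention: a table created through SQL is backed by a cache named SQL_<SCHEMA>_<TABLE>, and the Source Connector prefixes that cache name with its topicPrefix. A minimal shell sketch of the mapping:

```shell
# Derive the cache name Ignite gives a SQL table, and the Kafka topic
# name the Source Connector publishes that cache to.
schema="PUBLIC"            # default SQL schema
table="PERSON"             # unquoted identifiers are upper-cased
topic_prefix="quickstart-"

cache_name="SQL_${schema}_${table}"
topic_name="${topic_prefix}${cache_name}"

echo "${cache_name}"   # SQL_PUBLIC_PERSON
echo "${topic_name}"   # quickstart-SQL_PUBLIC_PERSON
```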

Open the GridGain Web Console Monitoring Dashboard and verify that there are no caches in the sink Ignite cluster yet.

Step 3: Start Kafka and GridGain Source and Sink Connectors

Start the Kafka cluster:

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

On Windows:

> zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
> kafka-server-start.bat %KAFKA_HOME%\config\server.properties

Create the GridGain Source and Sink Connector configuration files (replace IGNITE_CONFIG_PATH with the absolute path to the directory containing the Ignite configurations created above):

connect-gridgain-source.properties:

name=gridgain-quickstart-source
tasks.max=1
connector.class=org.gridgain.kafka.source.IgniteSourceConnector

igniteCfg=IGNITE_CONFIG_PATH/ignite-server-source.xml
topicPrefix=quickstart-

connect-gridgain-sink.properties:

name=gridgain-quickstart-sink
topics=quickstart-SQL_PUBLIC_PERSON,quickstart-SQL_PUBLIC_CITY
tasks.max=1
connector.class=org.gridgain.kafka.sink.IgniteSinkConnector

igniteCfg=IGNITE_CONFIG_PATH/ignite-server-sink.xml
topicPrefix=quickstart-
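
If you run Kafka Connect in distributed mode instead of standalone mode, connectors are registered through the Connect REST API (port 8083 by default) rather than passed as properties files on the command line; for example, with curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors. An illustrative JSON equivalent of the source connector configuration above:

```json
{
  "name": "gridgain-quickstart-source",
  "config": {
    "connector.class": "org.gridgain.kafka.source.IgniteSourceConnector",
    "tasks.max": "1",
    "igniteCfg": "IGNITE_CONFIG_PATH/ignite-server-source.xml",
    "topicPrefix": "quickstart-"
  }
}
```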

Start connectors (assuming the connector configuration files are in the current directory):

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties connect-gridgain-source.properties connect-gridgain-sink.properties

Step 4: Observe Initial Data Replication

Open the GridGain Web Console Monitoring Dashboard and see that the Sink Connector has created the SQL_PUBLIC_PERSON cache in the sink Ignite cluster, and that the cache already contains 2 entries.

In the Web Console, create a Query workbook and run a Scan query against the SQL_PUBLIC_PERSON cache. Press the Export button and check the generated CSV file to verify that the Person data is exactly the same as that added to the source Ignite cluster.

Step 5: Observe Runtime Data Replication

Add more person data to the source cluster:

$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1

jdbc:ignite:thin://127.0.0.1/> INSERT INTO Person (id, name, city_id) VALUES (3, 'Mike', 5);

In the Web Console, run the Scan query again to see that Mike's record has been replicated to the sink Ignite cluster.

Step 6: Observe Dynamic Cache Replication

Create a new City table in the source cluster and load some data into it:

$IGNITE_HOME/bin/sqlline.sh --color=true --verbose=true -u jdbc:ignite:thin://127.0.0.1

jdbc:ignite:thin://127.0.0.1/> CREATE TABLE IF NOT EXISTS City (id int, name varchar, PRIMARY KEY (id));
jdbc:ignite:thin://127.0.0.1/> INSERT INTO City (id, name) VALUES (3, 'San-Francisco');
jdbc:ignite:thin://127.0.0.1/> INSERT INTO City (id, name) VALUES (4, 'Oakland');

In the Web Console, run a Scan query against the SQL_PUBLIC_CITY cache to see that the new city cache and its data have been replicated to the sink Ignite cluster.