Using GridGain® with Kafka® Connector

One of the features of Apache® Ignite™ is its ability to integrate with streaming technologies, such as Spark Streaming, Flink, and Kafka. These streaming capabilities can be used to ingest finite quantities of data or continuous streams of data, with the added benefit of the fault tolerance and scale that Ignite provides. Data can be streamed into Ignite at very high rates that may reach many millions of events per second. This is possible because the data can be streamed into an Ignite cluster in parallel, as shown in Figure 1.

Figure 1. Ignite Streamers (Source: Apache).

Once the data are in an Ignite cluster, we can run queries, perform analytics, or apply machine learning algorithms to them. Referring to Figure 1, the steps to perform streaming are as follows:

  1. Streams of data are injected into Ignite by clients.
  2. Data are automatically partitioned amongst the Ignite data nodes.
  3. Data can be concurrently processed in the cluster.
  4. SQL queries (for example) can be concurrently performed on the streamed data.
  5. Clients can subscribe to continuous queries to be notified as the data changes.
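
To make steps 1 to 3 concrete, here is a minimal sketch of the IgniteDataStreamer API that performs this kind of parallel ingestion. The cache name, key/value types, and record count below are placeholders, and no such code is needed for the Kafka Connector we configure later:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class StreamerSketch {
    public static void main(String[] args) {
        // Start an Ignite node with the default configuration;
        // in a real deployment a client node would stream into an existing cluster.
        try (Ignite ignite = Ignition.start()) {
            ignite.getOrCreateCache("streamCache");

            // The streamer batches entries and loads them onto the owning data nodes in parallel.
            try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer("streamCache")) {
                for (int i = 0; i < 1_000_000; i++)
                    streamer.addData(i, "event-" + i);
            }
        }
    }
}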

In this two-part series, we will focus on the Kafka Data Streamer, which consumes messages for a Kafka topic from a Kafka broker and inserts them into an Ignite cache. There is an open-source solution for Apache Ignite and an enterprise-grade, Confluent-certified Kafka Connector for GridGain. A detailed comparison of these two implementations is provided in the GridGain documentation. Let's focus on the Kafka Connector for GridGain, since it provides a range of enterprise-ready features, such as:

  • No coding, configuration-driven.
  • Scalability and resilience.
  • Support for Ignite data schemas to enable automated streaming of data from Ignite to numerous other systems with Kafka Connectors.
  • DevOps-friendly monitoring.

We can see the high-level architecture in Figure 2. GridGain can act as both a source and a sink, and we'll look at examples of each in this article and the next one.

Figure 2. Certified Kafka Connector (Source: GridGain).

Let's now build an application, step by step, following the GridGain documentation. In this first example, we will use GridGain as the data source and an RDBMS as the data sink, as shown in Figure 3.

Figure 3. GridGain source and RDBMS sink (Source: GridGain).

Using GridGain with Kafka Connector to persist data in an RDBMS

If you'd like to follow along with this example, ensure that you meet the required prerequisites first:

  • Download and install GridGain Enterprise or GridGain Ultimate, version 8.4.9 or higher.
  • Download and install Kafka, version 2.0 or higher.
  • Download and install MySQL, version 8.0 or higher.
  • Download and install DBeaver.
  • Ensure that the variables $GRIDGAIN_HOME and $KAFKA_HOME are correctly set to their relevant installation directories.
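
For example, on Linux or macOS the variables might be set as follows (the installation paths here are placeholders; substitute your own):

$ export GRIDGAIN_HOME=/usr/local/opt/gridgain
$ export KAFKA_HOME=/usr/local/opt/kafka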

Install the GridGain Source Connector

Following the instructions in the GridGain documentation, we'll execute these steps to ensure that all the dependencies are available:

$ cd $GRIDGAIN_HOME/integration/gridgain-kafka-connect
$ ./copy-dependencies.sh

We'll use the following directory for the Kafka Connectors:

/usr/local/opt/kafka/connect

Next, we will copy the gridgain-kafka-connect directory to the above location.
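
On Linux or macOS, for example:

$ cp -r $GRIDGAIN_HOME/integration/gridgain-kafka-connect /usr/local/opt/kafka/connect/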

In our example, we'll run a single-worker Kafka Connect cluster, so we'll modify the following file:

$KAFKA_HOME/config/connect-standalone.properties

and add the full path to the directory we just copied, as follows:

plugin.path=/usr/local/opt/kafka/connect/gridgain-kafka-connect

This completes the source setup. Let's now focus on the sink setup.

Install the JDBC Sink Connector

We'll start by downloading the Confluent JDBC Connector package and extracting it into a directory called confluentinc-kafka-connect-jdbc.

We'll use MySQL Server as the RDBMS and start by downloading the MySQL JDBC Driver and copying the jar file to the following directory:

confluentinc-kafka-connect-jdbc/lib
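
For example, assuming the driver jar has been downloaded to the current directory (the exact file name depends on the Connector/J version):

$ cp mysql-connector-java-*.jar confluentinc-kafka-connect-jdbc/lib/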

Next, we'll copy the confluentinc-kafka-connect-jdbc directory to the Kafka Connectors directory. We also need to add the directory path to the properties file, as follows:

plugin.path=/usr/local/opt/kafka/connect/gridgain-kafka-connect,/usr/local/opt/kafka/connect/confluentinc-kafka-connect-jdbc

Both the source and the sink are now set up, so let's configure GridGain.

Start a GridGain Cluster

GridGain ships with some configuration files that can be found in the $GRIDGAIN_HOME/examples/config directory. However, for our example application, we will create a new configuration file, as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47502</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <!-- Explicitly enable peer class loading. -->
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
</beans>

We'll save this configuration file as ignite-server.xml in the $GRIDGAIN_HOME directory.

To connect to the GridGain cluster and run SQL queries, we can use the GridGain Web Console. We also need to download the Web Agent, which allows the Web Console to communicate with the GridGain cluster; the option to download it is offered from the Web Console. Finally, we need to ensure that the following directory:

$GRIDGAIN_HOME/libs/optional/ignite-rest-http

is copied one level up to the directory:

$GRIDGAIN_HOME/libs/
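
For example:

$ cp -r $GRIDGAIN_HOME/libs/optional/ignite-rest-http $GRIDGAIN_HOME/libs/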

From the $GRIDGAIN_HOME directory, we can start a GridGain server from the terminal, as follows:

$ $GRIDGAIN_HOME/bin/ignite.sh ignite-server.xml

The output should be similar to that shown in Figure 4.

Figure 4. GridGain Cluster Node.

We can also start the Web Agent by running the following command from the Web Agent directory:

$ ./ignite-web-agent.sh

The output should be similar to that shown in Figure 5.

Figure 5. Web Agent.

Create a Table and Add Some Rows

Using the Queries tab in the Web Console, we can create a table and add some rows, as follows:

CREATE TABLE IF NOT EXISTS Person (id int, city_id int, name varchar, PRIMARY KEY (id));
INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
INSERT INTO Person (id, name, city_id) VALUES (2, 'John Smith', 4);

Once we execute this, we can check the status from Monitoring > Dashboard, as shown in Figure 6. We can see that a table has been created with two rows.

Figure 6. Person Table.

MySQL Database

Let's now prepare the JDBC sink side by creating a database in MySQL. We can do this using the DBeaver tool. We'll create an empty database called gridgain-kafka-mysql and a user called demo with password demo. We will give this user full control over this database, as shown in Figure 7.

Figure 7. MySQL Database and User Demo.
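
If you prefer SQL to the DBeaver UI, roughly equivalent statements are shown below (assuming Kafka Connect and MySQL run on the same host, as in this example; the backticks are needed because the database name contains hyphens):

CREATE DATABASE `gridgain-kafka-mysql`;
CREATE USER 'demo'@'localhost' IDENTIFIED BY 'demo';
GRANT ALL PRIVILEGES ON `gridgain-kafka-mysql`.* TO 'demo'@'localhost';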

Start Kafka Cluster

We are now ready to test our application. Let's begin by starting a ZooKeeper instance, as follows:

$ $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

Next, we'll start a Kafka Broker, as follows:

$ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

Start Kafka Connect Cluster

In our example application, the sink side creates relational tables from the streamed records, so schema details need to be sent along with the data. Therefore, we need to check that the following file:

$KAFKA_HOME/config/connect-standalone.properties

contains these properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Next, we need to create source and sink properties files, as follows:

  1. kafka-connect-gridgain-source.properties
    name=kafka-connect-gridgain-source
    tasks.max=2
    connector.class=org.gridgain.kafka.source.IgniteSourceConnector
    igniteCfg=GRIDGAIN_CONFIG_PATH/ignite-server.xml
    topicPrefix=quickstart-

    Replace GRIDGAIN_CONFIG_PATH with the full path to where you saved the GridGain configuration file.

  2. kafka-connect-mysql-sink.properties
    name=kafka-connect-mysql-sink
    tasks.max=2
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    topics=quickstart-SQL_PUBLIC_PERSON,quickstart-SQL_PUBLIC_CITY
    connection.url=jdbc:mysql://localhost:3306/gridgain-kafka-mysql
    connection.user=demo
    connection.password=demo
    auto.create=true

We will save these two properties files in the $KAFKA_HOME directory. Note that the sink's topics list already includes quickstart-SQL_PUBLIC_CITY even though no City table exists yet; we will create it later to show that new data flows through without reconfiguring the connectors.

Finally, we can start the Kafka Connect Worker from the $KAFKA_HOME directory, as follows:

$ $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties kafka-connect-gridgain-source.properties kafka-connect-mysql-sink.properties

The Results

First, let's switch to DBeaver and refresh the list of tables for our database. There should be a new table with two rows, as shown in Figure 8.

Figure 8. Initial Data Load.
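
The same check can be run from the mysql command line (by default the JDBC sink names the target table after the Kafka topic, hence the backticks around the hyphenated names):

USE `gridgain-kafka-mysql`;
SELECT * FROM `quickstart-SQL_PUBLIC_PERSON`;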

From the Web Console, let's insert another row into the table using the following command:

INSERT INTO Person (id, name, city_id) VALUES (3, 'Mike', 5);

Once we execute this in the Web Console and refresh the table in DBeaver, we should see another row added, as shown in Figure 9.

Figure 9. Runtime Data Load.

Finally, let's create a new table from the Web Console and add some rows, as follows:

CREATE TABLE IF NOT EXISTS City (id int, name varchar, PRIMARY KEY (id));
INSERT INTO City (id, name) VALUES (3, 'San-Francisco');
INSERT INTO City (id, name) VALUES (4, 'Oakland');

Once we execute this in the Web Console and refresh the tables in DBeaver, there should be a new table with two rows, as shown in Figure 10.

Figure 10. Dynamic Reconfiguration.

Summary

In this example application, we have seen how easily we can integrate GridGain and Kafka. The Kafka Connector has been certified by Confluent and provides a range of robust enterprise features beyond the basic capabilities supported by the implementation available for Apache Ignite.

In the next article in this series, we will look at how to use the Kafka Connector when building applications in the cloud. Stay tuned!