Getting Started with Ignite Data Loading: Ignite Data Streamer Loading

In this third article of the three-part series “Getting Started with Ignite Data Loading,” we continue to review data loading into Ignite tables and caches, but now we focus on using the Ignite Data Streamer facility to load data in large volumes and at the highest speed.

Apache Ignite Data-Loading Facilities

In the first article of this series, we discussed the facilities that are available to Ignite for loading data from external systems. Among a range of facilities, we focused on the two fastest load facilities: CacheStore and Ignite Data Streamer. The following section provides a quick review of the Ignite CacheStore. Then, we move on to the Ignite Data Streamer.

CacheStore

The Ignite CacheStore is the primary vehicle used in the in-memory data grid (IMDG) scenario for synchronization with external (third-party) persistence stores. A CacheStore works through the cache read-through and write-through features and the underlying persistence layer, such as an RDBMS or NoSQL DB. Additionally, the Ignite CacheStore interface has a loadCache method that can be used for hot-loading the cache or for loading data on demand. In the second article of this series, we used a utility class, CacheLoadOnlyStoreAdapter, to build out a custom “Load Only” type of CacheStore. This type of CacheStore is useful for having the Ignite cluster load data from external systems (that is, a CacheStore that does not perform read-through and write-through). A pictorial representation of the Cache API, CacheStore, cache, and data-source relationship is shown in the following diagram:

Apache Ignite Data Streamer Loading 1
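
As a quick refresher, a hot load through a CacheStore is typically kicked off from a small client program. Here is a minimal sketch, assuming the Sales example's cache and configuration-file names from this series:


// Minimal sketch: trigger a CacheStore hot load from an Ignite thick client.
// "sales-client.xml" and "ProductCache" are this series' Sales example names.
try (Ignite ignite = Ignition.start("sales-client.xml")) {
    // null predicate = no filter; load everything the CacheStore can supply
    ignite.cache("ProductCache").loadCache(null);
}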


Ignite Data Streamer

As we see on the Ignite Data Loading & Streaming page, the Ignite Data Streamer API and the streamers that use it are “...built to inject large amounts of continuous streams of data into Ignite stream caches. Data streamers are built in a scalable and fault-tolerant fashion and provide at-least-once-guarantee semantics for all the data streamed into Ignite.” Ignite comes out of the box with a collection of pre-built data streamers for systems that you may already use, like Kafka, JMS, MQTT, and others. In this article, we walk through the development of an Ignite Data Streamer for loading CSV files and review the JSON and JDBC source-streamer examples from the article’s project.
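
Before we dive into the project, here is the general shape of the API, assuming an already-started Ignite instance (the cache name and types are illustrative):


// General shape of the IgniteDataStreamer API (cache name and types illustrative)
try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer("myCache")) {
    for (int i = 0; i < 1_000_000; i++)
        streamer.addData(i, Integer.toString(i));   // asynchronous, batched per node
}   // close() flushes any remaining buffered entries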

Data Integration Pattern: Apache Ignite Digital Integration Hub

In the article “Getting Started with Ignite Patterns,” I talked about the Digital Integration Hub (DIH) and how Apache Ignite can be used to implement it. I described a “horizontal” data pattern where Ignite’s compute and event-based processing, plus data-integration facilities, are used to move data from source systems on the left, into the Ignite IMDB, and then out to target systems on the right.

The following diagram of the horizontal DIH pattern illustrates the basis for this data-loading series, with ingestion performed by the Ignite Data Streamer that is highlighted in the red ellipse:

Apache Ignite Data Streamer Loading 2

Client Passes Data to Server

In the preceding diagram, a red ellipse indicates that the Data Ingest function is performed by a Data Streamer type of ingestion. However, in this article’s project, unlike with the “Load Only” CacheStore described in the second article of this series, the application that we write performs the reading, the parsing, and the submitting of key/value pairs to the data streamer for ingest into the cluster. Most people consider this client-driven method to be the standard data-loading flow. The client-driven, data-load flow pattern is shown in the following diagram:

Apache Ignite Data Streamer Loading 3

The preceding diagram shows an Ignite client application that is written for a particular data source (in this case, CSV files). The application, which is named “LoadCachesFromCsv,” knows how to reach out and actively get the data. The application then performs the writing, or “loading,” of the data into the cluster. In this data-load flow, one can use any of the Ignite write APIs (put(), putAll(), SQL INSERT, and so on), but the Ignite Data Streamer is the most applicable and delivers the fastest load performance.
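
To make the performance difference concrete, here is a hedged sketch contrasting a plain cache put with the streamer for the same entry (an Ignite instance and a key/value pair are assumed):


IgniteCache<String, Product> cache = ignite.cache("ProductCache");
cache.put(key, value);                 // synchronous: one network round-trip per entry

try (IgniteDataStreamer<String, Product> streamer = ignite.dataStreamer("ProductCache")) {
    streamer.addData(key, value);      // asynchronous: buffered and batched per node
}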

CSV File Data Streamer Project

In this article, we document how to create an Ignite Data Streamer project for data ingestion into Ignite caches and tables. The process for constructing this project is described in the following three steps. The complete project can be found on GitHub.

1. Standard Web Console Maven Project

If we have a quick look at the default Ignite Maven project that the GridGain Web Console generates, we see a well-organized structure and a set of pre-built components. We use these components as a starting point to build our Ignite Data Streamer client-based CSV file loader. I created a video tutorial on how to do this with a MySQL/MariaDB Sales database. In this series, we see how the Web Console can generate a fully formed Maven Java project that has a complete Ignite configuration, with cluster and cache configurations patterned on what is found through introspection of the targeted data source.

Apache Ignite Data Streamer Loading 4

A few interesting elements of the default WC project are:

  • model folder holding the Java object model representation of the caches/tables that you “imported” using the Import Wizard.
  • config folder holding the cluster configuration factory classes.
  • load folder holding a client application that initiates the load of data using the CacheStore.
  • startup folder holding classes that start a server or client node using the configuration loaded from code or from an XML Spring Bean (see the sketch after this list).
  • resources folder holding the XML Spring Bean server/client configurations and a properties file used to externalize particular configuration parameters.
  • Dockerfile/.dockerignore files to support building a Docker image that includes this project.
  • pom.xml file that instructs Maven what and how to build this project.
  • jdbc-drivers folder where jar files can be placed so that they are included in the compilation of the project.
  • README.txt text file that describes this project.
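
For example, the startup classes are small. Here is a simplified sketch of a Spring-XML-based server startup (the class and file names follow the generated project's conventions, simplified here):


import org.apache.ignite.Ignition;

/** Simplified sketch of a generated startup class (names follow WC conventions). */
public class ServerNodeSpringStartup {
    public static void main(String[] args) {
        // Start a server node from the Spring Bean XML found on the classpath
        Ignition.start("sales-server.xml");
    }
}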

2. Modify for Re-usability and Shared Use

We will perform the following changes to the standard Web Console Maven project, to make the project more independent and to make it usable for other purposes:

a. Adjust Maven pom.xml - The Maven pom.xml file is the blueprint for building the project. We will adjust it to parameterize it (for ongoing use and re-use) and to make some customizations:

Add the following reusable properties to the file, which are referenced later in the file:


<properties>
    <ignite.version>2.8.1</ignite.version>
    <java.version>1.8</java.version>
</properties>

We can use these reusable properties to replace hard-coded values generated by the configuration wizard with values that may change over the life of the project, such as <version>${ignite.version}</version>.

Add additional libraries referenced in the project:


<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>

For our project, we leverage existing libraries (in this case, the Apache Commons CSV and MySQL JDBC connector libraries).

b. Rename properties file - We will rename the file from the generic “secret.properties” name to a project-specific name. Renaming enables us to separate the properties files that are deployed to the Ignite runtime, so they do not interfere with each other. For example, assume that both a Sales project and an HR project have a secret.properties file in their deployed jar files (for example, sales-1.0.0.jar and hr-1.0.0.jar). In this case, the Java runtime does not have visibility to both properties files. The runtime can read only the properties file that is loaded first (dependent on the class loader, the order of jar-file loading, and so on).

Here, we changed the name to “sales.properties” (to reference the business area that we are working with) and added a property, dataLocation. The property, which can be used in our configuration, eliminates the need to hard-code a location.
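
As a hedged sketch, such a property can be read from the classpath at load time (the property name comes from the text above; the fallback path is illustrative only):


// Read sales.properties from the classpath; the "/data/sales" fallback is illustrative
Properties props = new Properties();
try (InputStream in = LoadCachesFromCsv.class.getClassLoader()
        .getResourceAsStream("sales.properties")) {
    props.load(in);
}
String dataLocation = props.getProperty("dataLocation", "/data/sales");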

c. Move Packages - In the out-of-the-box Web Console Maven project, the config, load, and startup packages and their associated classes are defined at the package root. This placement may interfere with deployments of other projects, because their classes would have the same package and class names and thus conflict with those in the Sales project. To avoid this problem, we move the config, load, and startup packages into our Sales package (com.gridgain.sales).

Apache Ignite Data Streamer Loading 6

d. Utilities - One of the most common tasks in loading data is parsing values from one format to another and handling empty values. This project includes a ParseTypes utility class that can be used anywhere parsing functionality is required, with no need to rewrite the code each time it is used.


package com.gridgain.sales.utility;
import java.math.BigDecimal;...
public class ParseTypes {
    public static Double parseDouble(String strNumber) {...}
    public static Float parseFloat(String strNumber) {...}
    public static BigDecimal parseBigDecimal(String strNumber) {...}
    public static Integer parseInteger(String strNumber) {...}
    public static Short parseShort(String strNumber) {...}
    public static Date parseDate(String strDate) {...}
    public static Timestamp parseTimestamp(String strDate) {...}
    public static Date parseTimestampToDate(String strDate) {...}
    public static Time parseTime(String strTime) {...}
    public static String parseString(String str) {...}
    public static byte[] parseBytes(String str) {...}
}

As you see, there are utility methods for converting from string values that are read from a data source that is not strongly typed (for example, from a CSV file) to data types that can then be loaded directly into an Ignite cache’s key or value field.

We’ll choose one method to examine in more detail. We’ll see how a string is converted to a BigDecimal object:


public static BigDecimal parseBigDecimal(String strNumber) {
    BigDecimal retVal = null;
    if (strNumber != null && strNumber.length() > 0) {
        try {
            // Parse directly to BigDecimal to preserve exact precision
            return new BigDecimal(strNumber);
        } catch (Exception e) {
            System.out.println("parseBigDecimal - value: " + strNumber + "; Parsing ERROR: " + e);
            return retVal;
        }
    }
    return retVal;
}

The code handles the case in which we are unable to parse the stringified BigDecimal: a null return value (retVal) is returned, and an error message is printed. You may want your loaders to behave differently.
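
For example, a stricter, fail-fast variant (hypothetical, not part of the project) could propagate the error instead of swallowing it:


// Hypothetical fail-fast alternative: reject missing or malformed values outright
public static BigDecimal parseBigDecimalStrict(String strNumber) {
    if (strNumber == null || strNumber.isEmpty())
        throw new IllegalArgumentException("Missing BigDecimal value");
    return new BigDecimal(strNumber);   // NumberFormatException propagates to the caller
}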

3. Create Ignite Data Streamer for CSV Files

The core deliverable of this project is an implementation of the Ignite Data Streamer. The implementation creates an Ignite Client connection, then reads, parses, and loads CSV files to the cluster.

a. Class Definition

This example is contained in the class LoadCachesFromCsv:


/**
 * Ignite Client loads data from CSV files to Caches via Ignite Data Streamer
 */
public class LoadCachesFromCsv {
    /**
     * Start an Ignite Client and perform CSV Reading -> Cache Writing (via Ignite Data Streamer)
     *
     * @param args Command line arguments, none required.
     * @throws IgniteException If failed.
     */

The main method of this class starts an Ignite client, reads the CSV file or files, and uses the data streamer to inject data into the table or cache.

b. main Method

We start and run this Ignite Data Streamer example through the class’s main method. We begin by setting a few defaults and initializing a counter (the variable n, to count how many rows we load):


public static void main(String[] args) throws IgniteException, IOException {
    DecimalFormat numFormat = (DecimalFormat)NumberFormat.getCurrencyInstance();
    String symbol = numFormat.getCurrency().getSymbol();
    numFormat.setNegativePrefix("-"+symbol);
    numFormat.setNegativeSuffix("");
    CSVParser csvParser = null;
    int n = 0; // record counter
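
The currency-format setup above lets us later parse negative currency amounts that are written with a leading minus sign. A quick illustration, assuming a US-English default locale (the input value is made up):


// Illustration only: with the prefix/suffix set above, "-$3,413.58" parses to -3413.58
try {
    Number amount = numFormat.parse("-$3,413.58");
} catch (ParseException e) {
    // handle malformed currency input
}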

c. Start Ignite Client

The program will act as a client to the Ignite cluster, and we will use the Ignite thick-client API. We use the Ignition.start() method to create a client connection to the cluster for our program:


try (Ignite ignite = Ignition.start("sales-client.xml")){
    System.out.println(">>> CSV Stream Loading caches:");

Here we used a Spring Bean configuration-injection approach to initialize the Ignite client. However, you can also use the pure Java code-configuration approach that is provided in the standard Web Console project’s ClientNodeCodeStartup.java:


Ignition.start(ClientConfigurationFactory.createConfiguration());
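
A hedged sketch of what such a code-based client configuration boils down to (the factory internals vary with the generated project):


// Rough equivalent of the code-configuration approach (details vary by project)
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(true);                     // join the cluster as a thick client
cfg.setIgniteInstanceName("sales-client");   // instance name is illustrative
Ignition.start(cfg);
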
d. Handle Each File with BufferedReader and CSVParser

Each Ignite cache or table will be loaded from a CSV file. We leverage both the Java NIO file APIs and the Apache Commons CSV library. We start by creating a buffered reader for our desired file. Then, we define a CSVParser object that uses the reader:


/** Product Cache */
System.out.println(">>>>>>>>>>>>>>>>> ProductCache...");
Reader reader = Files.newBufferedReader(Paths.get("/data/sales/product.csv"), StandardCharsets.UTF_8);
csvParser = new CSVParser(
    reader,
    CSVFormat.DEFAULT
        .withEscape('\\')
        .withQuoteMode(QuoteMode.NONE)
        .withFirstRecordAsHeader()
        .withTrim()
);

In creating the CSVParser, we made some decisions about how the component operates. For example, we indicated that the CSV files have header rows and that elements are to be trimmed, to eliminate any leading or trailing spaces.
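
Because we declared a header row, records can also be read by column name rather than by position. A small sketch (the column names here are hypothetical):


// With withFirstRecordAsHeader(), columns can be fetched by name.
for (CSVRecord record : csvParser) {
    String code = record.get("productCode");   // hypothetical column name
    String name = record.get("productName");   // hypothetical column name
}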

e. Create Ignite Streamer and Parse Each Record into Key and Value

At the core of the loading process, we create an IgniteDataStreamer object, parse the CSV file, create our Key and Value objects, and pass the objects to the streamer. Then, the streamer can do its work.


try (IgniteDataStreamer<String, Product> streamer = ignite.dataStreamer("ProductCache")){
    n = 0; // record counter
    for (CSVRecord csvRecord : csvParser) {
        String k = ParseTypes.parseString(csvRecord.get(0));
        Product v = null;
        try {
            v = new Product(
                ParseTypes.parseString(csvRecord.get(1)),
                ParseTypes.parseString(csvRecord.get(2)),
                ParseTypes.parseString(csvRecord.get(3)),
                ParseTypes.parseString(csvRecord.get(4)),
                ParseTypes.parseString(csvRecord.get(5)),
                ParseTypes.parseShort(csvRecord.get(6)),
                ParseTypes.parseBigDecimal(csvRecord.get(7)),
                ParseTypes.parseBigDecimal(csvRecord.get(8))
            );
        } catch (NumberFormatException e) {
            // Report the parse failure and continue; v remains null for this record
            e.printStackTrace();
        }
        streamer.addData(k, v);
        n += 1;
    }
} catch (Exception e ) {
    System.out.println("Caught Exception - loading Product: " + e);
} finally {
    System.out.println(">>> Load Product count: " + n);
}

Here, we see that the variables k and v (standing for the Key and Value instances) are created. For the Product cache, the Key is of type String, so a simple assignment is performed. However, the Value type is a Product object, and we use the object’s main constructor to feed it all necessary values from the fields of the parsed CSV record. We also take care to parse into the necessary field data types. For example, the seventh and eighth fields of the Product object are numbers that require exact precision (versus float or double precision), so we take advantage of the ParseTypes utility class to parse the string representation that is read from the CSV text file into a BigDecimal object.

After we have the Key and Value components of the cache entry, we use the Ignite Data Streamer addData(Key, Value) method, increment the counter, and continue to the next row of the file.

It is important to note that the addData() method is asynchronous. Thus, the method enables us to continue reading and parsing the input file as fast as possible; allows the Ignite Data Streamer to chunk individual records into efficient blocks of data that can be spread throughout the cluster; and achieves the highest possible data-ingestion rate.
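
The streamer’s batching behavior can also be tuned. Here is a hedged sketch of the knobs most relevant to bulk loading (the values shown are illustrative, not recommendations):


try (IgniteDataStreamer<String, Product> streamer = ignite.dataStreamer("ProductCache")) {
    streamer.perNodeBufferSize(1024);    // entries buffered per node before a batch is sent
    streamer.autoFlushFrequency(5_000);  // flush buffered data at least every 5 seconds
    streamer.allowOverwrite(false);      // the default; skipping existing keys keeps loads fastest
    // ... addData(k, v) loop as shown earlier ...
}   // close() flushes any remaining buffered entries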

Build, Deploy, and Run

As with any Maven project, our customized Ignite Maven project requires the following steps:

  • Compile. Compile the source code of the project.
  • Package. The compiled code and resources are packaged into a distribution format (usually a jar file).
  • Install. Install the package into the local repository.
  • Deploy. Copy the package to a remote repository.
  • Run. Run an Ignite cluster with the deployed package in its runtime classpath.

We do not have time to discuss the intricacies of the Maven lifecycle or the Maven definitions of “install” and “deploy.” For this article, we think of “deploy” as a task that sends build components to a remote repository, and we think of “install” as a task that occurs on the local machine. We will use the install task to copy the file to our local runtime. To perform the copy, we leverage the maven-dependency-plugin and its “copy” goal. By adding the following block to the existing pom file, we enable a seamless copy to the runtime:


<plugins>
    <plugin>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
            ...
            <execution>
                <id>copy-installed</id>
                <phase>install</phase>
                <goals>
                    <goal>copy</goal>
                </goals>
                <configuration>
                    <artifactItems>
                        <artifactItem>
                            <groupId>${project.groupId}</groupId>
                            <artifactId>${project.artifactId}</artifactId>
                            <version>${project.version}</version>
                            <type>${project.packaging}</type>
                        </artifactItem>
                    </artifactItems>
                    <outputDirectory>/apps/ignite-${ignite.version}/libs/</outputDirectory>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>

*** Note: the outputDirectory must match your runtime installation location and its libs sub-folder.

The five build-deploy-run steps can be simplified to the following two command-line instructions:

1. mvn clean install - This command performs all clean-up, compiling, building and, with the added copy plugin, copying of the built jar to your Ignite runtime. Alternatively, you can manually copy the target jar to a runtime destination folder of your choosing.
2. bin/ignite.sh -v sales-server.xml (on macOS, Linux, zOS, or WSL) or bin/ignite.bat -v sales-server.xml (on Windows) starts a server node of your cluster.

Once your cluster is running, you can kick off the load by running the CSV load client:
MAIN_CLASS=com.gridgain.sales.load.LoadCachesFromCsv && bin/ignite.sh -v sales-server.xml on macOS, Linux, zOS, or WSL
or set MAIN_CLASS=com.gridgain.sales.load.LoadCachesFromCsv && bin/ignite.bat -v sales-server.xml on Windows.
Either command starts a client node (LoadCachesFromCsv loads its own sales-client.xml configuration) and initiates the loading of all 8 cluster caches.
*** Note: Both versions of the command set an environment variable (MAIN_CLASS).
The variable instructs the ignite.sh/.bat start script to run a program class other than the default start class.
&& is a logical operator that instructs the command shell to issue the second part of the command only if the first part was successful.

In the GitHub project’s scripts folder, I have included two scripts (salesdataload-server.bat/sh and salesdataload-load.bat/sh) that run the server and the load program. When you run the load program (script), you see the following client-output messages:


>>> Loading caches...
>>> Loading cache: OfficeCache
>>> Loading cache: ProductLineCache
>>> Loading cache: ProductCache
>>> Loading cache: EmployeeCache
>>> Loading cache: CustomerCache
>>> Loading cache: OrderCache
>>> Loading cache: OrderDetailCache
>>> Loading cache: PaymentCache
>>> All caches loaded!

Summary

In this article, we built an Ignite Data Streamer CSV file load client. With Ignite deployed as an IMDB, the load client implements the data-ingestion and integration part of the “horizontal” data pattern of a Digital Integration Hub. We saw that we can modify the well-defined structure of a Web Console project to build an extensible project. We used the IgniteDataStreamer class that Ignite provides to leverage high-performance stream loading of cache entries. Also, we used the Java file-reading APIs and the Apache Commons CSV parser, plus our own utility class, to speed the development of a CSV file parser that serves our purposes.

We have completed our “Getting Started with Ignite Data Loading” series. Thanks for your interest!

Cheers,
Glenn