Apache® Ignite™ and Apache® Spark™ Integration using Ignite RDDs

In this two-part series, we will look at how Apache® Ignite™ and Apache® Spark™ can be used together.

Ignite is a memory-centric distributed database, caching, and processing platform. It is designed for transactional, analytical, and streaming workloads, delivering in-memory performance at scale.

Spark is a streaming and compute engine that typically ingests data from HDFS or other storage. Historically, it has leaned towards OLAP workloads and focussed on MapReduce-style computation.

The two technologies are, therefore, complementary.

Using Ignite and Spark together

Combining these two technologies provides Spark users with a number of significant benefits:

  • True in-memory performance at scale can be achieved by avoiding data movement from a data source to Spark workers and applications.
  • RDD, DataFrame and SQL performance can be boosted.
  • State and data can be more easily shared amongst Spark jobs.

Figure 1 shows how we can combine these two technologies and highlights some of the key benefits.

Figure 1. Ignite and Spark Integration.

In this first article, we will focus on Ignite RDDs. In the second article, we will focus on Ignite DataFrames.

Ignite RDD

Ignite provides an implementation of the Spark RDD, called Ignite RDD. This implementation allows any data and state to be shared in memory as RDDs across Spark jobs. The Ignite RDD provides a shared, mutable view of the same data in-memory in Ignite across different Spark jobs, workers, or applications. In contrast, native Spark RDDs cannot be shared across Spark jobs or applications.

The Ignite RDD is implemented as a view over distributed Ignite storage. It can be deployed with an Ignite node either within the Spark job executing process, on a Spark worker, or in a separate Ignite cluster. So, depending upon the chosen deployment mode, the shared state may exist only during the lifetime of a Spark application, or it may exist beyond the lifetime of a Spark application.

Ignite can also help Spark users with SQL performance. Whilst SparkSQL supports quite a rich SQL syntax, it doesn't implement any indexing. Consequently, SparkSQL queries may take minutes even on moderately sized data sets because a full scan is required. By using Ignite, Spark users can configure primary and secondary indexes that can bring orders-of-magnitude performance improvements.
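For illustration, indexing is enabled per cache in the Ignite configuration. The following is a sketch of a cache definition for integer pairs, based on the standard CacheConfiguration properties (the example-shared-rdd.xml file used later in this article ships with a similar definition):

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="sharedRDD"/>
    <!-- Registers Integer keys and values for SQL queries; Ignite
         creates the corresponding indexes automatically. -->
    <property name="indexedTypes">
        <list>
            <value>java.lang.Integer</value>
            <value>java.lang.Integer</value>
        </list>
    </property>
</bean>
```

For custom value types, secondary indexes can be declared on individual fields via QueryEntity in the same cache configuration.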

Ignite RDD example

Let’s now write some code and build some applications to see how we can use the Ignite RDD and gain its benefits. You can download the code from GitHub if you would like to follow along.

We will write two small Scala applications and then two small Java applications. This is to illustrate that multiple languages can be used to access the Ignite RDD, as may be the case in organizations that use different programming languages and frameworks. Furthermore, we will run our applications from two different environments: the terminal for our Scala applications and an IDE for our Java applications. As a bonus, we will also run some SQL code from one of our Java applications.

For Scala, one application will write tuples into an Ignite RDD and another application will perform some filtering and return a result for us. We will use Maven to build a JAR file containing our code and then run this code from a terminal window. Here is the code in detail:


import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.spark.{SparkConf, SparkContext}

object RDDWriter extends App {
  val conf = new SparkConf().setAppName("RDDWriter")
  val sc = new SparkContext(conf)
  // Create an IgniteContext from the SparkContext, using the shared-RDD example configuration.
  val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
  // Store the pairs (1, 1) .. (1000, 1000) using 10 parallel operations.
  sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i)))
  ic.close(true)
  sc.stop()
}

object RDDReader extends App {
  val conf = new SparkConf().setAppName("RDDReader")
  val sc = new SparkContext(conf)
  val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml")
  val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD")
  // Count the stored values greater than 500.
  val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500)
  println("The count is " + greaterThanFiveHundred.count())
  ic.close(true)
  sc.stop()
}

In our Scala RDDWriter, we first create a SparkConf that includes the application name. Next, we create a SparkContext based upon this configuration, and then an IgniteContext from the SparkContext. There are several ways to create the IgniteContext; in our example, we will use an XML file called example-shared-rdd.xml. This file ships with the Ignite distribution and contains pre-configured settings that suit our needs; you will need to modify the path (/path_to_ignite_home) for your environment. Next, we specify that the Ignite RDD holds tuples of integer values. Finally, we store the integer values from 1 to 1000 in the Ignite RDD, using 10 parallel operations.

In our Scala RDDReader, the initialization and setup are identical to the Scala RDDWriter, and we will use the same XML file, as shown in the code above. Our application performs some filtering: we are interested in how many of the stored values are greater than 500. The answer is then printed out.

Further details about IgniteContext and IgniteRDD can be found in the Apache Ignite documentation.

To build the jar file, we can use the following maven command:

mvn clean install
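The build needs the Ignite Spark integration and Spark itself on the classpath. A minimal sketch of the relevant pom.xml dependencies follows; the version properties are placeholders for whichever Ignite and Spark versions you are using:

```xml
<dependencies>
    <!-- IgniteContext and IgniteRDD live in the ignite-spark module. -->
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spark</artifactId>
        <version>${ignite.version}</version>
    </dependency>
    <!-- Spark core; the Scala suffix must match the ignite-spark build. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```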

Next, for our Java code, we will write one application that adds more tuples to our Ignite RDD and another that performs some filtering and returns a result for us. Here is the Java RDDWriter code in detail:


import java.util.ArrayList;
import java.util.List;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class RDDWriter {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("RDDWriter")
                .setMaster("local")
                .set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        // The third argument (true) runs the context in standalone mode,
        // connecting to the already-running Ignite cluster.
        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        // Add the values 1001 to 1020 as (key, value) pairs to the shared RDD.
        List<Integer> data = new ArrayList<>(20);

        for (int i = 1001; i <= 1020; i++) {
            data.add(i);
        }

        JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

        sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
            public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                return new Tuple2<Integer, Integer>(val, val);
            }
        }));

        igniteContext.close(true);

        sparkContext.close();
    }
}

In our Java RDDWriter, we first create a SparkConf that includes the application name and the number of executor instances. Next, we create a JavaSparkContext based upon this configuration, and then a JavaIgniteContext from the JavaSparkContext. There are several ways to create the JavaIgniteContext; in our example, we will use the same example-shared-rdd.xml file that ships with the Ignite distribution, and you will need to modify the path (/path_to_ignite_home) for your environment. The third constructor argument (true) runs the JavaIgniteContext in standalone mode, so it connects to our already-running Ignite cluster rather than starting embedded Ignite nodes. Finally, we add a further 20 values to the Ignite RDD.

In our Java RDDReader, the initialization and setup are identical to the Java RDDWriter, and we will use the same XML file, as shown in the code below. Our application performs some filtering: we are interested in how many of the stored values are greater than 500. The answer is then printed out. Here is the code for our Java RDDReader:


import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import scala.Tuple2;

public class RDDReader {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("RDDReader")
                .setMaster("local")
                .set("spark.executor.instances", "2");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        Logger.getRootLogger().setLevel(Level.OFF);
        Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);

        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        // Count the stored values greater than 500.
        JavaPairRDD<Integer, Integer> greaterThanFiveHundred =
                sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
                    public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {
                        return tuple._2() > 500;
                    }
                });

        System.out.println("The count is " + greaterThanFiveHundred.count());

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        // Run SQL directly against the shared RDD's underlying Ignite cache.
        Dataset<Row> df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10");

        df.show();

        igniteContext.close(true);

        sparkContext.close();
    }
}

Finally, we are ready to test our code.

Running the applications

In the first terminal window, we will start Spark master, as follows:

$SPARK_HOME/sbin/start-master.sh

In the second terminal window, we will start a Spark worker, as follows:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port

Modify the IP address and port number (ip:port) for your environment.

In the third terminal window, we will launch an Ignite node, as follows:

$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml

This is using the example-shared-rdd.xml file that we previously discussed.

In the fourth terminal window, we can run our Scala RDDWriter application, as follows:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

Modify the IP address and port number (ip:port) and the path (/path_to_jar_file) for your environment.

Once this has completed, we can run the Scala RDDReader application, as follows:

$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDReader" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"

Modify the IP address and port number (ip:port) and the path (/path_to_jar_file) for your environment.

This produces the following output:

The count is 500

This is what we would expect.

Next, we will shut down our Spark worker and Spark master. Our Ignite node will remain running and the Ignite RDD is still available for use by other applications. We will connect to the Ignite RDD from our Java applications using an IDE.

Running the Java RDDWriter should extend the list of tuples that we previously stored in the Ignite RDD. We can test this by running the Java RDDReader and it produces the following output:

The count is 520

This is what we would expect.
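As a sanity check on these two expected counts, we can mirror the filter predicate in plain Java, without a cluster:

```java
import java.util.stream.IntStream;

public class CountCheck {
    public static void main(String[] args) {
        // The Scala RDDWriter stored the pairs (1, 1) .. (1000, 1000),
        // so 500 of the values are greater than 500.
        long afterScalaWriter = IntStream.rangeClosed(1, 1000)
                .filter(i -> i > 500)
                .count();
        System.out.println("The count is " + afterScalaWriter);

        // The Java RDDWriter appended the pairs (1001, 1001) .. (1020, 1020),
        // bringing the total of values greater than 500 to 520.
        long afterJavaWriter = IntStream.rangeClosed(1, 1020)
                .filter(i -> i > 500)
                .count();
        System.out.println("The count is " + afterJavaWriter);
    }
}
```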

Finally, the SQL query performs a SELECT over the Ignite RDD and returns the first 10 values that are greater than 10 and less than 100. It outputs the following:

+----+
|_VAL|
+----+
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
|  20|
+----+

This is what we would expect.

Summary

In this article, we have seen how easily the Ignite RDD can be accessed from multiple programming languages and multiple environments. We were able to write values to and read values from the Ignite RDD, and the state was preserved by Ignite even after Spark was shut down. This provides considerable flexibility and benefits for Spark users.

In the next article in this series, we will look at Ignite DataFrames and the benefits that they can bring when using Ignite with Spark.