Practical Introduction to Streaming MapReduce



In this article I’ll introduce the concept of Streaming MapReduce processing using GridGain and Scala. I chose Scala simply because it allows for very concise notation, and GridGain provides a very effective DSL for it. Rest assured, you can follow this post in Java or Groovy just as well.

The concept of streaming processing (and Streaming MapReduce in particular) can be defined as continuous distributed processing of continuously incoming data streams. The obvious difference from other forms of distributed processing is that the input data cannot be fully sized (or known) before the processing starts, and the incoming data appears to be "endless" from the point of view of the processing application. Typical examples of streaming processing would be processing incoming web event-level logs, the Twitter firehose, trade-level information in financial systems, Facebook updates, RFID chip updates, etc.

Another interesting observation is that streaming processing is almost always real time. The important point here is that the streaming nature of the input data necessitates the real time characteristic of the processing. If your processing lags behind the volume of incoming live data - you will inevitably run out of space to buffer that data, and the system will crash.
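To make that concrete, here is a minimal sketch (plain JDK concurrency classes, nothing GridGain-specific) of the underlying reasoning: with a bounded buffer a lagging consumer makes the producer block, whereas an unbounded buffer simply grows until memory is exhausted:

[source lang="scala"]
import java.util.concurrent.ArrayBlockingQueue

// Illustration only: a bounded buffer between a fast producer and a
// slow consumer. put(...) blocks once the 1024-slot buffer fills up -
// remove the bound, and the backlog instead grows without limit.
object BackPressureSketch extends App {
    val buffer = new ArrayBlockingQueue[String](1024)

    new Thread(new Runnable {
        def run() {
            var i = 0
            while (true) { buffer.put("event-" + i); i += 1 } // producer
        }
    }).start()

    while (true) {
        Thread.sleep(1) // simulate slow processing
        println("processed " + buffer.take()) // consumer
    }
}
[/source]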

I will provide two code examples to highlight streaming MapReduce processing with GridGain:

  • The first is a very simple, canonical MapReduce application that I’ll use to illustrate the basics of GridGain.
  • The second is a bit more involved and demonstrates how you can write a start-to-end streaming MapReduce application (from ingestion to querying).


Example 1


Let’s start with GridGain. GridGain is Java-based middleware for in-memory processing of big data in a distributed environment. It is based on a high-performance in-memory data platform that integrates the world’s fastest MapReduce implementation with In-Memory Data Grid technology, delivering software that is easy to use and easy to scale.

For the first example we’ll develop an application that takes a string as an argument and calculates the number of non-space characters in it. It accomplishes this by splitting the argument string into individual words and calculating the number of characters in each word on the remote nodes currently available in the grid. In the end - it aggregates the lengths of all words into the final result.

This is a standard "HelloWorld" example in the world of distributed programming.
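Conceptually (illustration only; the input string here is made up), the computation is just a local map over word lengths followed by a sum - the grid version below differs only in where each word-length closure executes:

[source lang="scala"]
// Illustration only: the single-JVM equivalent of the distributed
// example that follows.
object LocalMain extends App {
    val input = "Hello Streaming MapReduce" // hypothetical input

    println("Non-space chars: " + input.split(" ").map(_.length).sum) // prints 23
}
[/source]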

First off, we need to create a cluster to work with. If you download GridGain and unzip it, all you need to do is run the node start script, passing it a path to an XML configuration file, to start a node: 'bin/ggstart.sh examples/config/spring-cache-popularcounts.xml'



Note that you can start as many local nodes as you need - just run this script that many times. Note also that you can start standalone nodes from Visor, the GridGain DevOps console (a discussion of Visor is outside the scope of this post).

Once you have started all the nodes (let's say 2), you'll notice that they discovered each other automatically with no drama. You have exactly zero configuration to worry about, and everything works completely out-of-the-box.

Now that we have the cluster running, let’s write the code. Open your favorite IDE or text editor and type this:

[source lang="scala"]
import org.gridgain.scalar.scalar
import scalar._

object Main extends App {
    // Single input string assembled from the command-line arguments.
    val input = args.mkString(" ")

    scalar("examples/config/spring-cache-popularcounts.xml") {
        println("Non-space chars: " + grid$.spreadReduce(
            // One closure per word; each one may execute on any node in the grid.
            for (w <- input.split(" ")) yield () => w.length)(_.sum))
    }
}
[/source]

Depending on what build system you use (SBT, Ant, IDEA, Eclipse, etc.) you just need to include the GridGain libs (the main JAR plus the JARs in the '/libs' subfolder) - and compile.
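If you happen to use SBT, for example, a build sketch along these lines would pull those JARs in (hypothetical: it follows sbt's stock unmanaged-dependencies recipe and assumes GRIDGAIN_HOME points at the unzipped distribution):

[source lang="scala"]
// Hypothetical build.sbt fragment: treat the GridGain distribution
// JARs as unmanaged dependencies. Paths are assumptions.
unmanagedJars in Compile ++= {
    val home = file(sys.env("GRIDGAIN_HOME"))

    // Main JAR in the distribution root plus everything under /libs.
    ((home * "*.jar") +++ (home / "libs" ** "*.jar")).classpath
}
[/source]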

If everything compiles - just RUN it, passing it some input string. That’s all there is to it:



Let me quickly explain what’s happening here (it will apply to the following example as well):

  • First we use the scalar "keyword", passing it a path to the configuration XML file, to start up a node from within our Scala app.
  • grid$ denotes a global projection on all nodes in the cluster (GridGain employs a functional API at its core). A projection provides a monadic set of operations available on any arbitrary set of GridGain nodes.
  • We use the method spreadReduce(...) on the projection, which takes two curried arguments:

    • a set of closures to spread-execute on the cluster, and
    • a reduction function that will be used to aggregate the remote results.

  • When spreadReduce(...) completes (it's a synchronous call; asynchronous variants are available as well) - it returns the non-space character count. The sketch right after this list expands the same call.
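Here is that same call with the two curried arguments pulled out into named values (a sketch only; it assumes the scalar DSL and the 'input' string from the example above):

[source lang="scala"]
// Sketch: spreadReduce(...) with its two curried arguments named.
val closures = for (w <- input.split(" ")) yield () => w.length // one closure per word

val reduce = (lengths: Seq[Int]) => lengths.sum // aggregates the remote results

println("Non-space chars: " + grid$.spreadReduce(closures)(reduce))
[/source]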


Now - let me ask you a question... Did you notice any deployment steps, any Ant or Maven builds, any copying of JARs, or any redeploying after we’ve changed the code?

The answer is no. GridGain provides a fairly unique zero-deployment technology that allows for complete on-demand class deployment throughout the cluster - leaving you, the developer, to simply write the code and run your applications as you would locally. Pretty nifty, isn't it?

Example 2


Ok, now that we’ve tried something simple and trivial, let’s develop a full-featured streaming MapReduce app using what we've learned so far. We'll adapt a canonical example from Hadoop: we’ll ingest a number of books into the in-memory data grid and find the 10 most frequent words across those books.

The way we'll be doing it is via streaming MapReduce: while we are loading the books into memory, we will continuously query the data grid for the 10 most frequent words. As the data gets loaded the results will change, and when all the books are fully loaded we’ll get our correct (and final) tally of the 10 most frequent words.


Unlike the Hadoop example:

  • We’ll show both programmatic ingestion and querying in one application (no need to pre-copy anything into something like HDFS), and
  • We’ll develop this application in true streaming fashion, i.e. we’ll start querying concurrently, before all the data is loaded


Here's the full source code:
[source lang="scala"]
import org.gridgain.scalar.scalar
import scalar._
import org.gridgain.grid.typedef.X
import java.io.File
import io.Source
import java.util.{Timer, TimerTask}
import actors.threadpool._

object ScalarPopularWordsRealTimeExample extends App {
    private final val WORDS_CNT = 10
    private final val BOOK_PATH =
        "examples/java/org/gridgain/examples/realtime/books"

    type JINT = java.lang.Integer

    val dir = new File(X.getSystemOrEnv("GRIDGAIN_HOME"), BOOK_PATH)

    if (!dir.exists)
        println("Input directory does not exist: " + dir.getAbsolutePath)
    else
        scalar("examples/config/spring-cache-popularcounts.xml") {
            val pool = Executors.newFixedThreadPool(dir.list.length)
            val timer = new Timer("words-query-worker")

            try {
                // Print the current top words every 3 seconds while
                // ingestion is still in progress.
                timer.schedule(new TimerTask {
                    def run() { query(WORDS_CNT) }
                }, 3000, 3000)

                // Populate cache & force one more run to get the final counts.
                ingest(pool, dir)
                query(WORDS_CNT)

                // Clean up after ourselves.
                grid$.projectionForCaches(null).bcastRun(
                    () => grid$.cache().clearAll())
            }
            finally {
                timer.cancel()
                pool.shutdownNow()
            }
        }

    def ingest(pool: ExecutorService, dir: File) {
        // Data loader handles asynchronous bulk loading with back-pressure.
        val ldr = dataLoader$[String, Int](null, 2048, 8, 128)

        // For every book, allocate a new thread from the pool and start
        // populating cache with words and their counts.
        try {
            (for (book <- dir.list()) yield
                pool.submit(() => Source.fromFile(new File(dir, book), "ISO-8859-1").
                    getLines().foreach(
                        line => for (w <- line.split("[^a-zA-Z0-9]") if !w.isEmpty)
                            // Store 1 on first occurrence, increment otherwise.
                            ldr.addData(w, (i: Int) => if (i == null) 1 else i + 1)
                    ))).foreach(_.get)
        }
        finally
            ldr.close(false) // Wait for data loader to complete.
    }

    def query(cnt: Int) {
        cache$[String, JINT].get.sql(grid$.projectionForCaches(null),
            "length(_key) > 3 order by _val desc limit " + cnt).
            toIndexedSeq.sortBy[JINT](_._2).reverse.take(cnt).foreach(println _)

        println("------------------")
    }
}
[/source]

A few notes about the code in general:

  • We use the books that are shipped with GridGain's examples
  • We pass a specific XML configuration file to the 'scalar' keyword (it configures TCP discovery and a partitioned cache with one backup)
  • We use a simple timer to run a query every 3 seconds while we are loading the books
  • After everything is done, we clean up after ourselves (so that you can run this app multiple times without leaving garbage in the data grid)


Notes about the ingest(...) and query(...) methods:

  • We use GridGain’s data loader in the ingest(...) method; it provides advanced back-pressure management for asynchronous bulk-load distributed operations
  • We use the method sql(...) on a cache projection (cache projections provide a monadic set of data grid operations) to issue a simple distributed SQL query
  • In GridGain you can omit “select * from table” in most cases and just supply a where clause, as the sketch below shows
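For instance (illustration only; the full-form table name is an assumption based on the convention of querying by the cache's value type):

[source lang="scala"]
// Illustration only: the shorthand used in query(...) above next to
// the full SELECT it stands for (table name is an assumption).
val shorthand = "length(_key) > 3 order by _val desc limit 10"
val fullForm  = "select * from Integer where length(_key) > 3 order by _val desc limit 10"
[/source]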


That’s all there is to it. Compile it (as always, no deployment or redeployment is necessary) and run it. You will see a printout of the 10 most frequent words every 3 seconds while the books are being read and put into the data grid:



Final Thoughts


In about 50 lines of code we’ve put together a streaming MapReduce app with both ingestion and querying. We ran it on a local cluster - and it will run just the same way on 3, 10, 100s, or 1000s of nodes deployed anywhere in the world (as long as we have some way to connect to them).

Keep in mind that this is obviously a very simple (almost trivial) example of streaming MapReduce. Yet with a few additional lines of code you can replace the books with, let’s say, the Twitter firehose keyed by hashtags, and the printouts with updates to your social dashboard - and you get a pretty useful system tracking the most popular Twitter hashtags in real time in a few hundred lines of code, while automatically scaling to 100s of terabytes of data being processed on 1000s of nodes.

Not bad at all!