Counting Words In Real Time On GridGain

A while back we promised to publish the code from the live coding GridGain presentation we did at QCon London earlier this year. Since the presentation was in Scala, the code we are posting here is in Scala as well.

First, a brief intro. We all know Hadoop's word counting example, which takes a file with words and produces another file with the number of occurrences next to each word. Hadoop handles this example very well; the main caveat, however, is that it is not real time.
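
For contrast, a batch version of word counting (ignoring Hadoop's distribution across nodes) can be sketched in a few lines of plain Scala: read the whole file, count, and print the result once at the end. The countWords name and the regex below are just for illustration; the rest of this post does the same counting continuously, on data that is still arriving.

import scala.io.Source

// Batch word count: the result only becomes available after the whole file is read.
def countWords(path: String): Map[String, Int] =
    Source.fromFile(path).getLines()
        .flatMap(_.split("[^a-zA-Z0-9]"))
        .filter(!_.isEmpty)
        .foldLeft(Map.empty[String, Int])(
            (counts, word) => counts + (word -> (counts.getOrElse(word, 0) + 1)))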

The word counting example we did at QCon actually counted words in real time. The program was split into two parts: the first part loads words into the GridGain data grid in real time, and the second part queries the grid every 3 seconds to continuously print out the top 10 words stored so far.

The example was done using 'Scalar' - the GridGain DSL for Scala - but it could have been done in Java as well using the GridGain Java APIs.
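
The snippets below reference a few imports and constants that the post itself does not show. Here is roughly the boilerplate we assume; the object name, the BOOK_PATH placeholder and the GridGain package names are our assumptions for the 4.0 Scalar API, so check your distribution if they differ.

import java.io.File
import java.util.{Timer, TimerTask}
import java.util.concurrent._
import scala.io.Source
import scala.collection.immutable.TreeMap

// GridGain Scalar imports - package names assumed for GridGain 4.0.
import org.gridgain.scalar.scalar
import org.gridgain.scalar.scalar._
import org.gridgain.grid.GridFuture

object ScalarPopularWordsExample {
    // Cache values are stored as java.lang.Integer.
    type JInt = java.lang.Integer

    // Directory with the downloaded book files (point this at your own copy).
    private val BOOK_PATH = "path/to/books"

    // Number of most popular words to print on each query.
    private val POPULAR_WORDS_CNT = 10

    // populate(...), queryPopularWords(...) and main(...) below go inside this object.
}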

Continuously Populate Words In Real Time


Let's start by continuously loading the data grid with new words. To do that, we downloaded several books in text format and started reading them concurrently from the populate(...) method, one thread per book. For every word read, we store it in the cache, with the word itself as the key and the current number of occurrences as the value. Also note how we let the grid update the cache asynchronously (using asynchronous run) while we read the next line from the book file. In reality you would most likely keep more than one asynchronous job in flight - see the sketch after the listing - or have GridGain's data loading functionality do it for you.

Populate Words In Real Time

def populate(threadPool: CompletionService[Unit], dir: File) {
    val bookFileNames = dir.list()

    // For every book, start a new thread and start populating cache
    // with words and their counts.
    for (bookFileName <- bookFileNames) {
        threadPool.submit(new Callable[Unit] {
            def call() {
                val cache = grid$.cache[String, JInt]

                var fut: GridFuture[_] = null

                Source.fromFile(new File(dir, bookFileName)).getLines().foreach(line => {
                    line.split("[^a-zA-Z0-9]").foreach(word => {
                        if (!word.isEmpty) {
                            // Wait for the previous asynchronous update to finish
                            // before submitting the next one.
                            if (fut != null)
                                fut.get()

                            fut = grid$.affinityRunAsync(null, word, () => {
                                // Increment word counter and store it in cache.
                                // We use cache transaction to make sure that
                                // gets and puts are consistent and atomic.
                                cache.inTx(
                                    () => cache += (word -> (cache.getOrElse(word, 0) + 1))
                                )

                                ()
                            })
                        }
                    })
                })
            }
        })
    }

    // Wait for all threads to finish.
    bookFileNames.foreach(_ => threadPool.take().get())
}
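
As mentioned above, waiting on a single future per update is done purely for simplicity. A minimal sketch of a variant that keeps several updates in flight at once might look like the following; the populateFile name, the queue and the limit of 8 are our own illustration, not part of the original example.

// Hypothetical variant: load one book while keeping up to 8 asynchronous
// cache updates in flight instead of waiting for each one to complete.
def populateFile(file: File) {
    val cache = grid$.cache[String, JInt]

    val maxInFlight = 8
    val futs = scala.collection.mutable.Queue.empty[GridFuture[_]]

    Source.fromFile(file).getLines().foreach(line => {
        line.split("[^a-zA-Z0-9]").filter(!_.isEmpty).foreach(word => {
            // If too many updates are pending, wait for the oldest one.
            if (futs.size >= maxInFlight)
                futs.dequeue().get()

            futs += grid$.affinityRunAsync(null, word, () => {
                cache.inTx(() => cache += (word -> (cache.getOrElse(word, 0) + 1)))

                ()
            })
        })
    })

    // Drain the updates that are still in flight.
    futs.foreach(_.get())
}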


Distributed SQL Query


Now let's implement our distributed query against the GridGain data grid, which will run every 3 seconds. Note that we are using standard SQL syntax to query the remote grid nodes. Interestingly enough, the GridGain data grid lets you use SQL with virtually no limitations: you can use any native SQL function and even SQL JOINs between different classes. Here, for example, we are using the SQL length(...) function to only query words longer than 3 letters, just to keep frequent short words like "a" or "the" out of our results. We are also using the desc keyword to sort word counts in descending order and the limit keyword to restrict the selection to 10 words.

Distributed SQL Query


def queryPopularWords(cnt: Int) {
    // Type alias for sequences of strings (for readability only).
    type SSeq = Seq[String]

    grid$.cache[String, JInt].sqlReduce(
        // PROJECTION (where to run):
        grid$.projectionForCaches(null),
        // SQL QUERY (what to run):
        "length(_key) > 3 order by _val desc limit " + cnt,
        // REMOTE REDUCER (how to reduce on remote nodes):
        (it: Iterable[(String, JInt)]) =>
            // Pre-reduce by converting
            // Seq[(String, JInt)] to Map[JInt, Seq[String]].
            (it :\ Map.empty[JInt, SSeq])((e, m) =>
                m + (e._2 -> (m.getOrElse(e._2, Seq.empty[String]) :+ e._1))),
        // LOCAL REDUCER (how to finally reduce on local node):
        (it: Iterable[Map[JInt, SSeq]]) => {
            // Print 'cnt' of most popular words collected from all remote nodes.
            (new TreeMap[JInt, SSeq]()(implicitly[Ordering[JInt]].reverse) ++ it.flatten)
                .take(cnt).foreach(println _)

            println("------------") // Formatting.
        }
    )
}



Start Example


And finally, let's implement the main(...) method that calls the populate(...) and queryPopularWords(...) methods we just defined.

Start Example

def main(args: Array[String]) {
    // Initialize book directory.
    val bookDir = new File(BOOK_PATH)

    // Start GridGain with specified configuration file.
    scalar("examples/config/spring-cache-popularwords.xml") {
        // Create as many threads as we have books, so we can use
        // one thread per book to load the data grid concurrently.
        val threadPool = Executors.newFixedThreadPool(bookDir.list.length)

        val popWordsQryTimer = new Timer("words-query-worker")

        try {
            // Schedule word queries to run every 3 seconds.
            popWordsQryTimer.schedule(new TimerTask {
                def run() {
                    queryPopularWords(POPULAR_WORDS_CNT) // Query top 10 words from data grid.
                }
            }, 3000, 3000)

            // Populate cache with word counts.
            populate(new ExecutorCompletionService[Unit](threadPool), bookDir)

            // Force one more run to print final counts.
            queryPopularWords(POPULAR_WORDS_CNT)
        }
        finally {
            popWordsQryTimer.cancel() // Cancel timer.

            threadPool.shutdownNow() // Stop the thread pool.
        }
    }
}


To execute the example, start several stand-alone GridGain nodes using the examples/config/spring-cache-popularwords.xml configuration file, and then run the example we just created from your IDE. You may wish to add more printouts for better visibility into what's happening.

This example ships with GridGain 4.0 and is also available in the GridGain GitHub repository.