GridGain Developers Hub

Continuous Queries

Continuous queries monitor data modifications in a cache. All update events are propagated to the local subscriber. The continuous query implementation guarantees exactly-once delivery of each event to the subscriber.

Creating a Subscriber

When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.

Note that the continuous query throws an exception if started without a local listener.

public static void useMySubscriber(Table table) throws InterruptedException {
    RecordView<Tuple> view = table.recordView();
    var subscriber = new MySubscriber();

    // Start the continuous query; no options are passed here.
    view.queryContinuously(subscriber, null);

    // Each modification below produces an event for the subscriber.
    view.upsert(null, Tuple.create().set("id", 3).set("name", "John Doe"));
    view.upsert(null, Tuple.create().set("id", 3).set("name", "Jane Doe"));
    view.delete(null, Tuple.create().set("id", 3));

    // Wait for some events.
    Thread.sleep(3000);
}

private static class MySubscriber implements Flow.Subscriber<TableRowEventBatch<Tuple>> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        List<TableRowEvent<Tuple>> items = batch.rows();
        for (TableRowEvent<Tuple> item : items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}
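
The subscriber above requests Long.MAX_VALUE items up front, so it never limits how fast events arrive. As a rough illustration of the alternative, the sketch below requests one item at a time. It uses only the JDK (java.util.concurrent.Flow and SubmissionPublisher) as a stand-in for the continuous query publisher, so it makes no claim about how GridGain delivers batches. The names BoundedDemandDemo, BoundedSubscriber, and run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BoundedDemandDemo {
    /** Hypothetical subscriber that asks for one item at a time. */
    static class BoundedSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the items through a JDK publisher and returns what the subscriber received. */
    static List<String> run(List<String> items) {
        BoundedSubscriber<String> subscriber = new BoundedSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered

        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("insert", "update", "delete")));
    }
}
```

Requesting items one at a time trades throughput for a bounded amount of in-flight work. Whether that trade-off is worthwhile for a continuous query depends on how expensive each batch is to process.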

Dedicated Executor Thread

When a continuous query is created, it accepts a Subscriber that is invoked whenever the query receives data. By default, the Subscriber’s methods are executed on Java’s common thread pool. To control which threads are used, create a dedicated executor and pass it in the continuous query options. The Subscriber’s methods are then executed on that executor, which gives you better control over resource allocation.

The example below creates an executor and passes it to a continuous query that uses the subscriber defined above:

ExecutorService executor = Executors.newSingleThreadExecutor();

var subscriber = new MySubscriber();
var options = ContinuousQueryOptions.builder()
    .executor(executor)
    .build();

view.queryContinuously(subscriber, options);
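
To see what running callbacks on a dedicated executor looks like in isolation, the following sketch again uses the JDK's SubmissionPublisher in place of a continuous query. It is an analogy under that assumption, not GridGain's implementation. The thread name "cq-subscriber" and the class and method names are hypothetical. The sketch records which thread ran onNext and shuts the executor down when it is no longer needed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DedicatedExecutorDemo {
    /** Publishes one item and returns the name of the thread that ran onNext. */
    static String callbackThreadName() {
        // Single-threaded executor whose thread has a recognizable (hypothetical) name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                task -> new Thread(task, "cq-subscriber"));
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    threadName.set(Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            publisher.submit("event");
        } // close() signals onComplete once the submitted item is delivered

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // this sketch created the executor, so it also shuts it down
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("onNext ran on: " + callbackThreadName());
    }
}
```

With a single-threaded executor, callbacks never run concurrently with each other, which keeps subscriber code simple at the cost of parallelism.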

Continuous Query Parameters

You can configure the following properties of continuous queries:

  • pageSize - The number of entries returned from a single partition in one network call. Default value: 1000.

  • pollIntervalMs - Poll interval, in milliseconds. Default value: 1000.

  • startTimestampMillis - Start timestamp, in milliseconds since the Unix epoch.

  • executor - The executor on which the subscriber’s methods are invoked.

The example below shows how to configure and execute a continuous query:

var subscriber = new MySubscriber();
var options = ContinuousQueryOptions.builder()
    .pollIntervalMs(10)   // Poll every 10 ms instead of the default 1000 ms.
    .pageSize(500)        // Illustrative value; the default is 1000.
    .build();

view.queryContinuously(subscriber, options);
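
The startTimestampMillis property listed above expects an epoch timestamp, which is easy to get wrong by a factor of 1000 when seconds and milliseconds are mixed up. The sketch below computes such a value with java.time. The helper name startTimestampMillis and the five-minute look-back are hypothetical. The code also assumes the options builder exposes a method named after the property, as it does for pollIntervalMs and pageSize.

```java
import java.time.Duration;
import java.time.Instant;

public class StartTimestampDemo {
    /** Returns epoch milliseconds for the moment that lies lookBack before now. */
    static long startTimestampMillis(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        long start = startTimestampMillis(Instant.now(), Duration.ofMinutes(5));
        System.out.println("startTimestampMillis = " + start);
        // The value would then be passed to the options builder, for example
        // ContinuousQueryOptions.builder().startTimestampMillis(start)
        // (assumed builder method, named after the property).
    }
}
```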