Continuous Queries

Continuous queries monitor data modifications in a cache. All update events are propagated to the local subscriber. The continuous query implementation guarantees exactly-once delivery of each event to the subscriber.

Guarantees

Continuous queries provide the following guarantees:

  • Exactly-once delivery.

  • Order is preserved within a partition. For example, if a key gets updated multiple times, GridGain will always preserve the update order.

  • Order is not preserved across multiple partitions. For example, if keys belonging to different partitions get updated, the order of events is not defined.

  • Continuous queries only observe committed changes. Changes made within an explicit transaction are only observed after you commit it.

  • All changes from a single transaction are not guaranteed to arrive in a single TableRowEventBatch event, because a transaction can affect more rows than pageSize.
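
To make the last guarantee concrete, here is a rough estimate of how many batches a large transaction needs at minimum. It is only a sketch under two assumptions that are not confirmed by this page: a single batch carries at most pageSize events, and all changed rows belong to one partition. The class and method names are illustrative and not part of the GridGain API.

```java
/** Back-of-the-envelope estimate; not part of the GridGain API. */
final class BatchCountEstimate {

    /**
     * Lower bound on the number of batches needed to deliver rowCount events
     * when one batch carries at most pageSize events (an assumption).
     */
    static long minBatches(long rowCount, int pageSize) {
        if (rowCount < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and pageSize must be > 0");
        }
        // Ceiling division without floating point.
        return (rowCount + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        // A transaction that changed 2500 rows, read with the default pageSize of 1000.
        System.out.println(minBatches(2500, 1000)); // prints 3
    }
}
```

In practice the number of batches can be higher, since rows may be spread over several partitions and polls, so a subscriber should not treat a batch boundary as a transaction boundary.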

Creating a Subscriber

When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.

Note that a continuous query throws an exception if it is started without a local listener.

public class SubscriberExample implements Flow.Subscriber<TableRowEventBatch<Tuple>> {

    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Keep the subscription so that cancel() can use it later.
        this.subscription = subscription;
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        List<TableRowEvent<Tuple>> items = batch.rows();
        for (TableRowEvent<Tuple> item : items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }
}

ITable? table = await Client.Tables.GetTableAsync("Person");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;
IAsyncEnumerable<ITableRowEventBatch<IIgniteTuple>> continuousQuery = view.QueryContinuouslyAsync();

// Add data after the query is started.
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 1, ["Name"] = "John" });
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 2, ["Name"] = "Jane" });
await foreach (ITableRowEventBatch<IIgniteTuple> batch in continuousQuery)
{
    foreach (ITableRowEvent<IIgniteTuple> rowEvent in batch.Events)
    {
        Console.WriteLine(rowEvent);
    }
}
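
The Java subscriber above requests Long.MAX_VALUE items, which leaves the subscriber with no control over the delivery rate. If batch processing is slow, a subscriber can instead request one batch at a time. The sketch below shows that pattern with JDK classes only: SubmissionPublisher stands in for the continuous query, and List<String> stands in for TableRowEventBatch<Tuple>. Both substitutions, and the class name, are assumptions made to keep the example runnable without a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Subscriber that pulls one batch at a time instead of requesting Long.MAX_VALUE. */
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {

    private final List<T> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask for the first batch only
    }

    @Override
    public void onNext(T batch) {
        received.add(batch);     // process the batch
        subscription.request(1); // then ask for the next one
    }

    @Override
    public void onError(Throwable throwable) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    /** Waits until the stream completes or fails and returns the batches seen so far. */
    List<T> awaitAll(long timeout, TimeUnit unit) {
        try {
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return received;
    }

    /** Publishes two stand-in batches and returns what the subscriber received. */
    static List<List<String>> runDemo() {
        OneAtATimeSubscriber<List<String>> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(List.of("row-1", "row-2"));
            publisher.submit(List.of("row-3"));
        } // close() signals onComplete once the submitted batches are delivered
        return subscriber.awaitAll(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Requesting one item per onNext call trades throughput for predictable memory use; a larger fixed demand is a reasonable middle ground.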

Dedicated Executor Thread

When a continuous query is created, it accepts a Subscriber that is invoked whenever the query receives data. By default, the Subscriber's methods are executed on Java's common thread pool. You can control which thread pool is used by creating a dedicated executor and passing it in the continuous query options. The Subscriber's methods are then handled by that dedicated thread pool, which gives you better control over resource allocation.

The example below creates an executor and passes it to a continuous query that uses the subscriber defined above:

ExecutorService executor = Executors.newSingleThreadExecutor();

var options = ContinuousQueryOptions.builder()
        .executor(executor)
        .build();

view.queryContinuously(subscriber, options);
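
A dedicated executor is easier to operate when its threads are named and when it is shut down once the query is no longer needed. The sketch below uses only JDK classes and assumes that the application, not GridGain, owns the executor's lifecycle; this page does not state who is responsible for shutting it down. The class name, method names, and thread name are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SubscriberExecutorExample {

    /** Creates a single-threaded executor backed by a named daemon thread. */
    static ExecutorService newSubscriberExecutor(String threadName) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // do not keep the JVM alive because of this thread
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    /** Returns the name of the thread on which the executor runs its tasks. */
    static String threadNameOf(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    /** Stops accepting tasks, waits for running ones, and forces termination on timeout. */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) {
        ExecutorService executor = newSubscriberExecutor("cq-subscriber");
        // In a real application, the executor would be passed to the
        // continuous query options at this point.
        System.out.println(threadNameOf(executor));
        System.out.println(shutdownGracefully(executor, 5, TimeUnit.SECONDS));
    }
}
```

With a single-threaded executor, a slow onNext call delays every subsequent batch, so size the pool according to how much work the subscriber does.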

Continuous Query Parameters

You can configure the following properties of continuous queries:

  • pageSize - The number of entries returned from a single partition in one network call. Default value: 1000.

  • pollIntervalMs - Poll interval, in milliseconds. Default value: 1000.

  • skipOldEntries - When true, TableRowEvent#oldEntry() returns null for TableRowEventType#UPDATED events. This reduces network traffic while a continuous query is running, because old entries are not sent to the subscriber.

  • startTimestampMillis - Start timestamp, in milliseconds since the Unix epoch.

  • executor - The executor used to run the Subscriber's methods.

  • partitions - The list of table partitions that will be involved in the continuous query. By default, all partitions are included.

The example below shows how to configure and execute a continuous query:

var options = ContinuousQueryOptions.builder()
        .pollIntervalMs(10)
        .pageSize(pageSize)
        .skipOldEntries(false)
        .build();

view.queryContinuously(subscriber, options);

var options = new ContinuousQueryOptions
{
    ColumnNames = ["id", "name"],
    EventTypes = [TableRowEventType.Created, TableRowEventType.Updated],
    PageSize = 42,
    PollInterval = TimeSpan.FromSeconds(0.5),
    Watermark = IContinuousQueryWatermark.FromInstant(Instant.FromDateTimeUtc(
        DateTime.UtcNow.AddDays(-1))),
    EnableEmptyBatches = false
};
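
The .NET example above starts the query one day in the past by using a watermark. For the Java startTimestampMillis parameter, a comparable value can be computed with java.time. The sketch below assumes the parameter expects milliseconds since the Unix epoch, as its name suggests; the helper class and method are illustrative, and the builder call mentioned in the comment is an assumption based on the parameter name.

```java
import java.time.Clock;
import java.time.Duration;

final class StartTimestampExample {

    /** Epoch milliseconds for the moment that lies lookBack before the clock's current instant. */
    static long startTimestampMillis(Clock clock, Duration lookBack) {
        return clock.instant().minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        // Start the query from events that happened up to one day ago.
        long start = startTimestampMillis(Clock.systemUTC(), Duration.ofDays(1));

        // Assumed usage: pass the value to the options builder, for example
        // through a startTimestampMillis(start) call.
        System.out.println(start);
    }
}
```

Passing a Clock instead of calling System.currentTimeMillis() directly keeps the computation testable with a fixed clock.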

Continuous Query Events

These event types describe the change delivered to the Subscriber in a TableRowEvent while a continuous query is running.

Event Type    Description

CREATED       Row created.

UPDATED       Row updated.

REMOVED       Row removed.

ARCHIVED      Row archived. This event occurs when a table has an ARCHIVE AT condition set and rows are archived according to it: the rows are removed from the primary storage but remain available in the secondary storage.
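
A common use of these event types is keeping a local, in-memory view of a table up to date. The sketch below shows one way to dispatch on the event type. To stay runnable without a cluster, it uses a hypothetical EventType enum and RowEvent record that only mirror the event types listed above; they are stand-ins and not the GridGain TableRowEventType and TableRowEvent classes.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalViewExample {

    /** Hypothetical stand-in that mirrors the event types listed above. */
    enum EventType { CREATED, UPDATED, REMOVED, ARCHIVED }

    /** Hypothetical simplified event: row key, event type, and new value (null when the row is gone). */
    record RowEvent(int key, EventType type, String newValue) { }

    private final Map<Integer, String> view = new HashMap<>();

    /** Applies a single event to the local view. */
    void apply(RowEvent event) {
        switch (event.type()) {
            // The row exists in primary storage with a new value.
            case CREATED, UPDATED -> view.put(event.key(), event.newValue());
            // The row is no longer in primary storage.
            case REMOVED, ARCHIVED -> view.remove(event.key());
        }
    }

    /** Returns an immutable copy of the current view. */
    Map<Integer, String> snapshot() {
        return Map.copyOf(view);
    }

    public static void main(String[] args) {
        LocalViewExample example = new LocalViewExample();
        example.apply(new RowEvent(1, EventType.CREATED, "John"));
        example.apply(new RowEvent(2, EventType.CREATED, "Jane"));
        example.apply(new RowEvent(1, EventType.UPDATED, "Johnny"));
        example.apply(new RowEvent(2, EventType.REMOVED, null));
        System.out.println(example.snapshot()); // prints {1=Johnny}
    }
}
```

Treating ARCHIVED like REMOVED is a design choice of this sketch. Because archived rows remain available in the secondary storage, an application may prefer to handle them separately.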