Continuous Queries
Continuous queries monitor data modifications in a cache. All update events are propagated to the local subscriber. The continuous query implementation guarantees exactly-once delivery of each event to the subscriber.
Guarantees
Continuous queries provide the following guarantees:
- Exactly-once delivery.
- Order is preserved within a partition. For example, if a key gets updated multiple times, GridGain will always preserve the update order.
- Order is not preserved across multiple partitions. For example, if keys belonging to different partitions get updated, the order of events is not defined.
- Continuous queries only observe committed changes. Changes made within an explicit transaction are only observed after you commit it.
- It is not guaranteed that all changes from a single transaction are a part of a single TableRowEventBatch event. A transaction can affect more rows than pageSize.
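To make the last guarantee concrete, the following sketch models how the changes of one transaction can end up in several batches once their number exceeds the page size. It is plain Java arithmetic, not GridGain code: the class `BatchSplitSketch` and the method `splitIntoBatches` are hypothetical names, and the real paging is done per partition by the server, so actual batch boundaries may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only: shows why one transaction may span several batches. */
class BatchSplitSketch {
    /** Splits the changes into consecutive batches of at most pageSize elements. */
    static <T> List<List<T>> splitIntoBatches(List<T> changes, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < changes.size(); from += pageSize) {
            int to = Math.min(from + pageSize, changes.size());
            batches.add(new ArrayList<>(changes.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // A hypothetical transaction that touched 2500 rows, paged with pageSize = 1000.
        List<Integer> rowIds = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            rowIds.add(i);
        }
        List<List<Integer>> batches = splitIntoBatches(rowIds, 1000);
        System.out.println(batches.size() + " batches"); // prints "3 batches"
    }
}
```

A subscriber that needs to see a transaction as a whole therefore cannot rely on batch boundaries and has to correlate rows by some application-level marker.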
Creating a Subscriber
When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.
Note that the continuous query throws an exception if started without a local listener.
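import java.util.List;
import java.util.concurrent.Flow;
// TableRowEvent, TableRowEventBatch, and Tuple come from the GridGain client API.

public class SubscriberExample implements Flow.Subscriber<TableRowEventBatch<Tuple>> {
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Keep the subscription so that cancel() can stop the query later.
        this.subscription = subscription;
        // Request an unbounded number of batches.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        // Each batch carries one or more row events.
        List<TableRowEvent<Tuple>> items = batch.rows();
        for (TableRowEvent<Tuple> item : items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    /** Stops the continuous query, if it has been started. */
    void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }
}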
ITable? table = await Client.Tables.GetTableAsync("Person");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;
IAsyncEnumerable<ITableRowEventBatch<IIgniteTuple>> continuousQuery = view.QueryContinuouslyAsync();

// Add data after the query is started.
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 1, ["Name"] = "John" });
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 2, ["Name"] = "Jane" });

await foreach (ITableRowEventBatch<IIgniteTuple> batch in continuousQuery)
{
    foreach (ITableRowEvent<IIgniteTuple> rowEvent in batch.Events)
    {
        Console.WriteLine(rowEvent);
    }
}
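The Java subscriber above requests `Long.MAX_VALUE` items up front, which places no limit on how fast batches arrive. The `Flow` contract also allows a subscriber to pull items one at a time. The sketch below shows that pattern using only the JDK: `SubmissionPublisher` stands in for the continuous query, and `OneAtATimeSubscriber` and `runDemo` are hypothetical names. Whether and how a GridGain continuous query throttles on demand is an assumption that should be checked against the product documentation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Subscriber that pulls one item at a time instead of requesting Long.MAX_VALUE. */
class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription; // stored so later calls can request more
        subscription.request(1);          // ask for the first item only
    }

    @Override
    public void onNext(T item) {
        received.add(item);               // "process" the item
        subscription.request(1);          // then ask for the next one
    }

    @Override
    public void onError(Throwable throwable) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    /** Feeds items through a JDK publisher (a stand-in for the query) and returns what arrived. */
    static List<String> runDemo(String... items) {
        OneAtATimeSubscriber<String> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() lets buffered items drain, then signals onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("batch-1", "batch-2")); // prints [batch-1, batch-2]
    }
}
```

Requesting one item per `onNext` call keeps at most one unprocessed item outstanding, at the cost of an extra demand signal per item.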
Dedicated Executor Thread
When a continuous query is created, it accepts a Subscriber that is invoked when the query receives data. By default, the Subscriber's methods are executed on Java's common thread pool. To control which threads are used, create a dedicated executor and pass it in the continuous query options. The Subscriber's methods are then handled by that executor, which gives you better control over resource allocation.
The example below creates an executor and passes it to a continuous query that uses the subscriber defined above:
ExecutorService executor = Executors.newSingleThreadExecutor();

var options = ContinuousQueryOptions.builder()
        .executor(executor)
        .build();

view.queryContinuously(subscriber, options);
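The effect of a dedicated executor can be observed without a cluster. The sketch below uses the JDK's `SubmissionPublisher` as a stand-in for the continuous query and records the name of the thread that runs `onNext`. The class `DedicatedExecutorSketch`, the method `deliveryThreadName`, and the thread name `cq-subscriber` are hypothetical, chosen for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class DedicatedExecutorSketch {
    /** Publishes one item and returns the name of the thread that ran onNext. */
    static String deliveryThreadName(String threadName) {
        // Single-threaded executor whose only thread carries a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        });
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The publisher delivers all subscriber callbacks through the given executor.
        try (SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    seen.set(Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            publisher.submit("event");
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown(); // release the thread once delivery has finished
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadName("cq-subscriber")); // prints cq-subscriber
    }
}
```

An executor you create yourself is also yours to shut down, typically after the subscription has been cancelled.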
Continuous Query Parameters
You can configure the following properties of continuous queries:
- pageSize - The number of entries returned from a single partition in one network call. Default value: 1000.
- pollIntervalMs - Poll interval, in milliseconds. Default value: 1000.
- skipOldEntries - When true, TableRowEvent#oldEntry() returns null for TableRowEventType#UPDATED events. This reduces network traffic while a continuous query is running, because old entries are not sent to the subscriber.
- startTimestampMillis - Start timestamp, in epoch milliseconds.
- executor - The executor to use for continuous queries.
- partitions - The list of table partitions that will be involved in the continuous query. By default, all partitions are included.
The example below shows how to configure and execute a continuous query:
// pageSize is an int variable defined by your application.
var options = ContinuousQueryOptions.builder()
        .pollIntervalMs(10)
        .pageSize(pageSize)
        .skipOldEntries(false)
        .build();

view.queryContinuously(subscriber, options);
var options = new ContinuousQueryOptions
{
    ColumnNames = ["id", "name"],
    EventTypes = [TableRowEventType.Created, TableRowEventType.Updated],
    PageSize = 42,
    PollInterval = TimeSpan.FromSeconds(0.5),
    Watermark = IContinuousQueryWatermark.FromInstant(Instant.FromDateTimeUtc(
        DateTime.UtcNow.AddDays(-1))),
    EnableEmptyBatches = false
};
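The .NET example sets its watermark to one day in the past. On the Java side, the closest listed parameter is startTimestampMillis, which takes an epoch timestamp. The sketch below only shows how such a value could be computed with `java.time`. The helper name `startTimestampMillis` on `StartTimestampSketch` is hypothetical, and the builder call in the comment is an assumption derived from the parameter list, not a verified signature.

```java
import java.time.Duration;
import java.time.Instant;

class StartTimestampSketch {
    /** Returns the epoch-millisecond timestamp that lies lookBack before now. */
    static long startTimestampMillis(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        long start = startTimestampMillis(Instant.now(), Duration.ofDays(1));
        System.out.println("start timestamp: " + start);

        // Assumed usage, based on the parameter list above (not verified here):
        // var options = ContinuousQueryOptions.builder()
        //         .startTimestampMillis(start)
        //         .build();
    }
}
```

Passing the clock value in as an argument keeps the helper deterministic, which makes it easy to test.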
Continuous Query Events
These event types describe the change delivered to the Subscriber in a TableRowEvent while a continuous query is executing.
Event Type | Description |
---|---|
CREATED | Row created. |
UPDATED | Row updated. |
REMOVED | Row removed. |
ARCHIVED | Row archived. This event happens when you have a table with |
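A subscriber usually branches on the event type. The following sketch shows one way to structure that dispatch in plain Java. The enum `RowChange` is a local, hypothetical stand-in that mirrors the descriptions in the table and is not the GridGain `TableRowEventType`. Which of the old and new entries is populated for each event type is also an assumption of this model, and rows are represented as strings for brevity.

```java
class EventDispatchSketch {
    /** Hypothetical stand-in for the event types described above; not the GridGain enum. */
    enum RowChange { CREATED, UPDATED, REMOVED, ARCHIVED }

    /** Builds a short description of a change; oldRow may be null if old entries are skipped. */
    static String describe(RowChange type, String oldRow, String newRow) {
        switch (type) {
            case CREATED:
                return "insert " + newRow;
            case UPDATED:
                // With skipOldEntries enabled, the old value is not available.
                return oldRow == null
                        ? "update to " + newRow
                        : "update " + oldRow + " -> " + newRow;
            case REMOVED:
                return "delete " + oldRow;
            case ARCHIVED:
                return "archive " + oldRow;
            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(RowChange.CREATED, null, "John"));  // prints "insert John"
        System.out.println(describe(RowChange.UPDATED, "John", "Jane")); // prints "update John -> Jane"
        System.out.println(describe(RowChange.UPDATED, null, "Jane"));   // prints "update to Jane"
    }
}
```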
© 2025 GridGain Systems, Inc. All Rights Reserved. GridGain® is a registered trademark of GridGain Systems, Inc.