Continuous Queries
Continuous queries monitor data modifications in a cache. All update events are propagated to the local subscriber. The continuous query implementation guarantees exactly-once delivery of each event to the subscriber.
Guarantees
Continuous queries provide the following guarantees:
- Exactly-once delivery.
- Order is preserved within a partition. For example, if a key gets updated multiple times, GridGain will always preserve the update order.
- Order is not preserved across multiple partitions. For example, if keys belonging to different partitions get updated, the order of events is not defined.
- Continuous queries only observe committed changes. Changes made within an explicit transaction are only observed after you commit it.
- It is not guaranteed that all changes from a single transaction are a part of a single TableRowEventBatch event. A transaction can affect more rows than pageSize.
Creating a Subscriber
When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.
Note that the continuous query throws an exception if started without a local listener.
public class SubscriberExample implements Flow.Subscriber<TableRowEventBatch<Tuple>> {
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Keep the subscription so that cancel() can use it later.
        this.subscription = subscription;
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        List<TableRowEvent<Tuple>> items = batch.rows();
        for (TableRowEvent<Tuple> item : items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }

    void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }
}
ITable? table = await Client.Tables.GetTableAsync("Person");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;
IAsyncEnumerable<ITableRowEventBatch<IIgniteTuple>> continuousQuery = view.QueryContinuouslyAsync();

// Add data after the query is started.
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 1, ["Name"] = "John" });
await view.UpsertAsync(null, new IgniteTuple { ["Id"] = 2, ["Name"] = "Jane" });

await foreach (ITableRowEventBatch<IIgniteTuple> batch in continuousQuery)
{
    foreach (ITableRowEvent<IIgniteTuple> rowEvent in batch.Events)
    {
        Console.WriteLine(rowEvent);
    }
}
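The Java subscriber above requests Long.MAX_VALUE items, which means it never applies backpressure. If processing a batch is slow, a subscriber can instead request one batch at a time. The following is a minimal sketch of that pattern using only JDK classes. It assumes nothing about GridGain itself: the JDK SubmissionPublisher stands in for the continuous query so the example runs without a cluster, and the String items are placeholders for TableRowEventBatch<Tuple>. The class name OneAtATimeSubscriber is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber that requests one batch at a time instead of Long.MAX_VALUE. */
class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Ask for the first batch only.
    }

    @Override
    public void onNext(T batch) {
        received.add(batch);     // Process the batch.
        subscription.request(1); // Then ask for the next one.
    }

    @Override
    public void onError(Throwable throwable) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    /** Runs the subscriber against a JDK publisher that stands in for a continuous query. */
    static List<String> demo() throws InterruptedException {
        OneAtATimeSubscriber<String> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("batch-1");
            publisher.submit("batch-2");
            publisher.submit("batch-3");
        } // close() signals onComplete once the submitted items are delivered.
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Whether bounded demand is worthwhile depends on how expensive your onNext processing is; for lightweight handlers, requesting Long.MAX_VALUE as in the example above is simpler.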
Continuous Query Watermark
When starting a continuous query, GridGain creates a watermark object that represents the continuous query cursor position. This cursor object can be used in a variety of ways to make your work with continuous queries more consistent.
Resuming Continuous Queries
To resume a continuous query, pass it a previously stored watermark, and the query continues from that position. Your application must store the watermark itself so that it is available when the query is restarted.
The example below shows how you can store the watermark every time data is updated on a subscriber.
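// Most recent watermark seen by this subscriber.
// Declared volatile because onNext may run on a different thread than the reader.
public volatile ContinuousQueryWatermark latestWatermark;

@Override
public void onNext(TableRowEventBatch<Tuple> batch) {
    for (TableRowEvent<Tuple> event : batch.rows()) {
        latestWatermark = event.watermark();
    }
}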
Then, when you need to resume a continuous query, pass the watermark to it.
// Pass the watermark to continuous query.
ContinuousQueryOptions options = ContinuousQueryOptions.builder().watermark(latestWatermark).build();
// Start the continuous query.
accounts.queryContinuously(subscriber, options);
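Because the subscriber may run on a different thread than the code that restarts the query, it can help to keep the latest watermark in a small thread-safe holder. The sketch below is one possible way to do that with JDK classes only. The class WatermarkHolder and its methods are hypothetical, and the type parameter W stands in for ContinuousQueryWatermark so that the example compiles without the GridGain client on the classpath.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder for the latest watermark; W stands in for ContinuousQueryWatermark. */
class WatermarkHolder<W> {
    private final AtomicReference<W> latest = new AtomicReference<>();

    /** Call from onNext for each received event. */
    void record(W watermark) {
        latest.set(watermark);
    }

    /** Returns the last recorded watermark, or empty if no event has been received yet. */
    Optional<W> latest() {
        return Optional.ofNullable(latest.get());
    }

    public static void main(String[] args) {
        WatermarkHolder<String> holder = new WatermarkHolder<>();
        System.out.println(holder.latest().isPresent()); // No events received yet.
        holder.record("wm-1");
        holder.record("wm-2");
        System.out.println(holder.latest().get());
    }
}
```

When restarting, the application would check whether a watermark is present and only then pass it to the query options; if none was recorded, it would start the query without a watermark.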
Starting Continuous Queries in the Past
By default, continuous queries start in the present. By setting a watermark in the past, you can start the continuous query at a specific point in time. All updates made since that timestamp are then propagated to the subscriber.
Pass the start time as an Instant to the ContinuousQueryWatermark.ofInstant() method.
The example below starts the continuous query 1 hour in the past.
// Get the time one hour ago.
Instant startTime = Instant.now().minus(Duration.ofHours(1));
// Create a watermark object.
ContinuousQueryWatermark wm = ContinuousQueryWatermark.ofInstant(startTime);
// Pass the watermark to continuous query.
ContinuousQueryOptions options = ContinuousQueryOptions.builder().watermark(wm).build();
// Start the continuous query.
accounts.queryContinuously(subscriber, options);
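The start time does not have to be relative to the current moment. As a sketch, the helper below computes the start of the current UTC day as an Instant, which could then be passed to ContinuousQueryWatermark.ofInstant() as in the example above. It uses only java.time; the class name StartTimes and the method startOfUtcDay are hypothetical, and a Clock parameter is used so that the result can be tested deterministically.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical helper that computes start instants for a continuous query. */
class StartTimes {
    /** Returns midnight (UTC) of the day the given clock currently reports. */
    static Instant startOfUtcDay(Clock clock) {
        return LocalDate.now(clock.withZone(ZoneOffset.UTC))
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant();
    }

    public static void main(String[] args) {
        // With the system clock, this prints midnight UTC of the current day.
        System.out.println(startOfUtcDay(Clock.systemUTC()));
    }
}
```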
Starting Continuous Queries From Transaction Timestamp
You can also start a continuous query from exactly the timestamp of a read-only transaction. This way, you can read data from your GridGain cluster in a transaction, apply this data as a snapshot, and then start the continuous query from the exact point in time at which the transaction was executed. No updates are missed or duplicated, so exactly-once delivery is preserved.
The continuous query started this way behaves the same way as a query started in the past, except instead of an Instant it starts at the specific point in time for which the transaction gathered data.
The example below shows how you can read data from a view with a read-only transaction:
// This is a sample view that we will work with.
KeyValueView<Long, Account> accounts = table.keyValueView(Mapper.of(Long.class), Mapper.of(Account.class));
// Create transaction configuration that sets it to read-only mode.
var txOpts = new TransactionOptions().readOnly(true);
// Start a transaction.
Transaction tx = client.transactions().begin(txOpts);
// Read data from the view into a list.
List<Entry<Long, Account>> initialRows = new ArrayList<>();
try (Cursor<Entry<Long, Account>> initialQuery = accounts.query(tx, null)) {
    initialQuery.forEachRemaining(initialRows::add);
}
// Commit the transaction.
tx.commit();
After the transaction completes, you can apply this data in the appropriate way for your application.
Then, use the ContinuousQueryWatermark.afterTransaction() method to create a watermark at the transaction timestamp, and pass this watermark to the continuous query to start it from that point.
The example below starts a simple continuous query starting from the watermark:
// Create a watermark at the transaction timestamp.
ContinuousQueryWatermark wm = ContinuousQueryWatermark.afterTransaction(tx);
// Pass the watermark to continuous query.
ContinuousQueryOptions options = ContinuousQueryOptions.builder().watermark(wm).build();
// Start the continuous query.
accounts.queryContinuously(subscriber, options);
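To illustrate why the snapshot-then-subscribe pattern is useful, the sketch below maintains a local in-memory copy of the data: it is first filled from the rows read in the transaction and then kept up to date by applying each change event. This is a simplified illustration with JDK classes only, not GridGain API. The class LocalReplica and the ChangeType enum are hypothetical; in a real subscriber you would map each TableRowEvent type to the matching operation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical local copy of a table, built from a snapshot plus change events. */
class LocalReplica<K, V> {
    /** Simplified, hypothetical change kinds; not the GridGain event type enum. */
    enum ChangeType { PUT, REMOVE }

    private final Map<K, V> rows = new ConcurrentHashMap<>();

    /** Applies the rows read inside the read-only transaction. */
    void applySnapshot(Map<K, V> initialRows) {
        rows.putAll(initialRows);
    }

    /** Applies one change received after the transaction timestamp. */
    void applyChange(ChangeType type, K key, V value) {
        if (type == ChangeType.REMOVE) {
            rows.remove(key); // The value is ignored for removals.
        } else {
            rows.put(key, value);
        }
    }

    /** Returns an immutable copy of the current state. */
    Map<K, V> view() {
        return Map.copyOf(rows);
    }

    public static void main(String[] args) {
        LocalReplica<Long, Integer> replica = new LocalReplica<>();
        replica.applySnapshot(Map.of(1L, 100, 2L, 200));
        replica.applyChange(ChangeType.PUT, 2L, 250);
        replica.applyChange(ChangeType.REMOVE, 1L, null);
        System.out.println(replica.view());
    }
}
```

Because the query starts exactly at the transaction timestamp, every change applied through applyChange is newer than the snapshot, so the replica does not need to deduplicate or reorder snapshot rows against events.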
Dedicated Executor Thread
When a continuous query is created, it accepts a Subscriber that is invoked when the query receives data. By default, the Subscriber’s methods are executed on Java’s common thread pool. You can change this by creating a dedicated executor and passing it in the continuous query options. The Subscriber’s methods are then handled by the dedicated thread pool, which gives you better control over resource allocation.
The example below creates an executor and passes it to a continuous query that uses the subscriber defined above:
ExecutorService executor = Executors.newSingleThreadExecutor();
var options = ContinuousQueryOptions.builder()
        .executor(executor)
        .build();
view.queryContinuously(subscriber, options);
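A dedicated executor is owned by your application, so it is up to you to name its threads and shut it down when the query is no longer needed. The sketch below shows one way to do this with the JDK only. The class SubscriberExecutors, its methods, and the thread name "cq-subscriber" are hypothetical choices for illustration; the resulting ExecutorService would be passed to the executor option as in the example above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers for creating and stopping a subscriber executor. */
class SubscriberExecutors {
    /** Creates a single-threaded executor whose daemon thread has a recognizable name. */
    static ExecutorService newSubscriberExecutor(String threadName) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // Do not keep the JVM alive because of this thread.
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    /** Stops accepting tasks, waits for running ones, and interrupts them on timeout. */
    static void shutdownGracefully(ExecutorService executor, long timeoutSeconds)
            throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newSubscriberExecutor("cq-subscriber");
        String name = executor.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name);
        shutdownGracefully(executor, 5);
    }
}
```

A single thread keeps subscriber callbacks strictly sequential; a larger pool is only worth considering if your subscriber is written to be thread-safe.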
Continuous Query Parameters
You can configure the following properties of continuous queries:
- pageSize - The number of entries returned from a single partition in one network call. Default value: 1000.
- pollIntervalMs - Poll interval, in milliseconds. Default value: 1000.
- skipOldEntries - When true, TableRowEvent#oldEntry() will return null for TableRowEventType#UPDATED events. It helps to reduce network traffic while running a continuous query and avoid sending old entries to the subscriber.
- startTimestampMillis - Start timestamp in epoch time.
- executor - The executor thread to use for continuous queries.
- partitions - The list of table partitions that will be involved in the continuous query. By default, all partitions are included.
- watermark - The watermark transaction that will be applied before starting the continuous query.
The example below shows how to configure and execute a continuous query:
var options = ContinuousQueryOptions.builder()
        .pollIntervalMs(10)
        .pageSize(pageSize)
        .skipOldEntries(false)
        .build();
view.queryContinuously(subscriber, options);
var options = new ContinuousQueryOptions
{
    ColumnNames = ["id", "name"],
    EventTypes = [TableRowEventType.Created, TableRowEventType.Updated],
    PageSize = 42,
    PollInterval = TimeSpan.FromSeconds(0.5),
    Watermark = IContinuousQueryWatermark.FromInstant(Instant.FromDateTimeUtc(
        DateTime.UtcNow.AddDays(-1))),
    EnableEmptyBatches = false
};
Continuous Query Events
These event types describe the change delivered to the Subscriber in a TableRowEvent while a continuous query is running.
| Event Type | Description |
|---|---|
| | Row created. |
| | Row updated. |
| | Row removed. |
| | Row archived. This event happens when you have a table with |