Streaming Data
Data streaming provides a fast, efficient method for loading, organizing, and distributing large volumes of data across your cluster. The data streamer accepts a stream of data and distributes the entries across the cluster, where the processing takes place. Data streaming is available in all table views.

Data streaming provides an at-least-once delivery guarantee.
Using the Data Streamer API
The Data Streamer API lets you load large amounts of data into your cluster quickly and reliably. It uses a publisher-subscriber model: you create a publisher that streams your data entries to a table view, and the system distributes these entries across the cluster. You can configure how the data is processed via a DataStreamerOptions object, which lets you set batch sizes, auto-flush intervals, and retry limits.
Configuring the Data Streamer
DataStreamerOptions lets you fine-tune how data is streamed into your cluster by setting parameters for batching, retries, parallelism, and auto-flush timing:
DataStreamerOptions options = DataStreamerOptions.builder()
        .pageSize(1000)
        .perPartitionParallelOperations(1)
        .autoFlushInterval(1000)
        .retryLimit(16)
        .build();
var options = new DataStreamerOptions
{
    PageSize = 1000,
    RetryLimit = 8,
    AutoFlushInterval = TimeSpan.FromSeconds(3)
};
- pageSize: Specifies the number of entries to process in each page or chunk of data.
- perPartitionParallelOperations: Determines the number of parallel operations allowed on each partition.
- autoFlushInterval: Defines the time interval (in milliseconds) after which the system automatically flushes any incomplete buffers.
- retryLimit: Specifies the maximum number of retry attempts for a failed data submission before giving up.
Streaming Data
Before data is streamed to the cluster, each entry must be wrapped in an instance of the DataStreamerItem<T> class. This wrapper allows you to perform PUT and REMOVE operations with data:
- Use DataStreamerItem.of(entry) to insert new entries into the table.
- Use DataStreamerItem.removed(entry) to delete existing ones.
The wrapped data can then be passed to a publisher and streamed to the table.
The example below demonstrates how to use RecordView, create a publisher, configure the data streamer, insert account records into the existing accounts table, and then delete them:
// Imports below assume the standard Ignite 3 Java client packages.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;

public class RecordViewPojoDataStreamerExample {
    /** Number of records to stream (illustrative). */
    private static final int ACCOUNTS_COUNT = 1_000;

    public static void main(String[] args) throws Exception {
        // Assuming the 'accounts' table already exists.
        try (IgniteClient client = IgniteClient.builder()
                .addresses("127.0.0.1:10800")
                .build()) {
            RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

            streamAccountDataPut(view);
            streamAccountDataRemove(view);
        }
    }

    /**
     * Streaming data using the DataStreamerOperationType#PUT operation type.
     */
    private static void streamAccountDataPut(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;

        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i, "name" + i, rnd.nextLong(100_000), rnd.nextBoolean());
                publisher.submit(DataStreamerItem.of(entry));
            }
        }

        streamerFut.join();
    }

    /**
     * Streaming data using the DataStreamerOperationType#REMOVE operation type.
     */
    private static void streamAccountDataRemove(RecordView<Account> view) {
        DataStreamerOptions options = DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();

        CompletableFuture<Void> streamerFut;

        try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
            streamerFut = view.streamData(publisher, options);

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i);
                publisher.submit(DataStreamerItem.removed(entry));
            }
        }

        streamerFut.join();
    }
}
using Apache.Ignite;
using Apache.Ignite.Table;

using var client = await IgniteClient.StartAsync(new("localhost"));
ITable? table = await client.Tables.GetTableAsync("accounts");
IRecordView<Account> view = table!.GetRecordView<Account>();

var options = new DataStreamerOptions
{
    PageSize = 10_000,
    AutoFlushInterval = TimeSpan.FromSeconds(1),
    RetryLimit = 32
};

await view.StreamDataAsync(GetAccountsToAdd(5_000), options);
await view.StreamDataAsync(GetAccountsToRemove(1_000), options);

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToAdd(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, $"Account {i}"));
    }
}

async IAsyncEnumerable<DataStreamerItem<Account>> GetAccountsToRemove(int count)
{
    for (int i = 0; i < count; i++)
    {
        yield return DataStreamerItem.Create(
            new Account(i, string.Empty), DataStreamerOperationType.Remove);
    }
}

public record Account(int Id, string Name);
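The examples above use RecordView, but as noted earlier, streaming is available in all table views. Below is a minimal sketch of the same idea over a KeyValueView. The two-column names table (an int key, a String value) is hypothetical, and the sketch assumes the key-value variant of the streamer API accepts Map.Entry items, as in recent Apache Ignite 3 releases:
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;

public class KeyValueViewStreamerSketch {
    public static void main(String[] args) throws Exception {
        try (IgniteClient client = IgniteClient.builder()
                .addresses("127.0.0.1:10800")
                .build()) {
            // Hypothetical 'names' table: int key, String value.
            KeyValueView<Integer, String> view = client.tables()
                    .table("names")
                    .keyValueView(Integer.class, String.class);

            CompletableFuture<Void> streamerFut;

            try (var publisher = new SubmissionPublisher<DataStreamerItem<Map.Entry<Integer, String>>>()) {
                streamerFut = view.streamData(publisher, DataStreamerOptions.builder().build());

                for (int i = 0; i < 1_000; i++) {
                    // PUT is an upsert, so at-least-once redelivery of an entry is harmless.
                    publisher.submit(DataStreamerItem.of(Map.entry(i, "name" + i)));
                }
            }

            streamerFut.join();
        }
    }
}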
Tracking Failed Entries
If the data streamer fails to process any entries, it collects the failed items in a DataStreamerException. You can catch this exception and access the failed entries using the failedItems() method, as shown in the example below.
You can catch both asynchronous errors during background streaming and immediate submission errors:
RecordView<Account> view = client.tables().table("accounts").recordView(Account.class);

DataStreamerOptions options = DataStreamerOptions.builder().build();
ThreadLocalRandom rnd = ThreadLocalRandom.current();

CompletableFuture<Void> streamerFut = null;

try (var publisher = new SubmissionPublisher<DataStreamerItem<Account>>()) {
    streamerFut = view.streamData(publisher, options)
            .exceptionally(e -> {
                System.out.println("Failed items during background streaming: " +
                        ((DataStreamerException) e.getCause()).failedItems());
                return null;
            });

    // Try to insert an account record.
    Account entry = new Account(1, "Account name", rnd.nextLong(100_000), rnd.nextBoolean());
    publisher.submit(DataStreamerItem.of(entry));
} catch (DataStreamerException e) {
    // Handle entries that failed during submission.
    System.out.println("Failed items during submission: " + e.failedItems());
}

if (streamerFut != null) {
    streamerFut.join();
}
ITable? table = await client.Tables.GetTableAsync("my-table");
IRecordView<IIgniteTuple> view = table!.RecordBinaryView;

IList<IIgniteTuple> data = [new IgniteTuple { ["key"] = 1L, ["val"] = "v" }];

try
{
    await view.StreamDataAsync(data.ToAsyncEnumerable());
}
catch (DataStreamerException e)
{
    Console.WriteLine("Failed items: " + string.Join(",", e.FailedItems));
}
Tuning Memory Usage
The data streamer may require a significant amount of memory to handle requests in an orderly manner. Depending on your environment, you may want to increase or reduce the amount of memory reserved by the data streamer.
For every node in the cluster, the streamer reserves an amount of memory equal to the pageSize (1000 entries by default) multiplied by the perPartitionParallelOperations setting (1 by default). For example, a 10-partition table with default parameters and an average entry size of 1 KB will reserve 10 MB for operations.
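As a quick sanity check, the figure from that example can be reproduced with simple arithmetic (illustrative only; the streamer's internal accounting may differ slightly):
int partitions = 10;              // partitions in the target table
int pageSize = 1_000;             // default pageSize (entries per page)
int parallelOps = 1;              // default perPartitionParallelOperations
int avgEntrySizeBytes = 1_000;    // assumed average entry size (~1 KB)

long reservedBytes = (long) partitions * pageSize * parallelOps * avgEntrySizeBytes;
System.out.println(reservedBytes / 1_000_000 + " MB"); // prints: 10 MB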
You can change these options when creating a DataStreamerOptions object:
RecordView<Tuple> view = client.tables().table("accounts").recordView();
var publisher = new SubmissionPublisher<DataStreamerItem<Tuple>>();
var options = DataStreamerOptions.builder()
        .pageSize(10_000)
        .perPartitionParallelOperations(10)
        .build();

CompletableFuture<Void> streamerFut = view.streamData(publisher, options);
// The .NET streamer does not have an equivalent parallel operations option yet.
var options = new DataStreamerOptions
{
    PageSize = 10_000
};
Additionally, the data streamer periodically flushes incomplete buffers to ensure that messages are not delayed indefinitely. This is especially useful when a buffer fills slowly or never completely fills due to uneven data distribution.
This behavior is controlled by the autoFlushInterval property, which is set to 5000 ms by default. You can also configure the retryLimit parameter to define the maximum number of retry attempts for failed submissions, with a default value of 16.
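For reference, the following builder call spells out both of these defaults explicitly and is equivalent to leaving them unset:
DataStreamerOptions options = DataStreamerOptions.builder()
        .autoFlushInterval(5_000) // default: flush incomplete buffers every 5 seconds
        .retryLimit(16)           // default: retry a failed submission up to 16 times
        .build();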