Distributed Computing
GridGain 9 provides an API for distributing computations across cluster nodes in a balanced and fault-tolerant manner. You can submit individual tasks for execution from Java and .NET clients.
Compute tasks execute user code on nodes. Thus, before submitting a compute job, you should deploy the code you want to execute to the nodes where you plan to perform your computing jobs. Depending on your needs, jobs may be executed on a single node, several nodes, or all nodes in the cluster. GridGain also provides APIs for asynchronous job execution.
Often, compute tasks work with data that is stored in the cluster, and choosing arbitrary nodes may cause additional load on the cluster, as required data is sent to the node. You can avoid this by performing Colocated Computations. In this case, tasks will be executed on nodes that hold the data required for them.
Compute jobs are submitted to specific nodes in the cluster. If other nodes are required, internal cluster connections will be used to send them the data.
User objects must be marshalled before sending them to the node. For most common scenarios (tuples, POJOs, native types), this will be handled automatically, but in more complicated scenarios, you may need to write your own marshalling logic.
Executing Jobs
Single Node Execution
Often, you need to perform a job on one node in the cluster. In this case, there are multiple ways to start job execution:
-
submitAsync()
- sends the job to the cluster and returns a future that will be completed with theJobExecution
object when the job is submitted for execution. -
executeAsync()
- sends the job to the cluster and returns a future that will be completed when job execution result is ready. -
execute()
- sends the job to the cluster and waits for the result of job execution.
When submitting a job, a JobTarget object must be created. Job target can point to a specific node, any node on the cluster, or start a colocated compute job, that will be executed on nodes that hold a specific key. The following methods are available:
-
JobTarget.anyNode()
- the job will be executed on any of the specified nodes. -
JobTarget.node()
- the job will be executed on the specific node. -
JobTarget.colocated()
- the job will be executed on a node that holds the key.
The example below assumes that the NodeNameJob
class has been deployed to the node by using code deployment and performs the job on any node in the cluster.
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
String executionResult = client.compute().execute(JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.builder(NodeNameJob.class).build(), null
);
System.out.println(executionResult);
}
ICompute compute = Client.Compute;
IList<IClusterNode> nodes = await Client.GetClusterNodesAsync();
IJobExecution<string> execution = await compute.SubmitAsync(
JobTarget.AnyNode(nodes),
new JobDescriptor<string, string>("org.example.NodeNameJob"),
arg: "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();
Multiple Node Execution
To execute the compute task on multiple nodes, you use the same methods as for single node execution, except instead of creating a JobTarget
object to designate execution nodes you use the BroadcastJobTarget
and specify the list of nodes that the job must be executed on.
The BroadcastJobTarget
object can specify the following:
-
BroadcastJobTarget.nodes()
- the job will be executed on all nodes in the list. -
BroadcastJobTarget.table()
- the job will be executed on all nodes that hold partitions of the specified table.
You can control what nodes the task is executed on by setting the list of nodes:
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
String executionResult = client.compute().execute(
BroadcastJobTarget.nodes(node(0), node(1)),
JobDescriptor.builder(NodeNameJob.class).build(), null
);
System.out.println(executionResult);
}
ICompute compute = Client.Compute;
IList<IClusterNode> nodes = await Client.GetClusterNodesAsync();
IBroadcastExecution<string> execution = await compute.SubmitBroadcastAsync(
BroadcastJobTarget.Nodes(nodes),
new JobDescriptor<object, string>("org.example.NodeNameJob"),
arg: "Hello");
foreach (IJobExecution<string> jobExecution in execution.JobExecutions)
{
string jobResult = await jobExecution.GetResultAsync();
Console.WriteLine($"Job result from node {jobExecution.Node}: {jobResult}");
}
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
broadcast_execution execution = comp.submit_broadcast(broadcast_job_target::nodes(nodes), job_desc, {std::string("Hello")}, {});
for (auto &exec: execution.get_job_executions()) {
std::string result = exec.get_result()->get<std::string>();
}
Job Ownership
If the cluster has Authentication enabled, compute jobs are executed by a specific user. If user permissions are configured on the cluster, the user needs the appropriate distributed computing permissions to work with distributed computing jobs. Only users with JOBS_ADMIN
action can interact with jobs of other users.
Job Execution States
When using asynchronous API, you can keep track of the status of the job on the server and react to status changes. For example:
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
JobExecution<String> execution = client.compute().submit(JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.builder(NodeNameJob.class).build(), null
);
execution.stateAsync().thenApply(status -> {
if (status.toString() == "Failed") {
// Handle failure
}
return null;
});
System.out.println(execution.resultAsync().get());
}
IList<IClusterNode> nodes = await Client.GetClusterNodesAsync();
IJobExecution<string> execution = await Client.Compute.SubmitAsync(
JobTarget.AnyNode(nodes),
new JobDescriptor<string, string>("org.example.NodeNameJob"),
arg: "Hello");
JobState? state = await execution.GetStateAsync();
if (state?.Status == JobStatus.Failed)
{
// Handle failure
}
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, {});
std::optional<job_status> status = execution.get_status();
if (status && status->state == job_state::FAILED)
{
// Handle failure
}
std::string result = execution.get_result()->get<std::string>();
Possible States and Transitions
The diagram below depicts the possible transitions of job statuses:

The table below lists the possible job statuses:
Status | Description | Transitions to |
---|---|---|
|
The job was created and sent to the cluster, but not yet processed. |
|
|
The job was added to the queue and waiting queue for execution. |
|
|
The job is being executed. |
|
|
The job was executed successfully and the execution result was returned. |
|
|
The job was unexpectedly terminated during execution. |
|
|
Job has received the cancel command, but is still running. |
|
|
Job was successfully cancelled. |
If all job execution threads are busy, new jobs received by the node are put into job queue according to their Job Priority. GridGain sorts all incoming jobs first by priority, then by the time, executing jobs queued earlier first.
Cancelling Executing Jobs
When the node receives the command to cancel the job in the Executing
status, it will immediately send an interrupt to the thread that is responsible for the job. In most cases, this will lead to the job being immediately canceled, however there are cases in which the job will continue. If this happens, the job will be in the Canceling
state. Depending on specific code being executed, the job may complete successfully, be canceled once the uninterruptible operation is finished, or remain in unfinished state (for example, if code is stuck in a loop). You can use the JobExecution.stateAsync()
method to keep track of what status the job is in, and react to status change.
To be able to cancel a compute job, you first create a cancel handler and retrieve a token from it. You can then use this token to cancel the compute job:
CancelHandle cancelHandle = CancelHandle.create(); CancellationToken cancelToken = cancelHandle.token(); CompletableFuture<Void> execution = client.compute().executeAsync(JobTarget.anyNode(client.clusterNodes()), JobDescriptor.builder(NodeNameJob.class).build(), cancelToken, null); cancelHandle.cancel();
Another way to cancel jobs is by using the SQL KILL COMPUTE command. The job id can be retrieved via the COMPUTE_JOBS
system view.
Job Configuration
Job Priority
You can specify a job priority by setting the JobExecutionOptions.priority
property. Jobs with a higher priority will be queued before jobs with lower priority (for exammple, a job with priority 4 will be executed before the job with priority 2).
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
// Create job execution options
JobExecutionOptions options = JobExecutionOptions.builder().priority(1).build();
String executionResult = client.compute().execute(JobTarget.anyNode(client.clusterNodes()),
JobDescriptor.builder(NodeNameJob.class).options(options).build(), null
);
System.out.println(executionResult);
}
var options = JobExecutionOptions.Default with { Priority = 1 };
IJobExecution<string> execution = await Client.Compute.SubmitAsync(
JobTarget.AnyNode(await Client.GetClusterNodesAsync()),
new JobDescriptor<string, string>("org.example.NodeNameJob", Options: options),
arg: "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
job_execution_options options{1, 0};
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();
Job Retries
You can set the number the job will be retried on failure by setting the JobExecutionOptions.maxRetries
property. If set, the failed job will be retried the specified number of times before movbing to Failed
state.
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
job_execution_options options{0, 5};
job_execution execution = comp.submit(job_target::any_node(nodes), job_desc, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();
var options = JobExecutionOptions.Default with { MaxRetries = 5 };
IJobExecution<string> execution = await Client.Compute.SubmitAsync(
JobTarget.AnyNode(await Client.GetClusterNodesAsync()),
new JobDescriptor<string, string>("org.example.NodeNameJob", Options: options),
arg: "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution_options options{0, 5};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();
Job Failover
GridGain 9 implements mechanics to handle issues that happen during job execution. The following situations are handled:
Worker Node Shutdown
If the worker node is shut down, the coordinator node will redistribute all jobs assigned to worker to other viable nodes. If no nodes are found, the job will fail and an exception will be sent to the client.
Coordinator Node Shutdown
If the coordinator node shuts down, all jobs will be cancelled as soon as the node detects that the coordinator is shut down. Note that some jobs may take a long time to cancel.
Client Disconnect
If the client disconnects, all jobs will be cancelled as soon as the coordinator node detects the disconnect. Note that some jobs may take a long time to cancel.
Colocated Computations
In GridGain 9 you can execute colocated computation with colocated
job target. When you do it, the compute task is guaranteed to be executed on the nodes that hold the specified key. This can significantly reduce execution time if your tasks require data.
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
String executionResult = client.compute()
.execute(
JobTarget.colocated("myTable", Tuple.create(Map.of("k", 1))),
JobDescriptor.builder(NodeNameJob.class).build(),
null
);
System.out.println(execution.resultAsync().get());
}
string table = "Person";
string key = "John";
IJobExecution<string> execution = await Client.Compute.SubmitAsync(
JobTarget.Colocated(table, key),
new JobDescriptor<string, string>("org.example.NodeNameJob"),
arg: "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::string table{"Person"};
std::string key{"John"};
// Unit `unitName:1.1.1` contains NodeNameJob class.
auto job_desc = job_descriptor::builder("org.company.package.NodeNameJob")
.deployment_units({deployment_unit{"unitName", "1.1.1"}})
.build();
job_execution execution = comp.submit(job_target::colocated(table, key), job_desc, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();
Alternatively, you can execute the compute job on all nodes in the cluster that hold partitions for the specified table by creating a BroadcastJobTarget.table()
target. In this case, GridGain will automatically find all nodes that hold data partitions for the specified table and execute the job on all of them.
The below example executes the same job on all nodes in the cluster that have partitions for the Person
table:
String executionResult = client.compute().execute(BroadcastJobTarget.table("Person"),
JobDescriptor.builder(NodeNameJob.class).build(), null
);
System.out.println(executionResult);
This API is not presently available for .NET.
This API is not presently available for C++.
MapReduce Tasks
GridGain 9 provides an API for performing MapReduce operations in the cluster. This allows you to split your computing task between multiple nodes before aggregating the result and returning it to the user.
Understanding MapReduce Tasks
A MapReduce task must be executed on a node that has a deployed class implementing the MapReduceTask
interface. This interface provides a way to implement custom map and reduce logic. A node that receives the task becomes a coordinator node, that will be responsible for both mapping tasks to other nodes, reducing their results and returning the final result to the client.
The class must implement two methods: splitAsync
and reduceAsync
.
The splitAsync()
method should be implemented to create compute jobs based on input parameters and map them to worker nodes. The method receives the execution context and your task arguments and returns a completable future containing the list of the job descriptors that will be sent to the worker nodes.
The reduceAsync()
method is called during the reduce step, when all the jobs have completed. The method receives a map from the worker node to the completed job result and returns the final result of the computation.
Creating a Mapper Class
All MapReduce jobs must be submitted to a node that has an appropriate class deployed. Below is an example of a
private static class MapReduceNodeNameTask implements MapReduceTask<String, Object, String, String> {
@Override
public CompletableFuture<List<MapReduceJob<Object, String>>> splitAsync(TaskExecutionContext context, String args) {
return completedFuture(context.ignite().clusterNodes().stream()
.map(node -> MapReduceJob.<Object, String>builder()
.jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build())
.nodes(Set.of(node))
.args(args)
.build())
.collect(Collectors.toList()));
}
@Override
public CompletableFuture<String> reduceAsync(TaskExecutionContext context, Map<UUID, String> results) {
return completedFuture(results.values().stream()
.map(String.class::cast)
.collect(Collectors.joining(",")));
}
}
Executing a MapReduce Task
To execute the MapReduce task, you use one of the following methods:
-
submitMapReduce()
- sends the MapReduce job to the cluster and returns theTaskExecution
object that can be used to monitor or modify the compute task execution. -
executeMapReduceAsync()
- sends the MapReduce job to the cluster in the cluster and gets the future for job execution results. -
executeMapReduce()
- sends the job to the cluster and waits for the result of job execution.
The node that the MapReduce task is sent to must have a class implementing the MapReduceTask
interface.
public static void example() throws ExecutionException, InterruptedException {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
TaskDescriptor<String, String> taskDescriptor = TaskDescriptor.builder(MapReduceNodeNameTask.class).build();
String executionResult = client.compute()..executeMapReduce(taskDescriptor, null);
System.out.println(executionResult);
}
ICompute compute = Client.Compute;
var taskDescriptor = new TaskDescriptor<string, string>("com.example.MapReduceNodeNameTask");
ITaskExecution<string> exec = await compute.SubmitMapReduceAsync(taskDescriptor, "arg");
string result = await exec.GetResultAsync();
Console.WriteLine(result);
This API is not presently available for C++.
© 2025 GridGain Systems, Inc. All Rights Reserved. Privacy Policy | Legal Notices. GridGain® is a registered trademark of GridGain Systems, Inc.
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.