GridGain Developers Hub

Distributed Computing

GridGain 9 provides an API for distributing computations across cluster nodes in a balanced and fault-tolerant manner. You can submit individual tasks for execution from Java and .NET clients.

You can use Java, .NET and C++ clients to execute compute jobs. Make sure the required classes are deployed to the cluster before executing code.

The example below assumes that the NodeNameJob class has been deployed to the node by using code deployment.

private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();

    //Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));

    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
        .units(units)
        .build();

    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();

// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };

IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};

job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();

Job Ownership

If authentication is enabled on the cluster, compute jobs are executed on behalf of a specific user. If user permissions are configured on the cluster, the user needs the appropriate distributed computing permissions to work with distributed computing jobs. Only users with the JOBS_ADMIN action can interact with the jobs of other users.
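The ownership rule above can be modeled as a simple predicate. This is a hypothetical sketch: JobAccess, canInteract and the string-based set of actions are illustrative names, not GridGain API. It assumes the user already holds the general distributed computing permissions, so it models only the ownership part of the check.

```java
import java.util.Set;

// Hypothetical model of the job ownership rule; not part of the GridGain API.
// Assumes the caller already has the general distributed computing permissions.
final class JobAccess {
    static final String JOBS_ADMIN = "JOBS_ADMIN";

    /** Returns true if the user may interact with (for example, cancel) the job. */
    static boolean canInteract(String user, Set<String> userActions, String jobOwner) {
        // A user can work with their own jobs; other users' jobs require JOBS_ADMIN.
        return user.equals(jobOwner) || userActions.contains(JOBS_ADMIN);
    }
}
```

For example, canInteract("bob", Set.of(), "alice") is false, while adding JOBS_ADMIN to Bob's actions makes it true.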

Job Execution States

You can keep track of the status of the job on the server and react to status changes. For example:

private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();

    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));

    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
        .units(units)
        .build();

    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");

    // The returned state may be null, so check it before reading the status.
    execution.stateAsync().thenAccept(state -> {
        if (state != null && state.status() == JobStatus.FAILED) {
            // Handle failure
        }
    });

    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();

// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };

IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello");

JobStatus? status = await execution.GetStatusAsync();

if (status?.State == JobState.Failed)
{
    // Handle failure
}

string result = await execution.GetResultAsync();
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};

job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {});

std::optional<job_status> status = execution.get_status();
if (status && status->state == job_state::FAILED)
{
    // Handle failure
}
std::string result = execution.get_result()->get<std::string>();

Possible States and Transitions

The diagram below depicts the possible transitions of job statuses:

[Diagram: compute job statuses]

The table below lists the possible job statuses:

| Status | Description | Transitions to |
|---|---|---|
| Submitted | The job was created and sent to the cluster, but not yet processed. | Queued, Canceled |
| Queued | The job was added to the queue and is waiting for execution. | Executing, Canceled |
| Executing | The job is being executed. | Canceling, Completed, Failed, Queued |
| Completed | The job was executed successfully and the execution result was returned. | None (final state) |
| Failed | The job was unexpectedly terminated during execution. | Queued |
| Canceling | The job has received the cancel command but is still running. | Completed, Canceled |
| Canceled | The job was successfully canceled. | None (final state) |
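The transitions in the table can be expressed as a small state machine. This is a hypothetical sketch: JobStatusModel is an illustrative enum, not the GridGain type. It lets Executing move to Failed, on the assumption that a job can only be "terminated during execution" from the Executing state.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical state machine mirroring the job status table; not the GridGain enum.
enum JobStatusModel {
    SUBMITTED, QUEUED, EXECUTING, COMPLETED, FAILED, CANCELING, CANCELED;

    // Allowed transitions, one entry per row of the table.
    private static final Map<JobStatusModel, Set<JobStatusModel>> TRANSITIONS = Map.of(
        SUBMITTED, EnumSet.of(QUEUED, CANCELED),
        QUEUED, EnumSet.of(EXECUTING, CANCELED),
        EXECUTING, EnumSet.of(CANCELING, COMPLETED, FAILED, QUEUED),
        COMPLETED, EnumSet.noneOf(JobStatusModel.class),
        FAILED, EnumSet.of(QUEUED),
        CANCELING, EnumSet.of(COMPLETED, CANCELED),
        CANCELED, EnumSet.noneOf(JobStatusModel.class));

    /** Returns true if a job in this status may move to the given status. */
    boolean canTransitionTo(JobStatusModel next) {
        return TRANSITIONS.get(this).contains(next);
    }

    /** Completed and Canceled have no outgoing transitions. */
    boolean isTerminal() {
        return TRANSITIONS.get(this).isEmpty();
    }
}
```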

If all job execution threads are busy, new jobs received by the node are put into the job queue according to their priority (see Job Priority). GridGain sorts queued jobs first by priority and then by queuing time, so among jobs with the same priority, the job queued earlier is executed first.
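The queue ordering described above can be sketched with a java.util.PriorityQueue and a two-level comparator. QueuedJob and JobQueueModel are hypothetical names used only for illustration, and submitTime stands in for whatever timestamp the node records when it queues a job.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical queued job: a name, its priority, and the time it was queued.
record QueuedJob(String name, int priority, long submitTime) {}

// Hypothetical model of the job queue ordering; not GridGain's queue implementation.
final class JobQueueModel {
    // Higher priority first; among equal priorities, the earlier submission first.
    static final Comparator<QueuedJob> ORDER =
        Comparator.<QueuedJob>comparingInt(QueuedJob::priority).reversed()
                  .thenComparingLong(QueuedJob::submitTime);

    static PriorityQueue<QueuedJob> newQueue() {
        return new PriorityQueue<>(ORDER);
    }
}
```

Suppose three jobs with priorities 2, 4 and 4 are queued at times 1, 3 and 2. Polling the queue returns the priority-4 job queued at time 2, then the priority-4 job queued at time 3, and finally the priority-2 job.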

Canceling Executing Jobs

When a node receives the command to cancel a job in the Executing state, it immediately interrupts the thread that runs the job. In most cases, this cancels the job right away. However, if the job code does not react to the interrupt, the job keeps running and moves to the Canceling state. Depending on the code being executed, the job may then complete successfully, be canceled once the uninterruptible operation finishes, or remain unfinished (for example, if the code is stuck in a loop). Use the JobExecution.stateAsync() method to track the job state and react to state changes.
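Whether a job lingers in the Canceling state depends on whether its code responds to the interrupt. The sketch below uses plain java.util.concurrent rather than the GridGain API. It assumes, as described above, that cancellation arrives as a thread interrupt. Future.cancel(true) interrupts the worker thread, and the loop exits because it checks Thread.currentThread().isInterrupted(). CancellationDemo is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain Java illustration (not GridGain code) of a job that honors interrupts.
final class CancellationDemo {
    /** Starts an interrupt-aware loop, cancels it, and reports whether its thread stopped. */
    static boolean cooperativeJobStops() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> job = pool.submit(() -> {
                // Keep working until the thread's interrupt flag is set.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.onSpinWait(); // stands in for a unit of real work
                }
            });
            Thread.sleep(50);   // give the job time to start running
            job.cancel(true);   // interrupts the thread that runs the job
            pool.shutdown();
            // Terminates only because the loop checks the interrupt flag.
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A loop that never checks the flag, or that catches and ignores InterruptedException, would keep running after cancel(true). That is the plain-Java analogue of a job stuck in the Canceling state.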

Job Configuration

Job Priority

You can specify a job priority by setting the JobExecutionOptions.priority property. Jobs with a higher priority are placed ahead of jobs with a lower priority in the queue (for example, a job with priority 4 is executed before a job with priority 2).

private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();

    // Create job execution options
    JobExecutionOptions options = JobExecutionOptions.builder().priority(1).build();

    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));

    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
        .units(units)
        .options(options)
        .build();

    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();

// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };

// Create job execution options
var options = JobExecutionOptions.Default with { Priority = 1 };

IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};

job_execution_options options{1, 0};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();

Job Retries

You can set how many times a job is retried on failure with the JobExecutionOptions.maxRetries property. If set, a failed job is retried up to the specified number of times before it moves to the Failed state.
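The retry behavior can be sketched as a bounded loop. This is a hypothetical model, not GridGain's retry implementation. It assumes maxRetries counts retries in addition to the first attempt, so a job gets at most maxRetries + 1 attempts. RetryModel and Outcome are illustrative names.

```java
import java.util.function.Supplier;

// Hypothetical model of maxRetries semantics; not GridGain's implementation.
final class RetryModel {
    /** The job result together with the number of attempts it took. */
    record Outcome(String result, int attempts) {}

    static Outcome runWithRetries(Supplier<String> job, int maxRetries) {
        RuntimeException lastFailure = null;
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            try {
                return new Outcome(job.get(), attempt);
            } catch (RuntimeException e) {
                lastFailure = e; // the failed job is queued again for another attempt
            }
        }
        // Every attempt failed: the job ends up in the Failed state.
        throw new IllegalStateException("Job failed after " + (maxRetries + 1) + " attempts", lastFailure);
    }
}
```

With maxRetries set to 5, a job that fails twice and then succeeds returns its result on the third attempt. A job that always fails with maxRetries set to 2 gives up after three attempts.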

private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes());

    //Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));

    // Create job execution options
    JobExecutionOptions options = JobExecutionOptions.builder().maxRetries(5).build();

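    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
        .units(units)
        .options(options)
        .build();

    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(nodes), descriptor, "Hello");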
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();

// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };

// Create job execution options
var options = JobExecutionOptions.Default with { MaxRetries = 5 };

IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;

compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();

// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};

job_execution_options options{0, 5};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();

Job Failover

GridGain 9 provides mechanisms for handling failures that occur during job execution. The following situations are handled:

Worker Node Shutdown

If a worker node shuts down, the coordinator node redistributes all jobs assigned to that worker to other viable nodes. If no viable nodes are found, the jobs fail and an exception is sent to the client.
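The redistribution step can be sketched as reassigning the departed worker's jobs across the remaining viable nodes. This is a hypothetical model. FailoverModel is an illustrative name, and the round-robin placement is an assumption, since the text does not say how the coordinator chooses among viable nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of worker-shutdown failover; not GridGain's coordinator logic.
final class FailoverModel {
    /** Reassigns the jobs of a departed worker round-robin across the viable nodes. */
    static Map<String, List<String>> redistribute(List<String> orphanedJobs, List<String> viableNodes) {
        if (viableNodes.isEmpty()) {
            // No node can take the jobs: they fail and the client receives an exception.
            throw new IllegalStateException("No viable nodes for jobs " + orphanedJobs);
        }
        Map<String, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < orphanedJobs.size(); i++) {
            String node = viableNodes.get(i % viableNodes.size());
            assignment.computeIfAbsent(node, n -> new ArrayList<>()).add(orphanedJobs.get(i));
        }
        return assignment;
    }
}
```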

Coordinator Node Shutdown

If the coordinator node shuts down, all jobs are canceled as soon as the nodes executing them detect that the coordinator has shut down. Note that some jobs may take a long time to cancel.

Client Disconnect

If the client disconnects, all jobs are canceled as soon as the coordinator node detects the disconnect. Note that some jobs may take a long time to cancel.

Colocated Computations

In GridGain 9, you can run colocated computations with the executeColocated method. The job is then guaranteed to execute on the node that holds the specified key. This can significantly reduce execution time when the job needs that data, because the data does not have to be transferred from another node.
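Colocation works because the key determines the partition, and the partition determines the node that owns the data. The toy model below assumes a fixed list of partition owners. It uses String.hashCode() with Math.floorMod in place of GridGain's actual hash function and partition distribution, so the node it picks for a given key will not match a real cluster. ColocationModel is a hypothetical name.

```java
import java.util.List;

// Hypothetical illustration of key-based colocation; not GridGain's partitioning.
final class ColocationModel {
    /** Maps a key to a partition; floorMod keeps the result non-negative. */
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    /** partitionOwners.get(p) is the node holding partition p in this toy model. */
    static String nodeFor(String key, List<String> partitionOwners) {
        return partitionOwners.get(partitionFor(key, partitionOwners.size()));
    }
}
```

Because the mapping is deterministic, every job submitted for the key "John" lands on the same node, which is the node that already holds that row.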

private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    String table = "Person";
    String key = "John";

    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));

    JobExecution<String> execution = compute.executeColocatedAsync(table, key, units, NodeNameJob.class, "Hello");
    String result = execution.resultAsync().join();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
string table = "Person";
string key = "John";

// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };

IJobExecution<string> execution = await compute.SubmitColocatedAsync<string, string>(table, key, units, NodeNameJob, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;

compute comp = client.get_compute();
std::string table{"Person"};
std::string key{"John"};

// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};

job_execution execution = comp.submit_colocated(table, key, units, NODE_NAME_JOB, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();

Executing Jobs on Nodes With Data

You can use the table partition API to run your compute jobs on the nodes that hold the data.

The example below finds the primary replica of every table partition. It then runs one compute job per partition on that node, so each job reads only its local partition data:

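import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.criteria.Criteria;

public class PartitionMapReduce implements MapReduceTask<List<DeploymentUnit>, String, String, Integer> {
    @Override
    public CompletableFuture<List<MapReduceJob<String, String>>> splitAsync(
            TaskExecutionContext taskContext, List<DeploymentUnit> deploymentUnits) {
        // Create one CustomJob per partition and run it on that partition's primary replica.
        return taskContext.ignite().tables().table("TABLE_NAME").partitionManager().primaryReplicasAsync()
                .thenApply(primaries -> primaries.entrySet().stream().map(entry ->
                        MapReduceJob.<String, String>builder()
                                .jobDescriptor(JobDescriptor.builder(CustomJob.class).units(deploymentUnits).build())
                                .node(entry.getValue())
                                .args(entry.getKey().toString())
                                .build())
                        .collect(toList())
                );
    }

    @Override
    public CompletableFuture<Integer> reduceAsync(TaskExecutionContext taskContext, Map<UUID, String> results) {
        // Custom reduce action. This placeholder returns the number of job results.
        return CompletableFuture.completedFuture(results.size());
    }

    public static class CustomJob implements ComputeJob<String, String> {

        @Override
        public CompletableFuture<String> executeAsync(JobExecutionContext context, String partition) {
            // Here data read from KV will be local.
            return context.ignite().tables().table("TABLE_NAME")
                    .keyValueView()
                    .queryAsync(null, Criteria.columnValue(
                            "__part",
                            Criteria.equalTo(partition.getBytes(UTF_8)))
                    )
                    .thenApply(CustomJob::customAction);
        }

        private static String customAction(AsyncCursor<Entry<Tuple, Tuple>> cursor) {
            // Some actions with data.

            return "";
        }
    }
}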
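Without the cluster, the task above has a split/reduce shape: one job per partition, then a reduce over the per-partition results. The sketch below runs that flow locally with plain collections. LocalMapReduce is a hypothetical name, and summing the results is only one possible reduce action.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Local, single-process model of the split/reduce flow; no GridGain calls involved.
final class LocalMapReduce {
    static int run(Map<Integer, List<String>> rowsByPartition,
                   Function<List<String>, Integer> perPartitionJob) {
        // Split: one job per partition. In the cluster, each job would run on that
        // partition's primary replica and read only local rows.
        Map<Integer, Integer> results = new TreeMap<>();
        rowsByPartition.forEach((partition, rows) -> results.put(partition, perPartitionJob.apply(rows)));

        // Reduce: combine the per-partition results (here, by summing them).
        return results.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

For example, counting rows with List::size over partitions holding 2, 1 and 0 rows reduces to a total of 3.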