Distributed Computing
GridGain 9 provides an API for distributing computations across cluster nodes in a balanced and fault-tolerant manner. You can submit individual jobs for execution from the Java, .NET, and C++ clients. Make sure the required classes are deployed to the cluster before you execute the code.
The example below assumes that the NodeNameJob class has been deployed to the node by using code deployment.
private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));
    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
            .units(units)
            .build();
    // Run the job on any node of the cluster.
    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();
// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };
IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();
Job Ownership
If the cluster has authentication enabled, each compute job is executed on behalf of a specific user, the job owner. If user permissions are configured on the cluster, the user needs the appropriate distributed computing permissions to work with compute jobs. Only users with the JOBS_ADMIN action can interact with jobs of other users.
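To make the ownership rule concrete, here is a minimal Java model of the check. The names `JobAccessModel` and `canInteract` are hypothetical, and this is not how GridGain enforces permissions internally. The model assumes the user already holds the general distributed computing permissions and only captures the extra JOBS_ADMIN rule for jobs owned by other users.

```java
import java.util.Set;

// Hypothetical model of the job-ownership rule; this is not a GridGain API.
// Assumes the user already has the general permissions needed to work with compute jobs.
final class JobAccessModel {
    // Action name taken from the documentation text.
    static final String JOBS_ADMIN = "JOBS_ADMIN";

    // Returns true if `user` may interact with (for example, cancel) a job owned by `owner`.
    static boolean canInteract(String user, Set<String> grantedActions, String owner) {
        // The owner can work with their own job.
        if (user.equals(owner)) {
            return true;
        }
        // Jobs of other users require the JOBS_ADMIN action.
        return grantedActions.contains(JOBS_ADMIN);
    }

    public static void main(String[] args) {
        System.out.println(canInteract("alice", Set.of(), "alice"));           // true
        System.out.println(canInteract("bob", Set.of(), "alice"));             // false
        System.out.println(canInteract("admin", Set.of(JOBS_ADMIN), "alice")); // true
    }
}
```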
Job Execution States
You can keep track of the status of the job on the server and react to status changes. For example:
private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));
    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
            .units(units)
            .build();
    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");
    // React to the job state; the state may be null if it is no longer available.
    execution.stateAsync().thenAccept(state -> {
        if (state != null && state.status() == JobStatus.FAILED) {
            // Handle failure
        }
    });
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();
// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };
IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, JobExecutionOptions.Default, "Hello");
JobStatus? status = await execution.GetStatusAsync();
if (status?.State == JobState.Failed)
{
// Handle failure
}
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, {});
std::optional<job_status> status = execution.get_status();
if (status && status->state == job_state::FAILED)
{
// Handle failure
}
std::string result = execution.get_result()->get<std::string>();
Possible States and Transitions
The diagram below depicts the possible transitions of job statuses:
The table below lists the possible job statuses:
Status | Description | Transitions to |
---|---|---|
Submitted | The job was created and sent to the cluster, but not yet processed. | |
Queued | The job was added to the queue and is waiting for execution. | |
Executing | The job is being executed. | |
Completed | The job was executed successfully and the execution result was returned. | |
Failed | The job was unexpectedly terminated during execution. | |
Canceling | The job has received the cancel command, but is still running. | |
Canceled | The job was successfully canceled. | |
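As a plain Java model, the statuses and transitions that the text of this section describes look like this: a queued job starts executing; an executing job completes, fails, or is canceled, possibly passing through Canceling first; a Canceling job either completes or is canceled. The `ModelStatus` enum and `JobStateModel` class are hypothetical, not the GridGain `JobStatus` API, and the model may omit transitions that appear only in the diagram.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative state model, not the GridGain JobStatus API. It encodes only the
// transitions that the surrounding text describes.
enum ModelStatus { QUEUED, EXECUTING, COMPLETED, FAILED, CANCELING, CANCELED }

final class JobStateModel {
    private static final Map<ModelStatus, Set<ModelStatus>> NEXT = Map.of(
            // A queued job is picked up when an execution thread becomes free.
            ModelStatus.QUEUED, EnumSet.of(ModelStatus.EXECUTING),
            // A running job finishes, fails, or reacts to a cancel command.
            ModelStatus.EXECUTING, EnumSet.of(ModelStatus.COMPLETED, ModelStatus.FAILED,
                    ModelStatus.CANCELING, ModelStatus.CANCELED),
            // A job that did not stop on interrupt may still complete or be canceled later.
            ModelStatus.CANCELING, EnumSet.of(ModelStatus.COMPLETED, ModelStatus.CANCELED));

    // True if the model allows moving directly from `from` to `to`.
    static boolean canMove(ModelStatus from, ModelStatus to) {
        Set<ModelStatus> targets = NEXT.get(from);
        return targets != null && targets.contains(to);
    }

    // COMPLETED, FAILED, and CANCELED have no outgoing transitions in this model.
    static boolean isFinal(ModelStatus status) {
        return !NEXT.containsKey(status);
    }
}
```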
If all job execution threads are busy, new jobs received by the node are put into the job queue according to their job priority. GridGain sorts queued jobs first by priority and then by arrival time, so among jobs with the same priority, the job queued earlier is executed first.
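The ordering rule can be sketched with a `java.util.PriorityQueue`. This is an illustrative model, not GridGain's internal queue; `JobQueueModel` and the arrival counter `seq` are hypothetical names. Jobs are compared by priority (higher first) and then by arrival order (earlier first).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative model of the queue ordering rule; not GridGain's internal code.
final class JobQueueModel {
    // `seq` records arrival order: a lower value means the job was queued earlier.
    record QueuedJob(String name, int priority, long seq) {}

    // Higher priority first; among equal priorities, the earlier job first.
    private final PriorityQueue<QueuedJob> queue = new PriorityQueue<>(
            Comparator.comparingInt(QueuedJob::priority).reversed()
                    .thenComparingLong(QueuedJob::seq));
    private long nextSeq = 0;

    void submit(String name, int priority) {
        queue.add(new QueuedJob(name, priority, nextSeq++));
    }

    // Returns job names in the order free execution threads would take them.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name());
        }
        return order;
    }

    public static void main(String[] args) {
        JobQueueModel q = new JobQueueModel();
        q.submit("a", 2);
        q.submit("b", 4);
        q.submit("c", 2);
        q.submit("d", 4);
        System.out.println(q.drain()); // [b, d, a, c]
    }
}
```

Both priority-4 jobs run before the priority-2 jobs, and within each priority level the jobs keep their submission order.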
Cancelling Executing Jobs
When the node receives the command to cancel a job in the Executing status, it immediately sends an interrupt to the thread that is responsible for the job. In most cases, this cancels the job immediately; however, in some cases the job continues to run. If this happens, the job moves to the Canceling state. Depending on the code being executed, the job may complete successfully, be canceled once the uninterruptible operation finishes, or remain unfinished (for example, if the code is stuck in a loop). You can use the JobExecution.stateAsync() method to keep track of the job's state and react to state changes.
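Whether a job is canceled immediately or lingers in the Canceling state depends on whether the job code reacts to interrupts. The plain-JDK sketch below uses no GridGain APIs, and `InterruptCancelDemo` is a hypothetical name. It calls `Future.cancel(true)`, which interrupts the worker thread, on a job that checks `Thread.isInterrupted()` in its loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of interrupt-based cancellation; it does not use GridGain APIs.
final class InterruptCancelDemo {
    // Cooperative job body: it checks the interrupt flag, so an interrupt stops it promptly.
    // A loop without this check would keep running, like a job stuck in the Canceling state.
    static void cooperativeJob() {
        long iterations = 0;
        while (!Thread.currentThread().isInterrupted()) {
            iterations++; // simulated unit of work
        }
    }

    // Starts the job, cancels it with an interrupt, and reports whether the worker thread exited.
    static boolean runAndCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> cooperativeJob());
            Thread.sleep(50);                        // give the job time to start
            boolean cancelled = future.cancel(true); // true: interrupt the running thread
            pool.shutdown();
            return cancelled && pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel()); // true
    }
}
```

If the loop did not check the interrupt flag, `awaitTermination` would time out; that is the plain-Java analogue of a job that stays in the Canceling state.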
Job Configuration
Job Priority
You can specify a job priority by setting the JobExecutionOptions.priority property. Jobs with a higher priority are queued before jobs with a lower priority (for example, a job with priority 4 is executed before a job with priority 2).
private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    // Create job execution options
    JobExecutionOptions options = JobExecutionOptions.builder().priority(1).build();
    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));
    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
            .units(units)
            .options(options)
            .build();
    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(client.clusterNodes()), descriptor, "Hello");
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();
// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };
// Create job execution options
var options = JobExecutionOptions.Default with { Priority = 1 };
IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution_options options{1, 0};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();
Job Retries
You can set the number of times a job is retried on failure by setting the JobExecutionOptions.maxRetries property. If it is set, a failed job is retried the specified number of times before it moves to the Failed state.
private void example() {
    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
    IgniteCompute compute = client.compute();
    Set<ClusterNode> nodes = new HashSet<>(client.clusterNodes());
    // Unit `unitName:1.1.1` contains NodeNameJob class.
    List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));
    // Create job execution options: retry a failed job up to 5 times.
    JobExecutionOptions options = JobExecutionOptions.builder().maxRetries(5).build();
    JobDescriptor descriptor = JobDescriptor.builder(NodeNameJob.class)
            .units(units)
            .options(options)
            .build();
    JobExecution<String> execution = compute.executeAsync(JobTarget.anyNode(nodes), descriptor, "Hello");
    var result = execution.resultAsync();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
IList<IClusterNode> nodes = await client.GetClusterNodesAsync();
// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };
// Create job execution options
var options = JobExecutionOptions.Default with { MaxRetries = 5 };
IJobExecution<string> execution = await compute.SubmitAsync<string>(nodes, units, NodeNameJob, options, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::vector<cluster_node> nodes = client.get_nodes();
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution_options options{0, 5};
job_execution execution = comp.submit(nodes, units, NODE_NAME_JOB, {std::string("Hello")}, std::move(options));
std::string result = execution.get_result()->get<std::string>();
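The retry behavior described above can be summarized with a small model. `RetryModel.runWithRetries` is a hypothetical helper, not part of GridGain. It assumes that `maxRetries` counts additional attempts after the first one, so a job runs at most `1 + maxRetries` times before it is considered failed.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the retry semantics; not GridGain's implementation.
// Assumption: maxRetries counts extra attempts, so the job runs at most 1 + maxRetries times.
final class RetryModel {
    static <T> T runWithRetries(Supplier<T> job, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return job.get(); // success: stop retrying
            } catch (RuntimeException e) {
                lastFailure = e; // failure: remember it and try again if attempts remain
            }
        }
        // All attempts failed: this corresponds to the job moving to the Failed state.
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = runWithRetries(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```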
Job Failover
GridGain 9 implements mechanisms to handle issues that occur during job execution. The following situations are handled:
Worker Node Shutdown
If a worker node shuts down, the coordinator node redistributes all jobs assigned to that worker to other viable nodes. If no viable nodes are found, the job fails and an exception is sent to the client.
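The worker-shutdown case can be pictured as moving the failed node's jobs to the surviving nodes. In this hypothetical Java model (`FailoverModel` is not a GridGain class), jobs are spread round-robin over the remaining nodes. The documentation does not say how GridGain chooses the target nodes, so the placement policy here is purely an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of worker-node failover. Round-robin placement is an assumption
// made for illustration; it is not GridGain's documented behavior.
final class FailoverModel {
    static Map<String, List<String>> reassign(Map<String, List<String>> assignments, String failedNode) {
        // Keep the jobs of all surviving nodes.
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : assignments.entrySet()) {
            if (!e.getKey().equals(failedNode)) {
                result.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        List<String> orphaned = assignments.getOrDefault(failedNode, List.of());
        if (orphaned.isEmpty()) {
            return result;
        }
        // No viable node left: the jobs fail and the client receives an exception.
        if (result.isEmpty()) {
            throw new IllegalStateException("No viable nodes for jobs " + orphaned);
        }
        // Spread the failed node's jobs over the surviving nodes (sorted for determinism).
        List<String> targets = new ArrayList<>(result.keySet());
        Collections.sort(targets);
        for (int i = 0; i < orphaned.size(); i++) {
            result.get(targets.get(i % targets.size())).add(orphaned.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> before = Map.of(
                "node1", List.of("job1", "job2"),
                "node2", List.of("job3"),
                "node3", List.of());
        // TreeMap only makes the printed order deterministic.
        System.out.println(new TreeMap<>(reassign(before, "node1"))); // {node2=[job3, job1], node3=[job2]}
    }
}
```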
Coordinator Node Shutdown
If the coordinator node shuts down, all jobs are canceled as soon as the nodes running them detect that the coordinator has shut down. Note that some jobs may take a long time to cancel.
Client Disconnect
If the client disconnects, all jobs will be cancelled as soon as the coordinator node detects the disconnect. Note that some jobs may take a long time to cancel.
Colocated Computations
In GridGain 9, you can execute colocated computations with the executeColocated method. The job is then guaranteed to be executed on a node that holds the specified key. This can significantly reduce execution time if your jobs need to access that data, because the data does not have to be transferred over the network.
private void example() {
IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();
IgniteCompute compute = client.compute();
String table = "Person";
String key = "John";
// Unit `unitName:1.1.1` contains NodeNameJob class.
List<DeploymentUnit> units = List.of(new DeploymentUnit("unitName", Version.parseVersion("1.1.1")));
JobExecution<String> execution = compute.executeColocatedAsync(table, key, units, NodeNameJob.class, "Hello");
String result = execution.resultAsync().join();
}
IIgniteClient client = Client;
ICompute compute = client.Compute;
string table = "Person";
string key = "John";
// Unit `unitName:1.1.1` contains NodeNameJob class.
var units = new List<DeploymentUnit> { new DeploymentUnit("unitName", "1.1.1") };
IJobExecution<string> execution = await compute.SubmitColocatedAsync<string, string>(table, key, units, NodeNameJob, "Hello");
string result = await execution.GetResultAsync();
using namespace ignite;
compute comp = client.get_compute();
std::string table{"Person"};
std::string key{"John"};
// Unit `unitName:1.1.1` contains NodeNameJob class.
std::vector<deployment_unit> units{deployment_unit{"unitName", "1.1.1"}};
job_execution execution = comp.submit_colocated(table, key, units, NODE_NAME_JOB, {std::string("Hello")}, {});
std::string result = execution.get_result()->get<std::string>();
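Colocation works because each key maps deterministically to a partition, and each partition is held by a particular node. The sketch below models that chain in plain Java. `ColocationModel` is hypothetical, and `String.hashCode` with `Math.floorMod` stands in for GridGain's actual partitioning function, which this page does not describe.

```java
import java.util.List;

// Hypothetical model of colocation: key -> partition -> node holding that partition.
// String.hashCode and floorMod stand in for GridGain's real partitioning, for illustration only.
final class ColocationModel {
    // Maps a key to one of `partitions` partitions; the same key always gets the same partition.
    static int partitionFor(Object key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // primaryByPartition.get(p) is the (hypothetical) node that holds partition p.
    static String nodeFor(Object key, List<String> primaryByPartition) {
        return primaryByPartition.get(partitionFor(key, primaryByPartition.size()));
    }

    public static void main(String[] args) {
        List<String> primaries = List.of("node1", "node2", "node1", "node2");
        // A colocated job for key "John" would be sent to this node, next to the data.
        System.out.println(nodeFor("John", primaries)); // node2
    }
}
```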
Executing Jobs on Nodes With Data
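You can use the table partition API to run your compute jobs on the nodes that hold the data.
The example below finds the primary replica of every table partition, runs one compute job per partition on the node that holds it, and has each job read only the data of its local partition: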
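import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
// GridGain API imports (MapReduceTask, MapReduceJob, ComputeJob, JobDescriptor, Criteria, Tuple, and so on) omitted.
public class PartitionMapReduce implements MapReduceTask<List<DeploymentUnit>, String, String, Integer> {
    @Override
    public CompletableFuture<List<MapReduceJob<String, String>>> splitAsync(
            TaskExecutionContext taskContext, List<DeploymentUnit> deploymentUnits) {
        // Find the primary replica (the owning node) of every table partition.
        return taskContext.ignite().tables().table("TABLE_NAME").partitionManager().primaryReplicasAsync()
                .thenApply(primaries -> primaries.entrySet().stream().map(entry ->
                        // One job per partition, sent to the node that holds that partition.
                        MapReduceJob.<String, String>builder()
                                .jobDescriptor(JobDescriptor.builder(CustomJob.class).units(deploymentUnits).build())
                                .node(entry.getValue())
                                .args(entry.getKey().toString())
                                .build())
                        .collect(toList())
                );
    }
    @Override
    public CompletableFuture<Integer> reduceAsync(TaskExecutionContext taskContext, Map<UUID, String> results) {
        // Custom reduce action. As a placeholder, return the number of job results.
        return CompletableFuture.completedFuture(results.size());
    }
    private static class CustomJob implements ComputeJob<String, String> {
        @Override
        public CompletableFuture<String> executeAsync(JobExecutionContext context, String partition) {
            // Here data read from KV will be local.
            return context.ignite().tables().table("TABLE_NAME")
                    .keyValueView()
                    .queryAsync(null, Criteria.columnValue(
                            "__part",
                            Criteria.equalTo(partition.getBytes(UTF_8)))
                    )
                    .thenApply(CustomJob::customAction);
        }
        private static String customAction(AsyncCursor<Entry<Tuple, Tuple>> cursor) {
            // Some actions with data.
            return "";
        }
    }
}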
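The split/reduce structure of PartitionMapReduce can be tried out without a cluster. In this plain-JDK model (`PartitionMapReduceModel` is hypothetical and uses no GridGain API), rows carry a simulated partition number, one asynchronous job runs per partition and sees only that partition's rows, and the reduce step sums the per-partition results.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Plain-JDK model of a split/reduce flow over partitions; not the GridGain MapReduceTask API.
final class PartitionMapReduceModel {
    // A table row with a simulated partition number.
    record Row(int partition, String value) {}

    // Per-partition job: sees only the rows of its own partition (here, it just counts them).
    static CompletableFuture<Integer> partitionJob(List<Row> localRows) {
        return CompletableFuture.supplyAsync(localRows::size);
    }

    static int run(List<Row> table) {
        // Split: group rows by partition and start one job per partition.
        Map<Integer, List<Row>> byPartition = table.stream()
                .collect(Collectors.groupingBy(Row::partition));
        List<CompletableFuture<Integer>> jobs = byPartition.values().stream()
                .map(PartitionMapReduceModel::partitionJob)
                .collect(Collectors.toList());
        // Reduce: wait for every job and combine the partial results.
        return jobs.stream().mapToInt(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        List<Row> table = List.of(new Row(0, "a"), new Row(0, "b"), new Row(1, "c"), new Row(2, "d"));
        System.out.println(run(table)); // 4
    }
}
```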
© 2024 GridGain Systems, Inc. All Rights Reserved. GridGain® is a registered trademark of GridGain Systems, Inc.
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.