GridGain Developers Hub
GridGain Software Documentation

Collocating Computations with Data

Collocated computation is a type of distributed data processing in which the computational task you want to perform over a specific data set is sent to the nodes where the required data is located, and only the results of the computation are sent back. This approach minimizes data transfer between nodes and can significantly reduce task execution time.

GridGain provides several ways to perform collocated computations, all of which use the affinity function to determine the location of the data.

The IgniteCompute interface provides the affinityCall(...) and affinityRun(...) methods, which collocate a task with data either by key or by partition.
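Both methods route a task using the cache's affinity function. You can query that function directly through ignite.affinity(...) to see where a given key would be sent. The sketch below assumes a running node and an existing cache named "myCache". The class AffinityLookup and its method describeKey are hypothetical names used only for illustration.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;

public class AffinityLookup {
    /**
     * Asks the cache's affinity function where a key lives.
     * Returns the ID of the partition the key maps to.
     */
    public static int describeKey(Ignite ignite, String cacheName, Object key) {
        // The affinity function configured for the cache.
        Affinity<Object> aff = ignite.affinity(cacheName);

        int part = aff.partition(key);                // key -> partition
        ClusterNode primary = aff.mapKeyToNode(key);  // key -> primary node

        System.out.println("key=" + key + ", partition=" + part + " of " + aff.partitions()
            + ", primary node=" + primary.id());
        return part;
    }

    public static void main(String[] args) {
        // Starts a node with the default configuration (assumed adequate for a local demo).
        try (Ignite ignite = Ignition.start()) {
            ignite.getOrCreateCache("myCache");
            describeKey(ignite, "myCache", 1);
        }
    }
}
```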

Collocating by Key

To send a computational task to the node where a given key is located, use the following methods:

  • IgniteCompute.affinityCall(String cacheName, Object key, IgniteCallable<R> job)

  • IgniteCompute.affinityRun(String cacheName, Object key, IgniteRunnable job)

GridGain will call the configured affinity function to determine the location of the given key.

IgniteCache<Integer, String> cache = ignite.cache("myCache");

IgniteCompute compute = ignite.compute();

int key = 1;

// This closure will execute on the remote node where
// data for the given 'key' is located.
compute.affinityRun("myCache", key, () -> {
    // Peek is a local memory lookup.
    System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
});

C#/.NET:

class MyComputeAction : IComputeAction
{
    [InstanceResource] private readonly IIgnite _ignite;

    public int Key { get; set; }

    public void Invoke()
    {
        var cache = _ignite.GetCache<int, string>("myCache");
        // Peek is a local memory lookup
        Console.WriteLine("Co-located [key= " + Key + ", value= " + cache.LocalPeek(Key) + ']');
    }
}

public static void AffinityRunDemo()
{
    var cfg = new IgniteConfiguration();
    var ignite = Ignition.Start(cfg);

    var cache = ignite.GetOrCreateCache<int, string>("myCache");
    cache.Put(0, "foo");
    cache.Put(1, "bar");
    cache.Put(2, "baz");
    var keyCnt = 3;

    var compute = ignite.GetCompute();

    for (var key = 0; key < keyCnt; key++)
    {
        // This closure will execute on the remote node where
        // data for the given 'key' is located.
        compute.AffinityRun("myCache", key, new MyComputeAction {Key = key});
    }
}
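The Java snippet above uses affinityRun(...), which returns nothing. If you need a result back, affinityCall(...) routes the job the same way, but it takes an IgniteCallable<R> and returns that job's value to the caller. The following minimal sketch assumes a cache named "myCache" with Integer keys and String values. ValueLengthJob and valueLength are hypothetical names.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;

public class AffinityCallByKey {
    /** Runs on the primary node for 'key' and returns the length of the cached string. */
    static class ValueLengthJob implements IgniteCallable<Integer> {
        private int key;

        @IgniteInstanceResource
        private Ignite ignite; // injected on the node that executes the job

        ValueLengthJob(int key) {
            this.key = key;
        }

        @Override
        public Integer call() {
            IgniteCache<Integer, String> cache = ignite.cache("myCache");
            // Local lookup only: the job was routed to the node that owns 'key'.
            String val = cache.localPeek(key, CachePeekMode.PRIMARY);
            return val == null ? -1 : val.length();
        }
    }

    /** Sends the job to the key's node; only the Integer result comes back. */
    public static int valueLength(Ignite ignite, int key) {
        return ignite.compute().affinityCall("myCache", key, new ValueLengthJob(key));
    }

    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");
            cache.put(1, "bar");
            System.out.println("length of value for key 1: " + valueLength(ignite, 1));
        }
    }
}
```

A missing key still routes to the node that would own it. On that node the local peek returns null, which the sketch reports as -1.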

Collocating by Partition

The affinityCall(Collection<String> cacheNames, int partId, IgniteCallable<R> job) and affinityRun(Collection<String> cacheNames, int partId, IgniteRunnable job) methods send a given task to the node where the partition with the given ID is located. This is useful when you need to retrieve objects for multiple keys and you know that the keys belong to the same partition. In this case, you can create a single task instead of one task per key.

For example, say you want to calculate the arithmetic mean of a specific field over a specific subset of keys. To distribute the computation, you can group the keys by partition. Then send each group to the node where that partition is located, where the values are read locally. The number of groups, and therefore the number of tasks, is at most the total number of partitions (1024 by default). The code snippet below illustrates this example.

// this task sums up the values of the salary field for the given set of keys
private static class SumTask implements IgniteCallable<BigDecimal> {
    private Set<Long> keys;

    public SumTask(Set<Long> keys) {
        this.keys = keys;
    }

    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public BigDecimal call() throws Exception {

        IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();

        BigDecimal sum = new BigDecimal(0);

        for (long k : keys) {
            BinaryObject person = cache.localPeek(k, CachePeekMode.PRIMARY);
            if (person != null)
                sum = sum.add(new BigDecimal((float) person.field("salary")));
        }

        return sum;
    }
}

public static void calculateAverage(Ignite ignite, Set<Long> keys) {

    //get the affinity function configured for the cache
    Affinity<Long> affinityFunc = ignite.affinity("person");

    //this map stores collections of keys for each partition
    HashMap<Integer, Set<Long>> partMap = new HashMap<>();
    keys.forEach(k -> {
        int partId = affinityFunc.partition(k);

        Set<Long> keysByPartition = partMap.computeIfAbsent(partId, key -> new HashSet<Long>());
        keysByPartition.add(k);
    });

    BigDecimal total = new BigDecimal(0);

    IgniteCompute compute = ignite.compute();

    List<String> caches = Arrays.asList("person");

    //iterate over all partitions
    for (Map.Entry<Integer, Set<Long>> pair : partMap.entrySet()) {
        //send a task that gets specific keys for the partition
        BigDecimal sum = compute.affinityCall(caches, pair.getKey().intValue(), new SumTask(pair.getValue()));
        total = total.add(sum);
    }

    System.out.println("the average salary is " + total.floatValue() / keys.size());
}

In Ignite.NET, the affinityCall(...) overload that takes a partition ID is not supported.

If you want to process all the data in a cache, you can iterate over all of its partitions. For each partition, send a task that processes the data stored in that partition.

//this task sums up the value of the 'salary' field for all objects stored in the given partition
private static class SumTask implements IgniteCallable<BigDecimal> {
    private int partId;

    public SumTask(int partId) {
        this.partId = partId;
    }

    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public BigDecimal call() throws Exception {
        //use binary objects to avoid deserialization
        IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();

        BigDecimal total = new BigDecimal(0);
        try (QueryCursor<Cache.Entry<Long, BinaryObject>> cursor = cache
                .query(new ScanQuery<Long, BinaryObject>(partId).setLocal(true))) {
            for (Cache.Entry<Long, BinaryObject> entry : cursor) {
                total = total.add(new BigDecimal((float) entry.getValue().field("salary")));
            }
        }

        return total;
    }
}
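The task above processes a single partition. To cover the whole cache, the caller can ask the affinity function for the number of partitions. It then sends one task per partition through the partition-based affinityCall overload. The following self-contained sketch assumes a hypothetical cache named "numbers" with Integer keys and values, standing in for the "person" cache so that it does not depend on the salary field. It uses affinityCallAsync(...) so that the partitions are processed in parallel rather than one after another. SumAllPartitions, PartitionSumTask, and sumAll are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;

public class SumAllPartitions {
    /** Hypothetical cache with Integer keys and values. */
    static final String CACHE = "numbers";

    /** Sums the values stored in one partition, reading local data only. */
    static class PartitionSumTask implements IgniteCallable<Long> {
        private int partId;

        @IgniteInstanceResource
        private Ignite ignite;

        PartitionSumTask(int partId) {
            this.partId = partId;
        }

        @Override
        public Long call() {
            IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
            long sum = 0;
            // Scan only the partition this task was routed to.
            try (QueryCursor<Cache.Entry<Integer, Integer>> cursor =
                     cache.query(new ScanQuery<Integer, Integer>(partId).setLocal(true))) {
                for (Cache.Entry<Integer, Integer> e : cursor)
                    sum += e.getValue();
            }
            return sum;
        }
    }

    /** Sends one task per partition, in parallel, and adds up the partial sums. */
    public static long sumAll(Ignite ignite) {
        int parts = ignite.affinity(CACHE).partitions();
        List<String> caches = Collections.singletonList(CACHE);
        IgniteCompute compute = ignite.compute();

        List<IgniteFuture<Long>> futs = new ArrayList<>(parts);
        for (int p = 0; p < parts; p++)
            futs.add(compute.affinityCallAsync(caches, p, new PartitionSumTask(p)));

        long total = 0;
        for (IgniteFuture<Long> f : futs)
            total += f.get(); // blocks until that partition's result arrives
        return total;
    }

    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE);
            for (int i = 1; i <= 100; i++)
                cache.put(i, i);
            System.out.println("total = " + sumAll(ignite));
        }
    }
}
```

Each partial result is a single Long, so the only data sent back to the caller is one number per partition.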

Entry Processor

An entry processor processes cache entries on the nodes where they are stored and returns the result of the processing. With an entry processor, you do not need to transfer the entire object to perform an operation on it. Instead, you can perform the operation remotely and transfer only the result.

If an entry processor sets the value for an entry that does not exist, the entry will be added to the cache.

Entry processors are executed atomically within a lock on the given cache key.

IgniteCache<String, Integer> cache = ignite.cache("mycache");

// Increment the value for a specific key by 1.
// The operation will be performed on the node where the key is stored.
// Note that if the cache does not contain an entry for the given key, it will be created.
cache.invoke("mykey", (entry, args) -> {
    Integer val = entry.getValue();

    entry.setValue(val == null ? 1 : val + 1);

    return null;
});

C#/.NET:

void CacheInvoke()
{
    var ignite = Ignition.Start();

    var cache = ignite.GetOrCreateCache<int, int>("myCache");

    var proc = new Processor();

    // Increment cache value 10 times
    for (int i = 0; i < 10; i++)
        cache.Invoke(1, proc, 5);
}

class Processor : ICacheEntryProcessor<int, int, int, int>
{
    public int Process(IMutableCacheEntry<int, int> entry, int arg)
    {
        entry.Value = entry.Exists ? entry.Value + arg : arg;

        return entry.Value;
    }
}
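The Java example above ignores the processor's return value and passes no arguments to it. The following sketch shows both. The increment is passed as an argument to invoke(...), and the new value is returned to the caller. The same processor is also applied to several keys with invokeAll(...). The sketch assumes a cache named "counters" with String keys and Integer values. AddAndGet, AddProcessor, addAndGet, and addToAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;

public class AddAndGet {
    /** Adds args[0] to the entry's value (a missing entry counts as 0) and returns the new value. */
    static class AddProcessor implements CacheEntryProcessor<String, Integer, Integer> {
        @Override
        public Integer process(MutableEntry<String, Integer> entry, Object... args) {
            int delta = (Integer) args[0];
            Integer cur = entry.getValue();   // null when the entry does not exist yet
            int next = (cur == null ? 0 : cur) + delta;
            entry.setValue(next);             // creates the entry if it was missing
            return next;                      // only this Integer is returned to the caller
        }
    }

    /** Adds 'delta' to one key on the node that stores it. */
    public static int addAndGet(IgniteCache<String, Integer> cache, String key, int delta) {
        return cache.invoke(key, new AddProcessor(), delta);
    }

    /** Applies the same processor to several keys; results are keyed by cache key. */
    public static Map<String, EntryProcessorResult<Integer>> addToAll(
            IgniteCache<String, Integer> cache, Set<String> keys, int delta) {
        return cache.invokeAll(keys, new AddProcessor(), delta);
    }

    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<String, Integer> cache = ignite.getOrCreateCache("counters");
            System.out.println(addAndGet(cache, "a", 5));
            Map<String, EntryProcessorResult<Integer>> res =
                addToAll(cache, new TreeSet<>(Arrays.asList("a", "b")), 1);
            res.forEach((k, r) -> System.out.println(k + " -> " + r.get()));
        }
    }
}
```

The sketch uses a named class rather than a lambda so that the same processor can be reused for both invoke(...) and invokeAll(...).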