Distributed data structures (part 2)

In last week's post, "Distributed data structures: Part 1 (overview)," I talked about why you need distributed data structures (hereinafter DDS) and walked through several options offered by the distributed cache Apache Ignite. Today I want to discuss the implementation details of specific DDS, along with a short primer on distributed caches. To begin with, at least in the case of Apache Ignite, DDS are not implemented from scratch but are built as a layer on top of a distributed cache.

To support the operation of the DDS, two caches are used: one Replicated and one Partitioned.

The Replicated cache (ignite-sys-cache) is a system cache responsible, among other things, for storing information about the DDS registered in the system.

The Partitioned cache (ignite-atomics-sys-cache) stores the data needed for the DDS to operate, as well as their state.

So, most DDS are created as follows:

  1. The transaction starts.
  2. In the cache ignite-sys-cache, by the key DATA_STRUCTURES_KEY, a Map<DDS_name, DataStructureInfo> is fetched (created if it does not exist yet), and a new element describing, for example, an IgniteAtomicReference is added to it.
  3. In the cache ignite-atomics-sys-cache, by the key from the DataStructureInfo added earlier, an element responsible for the state of the DDS is added.
  4. The transaction commits.

On the first request to create a DDS, a new instance is created; subsequent requests receive the previously created one.
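
Schematically, this flow can be sketched as follows (a simplified sketch based on the steps above; the helper names sysView, atomicsView, info and initVal are illustrative, not the actual internal identifiers):

// 1. Start the transaction
try (GridNearTxLocal tx = CU.txStartInternal(ctx, sysView, PESSIMISTIC, REPEATABLE_READ)) {
    // 2. Read (or create) the DDS registry stored in ignite-sys-cache
    Map<String, DataStructureInfo> dsMap = sysView.get(DATA_STRUCTURES_KEY);

    if (dsMap == null)
        dsMap = new HashMap<>();

    // ... and register the new structure, e.g. an IgniteAtomicReference
    dsMap.put(name, info);
    sysView.put(DATA_STRUCTURES_KEY, dsMap);

    // 3. Put the object holding the DDS state into ignite-atomics-sys-cache
    atomicsView.put(key, new GridCacheAtomicReferenceValue<>(initVal));

    // 4. Commit the transaction
    tx.commit();
}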

 

IgniteAtomicReference and IgniteAtomicLong (short introduction)

The third step of initialization for both types puts objects of type GridCacheAtomicReferenceValue or GridCacheAtomicLongValue into ignite-atomics-sys-cache.

Both classes contain a single field, val.
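
For reference, user code obtains such a structure through the public Ignite API (the name "refName" and the initial value are just examples):

// Create or get a previously created IgniteAtomicReference
IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "initVal", true);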


Accordingly, any change to an IgniteAtomicReference:

// Change the value if the current one matches the expected one.
ref.compareAndSet(expVal, newVal);

... launches an EntryProcessor (in short, code that is shipped to the node owning the entry and executed against it atomically) with the following process method:
Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e, Object... args) {
    GridCacheAtomicReferenceValue<T> val = e.getValue();

    T curVal = val.get();

    // The variables expVal and newVal are the parameters of
    // ref.compareAndSet(expVal, newVal);
    if (F.eq(expVal, curVal)) {
        e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));

        return true;
    }

    return false;
}
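
Such a processor is applied through the standard cache invoke mechanism, roughly as follows (a sketch: the cache handle and the processor name CasProcessor are hypothetical, not the actual internal call site):

// The processor is shipped to the node that owns the entry
// and executed against it atomically; the Boolean result of
// process() is returned to the caller
Boolean updated = cache.invoke(key, new CasProcessor<>(expVal, newVal));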

IgniteAtomicLong is a de facto extension of IgniteAtomicReference, so its compareAndSet method is implemented in a similar way.
 

The incrementAndGet method has no check against an expected value; it simply adds one.

Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
    GridCacheAtomicLongValue val = e.getValue();

    // Unconditionally add one to the stored value
    long newVal = val.get() + 1;

    e.setValue(new GridCacheAtomicLongValue(newVal));

    return newVal;
}
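
From the user's perspective this looks like (the name "cntName" is illustrative):

// Create or get a previously created IgniteAtomicLong
IgniteAtomicLong cnt = ignite.atomicLong("cntName", 0, true);

// Runs the EntryProcessor above on the node that owns the value
long newVal = cnt.incrementAndGet();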

IgniteAtomicSequence (short introduction)

When each IgniteAtomicSequence instance is created ...

// Create or get a previously created IgniteAtomicSequence.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);

... it is allocated a pool of identifiers.

// Start the transaction
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
    GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);

    // Lower bound of the local identifier pool
    locCntr = seqVal.get();

    // Upper bound (off is the size of the pool being reserved)
    upBound = locCntr + off;

    seqVal.set(upBound + 1);

    // Update the GridCacheAtomicSequenceValue instance in the cache
    dsView.put(key, seqVal);

    // Commit the transaction
    tx.commit();
}

Accordingly, the call ...

seq.incrementAndGet(); 

... simply increments a local counter until it reaches the upper bound of the identifier pool.
 

When the boundary is reached, a new identifier pool is allocated, in the same way as when a new IgniteAtomicSequence instance is created.
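
The pool (reservation) size is controlled through AtomicConfiguration; for example:

// Reserve 5000 identifiers per pool (the default is 1000)
AtomicConfiguration atomicCfg = new AtomicConfiguration();
atomicCfg.setAtomicSequenceReserveSize(5000);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setAtomicConfiguration(atomicCfg);

Ignite ignite = Ignition.start(cfg);

// Each incrementAndGet() is now a purely local operation
// until the 5000-identifier pool is exhausted
IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);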
 

IgniteCountDownLatch (short introduction)


Counter decrement:

latch.countDown();

... is implemented as follows:

// Start the transaction
try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
    GridCacheCountDownLatchValue latchVal = latchView.get(key);

    int retVal;

    // val is the amount by which the counter is decremented
    if (val > 0) {
        // Decrement the value
        retVal = latchVal.get() - val;

        if (retVal < 0)
            retVal = 0;
    }
    else
        retVal = 0;

    latchVal.set(retVal);

    // Save the value
    latchView.put(key, latchVal);

    // Commit the transaction
    tx.commit();

    return retVal;
}

Waiting for the counter to be decremented to 0 ...

latch.await();

... is implemented through the Continuous Queries mechanism: every change of a GridCacheCountDownLatchValue in the cache notifies all IgniteCountDownLatch instances of that change.
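
Schematically, such a subscription looks like this (a generic sketch over a String-to-Integer cache, not the actual internal listener code):

ContinuousQuery<String, Integer> qry = new ContinuousQuery<>();

// The local listener is invoked on this node
// for every matching update anywhere in the cluster
qry.setLocalListener(evts -> {
    for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts)
        System.out.println("Value changed: " + evt.getValue());
});

cache.query(qry);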
 

Each IgniteCountDownLatch instance has a local latch:

/** Internal latch (transient). */
private CountDownLatch internalLatch;

Each notification decrements internalLatch down to the current value, so latch.await() is implemented very simply:

if (internalLatch.getCount() > 0)
    internalLatch.await();
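
Putting it together, typical usage looks like this (the latch name and count are examples):

// Create or get a latch with an initial count of 10
// (autoDelete = false, create = true)
IgniteCountDownLatch latch = ignite.countDownLatch("latchName", 10, false, true);

latch.countDown(); // the transactional decrement shown above
latch.await();     // waits on the local internalLatch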

IgniteSemaphore (short introduction)
 

Acquiring a permit ...

semaphore.acquire();

... occurs as follows:

// Until the permits are acquired
for (;;) {
    int expVal = getState();

    int newVal = expVal - acquires;

    try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ)) {
        GridCacheSemaphoreState val = semView.get(key);

        boolean retVal = val.getCount() == expVal;

        if (retVal) {
            // Record which permits are held by which node.
            // If a node fails, the permits it acquired
            // will be returned.
            {
                UUID nodeID = ctx.localNodeId();

                Map<UUID, Integer> map = val.getWaiters();

                int waitingCnt = expVal - newVal;

                if (map.containsKey(nodeID))
                    waitingCnt += map.get(nodeID);

                map.put(nodeID, waitingCnt);

                val.setWaiters(map);
            }

            // Set the new value
            val.setCount(newVal);

            semView.put(key, val);

            tx.commit();

            return true;
        }

        // The count changed concurrently; the transaction is not
        // committed, so re-read the state and retry
    }
}

Releasing a permit ...

semaphore.release();

... happens in a similar manner, except that the new value is greater than the current one:

int newVal = cur + releases;
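
At the user level (the name and permit count are examples; the third argument makes the semaphore failover-safe, so permits held by a failed node are returned):

// Create or get a semaphore with 20 permits
IgniteSemaphore sem = ignite.semaphore("semName", 20, true, true);

sem.acquire();
try {
    // ... protected section ...
}
finally {
    sem.release();
}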

IgniteQueue (short introduction)
 

Unlike the other DDS, IgniteQueue does not use ignite-atomics-sys-cache. The cache it uses is described by the colCfg parameter.
 

// Create or get a previously created IgniteQueue.
IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);

Depending on the specified atomicity mode (TRANSACTIONAL or ATOMIC), you can get different IgniteQueue variants:

queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? 
    new GridAtomicCacheQueueImpl<>(name, hdr, cctx) : 
    new GridTransactionalCacheQueueImpl<>(name, hdr, cctx));
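
The colCfg mentioned above is an ordinary CollectionConfiguration; for example:

CollectionConfiguration colCfg = new CollectionConfiguration();

// TRANSACTIONAL yields GridTransactionalCacheQueueImpl,
// ATOMIC yields GridAtomicCacheQueueImpl
colCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

// Capacity 0 means an unbounded queue
IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);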

 

In both cases, the IgniteQueue state is managed by the following header:

class GridCacheQueueHeader {
   private long head;
   private long tail;
   private int cap;
... 

Adding an element goes through AddProcessor ...

Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) {
    GridCacheQueueHeader hdr = e.getValue();

    boolean rmvd = queueRemoved(hdr, id);

    if (rmvd || !spaceAvailable(hdr, size))
        return rmvd ? QUEUE_REMOVED_IDX : null;

    GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
        hdr.capacity(),
        hdr.collocated(),
        hdr.head(),
        hdr.tail() + size, // Reserve room for the element
        hdr.removedIndexes());

    e.setValue(newHdr);

    return hdr.tail();
}

... which, in essence, simply advances the queue's tail pointer.


After that...

// Using the key derived from
// the new hdr.tail()
QueueItemKey key = itemKey(idx);

... a new element is added to the queue:

cache.getAndPut(key, item);

Deleting an element works similarly, except that the head pointer changes rather than the tail ...

GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
    hdr.capacity(),
    hdr.collocated(),
    hdr.head() + 1, // Move the head pointer
    hdr.tail(),
    null);

... and the item is deleted.

Long idx = transformHeader(new PollProcessor(id));

QueueItemKey key = itemKey(idx);

T data = (T)cache.getAndRemove(key);
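
At the user level this is the familiar java.util.Queue contract:

queue.offer("item");        // AddProcessor + getAndPut
String item = queue.poll(); // PollProcessor + getAndRemove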

The difference between GridAtomicCacheQueueImpl and GridTransactionalCacheQueueImpl is the following:

 

  • GridAtomicCacheQueueImpl, when adding an element, first atomically increments hdr.tail() and then inserts the element into the cache at the resulting index.
  • GridTransactionalCacheQueueImpl performs both actions within a single transaction.

As a result, GridAtomicCacheQueueImpl works faster, but a data consistency problem can arise: since the queue size information and the data itself are not stored atomically together, they cannot always be read together consistently.
 

It is quite possible that inside the poll method the queue appears to contain new elements while the elements themselves are not in the cache yet. This is extremely rare, but still possible.
 

This problem is solved by retrying the read, bounded by a timeout:

long stop = U.currentTimeMillis() + RETRY_TIMEOUT;

// Retry until the element appears or the timeout expires
while (U.currentTimeMillis() < stop) {
    data = (T)cache.getAndRemove(key);

    if (data != null)
        return data;
}

In lieu of a conclusion
 

I would like to note once again that a distributed cache is, in essence, a ConcurrentHashMap spread across a set of machines joined into a cluster.
 

Distributed caches can be used to implement many important, complex, yet reliable systems.
 

Distributed data structures are just one particular use case; more generally, distributed caches are used to store and process huge amounts of data in real time, with the ability to increase capacity or processing speed simply by adding new nodes.