GridGain Developers Hub
GitHub logo GridGain iso GridGain.com
GridGain Software Documentation

Working with Events

Overview

GridGain can generate events for a variety of operation happening in the cluster, and your application can get notified about those operations. The events include cache operations, discovery events, distributed task execution events, and many more.

Enabling Events

By default, events are disabled, and you have to enable each event type explicitly if you want to use it in your application. To enable specific event types, list them in the includeEventTypes property of IgniteConfiguration as shown below:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- Enable cache events. -->
    <property name="includeEventTypes">
        <list>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_NODE_LEFT"/>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_NODE_JOINED"/>
        </list>
    </property>

    <!-- other configuration properties -->
</bean>
IgniteConfiguration cfg = new IgniteConfiguration();

// Enable cache events.
cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_READ,
        EventType.EVT_CACHE_OBJECT_REMOVED, EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT);

// Start a node.
Ignite ignite = Ignition.start(cfg);
var cfg = new IgniteConfiguration
{
    IncludedEventTypes = new[]
    {
        EventType.CacheObjectPut,
        EventType.CacheObjectRead,
        EventType.CacheObjectRemoved,
        EventType.NodeJoined,
        EventType.NodeLeft
    }
};
var ignite = Ignition.Start(cfg);

Refer to the EventType javadoc for the complete list of events.

Getting the Events Interface

The events functionality is available through the events interface, which provides methods for listening to cluster events. The events interface can be obtained from an instance of Ignite as follows:

Ignite ignite = Ignition.ignite();

IgniteEvents events = ignite.events();
var ignite = Ignition.GetIgnite();
var events = ignite.GetEvents();

The events interface can be associated with a set of nodes. It means that it will be able to access the events that happen on the given set of nodes. In the following example, the events interface is obtained for the set of nodes that host the data for the Person cache.

Ignite ignite = Ignition.ignite();

IgniteEvents events = ignite.events(ignite.cluster().forCacheNodes("person"));
var ignite = Ignition.GetIgnite();
var events = ignite.GetCluster().ForCacheNodes("person").GetEvents();

Listening to Events

You can listen either to local or remote events. Local events are the events that are generated on the node where the listener is registered. Remote events are the events that happen on other nodes.

Note that some events may be fired on multiple nodes even if the corresponding real-world event happens only once. For example, when a node leaves the cluster, the EVT_NODE_LEFT event is generated on every remaining node.

Another example is when you put an object into a cache. In this case, the EVT_CACHE_OBJECT_PUT event occurs on the node that hosts the primary partition into which the object is actually written, which may be different from the node where the put(…​) method is called. In addition, the event will be fired on all nodes that hold the backup partitions for the cache if they are configured.

The events interface provides methods for listening to local events only, and for listening to both local and remote events.

Listening to Local Events

To listen to local events, use the localListen(listener, eventTypes…​) method, as shown below. The method accepts an event listener that is called every time an event of the given type occurs on the local node.

To unregister the local listener, return false in its functional method.

IgniteEvents events = ignite.events();

// Local listener that listens to local events.
IgnitePredicate<CacheEvent> localListener = evt -> {
    System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + ", oldVal=" + evt.oldValue()
            + ", newVal=" + evt.newValue());

    return true; // Continue listening.
};

// Subscribe to specified cache events occurring on the local node.
events.localListen(localListener, EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_READ,
        EventType.EVT_CACHE_OBJECT_REMOVED);

The event listener is an object of the IgnitePredicate<T> class with a type argument that matches the type of events the listener is going to process. For example, cache events (EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, etc.) correspond to the CacheEvent class, discovery events (EVT_NODE_LEFT, EVT_NODE_JOINED, etc) correspond to the DiscoveryEvent class, and so on. If you want to listen to events of different types, you can use the generic Event interface:

IgnitePredicate<Event> localListener = evt -> {
    // process the event
    return true;
};
class LocalListener : IEventListener<CacheEvent>
{
    public bool Invoke(CacheEvent evt)
    {
        Console.WriteLine("Received event [evt=" + evt.Name + ", key=" + evt.Key + ", oldVal=" + evt.OldValue
                          + ", newVal=" + evt.NewValue);
        return true;
    }
}

public static void LocalListenDemo()
{
    var cfg = new IgniteConfiguration
    {
        IncludedEventTypes = new[]
        {
            EventType.CacheObjectPut,
            EventType.CacheObjectRead,
            EventType.CacheObjectRemoved,
        }
    };
    var ignite = Ignition.Start(cfg);
    var events = ignite.GetEvents();
    events.LocalListen(new LocalListener(), EventType.CacheObjectPut, EventType.CacheObjectRead,
        EventType.CacheObjectRemoved);

    var cache = ignite.GetOrCreateCache<int, int>("myCache");
    cache.Put(1, 1);
    cache.Put(2, 2);
}

Listening to Remote Events

The IgniteEvents.remoteListen(…​) method can be used to register a listener that will listen for both remote and local events. It accepts a local listener, a filter, and a list of event types you want to listen to.

The filter is deployed to all the nodes associated with the events interface, including the local node. The events that pass the filter will be sent to the local listener.

The method returns a unique identifier that can be used to unregister the listener and filters. To do this, call IgniteEvents.stopRemoteListen(uuid). Another way to unregister the listener is to return false in the apply() method.

IgniteEvents events = ignite.events();

IgnitePredicate<CacheEvent> filter = evt -> {
    System.out.println("remote event: " + evt.name());
    return true;
};

// Subscribe to specified cache events on all nodes that have cache running.
UUID uuid = events.remoteListen(new IgniteBiPredicate<UUID, CacheEvent>() {

    @Override
    public boolean apply(UUID uuid, CacheEvent e) {

        // process the event

        return true;
    }
}, filter, EventType.EVT_CACHE_OBJECT_PUT);
Unsupported

Batching Events

Each activity in a cache can result in an event notification being generated and sent. For systems with high cache activity, getting notified for every event could be network intensive, possibly leading to a decreased performance of cache operations.

Event notifications can be grouped together and sent in batches or timely intervals to mitigate the impact on performance. Here is an example of how this can be done:

Ignite ignite = Ignition.ignite();

// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);

        int key = evt.key();

        return key >= 10;
    }
};

// Subscribe to cache events occurring on all nodes
// that have the specified cache running.
// Send notifications in batches of 10.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(
		10 /*batch size*/, 0 /*time intervals*/, false, null, rmtLsnr, EVTS_CACHE);

// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
remoteListen(..) is unsupported in .NET

Storing and Querying Events

You can configure an event storage that will keep events on the nodes where they occur. You can then query events in your application.

The event storage can be configured to keep events for a specific period, keep only most recent events, or keep the events that satisfy a specific filter. See the MemoryEventStorageSpi javadoc for details.

Below is an example of event storage configuration:

<bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">

    <property name="eventStorageSpi" >
        <bean class="org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi">
            <property name="expireAgeMs" value="600000"/>
        </bean>
    </property>

</bean>
IgniteConfiguration igniteCfg = new IgniteConfiguration();
MemoryEventStorageSpi eventStorageSpi = new MemoryEventStorageSpi();
eventStorageSpi.setExpireAgeMs(600000);

igniteCfg.setEventStorageSpi(eventStorageSpi);

Ignite ignite = Ignition.start(igniteCfg);
var cfg = new IgniteConfiguration
{
    EventStorageSpi = new MemoryEventStorageSpi()
    {
        ExpirationTimeout = TimeSpan.FromMilliseconds(600000)
    },
    IncludedEventTypes = new[]
    {
        EventType.CacheObjectPut,
        EventType.CacheObjectRead,
        EventType.CacheObjectRemoved,
    }
};
var ignite = Ignition.Start(cfg);

Querying Local Events

The following example illustrates how you can query local EVT_CACHE_OBJECT_PUT events stored in the event storage.

Collection<CacheEvent> cacheEvents = events.localQuery(e -> {
    //process the event
    return true;
}, EventType.EVT_CACHE_OBJECT_PUT);
var events = ignite.GetEvents();
var cacheEvents = events.LocalQuery(EventType.CacheObjectPut);

Querying Remote Events

This is an example of querying remote events:

Collection<CacheEvent> storedEvents = events.remoteQuery(e -> {
    //process the event
    return true;
}, 0, EventType.EVT_CACHE_OBJECT_PUT);
class EventFilter : IEventFilter<CacheEvent>
{
    public bool Invoke(CacheEvent evt)
    {
        return true;
    }
}
// ....


    var events = ignite.GetEvents();
    var storedEvents = events.RemoteQuery(new EventFilter(), null, EventType.CacheObjectPut);