GridGain Developers Hub
GridGain Software Documentation

Using Continuous Queries

A continuous query is a query that monitors data modifications occurring in a cache. Once a continuous query is started, you are notified of every data change that matches your query filter.

All update events are propagated to the local listener, which must be registered in the query. The continuous query implementation guarantees exactly-once delivery of each event to the local listener.

You can also specify a remote filter to narrow down the range of entries that will be monitored for updates.

Initial Query

Whenever a continuous query is prepared for execution, you have an option to specify an initial query that is executed before the continuous query gets registered in the cluster and before you start to receive updates.

Just like scan queries, a continuous query is executed via the query() method that returns a cursor. When an initial query is set, you can use that cursor to iterate over the results of the initial query.

IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

// Setting an optional initial query.
// The query will return entries for the keys greater than 10.
query.setInitialQuery(new ScanQuery<>((k, v) -> k > 10));

// A local listener is mandatory. This one ignores the update events.
query.setLocalListener(events -> {
});

try (QueryCursor<Cache.Entry<Integer, String>> cursor = cache.query(query)) {
    // Iterating over existing data stored in the cache.
    for (Cache.Entry<Integer, String> e : cursor)
        System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
}
Unsupported
Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

// Custom listener
Listener<int32_t, std::string> listener;

// Declaring continuous query.
continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

// Declaring optional initial query
ScanQuery initialQuery = ScanQuery();

continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query, initialQuery);

// Iterating over existing data stored in the cache.
QueryCursor<int32_t, std::string> cursor = handle.GetInitialQueryCursor();

while (cursor.HasNext())
{
    std::cout << cursor.GetNext().GetKey() << std::endl;
}
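
To make the ordering concrete, here is a minimal sketch in plain Java. It is a toy model and not the GridGain API: `ToyCache`, its `query` method, and `InitialQuerySketch` are hypothetical names used only for illustration. It assumes the behavior described above: the initial query returns the entries that already exist, and the listener only sees changes made after registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Toy model, NOT the GridGain API: every name in this class is hypothetical.
public class InitialQuerySketch {
    /** Stand-in for a cache that supports one listener and an initial query. */
    static class ToyCache {
        private final Map<Integer, String> data = new TreeMap<>();
        private BiConsumer<Integer, String> listener = (k, v) -> { };

        void put(Integer key, String val) {
            data.put(key, val);
            // Notify the currently registered listener about the change.
            listener.accept(key, val);
        }

        /** Runs the initial query over existing data, then registers the listener. */
        List<String> query(BiPredicate<Integer, String> initialQuery,
                           BiConsumer<Integer, String> localListener) {
            List<String> snapshot = new ArrayList<>();
            data.forEach((k, v) -> {
                if (initialQuery.test(k, v))
                    snapshot.add(k + "=" + v);
            });
            listener = localListener;
            return snapshot;
        }
    }

    /** Returns a log of what the initial query and the listener produced. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyCache cache = new ToyCache();
        cache.put(5, "a");
        cache.put(11, "b");

        // Existing entries with keys greater than 10 come back from the initial query.
        for (String entry : cache.query((k, v) -> k > 10, (k, v) -> log.add("update " + k + "=" + v)))
            log.add("initial " + entry);

        // This change happens after registration, so it reaches the listener.
        cache.put(12, "c");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [initial 11=b, update 12=c]
    }
}
```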

Local Listener

When a cache gets modified (an entry is inserted, updated, or deleted), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.

Note that the continuous query will throw an exception if started without a local listener.

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

query.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {

    @Override
    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
        throws CacheEntryListenerException {
        // react to the update events here
    }
});
class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            //react to update events here
        }
    }
}
public static void ContinuousQueryListenerDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}
/**
 * Listener class.
 */
template<typename K, typename V>
class Listener : public event::CacheEntryEventListener<K, V>
{
public:
    /**
     * Default constructor.
     */
    Listener()
    {
        // No-op.
    }

    /**
     * Event callback.
     *
     * @param evts Events.
     * @param num Events number.
     */
    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
    {
        for (uint32_t i = 0; i < num; ++i)
        {
            std::cout << "Queried entry [key=" << (evts[i].HasValue() ? evts[i].GetKey() : K())
                << ", val=" << (evts[i].HasValue() ? evts[i].GetValue() : V()) << ']'
                << std::endl;
        }
    }
};

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    Ignite ignite = Ignition::Start(cfg);

    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Declaring custom listener.
    Listener<int32_t, std::string> listener;

    // Declaring continuous query.
    continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

    continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query);
}

Remote Filter

This filter is executed for each updated key and evaluates whether the update should be propagated to the query’s local listener. If the filter returns true, then the local listener will be notified about the update. Otherwise, the notification will be skipped.

For redundancy, the filter is executed for both the primary and the backup copies of the key (if backups are configured). Because of this, a remote filter can also serve as a remote listener for update events.

ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

qry.setLocalListener(events ->
    events.forEach(event -> System.out.format("Entry: key=[%s] value=[%s]\n", event.getKey(), event.getValue()))
);

qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
    @Override
    public CacheEntryEventFilter<Integer, String> create() {
        return new CacheEntryEventFilter<Integer, String>() {
            @Override
            public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                System.out.format("the value for key [%s] was updated from [%s] to [%s]\n", e.getKey(), e.getOldValue(), e.getValue());
                return true;
            }
        };
    }
});
class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            //react to update events here
        }
    }
}
class RemoteFilter : ICacheEntryEventFilter<int, string>
{
    public bool Evaluate(ICacheEntryEvent<int, string> e)
    {
        if (e.Key == 1)
        {
            return false;
        }
        Console.WriteLine("the value for key {0} was updated from {1} to {2}", e.Key, e.OldValue, e.Value);
        return true;
    }
}
public static void ContinuousQueryFilterDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener(), new RemoteFilter());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}
template<typename K, typename V>
struct RemoteFilter : event::CacheEntryEventFilter<K, V>
{
    /**
     * Default constructor.
     */
    RemoteFilter()
    {
        // No-op.
    }

    /**
     * Destructor.
     */
    virtual ~RemoteFilter()
    {
        // No-op.
    }

    /**
     * Event callback.
     *
     * @param event Event.
     * @return True if the event passes filter.
     */
    virtual bool Process(const CacheEntryEvent<K, V>& event)
    {
        std::cout << "The value for key " << event.GetKey() <<
            " was updated from " << event.GetOldValue() << " to " << event.GetValue() << std::endl;
        return true;
    }
};

namespace ignite
{
    namespace binary
    {
        template<>
        struct BinaryType< RemoteFilter<int32_t, std::string> >
        {
            static int32_t GetTypeId()
            {
                return GetBinaryStringHashCode("RemoteFilter<int32_t,std::string>");
            }

            static void GetTypeName(std::string& dst)
            {
                dst = "RemoteFilter<int32_t,std::string>";
            }

            static int32_t GetFieldId(const char* name)
            {
                return GetBinaryStringHashCode(name);
            }

            static bool IsNull(const RemoteFilter<int32_t, std::string>&)
            {
                return false;
            }

            static void GetNull(RemoteFilter<int32_t, std::string>& dst)
            {
                dst = RemoteFilter<int32_t, std::string>();
            }

            static void Write(BinaryWriter& writer, const RemoteFilter<int32_t, std::string>& obj)
            {
                // No-op.
            }

            static void Read(BinaryReader& reader, RemoteFilter<int32_t, std::string>& dst)
            {
                // No-op.
            }
        };
    }
}

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    // Start a node.
    Ignite ignite = Ignition::Start(cfg);

    // Get binding.
    IgniteBinding binding = ignite.GetBinding();

    // Registering remote filter.
    binding.RegisterCacheEntryEventFilter<RemoteFilter<int32_t, std::string>>();

    // Get cache instance.
    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Declaring custom listener.
    Listener<int32_t, std::string> listener;

    // Declaring filter.
    RemoteFilter<int32_t, std::string> filter;

    // Declaring continuous query.
    continuous::ContinuousQuery<int32_t, std::string> qry(MakeReference(listener), MakeReference(filter));
}
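
The gating role of the filter can also be shown without a cluster. The following is a minimal sketch in plain Java, not the GridGain API: `ToyCache` and `RemoteFilterSketch` are hypothetical names, and the filter and listener are modeled as ordinary functional interfaces. It assumes only what is stated above: the filter is evaluated for each update, and the listener is notified only when the filter returns true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Toy model, NOT the GridGain API: every name in this class is hypothetical.
public class RemoteFilterSketch {
    /** Stand-in for a cache that evaluates a filter on every update. */
    static class ToyCache {
        private final BiPredicate<Integer, String> remoteFilter;
        private final BiConsumer<Integer, String> localListener;

        ToyCache(BiPredicate<Integer, String> remoteFilter,
                 BiConsumer<Integer, String> localListener) {
            this.remoteFilter = remoteFilter;
            this.localListener = localListener;
        }

        void put(Integer key, String val) {
            // The filter decides whether this update is forwarded to the listener.
            if (remoteFilter.test(key, val))
                localListener.accept(key, val);
        }
    }

    /** Applies two updates and returns what the listener received. */
    public static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyCache cache = new ToyCache(
            (k, v) -> k != 1,                      // skip updates for key 1
            (k, v) -> received.add(k + "=" + v));  // record forwarded updates
        cache.put(1, "1");
        cache.put(2, "2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2=2]
    }
}
```

The update for key 1 is rejected by the filter and never reaches the listener, which mirrors the `e.Key == 1` check in the C# filter shown earlier.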

Remote Transformer

By default, continuous queries send the whole updated object to the local listener. This can lead to excessive network usage, especially if the object is very large. Moreover, applications often need only a subset of fields of the object.

To address these cases, you can use a continuous query with a transformer. A transformer is a function that is executed on remote nodes for every updated object and sends back only the results of the transformation.

IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");

// Create a new continuous query with a transformer.
ContinuousQueryWithTransformer<Integer, Person, String> qry = new ContinuousQueryWithTransformer<>();

// Factory to create transformers.
Factory<IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Person>, String>> factory =
    FactoryBuilder.factoryOf(
        // Return one field of a complex object.
        // Only this field will be sent over to a local listener.
        (IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Person>, String>)
            event -> event.getValue().getName()
    );

qry.setRemoteTransformerFactory(factory);

// Listener that will receive transformed data.
qry.setLocalListener(names -> {
    for (String name : names)
        System.out.println("New person name: " + name);
});
Unsupported
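
As a rough illustration of why a transformer reduces traffic, here is a minimal sketch in plain Java. It is a toy model and not the GridGain API: `Person`, `onUpdate`, and `TransformerSketch` are hypothetical, and the "network" is just a method call. It assumes the behavior described above: the transformer runs where the updated object lives, and only its result is handed to the local listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model, NOT the GridGain API: every name in this class is hypothetical.
public class TransformerSketch {
    /** Example value with a small field and a large one. */
    static class Person {
        final String name;
        final byte[] photo; // large payload the listener does not need

        Person(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /** Applies the transformer on the "remote" side and forwards only its result. */
    static void onUpdate(Person updated,
                         Function<Person, String> transformer,
                         Consumer<String> localListener) {
        String projected = transformer.apply(updated); // runs next to the data
        localListener.accept(projected);               // only this value is sent on
    }

    /** Simulates two updates and returns what the listener received. */
    public static List<String> run() {
        List<String> names = new ArrayList<>();
        Function<Person, String> toName = p -> p.name;
        onUpdate(new Person("Alice", new byte[1024]), toName, n -> names.add(n));
        onUpdate(new Person("Bob", new byte[1024]), toName, n -> names.add(n));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Alice, Bob]
    }
}
```

The listener never sees the `photo` array, only the projected name.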

Examples

The following application examples show the typical usage of continuous queries.