Using the initial-query, listener, and remote-filter features of Ignite continuous queries to detect, filter, process, and dispatch real-time events
Real-time handling of streams of business events is a critical part of modern information-management systems, including online transaction processing (OLTP) and online analytical processing (OLAP) systems. This capability is a core part of the event stream processing (ESP) and digital integration hub (DIH) architectures that we reviewed in parts one and two of this series. In this article, we review the APIs and the basic operation of the Apache® Ignite® continuous query facility and build a sample solution that demonstrates the facility’s part within a broader, real-time, digital integration hub architecture that is running as an ESP.
Event stream processing (ESP) solutions need protocols in order to receive events and event-handling facilities in order to take advantage of events in an efficient, real-time manner. The Ignite continuous query facility is one of several event-based detection and processing facilities that are available to developers, with another being the Cache Interceptor facility.
Target Business Solution
In this article, we look at a solution that responds to streaming sales orders to update the Customer360 analytic table in real time. This capability enables OLAP to be completely up to date and to support real-time analysis.
The solution is comprised of the following elements:
Storage: Order, OrderDetail (not shown), Customer (not shown), and Customer360 tables to hold the
transaction and analytic data of the solution
Data Ingestion: An Ignite client that receives data from the source ordering system and uses any one of multiple APIs to insert, put, or stream data into a table or cache
Continuous Query: A continuous query service that reads the Order table and initiates an update listener that defines a function that takes new orders and updates the Customer360 table with the updated information
Continuous Query and Related API
Apache Ignite continuous queries are documented on the Ignite site here. The site concisely describes the queries as “continuously obtain real-time query results” and “...you will get notified of all the data changes that fall into your query filter if any.” The following sections describe first the core class and then the APIs that enable continuous query operation.
Ignite continuous query functionality is available via the ContinuousQuery class. The ContinuousQuery class provides the full class lifecycle—from initialization, through execution, to shutdown. Four key elements work with and provide the functionality of continuous query.
A continuous query operates for as long as the query is active, working with data changes as they occur. The “continuous” activity begins at the time of the initial query. Data entries that are in the solution at the time of the initial query can be dealt with before the process of handling new and newly changed entries begins.
The initial query can be any of the three core Ignite queries: cache-based scan queries, table-based SQL queries, and search index-based text queries, as documented here.
*** The name “InitialQuery” might seem to indicate that the query has only one function—to retrieve rows before the query process becomes continuous. However, the InitialQuery object performs a second function - it returns the query results continuously after the initial query is initially executed.
The continuous query’s remote filter is a function that returns a Boolean result for each updated entry. If the filter evaluates the entry as “true,” the entry is forwarded to the local listener for processing. However, this behavior affects only the steady-state, new, incremental records that occur after the initial query is executed and for which the local listener is the controlling mechanism that deals with entries.
The local listener is the logic that is applied to the iterative cursor and to each entry that is delivered by the data node and that passes the remote filter expression.
Entries are selected by the initial query, updated, and passed through the remote filter. Then, the entries are passed back to the local listener. However, an optional, intermediate step is available. The remote transformer can be used to enable on-node manipulation and transformation of the changed entries before they are returned to the local listener—to minimize the volume of data that is sent over the wire or to adjust or enrich the data that is relevant to the new event.
The continuous query-based solution is patterned on the Ignite digital integration hub, as shown in the following diagram:
The continuous query-based solution leverages continuous queries to handle streaming events, associated processing, and dispatching. Therefore, the solution is an Event Stream Processing solution.
The components of the continuous query-based solution operate in the sequence that is indicated in the following diagram:
The process as defined by the continuous query facility, its configuration and the coding choices made, consists of three distinct phases and includes 11 steps.
- Set InitialQuery: The initial query is set on an instance of the ContinuousQuery. This query will operate on the data nodes.
- Set RemoteFilter (optional): If set, a remote filter is attached to the InitialQuery, and will filter any data that the InitialQuery selects during the initial data handling stage.
- Set LocalListener: The logic of the LocalListener is set. This specified logic will be invoked on any new data entries received and passing the optional remote filter during the steady state handling stage. Options available include GridGain Compute, Service, ML, Rule, or any other Java function.
Initial Data Handling
- Execute InitialQuery: Executing the InitialQuery accounts for data that exists in the distributed cluster prior to the steady-state, continuous-query operation stage begins and continues.
- Handle Initial Data: Data retrieved at the initial data handling stage can be processed by a number of actions to operate on the return data, including GridGain Compute, Service, ML, Rule, or any other Java function. We must specify and code how we process these data entries.
- Optional Result - Data Egress: The result of processing initial data may be of interest to external systems; in this diagram step 5’s initial data handling routine has chosen to send out the data.
Steady State Handling
- Business Change: When changes of interest occur in the source system, data is ingested into an Ignite table or cache (ingestion details ignored - see my articles on Ignite Data Loading).
- Remote Filter Evaluated (optional): The ingested change data causes the continuous query facility to invoke a RemoteFilter expression. Entries that evaluate to “false” are ignored and processing stops.
- Entries Delivered to LocalListener: The LocalListener is invoked for all change data delivered (i.e. all change data that is not filtered out by any prior remote filter).
- LocalListener Actions: The LocalListener, as setup in step 3, is invoked for each changed data-event record.
- Optional Result - Internal Update: The result of processing steady state change data may be of interest to other Ignite caches and tables; in this diagram step 10’s steady state data handling routine has chosen to update internal caches or tables.
Note on Optional Results (Steps 6 and 11): changes detected and processed in the initial data handling stage step #5, or steady state handling stage step #10 can be implemented as GridGain Compute, Service, ML, Rule or other Java code. These implementations may do virtually anything, with two options shown here as initial data egress or steady state internal update. Often these actions would be the same (i.e. whether the change occurred before steady state, or during steady state is likely of no consequence to whether the internal or external system are interested in the updates). The two choices shown here are just illustrative of two possible options.
In this article, we focused on how Apache Ignite and its continuous query facility supports real-time event stream processing. We identified the components that comprise the continuous query facility and detailed the operation of the components—from event-data arrival; through event processing; and, optionally, to data egress. By combining the data-ingest facilities of the digital integration hub with continuous-query facilities that query, filter, transform, and execute on change data events as they occur, we can create an event stream processing solution of any complexity. Using Apache Ignite’s horizontal scalability and distributed, co-located compute, we can deploy solutions that can scale to meet our business challenges.