Building an Event Stream Processing Solution With Apache Ignite
In the first article of this three part series, we talked about streaming systems, the associated event paradigm inherent in streams and how these concepts are seen at different levels of abstraction, the characteristics of systems that work with streams and the capabilities a Event Stream Processing (ESP) solution should have. In this second article, we will go into the Apache Ignite facilities that support construction and delivery of ESP solutions and the architecture that best fits delivering an Ignite-based ESP solution.
Stream processing is the exchange of information in real-time between producers and consumers. The information exchange may be at different levels of abstraction, at the lowest level a stream of bytes, or at the highest level the stream of business messages as determined by the producers’ sending components and the receivers’ receiving components. This real-time exchange of messages can deliver the smallest part of a business event, i.e. a “notification”, or it can deliver a fully contextualized, “full state transfer” business event. Information beyond business state change can use streaming or event-based communication (like system commands), but we will disregard those aspects here. The essential capabilities that an ESP solution needs are:
Adapters or connectors that can receive events, subscribe to and receive events, or synthetically poll for events from source systems. This should be a long list and include components across the modern technology landscape, from pure technology/communication protocol adapters like HTTP, TCP, JDBC/ODBC, SMTP, AS2, etc. to messaging platforms like Kafka, JMS, MQTT, MQ, etc. and to specific application systems like SAP ERP, Siebel, JDEdwards, and Microsoft Dynamics. Features like chunking, batching, and other performance features can be important for advanced solution delivery.
Apache Ignite comes with a wide range of streaming components to deliver this capability, as seen on the Data Loading & Streaming page.
Event Filtering & Semantics
The ability to define or filter out from the entire stream those characteristics that make up an event of significance. This can include the ability to define context relative to individual or related events. This could be as simple as subscribing to a particular subject, like “foreign exchange trade type”, or applying a filtering predicate like “Orders greater than or equal to $10,000”. It could also be more complicated and include event-based predicates like “WITHIN” as in “Orders WITHIN the last 4 hours having count(Country) Greater than 1”.
Apache Ignite provides a Continuous Query capability for registering to, filtering for and processing information injected into the storage layer of the cluster. Ignite also provides a proactive Cache Interceptor API to enable change detection, comparison and rules interjection.
Event Processing & Event Dispatching
When an event of interest is received from a source system, the solution must be able to handle these events in a number of ways that best fit the desired solution goals. Example goals may be:
- Simply persist the event data using the SQL or Cache APIs
- Enrich the event data using any manner or enrichment processes then persisting
- Invoke specific business services (e.g. call the Ignite Compute Grid, or the Ignite Service Grid)
- Run machine learning algorithms to classify, predict or forecast complex (e.g. ML or DL algorithms’ classify, predict or forecast methods)
- Run business rules, via simple logic, expressions and compute, or rules frameworks like Drools
- Generate composite events from basic business events via message dispatching
Digital Integration Hub and Streaming
The Digital Integration Hub is often thought of as supporting APIs and Microservices consumers; however, the bottom four layers provide storage and computing capabilities and are supported by integration facilities to interact with source and target systems. In an ESP-based solution, those integration components must support Stream components (protocols, connectors, parsers, etc.) and they must be distinguished between their event reception and their eventual system egress. Finally, the core storage and compute facilities in the “Hub” must accommodate streaming features to fully capture, enrich, compute, process and become a producer of its own enhanced business events to dispatch to target systems. The following diagram reflects those features and capabilities.
Apache Ignite Event Stream Processing Solution
Apache Ignite has the facilities necessary for Event Stream Processing, and these facilities can be assembled into an architecture that meets the overall goals of many business systems. The Ignite Digital Integration Hub architecture supports both business and technical message exchange, plus the orchestration or choreography of event-based, rules processing, complex event processing (CEP) and outbound event dispatching or message delivery to target systems.
Apache Ignite Streaming
The Apache Ignite portion of the above components can be seen as this part of the overall system landscape:
In this picture with Ignite acting as a Stream Data Consumer, we have the following components:
Protocol Listener - receives data from a send (not shown), and handles the network communication or “wire protocol”, like TCP sockets or a higher order HTTP connection, etc. The data received from the wire protocol may, if the protocol and the listener implement it, be organized into a single stream or aggregation element (like HTTP Chunks). The data will include an envelope and payload within it. The enveloped payload may be one message or may wrap several messages together. We can categorize these into two groups:
- Business Message Stream - the protocol listener splits the received data into the discrete messages (think of a logical “stream of messages”), or
- Technical Message Stream - the protocol listener delivers a technical message stream (e.g. Java IO Stream object or a Java NIO channel based object) after removing the transport protocol elements.
Stream Consumer / Parser - Whether the message stream has been split into individual “messages” or is a stream of messages, a parser (or a series of parsers) must process, or extract the business content from the message. The business content or business message can be a single object (think Java Object), or it may be a collection of characters that are meaningful to the business user/application. The Parsers must generate this business message from the supplied stream and are typically either:
- Document Object Model (DOM) based Parser - process the entire message as a complete document based on some document model and then extract some or all of elements of interest, as a single message or multiple messages; DOM parsers typically consume more memory but are conceptually simpler and easier to implement - “it is easier to understand what grandma was saying when you read the entire letter”.
- Steam Event-based Parser - process the message stream and on finding specific occurrences within the stream generate specific business messages, and continue on reading the stream. Continued reading may generate more messages. An example of a Stream Event-based parser is the Streaming API for XML (StAX)). Stream parsers benefit from a smaller memory footprint and speed because messages can be dispatched for processing immediately on receiving the necessary data. However, individual portions of a message may comprise many parts of a stream - e.g. parts from the header, parts from the message sub-group, etc. This adds complexity to parsing a stream AND may inhibit some streams from even being processed in this way. For example, if information in a business message is needed from the stream footer, one cannot dispatch the event until reading the entire stream (i.e. just like a DOM parser).
In receiving data over a wire protocol, in extracting message payload(s) from inside a message envelope, in parsing the structure of the technical message payload into a business message, useful information may be obtained at each level or step of processing (e.g. protocol source address, message receive id, message receive timestamp, message sequence, payload order and group count, stream size, message size, message aggregates, etc.). Some of this information may be kept with the eventual business message or reserved for the prior processing agent.
These first two parts of the stream data consumer may be together in a single component, like the Ignite Kafka Streamer or Ignite MQTT Streamer, which handles both the network communication to the Kafka or MQTT broker, and handles the message enveloping and ordering to deliver a business message to the consumer.
Equally, the first two parts of the inbound pipeline may be composed of multiple components, like an HTTP Listener, a streaming Line parser, a character separator parser (i.e. CSV parser), etc.
Message Consumer - Having received one or multiple business messages in a form that can be readily handled (e.g. a POJO, or collections of values, or maps, etc.), a consumer that is interested in these messages must determine what is to be done with the information and how to process this business data. The message consumer may:
- Publish a message to a service bus
- Call a microservice
- Process an API
- Store the data
- Process Event
- Some or all of the above
Apache Ignite Real-Time Event Stream Processing (ESP) Architecture
In the Data Ingest and Data Egress integration layers, there are interactions between source systems and target systems that are best implemented with a real-time exchange of messages and that streaming is the best metaphor for describing. Looking at the inbound and outbound pipelines and event stream processing components, we see these elements for each business channel:
The Apache Ignite Stream Solution can be thought of as a particular version of the Ignite Digital Integration Hub with a focus on those components handling streaming message ingest and streaming message egress plus those internal components to process the stream and create events.
Apache Ignite Ingestion APIs
If we focus on the Data Ingest Integration Layer and the APIs that Ignite provides for integration into the cluster, we see the core APIs available are organized by type: Data (KeyValue, SQL, DataStreamer) in blue, messaging in orange, and compute/service CLI in green:
- Key-Value API - this provides the core transactional Key-Value data storage API that has the richest capability for storing and storing within transaction contexts.
- SQL API - this provides a SQL Data Manipulation Language (SQL DML) interface that is streamable and supports common set-based processing and tooling that is JDBC/ODBC based.
- DataStreamer - this is an extension of the Key-Value API but foregoes transactionality for batched streaming for the fastest loading performance.
- Queue-based Messaging - this directed, point-to-point communication model enables messages to be exchanged in an ordered or unordered fashion.
- Topic-based Messaging - this publish & subscribe (pub/sub) communication model enables messages to be exchanged between Ignite Client and Cluster in an ordered or unordered fashion.
- Compute CLI - provides methods for running many types of computations over nodes in a cluster to execute Tasks or Closures in a distributed fashion.
- Service CLI - provides for invocation of user-defined services on the cluster with node singleton and cluster singleton patterns and features like automated deployment, continuous availability and proxying for remote execution.
Apache Ignite Event Handling
Data ingestion into the cluster initiates cluster storage and processing, this Event Dispatching and Data Egress is handled by several cluster facilities as shown in the following diagram:
When data is sent into the cluster using one of the above APIs, we (usually) store the data into a table or cache. However, there is an option to go straight from a service to data egress (not shown in the diagram). There are many approaches to dispatching events from tables or caches, with the three main approaches being:
- Cache Interceptor - this component will be fired anytime new or changed data is stored to the cache, enabling delta comparison and data modification.
- Entry Processor - where code associated to a cache entry is invoked with the arrival of data.
- Continuous Query - a selection filter that expresses a particular data set of interest. When new entries arrive that meet the query expression filter, the new entries are emitted to the query listener.
Data egress may take many forms and may require processing logic to be accomplished. The above Cache Interceptor, Cache Entry Processor and Continuous Query and its associated Event Listener are all able to emit data directly. However, one may want to apply additional processing or compute to determine if, when, how, or to whom this data should be emitted. In this case other components may be used, including:
- Services - as described previously, we can encapsulate the necessary data emit logic in a service.
- Compute - similarly compute tasks can be used to execute the necessary data emit logic.
- Machine Learning Model - Ignite provides a rich set of distributed ML algorithms that can process data in real-time. Once the ML Algorithm has been configured, trained and deployed, new data entries can be processed and predictions or classifications can be generated to determine what to make of the new event data (and how to emit).
- Rule - a variant of the Compute or Service, a business rule implemented with Drools, for example, can be solicited to determine how to handle a particular data event.
Source to Target Event Stream Processing Solution
Putting together the Streaming Data Ingest, Event Processing, and Data Egress components into an Apache Ignite Solution can be seen as starting from Source Organizations and their Source Systems through to Target Organizations and their Target Systems as below:
In this article, we looked at how Apache Ignite and its Digital Integration Hub architecture can be patterned for real-time event stream processing. We detailed the Ignite centric view of Event Stream Processing, including the many components that go into an inbound streaming pipeline, event handling and finally dispatching for data egress via the outbound pipeline. Ignite’s combination of data, messaging, and service-oriented APIs enable moving stream event messages from wire protocol, through message parsing on to business message storage and/or service invocation. We described three event dispatching capabilities for handling the event message once it is stored, including the cache interceptor, entry processor, and continuous query. Dispatching events can readily make use of the Ignite cluster’s built-in Service, Compute, and ML processing facilities, or using an embedded Rules Engine like Drools. The combination of event dispatching and the associated logic processing delivers a complex event processing (CEP) system.
With real-time data ingest, event processing and data egress, we have seen how Apache Ignite can deliver an inbound and outbound stream processing plus complex event processing solution. In part 3 we will go into detail on how to use Continuous Queries as part of an Event Stream Processing Solution.