Tuesday, September 29, 2015

WSO2 CEP 4.0.0 in Distributed Mode


WSO2 CEP is a powerful, lightweight, low-latency open source complex event processing engine. Apache Storm is a real-time distributed processing system. The integration of WSO2 CEP with Apache Storm combines the advanced complex event processing capabilities of WSO2 CEP with the powerful scalability offered by the distributed nature of Apache Storm.

With this integration, WSO2 CEP can execute different Siddhi queries on different nodes of a Storm cluster. Therefore, resource-intensive queries can be executed on nodes with more resources, so that other lightweight queries are not affected and can run smoothly without being starved. It's also possible to have multiple instances of each query running in the cluster to handle higher volumes of events. By partitioning the event streams, each partition can be processed in parallel in the cluster to achieve higher throughput. These capabilities allow building highly scalable CEP systems which can handle massive volumes of events per time unit that cannot be handled by a monolithic CEP server.
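To illustrate the partitioning idea, here is a rough sketch (hypothetical code, not WSO2 CEP internals; the field name `symbol` and the routing function are illustrative assumptions) of how hashing a stream on a partition key lets each parallel instance process only its share, while events with the same key always reach the same instance:

```python
import zlib
from collections import defaultdict

def partition_index(key: str, num_partitions: int) -> int:
    """Deterministically map a partition key to a parallel instance index."""
    # crc32 is used here because Python's built-in hash() of strings is
    # randomized per process, which would break deterministic routing.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def route(events, num_partitions):
    """Group events by the instance that would process them."""
    buckets = defaultdict(list)
    for event in events:
        buckets[partition_index(event["symbol"], num_partitions)].append(event)
    return buckets

# Events with the same key land in the same partition, so per-key stateful
# queries (e.g. per-symbol windows) stay correct while partitions run in parallel.
events = [{"symbol": s, "price": p} for s, p in
          [("IBM", 101.0), ("WSO2", 45.1), ("IBM", 102.5)]]
buckets = route(events, num_partitions=4)
```

The key point is that the routing is deterministic: the same key always maps to the same instance, which is what makes parallel processing of a partitioned stream safe for stateful queries.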

High Level Architecture

With this integration, the Siddhi engine runs as bolts in a Storm cluster instead of inside the WSO2 CEP server. Therefore, all event processing is done inside the external Storm cluster.

Since Siddhi runs inside Storm, multiple instances of the Siddhi engine can be spawned across a Storm cluster. This results in a system which is highly horizontally scalable and capable of handling large volumes of events per time unit.
Event receiving and publishing are done by CEP servers, while event processing is done in the Storm cluster. With this architecture, we can increase the number of CEP servers to scale with the incoming/outgoing load, and also easily scale up the Storm cluster, which does the processing, using the powerful scalability of Apache Storm.



In the above architecture, several components work in collaboration. Let's see, in brief, what each component does.

CEP Workers

A given CEP worker can play two roles at the same time:
  • CEP receiver: Receives events from an external event source and forwards them to the Event Receiver Spout in the Storm cluster. The CEP receiver creates a connection with Storm for each imported stream of each execution plan. Therefore, events of different imported streams are fed into the Storm cluster via different connections.
  • CEP publisher: Receives events from the Event Publisher Bolt and sends them to external clients. The CEP publisher starts listening for events on a certain port. This service can receive all events belonging to all exported streams of an execution plan.
There should be at least one CEP worker, and there can be any number to match the load.

CEP Manager

The CEP manager hosts an administrative service which provides connectivity details of components to facilitate them connecting with each other. The manager is also responsible for generating a topology for each execution plan and deploying it to the Storm cluster.

The management console of the CEP manager must be used to create, update, and delete execution plans. CEP workers synchronize themselves with the CEP manager by fetching all artifacts related to the execution plans using deployment synchronization.

It’s advisable to have at least two CEP managers in a production deployment. If the primary CEP manager goes down, the other CEP manager takes over and the cluster continues to function.

A given CEP instance can act as both a manager and a worker at the same time.

Storm Cluster

There is a topology residing in the Storm cluster for each execution plan. Every topology is a combination of the following types of components:
  • Event Receiver Spouts: Receive events from the CEP receiver and pass them through to the Siddhi Bolts for processing. Event receiver spouts receive events belonging to imported streams of execution plans.
  • Siddhi Bolts: Storm bolts which run the Siddhi engine embedded in them. The actual event processing is done inside these. A Siddhi bolt receives events from an Event Receiver Spout and publishes processed events to an Event Publisher Bolt. Each Siddhi bolt executes a single query or a set of queries belonging to the same group (queries can be grouped when writing them in an execution plan).
  • Event Publisher Bolts: Receive processed events from Siddhi Bolts and pass them through to a CEP publisher. Event publisher bolts publish events belonging to exported streams of an execution plan.
The number of instances of each component to be spawned can be specified in the execution plan.

How do components connect with each other?

All these components, distributed across several machines, have to connect with each other to function as a single system. This section describes how the inter-connections are made. There are basically two types of interconnectivity:
  • Inflow: Connections between CEP receivers and Storm receiver spouts. Incoming events from external event sources flow through these.
  • Outflow: Connections between Storm publisher bolts and CEP publishers. Processed events are published to external event sinks through outflow connections.
When an execution plan is deployed, the CEP manager generates a Storm topology accordingly and submits it to the Storm cluster.

The location of the CEP managers is specified through configuration, so that all CEP workers and Storm components know where the CEP manager is. The CEP manager maintains two maps: one to keep track of the event receiver spouts of each execution plan, and another to keep track of the CEP publishers of each execution plan.

Event Receiver Spouts and CEP publishers start event receiver servers which listen for events on a certain port of their host. The port each of these components listens on is decided by the component itself and communicated to the CEP manager.
The event receiver spouts of all Storm topologies and all CEP publishers register themselves with the CEP manager by specifying their IP address and port along with the execution plan name and tenant ID (“I’m ready to receive events for MyExecutionPlan of tenant -1234 through my.host.name, and I’m listening on port 15000”).
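The manager's bookkeeping for these registrations can be sketched roughly as follows (a hypothetical toy model; the class and method names are illustrative assumptions, not the actual WSO2 CEP code):

```python
from collections import defaultdict

class ManagerEndpointRegistry:
    """Toy model of the CEP manager's two maps: one for event receiver
    spouts and one for CEP publishers, keyed by (tenant ID, execution
    plan name). Names here are illustrative, not WSO2 internals."""

    def __init__(self):
        self.receiver_spouts = defaultdict(list)
        self.cep_publishers = defaultdict(list)

    def register_receiver_spout(self, tenant_id, plan, host, port):
        # "I'm ready to receive events for <plan> of tenant <id> on host:port"
        self.receiver_spouts[(tenant_id, plan)].append((host, port))

    def register_cep_publisher(self, tenant_id, plan, host, port):
        self.cep_publishers[(tenant_id, plan)].append((host, port))

manager = ManagerEndpointRegistry()
manager.register_receiver_spout(-1234, "MyExecutionPlan", "my.host.name", 15000)
```

Keying both maps by tenant ID and execution plan name is what lets the manager later answer lookup requests scoped to a specific plan of a specific tenant.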

Then, the CEP workers connect to the CEP manager and ask for a Storm receiver spout by specifying the execution plan name and the tenant ID (“Give me the IP address and the listening port of an Event Receiver Spout for execution plan MyExecutionPlan of tenant -1234”). This way, a CEP worker can connect with event receiver spouts and pass events from the external event source to the Storm topology via them, making the inflow connections. The diagram below depicts this process.

Likewise, event publisher bolts ask the CEP manager service for CEP workers (“Give me the IP and port of a CEP worker for MyExecutionPlan of tenant ID -1234”), so that event publisher bolts can publish the processed events to be sent out to external event sinks. This is how the outflow connections are made.

When it comes to actual TCP connections, there is a point-to-point connection made from each CEP worker to an event receiver spout per imported stream per execution plan (if there are two execution plans which both import streams foo and bar, 4 connections are made; if there are 2 workers, 8 connections are made). Thus, events belonging to different streams are sent through different TCP connections, even for the same execution plan. On the event receiving side, a CEP worker starts only one service per execution plan, which can receive events belonging to all exported streams of that execution plan. A given event publisher bolt makes only one point-to-point connection, to one such service of a single CEP worker, to publish its events belonging to all exported streams.
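The inflow connection arithmetic above can be checked with a quick calculation (illustrative only; the function name is ours, not WSO2's):

```python
def inflow_connections(num_workers: int, imported_streams_per_plan: dict) -> int:
    """One TCP connection per worker, per imported stream, per execution plan."""
    return num_workers * sum(imported_streams_per_plan.values())

# Two execution plans, each importing the streams foo and bar:
plans = {"plan1": 2, "plan2": 2}
assert inflow_connections(1, plans) == 4   # one worker  -> 4 connections
assert inflow_connections(2, plans) == 8   # two workers -> 8 connections
```

Note that this multiplicative growth applies only to the inflow side; on the outflow side each event publisher bolt opens a single connection regardless of how many streams the plan exports.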

The diagram below shows how the connections are made when there is only one CEP worker and one execution plan deployed, where the execution plan imports 3 streams and exports 2 streams.

When all inflow and outflow connections are made and the Storm topology is active, the deployment is ready for processing.

The decision of which CEP worker connects to which Event Receiver Spout, and which event publisher bolt connects to which CEP worker, is taken by the CEP manager. It considers the following criteria when making this decision (let's call the listening {IP:Port} of an Event Receiver Spout/CEP publisher an endpoint):
  1. If an endpoint has not sent a heartbeat for a while, the CEP manager ignores that endpoint and logs a warning (this may be a connectivity issue, or the endpoint may have failed).
  2. If there is an endpoint on the same machine as the requester, it is returned (so that events can be sent out quickly, because they don't have to go through the network).
  3. If there are no endpoints on the same machine, the endpoint with the least number of connections is returned (so that connections are distributed across endpoints evenly and no endpoint gets overloaded).
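The three criteria above might be sketched like this (a simplified model under our own assumptions; the timeout value, class, and function names are illustrative, and the real WSO2 CEP internals will differ):

```python
import time

HEARTBEAT_TIMEOUT = 60.0  # seconds; an assumed value, not a WSO2 default

class Endpoint:
    """Listening {IP:Port} of an Event Receiver Spout or CEP publisher."""
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connection_count = 0
        self.last_heartbeat = time.time()

def pick_endpoint(endpoints, requester_host, now=None):
    """Apply the manager's criteria: drop stale endpoints, prefer the
    requester's own machine, otherwise pick the least-connected endpoint."""
    now = time.time() if now is None else now
    # 1. Ignore endpoints whose heartbeat is overdue.
    live = [e for e in endpoints if now - e.last_heartbeat < HEARTBEAT_TIMEOUT]
    if not live:
        return None  # a real manager would log a warning here
    # 2. Prefer an endpoint on the requester's machine.
    local = [e for e in live if e.host == requester_host]
    candidates = local if local else live
    # 3. Otherwise, balance by least connections.
    chosen = min(candidates, key=lambda e: e.connection_count)
    chosen.connection_count += 1
    return chosen
```

Tracking `connection_count` on each handout is what keeps rule 3 meaningful: successive requests from different machines spread across endpoints instead of piling onto one.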

Sending and Receiving Events

A given CEP worker will only connect to a single Storm receiver, and all CEP workers will be connected to at least one Event Receiver Spout. An external event source can send events to any CEP worker.

But receiving processed events is a bit different. Events will not be published through all CEP workers for a given execution plan, since not all CEP workers are connected to an Event Publisher Bolt.

The number of CEP workers which publish events for a given execution plan is less than or equal to the number of event publisher bolts of that execution plan. For example, if there is only one event publisher bolt in an execution plan, that bolt will connect to only a single CEP worker, and processed events are published only through it.

Further Reference 

[1] - Sample on WSO2 CEP 4.0.0 working with Apache Storm
[2] - Deployment guide for a distributed setup of WSO2 CEP 4.0.0
