Friday, October 2, 2015

Execution Plans in Distributed Mode of WSO2 CEP 4.0.0

In both distributed and standalone modes of WSO2 CEP, the processing logic has to be written in SiddhiQL. In standalone mode, the queries are executed inside a Siddhi engine that lives inside the CEP server itself. In contrast, in distributed mode the queries are executed inside Siddhi engines embedded in the bolts of a Storm topology. Please refer to my previous post for details on the architecture of WSO2 CEP 4.0.0 distributed mode. This post intends to give some insight into execution plans in distributed mode; writing Siddhi queries is out of its scope.

What happens when adding an execution plan?

In WSO2 CEP, Siddhi queries have to be written in "execution plans". In distributed mode, the management console of the CEP manager(s) MUST be used to create execution plans (you can't use the management console of CEP workers).

When an execution plan is submitted, it is converted to something called a "query plan". A query plan is an XML document generated by parsing the Siddhi queries of the execution plan. It defines the components and layout of the Storm topology that will be generated for the execution plan, and it contains the configuration of each component, such as its input/output streams, the number of instances to be spawned and, for bolts, which Siddhi queries to execute inside them. The query plan can be viewed by enabling debug logs for the StormTopologyManager class
(add log4j.logger.org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager=DEBUG in <CEP_HOME>/repository/conf/log4j.properties)


Once the Storm topology is generated from the query plan, it is submitted to Storm along with org.wso2.cep.storm.dependencies.jar, located at <CEP_HOME>/repository/conf/cep/storm. This jar file contains the classes required to run the topology in the Storm cluster, including dependencies such as Siddhi. The Nimbus (the manager of the Storm cluster) will distribute it across the Storm cluster.


When the topology is successfully submitted to Storm, the CEP manager should print a log similar to the following:

Figure 1

Working With Execution Plans

In each execution plan we can specify its name with the @Plan:name annotation. The CEP manager will create a Storm topology with the name specified in this annotation, appending the tenant ID to the topology name. This is done to avoid name collisions between Storm topologies belonging to different tenants. For example, if we create an execution plan with @Plan:name('StockAnalysis') for the super tenant (whose tenant ID is -1234), we should be able to see a Storm topology as follows in the Storm UI.

Figure 2
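
As a concrete illustration, a minimal execution plan for this scenario might look like the following sketch. Only the @Plan:name annotation comes from the example above; the stream names, attributes and the @Import/@Export version strings are hypothetical placeholders:

@Plan:name('StockAnalysis')

@Import('StockQuoteStream:1.0.0')
define stream stockQuoteStream (symbol string, price float, volume long);

@Export('HighPriceStockStream:1.0.0')
define stream highPriceStockStream (symbol string, price float);

from stockQuoteStream[price > 100]
select symbol, price
insert into highPriceStockStream;

With the super tenant's ID appended, this shows up in the Storm UI as a topology named StockAnalysis[-1234].
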
When a change is made to the execution plan by editing it, the existing topology is killed and removed from the Storm cluster, and a new topology is submitted with the changes.

Execution plan status

Before we start publishing data, we should make sure that the execution plan is ready to accept it. This can be checked by looking at the status of the execution plan through the management console of the CEP manager. For an execution plan to be ready to process data, it should meet the following conditions:
  • The associated Storm topology is in the ACTIVE state in the Storm cluster
  • All connections between CEP workers and event receiver spouts (inflow connections) are made
  • All connections between event publisher bolts and CEP workers (outflow connections) are made
These three conditions can be monitored through the "Distributed Deployment Status" column of the "Execution Plans" page.


When an execution plan is ready to process data, it should show a status similar to the image above. The status in the image says:
  • The "StockAnalysis[<tenantID>]" topology is in the ACTIVE state in the Storm cluster
  • The CEP worker at 172.17.42.1 has established all inflow connections by connecting to the event receiver spouts
  • All event publisher bolts have connected to CEP workers, so all outflow connections are made

Things to Note

  • If the name of the execution plan is changed later, the Storm topology generated with the previous name will remain in the Storm cluster. It has to be killed through the Storm UI.
  • The number of execution plans that can be deployed in distributed mode is bounded by the number of slots (workers) in the Storm cluster. By default, each execution plan requires at least one slot to deploy its Storm topology; this value can be changed by adding topology.workers : <number> to <CEP_HOME>/repository/conf/cep/storm/storm.yaml, as shown in the snippet after this list. For example, if we set topology.workers : 2 and the Storm cluster has 10 worker processes, then it's only possible to deploy 10/2 = 5 execution plans.
  • Currently, there's no way of controlling how bolts/spouts are spread across the Storm cluster; they are assigned in a round-robin manner by the default Storm scheduler.
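
For instance, to make each topology claim two worker slots, the entry in <CEP_HOME>/repository/conf/cep/storm/storm.yaml would look like the following (topology.workers is a standard Storm configuration key; the value 2 is just an example):

topology.workers : 2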

Tuesday, September 29, 2015

WSO2 CEP 4.0.0 in Distributed Mode

Overview

WSO2 CEP is a powerful, lightweight, low-latency open source complex event processing engine. Apache Storm is a distributed real-time processing system. The integration of WSO2 CEP with Apache Storm combines the advanced complex event processing capabilities of WSO2 CEP with the powerful scalability offered by the distributed nature of Apache Storm.

With this integration, WSO2 CEP can execute different Siddhi queries on different nodes of a Storm cluster. Therefore, resource-intensive queries can be executed on nodes with more resources, so that other lightweight queries are not affected and can run smoothly without being starved. It's also possible to have multiple instances of each query running in the cluster to handle higher volumes of events. By partitioning the event streams, each partition can be processed in parallel across the cluster to achieve higher throughput. These capabilities allow building highly scalable CEP systems that can handle massive volumes of events per time unit, which a monolithic CEP server cannot.
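
To illustrate stream partitioning, the sketch below uses Siddhi's partition syntax to split processing by stock symbol, so that events for different symbols can be processed in parallel. The stream and attribute names are hypothetical:

partition with (symbol of stockQuoteStream)
begin
    from stockQuoteStream#window.time(1 min)
    select symbol, avg(price) as avgPrice
    insert into avgPriceStream;
end;

Each distinct symbol value gets its own partition instance, so per-symbol averages can be computed concurrently across the cluster.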

High Level Architecture

With this integration, the Siddhi engine runs in a Storm cluster as bolts instead of running inside the WSO2 CEP server. Therefore, all processing of events is done inside the external Storm cluster.

Since Siddhi runs inside Storm, multiple instances of the Siddhi engine can be spawned across the Storm cluster. This results in a system that is highly horizontally scalable and capable of handling large volumes of events per time unit.
 
Event receiving and publishing is done by CEP servers, while event processing is done in the Storm cluster. With this architecture we can increase the number of CEP servers to scale with the incoming/outgoing load, and we can just as easily scale up the Storm cluster, which does the processing, thanks to the powerful scalability of Apache Storm.

 

Components

In the above architecture, several components work in collaboration. Let's see briefly what each component does.

CEP Workers

A given CEP worker can play two roles at the same time:
  • CEP receiver: receives events from an external event source and forwards them to an event receiver spout in the Storm cluster. The CEP receiver creates a connection to Storm for each imported stream of each execution plan. Therefore, events of different imported streams are fed into the Storm cluster via different connections.
  • CEP publisher: receives events from an event publisher bolt and sends them to external clients. The CEP publisher starts listening for events on a certain port; this service can receive all events belonging to all exported streams of an execution plan.
There should be at least one CEP worker, and there can be any number to match the load.

CEP Manager

The CEP manager hosts an administrative service which provides the connectivity details of components to facilitate them connecting with each other. The manager is also responsible for generating topologies and deploying them to the Storm cluster for each execution plan.

The management console of the CEP manager must be used to create, update and delete execution plans. CEP workers synchronize themselves with the CEP manager by fetching all the artifacts related to the execution plans using deployment synchronization.

It's advisable to have at least two CEP managers in a production deployment. If the primary CEP manager goes down, the other CEP manager will take over and the cluster will continue to function.

A given CEP instance can act as a manager and a worker at the same time.

Storm Cluster

There will be a topology residing in the Storm cluster for each execution plan. Every topology is a combination of the following types of components:
  • Event receiver spouts: receive events from CEP receivers and pass them through to Siddhi bolts for processing. Event receiver spouts receive events belonging to the imported streams of an execution plan.
  • Siddhi bolts: Storm bolts that run a Siddhi engine embedded in them. The actual event processing is done inside these. A Siddhi bolt receives events from event receiver spouts and publishes processed events to event publisher bolts. Each Siddhi bolt executes a single query or a set of queries belonging to the same group (queries can be grouped when writing them in an execution plan; see the sketch after this list).
  • Event publisher bolts: receive processed events from Siddhi bolts and pass them through to a CEP publisher. Event publisher bolts publish events belonging to the exported streams of an execution plan.
The number of instances of each component to be spawned can be specified in the execution plan.
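
As an illustration of grouping and parallelism, queries in an execution plan can be annotated roughly as in the sketch below. The @dist annotation with execGroup and parallel properties follows the WSO2 CEP 4.0.0 distributed-mode samples; treat the exact property names as an assumption to verify against your version, and the query itself as a hypothetical placeholder:

@dist(execGroup='FilterGroup', parallel='2')
from stockQuoteStream[price > 100]
select symbol, price
insert into highPriceStockStream;

Queries sharing the same execGroup are executed inside the same Siddhi bolt, while parallel controls how many instances of that bolt are spawned.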

How do components connect with each other?

All these components, distributed across several machines, have to connect with each other to function as a single system. This section describes how those inter-connections are made. There are basically two types of interconnectivity:
  • Inflow: connections between CEP receivers and event receiver spouts. Incoming events from external event sources flow through these.
  • Outflow: connections between event publisher bolts and CEP publishers. Processed events are published to external event sinks through these connections.
When an execution plan is deployed, the CEP manager will generate a Storm topology accordingly and submit it to the Storm cluster.

The location of the CEP managers is specified through configuration, so that all CEP workers and Storm components know where the CEP managers are. The CEP manager maintains two maps: one to keep track of the event receiver spouts of each execution plan and another to keep track of the CEP publishers of each execution plan.

Event receiver spouts and CEP publishers start event receiver servers which listen for events on a certain port of the host they reside on. The port on which each of these components listens is decided by the component itself and communicated to the CEP manager. The event receiver spouts of all Storm topologies and all CEP publishers register themselves with the CEP manager by specifying their IP address and port along with the execution plan name and tenant ID (“I’m ready to receive events for MyExecutionPlan of tenant -1234 at my.host.name, listening on port 15000”).

Then the CEP workers connect to the CEP manager and ask for an event receiver spout by specifying the execution plan name and tenant ID (“Give me the IP address and the listening port of an event receiver spout for execution plan MyExecutionPlan of tenant -1234”). The CEP worker can then connect to the event receiver spouts and pass events from external event sources into the Storm topology through them. This is how the inflow connections are made. The diagram below depicts this process.

Likewise, the event publisher bolts ask the CEP manager for CEP workers (“Give me the IP and port of a CEP worker for MyExecutionPlan of tenant ID -1234”), so that they can publish the processed events to be sent out to external event sinks. This is how the outflow connections are made.

When it comes to actual TCP connections, each CEP worker makes a point-to-point connection to an event receiver spout per imported stream per execution plan, so that events belonging to different streams are sent through different TCP connections even for the same execution plan. For example, if there are two execution plans and both import the streams foo and bar, each worker makes 4 connections; with 2 workers, 8 connections are made. On the event receiving side, a CEP worker starts only one service per execution plan, and that service can receive events belonging to all exported streams of the execution plan. A given event publisher bolt makes only one point-to-point connection, to one such service of a single CEP worker, to publish the events belonging to all exported streams.

The diagram below shows how the connections are made when there is only one CEP worker and one execution plan deployed, where the execution plan imports 3 streams and exports 2 streams.



When all inflow and outflow connections are made and the Storm topology is active, the deployment is ready for processing.

The decision on which CEP worker connects to which event receiver spout, and which event publisher bolt connects to which CEP worker, is taken by the CEP manager. It considers the following criteria when making this decision (let's call the listening {IP:Port} of an event receiver spout/CEP publisher an endpoint):
  1. If an endpoint has not sent a heartbeat for a while, the CEP manager ignores it and logs a warning (this may be a connectivity issue, or the endpoint may have failed).
  2. If there's an endpoint on the same machine as the requester, that endpoint is returned (so that events can be handed over quickly, without going through the network).
  3. If there are no endpoints on the same machine, the endpoint with the least number of connections is returned (so that connections are distributed evenly across endpoints and no endpoint gets overloaded).

Sending and Receiving Events

A given CEP worker connects to only a single event receiver spout for each imported stream, and every CEP worker is connected to at least one event receiver spout. Therefore, an external event source can send events to any CEP worker.

Receiving processed events, however, is a bit different. For a given execution plan, events will not be published through all CEP workers, since not all CEP workers are connected to an event publisher bolt.

The number of CEP workers that publish events for a given execution plan is less than or equal to the number of event publisher bolts of that execution plan. For example, if there is only one event publisher bolt in an execution plan, that bolt connects to a single CEP worker and processed events are published only through it.

Further Reference 

[1] - Sample on WSO2 CEP 4.0.0 working with Apache Storm
[2] - Deployment guide for a distributed setup of WSO2 CEP 4.0.0