Friday, October 2, 2015

Execution Plans in Distributed Mode of WSO2 CEP 4.0.0

In both distributed and standalone modes of WSO2 CEP, the processing logic has to be written in SiddhiQL. In standalone mode of CEP the quarries will be executed inside a Siddhi engine that's inside the CEP server itself. In contrast, in distributed mode the quarries will be executed inside Siddhi engines embedded to bolts of a storm topology. Please refer my previous post for details on the architecture of WSO2 CEP 4.0.0 distributed mode.This post intends to give some insight into execution plans in distributed mode, and it does not include writing Siddhi quaries in the scope.

What happens when adding an execution plan?

In WSO2 CEP Siddhi quarries has to be written in "Execution plans". In distributed mode, the management console of the CEP manager(s) MUST be used to create execution plans(you can't use the management console of CEP workers).

When an execution plan is submitted, it will be converted to something call a "Query Plan". A Query plan is an XML generated by parsing the Siddhi quarries of a given execution plan. It defines the components and layout of the storm topology that will be generated for a execution plan. It contains the configurations  such as  input/output streams, number of instances to be spawned and what siddhi queries to be executed inside(for bolts), etc. for each component. The query plan can be viewed by enabling the debug logs of StormTopologyManager class
(Add log4j.logger.org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager=DEBUG in <CEP_HOME>/repository/conf/log4j.properties)


Once the storm topology is generated by looking at the query plan it will be submitted to the storm  along with org.wso2.cep.storm.dependencies.jar. This jar file contains required classes to run the topology in storm cluster. It consists of dependencies such as Siddhi, etc. The Nimbus(manager of storm cluster) will distribute it across the storm cluster. This is located at <CEP_HOME>/repository/conf/cep/storm.


When the topology is successfully submitted to storm the CEP manager should print a log similar to following ,

Figure 1

Working With Execution Plans

In each execution plan we can specify the name of it with @Plan:name annotation. CEP manager will create a storm topology with the name specified in this annotation, plus it will append the tenant ID to the name of the topology. This is done to avoid  name collisions of  storm topologies belonging to different tenants. For an example, if we create an execution plan with  @Plan:name(‘StockAnalysis’) for supper tenant (who's tenant ID is -1234)  we should be able to see a storm topology as follows in Storm UI

Figure 2
When a change is made in the execution plan by editing it, the existing topology will be killed and wiped out from storm cluster and a the topology will be re-submitted with the changes.

Execution plan status

Before start publishing data we should make sure that the execution plains is ready to accept data. This can be achieved by looking at the the status of the Execution plan through the management console of the CEP manager. For an execution plan to be ready for processing data it should meet following conditions,
  • Associated storm topology is in  ACTIVE state in storm cluster
  • All connections between CEP workers and event receiver spouts(inflow connections) are made
  • All connections between Event publisher bolts are CEP workers(outflow connections)  are made.
These 3 conditions can be monitored through the "Distributed Deployment Status" column of the "Execution Plans" page.


When an execution plan is ready to process data it should show a status similar to above image. The status in the above image says,
  • "StockAnalysis[<tenantID>]" topology is in ACTIVE state in storm topology
  • CEP worker at 172.17.42.1 has established all inflow connections by connecting to storm receivers
  • All event publisher bolts has connected to CEP workers, so that all outflow connections are made.

Things to Note

  • If the  name of the execution plan is changed in the middle then the storm topology generated with the previous name will remain in the storm topology. This has to be killed through the Storm UI
  • The number of Execution plans that can be deployed in distributed mode will be bounded by the number of slots(workers) in the storm cluster. By default each execution plan  will require at least one slot to deploy it's storm topology, this value can be changed by adding topology.workers : <number> to repository/conf/cep/storm/storm.yaml. For an example, if we set topology.workers : 2 and number of worker processes in the storm cluster is 10. Then, it's only possible to add 10/2 = 5 execution plans. 
  • Currently, there's no way of controlling how bolts/spouts are spread acorss the storm cluster. It's done in a round robin manner by default storm scheduler.