Complex Event Processing (CEP) is used to process large streams of information, typically for real-time event monitoring or correlation. Events can be processed in one of two modes: 'stream' mode or 'cloud' mode. The following image illustrates the difference between the two modes:
GitHub Link for Repository
https://github.com/sumithpuri/skp-code-marathon-sherlock
The continuous flow of information or events can be classified into one of these brackets (or even both) for analysis or correlation. The cloud mode would be useful in the following circumstances: user behavior, market data and activity monitoring. The stream mode could be most useful in applications such as: real-time event monitoring, event correlation, and sensor networks.
The most useful end applications include threat detection, anomaly detection, airport security, market prediction, profit forecasting and automated algorithmic trading decisions, among a host of others.
Any discussion of Complex Event Processing also needs a clear picture of the Sliding Window and the Batch Window. For most architects and engineers reading about this for the first time, it will come across as a very novel way of analyzing information:
A sliding window moves forward continuously, so consecutive windows overlap. The Batch Window illustration given below shows that, in contrast, events are processed in discrete, fixed blocks.
[Image Available From Oracle]
[Image Available From Oracle]
[Publicly Available Image from Google Image Search]
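To make the difference between the sliding window and the batch window concrete, here is a minimal plain-Java sketch. It does not use Drools. The class name `WindowSketch` and both method names are hypothetical, and the code assumes timestamps are non-negative milliseconds in ascending order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper for illustration only; not part of Sherlock or Drools.
class WindowSketch {

    // Sliding window: for every event at time t, count the events whose
    // timestamp lies in (t - windowMillis, t]. The window moves with each event.
    static List<Integer> slidingCounts(long[] timestamps, long windowMillis) {
        Deque<Long> window = new ArrayDeque<>();
        List<Integer> counts = new ArrayList<>();
        for (long t : timestamps) {
            window.addLast(t);
            // Evict events that have fallen out of the trailing window.
            while (t - window.peekFirst() >= windowMillis) {
                window.removeFirst();
            }
            counts.add(window.size());
        }
        return counts;
    }

    // Batch window: events are grouped into fixed blocks [0, w), [w, 2w), ...
    // and one count is produced per non-empty block.
    static List<Integer> batchCounts(long[] timestamps, long windowMillis) {
        List<Integer> counts = new ArrayList<>();
        long currentSlot = -1;
        int count = 0;
        for (long t : timestamps) {
            long slot = t / windowMillis;
            if (slot != currentSlot) {
                if (count > 0) {
                    counts.add(count);
                }
                currentSlot = slot;
                count = 0;
            }
            count++;
        }
        if (count > 0) {
            counts.add(count);
        }
        return counts;
    }
}
```

For the timestamps 0, 4000, 8000, 11000 and 12000 with a 10-second window, the sliding counts are 1, 2, 3, 3, 4 while the batch counts are 3 and 2. The burst of four events between 4000 and 12000 is visible only to the sliding window, because the batch boundary at 10000 splits it.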
Introducing Sherlock! (Mystery That is Data), an event correlation application that demonstrates the above concept of Complex Event Processing. It is built for the banking domain, for anomaly and threat detection. It analyzes the following use-cases, which are listed in the 'Top Threats, Especially in Banking Sector By SANS Institute'. SANS Institute is a cooperative research and training institute for information security.
1. Detect if there are more than Ten Port or IP Scan Attempts from the Same IP Address (and Port) in any of the last 10 seconds [Port Scan and IP Scan By SANS Institute]
2. Detect if there are more than Five Repeated Login Attempts from the Same IP Address in any of the last 30 seconds [Database Login or Intrusion Attempts by SANS Institute]
3. Detect if the traffic on a Port x has suddenly spiked compared to its history - any of the last 30 seconds had more than five accesses [Repeated Port Access by SANS Institute]
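As a plain-Java illustration of what such a use-case asks for, the sketch below checks the second one without Drools. It reads 'the last 30 seconds' as a trailing time window per source IP, which is an assumption. The class `LoginAttemptDetector` and its method are hypothetical names, and timestamps for a given IP are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical detector for illustration only; not part of Sherlock.
class LoginAttemptDetector {
    private final long windowMillis;   // length of the trailing window, must be > 0
    private final int maxAttempts;     // attempts allowed inside the window
    private final Map<String, Deque<Long>> attemptsByIp = new HashMap<>();

    LoginAttemptDetector(long windowMillis, int maxAttempts) {
        this.windowMillis = windowMillis;
        this.maxAttempts = maxAttempts;
    }

    // Records one login attempt and returns true when the source IP has made
    // more than maxAttempts attempts inside the trailing window.
    boolean record(String sourceIp, long timestampMillis) {
        Deque<Long> attempts =
                attemptsByIp.computeIfAbsent(sourceIp, ip -> new ArrayDeque<>());
        attempts.addLast(timestampMillis);
        // Drop attempts that are older than the window.
        while (timestampMillis - attempts.peekFirst() >= windowMillis) {
            attempts.removeFirst();
        }
        return attempts.size() > maxAttempts;
    }
}
```

With `new LoginAttemptDetector(30000L, 5)`, the sixth attempt from one IP inside 30 seconds is the first to return true. A rules engine expresses the same idea declaratively, which is what the rest of this walkthrough does for the first use-case.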
In this blog I will demonstrate only the first use-case (including how to run the 'Intelligent Data Loader' and possibly hook it up with a 'User Interface') to illustrate anomaly detection and complex event processing. You may need to do the following before you can download and understand Sherlock!:
A. Download Drools 6.1.0 Distribution (Include in Classpath)
B. Download the Eclipse Plugin for Drools (Include in Classpath)
C. Use JDK 1.8.0 and JEE 1.7 Libraries (If Required) (Include in Classpath)
D. Brief Read on MVEL Dialect and Drools Fusion (Above/Official)
1. Create the SherlockEvent (and SherlockEventCorrelation) Java Object
package com.bw2015.sherlock.biz.vo;
/**
* @author spuri
*
*/
public class SherlockEvent {
private int eventId;
private String eventType;
private String eventDescription;
private String eventSourceIp;
private String eventDestinationIp;
private String eventSourcePort;
private String eventDestinationPort;
private String eventSourceCountry;
private String eventDestinationCountry;
private String eventSourceUsername;
private String eventDestinationUsername;
private String eventRemarks;
private long eventSourceTime;
private long eventDestinationTimestamp; ... // Refer Bundled Code
2. Code the 'Rule/Condition' using Drools 'MVEL' Dialect (Use-Case 01)
package com.bw2015.sherlock.biz.cep
// list any import classes here.
import com.bw2015.sherlock.biz.vo.SherlockEvent;
import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;
declare SherlockEvent
@role(event)
@expires(20s)
@timestamp (eventDestinationTimestamp)
end
declare SherlockEventCorrelation
@role(event)
@expires(20s)
@timestamp (eventDestinationTimestamp)
end
global Long startTime;
global Long startMemory;
global Long totalFactCount;
global java.util.HashMap threatMap;
// use case 01
// detect if there are more than ten port or ip scan attempts from the same ip address (and port)
// to the destination ip address (and multiple ports) in the given window
rule "Port and IP Scan Event Processing Initial"
dialect "mvel"
no-loop
when
e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)
not SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort)
then
SherlockEventCorrelation plec = new SherlockEventCorrelation();
plec.setEventSourceIp(e1.eventSourceIp);
plec.setEventDestinationIp(e1.eventDestinationIp);
plec.setEventSourcePort(e1.eventSourcePort);
plec.setEventDestinationPort(e1.eventDestinationPort);
plec.setEventCorrelation(0);
insert(plec);
end
rule "Port and IP Scan Event Processing Correlation"
dialect "mvel"
no-loop
when
e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)
ce: SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort, $eventCorr : eventCorrelation >= 0)
then
$eventCorr++;
ce.eventCorrelation=$eventCorr;
if(ce.eventCorrelation >= 10) {
System.out.println("");
System.out.println("+++++++++++++++ USE CASE 01 +++++++++++++++");
System.out.println("SOURCE INET: " + ce.eventSourceIp);
System.out.println("SOURCE PORT: " + ce.eventSourcePort);
System.out.println("DESTIN INET: " + ce.eventDestinationIp);
System.out.println("EVENT ACTN: " + "port and ip scan");
System.out.println("TIMESTAMP : " + new java.util.Date(ce.eventDestinationTimestamp));
System.out.println("OCCURENCES : " + $eventCorr);
System.out.println("+++++++++++++++++++++++++++++++++++++++++++");
System.out.println("");
}
update( ce );
threatMap.put(new java.util.Date(), ce);
end
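The two rules above work as a pair: the first creates a correlation fact for a new (source IP, destination IP, source port) combination, and the second increments its counter and reports once the counter reaches 10. The plain-Java sketch below mirrors only that counting logic. `PortScanCorrelator` is a hypothetical name. The sketch ignores the 10-second window and the 20-second expiry that Drools Fusion handles, and it assumes the first event for a key fires both rules, so the counter reads 1 after it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for the rule pair; this is not Drools code.
class PortScanCorrelator {
    // Same literal as the `>= 10` check in the correlation rule.
    static final int THRESHOLD = 10;

    // One running counter per (source IP, destination IP, source port) key.
    private final Map<String, Integer> correlations = new HashMap<>();

    // Returns true when the running count for this key has reached the threshold.
    boolean onScanEvent(String sourceIp, String destinationIp, String sourcePort) {
        String key = sourceIp + "|" + destinationIp + "|" + sourcePort;
        // "Initial" rule: a missing key starts a new correlation.
        // "Correlation" rule: an existing key has its counter incremented.
        int count = correlations.merge(key, 1, Integer::sum);
        return count >= THRESHOLD;
    }
}
```

Note that the use-case text says 'more than ten' while the rule checks `>= 10`. The sketch keeps the rule's literal rather than guessing which one is intended.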
3. Configure Drools Fusion
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
<kbase name="event" packages="event" eventProcessingMode="stream">
<ksession name="sherlock-event"/>
</kbase>
</kmodule>
The above file, named 'kmodule.xml', is included in the META-INF folder of your project. Make sure it is available on the classpath of your main class.
4. Code the Drools Java Runtime to Send or Process Events
The SherlockComplexEventProcessing class contains the Java code for the Drools Fusion runtime. The following are the most important activities performed by this runtime.
X. Declare Drools Java Runtime Variables
Y. Initialize and Instantiate Drools Variables
Z. Send Event by Event for Complex Event Processing to Drools Runtime
X. Declare Drools Java Runtime Variables
package com.bw2015.sherlock.biz.cep;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.internal.KnowledgeBase;
import org.kie.internal.KnowledgeBaseFactory;
import org.kie.internal.builder.KnowledgeBuilder;
import org.kie.internal.builder.KnowledgeBuilderFactory;
import com.bw2015.sherlock.biz.vo.SherlockEvent;
import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;
/**
* @author spuri
*
* SherlockComplexEventProcessing is a Sherlock service that provides the most
* essential part of feeding data to the Knowledge Is Everything API of Drools.
* It will provide data that is in order or even out-of-order. In essence, it
* provides the core of the Sherlock Intellect.
*
*/
public class SherlockComplexEventProcessing {
private static SherlockComplexEventProcessing cepService = null;
// Drools Fusion Runtime Configuration
private KieBaseConfiguration kieConfiguration;
private KnowledgeBase kieBase;
private KieServices ks;
private KieContainer kContainer;
private KieSession kSession;
private KnowledgeBuilder kbuilder; ... // Refer Bundled Code
Y. Initialize and Instantiate Drools Variables
public void init() {
try {
System.out.println("initializing kie runtime for drools fusion...");
kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
// kbuilder.add(ResourceFactory.newClassPathResource("event.drl"),
// ResourceType.DRL);
if (kbuilder.hasErrors()) {
System.out.println(kbuilder.getErrors().toString());
}
kieConfiguration = KieServices.Factory.get().newKieBaseConfiguration();
kieConfiguration.setProperty("drools.dialect.mvel.strict", "false");
kieConfiguration.setProperty("org.kie.demo", "false");
kieConfiguration.setOption(EventProcessingOption.STREAM);
ks = KieServices.Factory.get();
kContainer = ks.getKieClasspathContainer();
kieBase = KnowledgeBaseFactory.newKnowledgeBase(kieConfiguration);
kieBase.addKnowledgePackages(kbuilder.getKnowledgePackages());
// clock type for the session
KieSessionConfiguration sessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
sessionConfiguration.setOption(ClockTypeOption.get("realtime"));
kSession = kContainer.newKieSession("sherlock-event", sessionConfiguration);
kSession.setGlobal("threatMap", new HashMap<Long,SherlockEventCorrelation>());
kSession.setGlobal("startTime", new Date().getTime());
kSession.setGlobal("startMemory", Runtime.getRuntime().freeMemory());
kSession.setGlobal("totalFactCount", totalFactCount);
System.out.println("initialized the kie runtime for drools fusion...");
} catch (Exception e) {
e.printStackTrace();
}
}
Z. Send Event by Event for Complex Event Processing to Drools Runtime
public void execute(SherlockEvent event) {
// try {
// anything to do with the event object
kSession.setGlobal("totalFactCount", totalFactCount++);
kSession.insert(event);
kSession.fireAllRules();
HashMap threatM=(HashMap) kSession.getGlobal("threatMap");
LinkedList list=new LinkedList();
list.addAll(threatM.values());
threats.pushAll(list);
if(prevTime==0) prevTime=Long.parseLong(kSession.getGlobal("startTime").toString());
currTime=new Date().getTime();
}
}
5. Setup the Data Loader (Asynchronous is Preferred Mode - Think of JMS Extension)
Now run the SherlockDataLoaderDriver, which in turn starts the SherlockDataLoaderThread to intelligently load random data and 'Inject Positive Cases' into the large stream of information. The data load above is limited to creating only 100 random records and then waiting for 10 seconds. You can change this for your demo or PoC purposes to suit a larger data stream and a shorter or longer wait time.
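The bundled loader is not reproduced here, so the following is only a rough sketch of the idea: build a batch of random events and mix a few positive cases into it. Everything in it is hypothetical, including the class `DataLoaderSketch`, the nested `Event` stand-in, the random event types and the fixed attacker IP and port. Only the "port and ip scan" type string comes from the rule shown earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical loader sketch; the bundled SherlockDataLoaderThread may differ.
class DataLoaderSketch {

    // Minimal stand-in for SherlockEvent, holding only the fields used here.
    static class Event {
        final String type;
        final String sourceIp;
        final String sourcePort;

        Event(String type, String sourceIp, String sourcePort) {
            this.type = type;
            this.sourceIp = sourceIp;
            this.sourcePort = sourcePort;
        }
    }

    // Builds one batch of randomCount random events plus positiveCount port scan
    // events that share one source IP and port, then shuffles the batch.
    static List<Event> nextBatch(Random random, int randomCount, int positiveCount) {
        String[] noiseTypes = {"login", "port access", "file transfer"}; // made up
        List<Event> batch = new ArrayList<>();
        for (int i = 0; i < randomCount; i++) {
            String ip = "10.0." + random.nextInt(256) + "." + random.nextInt(256);
            String port = String.valueOf(1024 + random.nextInt(64000));
            batch.add(new Event(noiseTypes[random.nextInt(noiseTypes.length)], ip, port));
        }
        for (int i = 0; i < positiveCount; i++) {
            // Injected positive case: repeated scans from a single source.
            batch.add(new Event("port and ip scan", "192.168.1.50", "4444"));
        }
        Collections.shuffle(batch, random);
        return batch;
    }
}
```

A loader thread could call `nextBatch(random, 100, 12)`, hand each event to the runtime's `execute` method, and then sleep for 10 seconds before building the next batch.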