[译] Introducing Complex Event Processing (CEP) with Apache Flink

文报 2019-06-21

原文链接

正文

随着传感网络的普及,智能设备持续收集着越来越多的数据,分析近乎实时,不断增长的数据流是一个巨大的挑战。快速应对变化趋势、交付最新的 BI 应用会成为一个公司成败的关键因素。其中关键问题就是数据流的事件模型检测。

Complex event processing (CEP) 要处理的就是在持续事件中匹配模式的问题。匹配结果通常就是:从输入事件中提取的复杂事件。传统 DBMSs 在固定数据上执行查询,而 CEP 在存储的 query 上执行(译者注:某个范围)。所有不相关的数据会立即丢弃,由于 CEP 查询都是在一个无限的数据流中,这样的优势显而易见。更重要的是,输入实时被处理,系统一旦收到某一个序列的所有数据,结果就会被输出。CEP 因此有着非常高效的实时分析能力。

由此,CEP 的处理范式吸引了很多技术人员兴趣,有着广泛的应用场景。值得注意的是,CEP 现在用在了金融应用,例如:股票市场趋势、信用卡欺诈检测。还有基于 RFID 的追踪和监控,例如:库房小偷检测。CEP 还可以被用于基于用户可疑行为的网络入侵检测。

Apache Flink 有着天生的真正的流处理能力,具有低延迟、高吞吐量的特性,和 CEP 简直绝配。因此,Flink 社区在 Flink 1.0 引入了第一个版本的 CEP library。接下来我们会使用一个数据中心监控的案例介绍其使用。

[译] Introducing Complex Event Processing (CEP) with Apache Flink

假设这样一个场景:数据中心有很多机架,每一个机架都有功率和温度监控。监控设备会不断产生功率和温度事件。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。

针对这种场景,我们采取两阶段处理方法。首先,监控温度事件,当检测到连续两个超过阈值的温度事件,即生成一个当前平均温度的警告(warning),温度报警不一定意味着过热。但是如果看到两个连续的升温警告事件,则生成机架过热报警(alert)。此时,需要采取措施冷却机架。

首先,定义来源的监控事件流,每一个 message 都包含来源 rack ID(机架 ID)。温度事件包含当前温度,功率事件包含当前电压。我们把事件模型定义为 POJOs.

public abstract class MonitoringEvent {
        private int rackID;
        ...
    }
    
    public class TemperatureEvent extends MonitoringEvent {
        private double temperature;
        ...
    }
    
    public class PowerEvent extends MonitoringEvent {
        private double voltage;
        ...
    }

我们可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream<MonitoringEvent> inputEventStream 给 Flink 的 CEP 算子提供输入。首先,我们需要定义检测温度警告的事件模式 (pattern),CEP library 提供了非常直观的 Pattern API 来定义复杂的模式。

每个模式都包含了一个可以定义过滤 (filter) 条件的事件序列。模式 (pattern) 的第一个事件通常都命名为"First Event"。

Pattern.<MonitoringEvent>begin("First Event");

这句话会匹配每一个输入的监控事件(monitoring event),而我们只需要温度大于一定阈值的温度事件(TemperatureEvents),所以我们需要添加 subtype 和 where 语句限制。

Pattern.<MonitoringEvent>begin("First Event")
        .subtype(TemperatureEvent.class)
        .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);

之前说:对于同一个机架,当看到两个连续的高温事件(超过阈值)就产生一个温度报警(TemperatureWarning),Pattern API 提供了 next 调用方法,来添加事件到模式定义中。next 添加的事件发生时间必须紧跟着第一个匹配事件之后,才能触发整个模式的匹配。

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

最后模式的定义包含有一个 within 的 API 调用,用来定义两个连续 TemperatureEvents 必须在 10s 内发生才能匹配。时间基于 time characteristic 设置,可以是:处理时间、输入时间或者事件时间。(译者注 Event Time / Processing Time / Ingestion Time 解释)

定义好事件模型之后,可以将其应用到输入数据流中。

PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);

由于告警是针对单个机架的告警,必须使用 keyBy 通过 rackID 字段对输入事件流分流。即匹配出的事件都是同一个机架的。

PatternStream<MonitoringEvent> 可以访问匹配的事件序列。通过使用 select API 可以访问其上数据,给 select API 传入 PatternSelectFunction,PatternSelectFunction 会在每一个匹配上的事件序列上执行。事件序列通过 Map<String, MonitoringEvent> 访问,MonitoringEvent 通过之前分配的事件名称来定位。这里我们通过 select function 针对每一个匹配的模式产生一个 TemperatureWarning 事件。

public class TemperatureWarning {
        private int rackID;
        private double averageTemperature;
        ...
    }
    
    DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        (Map<String, MonitoringEvent> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            return new TemperatureWarning(
                first.getRackID(), 
                (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

现在我们从原始监控事件流(monitoring event stream)生成了一个复杂事件流 DataStream<TemperatureWarning> 警告。这个复杂事件流可以再次被用作其他复杂事件处理的输入。当同一个机架产生两个连续升温警告时,我们使用 TemperatureWarnings 来生成 TemperatureAlerts。TemperatureAlerts 定义如下:

public class TemperatureAlert {
        private int rackID;
        ...
    }

首先定义报警事件

Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("First Event")
        .next("Second Event")
        .within(Time.seconds(20));

定义描述了在 20s 内有两个 TemperatureWarnings 事件,并且第一个事件名称为 "First Event",紧接着的第二个为 “Second Event”。这来了个事件都没有 where 语句,因为需要访问两个事件才能判断温度时候增长。因此,下面我们需要在 select 语句中使用 filter 条件来提取。这里我们只是生成了 PatternStream。

PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
        warnings.keyBy("rackID"),
        alertPattern);

同样,我们需要 keyBy 对输入的告警数据流针对同一个机架进行分流。然后使用 flatSelect 方法访问匹配的事件序列,当判断温度上升时生成 TemperatureAlert 告警。

DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
        (Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureWarning first = pattern.get("First Event");
            TemperatureWarning second = pattern.get("Second Event");
    
            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        });

DataStream<TemperatureAlert> 告警是针对同一个机架的数据流,基于这个数据我们现在可以调整负载和降温。源代码地址(译者注:注意阅读 readme)

总结:

本文描述了使用 Flink CEP library 可以很容易处理事件流。我们通过数据中心的监控和报警案例,完成了服务器机架过热报警的小程序。
未来 Flink 社区会持续扩展 CEP library 的功能和表述能力。接下来的 road map 是支持类正则表达式的模式实现,包括 *, 上下限制和否定。此外,还计划允许 where 语句访问之前匹配的事件字段。这个特性可以让我们提前删除不需要的事件序列。

阅读材料:

本内容为译者添加

相关推荐

登峰小蚁 / 0评论 2020-01-10