文报 2019-06-21
随着传感网络的普及,智能设备持续收集着越来越多的数据,分析近乎实时,不断增长的数据流是一个巨大的挑战。快速应对变化趋势、交付最新的 BI 应用会成为一个公司成败的关键因素。其中关键问题就是数据流的事件模型检测。
Complex event processing (CEP) 要处理的就是在持续事件中匹配模式的问题。匹配结果通常就是:从输入事件中提取的复杂事件。传统 DBMSs 在固定数据上执行查询,而 CEP 在存储的 query 上执行(译者注:某个范围)。所有不相关的数据会立即丢弃,由于 CEP 查询都是在一个无限的数据流中,这样的优势显而易见。更重要的是,输入实时被处理,系统一旦收到某一个序列的所有数据,结果就会被输出。CEP 因此有着非常高效的实时分析能力。
由此,CEP 的处理范式吸引了很多技术人员兴趣,有着广泛的应用场景。值得注意的是,CEP 现在用在了金融应用,例如:股票市场趋势、信用卡欺诈检测。还有基于 RFID 的追踪和监控,例如:库房小偷检测。CEP 还可以被用于基于用户可疑行为的网络入侵检测。
Apache Flink 有着天生的真正的流处理能力,具有低延迟、高吞吐量的特性,和 CEP 简直绝配。因此,Flink 社区在 Flink 1.0 引入了第一个版本的 CEP library。接下来我们会使用一个数据中心监控的案例介绍其使用。
假设这样一个场景:数据中心有很多机架,每一个机架都有功率和温度监控。监控设备会不断产生功率和温度事件。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。
针对这种场景,我们采取两阶段处理方法。首先,监控温度事件,当检测到连续两个超过阈值的温度事件,即生成一个当前平均温度的警告(warning),温度报警不一定意味着过热。但是如果看到两个连续的升温警告事件,则生成机架过热报警(alert)。此时,需要采取措施冷却机架。
首先,定义来源的监控事件流,每一个 message 都包含来源 rack ID(机架 ID)。温度事件包含当前温度,功率事件包含当前电压。我们把事件模型定义为 POJOs.
public abstract class MonitoringEvent { private int rackID; ... } public class TemperatureEvent extends MonitoringEvent { private double temperature; ... } public class PowerEvent extends MonitoringEvent { private double voltage; ... }
我们可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream<MonitoringEvent> inputEventStream 给 Flink 的 CEP 算子提供输入。首先,我们需要定义检测温度警告的事件模式 (pattern),CEP library 提供了非常直观的 Pattern API 来定义复杂的模式。
每个模式都包含了一个可以定义过滤 (filter) 条件的事件序列。模式 (pattern) 的第一个事件通常都命名为"First Event"。
Pattern.<MonitoringEvent>begin("First Event");
这句话会匹配每一个输入的监控事件(monitoring event),而我们只需要温度大于一定阈值的温度事件(TemperatureEvents),所以我们需要添加 subtype 和 where 语句限制。
Pattern.<MonitoringEvent>begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);
之前说:对于同一个机架,当看到两个连续的高温事件(超过阈值)就产生一个温度报警(TemperatureWarning),Pattern API 提供了 next 调用方法,来添加事件到模式定义中。next 添加的事件发生时间必须紧跟着第一个匹配事件之后,才能触发整个模式的匹配。
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .next("Second Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .within(Time.seconds(10));
最后模式的定义包含有一个 within 的 API 调用,用来定义两个连续 TemperatureEvents 必须在 10s 内发生才能匹配。时间基于 time characteristic 设置,可以是:处理时间、输入时间或者事件时间。(译者注 Event Time / Processing Time / Ingestion Time 解释)
定义好事件模型之后,可以将其应用到输入数据流中。
PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern( inputEventStream.keyBy("rackID"), warningPattern);
由于告警是针对单个机架的告警,必须使用 keyBy 通过 rackID 字段对输入事件流分流。即匹配出的事件都是同一个机架的。
PatternStream<MonitoringEvent> 可以访问匹配的事件序列。通过使用 select API 可以访问其上数据,给 select API 传入 PatternSelectFunction,PatternSelectFunction 会在每一个匹配上的事件序列上执行。事件序列通过 Map<String, MonitoringEvent> 访问,MonitoringEvent 通过之前分配的事件名称来定位。这里我们通过 select function 针对每一个匹配的模式产生一个 TemperatureWarning 事件。
public class TemperatureWarning { private int rackID; private double averageTemperature; ... } DataStream<TemperatureWarning> warnings = tempPatternStream.select( (Map<String, MonitoringEvent> pattern) -> { TemperatureEvent first = (TemperatureEvent) pattern.get("First Event"); TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event"); return new TemperatureWarning( first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2); } );
现在我们从原始监控事件流(monitoring event stream)生成了一个复杂事件流 DataStream<TemperatureWarning> 警告。这个复杂事件流可以再次被用作其他复杂事件处理的输入。当同一个机架产生两个连续升温警告时,我们使用 TemperatureWarnings 来生成 TemperatureAlerts。TemperatureAlerts 定义如下:
public class TemperatureAlert { private int rackID; ... }
首先定义报警事件
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("First Event") .next("Second Event") .within(Time.seconds(20));
定义描述了在 20s 内有两个 TemperatureWarnings 事件,并且第一个事件名称为 "First Event",紧接着的第二个为 “Second Event”。这来了个事件都没有 where 语句,因为需要访问两个事件才能判断温度时候增长。因此,下面我们需要在 select 语句中使用 filter 条件来提取。这里我们只是生成了 PatternStream。
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern( warnings.keyBy("rackID"), alertPattern);
同样,我们需要 keyBy 对输入的告警数据流针对同一个机架进行分流。然后使用 flatSelect 方法访问匹配的事件序列,当判断温度上升时生成 TemperatureAlert 告警。
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect( (Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> { TemperatureWarning first = pattern.get("First Event"); TemperatureWarning second = pattern.get("Second Event"); if (first.getAverageTemperature() < second.getAverageTemperature()) { out.collect(new TemperatureAlert(first.getRackID())); } });
DataStream<TemperatureAlert> 告警是针对同一个机架的数据流,基于这个数据我们现在可以调整负载和降温。源代码地址(译者注:注意阅读 readme)
本文描述了使用 Flink CEP library 可以很容易处理事件流。我们通过数据中心的监控和报警案例,完成了服务器机架过热报警的小程序。
未来 Flink 社区会持续扩展 CEP library 的功能和表述能力。接下来的 road map 是支持类正则表达式的模式实现,包括 *, 上下限制和否定。此外,还计划允许 where 语句访问之前匹配的事件字段。这个特性可以让我们提前删除不需要的事件序列。
本内容为译者添加