跳转至

实时数据流处理:Kafka Streams与Flink实战

学习目标

完成本教程后,你将能够:

  • 理解流处理的核心概念和应用场景
  • 掌握Kafka Streams的架构和编程模型
  • 掌握Apache Flink的核心组件和API
  • 实现流式数据转换和聚合
  • 掌握窗口操作(滚动窗口、滑动窗口、会话窗口)
  • 处理事件时间和水印机制
  • 实现有状态的流处理应用
  • 构建完整的实时数据分析系统

前置要求

在开始本教程之前,你需要:

知识要求: - 熟悉Java或Python编程语言 - 了解Kafka的基本概念和使用 - 理解大数据处理的基本原理 - 了解分布式系统的基本概念

技能要求: - 能够使用命令行工具 - 会使用Maven或Gradle构建工具 - 了解Docker基础知识 - 能够阅读和理解技术文档

概述

实时数据流处理是现代数据架构的核心组件,它能够对持续产生的数据流进行实时分析和处理。与批处理不同,流处理关注的是数据的连续性和实时性。

为什么需要流处理?

在物联网、金融交易、社交媒体等场景中,数据以流的形式持续产生,传统的批处理方式存在以下问题:

  • 延迟高: 需要等待批次完成才能看到结果
  • 资源浪费: 需要存储大量中间数据
  • 实时性差: 无法及时响应业务需求
  • 复杂度高: 需要维护批处理和实时处理两套系统

流处理通过以下方式解决这些问题:

  • 低延迟: 毫秒级到秒级的处理延迟
  • 持续处理: 数据到达即处理,无需等待
  • 资源高效: 增量计算,减少存储需求
  • 统一架构: 一套系统同时支持实时和批处理

流处理的应用场景:

  • 实时监控和告警
  • 欺诈检测
  • 实时推荐系统
  • IoT数据处理
  • 实时数据分析
  • 日志分析和聚合

背景知识

流处理 vs 批处理

批处理(Batch Processing)

批处理是对有界数据集进行一次性处理:

数据源 → 收集数据 → 批量处理 → 输出结果
         (小时/天)    (分钟/小时)

特点: - 处理有界数据集 - 延迟较高(分钟到小时) - 吞吐量大 - 适合历史数据分析

示例: 每天凌晨处理前一天的订单数据,生成销售报表

流处理(Stream Processing)

流处理是对无界数据流进行持续处理:

数据源 → 实时处理 → 输出结果
         (毫秒/秒)

特点: - 处理无界数据流 - 延迟低(毫秒到秒) - 持续运行 - 适合实时分析

示例: 实时监控网站访问量,异常流量立即告警

流处理的核心概念

1. 数据流(Stream)

数据流是按时间顺序排列的无界数据序列:

事件1 → 事件2 → 事件3 → 事件4 → ...
  ↓       ↓       ↓       ↓
时间戳  时间戳  时间戳  时间戳

特性: - 无界性: 数据持续产生,没有结束 - 有序性: 事件按时间顺序排列 - 不可变性: 已产生的事件不可修改

2. 事件时间 vs 处理时间

事件时间(Event Time): - 事件实际发生的时间 - 由数据源生成 - 用于准确的时间窗口计算

处理时间(Processing Time): - 事件被处理的时间 - 由处理系统记录 - 简单但可能不准确

事件发生 → 网络传输 → 到达系统 → 开始处理
  ↓                                  ↓
事件时间                          处理时间

3. 窗口(Window)

窗口将无界流切分为有界的数据集进行处理:

滚动窗口(Tumbling Window):

[0-5秒] [5-10秒] [10-15秒] [15-20秒]
- 固定大小,不重叠 - 每个事件只属于一个窗口

滑动窗口(Sliding Window):

[0-5秒]
  [2-7秒]
    [4-9秒]
      [6-11秒]
- 固定大小,可重叠 - 每个事件可能属于多个窗口

会话窗口(Session Window):

[事件1-事件3] [间隔] [事件4-事件6] [间隔] [事件7-事件9]
- 动态大小,基于活动间隔 - 适合用户会话分析

4. 水印(Watermark)

水印用于处理乱序事件和延迟数据:

时间线: 0 → 1 → 2 → 3 → 4 → 5 → 6
事件:   1   3   2   5   4   7   6
水印:       1       3       5

作用: - 标记事件时间的进度 - 触发窗口计算 - 处理延迟数据

5. 状态(State)

流处理中的状态是跨事件保存的信息:

无状态操作: 每个事件独立处理

# 过滤操作
stream.filter(lambda x: x > 10)

有状态操作: 需要记住之前的信息

# 计数操作
stream.count()  # 需要保存当前计数

状态类型: - 键控状态(Keyed State): 按key分组的状态 - 算子状态(Operator State): 算子级别的状态

准备工作

环境准备

本教程将使用Docker来快速搭建开发环境。

软件要求: - Docker Desktop 或 Docker Engine - Java 11+ (用于Kafka Streams) - Python 3.8+ (用于PyFlink) - Maven 3.6+ 或 Gradle 7+ - IDE (IntelliJ IDEA 或 VS Code)

安装Docker

如果还没有安装Docker,请参考官方文档: - Windows/Mac: Docker Desktop - Linux: Docker Engine

验证Docker安装:

docker --version
docker-compose --version

创建项目目录

mkdir stream-processing-tutorial
cd stream-processing-tutorial

启动Kafka集群

创建 docker-compose.yml 文件:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

启动服务:

docker-compose up -d

验证Kafka运行:

docker-compose ps
docker-compose logs kafka

步骤1: Kafka Streams入门

1.1 Kafka Streams简介

Kafka Streams是一个用于构建流处理应用的客户端库,它具有以下特点:

核心优势: - 简单: 作为库使用,无需独立集群 - 轻量: 不需要额外的基础设施 - 可扩展: 通过增加应用实例实现扩展 - 容错: 自动故障恢复和状态管理 - 精确一次: 支持精确一次语义

架构特点:

Kafka Topic → Kafka Streams应用 → Kafka Topic
              (作为普通应用运行)

1.2 创建Maven项目

创建 pom.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-streams-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <kafka.version>3.5.0</kafka.version>
    </properties>

    <dependencies>
        <!-- Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- JSON处理 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.0</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.7</version>
        </dependency>
    </dependencies>
</project>

1.3 第一个Kafka Streams应用

创建 src/main/java/com/example/WordCountStream.java:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountStream {

    public static void main(String[] args) {
        // 配置
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());

        // 构建流处理拓扑
        StreamsBuilder builder = new StreamsBuilder();

        // 1. 从input-topic读取数据
        KStream<String, String> textLines = 
            builder.stream("input-topic");

        // 2. 分词并计数
        KTable<String, Long> wordCounts = textLines
            // 将每行文本转换为小写
            .mapValues(value -> value.toLowerCase())
            // 分词
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            // 按单词分组
            .groupBy((key, word) -> word)
            // 计数
            .count();

        // 3. 输出到output-topic
        wordCounts.toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        // 启动流处理应用
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // 启动
        streams.start();

        System.out.println("Word Count Stream started...");
    }
}

代码说明: - APPLICATION_ID_CONFIG: 应用ID,用于消费者组和状态存储 - stream(): 从Topic创建KStream - mapValues(): 转换值 - flatMapValues(): 一对多转换 - groupBy(): 按key分组 - count(): 计数聚合 - to(): 输出到Topic

1.4 测试应用

编译项目:

mvn clean package

启动应用:

java -cp target/kafka-streams-tutorial-1.0-SNAPSHOT.jar \
     com.example.WordCountStream

创建测试Topic:

# 创建输入Topic
docker exec -it <kafka-container-id> kafka-topics \
  --create --topic input-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

# 创建输出Topic
docker exec -it <kafka-container-id> kafka-topics \
  --create --topic output-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

发送测试数据:

# 启动生产者
docker exec -it <kafka-container-id> kafka-console-producer \
  --topic input-topic \
  --bootstrap-server localhost:9092

# 输入以下文本(每行回车)
hello world
hello kafka streams
kafka is awesome

查看结果:

# 启动消费者
docker exec -it <kafka-container-id> kafka-console-consumer \
  --topic output-topic \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --property print.key=true \
  --property key.separator=:

# 预期输出:
# hello:2
# world:1
# kafka:2
# streams:1
# is:1
# awesome:1

步骤2: Kafka Streams高级操作

2.1 窗口操作

创建 src/main/java/com/example/WindowedWordCount.java:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class WindowedWordCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = 
            builder.stream("input-topic");

        // 使用滚动窗口(5秒)
        KTable<Windowed<String>, Long> windowedCounts = textLines
            .mapValues(value -> value.toLowerCase())
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .groupBy((key, word) -> word)
            // 5秒滚动窗口
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
            .count();

        // 输出窗口结果
        windowedCounts.toStream()
            .map((windowedKey, count) -> {
                String key = windowedKey.key();
                long start = windowedKey.window().start();
                long end = windowedKey.window().end();
                String value = String.format(
                    "word=%s, count=%d, window=[%d-%d]", 
                    key, count, start, end
                );
                return new KeyValue<>(key, value);
            })
            .to("windowed-output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        System.out.println("Windowed Word Count started...");
    }
}

窗口类型:

// 1. 滚动窗口(Tumbling Window)
TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5))

// 2. 滑动窗口(Sliding Window)
TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10))
           .advanceBy(Duration.ofSeconds(5))

// 3. 会话窗口(Session Window)
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))

2.2 流连接(Join)

创建 src/main/java/com/example/StreamJoinExample.java:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class StreamJoinExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-join-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 订单流
        KStream<String, String> orders = 
            builder.stream("orders-topic");

        // 支付流
        KStream<String, String> payments = 
            builder.stream("payments-topic");

        // 连接两个流(基于订单ID)
        KStream<String, String> joined = orders.join(
            payments,
            // 连接函数
            (orderValue, paymentValue) -> 
                "Order: " + orderValue + ", Payment: " + paymentValue,
            // 时间窗口(5分钟内的事件可以连接)
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
        );

        // 输出连接结果
        joined.to("joined-output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        System.out.println("Stream Join started...");
    }
}

Join类型:

// 1. Inner Join: 两边都有数据才输出
stream1.join(stream2, joiner, joinWindow)

// 2. Left Join: 左边有数据就输出
stream1.leftJoin(stream2, joiner, joinWindow)

// 3. Outer Join: 任意一边有数据就输出
stream1.outerJoin(stream2, joiner, joinWindow)

2.3 状态存储

创建 src/main/java/com/example/StatefulProcessing.java:

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

public class StatefulProcessing {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 创建状态存储
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = 
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("user-visit-count"),
                Serdes.String(),
                Serdes.Long()
            );

        builder.addStateStore(storeBuilder);

        // 处理用户访问事件
        KStream<String, String> visits = 
            builder.stream("user-visits-topic");

        visits.process(
            () -> new Processor<String, String, String, String>() {
                private KeyValueStore<String, Long> stateStore;

                @Override
                public void init(ProcessorContext<String, String> context) {
                    this.stateStore = context.getStateStore("user-visit-count");
                }

                @Override
                public void process(Record<String, String> record) {
                    String userId = record.key();

                    // 获取当前计数
                    Long count = stateStore.get(userId);
                    if (count == null) {
                        count = 0L;
                    }

                    // 增加计数
                    count++;
                    stateStore.put(userId, count);

                    System.out.println("User " + userId + " visited " + count + " times");
                }
            },
            "user-visit-count"
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        System.out.println("Stateful Processing started...");
    }
}

Apache Flink是一个分布式流处理框架,专为高吞吐、低延迟的流处理而设计。

核心特性: - 真正的流处理: 原生流处理引擎,不是微批处理 - 精确一次: 支持精确一次状态一致性 - 事件时间: 原生支持事件时间和水印 - 低延迟: 毫秒级延迟 - 高吞吐: 每秒处理数百万事件 - 有状态: 强大的状态管理能力

Flink vs Kafka Streams:

特性 Flink Kafka Streams
部署方式 独立集群 作为库使用
复杂度 较高 较低
功能 更丰富 基础功能
性能 更高 中等
适用场景 复杂流处理 简单流处理

使用Docker启动Flink:

# 下载Flink镜像
docker pull flink:latest

# 启动JobManager
docker run -d --name flink-jobmanager \
  -p 8081:8081 \
  flink:latest jobmanager

# 启动TaskManager
docker run -d --name flink-taskmanager \
  --link flink-jobmanager:jobmanager \
  flink:latest taskmanager

访问Flink Web UI: http://localhost:8081

安装PyFlink:

pip install apache-flink

创建 flink_word_count.py:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def word_count():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # 创建表环境
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()
    table_env = StreamTableEnvironment.create(env, settings)

    # 创建输入表(从Kafka读取)
    table_env.execute_sql("""
        CREATE TABLE input_table (
            word STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'input-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink-consumer',
            'format' = 'csv',
            'scan.startup.mode' = 'earliest-offset'
        )
    """)

    # 创建输出表(写入Kafka)
    table_env.execute_sql("""
        CREATE TABLE output_table (
            word STRING,
            count BIGINT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'output-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'csv'
        )
    """)

    # 执行词频统计
    table_env.execute_sql("""
        INSERT INTO output_table
        SELECT word, COUNT(*) as count
        FROM input_table
        GROUP BY word
    """)

if __name__ == '__main__':
    word_count()

创建 flink_datastream.py:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json

def process_sensor_data():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)

    # 配置Kafka消费者
    kafka_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'sensor-processor'
    }

    # 从Kafka读取传感器数据
    kafka_consumer = FlinkKafkaConsumer(
        topics='sensor-data',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )

    # 创建数据流
    sensor_stream = env.add_source(kafka_consumer)

    # 数据转换和处理
    def parse_sensor_data(data_str):
        try:
            data = json.loads(data_str)
            return (
                data['sensor_id'],
                data['temperature'],
                data['timestamp']
            )
        except:
            return None

    # 解析JSON数据
    parsed_stream = sensor_stream \
        .map(parse_sensor_data, output_type=Types.TUPLE([
            Types.STRING(),
            Types.FLOAT(),
            Types.LONG()
        ])) \
        .filter(lambda x: x is not None)

    # 过滤异常温度
    def filter_abnormal_temp(data):
        sensor_id, temperature, timestamp = data
        return temperature > 80 or temperature < -20

    abnormal_stream = parsed_stream.filter(filter_abnormal_temp)

    # 转换为告警消息
    def create_alert(data):
        sensor_id, temperature, timestamp = data
        alert = {
            'sensor_id': sensor_id,
            'temperature': temperature,
            'timestamp': timestamp,
            'alert_type': 'TEMPERATURE_ABNORMAL'
        }
        return json.dumps(alert)

    alert_stream = abnormal_stream.map(
        create_alert,
        output_type=Types.STRING()
    )

    # 输出到Kafka
    kafka_producer = FlinkKafkaProducer(
        topic='sensor-alerts',
        serialization_schema=SimpleStringSchema(),
        producer_config=kafka_props
    )

    alert_stream.add_sink(kafka_producer)

    # 执行任务
    env.execute("Sensor Data Processing")

if __name__ == '__main__':
    process_sensor_data()

创建 flink_windowing.py:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.window import TumblingEventTimeWindows, \
    SlidingEventTimeWindows, EventTimeSessionWindows
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types
import json

def windowed_aggregation():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # 模拟数据源
    data = [
        ('user1', 'click', 1000),
        ('user1', 'click', 2000),
        ('user2', 'click', 3000),
        ('user1', 'click', 4000),
        ('user2', 'click', 5000),
        ('user1', 'click', 11000),
    ]

    # 创建数据流
    stream = env.from_collection(
        data,
        type_info=Types.TUPLE([Types.STRING(), Types.STRING(), Types.LONG()])
    )

    # 设置水印策略
    watermark_strategy = WatermarkStrategy \
        .for_monotonous_timestamps() \
        .with_timestamp_assigner(lambda event, timestamp: event[2])

    stream_with_watermark = stream.assign_timestamps_and_watermarks(
        watermark_strategy
    )

    # 按用户分组
    keyed_stream = stream_with_watermark.key_by(lambda x: x[0])

    # 滚动窗口(5秒)
    windowed_stream = keyed_stream.window(
        TumblingEventTimeWindows.of(Time.seconds(5))
    )

    # 聚合:计算每个用户在窗口内的点击次数
    def count_clicks(key, window, elements):
        count = len(list(elements))
        window_start = window.start
        window_end = window.end
        return (key, count, window_start, window_end)

    result = windowed_stream.apply(
        count_clicks,
        output_type=Types.TUPLE([
            Types.STRING(),
            Types.INT(),
            Types.LONG(),
            Types.LONG()
        ])
    )

    # 打印结果
    result.print()

    # 执行
    env.execute("Windowed Aggregation")

if __name__ == '__main__':
    windowed_aggregation()

窗口类型示例:

# 1. 滚动窗口(Tumbling Window)
TumblingEventTimeWindows.of(Time.seconds(5))

# 2. 滑动窗口(Sliding Window)
SlidingEventTimeWindows.of(
    Time.seconds(10),  # 窗口大小
    Time.seconds(5)    # 滑动步长
)

# 3. 会话窗口(Session Window)
EventTimeSessionWindows.with_gap(Time.minutes(5))

步骤4: 实战案例 - IoT实时监控系统

4.1 系统架构

我们将构建一个完整的IoT实时监控系统:

IoT设备 → Kafka → Flink处理 → 告警/存储
                 实时统计
                 可视化展示

功能需求: - 实时接收传感器数据 - 检测异常温度并告警 - 计算每分钟的平均温度 - 统计设备在线状态 - 实时展示监控数据

4.2 数据生成器

创建 iot_data_generator.py:

from kafka import KafkaProducer
import json
import time
import random
from datetime import datetime

class IoTDataGenerator:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        self.sensor_ids = [f'sensor_{i:03d}' for i in range(1, 11)]

    def generate_sensor_data(self, sensor_id):
        """生成传感器数据"""
        # 正常温度范围: 20-30度
        # 10%概率产生异常温度
        if random.random() < 0.1:
            temperature = random.choice([
                random.uniform(-30, -10),  # 过低
                random.uniform(80, 100)    # 过高
            ])
        else:
            temperature = random.uniform(20, 30)

        data = {
            'sensor_id': sensor_id,
            'temperature': round(temperature, 2),
            'humidity': round(random.uniform(40, 60), 2),
            'pressure': round(random.uniform(1000, 1020), 2),
            'timestamp': int(time.time() * 1000),
            'location': f'Building-{sensor_id[-1]}'
        }

        return data

    def start_generating(self, interval=1):
        """开始生成数据"""
        print("开始生成IoT数据...")

        try:
            while True:
                for sensor_id in self.sensor_ids:
                    data = self.generate_sensor_data(sensor_id)

                    # 发送到Kafka
                    self.producer.send('iot-sensor-data', value=data)

                    print(f"[{datetime.now()}] 发送数据: {sensor_id}, "
                          f"温度: {data['temperature']}°C")

                time.sleep(interval)

        except KeyboardInterrupt:
            print("\n停止生成数据")
        finally:
            self.producer.close()

if __name__ == '__main__':
    generator = IoTDataGenerator()
    generator.start_generating(interval=2)

4.3 实时异常检测

创建 iot_anomaly_detection.py:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import MapFunction, FilterFunction
import json

class SensorDataParser(MapFunction):
    """解析传感器数据"""

    def map(self, value):
        try:
            data = json.loads(value)
            return (
                data['sensor_id'],
                data['temperature'],
                data['humidity'],
                data['timestamp'],
                data['location']
            )
        except Exception as e:
            print(f"解析错误: {e}")
            return None

class AnomalyDetector(FilterFunction):
    """异常检测"""

    def __init__(self, temp_min=-20, temp_max=80):
        self.temp_min = temp_min
        self.temp_max = temp_max

    def filter(self, value):
        if value is None:
            return False

        sensor_id, temperature, humidity, timestamp, location = value

        # 检测温度异常
        return temperature < self.temp_min or temperature > self.temp_max

class AlertGenerator(MapFunction):
    """生成告警消息"""

    def map(self, value):
        sensor_id, temperature, humidity, timestamp, location = value

        alert_type = 'HIGH_TEMP' if temperature > 80 else 'LOW_TEMP'

        alert = {
            'alert_id': f"{sensor_id}_{timestamp}",
            'sensor_id': sensor_id,
            'alert_type': alert_type,
            'temperature': temperature,
            'location': location,
            'timestamp': timestamp,
            'severity': 'CRITICAL' if abs(temperature) > 90 else 'WARNING'
        }

        return json.dumps(alert)

def run_anomaly_detection():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)

    # Kafka配置
    kafka_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'anomaly-detector'
    }

    # 创建Kafka消费者
    kafka_consumer = FlinkKafkaConsumer(
        topics='iot-sensor-data',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )

    # 从最新位置开始消费
    kafka_consumer.set_start_from_latest()

    # 创建数据流
    sensor_stream = env.add_source(kafka_consumer)

    # 数据处理流程
    alert_stream = sensor_stream \
        .map(SensorDataParser(), output_type=Types.TUPLE([
            Types.STRING(),  # sensor_id
            Types.FLOAT(),   # temperature
            Types.FLOAT(),   # humidity
            Types.LONG(),    # timestamp
            Types.STRING()   # location
        ])) \
        .filter(AnomalyDetector()) \
        .map(AlertGenerator(), output_type=Types.STRING())

    # 打印告警
    alert_stream.print()

    # 创建Kafka生产者
    kafka_producer = FlinkKafkaProducer(
        topic='iot-alerts',
        serialization_schema=SimpleStringSchema(),
        producer_config=kafka_props
    )

    # 输出到Kafka
    alert_stream.add_sink(kafka_producer)

    # 执行任务
    env.execute("IoT Anomaly Detection")

if __name__ == '__main__':
    run_anomaly_detection()

4.4 实时统计分析

创建 iot_statistics.py:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import AggregateFunction, WindowFunction
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
import json

class SensorDataTimestampAssigner(TimestampAssigner):
    """时间戳分配器"""

    def extract_timestamp(self, value, record_timestamp):
        try:
            data = json.loads(value)
            return data['timestamp']
        except:
            return record_timestamp

class TemperatureAggregator(AggregateFunction):
    """温度聚合器"""

    def create_accumulator(self):
        return (0.0, 0)  # (sum, count)

    def add(self, value, accumulator):
        data = json.loads(value)
        temp = data['temperature']
        return (accumulator[0] + temp, accumulator[1] + 1)

    def get_result(self, accumulator):
        if accumulator[1] == 0:
            return 0.0
        return accumulator[0] / accumulator[1]

    def merge(self, acc1, acc2):
        return (acc1[0] + acc2[0], acc1[1] + acc2[1])

class StatisticsWindowFunction(WindowFunction):
    """窗口统计函数"""

    def apply(self, key, window, inputs):
        avg_temp = list(inputs)[0]

        result = {
            'sensor_id': key,
            'avg_temperature': round(avg_temp, 2),
            'window_start': window.start,
            'window_end': window.end,
            'window_duration': (window.end - window.start) / 1000
        }

        return json.dumps(result)

def run_statistics():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)

    # Kafka配置
    kafka_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'statistics-processor'
    }

    # 创建Kafka消费者
    kafka_consumer = FlinkKafkaConsumer(
        topics='iot-sensor-data',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )

    # 设置水印策略
    watermark_strategy = WatermarkStrategy \
        .for_bounded_out_of_orderness(Time.seconds(5)) \
        .with_timestamp_assigner(SensorDataTimestampAssigner())

    # 创建数据流
    sensor_stream = env.add_source(kafka_consumer) \
        .assign_timestamps_and_watermarks(watermark_strategy)

    # 提取sensor_id作为key
    def extract_sensor_id(value):
        data = json.loads(value)
        return data['sensor_id']

    # 按传感器分组并计算1分钟滚动窗口的平均温度
    statistics_stream = sensor_stream \
        .key_by(extract_sensor_id, key_type=Types.STRING()) \
        .window(TumblingEventTimeWindows.of(Time.minutes(1))) \
        .aggregate(
            TemperatureAggregator(),
            StatisticsWindowFunction(),
            accumulator_type=Types.TUPLE([Types.FLOAT(), Types.INT()]),
            output_type=Types.STRING()
        )

    # 打印统计结果
    statistics_stream.print()

    # 执行任务
    env.execute("IoT Statistics")

if __name__ == '__main__':
    run_statistics()

4.5 运行完整系统

步骤1: 启动Kafka

docker-compose up -d

步骤2: 创建Topic

# 创建传感器数据Topic
docker exec -it <kafka-container> kafka-topics \
  --create --topic iot-sensor-data \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# 创建告警Topic
docker exec -it <kafka-container> kafka-topics \
  --create --topic iot-alerts \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

步骤3: 启动数据生成器

python iot_data_generator.py

步骤4: 启动异常检测

python iot_anomaly_detection.py

步骤5: 启动统计分析

python iot_statistics.py

步骤6: 查看告警

docker exec -it <kafka-container> kafka-console-consumer \
  --topic iot-alerts \
  --bootstrap-server localhost:9092 \
  --from-beginning

预期结果: - 数据生成器持续产生传感器数据 - 异常检测实时发现温度异常并告警 - 统计分析每分钟输出平均温度 - 所有组件独立运行,可以随时启停

步骤5: 高级特性

5.1 精确一次语义

Kafka Streams精确一次:

Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
          StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

Flink精确一次:

# 启用检查点
env.enable_checkpointing(60000)  # 每60秒一次检查点

# 设置检查点模式
from pyflink.datastream.checkpointing_mode import CheckpointingMode
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)

# 设置最小间隔
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)

# 设置超时时间
env.get_checkpoint_config().set_checkpoint_timeout(600000)

5.2 状态后端配置

Flink状态后端:

from pyflink.datastream.state_backend import HashMapStateBackend, \
    EmbeddedRocksDBStateBackend

# 使用RocksDB状态后端(适合大状态)
state_backend = EmbeddedRocksDBStateBackend()
env.set_state_backend(state_backend)

# 设置检查点存储
env.get_checkpoint_config().set_checkpoint_storage(
    "file:///tmp/flink-checkpoints"
)

5.3 水印和延迟数据处理

from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.time import Time

# 创建水印策略
watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Time.seconds(10)) \
    .with_timestamp_assigner(lambda event: event['timestamp']) \
    .with_idleness(Time.minutes(1))

stream_with_watermark = stream.assign_timestamps_and_watermarks(
    watermark_strategy
)

# 处理延迟数据
from pyflink.datastream.output_tag import OutputTag

# 定义侧输出标签
late_data_tag = OutputTag("late-data", Types.STRING())

# 窗口操作时指定侧输出
windowed_stream = keyed_stream \
    .window(TumblingEventTimeWindows.of(Time.minutes(1))) \
    .allowed_lateness(Time.minutes(5)) \
    .side_output_late_data(late_data_tag)

# 获取延迟数据
late_data_stream = windowed_stream.get_side_output(late_data_tag)

5.4 自定义函数

Flink自定义ProcessFunction:

from pyflink.datastream.functions import ProcessFunction
from pyflink.datastream.functions import RuntimeContext

class CustomProcessor(ProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        # 初始化状态
        from pyflink.datastream.state import ValueStateDescriptor
        from pyflink.common.typeinfo import Types

        state_descriptor = ValueStateDescriptor(
            "my-state",
            Types.LONG()
        )
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx):
        # 获取当前状态
        current_count = self.state.value()
        if current_count is None:
            current_count = 0

        # 更新状态
        current_count += 1
        self.state.update(current_count)

        # 输出结果
        yield (value, current_count)

    def on_timer(self, timestamp, ctx):
        # 定时器触发时的处理
        pass

# 使用自定义函数
result = stream.key_by(lambda x: x[0]) \
    .process(CustomProcessor())

5.5 性能优化

Kafka Streams优化:

Properties props = new Properties();

// 增加缓冲区大小
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);

// 设置提交间隔
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// 启用缓存
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

// 设置线程数
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

Flink优化:

# 设置并行度
env.set_parallelism(4)

# 设置缓冲超时
env.set_buffer_timeout(100)

# 禁用算子链(用于调试)
env.disable_operator_chaining()

# 设置重启策略
from pyflink.common.restart_strategy import RestartStrategies

env.set_restart_strategy(
    RestartStrategies.fixed_delay_restart(
        3,  # 重启次数
        10000  # 延迟时间(毫秒)
    )
)

故障排除

问题1: Kafka连接失败

现象:

Failed to connect to Kafka broker

解决方法:

# 检查Kafka状态
docker-compose ps

# 查看Kafka日志
docker-compose logs kafka

# 测试连接
telnet localhost 9092

# 检查Topic
docker exec -it <kafka-container> kafka-topics \
  --list --bootstrap-server localhost:9092

现象:

Job execution failed

解决方法:

# 查看Flink日志
docker logs flink-jobmanager
docker logs flink-taskmanager

# 检查Web UI
# http://localhost:8081

# 增加日志级别
import logging
logging.basicConfig(level=logging.DEBUG)

问题3: 数据延迟

现象: 处理延迟过高,数据堆积

可能原因: - 并行度不足 - 资源不足 - 窗口配置不当 - 状态过大

解决方法:

# 增加并行度
env.set_parallelism(8)

# 调整窗口大小
TumblingEventTimeWindows.of(Time.seconds(30))  # 增大窗口

# 使用RocksDB状态后端
state_backend = EmbeddedRocksDBStateBackend()
env.set_state_backend(state_backend)

# 启用增量检查点
env.get_checkpoint_config().enable_incremental_checkpointing(True)

问题4: 内存溢出

现象:

OutOfMemoryError: Java heap space

解决方法:

# 增加JVM内存
export JAVA_OPTS="-Xmx4g -Xms4g"

# 或在Flink配置中设置
# flink-conf.yaml
taskmanager.memory.process.size: 4096m
jobmanager.memory.process.size: 2048m

问题5: 检查点失败

现象:

Checkpoint expired before completing

解决方法:

# 增加检查点超时时间
env.get_checkpoint_config().set_checkpoint_timeout(600000)

# 增加检查点间隔
env.enable_checkpointing(120000)

# 允许更多并发检查点
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

监控和运维

Kafka Streams监控

// 添加状态监听器
streams.setStateListener((newState, oldState) -> {
    System.out.println("State changed from " + oldState + " to " + newState);
});

// 获取指标
StreamsMetrics metrics = streams.metrics();
for (Metric metric : metrics.values()) {
    System.out.println(metric.metricName() + ": " + metric.metricValue());
}

使用Flink Web UI: - 访问 http://localhost:8081 - 查看任务状态、吞吐量、延迟等指标 - 查看检查点历史 - 查看任务拓扑

使用Metrics Reporter:

# 配置Prometheus Reporter
# flink-conf.yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

日志配置

Kafka Streams日志:

<!-- log4j.properties -->
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.kafka.streams=DEBUG

Flink日志:

import logging

# 配置日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 设置特定模块的日志级别
logging.getLogger('pyflink').setLevel(logging.DEBUG)

总结

通过本教程,你学习了:

  • ✅ 流处理的核心概念和应用场景
  • ✅ Kafka Streams的基本使用和高级特性
  • ✅ Apache Flink的架构和编程模型
  • ✅ 窗口操作(滚动、滑动、会话窗口)
  • ✅ 事件时间和水印机制
  • ✅ 有状态流处理和状态管理
  • ✅ 实战案例:IoT实时监控系统
  • ✅ 性能优化和故障排除

关键要点:

  1. 选择合适的框架
  2. Kafka Streams: 简单场景,与Kafka深度集成
  3. Flink: 复杂场景,需要高性能和丰富功能

  4. 理解核心概念

  5. 事件时间 vs 处理时间
  6. 窗口操作的选择
  7. 水印和延迟数据处理
  8. 状态管理

  9. 保证数据一致性

  10. 精确一次语义
  11. 检查点机制
  12. 状态恢复

  13. 性能优化

  14. 合理设置并行度
  15. 选择合适的状态后端
  16. 调整窗口大小
  17. 监控和调优

进阶挑战

尝试以下挑战来巩固学习:

  1. 挑战1: 实现复杂事件处理(CEP)
  2. 使用Flink CEP检测特定事件模式
  3. 应用场景:欺诈检测、异常行为识别

  4. 挑战2: 实现流表连接

  5. 将实时流与维度表连接
  6. 实现动态维度表更新

  7. 挑战3: 实现多流连接

  8. 连接多个数据流
  9. 处理不同速率的流

  10. 挑战4: 实现自定义窗口

  11. 创建自定义窗口分配器
  12. 实现业务相关的窗口逻辑

  13. 挑战5: 实现端到端监控

  14. 集成Prometheus和Grafana
  15. 创建实时监控仪表板

完整代码

完整的项目代码可以在这里下载:

git clone https://github.com/embedded-platform/stream-processing-tutorial.git
cd stream-processing-tutorial

项目结构:

stream-processing-tutorial/
├── kafka-streams/
│   ├── pom.xml
│   └── src/main/java/com/example/
│       ├── WordCountStream.java
│       ├── WindowedWordCount.java
│       ├── StreamJoinExample.java
│       └── StatefulProcessing.java
├── flink/
│   ├── flink_word_count.py
│   ├── flink_datastream.py
│   ├── flink_windowing.py
│   └── requirements.txt
├── iot-system/
│   ├── iot_data_generator.py
│   ├── iot_anomaly_detection.py
│   └── iot_statistics.py
├── docker-compose.yml
└── README.md

下一步

建议继续学习:

参考资料

官方文档

  1. Kafka Streams
  2. 官方文档
  3. 开发者指南
  4. API文档

  5. Apache Flink

  6. 官方文档
  7. PyFlink文档
  8. 概念和架构

推荐书籍

  1. 《Kafka Streams实战》- William P. Bejeck Jr.
  2. 《Flink基础教程》- Ellen Friedman & Kostas Tzoumas
  3. 《流式系统》- Tyler Akidau等
  4. 《设计数据密集型应用》- Martin Kleppmann

在线资源

  1. Confluent博客
  2. Flink Forward会议视频
  3. 流处理最佳实践
  4. 实时计算社区

相关技术

  • 流处理框架对比: Flink vs Spark Streaming vs Storm
  • 消息队列: Kafka、Pulsar、RabbitMQ
  • 时序数据库: InfluxDB、TimescaleDB、OpenTSDB
  • 可视化工具: Grafana、Kibana、Superset

实践建议

学习路径

  1. 基础阶段(1-2周)
  2. 理解流处理的基本概念
  3. 掌握Kafka Streams基础
  4. 完成简单的流处理示例

  5. 进阶阶段(2-3周)

  6. 学习窗口操作和状态管理
  7. 掌握Flink DataStream API
  8. 实现IoT监控系统案例

  9. 高级阶段(3-4周)

  10. 深入理解事件时间和水印
  11. 学习精确一次语义
  12. 处理生产环境问题

实践项目建议

  1. 实时日志分析系统
  2. 收集应用日志
  3. 实时统计错误率
  4. 异常日志告警

  5. 实时推荐系统

  6. 用户行为流处理
  7. 实时特征计算
  8. 推荐结果更新

  9. 金融风控系统

  10. 交易流实时监控
  11. 异常交易检测
  12. 风险评分计算

  13. 智能交通系统

  14. 车辆轨迹流处理
  15. 实时路况分析
  16. 拥堵预警

生产环境注意事项

  1. 高可用部署
  2. Kafka集群配置
  3. Flink HA配置
  4. 状态后端选择

  5. 监控和告警

  6. 监控处理延迟
  7. 监控检查点状态
  8. 设置告警阈值

  9. 容量规划

  10. 评估数据量和峰值
  11. 规划资源配置
  12. 预留扩展空间

  13. 安全配置

  14. 启用认证和授权
  15. 使用SSL/TLS加密
  16. 网络隔离

  17. 备份和恢复

  18. 定期备份检查点
  19. 制定灾难恢复计划
  20. 测试恢复流程

常见应用场景

1. 实时监控和告警

场景: 监控系统指标,异常时立即告警

# 监控CPU使用率
def monitor_cpu():
    stream = env.add_source(cpu_metrics_source)

    # 检测CPU使用率超过80%
    alerts = stream \
        .filter(lambda x: x['cpu_usage'] > 80) \
        .map(lambda x: create_alert(x))

    alerts.add_sink(alert_sink)

2. 实时数据聚合

场景: 实时统计网站访问量

// 统计每分钟的PV和UV
KStream<String, PageView> pageViews = builder.stream("page-views");

KTable<Windowed<String>, Long> pvCounts = pageViews
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count();

3. 实时ETL

场景: 实时清洗和转换数据

# 数据清洗和转换
cleaned_stream = raw_stream \
    .filter(lambda x: validate(x)) \
    .map(lambda x: transform(x)) \
    .key_by(lambda x: x['user_id'])

4. 实时推荐

场景: 根据用户行为实时更新推荐

// 用户行为流
KStream<String, UserAction> actions = builder.stream("user-actions");

// 实时计算用户兴趣
KTable<String, UserProfile> profiles = actions
    .groupByKey()
    .aggregate(
        UserProfile::new,
        (key, action, profile) -> profile.update(action)
    );

5. 欺诈检测

场景: 实时检测异常交易

# 检测异常交易模式
def detect_fraud(transactions):
    # 计算用户最近1小时的交易金额
    windowed_sum = transactions \
        .key_by(lambda x: x['user_id']) \
        .window(TumblingEventTimeWindows.of(Time.hours(1))) \
        .sum('amount')

    # 检测异常
    fraud_alerts = windowed_sum \
        .filter(lambda x: x > threshold)

    return fraud_alerts

反馈: 如果你在学习过程中遇到问题,欢迎在评论区留言或提交Issue!

贡献: 欢迎提交Pull Request改进本教程。

许可: 本教程采用 CC BY-SA 4.0 许可协议。