首页 > Java > java教程 > 正文

在 Apache Flink 中消费带键 Kafka 记录的实践教程

心靈之曲
发布: 2025-11-05 17:43:14
原创
837人浏览过

在 Apache Flink 中消费带键 Kafka 记录的实践教程

本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。

1. 理解带键 Kafka 记录及其重要性

在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:

  • 消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。
  • 状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。
  • 数据路由:消费者可以根据键来过滤或路由消息。

当使用 kafka-console-producer.sh 并指定 --property "parse.key=true" --property "key.separator=:" 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。

2. Flink KafkaSource 的默认行为与限制

Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。

以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlinkValueOnlyKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("test3")
                .setGroupId("1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();

        env.execute("Flink Value-Only Kafka Consumer");
    }
}
登录后复制

3. 自定义 KafkaRecordDeserializationSchema 读取带键记录

要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord<byte[], byte[]> 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。

3.1 定义自定义反序列化器

首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3<String, String, Long> 对象中输出。

喵记多
喵记多

喵记多 - 自带助理的 AI 笔记

喵记多 27
查看详情 喵记多
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import java.io.IOException;

/**
 * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。
 * 输出类型为 Tuple3<Key, Value, Timestamp>
 */
public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Tuple3<String, String, Long>> {

    // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化
    private transient StringDeserializer keyDeserializer;
    private transient StringDeserializer valueDeserializer;

    /**
     * 在反序列化器初始化时调用,用于设置内部状态。
     * 通常在这里初始化 Kafka 客户端的反序列化器。
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器
        // 假设键和值都是字符串,使用 StringDeserializer
        keyDeserializer = new StringDeserializer();
        valueDeserializer = new StringDeserializer();
    }

    /**
     * 核心反序列化逻辑。
     *
     * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。
     * @param out    用于收集反序列化结果的 Collector。
     * @throws IOException 如果反序列化过程中发生 I/O 错误。
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple3<String, String, Long>> out) throws IOException {
        // 反序列化键
        String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null;
        // 反序列化值
        String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null;
        // 获取时间戳
        long timestamp = record.timestamp();

        // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出
        out.collect(new Tuple3<>(key, value, timestamp));
    }

    /**
     * 返回此反序列化器生产的数据类型信息。
     * Flink 使用此信息进行类型检查和序列化。
     */
    @Override
    public TypeInformation<Tuple3<String, String, Long>> getProducedType() {
        // 使用 TypeHint 来获取泛型类型信息
        return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint<Tuple3<String, String, Long>>() {});
    }
}
登录后复制

注意事项:

  • open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。
  • deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。
  • getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。

3.2 在 Flink KafkaSource 中使用自定义反序列化器

接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkKeyedKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址
        String topic = "test3";
        String groupId = "1";

        // 构建 KafkaSource,并指定我们自定义的反序列化器
        KafkaSource<Tuple3<String, String, Long>> source = KafkaSource.<Tuple3<String, String, Long>>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器
                .build();

        // 从 KafkaSource 创建数据流
        DataStream<Tuple3<String, String, Long>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");

        // 对数据流进行操作,现在可以访问键、值和时间戳
        stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)
              .print();

        // 执行 Flink 作业
        env.execute("Flink Keyed Kafka Consumer");
    }
}
登录后复制

3.3 Kafka 生产者示例(用于测试)

为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
登录后复制

然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。

4. 总结

通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。

以上就是在 Apache Flink 中消费带键 Kafka 记录的实践教程的详细内容,更多请关注php中文网其它相关文章!

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号