
本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。
在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:
当使用 kafka-console-producer.sh 并指定 --property "parse.key=true" --property "key.separator=:" 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。
Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。
以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;
public class FlinkValueOnlyKafkaConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics("test3")
.setGroupId("1")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();
env.execute("Flink Value-Only Kafka Consumer");
}
}要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord<byte[], byte[]> 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。
首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3<String, String, Long> 对象中输出。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import java.io.IOException;
/**
* 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。
* 输出类型为 Tuple3<Key, Value, Timestamp>
*/
public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Tuple3<String, String, Long>> {
// transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化
private transient StringDeserializer keyDeserializer;
private transient StringDeserializer valueDeserializer;
/**
* 在反序列化器初始化时调用,用于设置内部状态。
* 通常在这里初始化 Kafka 客户端的反序列化器。
*/
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
// 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器
// 假设键和值都是字符串,使用 StringDeserializer
keyDeserializer = new StringDeserializer();
valueDeserializer = new StringDeserializer();
}
/**
* 核心反序列化逻辑。
*
* @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。
* @param out 用于收集反序列化结果的 Collector。
* @throws IOException 如果反序列化过程中发生 I/O 错误。
*/
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple3<String, String, Long>> out) throws IOException {
// 反序列化键
String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null;
// 反序列化值
String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null;
// 获取时间戳
long timestamp = record.timestamp();
// 将反序列化后的键、值和时间戳封装成 Tuple3 并发出
out.collect(new Tuple3<>(key, value, timestamp));
}
/**
* 返回此反序列化器生产的数据类型信息。
* Flink 使用此信息进行类型检查和序列化。
*/
@Override
public TypeInformation<Tuple3<String, String, Long>> getProducedType() {
// 使用 TypeHint 来获取泛型类型信息
return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint<Tuple3<String, String, Long>>() {});
}
}注意事项:
接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public class FlinkKeyedKafkaConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址
String topic = "test3";
String groupId = "1";
// 构建 KafkaSource,并指定我们自定义的反序列化器
KafkaSource<Tuple3<String, String, Long>> source = KafkaSource.<Tuple3<String, String, Long>>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器
.build();
// 从 KafkaSource 创建数据流
DataStream<Tuple3<String, String, Long>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");
// 对数据流进行操作,现在可以访问键、值和时间戳
stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)
.print();
// 执行 Flink 作业
env.execute("Flink Keyed Kafka Consumer");
}
}为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:
bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。
通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。
以上就是在 Apache Flink 中消费带键 Kafka 记录的实践教程的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号