首页 > Java > java教程 > 正文

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

霞舞
发布: 2025-09-25 12:02:19
原创
478人浏览过

Flink中JSON字符串到JSONObject转换的陷阱与最佳实践

在Apache Flink中处理JSON字符串时,开发者常遇到将String类型数据转换为JSONObject的需求。然而,直接使用JSONObject.parseObject()可能导致NullPointerException,即使字符串已正确解析。本文将揭示这一常见问题的原因,提供使用org.json库中new JSONObject(value)的正确解决方案,并强调出于性能和类型安全考虑,在生产环境中优先使用POJO进行JSON反序列化的最佳实践。

Flink中JSON字符串转换的常见问题

apache flink流处理应用中,从数据源(如kafka、文件等)获取的原始数据通常是json格式的字符串。为了进一步解析和处理这些结构化数据,我们通常需要将其转换为jsonobject对象。然而,许多开发者在尝试将string类型的数据通过processfunction或其他算子转换为jsonobject并收集输出时,可能会遇到如下异常:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: Assigned key must not be null!
Caused by: java.lang.NullPointerException: Assigned key must not be null!
登录后复制

尽管在调试过程中发现String已经成功解析成了JSONObject实例,但当尝试通过Collector收集这些对象时,作业却失败了。这表明问题并非出在JSON解析本身,而可能与JSONObject对象的特定实现、其与Flink内部序列化机制的兼容性,或者其内部状态有关。

原始的错误代码片段如下:

import com.alibaba.fastjson.JSONObject; // 假设使用了Fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingIssue {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 尝试使用JSONObject.parseObject()
                JSONObject jsonObject = JSONObject.parseObject(value);
                out.collect(jsonObject); // 在这里抛出异常
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}
登录后复制

解决方案:使用org.json库的JSONObject构造函数

解决上述NullPointerException的关键在于选择正确的JSONObject实现及其初始化方式。经过验证,使用org.json库提供的JSONObject,并通过其构造函数直接传入JSON字符串可以避免此问题。

首先,确保您的项目中引入了org.json库的依赖:

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180130</version> <!-- 可以根据需要选择最新稳定版本 -->
</dependency>
登录后复制

然后,在ProcessFunction中将JSONObject.parseObject(value)替换为new JSONObject(value):

import org.json.JSONObject; // 注意这里引入的是org.json.JSONObject
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkJsonProcessingSolution {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<JSONObject> jsonObjDS = inputDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
                // 使用org.json.JSONObject的构造函数
                JSONObject jsonObject = new JSONObject(value);
                out.collect(jsonObject); // 现在可以正常收集
            }
        });
        jsonObjDS.print();

        env.execute();
    }
}
登录后复制

此更改后,Flink作业将能够正常运行并打印出转换后的JSONObject。这表明org.json库的JSONObject实现与Flink的内部机制(尤其是其类型序列化器)兼容性更好,或者其内部状态在被序列化和反序列化时能够保持完整性,从而避免了NullPointerException。

最佳实践:优先使用POJO进行JSON反序列化

尽管上述方法能够解决String到JSONObject的转换问题,但在实际生产环境中,特别是在处理大量数据或对性能有较高要求的Flink应用中,直接传递和处理JSONObject通常不是最佳实践。

原因如下:

Flawless AI
Flawless AI

好莱坞2.0,电影制作领域的生成式AI工具

Flawless AI 32
查看详情 Flawless AI
  1. 序列化与反序列化开销大: JSONObject是一个通用的Map结构,其内部字段类型不固定,这使得Flink在序列化和反序列化JSONObject时,需要进行更多的元数据处理和类型推断,导致额外的CPU和内存开销。相比之下,POJO(Plain Old Java Object)具有固定的结构和明确的字段类型,Flink可以利用Kyro等高效序列化器进行快速、紧凑的序列化。
  2. 缺乏类型安全: JSONObject的操作通常基于字符串键值对,容易出现拼写错误或类型转换错误,且这些错误通常在运行时才能发现。而POJO提供了编译时类型检查,能够有效减少运行时错误。
  3. 可读性和可维护性差: 使用JSONObject意味着需要通过getString("key")、getInt("key")等方法手动提取字段,代码冗长且不易阅读。POJO则允许直接通过属性访问数据,代码更简洁、更具可读性。
  4. Schema演进: 随着业务发展,JSON数据的Schema可能会发生变化。POJO可以更优雅地处理Schema演进,例如通过添加新字段或使用@JsonIgnoreProperties(ignoreUnknown = true)注解忽略未知字段。

推荐做法:将JSON字符串反序列化为POJO

在Flink中,最佳实践是将JSON字符串反序列化为定义好的POJO类。这通常通过自定义DeserializationSchema或使用Flink提供的JSON格式(如JsonRowSerializationSchema)来实现。

例如,对于上述JSON数据,我们可以定义一个POJO类:

import java.io.Serializable;

public class BillInfo implements Serializable {
    private String ADD_TIME;
    private String ORDER_ID;
    private String ADDER_NO;
    private String UPDATER_NO;
    private String S_USER_ID;
    private String B_USER_ID;
    private String BILL_ID;
    private String ADDER_NAME;
    private String UPDATE_TIME;
    private String UPDATER_NAME;

    // 必须有无参构造函数
    public BillInfo() {}

    // Getter和Setter方法
    public String getADD_TIME() { return ADD_TIME; }
    public void setADD_TIME(String ADD_TIME) { this.ADD_TIME = ADD_TIME; }
    public String getORDER_ID() { return ORDER_ID; }
    public void setORDER_ID(String ORDER_ID) { this.ORDER_ID = ORDER_ID; }
    public String getADDER_NO() { return ADDER_NO; }
    public void setADDER_NO(String ADDER_NO) { this.ADDER_NO = ADDER_NO; }
    public String getUPDATER_NO() { return UPDATER_NO; }
    public void setUPDATER_NO(String UPDATER_NO) { this.UPDATER_NO = UPDATER_NO; }
    public String getS_USER_ID() { return S_USER_ID; }
    public void setS_USER_ID(String S_USER_ID) { this.S_USER_ID = S_USER_ID; }
    public String getB_USER_ID() { return B_USER_ID; }
    public void setB_USER_ID(String B_USER_ID) { this.B_USER_ID = B_USER_ID; }
    public String getBILL_ID() { return BILL_ID; }
    public void setBILL_ID(String BILL_ID) { this.BILL_ID = BILL_ID; }
    public String getADDER_NAME() { return ADDER_NAME; }
    public void setADDER_NAME(String ADDER_NAME) { this.ADDER_NAME = ADDER_NAME; }
    public String getUPDATE_TIME() { return UPDATE_TIME; }
    public void setUPDATE_TIME(String UPDATE_TIME) { this.UPDATE_TIME = UPDATE_TIME; }
    public String getUPDATER_NAME() { return UPDATER_NAME; }
    public void setUPDATER_NAME(String UPDATER_NAME) { this.UPDATER_NAME = UPDATER_NAME; }

    @Override
    public String toString() {
        return "BillInfo{" +
               "ADD_TIME='" + ADD_TIME + '\'' +
               ", ORDER_ID='" + ORDER_ID + '\'' +
               ", ADDER_NO='" + ADDER_NO + '\'' +
               ", UPDATER_NO='" + UPDATER_NO + '\'' +
               ", S_USER_ID='" + S_USER_ID + '\'' +
               ", B_USER_ID='" + B_USER_ID + '\'' +
               ", BILL_ID='" + BILL_ID + '\'' +
               ", ADDER_NAME='" + ADDER_NAME + '\'' +
               ", UPDATE_TIME='" + UPDATE_TIME + '\'' +
               ", UPDATER_NAME='" + UPDATER_NAME + '\'' +
               '}';
    }
}

// 如果JSON结构更复杂,包含嵌套对象,则需要定义相应的嵌套POJO
public class RootData implements Serializable {
    private BillInfo bill_info;

    public RootData() {}

    public BillInfo getBill_info() { return bill_info; }
    public void setBill_info(BillInfo bill_info) { this.bill_info = bill_info; }

    @Override
    public String toString() {
        return "RootData{" +
               "bill_info=" + bill_info +
               '}';
    }
}
登录后复制

然后,可以使用Jackson或Gson等库在ProcessFunction中将字符串反序列化为POJO:

import com.fasterxml.jackson.databind.ObjectMapper; // 引入Jackson库
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkPojoProcessing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.fromElements(
            "{\"bill_info\":{\"ADD_TIME\":\"2022-11-12 16:05:28:418\",\"ORDER_ID\":\"f3e60c5f-78f6-4ec6-bab6-98b177f7cb67\",\"ADDER_NO\":\"0706\",\"UPDATER_NO\":\"0706\",\"S_USER_ID\":\"s68\",\"B_USER_ID\":\"b77\",\"BILL_ID\":\"8687b584-038c-498c-8f97-ec1ca197da96\",\"ADDER_NAME\":\"sss\",\"UPDATE_TIME\":\"2022-11-12 16:05:28:418\",\"UPDATER_NAME\":\"ssss\"}}"
        );

        SingleOutputStreamOperator<RootData> pojoDS = inputDS.process(new ProcessFunction<String, RootData>() {
            // ObjectMapper 是线程安全的,可以作为类成员或静态成员
            private transient ObjectMapper objectMapper; 

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                super.open(parameters);
                objectMapper = new ObjectMapper();
            }

            @Override
            public void processElement(String value, ProcessFunction<String, RootData>.Context ctx, Collector<RootData> out) throws Exception {
                RootData rootData = objectMapper.readValue(value, RootData.class);
                out.collect(rootData);
            }
        });
        pojoDS.print();

        env.execute();
    }
}
登录后复制

为了使用Jackson,需要添加以下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.4</version> <!-- 根据需要选择最新稳定版本 -->
</dependency>
登录后复制

总结

在Flink中将JSON字符串转换为JSONObject时,如果遇到NullPointerException,尝试使用org.json库的new JSONObject(value)构造函数通常可以解决问题。然而,从长期维护和性能优化的角度来看,强烈建议将JSON字符串反序列化为POJO。POJO不仅提供了更好的类型安全和代码可读性,还能显著提高Flink应用的序列化和反序列化效率,是构建健壮、高性能流处理应用的基石。

以上就是Flink中JSON字符串到JSONObject转换的陷阱与最佳实践的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号