【消息队列】一文搞懂Kafka

面试七股多一股 2024-02-25 15:21:21
一、为什么要使用消息队列1、生活中收快递

2、下单功能:同步

①问题1:耦合度高

②问题2:响应时间长

③问题3:并发压力传递

④问题4:系统结构弹性不足

3、下单功能:异步

①好处1:功能解耦

②好处2:快速响应

③好处3:异步削峰限流

削峰填谷:

④好处4:系统结构弹性大,易于扩展

二、什么是消息队列1、概念

消息队列是实现应用程序和应用程序之间通信的中间件产品

2、消息队列底层实现的两大主流方式

由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要目前主流的

消息队列通信协议标准包括:

AMQP (Advanced Message Queuing Protocol):通用协议,IBM公司研发JMS (Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成的Java标准

对比:

3、主流消息队列产品

RabbitMQ

ActiveMQ

RocketMQ

Kafka

研发团队

Rabbit(公司)

Apache(社区)

阿里(公司)

Apache(社区)

开发语言

Erlang

Java

Java

Scala&Java

核心机制

基于AMQP的消息队列模型使用生产者-消费者模式,将消息发布到队列中,然后被消费者订阅和处理

基于JMS的消息传递模型支持点对点模型和发布-订阅模型

分布式的消息队列模型采用主题(Topic)和标签(Tag)的方式进行消息的分类和过滤

分布式流平台,通过发布-订阅模型进行高吞吐量的消息处理

协议支持

XMPPSTOMPSMTP

XMPPSTOMPOpenWireREST

自定义协议

自定义协议社区封装了HTTP协议支持

客户端支持语言

官方支持Erlang、Java、Ruby等社区产出多种API,几乎支持所有语言

JavaC/C++PythonPHPPerl.NET等

JavaC++不成熟

官方支持Java社区产出多种API,如PHP、Python等

可用性

镜像队列

主从复制

主从复制

分区和副本

单机吞吐量

每秒十万左右级别

每秒数万级

每秒十万+级(双十一)

每秒百万级

消息延迟

微秒级

毫秒级

毫秒级

毫秒以内

消息确认

完整的消息确认机制

内置消息表,消息保存到数据库实现持久化

功能特性

并发能力强,性能极好,延时低,社区活跃,管理界面丰富

老牌产品成熟度高文档丰富

MQ功能比较完备扩展性佳

只支持主要的MQ功能毕竟是专门为大数据领域服务的

三、Kafka介绍1、Kafka是什么

Kafka是Apache开源的一款基于zookeeper协调的分布式消息系统,具有高吞吐率、高性能、实时、高可靠等特点,可实时处理流式数据。它最初由LinkedIn公司开发,使用Scala语言编写。

Kafka历经数年的发展,从最初纯粹的消息引擎,到近几年开始在流处理平台生态圈发力,多个组织或公司发布了各种不同特性的产品。常见产品如下:

Apache Kafka :最“正统”的Kafka也是开源版,它是后面其他所有发行版的基础。Cloudera/Hortonworks Kafka :集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。Confluent Kafka :主要提供基于Kafka的企业级流处理解决方案。

Apache Kafka,它现在依然是开发人数最多、版本迭代速度最快的Kafka。我们使用此产品学习。Apache 目前为止总共演进了8个大版本,分别是0.7、0.8、0.9、0.11、1.0、2.0和3.0,我们选择3.5.1版本讲解(截止2023.8)。

2、Kafka的特点高吞吐量、低延迟:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息,它的延迟最低只有几毫秒持久性:支持消息持久化,即使数TB级别的消息也能够保持长时间的稳定性能。可靠性:支持数据备份防止丢失容错性:支持通过Kafka服务器和消费机集群来分区消息,允许集群中的节点失败(若分区副本数量为n,则允许n-1个节点失败)高并发:单机可支持数千个客户端同时读写,支持在线水平扩展。可无缝对接hadoop、strom、spark等,支持Hadoop并行数据加载,3、Kafka官网地址

kafka官网

kafka.apache.org/

kafka下载

kafka.apache.org/downloads

4、Kafka应用场景

ID

设计目标

功能

1

日志收集

一个公司用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer

2

消息系统

解耦生产者和消费者、缓存消息等

3

用户活动跟踪

用来记录Web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。

4

运营指标

Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告

5

流式处理

比如Spark Streaming和Storm

四、Kafka内部结构一、Producer

生产者:消息发送端

二、Consumer

消费者:消息接收端

三、broker

一个Kafka服务器实例,在Kafka集群中会有多个broker实例

四、Topic

Topic中文意思是主题,在Kafka中只是一个逻辑概念,代表某一类消息。

结合具体项目中的业务功能,我们可以为每一个具体功能创建一个Topic。

五、Partition

Partition就是分区,为什么要分区?

有了分区就可以把消息数据分散到不同broker上保存。

六、Replication

数据分区之后有一个问题:每个broker上保存一部分数据,如果某个broker宕机,那么数据就会不完整。

所以Kafka允许分区创建副本

七、主从

当分区存在副本时,就会区分Leader、Follower:

Leader:主分片,负责接收生产者端发送过来的消息,对接消费者端消费消息Follower:不和生产者、消费者交互,仅负责和Leader同步数据

创建Topic时通过“分区数”指定Partition的数量,通过“复制因子”指定副本数量分区数和复制因子都不能为0分区数为1,复制因子为1表示:1个Partition内有1个Leader(此时数据只有一份,没有冗余的副本,生产环境不建议)复制因子为2表示每个Partition中包含1个Leader和1个Follower八、注册

Kafka工作过程中,broker、Partition……信息都需要在Zookeeper中注册

五、图形化界面软件Eagle一、创建Docker容器# 搜索镜像docker search efak# 创建容器docker run -d --name kafka-eagle \-p 8048:8048 \-e EFAK_CLUSTER_ZK_LIST="192.168.200.100:2181" \nickzurich/efak:latest

提示:如果无法启动,往往是因为Zookeeper所需内存不足,可以试着把Zookeeper内存调整为更大的值再试。

二、使用1、访问地址

http://192.168.200.100:8048

默认登录信息:

账号:admin密码:1234562、查看broker列表

3、主题相关操作①新建

注意:Kafka集群中broker实例的数量需要大于等于复制因子(Replication factor),如果复制因子大于broker实例数量,那么就会看到下面保存信息——

②查看主题列表

③查看主题详情

点击主题名称查看详情:

4、查看分区中的消息

六、客户端原生API一、生产者1、创建主题kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic topic-java-client2、启动消费者监听主题kafka-console-consumer.sh --bootstrap-server 192.168.200.100:9092 --topic topic-java-client3、引入依赖<!-- kafka-clients 2023.10--><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.0</version></dependency>4、Java程序import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public MyProducerDemo{ public static final String TOPIC_NAME = "topic-java-client"; public static void main(String[] args) { // 1. 创建Kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给Kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.100:9092"); // key,value序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建Kafka生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, "hello-kafka-from-java-client~" + i)); } System.out.println("----MyProducerDemo发送完毕"); // 5. 关闭资源 kafkaProducer.close(); }}

ProducerRecord参数说明:

public ProducerRecord<K, V> { //主题名称,必选参数 private final String topic; //分区号,大于等于0的整数,可选参数。 private final Integer partition; //消息的头信息,类型是RecordHeaders,可选属性。 private final Headers headers; //键,可选参数。 private final K key; //消息内容,必选参数。 private final V value; //每条消息都有一个时间戳,可选参数 private final Long timestamp;}5、send()方法返回值

KafkaProducer的send()方法返回Future类型的对象,可以调用Future的get()方法同步获取任务执行结果。

此时程序就成了前一个消息发送完成再发送后一个的同步模式。

也就是说不调用get()方法就是异步模式。

// 同步for (int i = 0; i < 5; i++) { // 发送消息的任务交给子线程去做 Future future = kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, "hello-kafka-from-java-client~~~" + i)); TimeUnit.SECONDS.sleep(1); // 但是因为调用了 get() 方法,就变成子线程必须执行完发送消息的任务 // for 循环的本次循环体才算执行完,才能继续执行下一次循环 // 下一次循环就是发送下一条消息 future.get();}6、获取消息发送结果

给KafkaProducer的send()方法再传入一个CallBack类型的参数,以异步回调的方式获取消息发送结果,从而得知消息发送是成功还是失败。

①Java代码kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, "hello-kafka-from-java-client*******"), new Callback() { // onCompletion() 方法在发送消息操作完成时被调用 // 参数 RecordMetadata recordMetadata:发送消息相关的元数据 // 参数 Exception e:发送消息失败时,失败原因封装的异常信息 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { long offset = recordMetadata.offset(); System.out.println("offset = " + offset); int partition = recordMetadata.partition(); System.out.println("partition = " + partition); long timestamp = recordMetadata.timestamp(); System.out.println("timestamp = " + timestamp); String topic = recordMetadata.topic(); System.out.println("topic = " + topic); } else { System.out.println("e = " + e); } }});②失败情况举例

把broker地址改成错的:

e = org.apache.kafka.common.errors.TimeoutException: Topic topic-java-client not present in metadata after 60000 ms.

二、消费者import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.TimeUnit;public MyConsumerDemo{ public static final String TOPIC_NAME = "topic-java-client"; public static void main(String[] args) throws InterruptedException { // 1、创建Kafka消费者的配置对象 Properties properties = new Properties(); // 2、给Kafka配置对象添加配置信息:bootstrap.servers properties.put("bootstrap.servers", "192.168.200.100:9092"); properties.setProperty("group.id", "test"); properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 3、创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 4、订阅指定主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { // 5、从broker拉取信息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // 6、每隔 1 秒做一次打印,让消费端程序持续运行 TimeUnit.SECONDS.sleep(1); System.out.println("....进行中"); } }}

七、Kafka集群一、集群搭建1、重要原则Kafka节点只要注册到同一个Zookeeper上就代表它们是同一个集群的Kafka通过broker.id来区分集群中的不同节点2、规划简单起见,我们只使用一个VMWare虚拟机,所以各个broker实例需要设定不同端口号Kafka程序不需要复制,对应各自不同的配置文件启动多个进程就能组成集群Zookeeper还是使用原来的2181即可

端口号

配置文件

日志目录

实例01

7000

/opt/k-cluster/server7000.properties

/opt/k-cluster/log7000

实例02

8000

/opt/k-cluster/server8000.properties

/opt/k-cluster/log8000

实例03

9000

/opt/k-cluster/server9000.properties

/opt/k-cluster/log9000

3、具体操作①创建目录mkdir -p /opt/k-cluster/log7000mkdir -p /opt/k-cluster/log8000mkdir -p /opt/k-cluster/log9000②复制配置文件cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server7000.propertiescp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server8000.propertiescp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server9000.properties③修改配置文件[1]7000broker.id=1listeners=PLAINTEXT://192.168.200.100:7000advertised.listeners=PLAINTEXT://192.168.200.100:7000log.dirs=/opt/k-cluster/log7000[2]8000broker.id=2listeners=PLAINTEXT://192.168.200.100:8000advertised.listeners=PLAINTEXT://192.168.200.100:8000log.dirs=/opt/k-cluster/log8000[3]9000broker.id=3listeners=PLAINTEXT://192.168.200.100:9000advertised.listeners=PLAINTEXT://192.168.200.100:9000log.dirs=/opt/k-cluster/log90004、启动集群各实例

注意:此前需要先启动Zookeeper

kafka-server-start.sh -daemon /opt/k-cluster/server7000.propertieskafka-server-start.sh -daemon /opt/k-cluster/server8000.propertieskafka-server-start.sh -daemon /opt/k-cluster/server9000.properties

验证各个端口号:

lsof -i:2181lsof -i:7000lsof -i:8000lsof -i:9000

如果因为内存不足而启动失败,可以修改对应启动脚本程序中的内存大小:

Zookeeper启动脚本程序:zookeeper-server-start.shZookeeper中Kafka堆内存大小变量名称:KAFKA_HEAP_OPTSKafka启动脚本程序:kafka-server-start.shKafka堆内存大小变量名称:KAFKA_HEAP_OPTS5、停止集群# 停止Kafka,无需指定端口号就能停止各个实例:kafka-server-stop.sh# 停止zkzookeeper-server-stop.sh二、使用集群1、在集群上创建主题kafka-topics.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--create \--partitions 3 \--replication-factor 3 \--topic my-cluster-topic2、查看集群主题kafka-topics.sh \--bootstrap-server 192.168.200.100:7000 \--describe --topic my-cluster-topic

3、集群消息发送kafka-console-producer.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--topic my-cluster-topic4、集群消息消费kafka-console-consumer.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--from-beginning \--topic my-cluster-topic5、集群消息消费相关问题①问题描述

通过集群接收消息时,接收不到

②问题产生原因

多个broker实例部署在同一个虚拟机上

192.168.200.100:7000192.168.200.100:8000192.168.200.100:9000

这只是我们在测试环境下,非正式的这么安排,实际开发中不会把集群的所有实例放在一个机器上

③问题解决方案一

消费端接收消息时指定分区

kafka-console-consumer.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--from-beginning \--partition 0 \--topic my-cluster-topickafka-console-consumer.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--from-beginning \--partition 1 \--topic my-cluster-topickafka-console-consumer.sh \--bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 \--from-beginning \--partition 2 \--topic my-cluster-topic④问题解决方案二第一步:把apache-zookeeper-3.9.1-bin.tar.gz上传到Linux系统/opt目录下第二步:解压apache-zookeeper-3.9.1-bin.tar.gz文件cd /opttar -zxvf apache-zookeeper-3.9.1-bin.tar.gz第三步:运行zkCli.sh脚本文件,登录到Zookeeper服务器/opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh第四步:删除__consumer_offsets主题deleteall /brokers/topics/__consumer_offsets第五步:退出Zookeeperquit第六步:重启 先关闭然后重新启动Zookeeper 先关闭然后重新启动集群各实例八 客户端SpringBoot一、生产者1、配置POM<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.3</version> <relativePath/></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--spring-kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--hutool--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.19</version> </dependency></dependencies><build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins></build>

2、配置YAMLspring: kafka: bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer3、主启动类package com.atguigu.kafka;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic KafkaMainType { public static void main(String[] args) { SpringApplication.run(KafkaMainType.class, args); } }4、配置类创建主题package com.atguigu.kafka.config;import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.TopicBuilder;@Configurationpublic KafkaConfig { @Bean public NewTopic springTestTopic() { return TopicBuilder.name("topic-spring-boot") // 主题名称 .partitions(3) // 分区数量 .replicas(3) // 副本数量 .build(); }}

到这里我们可以运行主启动类,看看主题是否创建成功

kafka-topics.sh --bootstrap-server 192.168.200.100:7000 --list5、发送消息①命令行监听消息kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 0kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 1kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 2②Java代码package com.atguigu.kafka.test;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.core.KafkaTemplate;@SpringBootTestpublic KafkaTest { @Resource private KafkaTemplate kafkaTemplate; @Test public void testSendMessage() { String topicName = "topic-spring-boot"; String message = "hello spring boot message"; kafkaTemplate.send(topicName, message); }}二、消费者1、配置POM<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.3</version> <relativePath/></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--spring-kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--hutool--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.19</version> </dependency></dependencies><build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins></build>2、配置YAMLspring: Kafka: bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: consumer-group3、主启动类package com.atguigu.kafka;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic KafkaMainTypeConsumer { public static void main(String[] args) { SpringApplication.run(KafkaMainTypeConsumer.class, args); }}4、接收消息的监听器package com.atguigu.kafka.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic KafkaMessageListener { @KafkaListener(topics = {"topic-spring-boot"}) public void simpleConsumerPartition(ConsumerRecord<String, String> record) { System.out.println("进入simpleConsumer方法"); System.out.printf( "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s, 时间戳 = %d%n", record.partition(), record.offset(), record.key(), record.value(), record.timestamp() ); }}

注意:这里我们没有指定具体接收哪个分区的消息,所以如果接收不到消息,那么就需要登录Zookeeper删除__consumer_offsets

deleteall /brokers/topics/__consumer_offsets三、实体类对象类型的消息1、创建实体类package com.atguigu.kafka.entity;import lombok.AllArgsConstructor;import lombok.Data;@Data@AllArgsConstructorpublic UserDTO { private String name; private Integer age; private String mobile;}2、发送消息的方法@Testpublic void testSendEntity() { String topicName = "topic-spring-boot230628"; UserDTO userDTO = new UserDTO("tom", 25, "12345343"); kafkaTemplate.send(topicName, userDTO);}3、异常异常全类名:java.lang.ClassCastException异常信息:class com.atguigu.kafka.entity.UserDTO cannot be cast to java.lang.String (com.atguigu.kafka.entity.UserDTO is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')异常原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化解决办法:把序列化器换成支持复杂类型的4、修改YAML配置spring: kafka: bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer # value-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
0 阅读:0