当前位置:首页 >知识 >原来Kafka也有事务啊,再也不担心消息不一致了 原来Kafka也有事务啊

原来Kafka也有事务啊,再也不担心消息不一致了 原来Kafka也有事务啊

2024-06-28 19:28:42 [百科] 来源:避面尹邢网

原来Kafka也有事务啊,原K也再也不担心消息不一致了

作者:JAVA旭阳 开发 架构 本文讲解了通过kafka事务可以实现端到端的事务精确一次的消息语义,通过事务机制,再也KAFKA 实现了对多个 topic​ 的不担多个 partition 的原子性的写入,通过一个例子了解了一下如何使用事物。心消息

前言

现在假定这么一个业务场景,原K也从kafka中的事务topic获取消息数据,经过一定加工处理后,再也发送到另外一个topic中,不担要求整个过程消息不能丢失,心消息也不能重复发送,原K也即实现端到端的事务Exactly-Once精确一次消息投递。这该如何实现呢?

图片

原来Kafka也有事务啊,再也不担心消息不一致了 原来Kafka也有事务啊

Kafka事务介绍

针对上面的再也业务场景,kafka已经替我们想到了,不担在kafka 0.11版本以后,心消息引入了一个重大的特性:幂等性和事务。

原来Kafka也有事务啊,再也不担心消息不一致了 原来Kafka也有事务啊

幂等性

这里提到幂等性的原因,主要是因为事务的启用必须要先开启幂等性,那么什么是幂等性呢?

原来Kafka也有事务啊,再也不担心消息不一致了 原来Kafka也有事务啊

幂等性是指生产者无论向kafka broker发送多少次重复的数据,broker 端只会持久化一条,保证数据不会重复。

幂等性通过生产者配置项enable.idempotence=true开启,默认情况下为true。

幂等性实现原理

图片

  1. 每条消息都有一个主键,这个主键由 <PID, Partition, SeqNumber>组成。
  • PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID。
  • Partition:消息需要发往的分区号。
  • SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识,每发送一条消息,序列号加 1。
  1. 对于主键相同的数据,kafka 是不会重复持久化的,它只会接收一条。

幂等性缺点

根据幂等性的原理,我们发现它存在下面的缺点:

  • 只能保证单分区、单会话内的数据不重复
  • kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据

那么如何实现跨分区、kafka broker重启也能保证不重复呢?这就要使用事务了。

事务

所谓事务,就是要求保证原子性,要么全部成功,要么全部失败。那么具体该如何开启呢?

  1. kafka要想开启事务必须要启用幂等性,即生产者配置enable.idempotence=true
  2. kafka生产者需要配置唯一的事务idtransactional.id, 最好为其设置一个有意义的名字。
  3. kafka消费端也有一个配置项isolation.level和事务有很大关系。
  • read_uncommitted:默认值,消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。
  • read_committed:消费端应用只能消费到提交的事务内的消息。

Kafka事务 API

现在我们用java的api来实现一下前面这个“消费-处理-生产“的例子吧。

  1. 引入依赖
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>3.4.0</version></dependency>
  1. 创建事务的生产者
Properties prodcuerProps = new Properties();// kafka地址prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// key序列化prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// value序列化prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 启用幂等性producerProps.put("enable.idempotence", "true");// 设置事务idproducerProps.put("transactional.id", "prod-1");KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
  • enable.idempotence配置项目为true
  • 设置transactional.id
  1. 创建事务的消费者
Properties consumerProps = new Properties();consumerProps.put("bootstrap.servers", "localhost:9092");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put("group.id", "my-group-id");// 设置consumer手动提交consumerProps.put("enable.auto.commit", "false");// 设置隔离级别,读取事务已提交的消息consumerProps.put("isolation.level", "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);//订阅主题consumer.subscribe(Collections.singletonList("topic1"));
  • enable.auto.commit=false,设置手动提交消费者offset
  • 设置isolation.level=read_committed,消费事务已提交的消息

4.核心逻辑

// 初始化事务 producer.initTransactions();while(true) {  // 拉取消息  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));    if(!records.isEmpty()){         // 准备一个 hashmap 来记录:"分区-消费位移" 键值对        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();        // 开启事务         producer.beginTransaction();        try {             // 获取本批消息中所有的分区            Set<TopicPartition> partitions = records.partitions();            // 遍历每个分区            for (TopicPartition partition : partitions) {                 // 获取该分区的消息                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);                // 遍历每条消息                for (ConsumerRecord<String, String> record : partitionRecords) {                     // 执行数据的业务处理逻辑                    ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());                    // 将处理结果写入 kafka                    producer.send(outRecord);                }                // 将处理完的本分区对应的消费位移记录到 hashmap 中                long offset = partitionRecords.get(partitionRecords.size() - 1).offset();                // 事务提交的是即将到来的偏移量,这意味着我们需要加 1                offsetsMap.put(partition,new OffsetAndMetadata(offset+1));            }            // 向事务管理器提交消费位移             producer.sendOffsetsToTransaction(offsetsMap,"groupid");            // 提交事务             producer.commitTransaction();        } catch(Exeception e) {             e.printStackTrace();            // 终止事务             producer.abortTransaction();        }    }}
  • initTransactions(): 初始化事务
  • beginTransaction(): 开启事务
  • sendOffsetsToTransaction(): 在事务内提交已经消费的偏移量(主要用于消费者)
  • commitTransaction(): 提交事务
  • abortTransaction(): 放弃事务

Kafka事务实现原理

kafka事务的实现引入了事务协调器,如下图所示:

图片

  1. 生产者使用事务必须配置事务id, kafka根据事务id计算分配事务协调器
  2. 事务协调器返回pid,前面的幂等性中需要
  3. 开始发送消息到topic中,不过这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息
  4. 当生产者事务内的消息发送完毕,会向事务协调器发送 commit 或 abort 请求,等待 kafka 响应
  5. 事务协调器收到请求后先持久化到内置事务主题__transaction_state中,__transaction_state默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.id的hashcode值%50,计算出该事务属于哪个分区。 该分区Leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点,这也是上面第一步中的计算逻辑。
  6. 事务协调器后台会跟topic通信,告诉它们事务是成功还是失败的。
  • 如果是成功,topic会汇报自己已经收到消息,协调者收到主题的回应便确认了事务完成,并持久化这一结果。
  • 如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束。
  1. 持久化第6步中的事务成功或者失败的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事务, kafka broker将中止事务本身。 此属性的默认值为 15 分钟。

总结

本文讲解了通过kafka事务可以实现端到端的精确一次的消息语义,通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入,通过一个例子了解了一下如何使用事物。同时也简单介绍了事务实现的原理,它底层必须要依赖kafka的幂等性机制,同时通过类似“二段提交”的方式保证事务的原子性。

责任编辑:武晓燕 来源: JAVA旭阳 kafka事务消息语义

(责任编辑:娱乐)

    推荐文章
    • 凯撒文化(002425.SZ)业绩快报:2020年度净利润降40.8% 基本每股收益0.15元

      凯撒文化(002425.SZ)业绩快报:2020年度净利润降40.8% 基本每股收益0.15元凯撒文化(002425.SZ)发布2020年度业绩快报,实现营业总收入5.90亿元,同比下降26.64%;归属于上市公司股东的净利润1.24亿元,同比下降40.80%;基本每股收益0.15元。报告期内 ...[详细]
    • 巴西狂欢节几月几日

      巴西狂欢节几月几日前言:巴西著名的狂欢节是在每年的几月举行??巴西狂欢节在复活节前47天,而复活节是春分月圆后的第一个星期日,故巴西狂欢节日期不确定。巴西狂欢节被称为世界上最大的狂欢节,有“地球上最伟大的表演”之称。在 ...[详细]
    • 松茸是野生的还是人工种的

      松茸是野生的还是人工种的松茸是野生的还是人工种的-业百科松茸有野生也有人工种植,两者区别在于颜色、气味、营养、体积。颜色不同:野生松茸颜色较浅,上面呈现黑褐色,下面为灰褐色,而人工栽培的松茸整体为红。松茸可以人工种植吗?随着 ...[详细]
    • 水杯壁很多气泡能喝吗

      水杯壁很多气泡能喝吗保温杯盖里面是泡沫健康吗?如果纯净水存放时间太长了,使其水变质了,倒入,这样的水是不可以喝的,会对健康造成危害,容易造成肠胃的疾病,出现腹痛、。水杯壁很多气泡能喝吗-业百科如果是加工过程中材质不均匀 ...[详细]
    • 中证金力挺民企债券融资专项计划 完善民营企业债券融资支持机制

      中证金力挺民企债券融资专项计划 完善民营企业债券融资支持机制民企债券融资迎来重要支持方案。证监会11日晚称,交易所债券市场推出民营企业债券融资专项支持计划,以稳定和促进民营企业债券融资。中证金正是大名鼎鼎的国家队,成立于2011年10月,2015年市场大幅波动 ...[详细]
    • 平昌冬奥会男子花样滑冰视频

      平昌冬奥会男子花样滑冰视频2014冬季奥运会男子花样滑冰金牌?是羽生结弦2014年2月,年仅19岁的羽生结弦夺得索契冬奥会金牌,成为亚洲首位冬奥会男子单人滑冠军。羽生结弦,1994年12月7日出生于日本宫城县仙台市,日本花样。 ...[详细]
    • 哆啦a梦超清壁纸

      哆啦a梦超清壁纸前言:哆啦A梦超清图片打开百度--图片---搜索哆啦A梦在“全部大斜哪里选特大尺寸,或者自定义哆啦A梦的壁纸这里一直会有新的,你可以根据你自己的眼光来挑选哦...http://www.dora-wor ...[详细]
    • 薏米可以和绿豆一起煮吗

      薏米可以和绿豆一起煮吗请问薏仁米可以跟一起煮吗?是什么功效?跟黄绿豆能和很有营养的。也可以喝黄豆一起煮。黑芝麻薏米绿豆粉的吃法可以吗?有害吗?薏米具有补益脾胃,利湿的作用。绿豆有一定的清热解毒的作用。意见建议:以上三者可以 ...[详细]
    • 568万元!四川省攀枝花市获省建筑领域绿色低碳循环发展专项资金支持

      568万元!四川省攀枝花市获省建筑领域绿色低碳循环发展专项资金支持近日,攀枝花市争取省建筑领域绿色低碳循环发展专项资金568万元,用于支持攀枝花市政务服务中心、三线建设文化旅游融合发展一期工程、攀西钒钛科技产业园总部办公园区一期等10个项目。据悉,这10个项目中,星 ...[详细]
    • 哆啦a梦超清壁纸

      哆啦a梦超清壁纸前言:哆啦A梦超清图片打开百度--图片---搜索哆啦A梦在“全部大斜哪里选特大尺寸,或者自定义哆啦A梦的壁纸这里一直会有新的,你可以根据你自己的眼光来挑选哦...http://www.dora-wor ...[详细]
    热点阅读