现在假定这么一个业务场景,原K也从kafka中的事务topic获取消息数据,经过一定加工处理后,再也发送到另外一个topic中,不担要求整个过程消息不能丢失,心消息也不能重复发送,原K也即实现端到端的事务Exactly-Once精确一次消息投递。这该如何实现呢?
针对上面的再也业务场景,kafka已经替我们想到了,不担在kafka 0.11版本以后,心消息引入了一个重大的特性:幂等性和事务。
这里提到幂等性的原因,主要是因为事务的启用必须要先开启幂等性,那么什么是幂等性呢?
幂等性是指生产者无论向kafka broker发送多少次重复的数据,broker 端只会持久化一条,保证数据不会重复。
幂等性通过生产者配置项enable.idempotence=true开启,默认情况下为true。
幂等性实现原理
幂等性缺点
根据幂等性的原理,我们发现它存在下面的缺点:
那么如何实现跨分区、kafka broker重启也能保证不重复呢?这就要使用事务了。
所谓事务,就是要求保证原子性,要么全部成功,要么全部失败。那么具体该如何开启呢?
现在我们用java的api来实现一下前面这个“消费-处理-生产“的例子吧。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version></dependency>
Properties prodcuerProps = new Properties();// kafka地址prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");// key序列化prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// value序列化prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 启用幂等性producerProps.put("enable.idempotence", "true");// 设置事务idproducerProps.put("transactional.id", "prod-1");KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
Properties consumerProps = new Properties();consumerProps.put("bootstrap.servers", "localhost:9092");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put("group.id", "my-group-id");// 设置consumer手动提交consumerProps.put("enable.auto.commit", "false");// 设置隔离级别,读取事务已提交的消息consumerProps.put("isolation.level", "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);//订阅主题consumer.subscribe(Collections.singletonList("topic1"));
4.核心逻辑
// 初始化事务 producer.initTransactions();while(true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L)); if(!records.isEmpty()){ // 准备一个 hashmap 来记录:"分区-消费位移" 键值对 HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); // 开启事务 producer.beginTransaction(); try { // 获取本批消息中所有的分区 Set<TopicPartition> partitions = records.partitions(); // 遍历每个分区 for (TopicPartition partition : partitions) { // 获取该分区的消息 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); // 遍历每条消息 for (ConsumerRecord<String, String> record : partitionRecords) { // 执行数据的业务处理逻辑 ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase()); // 将处理结果写入 kafka producer.send(outRecord); } // 将处理完的本分区对应的消费位移记录到 hashmap 中 long offset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 事务提交的是即将到来的偏移量,这意味着我们需要加 1 offsetsMap.put(partition,new OffsetAndMetadata(offset+1)); } // 向事务管理器提交消费位移 producer.sendOffsetsToTransaction(offsetsMap,"groupid"); // 提交事务 producer.commitTransaction(); } catch(Exeception e) { e.printStackTrace(); // 终止事务 producer.abortTransaction(); } }}
kafka事务的实现引入了事务协调器,如下图所示:
本文讲解了通过kafka事务可以实现端到端的精确一次的消息语义,通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入,通过一个例子了解了一下如何使用事物。同时也简单介绍了事务实现的原理,它底层必须要依赖kafka的幂等性机制,同时通过类似“二段提交”的方式保证事务的原子性。
责任编辑:武晓燕 来源: JAVA旭阳 kafka事务消息语义(责任编辑:娱乐)
凯撒文化(002425.SZ)业绩快报:2020年度净利润降40.8% 基本每股收益0.15元
中证金力挺民企债券融资专项计划 完善民营企业债券融资支持机制
568万元!四川省攀枝花市获省建筑领域绿色低碳循环发展专项资金支持