当前位置:首页 >百科 >像Flink一样使用Redis 但 Redis 样使用远不止于此

像Flink一样使用Redis 但 Redis 样使用远不止于此

2024-06-30 16:43:20 [百科] 来源:避面尹邢网

像Flink一样使用Redis

作者:学研妹 数据库 Redis 样使用Redis 是一种功能强大的 NoSQL 内存数据结构存储,已成为开发人员的样使用首选工具。虽然它通常被认为只是样使用一个缓存,但 Redis 样使用远不止于此。它可以作为数据库、样使用消息代理和缓存三者合一。样使用

Apache Flink和 Redis 样使用是两个强大的工具,可以一起使用来构建可以处理大量数据的样使用实时数据处理管道。Flink 为处理数据流提供了一个高度可扩展和容错的样使用平台,而 Redis 样使用提供了一个高性能的内存数据库,可用于存储和查询数据。样使用在本文中,样使用将探讨如何使用 Flink 来使用异步函数调用 Redis,样使用并展示如何使用它以非阻塞方式将数据推送到 Redis。样使用

Redis的样使用故事

图片

像Flink一样使用Redis 但 Redis 样使用远不止于此

“Redis:不仅仅是一个缓存

像Flink一样使用Redis 但 Redis 样使用远不止于此

Redis 是一种功能强大的 NoSQL 内存数据结构存储,已成为开发人员的首选工具。虽然它通常被认为只是一个缓存,但 Redis 远不止于此。它可以作为数据库、消息代理和缓存三者合一。

像Flink一样使用Redis 但 Redis 样使用远不止于此

Redis 的优势之一是它的多功能性。它支持各种数据类型,包括字符串、列表、集合、有序集合、哈希、流、HyperLogLogs 和位图。Redis 还提供地理空间索引和半径查询,使其成为基于位置的应用程序的宝贵工具。

Redis 的功能超出了它的数据模型。它具有内置的复制、Lua 脚本和事务,并且可以使用 Redis Cluster 自动分区数据。此外,Redis 通过 Redis Sentinel 提供高可用性。

注意:在本文中,将更多地关注Redis集群模式

图片

Redis 集群使用带哈希槽的算法分片来确定哪个分片拥有给定的键并简化添加新实例的过程。同时,它使用 Gossiping 来确定集群的健康状况,如果主节点没有响应,可以提升辅助节点以保持集群健康。必须有奇数个主节点和两个副本才能进行稳健设置,以避免脑裂现象(集群无法决定提升谁并最终做出分裂决定)

为了与 Redis 集群对话,将使用lettuce和 Redis Async Java 客户端。

Flink 的故事

图片

Apache Flink 是一个开源、统一的流处理和批处理框架,旨在处理实时、高吞吐量和容错数据处理。它建立在 Apache Gelly 框架之上,旨在支持有界和无界流上的复杂事件处理和有状态计算,它的快速之处在于其利用内存中性能和异步检查本地状态。

故事的主人公

图片

与数据库的异步交互是流处理应用程序的游戏规则改变者。通过这种方法,单个函数实例可以同时处理多个请求,从而允许并发响应并显着提高吞吐量。通过将等待时间与其他请求和响应重叠,处理管道变得更加高效。

我们将以电商数据为例,计算24小时滑动窗口中每个品类的销售额,滑动时间为30秒,下沉到Redis,以便更快地查找下游服务。

充足的数据集

Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536

Flink Kafka 消费者类

package Aysnc_kafka_redis;

import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;

public class FlinkAsyncRedis {

public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ecommdeserialize jsonde = new Ecommdeserialize();

KafkaSource<Ecomm> source = KafkaSource.<Ecomm>builder()
.setTopics("{ dummytopic}")
.setBootstrapServers("{ dummybootstrap}")
.setGroupId("test_flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(jsonde)
.build();


DataStream<Ecomm> orderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");


orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Ecomm>(Time.seconds(10)) {
@Override
public long extractTimestamp(Ecomm element) {
return element.getEventTimestamp(); // extract watermark column from stream
}
});

SingleOutputStreamOperator<Tuple3<String, Long, Long>> aggregatedData = orderData.keyBy(Ecomm::getCategory)
.window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30)))
.apply((WindowFunction<Ecomm, Tuple3<String, Long, Long>, String, TimeWindow>) (key, window, input, out) -> {
long count = 0;
for (Ecomm event : input) {
count++; // increment the count for each event in the window
}
out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count
});


// calling async I/0 operator to sink data to redis in UnOrdered way
SingleOutputStreamOperator<String> sinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink(
"{ redisClusterUrl}"),
1000, // the timeout defines how long an asynchronous operation take before it is finally considered failed
TimeUnit.MILLISECONDS,
100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time.

sinkResults.print(); // print out the redis set response stored in the future for every key

env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by this name


}

}

Redis 设置键异步 I/0 运算符

package AsyncIO;

import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import lombok.AllArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import scala.collection.immutable.List;

import java.util.ArrayList;
import java.util.Collections;

@AllArgsConstructor
public class RedisSink extends RichAsyncFunction<Tuple3<String, Long, Long>, String> {

String redisUrl;

public RedisSink(String redisUrl){
this.redisUrl=redisUrl;
}

private transient RedisClusterClient client = null;
private transient StatefulRedisClusterConnection<String, String> clusterConnection = null;
private transient RedisAdvancedClusterAsyncCommands<String, String> asyncCall = null;


// method executes any operator-specific initialization
@Override
public void open(Configuration parameters) {
if (client == null ) {
client = RedisClusterClient.create(redisUrl);
}
if (clusterConnection == null) {
clusterConnection = client.connect();
}
if (asyncCall == null) {
asyncCall = clusterConnection.async();
}
}

// core logic to set key in redis using async connection and return result of the call via ResultFuture
@Override
public void asyncInvoke(Tuple3<String, Long, Long> stream, ResultFuture<String> resultFuture) {

String productKey = stream.t1();
System.out.println("RedisKey:" + productKey); //for logging
String count = stream.t3().toString();
System.out.println("Redisvalue:" + count); //for logging
RedisFuture<String> setResult = asyncCall.set(productKey,count);

setResult.whenComplete((result, throwable) -> { if(throwable!=null){
System.out.println("Callback from redis failed:" + throwable);
resultFuture.complete(new ArrayList<>());
}
else{
resultFuture.complete(new ArrayList(Collections.singleton(result)));
}});
}

// method closes what was opened during initialization to free any resources
// held by the operator (e.g. open network connections, io streams)
@Override
public void close() throws Exception {
client.close();
}

}

用例:

  • 数据科学模型可以使用流式传输到 Redis 的数据来查找和生成更多在销售季节经常销售的类别的产品。
  • 它可用于在网页上展示图表和数字作为销售统计数据,以在用户中产生积极购买的动力。

要点:

  • Flink 为处理数据流提供了一个高度可扩展和容错的平台,而 Redis 提供了一个高性能的内存数据库,可用于存储和查询数据。
  • 异步编程可用于通过允许对外部系统(如 Redis)进行非阻塞调用来提高数据处理管道的性能。
  • 两者的结合可能有助于带来实时数据决策文化。
责任编辑:武晓燕 来源: Java学研大本营 FlinkRedisNoSQL

(责任编辑:时尚)

    推荐文章
    • 今年全国近八成保障性租赁住房已开工 提高新市民群体满意度

      今年全国近八成保障性租赁住房已开工 提高新市民群体满意度近日,住房和城乡建设部会同国家发改委、财政部、自然资源部、国家税务总局印发了《关于做好2021年度发展保障性租赁住房情况监测评价工作的通知》。《通知》明确,新市民和青年人多、房价偏高或上涨压力较大的大 ...[详细]
    • 26家公司率先发布一季报业绩预告 全部预喜

      26家公司率先发布一季报业绩预告 全部预喜在经济复苏的大背景下,上市公司一季报业绩预告引起了市场的关注。《证券日报》记者根据同花顺数据统计发现,截至3月2日,沪深两市共有26家上市公司披露了2021年一季报业绩预告,值得一提的是,上述公司今年 ...[详细]
    • 银行板块强势反弹 38只成份股全部上涨

      银行板块强势反弹 38只成份股全部上涨经过近三个交易日的下跌调整,3月3日,银行板块强势反弹,截至收盘,银行业指数涨4.65%,报收4183.58点,位列申万一级行业指数涨幅榜第二位。具体来看,38只成份股全部上涨,其中,南京银行和宁波银 ...[详细]
    • A股市场走势跌宕起伏 节后至今北上资金净流入逾131亿元

      A股市场走势跌宕起伏 节后至今北上资金净流入逾131亿元90.1亿元!周三(3月3日)北上资金净买入额逾90亿元,引发了市场各方的关注。“北上资金为代表的外资,在周三再度净流入近百亿元,未来的趋势仍然值得期待。毕竟,在全球主要经济体中,中国是较 ...[详细]
    • 教你新号激活京东金条 京东金条激活方法有哪些?

      教你新号激活京东金条 京东金条激活方法有哪些?京东是一个非常注重品质和送货速度的购物平台,为了满足用户的消费需求,京东也推出了京东白条、京东金条等服务,京东白条主要为用户购物时提供先消费后付款的支付服务,而京东金条就类似于支付宝借呗、微信微粒贷了 ...[详细]
    • 东兴证券超强版v6怎么样?

      东兴证券超强版v6怎么样?东兴证券超强版v6(免费股票软件下载)东兴证券超强版v6集行情、交易、资讯为一体,提供多界面自由切换,一站式登录,闪电下单、对买对卖、自动预警等多种功能,操作简便快捷,东兴证券超强版v6支持LV2,股 ...[详细]
    • 中国人民银行分支机构有哪些?有什么功能?

      中国人民银行分支机构有哪些?有什么功能?中国人民银行分支机构有哪些?中国人民银行及其分支机构。中国人民银行设立行长1人,副行长若干人。中国人民银行实行行长负责制。中国人民银行行长是中国人民银行的最高行政领导人,是中国人民银行的法定代表人,负 ...[详细]
    • 14个省区市委托养老基金投资总金额近6000亿元

      14个省区市委托养老基金投资总金额近6000亿元记者9日从人社部获悉,人社部党组日前刊发《让改革发展成果更多更公平惠及全体人民》一文指出,我国已建成世界上规模最大、覆盖人数最多的社会保障体系。企业退休人员基本养老金连续14年上调,养老保险覆盖人数超 ...[详细]
    • 165.32万元!综保区“提前适用”政策在琼首次落地

      165.32万元!综保区“提前适用”政策在琼首次落地11月29日,海口市空港航空发动机维修工程有限公司向海口海关所属海口美兰机场海关申报入区引射筒自用设备,按照“提前适用”政策享受免征税款165.32万元人民币。这是海口空港综合 ...[详细]
    • 东兴证券超强版v6怎么样?

      东兴证券超强版v6怎么样?东兴证券超强版v6(免费股票软件下载)东兴证券超强版v6集行情、交易、资讯为一体,提供多界面自由切换,一站式登录,闪电下单、对买对卖、自动预警等多种功能,操作简便快捷,东兴证券超强版v6支持LV2,股 ...[详细]
    热点阅读