当前位置:首页 >娱乐 >Springboot+Redisson封装分布式锁Starter 封装分布微服务环境中

Springboot+Redisson封装分布式锁Starter 封装分布微服务环境中

2024-06-30 20:17:03 [百科] 来源:避面尹邢网

Springboot+Redisson封装分布式锁Starter

作者:IT动力 开发 前端 我们将分布式锁基于缓存扩展了一版,封装分布也就是式锁说本starter即有分布式缓存功能,又有分布式锁功能。封装分布而注解版的式锁分布式锁能够解决大多数场景的并核问题,小粒度的封装分布Lock锁方式补全其他场景。

1、式锁为什么要使用分布式锁?

在分布式,封装分布微服务环境中,式锁我们的封装分布服务被拆分为很多个,并且每一个服务可能存在多个实例,式锁部署在不同的封装分布服务器上。

此时JVM中的式锁synchronized和lock锁,将只能对自己所在服务的封装分布JVM加锁,而跨机器,式锁跨JMV的封装分布场景,仍然需要锁的场景就需要使用到分布式锁了。

Springboot+Redisson封装分布式锁Starter 封装分布微服务环境中

2、为什么要使用Redis实现分布式锁?

因为Redis的性能很好,并且Redis是单线程的,天生线程安全。

Springboot+Redisson封装分布式锁Starter 封装分布微服务环境中

并且Redis的key过期效果与Zookeeper的临时节点的效果相似,都能实现锁超时自动释放的功能。

Springboot+Redisson封装分布式锁Starter 封装分布微服务环境中

而且Redis还可以使用lua脚本来保证redis多条命令实现整体的原子性,Redisson就是使用lua脚本的原子性来实现分布式锁的。

3、我们如何基于Redisson封装分布式锁?

1)、基于RedissonClient实现手动加锁

2)、基于AOP+Redisson封装注解版的分布式锁

3)、将分布式锁功能封装成一个starter, 引入jar包即可实现分布式锁

4、代码实现

4.1、整合封装Redisson

我们前面封装了基于Redis扩展了SpringCache,封装了
redis-cache-spring-boot-starter。

我们的分布式锁基于这个模块实现,下面引入依赖。

引入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>itdl-parent</artifactId>        <groupId>com.itdl</groupId>        <version>1.0</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>redis-lock-spring-boot-starter</artifactId>    <description>Redis实现分布式锁的自定义starter封装模块</description>    <properties>        <maven.compiler.source>${ java.version}</maven.compiler.source>        <maven.compiler.target>${ java.version}</maven.compiler.target>    </properties>    <dependencies>        <!--redis cache-->        <dependency>            <groupId>com.itdl</groupId>            <artifactId>redis-cache-spring-boot-starter</artifactId>        </dependency>        <!--redisson-->        <dependency>            <groupId>org.redisson</groupId>            <artifactId>redisson</artifactId>        </dependency>        <!--aop-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-aop</artifactId>        </dependency>    </dependencies></project>

编写RedisLockConfig配置RedissonClient

/** * Redis实现分布式锁的配置(使用Redisson) */@Configuration  // 标识为一个配置项,注入Spring容器@AutoConfigureBefore({ CustomRedisConfig.class, CacheNullValuesHandle.class})@ConditionalOnProperty(value = "redis.enable", havingValue = "true")    // 开启redis.enable=true时生效@Slf4jpublic class RedisLockConfig {     private volatile boolean isCluster = false;    private volatile String redisHostsStr = "";    @Bean    @ConditionalOnMissingBean    public RedissonClient redissonClient(CustomRedisProperties redisProperties){         // 构建配置        Config config =  buildConfig(redisProperties);        RedissonClient redissonClient = Redisson.create(config);        log.info("==============创建redisClient{ }版成功:{ }==================", isCluster ? "集群": "单机", redisHostsStr);        return redissonClient;    }    private Config buildConfig(CustomRedisProperties redisProperties) {         final Config config = new Config();        // 根据逗号切割host列表        Set<String> hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());        if (CollectionUtils.isEmpty(hosts)){             throw new RuntimeException("redis host address cannot be empty");        }        // 只有一个host, 表示是单机host        if (hosts.size() == 1){             String hostPort = hosts.stream().findFirst().get();            redisHostsStr = "redis://" + hostPort.trim();            config.useSingleServer()                    .setAddress(redisHostsStr)                    .setDatabase(redisProperties.getDatabase())                    .setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())            ;            isCluster = false;        }else {             // 集群处理            String[] redisHosts = new String[hosts.size()];            int i = 0;            for (String host : hosts) {                 String[] split = host.split(":");                if (split.length != 2){                     throw new RuntimeException("host or port err");                }                redisHosts[i] = "redis://" + host.trim();                i++;            }            redisHostsStr = String.join(",", redisHosts);            // 配置集群            config.useClusterServers()                    .addNodeAddress(redisHosts)                    .setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())                    // 解决Not all slots covered! Only 10922 slots are available                    .setCheckSlotsCoverage(false);            isCluster = true;        }        return config;    }}

我们配置时需要优先配置好redis-cache-spring-boot-starter,使用@AutoConfigureBefore({ CustomRedisConfig.class, CacheNullValuesHandle.class})

直接使用,不再重复造轮子,然后我们根据自定义属性配置文件CustomRedisProperties来创建RedissonClient的Bean。

编写META-INF/spring.factories进行自动配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.itdl.lock.config.RedisLockConfig

在测试模块缓存service添加分布式锁

@Cacheable(cacheNames = "demo2#3", key = "#id")public TestEntity getById2(Long id){     // 创建分布式锁    RLock lock = redissonClient.getLock("demo2_lock");    // 加锁        lock.lock(10, TimeUnit.SECONDS);    if (id > 1000){         log.info("id={ }没有查询到数据,返回空值", id);        return null;    }    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);    log.info("模拟查询数据库:{ }", testEntity);    // 释放锁    lock.unlock();    return testEntity;}

我们这里的@Cacheable没有加sync=true, 此时并发请求会存在线程安全问题,但是我们在方法体局部添加了分布式锁,因此我们的程序会按照顺序执行。

如果我们的参数被定死了,最终请求会被先存储到缓存,所以后续的查询就会走缓存,这能很好的测试分布式锁的效果。

编写测试程序

@SpringBootTestpublic class TestRedisLockRunner6 {     @Autowired    private MyTestService myTestService;    // 创建一个固定线程池    private ExecutorService executorService = Executors.newFixedThreadPool(16);    /**     * 多线程访问请求,测试切面的线程安全性     */    @Test    public void testMultiMyTestService() throws InterruptedException {         for (int i = 0; i < 100; i++) {             executorService.submit(() -> {                 // 每次查询同一个参数                TestEntity t1 = myTestService.getById2(1L);            });        }        // 主线程休息10秒种        Thread.sleep(10000);    }}

我们可以看到,结果并没有符合我们预期,但是又部分符合我们预期,为什么呢?

因为我们的@Cacheable是存在线程安全问题的,因为它先查询缓存这个操作存在并发问题,查询时就同时有N个请求进入@Cacheable, 并且都查询没有缓存。

然后同时执行方法体,但方法体加了分布式锁,所以排队进行处理,因此序号有序。

但打印数量不足总数,是因为这一批次没有全部到达@Cacheable,而是执行完毕之后才将缓存回填,所以后续的请求就是走缓存了。

解决方案:我们加上sync=true之后就能实现,只查询一次数据库,就可以回填缓存了。如果我们去掉@Cacheable注解,则会每一次都查询数据库,但是时按照顺序执行的。

加上sync=true测试

效果达到了我们的预期,继续看一下去掉@Cacheable注解的情况。

去掉@Cacheable注解测试

我们的分布式锁功能是没有问题的,但是每次我们都需要执行getLock(), lock.lock(), lock.unlock(),是不是很麻烦,能不能一个注解搞定?

当然是可以的。

4.2、封装注解版分布式锁

编写@RedisLock注解

/** * 自定义Redis分布式锁 */@Target({ ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface RedisLock {     /**分布式锁的名称,支持el表达式*/    String lockName() default "";    /**锁类型 默认为可重入锁*/    LockType lockType() default REENTRANT_LOCK;    /**获取锁等待时间,默认60秒*/    long waitTime() default 30000L;    /** 锁自动释放时间,默认60秒*/    long leaseTime() default 60000L;    /**     * 被加锁方法执行完是否立即释放锁     */    boolean immediatelyUnLock() default true;    /** 时间单位, 默认毫秒*/    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;}

编写分布式锁切面RedisLockAop

/** * Redis分布式锁的切面逻辑实现 */@ConditionalOnProperty(value = "redis.enable", havingValue = "true")    // 开启redis.enable=true时生效@AutoConfigureBefore(RedisLockConfig.class)@Aspect@Configuration@Slf4jpublic class RedisLockAop {     @Resource    private RedissonClient redissonClient;    /**     * 切点     */    @Pointcut("@annotation(com.itdl.lock.anno.RedisLock)")    public void pointcut(){     }        /**     * 环绕通知 注解针对的是方法,这里切点也获取方法进行处理就可以了     */    @Around("pointcut()")    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {         // 获取方法        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();        // 获取方法上的分布式锁注解        RedisLock redisLock = method.getDeclaredAnnotation(RedisLock.class);        // 获取注解的参数        // 锁名称        String lockName = redisLock.lockName();        // 锁类型        LockType lockType = redisLock.lockType();        // 获取RedissonClient的Lock        RLock lock = getRLock(lockName, lockType, redisLock);        //获取到锁后, 开始执行方法,执行完毕后释放锁        try {             log.debug("=========>>>获取锁成功, 即将执行业务逻辑:{ }", lockName);            Object proceed = joinPoint.proceed();            // 释放锁            if (redisLock.immediatelyUnLock()) {                 //是否立即释放锁                lock.unlock();            }            log.debug("=========>>>获取锁成功且执行业务逻辑成功:{ }", lockName);            return proceed;        } catch (Exception e) {             log.error("=========>>>获取锁成功但执行业务逻辑失败:{ }", lockName);            e.printStackTrace();            throw new RedisLockException(LockErrCode.EXEC_BUSINESS_ERR);        }finally {             // 查询当前线程是否保持此锁定 被锁定则解锁            lock.unlock();            log.debug("=========>>>释放锁成功:{ }", lockName);        }    }    /**     * 根据锁名称和类型创建锁     * @param lockName 锁名称     * @param lockType 锁类型     * @return 锁     */    private RLock getRLock(String lockName, LockType lockType, RedisLock redisLock) throws InterruptedException {         RLock lock;        switch (lockType){             case FAIR_LOCK:                lock = redissonClient.getFairLock(lockName);                break;            case READ_LOCK:                lock = redissonClient.getReadWriteLock(lockName).readLock();                break;            case WRITE_LOCK:                lock = redissonClient.getReadWriteLock(lockName).writeLock();                break;            default:                // 默认加可重入锁,也就是普通的分布式锁                lock = redissonClient.getLock(lockName);                break;        }        // 首先尝试获取锁,如果在规定时间内没有获取到锁,则调用lock等待锁,直到获取锁为止        if (lock.tryLock()) {             lock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());        }else {             // 如果leaseTime>0,规定时间内获取锁,超时则自动释放锁            long leaseTime = redisLock.leaseTime();            if (leaseTime > 0) {                 lock.lock(redisLock.leaseTime(), redisLock.timeUnit());            } else {                 // 自动释放锁时间设置为0或者负数,则加锁不设置超时时间                lock.lock();            }        }        return lock;    }}

话不多说,封装的逻辑已经在注释中写的很清晰了。

将切面也放入自动配置spring.factories中

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.itdl.lock.config.RedisLockConfig,\com.itdl.lock.anno.RedisLockAop

测试注解版分布式锁

@RedisLock(lockName = "demo4_lock")public TestEntity getById4(Long id) throws InterruptedException {     index++;    log.info("current index is : { }", index);    Thread.sleep(new Random().nextInt(10) * 100);    TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);    log.info("模拟查询数据库:{ }", testEntity);    return testEntity;}

可以看到,我们就是一个注解分布式锁的效果,而分布式锁与缓存注解通常不会一起使用,因为一般会在存在事务问题的地方我们会使用锁,在多个JMV操作同一条数据做写操作时需要加分布式锁。

编写测试程序

@SpringBootTestpublic class TestRedisLockRunner6 {     @Autowired    private MyTestService myTestService;    // 创建一个固定线程池    private ExecutorService executorService = Executors.newFixedThreadPool(16);    /**     * 多线程访问请求,测试切面的线程安全性     */    @Test    public void testMultiMyTestService() throws InterruptedException {         for (int i = 0; i < 100; i++) {             executorService.submit(() -> {                 try {                     TestEntity t1 = myTestService.getById4(1L);                } catch (InterruptedException e) {                     e.printStackTrace();                }            });        }        // 主线程休息10秒种        Thread.sleep(60000);    }}

测试结果

5、小结

我们将分布式锁基于缓存扩展了一版,也就是说本starter即有分布式缓存功能,又有分布式锁功能。

而注解版的分布式锁能够解决大多数场景的并核问题,小粒度的Lock锁方式补全其他场景。

将两者封装成为一个starter,我们就可以很方便的使用分布式锁功能,引入相关包即可,开箱即用。

责任编辑:武晓燕 来源: 今日头条 Redisson分布式缓存

(责任编辑:知识)

    推荐文章
    热点阅读