本篇文章基于redisson-3.17.6版本源码进行分析
@Test public void testRSemaphore() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore"); // 设置5个许可,模拟五个停车位 rSemaphore.trySetPermits(5); // 创建10个线程,模拟10辆车过来停车 for (int i = 1; i <= 10; i++) { new Thread(() -> { try { rSemaphore.acquire(); System.out.println(Thread.currentThread().getName() + "进入停车场..."); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)); System.out.println(Thread.currentThread().getName() + "离开停车场..."); rSemaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }, "A" + i).start(); } try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } }
初始化RSemaphore,需要调用trySetPermits()设置许可数量:
/** * 尝试设置许可数量,设置成功,返回true,否则返回false */ boolean trySetPermits(int permits);
trySetPermits()内部调用了trySetPermitsAsync():
// 异步设置许可 @Override public RFuture<Boolean> trySetPermitsAsync(int permits) { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 判断分布式信号量的key是否存在,如果不存在,才设置 "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " // set "semaphore" permits // 使用String数据结构设置信号量的许可数 + "redis.call('set', KEYS[1], ARGV[1]); " // 发布一条消息到redisson_sc:{semaphore}通道 + "redis.call('publish', KEYS[2], ARGV[1]); " // 设置成功,返回1 + "return 1;" + "end;" // 否则返回0 + "return 0;", Arrays.asList(getRawName(), getChannelName()), permits); if (log.isDebugEnabled()) { future.thenAccept(r -> { if (r) { log.debug("permits set, permits: {}, name: {}", permits, getName()); } else { log.debug("unable to set permits, permits: {}, name: {}", permits, getName()); } }); } return future; }
可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:
参数说明:
总结设置许可执行流程为:
许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。
public void acquire() throws InterruptedException { acquire(1); } public void acquire(int permits) throws InterruptedException { // 尝试获取锁成功,直接返回 if (tryAcquire(permits)) { return; } // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息 CompletableFuture<RedissonLockEntry> future = subscribe(); semaphorePubSub.timeout(future); RedissonLockEntry entry = commandExecutor.getInterrupted(future); try { // 不断循环尝试获取许可 while (true) { if (tryAcquire(permits)) { return; } entry.getLatch().acquire(); } } finally { // 取消订阅 unsubscribe(entry); } // get(acquireAsync(permits)); }
可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。
我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:
// 异步获取许可 @Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return new CompletableFutureWrapper<>(true); } return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 获取当前剩余的许可数量 "local value = redis.call('get', KEYS[1]); " + // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量 "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + // 通过decrby减少剩余可用许可 "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + // 返回1 "return 1; " + "end; " + // 其它情况,返回0 "return 0;", Collections.<Object>singletonList(getRawName()), permits); }
从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;
总结加锁执行流程为:
通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:
public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return new CompletableFutureWrapper<>((Void) null); } RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, // 通过incrby增加许可数量 "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + // 发布一条消息到redisson_sc:{semaphore}中 "redis.call('publish', KEYS[2], value); ", Arrays.asList(getRawName(), getChannelName()), permits); if (log.isDebugEnabled()) { future.thenAccept(o -> { log.debug("released, permits: {}, name: {}", permits, getName()); }); } return future; }
到此这篇关于Redisson分布式信号量RSemaphore的使用超详细讲解的文章就介绍到这了,更多相关Redisson RSemaphore内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
编程 | 2023-02-24 21:36
编程 | 2023-02-21 12:51
编程 | 2023-02-21 12:47
编程 | 2023-02-21 00:15
编程 | 2023-02-21 00:08
编程 | 2023-02-20 21:46
编程 | 2023-02-20 21:42
编程 | 2023-02-20 21:36
编程 | 2023-02-20 21:32
编程 | 2023-02-20 18:12
网友评论