在单体的应用开发场景中涉及并发同步的时候,大家往往采用 Synchronized(同步)或者同一个JVM内Lock机制来解决多线程间的同步问题。在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。
超卖问题复现 现象 存在如下的几张表:
商品表
订单表
订单item表
:::danger
:::
错误案例一:数据库 update 相互覆盖 直接在内存中判断是否有库存,计算扣减之后的值更新数据库,并发的情况下会导致相互覆盖发生:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Transactional(rollbackFor = Exception.class) public Long createOrder () throws Exception { Product product = productMapper.selectByPrimaryKey(purchaseProductId); Integer currentCount = product.getCount(); if (purchaseProductNum > currentCount) { throw new Exception ("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买" ); } Integer leftCount = currentCount - purchaseProductNum; product.setCount(leftCount); product.setGmtModified(new Date ()); productMapper.updateByPrimaryKeySelective(product); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItem.setOrderId(order.getId()); return order.getId(); }
错误案例二:扣减串行执行,但是库存被扣减为负数 在 SQL 中加入运算避免值的相互覆盖,但是库存的数量变为负数,因为校验库存是否足够还是在内存中执行的,并发情况下都会读到有库存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Transactional(rollbackFor = Exception.class) public Long createOrder () throws Exception { Product product = productMapper.selectByPrimaryKey(purchaseProductId); Integer currentCount = product.getCount(); if (purchaseProductNum > currentCount) { throw new Exception ("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买" ); } productMapper.updateProductCount(purchaseProductNum,new Date (),product.getId()); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItem.setOrderId(order.getId()); return order.getId(); }
错误案例三:使用 synchronized 实现内存中串行校验,但是依旧扣减为负数 因为我们使用的是事务的注解,synchronized 加在方法上,方法执行结束的时候锁就会释放,此时的事务还没有提交,另一个线程拿到这把锁之后就会有一次扣减,导致负数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Transactional(rollbackFor = Exception.class) public synchronized Long createOrder () throws Exception { Product product = productMapper.selectByPrimaryKey(purchaseProductId); Integer currentCount = product.getCount(); if (purchaseProductNum > currentCount) { throw new Exception ("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买" ); } productMapper.updateProductCount(purchaseProductNum,new Date (),product.getId()); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItem.setOrderId(order.getId()); return order.getId(); }
解决办法 从上面造成问题的原因来看,只要是扣减库存的动作,不是原子性的。导致多个线程。
单体应用:使用 **本地锁 + 数据库中的行锁**** 解决**
分布式应用:
使用数据库中的乐观锁,加一个 version 字段,利用CAS来实现,会导致大量的 update 失败
使用数据库维护一张锁的表 + 悲观锁 select,使用 select for update 实现
使用 Redis 的 setNX 实现分布式锁
使用 zookeeper 的 watcher + 有序临时节点 来实现可阻塞的分布式锁
使用 Redisson 框架内的分布式锁 来实现
使用curator 框架内的分布式锁来实现
单体应用解决超卖的问题 正确示例:将事务包含在锁的控制范围内 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public synchronized Long createOrder () throws Exception { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); Product product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null ) { platformTransactionManager.rollback(transaction1); throw new Exception ("购买商品:" + purchaseProductId + "不存在" ); } Integer currentCount = product.getCount(); if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception ("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买" ); } productMapper.updateProductCount(purchaseProductNum, new Date (), product.getId()); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItem.setOrderId(order.getId()); return order.getId(); platformTransactionManager.commit(transaction1); }
正确示例:使用synchronized的代码块 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public Long createOrder () throws Exception { Product product = null ; synchronized (DBOrderService2.class) { TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); product = productMapper.selectByPrimaryKey(purchaseProductId); if (product == null ) { platformTransactionManager.rollback(transaction1); throw new Exception ("购买商品:" + purchaseProductId + "不存在" ); } Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount); if (purchaseProductNum > currentCount) { platformTransactionManager.rollback(transaction1); throw new Exception ("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买" ); } productMapper.updateProductCount(purchaseProductNum, new Date (), product.getId()); platformTransactionManager.commit(transaction1); } TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction2); return order.getId();
正确示例:使用 Lock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private Lock lock = new ReentrantLock ();public Long createOrder () throws Exception{ Product product = null ; lock.lock(); TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition); try { product = productMapper.selectByPrimaryKey(purchaseProductId); if (product==null ){ throw new Exception ("购买商品:" +purchaseProductId+"不存在" ); } Integer currentCount = product.getCount(); System.out.println(Thread.currentThread().getName()+"库存数:" +currentCount); if (purchaseProductNum > currentCount){ throw new Exception ("商品" +purchaseProductId+"仅剩" +currentCount+"件,无法购买" ); } productMapper.updateProductCount(purchaseProductNum,new Date (),product.getId()); platformTransactionManager.commit(transaction1); } catch (Exception e) { platformTransactionManager.rollback(transaction1); } finally { lock.unlock(); } TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); Order order = new Order (); orderMapper.insertSelective(order); OrderItem orderItem = new OrderItem (); orderItemMapper.insertSelective(orderItem); platformTransactionManager.commit(transaction); return order.getId(); }
常见分布式锁的使用 上面使用的方法只能解决单体项目,当部署多台机器的时候就会失效,因为锁本身就是单机的锁,所以需要使用分布式锁来实现
数据库乐观锁 数据库中的乐观锁,加一个 version 字段,利用CAS来实现,乐观锁的方式支持多台机器并发安全。
但是并发量大的时候会导致大量的 update 失败
数据库分布式锁 db操作性能较差,并且有锁表的风险,一般不考虑。
简单的数据库锁
select for update 直接在数据库新建一张表:
锁的code预先写到数据库中,抢锁的时候,使用 select for update 查询锁对应的key,也就是这里的code,阻塞就说明别人在使用锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Transactional(rollbackFor = Exception.class) public String singleLock () throws Exception { log.info("我进入了方法!" ); DistributeLock distributeLock = distributeLockMapper. selectDistributeLock("demo" ); if (distributeLock==null ) { throw new Exception ("分布式锁找不到" ); } log.info("我进入了锁!" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } return "我已经执行完成!" ; }
1 2 3 4 5 <select id ="selectDistributeLock" resultType ="com.deltaqin.distribute.model.DistributeLock" > select * from distribute_lock where businessCode = #{businessCode,jdbcType=VARCHAR} for update </select >
使用唯一键作为限制,插入一条数据,其他待执行的SQL就会失败,当数据删除之后再去获取锁 ,这是利用了唯一索引的排他性。
insert lock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Autowired private MethodlockMapper methodlockMapper;@Override public boolean tryLock () { try { methodlockMapper.insert(new Methodlock ("lock" )); }catch (Exception e){ return false ; } return true ; } @Override public void waitLock () { try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void unlock () { methodlockMapper.deleteByMethodlock("lock" ); System.out.println("-------释放锁------" ); }
Redis setNx Redis 原生支持的,保证只有一个会话可以设置成功,因为Redis自己就是单线程串行执行的。
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency >
1 spring.redis.host=localhost
封装一个锁对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 @Slf4j public class RedisLock implements AutoCloseable { private RedisTemplate redisTemplate; private String key; private String value; private int expireTime; public RedisLock (RedisTemplate redisTemplate,String key,int expireTime) { this .redisTemplate = redisTemplate; this .key = key; this .expireTime=expireTime; this .value = UUID.randomUUID().toString(); } @Override public void close () throws Exception { unLock(); } public boolean getLock () { RedisCallback<Boolean> redisCallback = connection -> { RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent(); Expiration expiration = Expiration.seconds(expireTime); byte [] redisKey = redisTemplate.getKeySerializer().serialize(key); byte [] redisValue = redisTemplate.getValueSerializer().serialize(value); Boolean result = connection.set(redisKey, redisValue, expiration, setOption); return result; }; Boolean lock = (Boolean)redisTemplate.execute(redisCallback); return lock; } public boolean unLock () { String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end" ; RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class); List<String> keys = Arrays.asList(key); Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value); log.info("释放锁的结果:" +result); return result; } }
每次获取的时候,自己线程需要 new 对应的 RedisLock:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public String redisLock () { log.info("我进入了方法!" ); try (RedisLock redisLock = new RedisLock (redisTemplate,"redisKey" ,30 )){ if (redisLock.getLock()) { log.info("我进入了锁!!" ); Thread.sleep(15000 ); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成" ); return "方法执行完成" ; }
zookeeper 瞬时znode节点 + watcher监听机制 临时节点具备数据自动删除的功能。当 client 与 ZooKeeper 连接和 session 断掉时,相应的临时节点就会被删除。 zk 有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于 zk 的瞬时有序节点实现分布式锁:
多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁。
其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点
下一个序号的线程得到通知,继续执行
以此类推,创建节点的时候,就确认了线程执行的顺序。
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.4.14</version > <exclusions > <exclusion > <groupId > org.slf4j</groupId > <artifactId > slf4j-log4j12</artifactId > </exclusion > </exclusions > </dependency >
zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren 三个方法都可以添加 watcher ,也就是在调用方法的时候传递 true 就是添加监听。注意这里Lock 实现了 Watcher 和 AutoCloseable:
当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 @Slf4j public class ZkLock implements AutoCloseable , Watcher { private ZooKeeper zooKeeper; private String znode; public ZkLock () throws IOException { this .zooKeeper = new ZooKeeper ("localhost:2181" , 10000 ,this ); } public boolean getLock (String businessCode) { try { Stat stat = zooKeeper.exists("/" + businessCode, false ); if (stat==null ){ zooKeeper.create("/" + businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_" , businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false ); Collections.sort(childrenNodes); String firstNode = childrenNodes.get(0 ); if (znode.endsWith(firstNode)){ return true ; } String lastNode = firstNode; for (String node:childrenNodes){ if (znode.endsWith(node)){ zooKeeper.exists("/" +businessCode+"/" +lastNode,true ); break ; }else { lastNode = node; } } synchronized (this ){ wait(); } return true ; } catch (Exception e) { e.printStackTrace(); } return false ; } @Override public void close () throws Exception { zooKeeper.delete(znode,-1 ); zooKeeper.close(); log.info("我已经释放了锁!" ); } @Override public void process (WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted){ synchronized (this ){ notify(); } } } }
zookeeper curator 在实际的开发中,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入 锁。
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-recipes</artifactId > <version > 4.2.0</version > <exclusions > <exclusion > <artifactId > slf4j-api</artifactId > <groupId > org.slf4j</groupId > </exclusion > </exclusions > </dependency >
1 2 3 4 5 6 7 @Bean(initMethod="start",destroyMethod = "close") public CuratorFramework getCuratorFramework () { RetryPolicy retryPolicy = new ExponentialBackoffRetry (1000 , 3 ); CuratorFramework client = CuratorFrameworkFactory. newClient("localhost:2181" , retryPolicy); return client; }
框架已经实现了分布式锁。zk 的 Java 客户端升级版。使用的时候直接指定重试的策略就可以。
官网中分布式锁的实现是在 curator-recipes 依赖中,不要引用错了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Autowired private CuratorFramework client;@Test public void testCuratorLock () { InterProcessMutex lock = new InterProcessMutex (client, "/order" ); try { if ( lock.acquire(30 , TimeUnit.SECONDS) ) { try { log.info("我获得了锁!!!" ); } finally { lock.release(); } } } catch (Exception e) { e.printStackTrace(); } client.close(); }
Redission 重新实现了 Java 并发包下处理并发的类,让其可以跨 JVM 使用。例如 CHM 等。
非 SpringBoot项目引入 https://redisson.org/
引入 Redisson 的依赖,然后配置对应的 XML 即可
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.11.2</version > <exclusions > <exclusion > <artifactId > slf4j-api</artifactId > <groupId > org.slf4j</groupId > </exclusion > </exclusions > </dependency >
编写相应的 redisson.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:context ="http://www.springframework.org/schema/context" xmlns:redisson ="http://redisson.org/schema/redisson" xsi:schemaLocation =" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://redisson.org/schema/redisson http://redisson.org/schema/redisson/redisson.xsd " > <redisson:client > <redisson:single-server address ="redis://127.0.0.1:6379" /> </redisson:client > </beans >
配置对应@ImportResource("classpath*:redisson.xml")资源文件。
SpringBoot项目引入 或者直接使用 springBoot 的 starter 就可以。
https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.19.1</version > </dependency >
修改 application.properties 即可:#spring.redis.host=
设置配置类 1 2 3 4 5 6 @Bean public RedissonClient getRedissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ); return Redisson.create(config); }
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Test public void testRedissonLock () { RLock rLock = redisson.getLock("order" ); try { rLock.lock(30 , TimeUnit.SECONDS); log.info("我获得了锁!!!" ); Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("我释放了锁!!" ); rLock.unlock(); } }
Etcd 参考下面文章,普通项目不会为了一把锁引入etcd,不再赘述:
https://time.geekbang.org/column/article/350285
常见分布式锁的原理 Redisson Redis 2.6 之后才可以执行 lua 脚本,比起管道而言,这是原子性的,模拟一个商品减库存的原子操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 jedis.set("product_stock_10016" , "15" ); String script = " local count = redis.call('get', KEYS[1]) " + " local a = tonumber(count) " + " local b = tonumber(ARGV[1]) " + " if a >= b then " + " redis.call('set', KEYS[1], a-b) " + " return 1 " + " end " + " return 0 " ; Object obj = jedis.eval(script, Arrays.asList("product_stock_10016" ), Arrays.asList("10" )); System.out.println(obj);
尝试加锁的逻辑
上面的 org.redisson.RedissonLock#lock() 通过调用自己方法内部的lock方法的 org.redisson.RedissonLock#tryAcquire 方法。之后调用 org.redisson.RedissonLock#tryAcquireAsync:
首先调用内部的 org.redisson.RedissonLock#tryLockInnerAsync:设置对应的分布式锁
到这里获取锁的逻辑就结束了,如果这里没有获取到,在Future 的回调里面就会直接 return,会在外层有一个 while true 的循环,订阅释放锁的消息准备被唤醒。如果说加锁成功,就开始执行锁续命逻辑。
锁续命逻辑 lua 脚本最后是以毫秒为单位返回 key 的剩余过期时间。成功加锁之后 org.redisson.RedissonLock#scheduleExpirationRenewal中将会调用 org.redisson.RedissonLock#renewExpiration,这个方法内部就有锁续命的逻辑,是一个定时任务,等 10s 执行。
执行的时候尝试执行的续命逻辑使用的是 Lua 脚本,当前的锁有值,就续命,没有就直接返回0:
返回0之后外层会判断,延时成功就会再次调用自己,否则延时调用结束,不再为当前的锁续命。所以这里的续命不是一个真正的定时,而是循环调用自己的延时任务。
循环间隔抢锁机制 如果一开始就加锁成功就直接返回。
如果一开始加锁失败,没抢到锁的线程就会在 while 循环中尝试加锁,加锁成功就结束循环,否则等待当前锁的超时时间之后再次尝试加锁。所以实现逻辑默认是非公平锁:
里面有一个 subscribe 的逻辑,会监听对应加锁的key,当锁释放之后publish 对应的消息,此时如果没有到达对应的锁的超时时间,也会尝试获取锁,避免时间浪费。
释放锁和唤醒其他线程的逻辑 前面没有抢到锁的线程会监听对应的 queue,后面抢到锁的线程释放锁的时候会发送一个消息。
订阅的时候指定收到消息时候的逻辑:会唤醒阻塞之后执行 while 循环
重入锁的逻辑 存在对应的锁,就对对应的 hash 结构的 value 直接 + 1,和 Java 重入锁的逻辑是一致的。
RedLock解决非单体项目的 Redis主从架构的锁失效 https://redis.io/docs/manual/patterns/distributed-locks/
查看 Redis 官方文档,对于单节点的Redis ,使用 setnx 和 lua del 删除分布式锁是足够的,但是主从架构的场景下:锁先加在一个 master 节点上,默认是异步同步到从节点,此时master 挂了会选择slave为master,此时又可以加锁,就会导致超卖。
但是如果使用 zookeeper 来实现的话,由于zk是CP的,所以CP不存在这样的问题。
:::dangerRedis 文档中给出了 RedLock 的解决办法,使用 redLock 真的可以解决吗?
:::
RedLock 原理 基于客户端的实现,是基于多个独立的 Redis Master 节点的一种实现(一般为 5)。client 依次向各个节点申请锁,若能从多数个节点中申请锁成功并满足一些条件限制,那么 client 就能获取锁成功。它通过独立的 N 个 Master 节点,避免了使用主备异步复制协议的缺陷,只要多数 Redis 节点正常就能正常工作,显著提升了分布式锁的安全性、可用性。
注意图中所有的节点都是 master 节点。加锁超过半数成功,就认为是成功。具体流程:
获取锁
获取当前时间T1,作为后续的计时依据;
按顺序地,依次向5个独立的节点来尝试获取锁 SET resource_name my_random_value NX PX 30000
计算获取锁总共花了多少时间,判断获取锁成功与否
时间:T2-T1
多数节点的锁(N/2+1)
当获取锁成功后的有效时间,要从初始的时间减去第三步算出来的消耗时间
如果没能获取锁成功,尽快释放掉锁。
释放锁
向所有节点发起释放锁的操作,不管这些节点有没有成功设置过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public String redlock () { String lockKey = "product_001" ; RLock lock1 = redisson.getLock(lockKey); RLock lock2 = redisson.getLock(lockKey); RLock lock3 = redisson.getLock(lockKey); RedissonRedLock redLock = new RedissonRedLock (lock1, lock2, lock3); try { boolean res = redLock.tryLock(10 , 30 , TimeUnit.SECONDS); if (res) { } } catch (Exception e) { throw new RuntimeException ("lock fail" ); } finally { redLock.unlock(); } return "end" ; }
但是,它的实现建立在一个不安全的系统模型上的,它依赖系统时间,当时钟发生跳跃时,也可能会出现安全性问题。分布式存储专家 Martin 对RedLock 的分析文章 ,Redis 作者的也专门写了一篇文章进行了 反驳 。
RedLock 问题一:持久化机制导致重复加锁 如果是上面的架构图,一般生产都不会配置 AOF 的每一条命令都落磁盘,一般会设置一些间隔时间,比如1s,如果 **ABC** 节点加锁成功,有一个节点 C 恰好是在1S内加锁,还没有落盘,此时挂了,就会导致其他客户端通过 **CDE** 又会加锁成功。
RedLock 问题二:主从下重复加锁
除非多部署一些节点,但是这样会导致加锁时间变长,这样比较下来效果就不如 zk 了。
RedLock 问题三:时钟跳跃导致重复加锁 C 节点发生了时钟跳跃,导致加上的锁没有到达实际的超时时间,就被误以为超时而释放,此时其他客户端就可以重复加锁了。
Curator InterProcessMutex 可重入锁的分析
业务中使用分布式锁的注意点
获取的锁要设置有效期,假设我们未设置 key 自动过期时间,在 Set key value NX 后,如果程序 crash 或者发生网络分区后无法与 Redis 节点通信,毫无疑问其他 client 将永远无法获得锁。这将导致死锁,服务出现中断。
SETNX 和 EXPIRE 命令去设置 key 和过期时间,这也是不正确的,因为你无法保证 SETNX 和 EXPIRE 命令的原子性。
自己使用 setnx 实现Redis锁的时候,注意并发情况下不要释放掉别人的锁(业务逻辑执行时间超过锁的过期时间),导致恶性循环。一般:
加锁的时候需要指定value的内容是当前进程中的当前线程的唯一标记,不要使用线程ID作为当前线程的锁的标记,因为不同实例上的线程ID可能是一样的。
释放锁的逻辑会写在 finally ,释放锁时候要判断锁对应的value,而且要使用 lua 脚本实现原子 del 操作。因为 if 逻辑判断完之后也可能失效导致删除别人的锁
针对扣减库存这个逻辑,lua 脚本里面 实现 Redis 比较库存、扣减库存操作的原子性。通过判断Redis Decr 命令的返回值即可。此命令会返回扣减后的最新库存,若小于 0 则表示超卖。
自己实现分布式锁的坑 setnx 不关心锁的顺序导致删除别人的锁 锁失效之后,别人加锁成功,自己把别人的锁删了。
我们无法预估程序执行需要的锁的时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public String deductStock () { String lockKey = "lock:product_101" ; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deltaqin" ); stringRedisTemplate.expire(lockKey, 10 , TimeUnit.SECONDS); try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock" )); if (stock > 0 ) { int realStock = stock - 1 ; stringRedisTemplate.opsForValue().set("stock" , realStock + "" ); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足" ); } } finally { stringRedisTemplate.delete(lockKey); } return "end" ; }
setnx 关心锁的顺序还是删除了别人的锁 并发会卡在各种地方,卡住的时候过期了,就会删掉别人加的锁:
错误的原因还是因为解锁的逻辑不是原子性的,这里可以参考 Redisson 的解锁逻辑使用 lua 脚本实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public String deductStock () { String lockKey = "lock:product_101" ; String clientId = UUID.randomUUID().toString(); Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30 , TimeUnit.SECONDS); if (!result) { return "error_code" ; } try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock" )); if (stock > 0 ) { int realStock = stock - 1 ; stringRedisTemplate.opsForValue().set("stock" , realStock + "" ); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足" ); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end" ; }
解决办法 这种问题解决的办法就是使用锁续命,比如使用一个定时任务间隔小于锁的超时时间,每隔一段时间就给锁续命,除非线程自己主动删除。这也是 Redisson 的实现思路。
锁优化:分段加锁逻辑 针对一个商品,要开启秒杀的时候,会将商品的库存预先加载到Redis缓存中,比如有100个库存,此时可以分为5个key,每一个 key 有 20 个库存。可以把分布式锁的性能提升5倍。
例如:
product_10111_stock = 100
product_10111_stock1 = 20
product_10111_stock2 = 20
product_10111_stock3 = 20
product_10111_stock4 = 20
product_10111_stock5 = 20
请求来了可以随机可以轮询,扣减完之后就标记不要下次再分配到这个库存了
分布式锁的真相与选择 分布式锁的真相 需要满足的几个特性
互斥:不同线程、进程互斥
超时机制:临界区代码耗时导致,网络原因导致。可以使用额外的线程续命保证
完备的锁接口:阻塞的和非阻塞的接口都要有,lock 和 tryLock
可重入性:当前请求的节点+ 线程唯一标识
公平性:锁唤醒时候,按照顺序唤醒
正确性:进程内的锁不会因为报错死锁,因为崩溃的时候整个进程都会结束。但是多实例部署的时候死锁就很容易发生,如果粗暴使用超时机制解决死锁问题,就默认了下面这个假设:
锁的超时时间 >> 获取锁的时延 + 执行临界区代码的时间 + 各种进程的暂停(比如 GC)
但是这个假设其实无法保证的
:::danger将分布式锁定位为,可以容忍非常小概率互斥语义失效场景下的锁服务。一般来说,一个分布式锁服务,它的正确性要求越高,性能可能就会越低。
:::
分布式锁的选择
数据库:db操作性能较差,并且有锁表的风险,一般不考虑。
Redis:适用于并发量很大、性能要求很高 而可靠性问题可以通过其他方案去弥补的场景。
优点:易于理解
缺点:自己实现、不支持阻塞
Redisson:相对于Jedis其实更多用在分布式的场景。
Zookeeper:适用于高可靠(高可用),而并发量不是太高的场景。
优点:支持阻塞
缺点:需理解Zookeeper、程序复杂
Curator
优点:提供锁的方法
缺点:Zookeeper,强一致,慢
Etcd:安全和可靠性上有保证,但是比较重。
不推荐自己编写的分布式锁,推荐使用 Redisson 和 Curator 实现的分布式锁。