您的浏览器过于古老 & 陈旧。为了更好的访问体验, 请 升级你的浏览器
Ready 发布于2020年04月30日 02:27 最近更新于 2020年05月07日 20:17

原创 面向开发者的 Web 应用安全入门指南(9):事务并发安全

7712 次浏览 读完需要≈ 47 分钟 Java事务并发

内容目录

作为一名 Web 应用开发人员,相信大家对「事务」这个名词一定不陌生。

不过,今天我们要介绍的是许多开发人员比较陌生的事务并发安全问题。在资金/业务数据敏感型系统中,事务并发安全是一道必须迈过去的坎,也必须受到高度的重视,因为它的负面影响非常严重,更重要的是——如果我们不对事务并发作显式的特殊处理,那么事务并发的安全隐患就近乎必然地潜伏在那里!

为了聚焦问题核心,本文默认读者已经掌握了 Spring、JDBC 的基本用法。下面我们开始进入正题。

什么是事务并发安全问题?

现在,我们以实现一个简单的【商品抢购】功能业务为例:

某商品的总数为50,有100个用户几乎同时去抢购,每人单次限购1~5件。

首先,我们按照常规方式创建相关的基础类。大致的关键代码如下:

/** 用户(实体类) */
public class Member {
	/** 主键ID */
	protected Integer ID;
	/** 用户名称 */
	protected String name;

	public Member() { }

	public Member( Integer ID ) {
		this.ID = ID;
	}

	// 省略 getter / setter
}


/** 商品(实体类) */
public class Product {
	/** 主键ID */
	protected Integer ID;
	/** 商品剩余库存数量 */
	protected Integer count;

	// 省略 getter / setter	
	
	/* 对应的SQL脚本:
	CREATE TABLE `table_product` (
	`id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
	`count` int NOT NULL COMMENT '商品剩余数量',
	PRIMARY KEY (`id`)
	) ENGINE=InnoDB DEFAULT CHARSET=utf8;
	 */
}

接着,我们编写【商品抢购】的核心业务逻辑代码:

/** 与 商品 相关的业务逻辑类 */
@Service
public class ProductService {

	// 此处省略 ProductDAO 的类实现,具体的 SQL 可以参见下方业务方法中的相关注释
	@Resource
	protected ProductDAO productDAO;

	/**
	 * 模拟一次购买商品的大致业务过程,并返回本次的抢购结果
	 */
	@Transactional // 基于 Spring 注解的 AOP 事务管理
	public boolean buy( Member member, int productID, int number ) {
		// 为了确保核心逻辑清晰,省略相关的条件检查等代码细节

		// SQL: "SELECT <all columns> FROM table_product WHERE id = ?"
		Product product = productDAO.get( productID );

		int remain = product.getCount() - number;
		if (remain >= 0) {
			// 本方法可能还需要执行 生成订单、账户扣款 等操作,为了更容易复现事务并发问题,我们在此休眠10ms以模拟该操作
			try {
				Thread.sleep( 10 );
			} catch (InterruptedException e) {
				throw new IllegalStateException("模拟业务操作时出现异常!", e);
			}
			product.setCount( remain );
			
			// SQL: "UPDATE table_product SET `count` = <remain> WHERE id = <productID>"
			productDAO.updateCount( product );
			return true;
		}
		return false;
	}
}

然后,我们编写相关的单元测试代码以模拟上述抢购业务的执行过程(基于 JUnit 4.12Spring-Test 5.2.5.RELEASE):

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
@Commit
public class TransactionTest {

	static final Logger LOGGER = LoggerFactory.getLogger(TransactionTest.class);

	@Resource
	protected ProductService productService;
	
	/**
	 * 模拟事务并发冲突:商品库存总数为50,100个用户并发抢购(每人单次限购1~5件),假定服务器端有10个线程轮流处理抢购业务
	 */
	@Test
	public void testConcurrencyConflict() {
		final int productID = 1;
		final int threadCount = 10;
		final int memberCount = 100;
		final ExecutorService threadPool = Executors.newFixedThreadPool( threadCount );
		final Vector<Integer> soldCounter = new Vector<>( memberCount  );
		for (int i = 1; i <= memberCount ; i++) {
			final int memberID = i;
			final Member mockMember = new Member( memberID );
			//
			threadPool.execute(() -> {
				final int mockNumber = memberID % 5 + 1; // 抢购数量:1~5
				final String threadName = Thread.currentThread().getName();
				String result = "异常";
				try {
					result = productService.buy(mockMember, productID, mockNumber) ? "成功" : "失败";
				} catch (Exception e) {
					// ignore exception
				} finally {
					LOGGER.info("线程 {} :用户 [{}] 抢购 {} 件商品……【{}】!", threadName, mockMember.getID(), mockNumber, result);
					if ("成功".equals(result)) {
						soldCounter.add( mockNumber );
					}
				}
			});
		}
		threadPool.shutdown(); // 这里不是立即关闭线程池,而是不再接受新任务
		try {
			threadPool.awaitTermination(10, TimeUnit.MINUTES);
			Integer soldTotal = soldCounter.stream().reduce(0, Integer::sum);
			System.out.println("执行结束时的统计:成功的抢购次数 = " + soldCounter.size() + " ; 抢购到的商品总数 = " + soldTotal);
		} catch (InterruptedException e) {
			LOGGER.error("等待线程池执行完毕时出现异常", e);
		} finally {
			Product product = productService.get( productID );
			LOGGER.info("抢购完毕时的商品剩余数量:{}", product.getCount());
		}
	}
}

最后,我们执行上述单元测试代码,可以得到如下输出:

每次执行单元测试前,都需要先将数据库中对应商品的库存(count)数据重置为 50。

线程 pool-3-thread-7 :用户 [7] 抢购 3 件商品……【成功】!
线程 pool-3-thread-5 :用户 [5] 抢购 1 件商品……【成功】!
线程 pool-3-thread-4 :用户 [4] 抢购 5 件商品……【成功】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【成功】!
线程 pool-3-thread-6 :用户 [6] 抢购 2 件商品……【成功】!
线程 pool-3-thread-1 :用户 [1] 抢购 2 件商品……【成功】!
线程 pool-3-thread-2 :用户 [2] 抢购 3 件商品……【成功】!
线程 pool-3-thread-9 :用户 [9] 抢购 5 件商品……【成功】!
线程 pool-3-thread-3 :用户 [3] 抢购 4 件商品……【成功】!
线程 pool-3-thread-10 :用户 [10] 抢购 1 件商品……【成功】!
线程 pool-3-thread-7 :用户 [11] 抢购 2 件商品……【成功】!
线程 pool-3-thread-8 :用户 [13] 抢购 4 件商品……【成功】!
线程 pool-3-thread-3 :用户 [19] 抢购 5 件商品……【成功】!
线程 pool-3-thread-6 :用户 [18] 抢购 4 件商品……【成功】!
(……此处省略一部分冗余的输出……)
线程 pool-3-thread-5 :用户 [92] 抢购 3 件商品……【成功】!
线程 pool-3-thread-3 :用户 [95] 抢购 1 件商品……【成功】!
线程 pool-3-thread-1 :用户 [96] 抢购 2 件商品……【成功】!
线程 pool-3-thread-9 :用户 [97] 抢购 3 件商品……【成功】!
线程 pool-3-thread-10 :用户 [98] 抢购 4 件商品……【成功】!
线程 pool-3-thread-6 :用户 [99] 抢购 5 件商品……【成功】!
线程 pool-3-thread-2 :用户 [100] 抢购 1 件商品……【成功】!
执行结束时的统计:成功的抢购次数 = 100 ; 抢购到的商品总数 = 300
抢购完毕时的商品剩余数量:22

通过分析上面的输出内容,我们就能明显意识到其中存在非常严重的业务问题:明明总共只有 50 件商品,让 100 个用户来抢购,居然所有人都抢购成功了,实际抢购到的商品总数(300)也明显超过了总库存 50!经过这一番百人抢购后,商品的库存居然还剩有 22 ?!

再回头看看我们在ProductService.buy()中编写的代码:代码中明明存在对商品库存的相关判断呀!

每次执行该方法,我们都会从数据库获取最新的商品数据,并且检查商品的库存数量是否 ≥ 用户本次的抢购数量,只有满足该条件才能抢购成功。抢购成功后,我们也会将扣除本次抢购数量后的商品库存更新到数据库。

前述的整个逻辑貌似并没有问题,那怎么会在实际运行时出现这么严重的业务异常呢???

我们可以简化模拟出一个可能存在的代码执行场景来探究问题所在:

// 前置环境说明:商品库存=50,线程(用户) A 准备抢购 1 件商品,线程(用户) B 准备抢购 2 件商品
// 以下简称 ProductService.java 第 17 行代码【 Product product = productDAO.get( productID ); 】 为 代码点①
// 以下简称 ProductService.java 第 30 行代码【 productDAO.updateCount( product ); 】为 代码点②

1、线程 A 先执行到 代码点①,此时获取到的商品库存 = 50
2、线程 B 执行到 代码点②,此时获取到的商品库存也 = 50
3、线程 A 先执行到 代码点①,此时 remain = 50 - 1 = 49,更新到数据库的商品库存 = 49
4、线程 B 执行到 代码点②,此时 remain = 50 - 2 = 48,更新到数据库的商品库存 = 48
5、以上两个线程的代码都执行完毕时,数据库中的商品库存 = 48

通过前述模拟场景,我们发现:在商品库存 = 50 时,用户 A 抢购 1 件商品成功,用户 B 抢购 2 件商品成功,正确的商品库存应该 = 47,但是我们最终更新到数据库的库存却为 48——这就是问题所在!

如何解决事务并发冲突?

从上面我们可以看出:事务并发冲突 和 线程并发冲突 非常类似,ProductService.buy()方法中 对数据库的商品数据进行读写 应当被视为一个原子性操作,以免外部的数据争用影响数据的一致性。

那么,我们该如何解决 事务并发冲突 这一非常严重的问题呢?

方案①:使用 synchronized 关键字来解决?

既然前面我们提到了 事务并发冲突 和线程并发冲突 非常类似,那么很自然地就能联想到:我们是否可以像使用synchronized关键字来解决 线程并发冲突 一样,来解决 事务并发冲突 呢?我们给ProductService.buy()方法加上synchronized关键字修饰,能否解决事务并发冲突呢?

如果我们只有单台应用服务器,理论上貌似是可行的。那么,我们不妨一试:

// 此处省略一切细节,以突出为 buy() 方法添加了 synchronized 修饰关键字
@Service
public class ProductService {

	@Transactional
	public synchronized boolean buy( Member member, int productID, int number ) {
		// 注意上方的关键字
	}
}

接着,我们再次执行上面的单元测试(记得先将数据库中的商品库存重置为50):

线程 pool-3-thread-3 :用户 [3] 抢购 4 件商品……【成功】!
线程 pool-3-thread-2 :用户 [2] 抢购 3 件商品……【成功】!
线程 pool-3-thread-7 :用户 [7] 抢购 3 件商品……【成功】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【成功】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-9 :用户 [37] 抢购 3 件商品……【成功】!
线程 pool-3-thread-4 :用户 [36] 抢购 2 件商品……【成功】!
线程 pool-3-thread-6 :用户 [34] 抢购 5 件商品……【失败】!
线程 pool-3-thread-5 :用户 [35] 抢购 1 件商品……【成功】!
线程 pool-3-thread-1 :用户 [33] 抢购 4 件商品……【成功】!
线程 pool-3-thread-8 :用户 [32] 抢购 3 件商品……【成功】!
线程 pool-3-thread-7 :用户 [31] 抢购 2 件商品……【成功】!
线程 pool-3-thread-3 :用户 [29] 抢购 5 件商品……【失败】!
线程 pool-3-thread-2 :用户 [30] 抢购 1 件商品……【成功】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-7 :用户 [83] 抢购 4 件商品……【失败】!
线程 pool-3-thread-3 :用户 [100] 抢购 1 件商品……【失败】!
线程 pool-3-thread-2 :用户 [99] 抢购 5 件商品……【失败】!
线程 pool-3-thread-6 :用户 [98] 抢购 4 件商品……【失败】!
线程 pool-3-thread-10 :用户 [97] 抢购 3 件商品……【失败】!
线程 pool-3-thread-1 :用户 [95] 抢购 1 件商品……【失败】!
线程 pool-3-thread-5 :用户 [96] 抢购 2 件商品……【失败】!
线程 pool-3-thread-4 :用户 [94] 抢购 5 件商品……【失败】!
线程 pool-3-thread-9 :用户 [92] 抢购 3 件商品……【失败】!
执行结束时的统计:成功的抢购次数 = 39 ; 抢购到的商品总数 = 111
抢购完毕时的商品剩余数量:0

从上述执行结果来看:为ProductService.buy()方法添加synchronized关键字似乎也不能确保业务数据的一致性。

emmmmmm……总感觉有点不科学……那么,我们先去掉ProductService.buy()方法的synchronized关键字,然后在单元测试中为调用该方法的这行代码包裹相应的synchronized代码块:

	String result = "异常";
	try {
		synchronized (Product.class){ // 这是新增的同步代码块,其他代码完全不变
			result = productService.buy(mockMember, productID, mockNumber) ? "成功" : "失败";
		} // 对应的同步代码块闭合位置
	} catch (Exception e) {
		// ignore exception
	} finally {
		LOGGER.info("线程 {} :用户 [{}] 抢购 {} 件商品……【{}】!", threadName, mockMember.getID(), mockNumber, result);
		if ("成功".equals(result)) {
			soldCounter.add( mockNumber );
		}
	}

接着,我们再次执行单元测试(记得先重置库存),得到如下输出结果:

线程 pool-3-thread-1 :用户 [1] 抢购 2 件商品……【成功】!
线程 pool-3-thread-10 :用户 [10] 抢购 1 件商品……【成功】!
线程 pool-3-thread-9 :用户 [9] 抢购 5 件商品……【成功】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【成功】!
线程 pool-3-thread-7 :用户 [7] 抢购 3 件商品……【成功】!
线程 pool-3-thread-6 :用户 [6] 抢购 2 件商品……【成功】!
线程 pool-3-thread-5 :用户 [5] 抢购 1 件商品……【成功】!
线程 pool-3-thread-4 :用户 [4] 抢购 5 件商品……【成功】!
线程 pool-3-thread-3 :用户 [3] 抢购 4 件商品……【成功】!
线程 pool-3-thread-2 :用户 [2] 抢购 3 件商品……【成功】!
线程 pool-3-thread-3 :用户 [19] 抢购 5 件商品……【成功】!
线程 pool-3-thread-4 :用户 [18] 抢购 4 件商品……【成功】!
线程 pool-3-thread-5 :用户 [17] 抢购 3 件商品……【成功】!
线程 pool-3-thread-6 :用户 [16] 抢购 2 件商品……【成功】!
线程 pool-3-thread-7 :用户 [15] 抢购 1 件商品……【成功】!
线程 pool-3-thread-7 :用户 [25] 抢购 1 件商品……【成功】!
线程 pool-3-thread-8 :用户 [14] 抢购 5 件商品……【失败】!
线程 pool-3-thread-9 :用户 [13] 抢购 4 件商品……【成功】!
线程 pool-3-thread-10 :用户 [12] 抢购 3 件商品……【失败】!
线程 pool-3-thread-1 :用户 [11] 抢购 2 件商品……【失败】!
(………此处省略部分冗余的输出…)
线程 pool-3-thread-2 :用户 [94] 抢购 5 件商品……【失败】!
执行结束时的统计:成功的抢购次数 = 17 ; 抢购到的商品总数 = 50
抢购完毕时的商品剩余数量:0

当我们看到上述测试结果末尾处的抢购到的商品总数 = 50抢购完毕时的商品剩余数量:0时,就知道这才是我们想要的正确结果!

因此,在只有单个应用服务器的情况下,synchronized关键字也是可以用来解决事务并发冲突的。

问题:那么,为什么在ProductService.buy()方法上直接加synchronized关键字却无法得到预期的正确结果呢?

实际上,这并不是synchronized关键字的「锅」,而是由 Spring 的事务管理机制(AOP) 和 数据库的事务隔离级别 共同导致的。

限于篇幅,我们稍后将在本文下方的评论中来探讨其中的缘由,在此不再赘述。

方案②:数据库的悲观锁策略

众所周知,基于synchronized关键字的解决方案实际上依赖于 Java 等部分编程语言的线程锁机制,但它具有很大的局限性:

  • 如果存在多台应用服务器集群,则synchronized方案无效。
  • 如果其他程序也连接相同的数据库,并也可能更新同一业务数据,则仍然存在事务并发冲突的安全隐患。
  • 即使是同一个程序,如果其中有多个业务方法都可能修改同一业务数据,则它们也都可能需要添加synchronized关键字,并确保线程锁的监视器对象一致。
  • 部分编程语言自身可能并没有与类似于synchronized的线程锁机制,比如:PHP。

我们知道,数据库事务具有 ACID 四大特性(原子性、一致性、隔离性、持久性),它是数据库为我们提供的用于确保一系列业务操作的数据可靠性的解决方案。那么,对于事务并发冲突问题,数据库有没有为我们提供相应的解决方案呢?

答案当然是肯定的。对于多个事务并发争用同一数据导致的冲突问题,数据库也为我们提供了事务机制的解决方案,这就是悲观锁。它可以很方便地实现跨语言跨应用的事务并发安全。

以 MySQL 为例,开启事务后,我们可以在查询那些可能需要修改的商品数据的SQL语句末尾加上FOR UPDATE关键字(表示本次查询到的数据可能用于后续修改):

SELECT * FROM table_product WHERE id = ? FOR UPDATE

当我们执行上述SQL语句后,数据库在将最新的商品数据返回给我们的同时,也会为数据库中对应的商品数据记录添加一把悲观锁。

悲观锁是一种独占锁(排它锁),添加了悲观锁之后,其他针对该商品数据的FOR UPDATE查询就必须(阻塞)等到当前事务执行完毕(提交或回滚)并释放悲观锁后,才能获取到数据库返回的最新商品数据(同时也会为对应的数据添加新的悲观锁)。

/*
其他一切代码不变,只将下方代码中原来的 "productDAO.get( productID )"
改为 "productDAO.getLocked( productID )" ,对应的SQL语句也作相应的调整(参见下方注释)
 */

@Service
public class ProductService {	

	@Transactional
	public boolean buy( Member member, int productID, int number ) {

		// SQL: "SELECT <all columns> FROM table_product WHERE id = <productID> FOR UPDATE"
		Product product = productDAO.getLocked( productID ); // 只改了这一行代码

	}
}

接下来,我们再次执行之前的单元测试(记得重置库存数据,并清除方案①中的代码变更),可以得到如下结果:

线程 pool-3-thread-1 :用户 [1] 抢购 2 件商品……【成功】!
线程 pool-3-thread-10 :用户 [10] 抢购 1 件商品……【成功】!
线程 pool-3-thread-9 :用户 [9] 抢购 5 件商品……【成功】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【成功】!
(………此处省略部分冗余的输出…)
线程 pool-3-thread-5 :用户 [17] 抢购 3 件商品……【成功】!
线程 pool-3-thread-6 :用户 [16] 抢购 2 件商品……【成功】!
线程 pool-3-thread-7 :用户 [15] 抢购 1 件商品……【成功】!
线程 pool-3-thread-7 :用户 [25] 抢购 1 件商品……【成功】!
线程 pool-3-thread-8 :用户 [14] 抢购 5 件商品……【失败】!
线程 pool-3-thread-9 :用户 [13] 抢购 4 件商品……【成功】!
线程 pool-3-thread-10 :用户 [12] 抢购 3 件商品……【失败】!
线程 pool-3-thread-1 :用户 [11] 抢购 2 件商品……【失败】!
线程 pool-3-thread-10 :用户 [29] 抢购 5 件商品……【失败】!
(………此处省略部分冗余的输出…)
线程 pool-3-thread-3 :用户 [95] 抢购 1 件商品……【失败】!
线程 pool-3-thread-2 :用户 [94] 抢购 5 件商品……【失败】!
执行结束时的统计:成功的抢购次数 = 17 ; 抢购到的商品总数 = 50
抢购完毕时的商品剩余数量:0

这个结果当然是符合我们的业务期望的。

关于数据库的悲观锁FOR UPDATE还有许多细节值得说道,否则容易导致严重的事务并发性能问题,甚至可能出现事务死锁(deadlock),稍后我们将会另起文章来探讨其中值得注意的一些细节。

方案③:乐观锁策略

前面我们提到了「悲观锁」,为什么它被叫做「悲观」锁呢?

悲观锁之所以「悲观」,是因为它假定我们经常遇到最悲观的情况,也就是每次去修改数据的时候都有其他事务也在同时争抢着对该数据进行修改(即:业务操作的并发频率极高)。

因此,悲观锁是一种独占锁(排它锁),它能够很好地确保事务并发时数据的一致性,实现起来也很简单,但与此同时,它也会严重限制对应业务的并发处理能力(不能同时并行处理多个业务请求)。

所以,悲观锁更适用于那些对并发处理能力要求不高,但对数据的一致性要求较高的功能业务。

不过,现实情况并没有我们想象的这么「悲观」。

在现实世界中,得益于现代服务器的高性能,完成一次常规的事务处理所耗费的时间非常之短,事务之间产生并发冲突的频率仍然非常低。

然而,悲观锁会严重限制系统的并发处理能力,如果仅仅为了解决 0.1% 概率的事务并发冲突,就全面采用事务悲观锁,那么其他 99.9% 的事务也会跟着受到牵连——系统的并发处理能力会急剧降低,每个事务的平均处理时间也会大幅延长。当业务请求较多时,急剧降低的系统并发处理能力难以抗住较大的请求压力,导致大量请求被阻塞,严重影响用户的业务操作体验,应用系统和数据库也可能因为不堪重负以至于崩溃。

因此,对于那些用户访问量较大、对并发处理能力有一定要求的系统,我们必须重新考虑一种更轻量级的事务并发冲突解决方案。这就是我们现在要介绍的乐观锁。

先看一个示例,我们对之前的【商品抢购】业务代码进行如下改造(未改造的部分保持不变):

/*
商品实体类(和对应的数据表)新增 版本号(version) 字段。
对应的 SQL 如下:
ALTER TABLE `table_product`
ADD COLUMN `version`  bigint UNSIGNED NOT NULL DEFAULT 0 COMMENT '乐观锁的版本号标识' AFTER `count`;
*/

/** 商品(实体类) */
public class Product {
	/** 主键ID */
	protected Integer ID;
	/** 商品剩余库存数量 */
	protected Integer count;
	/** 乐观锁版本号(新增) */
	protected long version;
	// 省略 getter / setter	
}


/*
改造了 productDAO.updateCount() 方法所执行的 SQL 语句(参见下方代码中的注释),
新增 "version = 当前版本号" 的【条件限制】,和 "version = version + 1" 的【字段更新】,
并返回表示【数据库执行更新后的受影响行数 是否 > 0 的 boolean 值】。

如果没有并发冲突,即 数据库中的 version = 当前版本号,则能更新成功;
如果存在并发冲突(即 数据库中的 version 已被其它事务更新,不再 = 当前版本号),则不会更新到数据库,
本次业务请求将被视为【失败】,并抛出异常以回滚整个事务(基于 Spring 的事务管理机制)。
失败后,用户可以重新发起抢购请求。
*/
@Service
public class ProductService {

	@Resource
	protected ProductDAO productDAO;

	/**
	 * 模拟购买商品的过程
	 */
	@Transactional
	public boolean buy( Member member, int productID, int number ) {
		// 为了确保核心逻辑清晰,省略相关的条件检查等代码细节

		// SQL: "SELECT <all columns> FROM table_product WHERE id = ?"
		Product product = productDAO.get( productID );

		int remain = product.getCount() - number;
		if (remain >= 0) {
			// 本方法可能还需要执行 生成订单、账户扣款 等操作,为了更容易复现事务并发问题,我们在此休眠10ms以模拟该操作
			try {
				Thread.sleep( 10 );
			} catch (InterruptedException e) {
				throw new IllegalStateException("模拟业务操作时出现异常!", e);
			}

			product.setCount( remain );

			// SQL: "UPDATE table_product SET `count` = <remain>, version = version + 1 WHERE id = <productID> AND version = <product.version>"
			if( !productDAO.updateCount( product ) ){
				// 遭遇事务并发冲突,本次抢购视为失败,抛出异常以回滚事务
				throw new IllegalStateException("抢购失败了,请稍后再试~");
			}
			return true;
		}
		return false;
	}
}

正如上述代码所示,我们给【商品】新增了一个字段「版本号」(version),并准备用它来实现乐观锁。

我们可以思考:如果没有其他事务与我们并发争用,则数据库的版本号不会被修改。

我们在将新的库存更新到数据库时,也要为 UPDATE SQL 语句添加 version = 本次我们取得的版本号 这一查询条件。如果版本号未被其他事务修改(即:不存在并发冲突),该SQL语句就能够更新成功,否则更新失败(不会修改数据库中的任何数据)。

在UPDATE SQL语句中,我们还需要同时更新version字段,让其值自增长,以标记本次数据变更。

下面,我们再次执行上面的单元测试,可以得到类似如下结果:

线程 pool-3-thread-2 :用户 [2] 抢购 3 件商品……【异常】!
线程 pool-3-thread-7 :用户 [7] 抢购 3 件商品……【异常】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【异常】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-9 :用户 [9] 抢购 5 件商品……【异常】!
线程 pool-3-thread-10 :用户 [10] 抢购 1 件商品……【异常】!
线程 pool-3-thread-3 :用户 [3] 抢购 4 件商品……【成功】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-6 :用户 [75] 抢购 1 件商品……【异常】!
线程 pool-3-thread-8 :用户 [71] 抢购 2 件商品……【成功】!
线程 pool-3-thread-2 :用户 [74] 抢购 5 件商品……【异常】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-7 :用户 [91] 抢购 2 件商品……【成功】!
线程 pool-3-thread-9 :用户 [92] 抢购 3 件商品……【异常】!
(……此处省略部分冗余的输出……)
线程 pool-3-thread-2 :用户 [100] 抢购 1 件商品……【异常】!
执行结束时的统计:成功的抢购次数 = 10 ; 抢购到的商品总数 = 26
抢购完毕时的商品剩余数量:24

从上面的结果,我们可以看出:抢购到的商品总数(26) + 商品的剩余数量(24) = 商品的总库存(50)。也就是说,商品的数量是准确无误的。多次测试,这个结果也依旧是可靠的。

这也说明:在高并发的情况下,乐观锁确实能够保证事务并发的数据一致性。

不过,我们也注意到,这100次抢购中只有10次是成功的,抢购结束后商品也还有剩余库存,并没有售罄。

这其实也是乐观锁的「副作用」:在出现事务并发争用时,乐观锁就是以「其中之一执行成功,其他竞争者全部执行失败」来解决事务并发冲突的。

当然,我们的单元测试模拟的就是高并发的场景,因此出现这么多的抢购「失败」并不奇怪。在实际的生产环境中,出现这样高频率的事务并发是比较罕见的;在绝大多数「正常」的时候,我们就享受了乐观锁带来的好处——既保证了事务并发时的数据一致性,又保证了系统的高并发处理能力。

在乐观锁的场景下,当用户抢购失败时,我们一般会在前端页面提示用户「请稍后重试」,用户可以再次点击重新参与抢购。

不过,如果在高并发场景下出现类似上述测试结果那样的高达90%几率的抢购失败/异常,也是会严重影响用户体验的。更何况,上面测试结果中的商品库存直到抢购结束都没有被抢光呢,100个人来抢50个商品,商品却没有被抢光,这也是有些说不过去的。

因此,我们在使用乐观锁的时候,也会针对抢购异常增加一些自动重试机制来确保用户的体验。

并发修改同一记录时,避免更新丢失,需要加锁。要么在应用层加锁,要么在缓存加锁,要么在数据库层使用乐观锁,使用 version 作为更新依据。

说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于3 次。

与资金相关的金融敏感信息,仍然推荐使用悲观锁策略。

——《阿里巴巴Java开发手册》中关于事务锁的建议

现在,我们就参考《阿里巴巴Java开发手册》中的建议继续改造代码,改造后的代码大致如下:

// 在单元测试中增加 抢购出现并发冲突时自动重试3次 的改进措施

		threadPool.execute(() -> {
				final int mockNumber = memberID % 5 + 1; // 抢购数量:1~5
				final String threadName = Thread.currentThread().getName();

				String result;
				int retryCountDown = 3;
				do {
					try {
						result = productService.buy( mockMember, productID, mockNumber ) ? "成功" : "失败";
					} catch (Exception e) {
						result = "异常"; // ignore exception
						// 你也可以在这里添加 Thread.sleep( 10 ~ 1000 ) 来延迟重试操作的时间点,错开事务冲突可能的高峰
					}
				} while ( "异常".equals(result) && retryCountDown-- > 0 );

				LOGGER.info("线程 {} :用户 [{}] 抢购 {} 件商品……【{}】!", threadName, mockMember.getID(), mockNumber, result);
				if ("成功".equals( result )) {
					soldCounter.add( mockNumber );
				}
			});

然后,我们重新执行单元测试,可以得到类似如下输出:

线程 pool-3-thread-5 :用户 [5] 抢购 1 件商品……【成功】!
线程 pool-3-thread-1 :用户 [1] 抢购 2 件商品……【成功】!
线程 pool-3-thread-8 :用户 [8] 抢购 4 件商品……【成功】!
线程 pool-3-thread-6 :用户 [6] 抢购 2 件商品……【异常】!
(……此处省略部分冗余代码……)
线程 pool-3-thread-9 :用户 [16] 抢购 2 件商品……【异常】!
线程 pool-3-thread-7 :用户 [20] 抢购 1 件商品……【异常】!
线程 pool-3-thread-6 :用户 [14] 抢购 5 件商品……【成功】!
线程 pool-3-thread-3 :用户 [19] 抢购 5 件商品……【异常】!
(……此处省略部分冗余代码……)

线程 pool-3-thread-8 :用户 [43] 抢购 4 件商品……【成功】!
线程 pool-3-thread-8 :用户 [49] 抢购 5 件商品……【失败】!
线程 pool-3-thread-1 :用户 [36] 抢购 2 件商品……【异常】!
线程 pool-3-thread-1 :用户 [51] 抢购 2 件商品……【失败】!
线程 pool-3-thread-7 :用户 [38] 抢购 4 件商品……【成功】!
线程 pool-3-thread-3 :用户 [46] 抢购 2 件商品……【失败】!
(……此处省略部分冗余代码……)
线程 pool-3-thread-9 :用户 [99] 抢购 5 件商品……【失败】!
线程 pool-3-thread-6 :用户 [98] 抢购 4 件商品……【失败】!
执行结束时的统计:成功的抢购次数 = 15 ; 抢购到的商品总数 = 50
抢购完毕时的商品剩余数量:0

从上面的测试结果中我们很容易看出:所有的商品都被抢光了,商品的库存数据也准确无误!这完全符合我们的业务期望!

探索更多可能的解决方案

前面我们介绍了synchronized线程锁、悲观锁、乐观锁这三种比较主流的事务并发冲突解决方案。

当然,条条大路通罗马,除了这些解决方案外,我们还可以自行探索一些同样可以解决事务并发冲突问题的方案变种:

  • 【阻塞队列】:先将所有的业务请求都临时存放在同一个阻塞队列中,应用程序可以从中依次(先进先出)取出一个业务请求进行处理,处理完成后,才能处理下一个业务请求。
  • 【Redis分布式锁】:应用程序在每次执行业务处理方法之前,都先去抢占同一Redis服务器的分布式锁,只有成功抢占了的线程才能开始处理本次业务请求,否则只能阻塞等待其他线程处理完毕并释放锁之后,继续去抢占,直到抢占成功(或超时)为止。
  • ……

万变不离其宗,无论我们采用何种方案,究其本质,无非以下两种思路:

  1. 通过 编程语言本身 或 第三方 的 本地锁或分布式锁 强行阻塞所有的业务请求,让他们排队,然后依次单个执行业务处理方法,从而避免事务并发冲突。
  2. 允许业务处理方法并行执行,然后通过某种轻量级的机制来检测并发冲突,并确保:当发生冲突时,有且仅有「一人」能够执行成功并提交更新;「其他人」全部失败并回滚,等待稍后重试。

以上每种方案都具有一定的局限性,我们需要自行根据业务的实际需求和运行场景进行合理的取舍。

  • CodePlayer技术交流群1
  • CodePlayer技术交流群2

3 条评论

Ready [作者] · 4年前

为什么在基于 Spring 事务管理机制 的 ProductService.buy() 方法上直接加synchronized关键字却无法得到预期的正确结果呢?

这其实涉及到 Spring 的事务管理机制(AOP 代理)和 事务的数据隔离级别 的相关知识。我们先来简单剖析一下 在 Spring 事务管理 的 buy() 方法上直接加synchronized关键字修饰 和 在 buy() 方法外加synchronized代码块的执行顺序细节区别:

【在 buy() 方法上直接加 synchronized 关键字修饰】

Spring AOP 开启事务;
try {
    synchronized ( this ){ // 同步锁作用在这里(等价)
        调用 buy() 方法;
    }
} catch( Exception e ) {
    Spring AOP 回滚事务;
    throw e; // 继续对外抛出异常
}
Spring AOP 提交事务;



【在 buy() 方法外加 synchronized 代码块】

synchronized( Product.class ){ // 同步锁作用在这里
    Spring AOP 开启事务;
    try {
        调用 buy() 方法;
    } catch( Exception e ) {
        Spring AOP 回滚事务;
        throw e; // 继续对外抛出异常
    }
    Spring AOP 提交事务;
}
*/

通过上述剖析,我们可以看到两者的同步锁作用范围并不一致:前者是先开启事务,才能抢占同步锁,释放锁后才会提交/回滚事务;后者是先加同步锁,再在同步代码块内部完成事务从开启到提交或回滚的整个过程。

换句话说,前者的同步锁作用范围比较小,需要等到开启事务后,才能抢占同步锁。因此,多个并行的线程是有可能都先开启了事务,然后再排队依次等待内部同步锁区域的代码执行。

这里,我们又要引出事务的数据隔离机制。要知道,当我们开启一个事务后,在提交事务之前,我们在事务中对数据所作的任何修改,对于其他正在进行中的事务来说一般都是不可见的(事务之间的数据可见性取决于该事务隔离级别的具体设置)。

也就是说,有可能存在如下的事务执行场景:

// 前置环境:假定数据库中的当前商品库存为 50
// 事务A 准备抢购 1 件商品;事务B 准备抢购 2 件商品

1、事务A 【开启】 (此时库存 = 50);
2、事务B 【开启】(此时库存 = 50);
3、事务A 先抢占到同步锁;
4、事务A 查询获取商品数据(获取到的商品库存仍然为 50);
5、事务A 抢购到 1 件商品,更新库存 50 - 1 = 49 到数据库;
6、事务A 释放同步锁;
7、事务B 抢占到同步锁;
8、事务B 查询获取商品数据(由于事务A并没有提交,所以库存仍然为 50);
9、事务A 【提交】(此时数据库的库存 = 49);
10、事务B 抢购到 2 件商品,更新库存 50 - 2 = 48 到数据库;
11、事务B 释放同步锁;
12、事务B 【提交】(此时数据库的库存 = 48);

很明显,在这种场景下,两个事务执行完毕后,数据库中的库存数据并不正确(应该为47,但实际为48)!

实际上,在上面假定的事务执行场景中,当事务的隔离级别为可重复读(REPEATABLE READ)及以上级别时,哪怕在事务B查询获取商品数据之前事务A就已经提交了事务,事务B获取到的商品库存仍然为 50。因为可重复读这一隔离级别决定了事务B在整个过程中能看到的数据,只是事务B刚开启时这个时间点之前的数据和事务B自己中途修改的数据(事务B也能看到其他事务新增和删除的数据行)。而 MySQL 数据库默认的事务隔离级别就是可重复读。

2 0 0
Ready [作者] · 4年前

在现实世界中,我们系统中的商品一般都不止一个,如果我们在单台应用服务器的场景下使用synchronized关键字来解决事务并发问题,那么我们可以采取一些措施来优化应用程序的并发处理能力,例如改进 同步锁 的监视器对象:

/* 
synchronized (Product.class){
	result = productService.buy(mockMember, productID, mockNumber) ? "成功" : "失败";
}
*/

// 将之前的代码块优化改进为如下代码块
final String lockMonitor = ( "Product_" + productID ).intern();
synchronized ( lockMonitor ){
	result = productService.buy(mockMember, productID, mockNumber) ? "成功" : "失败";
}

改进前,不同ID的商品抢占的都是同一个同步锁;改进后,每个商品分别抢占各自唯一的同步锁。这样就可以大幅提高商品抢购的业务并发处理能力。因为,我们在业务方法中只会对指定ID的商品库存进行更新,并不会读写其他商品的数据,因此也就不需要持有全局的同步锁。

1 0 0
Ready [作者] · 4年前

使用【乐观锁】策略时,最好对程序作一些额外的优化,例如:

  1. 尽量确保业务方法的传入参数是只读的,不要在方法内随意修改参数对象的属性,这样能够更好地进行方法重试。
  2. 在 1. 的基础上,我们可以使用【 AOP + 特定注解 + 特定异常 】对乐观锁的自动重试逻辑进行统一封装,以便更方便地实现无明显代码入侵的统一的自动重试机制。
0 0 0

撰写评论

打开导航菜单