需求场景
- 结合之前的限流项目背景,公共服务被抽离出来只提供RESTful接口,中间加一层HTTP框架来控制访问。
- 调用接口的结果一般是成功、失败和超时。
- 超时的情况是未决的,我们不知道是哪个环节出了问题,也不确定是否当前的请求已经处理完成。
- 如果接口只包含查询、删除、更新这些操作,重试一次即可。
- 需要注意的问题是ABA问题,也即当删除操作超时,下一次删除重试之前新的数据被插入,重试操作删除了新的被插入的数据。-》 大部分业务可以容忍,不容忍的可以特殊处理。
- update x = x+delta 这样格式的更新操作并非幂等,只有 update x=y 这样格式的更新操作才是幂等的。如果插入的数据包含数据库唯一键,可以利用数据库唯一键的排他性,保证不会重复插入数据
- 超时的几种处理方式:
- 提醒用户,让他们自己决定。-》 时间敏感用户适用
- 调用方查询操作结果,判断超时的操作是否成功。-》时间不敏感用户适用,但是局限,因为不是所有调用方都方便查询结果。
- 直接重试。-》时间不敏感用户适用 -》 通用解决方案。
需求分析
- 幂等: (针对这个接口调用的场景)针对同一接口多次发起同一个业务请求,必须保证业务只执行一次。
- 如何判断两个请求是重试关系?-》 幂等号标识。全局唯一。
功能性需求
通过TDD或者用户用例来看一下这个功能如何被使用:
///////// 使用方式一: 在业务代码中处理幂等 //////////// // 接口调用方 Idempotence idempotence = new Idempotence(); String idempotenceId = idempotence.createId(); Order order = createOrderWithIdempotence(..., idempotenceId); // 接口实现方 public class OrderController { private Idempotence idempontence; // 依赖注入 public Order createOrderWithIdempotence(..., String idempotenceId) { // 前置操作 boolean existed = idempotence.check(idempotenceId); if (existed) { // 两种处理方式: // 1. 查询order,并且返回; // 2. 返回duplication operation Exception } idempotence.record(idempotenceId); //...执行正常业务逻辑 } public Order createOrder(...) { //... } } ///////// 使用方式二:在框架层面处理幂等 ////////////// // 接口调用方 Idempotence idempotence = new Idempotence(); String idempotenceId = idempotence.createId(); //...通过feign框架将幂等号添加到http header中... // 接口实现方 public class OrderController { @IdempotenceRequired public Order createOrder(...) { //... } } // 在AOP切面中处理幂等 @Aspect public class IdempotenceSupportAdvice { @Autowired private Idempotence idempotence; @Pointcut("@annotation(com.xzg.cd.idempotence.annotation.IdempotenceRequired)") public void controllerPointcut() { } @Around(value = "controllerPointcut()") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { // 从HTTP header中获取幂等号idempotenceId // 前置操作 boolean existed = idempotence.check(idempotenceId); if (existed) { // 两种处理方式: // 1. 查询order,并且返回; // 2. 返回duplication operation Exception } idempotence.record(idempotenceId) Object result = joinPoint.proceed(); return result; } }
我们可以得出接口调用方生成幂等号,随着请求传入框架。框架从HTTP header或者接口中取得这个然后查询。如果幂等号已经存在说明任务正在执行直接返回;否则记录幂等号并执行任务。
非功能性需求
易用性:接入成本低,易于学习。松耦合
性能:低延迟,因为在请求处理前加入了幂等判断逻辑。
容错性:高容错。不能因为幂等框架的异常影响接口服务的本身可用性。
幂等处理正常流程
接口调用方接收到请求,解析幂等号,传递给幂等框架,框架查询这个幂等号是否存在。如果存在则不需要重复执行;不存在则存储这个幂等号并执行相应的业务逻辑。
业务代码的异常处理
- 由实际调用方的工程师来决定,框架不做决定,提供删除幂等号的接口。
- 一般出现系统错误可以尝试重试(数据库连接错误);一般出现业务错误就不重试了因为重试也是同样的结果(数据不存在)(不删除幂等号)。
业务宕机处理
- 可能存在幂等号记录后业务宕机则,重试业务依旧不能执行;或者系统异常出现后在删除幂等号的时候宕机。
- 幂等逻辑+业务逻辑行为一致为了保证,可以放进事务-》会影响性能,整个系统的变得复杂,成为了分布式事务(业务和幂等系统使用不同的数据库)
- 另一个方法是在业务系统中增加一个记录幂等号的表,幂等号存在这里然后同步进另一个幂等自己的DB,然后事务变成了不是分布式的:
- 好处:解决了分布式事务的复杂性,可以利用DB本身的事务性;
- 坏处:和业务又不解耦了。
- 不能一概而论,这个方案不失为一个方法,但是进一步可以通过SQL自己的日志记录幂等号,用于问题排查。
幂等框架异常处理
- 不同于限流框架,他出异常不管用之后,我们还有监控、运维手段可以及时发现。
- 幂等框架异常,应该让业务也失败。因为如果运行业务继续执行的话,无法判读重试会导致重复执行,业务执行出错。
V1版本功能需求
- 生成幂等号
- 幂等号的存储、查询、删除
幂等号的生成
- 判断两个请求是否是同一个业务请求。(是否重试关系)
- 两种方式:
- 一个集中生成的服务,分配给调用方
- 各个调用方自己生成
- 集中生成需要引入第三方服务
- 优点:修改集中,对调用方隐藏实现
- 缺点:增加系统负责性
- 调用方自己生成:
- 执行效率高
- 修改幂等号生成算法需要修改所有的调用方;实现者水平不同,可能出bug且重复实现。
- 一个折中方式,保证执行效率和代码维护成本:集中实现幂等生成算法,让业务方自己调用,实现复用和集中维护。
幂等号的存储、查询、删除
- 只需要幂等号查重功能:Redis即可,不需要关系型数据库;
- 多线程执行检查-设置逻辑时会有竞争关系-》加分布式锁-》增加复杂度
- Redis本身提供原子操作setnx(key,value)-》单线程,没有并发问题。
最小原型代码实现&Review最小原型代码
public class Idempotence { // comment-1: 如果要替换存储方式,是不是很麻烦呢? private JedisCluster jedisCluster; // comment-2: 如果幂等框架要跟业务系统复用jedisCluster连接呢? // comment-3: 是不是应该注释说明一下redisClusterAddress的格式,以及config是否可以传递进null呢? public Idempotence(String redisClusterAddress, GenericObjectPoolConfig config) { // comment-4: 这段逻辑放到构造函数里,不容易写单元测试呢 String[] addressArray= redisClusterAddress.split(";"); Set<HostAndPort> redisNodes = new HashSet<>(); for (String address : addressArray) { String[] hostAndPort = address.split(":"); redisNodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1]))); } this.jedisCluster = new JedisCluster(redisNodes, config); } // comment-5: generateId()是不是比缩写要好点? // comment-6: 根据接口隔离原则,这个函数跟其他函数的使用场景完全不同,这个函数主要用在调用方,其他函数用在实现方,是不是应该分别放到两个类中? public String genId() { return UUID.randomUUID().toString(); } // comment-7: 返回值的意义是不是应该注释说明一下? public boolean saveIfAbsent(String idempotenceId) { Long success = jedisCluster.setnx(idempotenceId, "1"); return success == 1; } public void delete(String idempotenceId) { jedisCluster.del(idempotenceId); } }
重构最小原型代码
- GenerateId名字更改并抽离到另一个util类中
- storage类单独提取,封装并提供接口,提高灵活性并且运行后续更改存储实现
- storage中增加注释解释检查-存储的返回结果;增加注释解读配置参数
- storage中构造函数抽离出配置过程,提升可测试性
// 代码目录结构
com.xzg.cd.idempotence
--Idempotence
--IdempotenceIdGenerator(幂等号生成类)
--IdempotenceStorage(接口:用来读写幂等号)
--RedisClusterIdempotenceStorage(IdempotenceStorage的实现类)
// 每个类的代码实现 public class Idempotence { private IdempotenceStorage storage; public Idempotence(IdempotenceStorage storage) { this.storage = storage; } public boolean saveIfAbsent(String idempotenceId) { return storage.saveIfAbsent(idempotenceId); } public void delete(String idempotenceId) { storage.delete(idempotenceId); } }
public interface IdempotenceStorage { boolean saveIfAbsent(String idempotenceId); void delete(String idempotenceId); } public class RedisClusterIdempotenceStorage implements IdempotenceStorage { private JedisCluster jedisCluster; /** * Constructor * @param redisClusterAddress the format is 128.91.12.1:3455;128.91.12.2:3452;289.13.2.12:8978 * @param config should not be null */ public RedisIdempotenceStorage(String redisClusterAddress, GenericObjectPoolConfig config) { Set<HostAndPort> redisNodes = parseHostAndPorts(redisClusterAddress); this.jedisCluster = new JedisCluster(redisNodes, config); } public RedisIdempotenceStorage(JedisCluster jedisCluster) { this.jedisCluster = jedisCluster; } /** * Save {@idempotenceId} into storage if it does not exist. * @param idempotenceId the idempotence ID * @return true if the {@idempotenceId} is saved, otherwise return false */ public boolean saveIfAbsent(String idempotenceId) { Long success = jedisCluster.setnx(idempotenceId, "1"); return success == 1; } public void delete(String idempotenceId) { jedisCluster.del(idempotenceId); } @VisibleForTesting protected Set<HostAndPort> parseHostAndPorts(String redisClusterAddress) { String[] addressArray= redisClusterAddress.split(";"); Set<HostAndPort> redisNodes = new HashSet<>(); for (String address : addressArray) { String[] hostAndPort = address.split(":"); redisNodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1]))); } return redisNodes; } }
public class IdempotenceIdGenerator { public String generateId() { return UUID.randomUUID().toString(); } }