10k

设计模式之美-课程笔记52-项目实战2-设计实现一个通用的接口幂等框架

需求场景

  1. 结合之前的限流项目背景,公共服务被抽离出来只提供RESTful接口,中间加一层HTTP框架来控制访问。
  2. 调用接口的结果一般是成功、失败和超时。
  3. 超时的情况是未决的,我们不知道是哪个环节出了问题,也不确定是否当前的请求已经处理完成。
  4. 如果接口只包含查询、删除、更新这些操作,重试一次即可。
    1. 需要注意的问题是ABA问题,也即当删除操作超时,下一次删除重试之前新的数据被插入,重试操作删除了新的被插入的数据。-》 大部分业务可以容忍,不容忍的可以特殊处理。
    2. update x = x+delta 这样格式的更新操作并非幂等,只有 update x=y 这样格式的更新操作才是幂等的。如果插入的数据包含数据库唯一键,可以利用数据库唯一键的排他性,保证不会重复插入数据
  5. 超时的几种处理方式:
    1. 提醒用户,让他们自己决定。-》 时间敏感用户适用
    2. 调用方查询操作结果,判断超时的操作是否成功。-》时间不敏感用户适用,但是局限,因为不是所有调用方都方便查询结果。
    3. 直接重试。-》时间不敏感用户适用 -》 通用解决方案。

需求分析

  1. 幂等: (针对这个接口调用的场景)针对同一接口多次发起同一个业务请求,必须保证业务只执行一次。
  2. 如何判断两个请求是重试关系?-》 幂等号标识。全局唯一。

功能性需求

通过TDD或者用户用例来看一下这个功能如何被使用:

///////// 使用方式一: 在业务代码中处理幂等 ////////////
// 接口调用方
Idempotence idempotence = new Idempotence();
String idempotenceId = idempotence.createId();
Order order = createOrderWithIdempotence(..., idempotenceId);

// 接口实现方
public class OrderController {
  private Idempotence idempontence; // 依赖注入
  
  public Order createOrderWithIdempotence(..., String idempotenceId) {
    // 前置操作
    boolean existed = idempotence.check(idempotenceId);
    if (existed) {
      // 两种处理方式:
      // 1. 查询order,并且返回;
      // 2. 返回duplication operation Exception
    }
    idempotence.record(idempotenceId);
        
    //...执行正常业务逻辑
  }
  
  public Order createOrder(...) {
    //...
  }
}

///////// 使用方式二:在框架层面处理幂等 //////////////
// 接口调用方
Idempotence idempotence = new Idempotence();
String idempotenceId = idempotence.createId();
//...通过feign框架将幂等号添加到http header中...

// 接口实现方
public class OrderController {
  @IdempotenceRequired
  public Order createOrder(...) {
    //...
  }
}

// 在AOP切面中处理幂等
@Aspect
public class IdempotenceSupportAdvice {
  @Autowired
  private Idempotence idempotence;

@Pointcut("@annotation(com.xzg.cd.idempotence.annotation.IdempotenceRequired)")
  public void controllerPointcut() {
  }

  @Around(value = "controllerPointcut()")
  public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
    // 从HTTP header中获取幂等号idempotenceId
    
    // 前置操作
    boolean existed = idempotence.check(idempotenceId);
    if (existed) {
      // 两种处理方式:
      // 1. 查询order,并且返回;
      // 2. 返回duplication operation Exception
    }
    idempotence.record(idempotenceId)
    
    Object result = joinPoint.proceed();    
    return result;
  }
}

我们可以得出接口调用方生成幂等号,随着请求传入框架。框架从HTTP header或者接口中取得这个然后查询。如果幂等号已经存在说明任务正在执行直接返回;否则记录幂等号并执行任务。

非功能性需求

易用性:接入成本低,易于学习。松耦合

性能:低延迟,因为在请求处理前加入了幂等判断逻辑。

容错性:高容错。不能因为幂等框架的异常影响接口服务的本身可用性。

幂等处理正常流程

接口调用方接收到请求,解析幂等号,传递给幂等框架,框架查询这个幂等号是否存在。如果存在则不需要重复执行;不存在则存储这个幂等号并执行相应的业务逻辑。

业务代码的异常处理

  1. 由实际调用方的工程师来决定,框架不做决定,提供删除幂等号的接口。
  2. 一般出现系统错误可以尝试重试(数据库连接错误);一般出现业务错误就不重试了因为重试也是同样的结果(数据不存在)(不删除幂等号)。

业务宕机处理

  1. 可能存在幂等号记录后业务宕机则,重试业务依旧不能执行;或者系统异常出现后在删除幂等号的时候宕机。
  2. 幂等逻辑+业务逻辑行为一致为了保证,可以放进事务-》会影响性能,整个系统的变得复杂,成为了分布式事务(业务和幂等系统使用不同的数据库)
  3. 另一个方法是在业务系统中增加一个记录幂等号的表,幂等号存在这里然后同步进另一个幂等自己的DB,然后事务变成了不是分布式的:
    1. 好处:解决了分布式事务的复杂性,可以利用DB本身的事务性;
    2. 坏处:和业务又不解耦了。
      1. 不能一概而论,这个方案不失为一个方法,但是进一步可以通过SQL自己的日志记录幂等号,用于问题排查。

幂等框架异常处理

  1. 不同于限流框架,他出异常不管用之后,我们还有监控、运维手段可以及时发现。
  2. 幂等框架异常,应该让业务也失败。因为如果运行业务继续执行的话,无法判读重试会导致重复执行,业务执行出错。

V1版本功能需求

  • 生成幂等号
  • 幂等号的存储、查询、删除

幂等号的生成

  1. 判断两个请求是否是同一个业务请求。(是否重试关系)
  2. 两种方式:
    1. 一个集中生成的服务,分配给调用方
    2. 各个调用方自己生成
  3. 集中生成需要引入第三方服务
    1. 优点:修改集中,对调用方隐藏实现
    2. 缺点:增加系统负责性
  4. 调用方自己生成:
    1. 执行效率高
    2. 修改幂等号生成算法需要修改所有的调用方;实现者水平不同,可能出bug且重复实现。
  5. 一个折中方式,保证执行效率和代码维护成本:集中实现幂等生成算法,让业务方自己调用,实现复用和集中维护。

幂等号的存储、查询、删除

  1. 只需要幂等号查重功能:Redis即可,不需要关系型数据库;
    1. 多线程执行检查-设置逻辑时会有竞争关系-》加分布式锁-》增加复杂度
    2. Redis本身提供原子操作setnx(key,value)-》单线程,没有并发问题。

最小原型代码实现&Review最小原型代码

public class Idempotence {
  // comment-1: 如果要替换存储方式,是不是很麻烦呢?
  private JedisCluster jedisCluster;

  // comment-2: 如果幂等框架要跟业务系统复用jedisCluster连接呢?
  // comment-3: 是不是应该注释说明一下redisClusterAddress的格式,以及config是否可以传递进null呢?
  public Idempotence(String redisClusterAddress, GenericObjectPoolConfig config) {
    // comment-4: 这段逻辑放到构造函数里,不容易写单元测试呢
    String[] addressArray= redisClusterAddress.split(";");
    Set<HostAndPort> redisNodes = new HashSet<>();
    for (String address : addressArray) {
      String[] hostAndPort = address.split(":");
      redisNodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
    }
    this.jedisCluster = new JedisCluster(redisNodes, config);
  }
  
  // comment-5: generateId()是不是比缩写要好点?
  // comment-6: 根据接口隔离原则,这个函数跟其他函数的使用场景完全不同,这个函数主要用在调用方,其他函数用在实现方,是不是应该分别放到两个类中?
  public String genId() {
    return UUID.randomUUID().toString();
  }

  // comment-7: 返回值的意义是不是应该注释说明一下?
  public boolean saveIfAbsent(String idempotenceId) {
    Long success = jedisCluster.setnx(idempotenceId, "1");
    return success == 1;
  }

  public void delete(String idempotenceId) {
    jedisCluster.del(idempotenceId);
  }
}

重构最小原型代码

  1. GenerateId名字更改并抽离到另一个util类中
  2. storage类单独提取,封装并提供接口,提高灵活性并且运行后续更改存储实现
  3. storage中增加注释解释检查-存储的返回结果;增加注释解读配置参数
  4. storage中构造函数抽离出配置过程,提升可测试性
// 代码目录结构
com.xzg.cd.idempotence
 --Idempotence
 --IdempotenceIdGenerator(幂等号生成类)
 --IdempotenceStorage(接口:用来读写幂等号)
 --RedisClusterIdempotenceStorage(IdempotenceStorage的实现类)
// 每个类的代码实现
public class Idempotence {
  private IdempotenceStorage storage;

  public Idempotence(IdempotenceStorage storage) {
    this.storage = storage;
  }

  public boolean saveIfAbsent(String idempotenceId) {
    return storage.saveIfAbsent(idempotenceId);
  }

  public void delete(String idempotenceId) {
    storage.delete(idempotenceId);
  }
}
public interface IdempotenceStorage {
  boolean saveIfAbsent(String idempotenceId);
  void delete(String idempotenceId);
}


public class RedisClusterIdempotenceStorage implements IdempotenceStorage {
  private JedisCluster jedisCluster;

  /**
   * Constructor
   * @param redisClusterAddress the format is 128.91.12.1:3455;128.91.12.2:3452;289.13.2.12:8978
   * @param config should not be null
   */
  public RedisIdempotenceStorage(String redisClusterAddress, GenericObjectPoolConfig config) {
    Set<HostAndPort> redisNodes = parseHostAndPorts(redisClusterAddress);
    this.jedisCluster = new JedisCluster(redisNodes, config);
  }

  public RedisIdempotenceStorage(JedisCluster jedisCluster) {
    this.jedisCluster = jedisCluster;
  }

  /**
   * Save {@idempotenceId} into storage if it does not exist.
   * @param idempotenceId the idempotence ID
   * @return true if the {@idempotenceId} is saved, otherwise return false
   */
  public boolean saveIfAbsent(String idempotenceId) {
    Long success = jedisCluster.setnx(idempotenceId, "1");
    return success == 1;
  }

  public void delete(String idempotenceId) {
    jedisCluster.del(idempotenceId);
  }

  @VisibleForTesting
  protected Set<HostAndPort> parseHostAndPorts(String redisClusterAddress) {
    String[] addressArray= redisClusterAddress.split(";");
    Set<HostAndPort> redisNodes = new HashSet<>();
    for (String address : addressArray) {
      String[] hostAndPort = address.split(":");
      redisNodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
    }
    return redisNodes;
  }
}
public class IdempotenceIdGenerator {
  public String generateId() {
    return UUID.randomUUID().toString();
  }
}
Thoughts? Leave a comment