本地消息表是一种实现弱最终一致性的方案:先把每次操作的状态记录到本地表中,调用失败时再根据记录进行有限次数的重试,以提高调用最终成功的可能性。

核心代码:

LocalMessageDO:本地消息表

java
/**
 * Row of the {@code local_message} table: one recorded invocation together with its
 * retry bookkeeping.
 *
 * <p>{@code @NoArgsConstructor} and {@code @AllArgsConstructor} are declared next to
 * {@code @Builder} on purpose. With {@code @Builder} alone the class has no no-args
 * constructor, so a persistence framework that instantiates the entity may fall back to
 * the all-args constructor and assign query columns by position, which mis-assigns
 * values whenever the column order differs from the field order.
 */
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@TableName("local_message")
public class LocalMessageDO {
    /**
     * Primary key.
     */
    private Long id;
    /**
     * Business identifier.
     */
    private String bizId;
    /**
     * Business type.
     */
    private String bizType;
    /**
     * Status: INIT, RETRY, SUCCESS or FAIL.
     */
    private String status;
    /**
     * Number of retries already performed; may be {@code null} before the first failure.
     */
    private Integer retryTimes;
    /**
     * Maximum number of retries allowed.
     */
    private Integer maxRetryTimes;
    /**
     * Snapshot of the request (serialized invocation context).
     */
    private String reqSnapshot;
    /**
     * Reason of the most recent failure.
     */
    private String failReason;
    /**
     * Creation time.
     */
    private LocalDateTime createTime;
    /**
     * Last update time.
     */
    private LocalDateTime updateTime;

}

InvokeCtx:请求的参数

java
/**
 * Serializable description of one method invocation: which class and method to call and
 * with which arguments. All four fields are plain strings so the whole object can be
 * stored as a JSON snapshot.
 *
 * <p>{@code @NoArgsConstructor} and {@code @AllArgsConstructor} accompany {@code @Builder}
 * so that the class keeps a public no-args constructor; the snapshot is rebuilt from JSON,
 * and property-based deserialization should not have to rely on matching the parameters
 * of a builder-only constructor.
 */
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class InvokeCtx {
    /** Fully qualified name of the class declaring the target method. */
    private String className;
    /** Name of the target method. */
    private String methodName;
    /** JSON array of the parameter types' class names. */
    private String paramsType;
    /** JSON array of the call arguments. */
    private String params;
}

InvokeStateHandle:状态流转处理

java
/**
 * Per-thread marker telling whether the current thread is in the middle of a processing
 * run. The flag starts out as {@code false} on every thread, is raised by
 * {@link #process()} and is cleared again by {@link #finish()}.
 */
public class InvokeStateHandle {
    /** Thread-local flag; {@code false} until {@link #process()} is called on the thread. */
    private static final ThreadLocal<Boolean> PROCESSING_FLAG =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    /**
     * Marks the current thread as processing.
     */
    public static void process() {
        PROCESSING_FLAG.set(Boolean.TRUE);
    }

    /**
     * Drops the current thread's flag, so the next read yields the initial {@code false}.
     */
    public static void finish() {
        PROCESSING_FLAG.remove();
    }

    /**
     * @return {@code true} when {@link #process()} has been called on this thread and
     *         {@link #finish()} has not been called since
     */
    public static boolean inProcess() {
        return Boolean.TRUE.equals(PROCESSING_FLAG.get());
    }
}

ScanTask:对事件进行扫描并重试

java
/**
 * Scheduled job that loads the local-message records waiting for a retry and replays the
 * invocation stored in each record's request snapshot.
 */
@Component
@Slf4j
public class ScanTask {
    @Autowired
    private LocalMessageService localMessageService;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ApplicationContext applicationContext;

    /** Statuses selected by {@link #scan()}. */
    private static final List<String> waitRetryStatusList = List.of(
            LocalMessageStatusEnum.RETRY.getValue()
    );

    /**
     * Loads every record in a wait-retry status and replays it.
     *
     * <p>Schedule: wait 5 seconds, run once, then run again every 2 seconds.
     */
    //    @Scheduled(cron = "30 * * * * ?")
    @Scheduled(initialDelay = 5000, fixedRate = 2000)
    public void scan() {
        log.info("扫描本地事务");
        localMessageService.loadWaitRetryRecords(waitRetryStatusList).forEach(this::doInvoke);
    }

    /**
     * Dispatches to the synchronous or the default replay path. Not called from within
     * this class at the moment.
     */
    private void invoke(LocalMessageDO localMessageDO, boolean sync) {
        if (sync) {
            doSyncInvoke(localMessageDO);
        } else {
            doInvoke(localMessageDO);
        }
    }

    private void doSyncInvoke(LocalMessageDO localMessageDO) {
        // todo
    }

    /**
     * Replays one record while holding a distributed lock keyed by the record's bizId.
     *
     * <p>On success the record is marked successful; on any failure the retry bookkeeping
     * of {@code LocalMessageService#retry} is applied. If the lock cannot be obtained
     * within 5 seconds the record is left untouched.
     */
    private void doInvoke(LocalMessageDO localMessageDO) {
        RLock lock = redissonClient.getLock(localMessageDO.getBizId());
        // Remember whether the lock was really obtained: unlocking a lock this thread does
        // not hold throws, which previously aborted the whole scan.
        boolean locked = false;
        try {
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            if (!locked) {
                log.debug("未获取到锁,跳过本次重试,bizId:{}", localMessageDO.getBizId());
                return;
            }
            if (StrUtil.isBlank(localMessageDO.getReqSnapshot())) {
                log.warn("本地事务记录快照为空,bizId:{}", localMessageDO.getBizId());
                return;
            }

            // Raise the per-thread flag so the aspect does not record this replay again.
            InvokeStateHandle.process();

            InvokeCtx ctx = JSON.parseObject(localMessageDO.getReqSnapshot(), InvokeCtx.class);
            Class<?> target = Class.forName(ctx.getClassName());
            Object bean = applicationContext.getBean(target);
            List<Class<?>> paramsType = getParamsType(JSON.parseArray(ctx.getParamsType(), String.class));
            Method method = target.getMethod(ctx.getMethodName(), paramsType.toArray(Class[]::new));
            Object[] args = getArgs(paramsType, ctx.getParams());
            method.invoke(bean, args);
            localMessageService.invokeSuccess(localMessageDO);
        } catch (InterruptedException e) {
            // Only tryLock can be interrupted here, i.e. the invocation never ran:
            // restore the interrupt flag and do not consume a retry attempt.
            Thread.currentThread().interrupt();
            log.warn("等待锁时线程被中断,bizId:{}", localMessageDO.getBizId());
        } catch (Exception e) {
            // A failure inside the target method arrives wrapped in an
            // InvocationTargetException whose own message is null; report the real cause.
            Throwable cause = e;
            if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
                cause = e.getCause();
            }
            log.warn("本地事务重试执行失败,bizId:{}", localMessageDO.getBizId(), cause);
            localMessageService.retry(localMessageDO, cause.getMessage());
        } finally {
            InvokeStateHandle.finish();
            if (locked) {
                lock.unlock();
            }
        }
    }

    /**
     * Converts the JSON array of recorded arguments back into objects of the given types.
     *
     * @param paramsType parameter types, in declaration order
     * @param params     JSON array holding one element per parameter
     * @return the argument array to pass to the reflective call
     * @throws IllegalArgumentException if the number of recorded arguments differs from
     *                                  the number of parameter types
     */
    private Object[] getArgs(List<Class<?>> paramsType, String params) {
        JSONArray args = JSON.parseArray(params);
        int argCount = args == null ? 0 : args.size();
        if (argCount != paramsType.size()) {
            throw new IllegalArgumentException(
                    "Recorded argument count " + argCount
                            + " does not match parameter count " + paramsType.size());
        }
        Object[] result = new Object[argCount];
        for (int i = 0; i < argCount; i++) {
            result[i] = JSON.parseObject(args.getString(i), paramsType.get(i));
        }
        return result;
    }

    /**
     * Resolves class names to classes.
     *
     * <p>NOTE(review): {@code Class.forName} cannot resolve primitive type names such as
     * {@code "int"}, so methods declaring primitive parameters cannot be replayed.
     *
     * @throws RuntimeException wrapping the {@link ClassNotFoundException} of a name that
     *                          cannot be resolved
     */
    private List<Class<?>> getParamsType(List<String> classes) {
        List<Class<?>> result = new ArrayList<>();
        for (String s : classes) {
            try {
                result.add(Class.forName(s));
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }
}

LocalMessageAspect:AOP逻辑

java
/**
 * Around-advice for methods annotated with {@link LocalMessage}: stores a snapshot of the
 * call before running it and records the outcome afterwards.
 */
@Aspect
@Component
@Slf4j
public class LocalMessageAspect {
    @Autowired
    private LocalMessageService localMessageService;

    /**
     * Records the invocation, runs it, and marks the record as successful or as needing a
     * retry.
     *
     * @param pjp          the intercepted call
     * @param localMessage the annotation found on the intercepted method
     * @return the target method's result; {@code null} when the target method threw,
     *         because the failure is logged and recorded for retry instead of being
     *         propagated
     * @throws Throwable only from the pass-through path (a replay that is already in
     *                   progress on this thread) or from saving the record
     */
    @Around("@annotation(localMessage)")
    public Object process(ProceedingJoinPoint pjp, LocalMessage localMessage) throws Throwable {
        // Every call to an annotated method goes through this advice, including replays.
        // The per-thread flag lets a replay pass straight through so that its arguments
        // are not recorded a second time.
        if (InvokeStateHandle.inProcess()) {
            return pjp.proceed();
        }

        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Object[] args = pjp.getArgs();

        long id = IdUtil.getSnowflake(1, 1).nextId();
        LocalDateTime now = LocalDateTime.now();
        LocalMessageDO localMessageDO = LocalMessageDO.builder().
                id(id).
                bizId(resolveBizId(localMessage, signature, args, id)).
                bizType(localMessage.bizType().getName()).
                status(LocalMessageStatusEnum.INIT.getValue()).
                maxRetryTimes(localMessage.retry()).
                reqSnapshot(JSON.toJSONString(buildInvokeCtx(signature, args))).
                createTime(now).
                updateTime(now).
                build();

        localMessageService.save(localMessageDO);

        Object result = null;
        try {
            result = pjp.proceed();
            localMessageService.invokeSuccess(localMessageDO);
            log.info("本地事务执行成功, 业务类型:{},业务ID:{}", localMessageDO.getBizType(), localMessageDO.getBizId());
        } catch (Throwable e) {
            // Deliberately not rethrown: the failure is recorded so it can be retried
            // later. The throwable is passed to the logger so its stack trace is kept.
            log.warn("本地事务执行失败, 业务类型:{},业务ID:{}", localMessageDO.getBizType(), localMessageDO.getBizId(), e);
            localMessageService.retry(localMessageDO, e.getMessage());
        } finally {
            // inProcess() above materialized this thread's initial flag value; remove it
            // so the thread does not keep the entry around.
            InvokeStateHandle.finish();
        }
        return result;
    }

    /** Captures the declaring class, method name, parameter types and arguments of the call. */
    private InvokeCtx buildInvokeCtx(MethodSignature signature, Object[] args) {
        List<String> paramsType = Arrays.stream(signature.getParameterTypes())
                .map(Class::getName)
                .toList();
        return InvokeCtx.builder().
                className(signature.getDeclaringTypeName()).
                methodName(signature.getName()).
                paramsType(JSON.toJSONString(paramsType)).
                params(JSON.toJSONString(args)).
                build();
    }

    /**
     * Evaluates the annotation's bizId expression. When the expression is blank the
     * record's own unique id is used; the earlier fallback, a random number below 100,
     * made unrelated records share a bizId.
     */
    private String resolveBizId(LocalMessage localMessage, MethodSignature signature, Object[] args, long id) {
        String bizIdExpression = localMessage.bizId();
        if (StrUtil.isNotBlank(bizIdExpression)) {
            return SpELUtil.parseSpEL(bizIdExpression, signature, args);
        }
        return String.valueOf(id);
    }
}

LocalMessage:注解

java
/**
 * Marks a method whose invocations are recorded in the local message table so that a
 * failed call can be retried.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LocalMessage {
    /**
     * Maximum number of retries.
     */
    int retry() default 3;

    /**
     * Business type.
     */
    LocalMessageBizTypeEnum bizType() default LocalMessageBizTypeEnum.NONE;

    /**
     * Business id expression. Optional: the aspect handles a blank value by generating
     * an id itself, so an empty default is provided.
     */
    String bizId() default "";

    /**
     * Whether the call is asynchronous.
     */
    boolean async() default false;
}

LocalMessageService:重试计数与状态更新

java
/**
 * Registers one more failed attempt for the record.
 *
 * <p>While the incremented attempt count stays within the record's maximum, the count is
 * stored and the record is put into the retry status; once it exceeds the maximum the
 * record is marked as failed and a warning is logged.
 *
 * @param localMessageDO the record whose invocation failed
 * @param failReason     failure description to store on the record
 */
public void retry(LocalMessageDO localMessageDO, String failReason) {
    Integer recordedAttempts = localMessageDO.getRetryTimes();
    // A record that has never failed before carries no count yet.
    int attempt = (recordedAttempts == null ? 0 : recordedAttempts) + 1;

    boolean withinLimit = attempt <= localMessageDO.getMaxRetryTimes();
    if (withinLimit) {
        localMessageDO.setRetryTimes(attempt);
        invokeRetry(localMessageDO, failReason);
        return;
    }

    invokeFail(localMessageDO, failReason);
    log.warn("本地事务重试次数超过最大限制,bizId:{}", localMessageDO.getBizId());
}
  
/**
 * Marks the record as successful, refreshes its update time and persists it by id.
 *
 * @param localMessageDO the record whose invocation succeeded
 */
public void invokeSuccess(LocalMessageDO localMessageDO) {
    final String successStatus = LocalMessageStatusEnum.SUCCESS.getValue();
    final LocalDateTime finishedAt = LocalDateTime.now();

    localMessageDO.setStatus(successStatus);
    localMessageDO.setUpdateTime(finishedAt);

    localMessageMapper.updateById(localMessageDO);
}
  
/**
 * Marks the record as failed, stores the failure reason, refreshes its update time and
 * persists it by id.
 *
 * @param localMessageDO the record that will not be retried any more
 * @param failReason     failure description to store on the record
 */
public void invokeFail(LocalMessageDO localMessageDO, String failReason) {
    final String failedStatus = LocalMessageStatusEnum.FAIL.getValue();
    final LocalDateTime failedAt = LocalDateTime.now();

    localMessageDO.setStatus(failedStatus);
    localMessageDO.setUpdateTime(failedAt);
    localMessageDO.setFailReason(failReason);

    localMessageMapper.updateById(localMessageDO);
}
  
/**
 * Puts the record into the retry status, stores the failure reason, refreshes its update
 * time and persists it by id.
 *
 * @param localMessageDO the record to be retried later
 * @param failReason     failure description to store on the record
 */
public void invokeRetry(LocalMessageDO localMessageDO, String failReason) {
    final String retryStatus = LocalMessageStatusEnum.RETRY.getValue();
    final LocalDateTime markedAt = LocalDateTime.now();

    localMessageDO.setStatus(retryStatus);
    localMessageDO.setUpdateTime(markedAt);
    localMessageDO.setFailReason(failReason);

    localMessageMapper.updateById(localMessageDO);
}

开发中遇见的问题

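在实体类上只使用 @Builder 注解时,Lombok 只会生成一个全参构造函数,类中没有无参构造函数;MyBatis 等框架在映射查询结果时会退而使用这个全参构造函数,并按参数顺序依次赋值,因此当字段定义的顺序和查询列的顺序不一致时,会出现赋值错误的问题。解决此问题最简单的方式是同时使用以下注解:@Builder、@Getter、@Setter、@NoArgsConstructor、@AllArgsConstructor,这样框架会先通过无参构造函数创建对象,再按属性名通过 setter 赋值。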