本地消息表是一个实现弱最终一致性的方案,通过对操作状态进行本地记录,失败时进行一定次数的重试,来保证调用时成功的可能。
核心代码:
LocalMessageDO:本地消息表
@Builder
@Getter
@Setter
@TableName("local_message")
public class LocalMessageDO {
/**
* 主键
*/
private Long id;
/**
* 业务ID
*/
private String bizId;
/**
* 业务类型
*/
private String bizType;
/**
* 状态 INIT, SUCCESS, FAIL
*/
private String status;
/**
* 已重试次数
*/
private Integer retryTimes;
/**
* 最大重试次数
*/
private Integer maxRetryTimes;
/**
* 请求快照
*/
private String reqSnapshot;
/**
* 失败原因
*/
private String failReason;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}InvokeCtx:请求的参数
@Builder
@Getter
@Setter
public class InvokeCtx {
private String className;
private String methodName;
private String paramsType;
private String params;
}InvokeStateHandle:状态流转处理
public class InvokeStateHandle {
private static final ThreadLocal<Boolean> isProcessing
= ThreadLocal.withInitial(() -> false);
public static boolean inProcess() {
return isProcessing.get();
}
public static void process() {
isProcessing.set(true);
}
public static void finish() {
isProcessing.remove();
}
}ScanTask:对事件进行扫描并重试
@Component
@Slf4j
public class ScanTask {
@Autowired
private LocalMessageService localMessageService;
@Autowired
private RedissonClient redissonClient;
@Autowired
private ApplicationContext applicationContext;
private static final List<String> waitRetryStatusList = List.of(
LocalMessageStatusEnum.RETRY.getValue()
);
// @Scheduled(cron = "30 * * * * ?")
@Scheduled(initialDelay = 5000, fixedRate = 2000) // 等待 5 秒 -> 第一次执行任务 -> 每隔 2 秒重复执行任务
public void scan() {
log.info("扫描本地事务");
localMessageService.loadWaitRetryRecords(waitRetryStatusList).forEach(this::doInvoke);
}
private void invoke(LocalMessageDO localMessageDO, boolean sync) {
if (sync) {
doSyncInvoke(localMessageDO);
} else {
doInvoke(localMessageDO);
}
}
private void doSyncInvoke(LocalMessageDO localMessageDO) {
// todo
}
private void doInvoke(LocalMessageDO localMessageDO) {
RLock lock = redissonClient.getLock(localMessageDO.getBizId());
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
if (StrUtil.isBlank(localMessageDO.getReqSnapshot())) {
log.warn("本地事务记录快照为空,bizId:{}", localMessageDO.getBizId());
return;
}
InvokeStateHandle.process();
InvokeCtx ctx = JSON.parseObject(localMessageDO.getReqSnapshot(), InvokeCtx.class);
Class<?> target = Class.forName(ctx.getClassName());
Object bean = applicationContext.getBean(target);
List<Class<?>> paramsType = getParamsType(JSON.parseArray(ctx.getParamsType(), String.class));
Method method = target.getMethod(ctx.getMethodName(), paramsType.toArray(Class[]::new));
Object[] args = getArgs(paramsType, ctx.getParams());
method.invoke(bean, args);
localMessageService.invokeSuccess(localMessageDO);
}
} catch (Exception e) {
localMessageService.retry(localMessageDO, e.getMessage());
} finally {
InvokeStateHandle.finish();
lock.unlock();
}
}
private Object[] getArgs(List<Class<?>> paramsType, String params) {
Object[] result = new Object[paramsType.size()];
JSONArray args = JSON.parseArray(params);
for (int i = 0; i < args.size(); i++) {
result[i] = JSON.parseObject(args.getString(i), paramsType.get(i));
}
return result;
}
private List<Class<?>> getParamsType(List<String> classes) {
List<Class<?>> result = new ArrayList<>();
for (String s : classes) {
try {
result.add(Class.forName(s));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
return result;
}
}LocalMessageAspect:AOP逻辑
@Aspect
@Component
@Slf4j
public class LocalMessageAspect {
@Autowired
private LocalMessageService localMessageService;
@Around("@annotation(localMessage)")
public Object process(ProceedingJoinPoint pjp, LocalMessage localMessage) throws Throwable {
// 每一次对标注方法的调用都会被AOP代理,使用状态,可以避免对参数的重复记录
if (InvokeStateHandle.inProcess()) {
return pjp.proceed();
}
MethodSignature signature = (MethodSignature) pjp.getSignature();
List<String> paramsType = Arrays.stream(signature.getParameterTypes())
.map(Class::getName)
.toList();
Object[] args = pjp.getArgs();
InvokeCtx invokeCtx = InvokeCtx.builder().
className(signature.getDeclaringTypeName()).
methodName(signature.getName()).
paramsType(JSON.toJSONString(paramsType)).
params(JSON.toJSONString(args)).
build();
String bizId = localMessage.bizId();
bizId = StrUtil.isNotBlank(bizId) ? SpELUtil.parseSpEL(bizId, signature, args) : RandomUtil.randomInt(100) + "";
LocalMessageDO localMessageDO = LocalMessageDO.builder().
maxRetryTimes(localMessage.retry()).
reqSnapshot(JSON.toJSONString(invokeCtx)).
createTime(LocalDateTime.now()).
updateTime(LocalDateTime.now()).
id(IdUtil.getSnowflake(1, 1).nextId()).
status(LocalMessageStatusEnum.INIT.getValue()).
bizId(bizId).
bizType(localMessage.bizType().getName()).
build();
localMessageService.save(localMessageDO);
Object result = null;
try {
result = pjp.proceed();
localMessageService.invokeSuccess(localMessageDO);
log.info("本地事务执行成功, 业务类型:{},业务ID:{}", localMessageDO.getBizType(), localMessageDO.getBizId());
} catch (Throwable e) {
log.warn("本地事务执行失败, 业务类型:{},业务ID:{}", localMessageDO.getBizType(), localMessageDO.getBizId());
localMessageService.retry(localMessageDO, e.getMessage());
} finally {
InvokeStateHandle.finish();
}
return result;
}
}LocalMessage:注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LocalMessage {
/**
* 重试次数 */ int retry() default 3;
/**
* 业务类型 */ LocalMessageBizTypeEnum bizType() default LocalMessageBizTypeEnum.NONE;
/**
* 业务id */ String bizId();
/**
* 是否异步 */ boolean async() default false;
}LocalMessageService
public void retry(LocalMessageDO localMessageDO, String failReason) {
int retryTimes = Optional.ofNullable(localMessageDO.getRetryTimes()).orElse(0) + 1;
if (retryTimes > localMessageDO.getMaxRetryTimes()) {
invokeFail(localMessageDO, failReason);
log.warn("本地事务重试次数超过最大限制,bizId:{}", localMessageDO.getBizId());
} else {
localMessageDO.setRetryTimes(retryTimes);
invokeRetry(localMessageDO, failReason);
}
}
public void invokeSuccess(LocalMessageDO localMessageDO) {
localMessageDO.setStatus(LocalMessageStatusEnum.SUCCESS.getValue());
localMessageDO.setUpdateTime(LocalDateTime.now());
localMessageMapper.updateById(localMessageDO);
}
public void invokeFail(LocalMessageDO localMessageDO, String failReason) {
localMessageDO.setStatus(LocalMessageStatusEnum.FAIL.getValue());
localMessageDO.setUpdateTime(LocalDateTime.now());
localMessageDO.setFailReason(failReason);
localMessageMapper.updateById(localMessageDO);
}
public void invokeRetry(LocalMessageDO localMessageDO, String failReason) {
localMessageDO.setStatus(LocalMessageStatusEnum.RETRY.getValue());
localMessageDO.setUpdateTime(LocalDateTime.now());
localMessageDO.setFailReason(failReason);
localMessageMapper.updateById(localMessageDO);
}开发中遇见的问题
在实体类上使用了@Builder注解,当字段定义的顺序和查询顺序不一致时,会出现赋值错误的问题。
解决此问题最简单的方式是:
同时使用:
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor