这不仅仅是从可扩展性和可维护性的角度出发,其实我们做架构做稳定性、隔离是一种减少影响面的基本手段,类似的隔离环境做灰度、分批发布等,这里不做扩展。

先向消息服务器发送一条预处理消息
当本地数据库变更提交之后、再向消息服务器发送一条确认发送的消息
如果本地数据库变更失败、则向消息服务器发送一条取消发送的消息
如果长时间没有向消息服务器发生确认发送的消息,消息系统则会回调一个提前约定的接口、来查看本地业务是否成功,以此决定是否真正发生消息
创建一个消息发送表,将要发送的消息插入到该表中,同本地业务在一个数据库事务中进行提交
之后在由一个定时任务来轮询发送、直到发送成功后在删除当前表记录
创建一个消息发送表,将要发送的消息插入到该表中,同本地业务在一个数据库事务中进行提交
向消息服务器发送消息
发送成功则删除掉当前表记录
对于没有发送成功的消息(也就是表里面没有被删除的记录),再由定时任务来轮询发送
public class DefaultStateProcessRegistry implements BeanPostProcessor {
/**
* 第一层key是订单状态。
* 第二层key是订单状态对应的事件,一个状态可以有多个事件。
* 第三层key是具体场景code,场景下对应的多个处理器,需要后续进行过滤选择出一个具体的执行。
*/
private static Map<String, Map<String, Map<String, List<AbstractStateProcessor>>>> stateProcessMap = new ConcurrentHashMap<>();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractStateProcessor && bean.getClass().isAnnotationPresent(OrderProcessor.class)) {
OrderProcessor annotation = bean.getClass().getAnnotation(OrderProcessor.class);
String[] states = annotation.state();
String event = annotation.event();
String[] bizCodes = annotation.bizCode().length == 0 ? new String[]{"#"} : annotation.bizCode();
String[] sceneIds = annotation.sceneId().length == 0 ? new String[]{"#"} : annotation.sceneId();
initProcessMap(states, event, bizCodes, sceneIds, stateProcessMap, (AbstractStateProcessor) bean);
}
return bean;
}
private <E extends StateProcessor> void initProcessMap(String[] states, String event, String[] bizCodes, String[] sceneIds,
Map<String, Map<String, Map<String, List<E>>>> map, E processor) {
for (String bizCode : bizCodes) {
for (String sceneId : sceneIds) {
Arrays.asList(states).parallelStream().forEach(orderStateEnum -> {
registerStateHandlers(orderStateEnum, event, bizCode, sceneId, map, processor);
});
}
}
}
/**
* 初始化状态机处理器
*/
public <E extends StateProcessor> void registerStateHandlers(String orderStateEnum, String event, String bizCode, String sceneId,
Map<String, Map<String, Map<String, List<E>>>> map, E processor) {
// state维度
if (!map.containsKey(orderStateEnum)) {
map.put(orderStateEnum, new ConcurrentHashMap<>());
}
Map<String, Map<String, List<E>>> stateTransformEventEnumMap = map.get(orderStateEnum);
// event维度
if (!stateTransformEventEnumMap.containsKey(event)) {
stateTransformEventEnumMap.put(event, new ConcurrentHashMap<>());
}
// bizCode and sceneId
Map<String, List<E>> processorMap = stateTransformEventEnumMap.get(event);
String bizCodeAndSceneId = bizCode + "@" + sceneId;
if (!processorMap.containsKey(bizCodeAndSceneId)) {
processorMap.put(bizCodeAndSceneId, new CopyOnWriteArrayList<>());
}
processorMap.get(bizCodeAndSceneId).add(processor);
}
}
在状态机OrderFsmEngine的sendEvent入口处,针对同一个订单维度加锁(redis分布式锁)、同一时间只允许有一个状态变更操作进行,其他请求则进行排队等待。
在数据库层对当前state做校验、类似与乐观锁方式。最终是将其他请求抛错、由上游业务进行处理。