轻量级状态机框架 Sateless4j 与重入锁的定义

时间:2021-08-29 01:25:20

轻量级状态机框架 Sateless4j 实践 - 知乎

一,概括

在使用状态机时,入参中都会定义这样一个可重入锁的变量,关于以下问题进行探讨

1.为什么定义这样一个变量,有什么用,定义为可重入还是不可重复

2.可重入/不可重入,区别与影响

3.可重入/不可重入,对业务流程影响

@ApiModelProperty("重入锁")
private Boolean reentrancyLock = false;
@Data
public class TaskStatusEventDTO {

    @ApiModelProperty("操作类型")
    private String eventTrigger;

    @ApiModelProperty("运单号")
    private String ylTaskId;

    @ApiModelProperty("子运单号")
    private String taskChildId;

    @ApiModelProperty("上下文")
    private Object context;

    @ApiModelProperty("重入锁")
    //重入锁r
    private Boolean reentrancyLock = false;

    @ApiModelProperty("状态变更请求入参")
    private TaskStatusChangeReqDTO taskStatusChangeReqDTO;

}

二,重入锁使用场景

1.可重入

private Boolean reentrancyLock =true;
if (request.getReentrancyLock()) {
    stateMachine.fire(event);
    return stateMachine.getState().getCode();
}

直接执行状态机触发,并返回该请求最终状态

2.不可重入

会根据单号加锁,代码如下

加锁成功,触发状态机流程,stateMachine.fire(event);

加锁失败,不触发状态机流程

最终都会返回次态,return stateMachine.getState().getCode();

String lockName = Constant.YL_TMS_TASK_NO_LOCK + request.getYlTaskId();
lockService.distributedLockFire("运单状态流转", lockName, event, stateMachine, redisTemplate);
return stateMachine.getState().getCode();

锁代码

@Service
@Slf4j
public class LockService<C,R> {

    @Autowired
    @Resource(name = "ylTmsPlatformRedisTemplate")
    private RedisTemplate redisTemplate;


    public void distributedLockFire(String handlerName, String lockName, R event, StateMachine<C, R> stateMachine, RedisTemplate redis) {
        if (ToolUtil.isOneEmpty(handlerName, lockName, event, stateMachine)) {
            throw new YlTmsPlatformException(ErrConstant.INVALID_DATAFILED, "参数不全");
        }
        if (null == redis) {
            redis = redisTemplate;
        }
        NewRedisDistributedLock newRedisDistributedLock = new NewRedisDistributedLock(redis);
        RedisLock lock = newRedisDistributedLock.getLock(lockName, 60L);
        try {
            if (lock != null) {
                log.debug("分布式锁处理器开始执行[{}},{}]", handlerName, lockName);
                stateMachine.fire(event);
            }
        } finally {
            log.debug("分布式锁处理器执行[{},{}]结束", handlerName, lockName);
            assert lock != null;
            lock.unlock();
        }
    }
}

是否可重入代码 

    public String fire(TaskStatusEventDTO request) {
        TaskOperate event = TaskOperate.getEvent(request.getEventTrigger());
        if (null == event) {
            throw new YlTmsPlatformException(ErrConstant.BUSI_RETURN_ERR, "非法的操作");
        }
        if (StringUtils.isBlank(request.getYlTaskId())) {
            throw new YlTmsPlatformException(ErrConstant.INVALID_DATAFILED, "状态扭转异常,运单号不存在");
        }
        YlTask task = taskService.selectTaskByYlTaskId(request.getYlTaskId());
        if (null == task) {
            throw new YlTmsPlatformException(ErrConstant.UNFOUND_DATA, "状态扭转异常,运单查询失败");
        }
        TaskStatusEnum statusEnum = TaskStatusEnum.fromCode(task.getTaskStatus());
        if (null == statusEnum) {
            throw new YlTmsPlatformException(ErrConstant.BUSI_RETURN_ERR, "当前运单状态异常,请检查运单状态");
        }
        StateMachine<TaskStatusEnum, TaskOperate> stateMachine = TaskStateConver.checkFireOrThrow(task.getTaskStatus(), event);
        //可重入
        if (request.getReentrancyLock()) {
            stateMachine.fire(event);
            return stateMachine.getState().getCode();
        }
        String lockName = Constant.YL_TMS_TASK_NO_LOCK + request.getYlTaskId();
        lockService.distributedLockFire("运单状态流转", lockName, event, stateMachine, redisTemplate);
        return stateMachine.getState().getCode();
    }
}

加锁逻辑,即setNx

 public RedisLock getLock(final String key, final long expireSeconds, int maxRetryTimes, long retryIntervalTimeMillis) {
        final String value = UUID.randomUUID().toString();

        for(int i = 0; i < maxRetryTimes; ++i) {
            String status = (String)this.stringRedisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    Jedis jedis = (Jedis)connection.getNativeConnection();
                    String status = jedis.set(key, value, "nx", "ex", expireSeconds);
                    return status;
                }
            });
            if ("OK".equals(status) && value.equals(this.stringRedisTemplate.opsForValue().get(key))) {
                logger.debug("线程[{}]获取了锁,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, i + 1);
                return new RedisLockInner(this.stringRedisTemplate, key, value);
            }

            try {
                if (retryIntervalTimeMillis > 0L) {
                    Thread.sleep(retryIntervalTimeMillis);
                } else {
                    Thread.sleep(10L);
                }
            } catch (InterruptedException var11) {
                logger.error("线程[{}]中断,竞争锁失败,Hash值[{}]", Thread.currentThread(), value);
                break;
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        logger.debug("线程[{}]竞争锁失败,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, maxRetryTimes);
        return null;
    }

三,举例重现,并发请求

请求1

请求2

请求3

可重入

请求1,fire,请求2,fire,请求3,fire

不可重入

请求1,redis加锁,未释放

请求2,加锁失败,不执行fire(不执行请求2状态变更),返回请求1次态

请求3,加锁失败,同请求2