通过Redis+Mysql来自定义Spring-Statemachine的持久化

发布时间 2023-07-30 17:02:11作者: loveletters

我们在使用Spring状态机的时候,往往需要对于StateMachine持久化操作,但是官方为我们提供的基于redis的持久化并不是特别好,一方面是因为只存redis容易导致数据丢失,另一方面因为状态机的特性需要对应的StateMachine的数据永久有效,导致redis中的key永不过期。

我现在希望实现将StateMachine持久化到数据库跟redis中,redis的有效期为3天,查询redis中没有再去查询数据库,然后更新到redis中。

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>1.6.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.statemachine</groupId>
            <artifactId>spring-statemachine-kryo</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.statemachine</groupId>
            <artifactId>spring-statemachine-starter</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

我们这里拿一个简单的审批流程举例,流程有这么几种状态CREATE、WAIT_REVIEW、WAIT_MODIFY、COMPLETED

enum class WorkflowState(val key: Int, val desc: String) {
    CREATE(1,"已创建"),
    WAIT_REVIEW(2,"已提交,待审核"),
    WAIT_MODIFY(3,"已驳回,待提交"),
    COMPLETED(4,"已完成");

    companion object {
        fun getByKey(key: Int): WorkflowState {
            for (e in WorkflowState.values()) {
                if (e.key == key) {
                    return e
                }
            }
            throw RuntimeException("enum not exists.")
        }
    }
}

对于流程的推进有这么几个事件

enum class WorkflowStateChangeEvent {
    /**
     *  提交
     */
    SUBMIT,

    /**
     *  拒绝
     */
    REJECT,

    /**
     *  重新提交
     */
    RE_SUBMIT,

    /**
     *  同意
     */
    AGREE

}

我们来配置一下流程中各个状态与事件的关系

@Configuration
@EnableStateMachine(name = ["workflowStateMachine"])
class WorkflowStateMachineConfig:StateMachineConfigurerAdapter<WorkflowState, WorkflowStateChangeEvent>() {
    override fun configure(states: StateMachineStateConfigurer<WorkflowState, WorkflowStateChangeEvent>) {
        states.withStates()
      // 初始状态为CREATE
            .initial(WorkflowState.CREATE)
            .states(EnumSet.allOf(WorkflowState::class.java))
    }

    override fun configure(transitions: StateMachineTransitionConfigurer<WorkflowState, WorkflowStateChangeEvent>) {
     // CREATE->WAIT_REVIEW 通过SUBMIT事件
      transitions
           .withExternal().source(WorkflowState.CREATE).target(WorkflowState.WAIT_REVIEW).event(WorkflowStateChangeEvent.SUBMIT)
            .and() // WAIT_REVIEW->WAIT_MODIFY 通过REJECT事件
           .withExternal().source(WorkflowState.WAIT_REVIEW).target(WorkflowState.WAIT_MODIFY).event(WorkflowStateChangeEvent.REJECT)
            .and() // WAIT_MODIFY->WAIT_REVIEW 通过RE_SUBMIT事件
            .withExternal().source(WorkflowState.WAIT_MODIFY).target(WorkflowState.WAIT_REVIEW).event(WorkflowStateChangeEvent.RE_SUBMIT)
            .and() // WAIT_REVIEW->COMPLETED 通过AGREE事件
            .withExternal().source(WorkflowState.WAIT_REVIEW).target(WorkflowState.COMPLETED).event(WorkflowStateChangeEvent.AGREE)

    }

}

自定义StateMachinePersister来持久化

@Component
class CustomStateMachinePersister<S,E>(
    stateMachinePersist:CustomStateMachinePersist<S,E>
): AbstractStateMachinePersister<S, E, String>(stateMachinePersist)
@Component
class CustomStateMachinePersist<S,E>(
    private val processStateMapper: ProcessStateMapper,
    redisConnectionFactory: RedisConnectionFactory
) :StateMachinePersist<S,E,String>{
    val persistenceRepository = PersistenceRepository<S,E>(redisConnectionFactory)

    override fun write(context: StateMachineContext<S, E>, contextObj: String) {
        val values = persistenceRepository.serialize(context)
        persistenceRepository.save(context,contextObj,3L,TimeUnit.DAYS)

        val wrapper = KtQueryWrapper(ProcessState::class.java).eq(ProcessState::code,contextObj)

        val processState = processStateMapper.selectOne(wrapper)
        if (Objects.nonNull(processState)){
            processState.value = values
            processStateMapper.updateById(processState)
            return
        }
        processStateMapper.insert(ProcessState(null,contextObj,values))

    }

    override fun read(contextObj: String): StateMachineContext<S, E>? {
        val context = persistenceRepository.getContext(contextObj)
        if (Objects.nonNull(context)){
            return context
        }

        val wrapper = KtQueryWrapper(ProcessState::class.java).eq(ProcessState::code,contextObj)
        val result = processStateMapper.selectOne(wrapper)
        if (Objects.isNull(result)){
            return null
        }
        val str = result.value
        persistenceRepository.save(str,contextObj,3L,TimeUnit.DAYS)
        return persistenceRepository.deserialize(str)
    }


}

这里的ProcessStateMapper为我们存储数据库中的mapper

interface ProcessStateMapper :BaseMapper<ProcessState>{
}

@TableName("t_process_state")
data class ProcessState(
    @TableId(type = IdType.AUTO)
    var id:Long?,
    var code:String,
    var value:ByteArray
)

PersistenceRepository是用来序列化跟反序列化StateMachineContext,以及存入redis中的方法。

class PersistenceRepository<S, E>(redisConnectionFactory: RedisConnectionFactory) {
    private val kryoThreadLocal = ThreadLocal.withInitial {
        val kryo = Kryo()
        kryo.addDefaultSerializer(
            StateMachineContext::class.java,
            StateMachineContextSerializer<S, E>()
        )
        kryo.addDefaultSerializer(MessageHeaders::class.java, MessageHeadersSerializer())
        kryo.addDefaultSerializer(UUID::class.java, UUIDSerializer())
        kryo
    }
    private val redisOperations: RedisOperations<String, ByteArray>

    init {
        redisOperations = createDefaultTemplate(redisConnectionFactory)
    }
 		fun save(byteArray: ByteArray,id: String, time: Long, timeUnit: TimeUnit){
        redisOperations.opsForValue()[id] = byteArray
        redisOperations.expire(id, time, timeUnit)
    }
    fun save(context: StateMachineContext<S, E>, id: String, time: Long, timeUnit: TimeUnit) {
        redisOperations.opsForValue()[id] = serialize(context)
        redisOperations.expire(id, time, timeUnit)
    }

    fun getContext(id: String): StateMachineContext<S, E>? {
        return deserialize(redisOperations.opsForValue()[id])
    }

    private fun createDefaultTemplate(connectionFactory: RedisConnectionFactory): RedisTemplate<String, ByteArray> {
        val template = RedisTemplate<String, ByteArray>()
        template.keySerializer = StringRedisSerializer()
        template.hashKeySerializer = StringRedisSerializer()
        template.setConnectionFactory(connectionFactory)
        template.afterPropertiesSet()
        return template
    }

    fun serialize(context: StateMachineContext<S, E>): ByteArray {
        val kryo = kryoThreadLocal.get()
        val out = ByteArrayOutputStream()
        val output = Output(out)
        kryo.writeObject(output, context)
        output.close()
        return out.toByteArray()
    }

    fun <S, E> deserialize(data: ByteArray?): StateMachineContext<S, E>? {
        return if ((data != null) && data.isNotEmpty()) {
            val kryo: Kryo = kryoThreadLocal.get()
            val input = ByteArrayInputStream(data)
            val kryoInput = Input(input)
            kryo.readObject(kryoInput, StateMachineContext::class.java) as StateMachineContext<S, E>
        } else {
            null
        }
    }
}

到此我们就已经实现了redis+mysql的持久化功能,简单的测试一下

@Service
class WorkflowServiceImpl(
    private val workflowStateMachine: StateMachine<WorkflowState, WorkflowStateChangeEvent>,
    private val stateMachineMemPersister: StateMachinePersister<WorkflowState, WorkflowStateChangeEvent, String>,
    private val workflowMapper: WorkflowMapper
) : ServiceImpl<WorkflowMapper, Workflow>(), IWorkflowService {

    @Synchronized
    fun sendEvent(changeEvent: WorkflowStateChangeEvent, workflow: Workflow): Boolean {
        var result = false

        try {
            workflowStateMachine.start()
            stateMachineMemPersister.restore(workflowStateMachine, workflow.code)
            val message = MessageBuilder.withPayload(changeEvent).setHeader("workflow", workflow).build()
            result = workflowStateMachine.sendEvent(message)
            if (!result) {
                return false
            }
            stateMachineMemPersister.persist(workflowStateMachine, workflow.code)


        } catch (e: Exception) {
            slog.error("流程操作失败:$e")
        } finally {
            workflowStateMachine.stop()
        }
        return result
    }

    override fun create(workflow: Workflow): Workflow {
        workflow.status = WorkflowState.WAIT_REVIEW.key
        workflow.code = UUID.randomUUID().toString()
        val result = sendEvent(WorkflowStateChangeEvent.SUBMIT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.insert(workflow)
        return workflow
    }

    override fun reject(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.WAIT_MODIFY.key
        val result = sendEvent(WorkflowStateChangeEvent.REJECT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }

    override fun agree(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.COMPLETED.key
        val result = sendEvent(WorkflowStateChangeEvent.AGREE, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }

    override fun reSubmit(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.WAIT_REVIEW.key
        val result = sendEvent(WorkflowStateChangeEvent.RE_SUBMIT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }
}
@RestController
@RequestMapping("/workflow")
@Tag(name = "流程控制")
class WorkflowController(
    private val workflowService: IWorkflowService
) {


    @Operation(summary = "创建流程")
    @PostMapping("/create")
    fun create(@RequestBody workflow: Workflow):String{
        workflowService.create(workflow)
        return "success"
    }

    @GetMapping("/agree/{id}")
    fun agree(@PathVariable("id") id:Long):String{
        workflowService.agree(id)
        return "success"
    }

    @GetMapping("/reject/{id}")
    fun reject(@PathVariable("id") id:Long):String{
        workflowService.reject(id)
        return "success"
    }

    @GetMapping("/reSubmit/{id}")
    fun complete(@PathVariable("id") id:Long):String{
        workflowService.reSubmit(id)
        return "success"
    }
}

先创建一个流程,这个时候流程的状态应该是WAIT_REVIEW已提交,待审核,这个时候只能进行REJECTAGREE操作。

我们试一下RE_SUBMIT操作应该是不行的。

我们进行一下REJECT操作,应该是可以的。REJECT操作完成之后我们的状态变成了WAIT_MODIFY,只能进行RE_SUBMIT操作。

并且redis中也是存在的,删除redis中的数据也不影响我们状态的正确推进。

但是不能删除数据库中的持久化的数据,如果删除了并且redis中也过期后,会影响我们流程的正常推进。