自封线程池

发布时间 2023-10-10 11:42:28作者: 冷扑星

因为在实际的工作上,对于线程池这块也是基本都用的jdk的线程池,要不就是通过completefuture 要不直接就是Callable和Runnable ,因为没有做任务的封装,就导致对于任务的完成结果这类的数据只能通过日志进行查看,并且因为没有相关的规范接口,就导致实际上多线程应用起来很杂乱,所以想着能不能基于线程池来进行一次封装便于规范式开发

相关maven依赖

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>   // 这个依赖就只是用来生成随机的字符串用来测试B任务的
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

先定义bean

1.定义任务的结果状态,也就是执行成功或者失败的枚举,

public enum TaskResultType {
   Success,/*方法执行完成,业务结果也正确*/
   Failure,/*方法执行完成,业务结果错误*/
   Exception/*方法执行抛出了异常*/
}

2.对于任务结果本身的返回上 任务参数的意义也就是后期的出错任务的排查工作,任务执行时间就是个数值的记录

@Data
@AllArgsConstructor
public class TaskResult<R> {
   //方法执行结果
   private final TaskResultType resultType;
   //方法执行后的结果数据
   private final R returnValue;
   //异常原因
   private final String failReason;
   //任务参数
   private Task task;
   //任务执行时间
   private LocalDateTime processTime;

}

规范的定义

1.任务执行器的定义 里面的taskExecute 就是具体想要执行的业务代码逻辑 ,就把它理解为run 方法里面的具体代码就行

public interface ITaskExecutor<R> {
    TaskResult<R> taskExecute(Task data);
}

2.对于任务本身的封装 因为在出错的时候需要数据来排查,所以我需要tracId 来查找任务上下文数据,还需要 t 任务内容 ,泛型是因为他可以兼容各种不同的参数类型,exeCutorName是作为任务执行器的选择,因为 在设想中 线程池能够完成不同类型的任务

使用的方式 就是通过new Task 把想要执行的方法的参数给包装成 Task的形式进行传递到线程池中

@Data
public class Task<T> {
    
    private int traceId;
    //任务内容
    private  T t;
    //工作名称 用于选择 任务执行器
    private String exeCutorName;
}

线程池本身

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/***
 *  类说明:框架的主体类,也是调用者主要使用的类
 这边的线程数和最大线程数可以参照网上的公式,但是太复杂了,公式我贴在最后
 */
public class PendingJobPool {
    //框架运行时的线程数,与机器的CPU数相同
    private static final int THREAD_COUNTS
            = Runtime.getRuntime().availableProcessors();
    private static BlockingQueue<Runnable> taskQueue
            = new ArrayBlockingQueue<Runnable>(5000);
    //这里建议用spring的  ThreadPoolTaskExecutor 他可以命名线程池的前缀,并且他可以使用spring的扩展装饰者模式更灵活
    private static ExecutorService taskExecutor
            = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS*2,
            60, TimeUnit.SECONDS, taskQueue,new ThreadPoolExecutor.CallerRunsPolicy());


    //用于注册工作任务类型 String 就是工作名
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
            = new ConcurrentHashMap<String, JobInfo<?>>();


    public static Map<String, JobInfo<?>> getMap() {
        return jobInfoMap;
    }

    //以单例模式启动
    private PendingJobPool() {
    }

    private static class JobPoolHolder {
        public static PendingJobPool pool = new PendingJobPool();
    }

    public static PendingJobPool getInstance() {
        return JobPoolHolder.pool;
    }

    //对工作中的任务进行包装,提交给线程池使用,
    //并将处理任务的结果,写入缓存以供查询
    private static class PendingTask<T, R> implements Runnable {

        private JobInfo<R> jobInfo;
        //工作的数据
        private Task<T> task;

        public PendingTask(JobInfo<R> jobInfo, Task task) {
            this.jobInfo = jobInfo;
            this.task = task;
        }

        @Override
        public void run() {
            R r = null;
            ITaskExecutor<R> taskProcesser
                    = jobInfo.getTaskProcesser();
            TaskResult<R> result = null;
            try {
                result = taskProcesser.taskExecute(task);
                LocalDateTime now = LocalDateTime.now();

                if (result == null) {
                    result = new TaskResult<R>(TaskResultType.Exception, r
                            , "result is null",task,now);
                }

            } catch (Exception e) {
                e.printStackTrace();
                result = new TaskResult<R>(TaskResultType.Exception, r
                        , e.getMessage(),task,LocalDateTime.now());
            } finally {
                jobInfo.addTaskResult(result);
            }

        }
    }

    //调用者提交工作中的任务
    public <T, R> void putTask(  Task<T> t) {
        JobInfo<R> jobInfo = getJob(t.getExeCutorName());
        PendingTask<T, R> task = new PendingTask(jobInfo, t);
        taskExecutor.execute(task);
    }


    //调用者注册工作,如工作名,任务的处理器等等
    public <R> void registerJob(String jobName,
                                ITaskExecutor<R> taskProcesser) {
        JobInfo<R> jobInfo =
                new JobInfo<R>(jobName, taskProcesser);
        if (jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
            throw new RuntimeException(jobName + "已经注册!");
        }

    }

    //搜索工作的信息
    private <R> JobInfo<R> getJob(String jobName) {
        JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
        if (null == jobInfo) {
            throw new RuntimeException(jobName + "是非法任务!");
        }
        return jobInfo;
    }

    //获得工作的整体处理进度
    public <R> String getTaskProgess(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTotalProcess();
    }

    //获得指定的工作的每个任务的处理详情
    public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTaskDetail();
    }

}

因为之前提到不同类型的任务需要有不同类型的处理器,所以就有了这个类JobInfo 里面对于任务类型的整体数据

就比如A类型的任务的配套执行器,A类型任务总共的执行次数,成功次数等,以及A类型任务的所有任务结果都保存在了taskDetailQueues 里面,但是因为这个任务结果时间久了就没任何意义了,所以这任务数据可以去定时删除clearResultQueue(在测试下 判断条件是 所有任务都执行过了,并且最后一次任务的执行时间在15s前)

/***
 *类说明:提交给框架执行的工作实体类,
* 工作:表示本批次需要处理的同性质任务(Task)的一个集合
 */
@Data
public class JobInfo<R> {
   //工作名,用以区分框架中唯一的工作
   private final String jobName;

   //处理工作中任务的处理器
   private final ITaskExecutor<R> taskProcesser;
   //任务的成功次数
   private AtomicInteger successCount;
   //工作中任务目前已经处理的次数
   private AtomicInteger taskProcessCount;
   //工作中任务数
   private  AtomicInteger taskCount;
   //存放每个任务的处理结果,供查询用
   private List<TaskResult<R>> taskDetailQueues;

   public JobInfo(String jobName,
      ITaskExecutor<R> taskProcesser
       ) {
         this.jobName = jobName;
         this.taskCount = new AtomicInteger(0);;
         successCount = new AtomicInteger(0);
         taskProcessCount = new AtomicInteger(0);
         this.taskProcesser = taskProcesser;
         taskDetailQueues = Collections.synchronizedList(new ArrayList<>());

   }


    //提供工作的整体进度信息
    public String getTotalProcess() {
        return "Success["+successCount.get()+"]/Current["+taskProcessCount.get()
                +"] Total["+ taskCount +"]";
    }
   
   //提供工作中每个任务的处理结果
   public  List<TaskResult<R>> getTaskDetail(){
      return  taskDetailQueues;
   }
   
   //每个任务处理完成后,记录任务的处理结果,因为从业务应用的角度来说,
//  对查询任务进度数据的一致性要不高
// 我们保证最终一致性即可,无需对整个方法加锁
   public void addTaskResult(TaskResult<R> taskResult){
      if(TaskResultType.Success.equals(taskResult.getResultType())){
         successCount.incrementAndGet();
      }
      taskProcessCount.incrementAndGet();
      taskCount.incrementAndGet();
      taskDetailQueues.add(taskResult);
   }
   //删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务
   public void clearResultQueue(){
      successCount = new AtomicInteger(0);
      taskProcessCount = new AtomicInteger(0);
      taskCount=new AtomicInteger(0);
      taskDetailQueues = Collections.synchronizedList(new ArrayList<>());
   }

}

代码示例:

定义2个任务的执行器

/**
 * 实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
 */
public class ATaskExecutor implements ITaskExecutor<Integer> {

    @Override
    public TaskResult<Integer> taskExecute(Task task) {
       System.out.println("这是工作A在执行相应的任务,任务数据=" + task.toString());
        Random r = new Random();
        int taskTime = r.nextInt(500);
        AppTest.sleep(taskTime);
        //任务成功
        LocalDateTime processTime = LocalDateTime.now();
        if (taskTime <= 200) {
            Integer dataResult = 1;
            return new TaskResult<Integer>(TaskResultType.Success, dataResult, "success", task, processTime);

        } else if (taskTime > 201 && taskTime <= 400) {//任务出现问题
            return new TaskResult<Integer>(TaskResultType.Failure, -1, "Failure", task, processTime);
        } else {//抛出异常
            try {
                throw new RuntimeException("发生异常!!");
            } catch (Exception e) {
                return new TaskResult<Integer>(TaskResultType.Exception,
                        -2, e.getMessage(), task, processTime);
            }
        }
    }

}
/**
 *实际的任务执行器 Thread.sleep(taskTime)相当于就是任务的执行
 */
public class BTaskExecutor implements ITaskExecutor<String> {

   @Override
   public TaskResult<String> taskExecute(Task task) {
      System.out.println("这是工作B,任务数据="+ task.toString());
      Random r = new Random();
      int taskTime = r.nextInt(500);
      try {
         Thread.sleep(taskTime);
      } catch (InterruptedException e) {

      }
      LocalDateTime processTime = LocalDateTime.now();
      //任务成功
      if (taskTime <= 300) {
         return new TaskResult<String>(TaskResultType.Success, "任务B执行成功", "success",task,processTime);
      } else if (taskTime > 301 && taskTime <= 400) {//任务出现问题
         return new TaskResult<String>(TaskResultType.Failure, "任务B执行失败", "Failure",task,processTime);
      } else {//抛出异常
         try {
            throw new RuntimeException("发生异常!!");
         } catch (Exception e) {
            return new TaskResult<String>(TaskResultType.Exception,
                  "任务B 因为抛出异常执行失败", e.getMessage(),task,processTime);
         }
      }
   }

}

main 方法执行

public class AppTest {


    public static void main(String[] args) throws Exception {
        ATaskExecutor aTaskExecutor = new ATaskExecutor();
        BTaskExecutor bTaskExecutor = new BTaskExecutor();
        PendingJobPool pool = PendingJobPool.getInstance();
        //线程池注册工作
        pool.registerJob("ATask", aTaskExecutor);
        pool.registerJob("BTask", bTaskExecutor);
        Random r = new Random();
        for (int i = 0; i < 5; i++) {

            Task<Integer> aTask = new Task<>();
            int aTaskTraceId = r.nextInt(1000);
            aTask.setT(r.nextInt(1000));
            System.out.println("aTask的traceID=" + aTaskTraceId);
            aTask.setTraceId(aTaskTraceId);
            aTask.setExeCutorName("ATask");
            pool.putTask(aTask);

            int bTaskTraceId = r.nextInt(1000);
            System.out.println("bTask的traceID=" + bTaskTraceId);
            Task<String> bTask = new Task<>();
            bTask.setTraceId(bTaskTraceId);
            bTask.setExeCutorName("BTask");
            bTask.setT(RandomStringUtils.random(3));
            pool.putTask(bTask);

        }
        Map<String, JobInfo<?>> map = PendingJobPool.getMap();


        //没有新任务时 删除任务结果队列,清除任务相关数据 ,减少内存空间 ,适合定时任务


        new Thread(() -> {
            while (true) {
                sleep(3000);
                map.values().stream().forEach((jobInfo) -> {
                    LocalDateTime now = LocalDateTime.now();
                    //删除数据要求是 任务全部执行成功并且最后一次任务的执行时间大于15s
                    if (jobInfo.getTaskProcessCount().get()!=0&&jobInfo.getTaskProcessCount().get() == jobInfo.getTaskProcessCount().get() && jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime().isBefore(now.minusSeconds(15))) {
                        System.out.println(jobInfo.getJobName()+"任务均执行完成,此时最后执行完成的任务的完成时间是" + jobInfo.getTaskDetailQueues().get(jobInfo.getTaskProcessCount().get()-1).getProcessTime());
                        jobInfo.clearResultQueue();
                    }
                });
            }

        }).start();

        new Thread(()->{
            //查看任务进度
            while (true) {
                sleep(3000);
                map.keySet().stream().forEach((jobName) -> {
                    List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName);
                    if (!taskDetail.isEmpty()) {
                        System.out.println(jobName + "获得工作的整体处理进度=" + pool.getTaskProgess(jobName));
                        System.out.println(jobName + "任务详情=" + taskDetail);
                    }else {
                        System.out.println(jobName + "此时没有相对应的任务");
                    }
                });
            }
        }).start();



        sleep(10000);

        Task<String> bTask = new Task<>();
        int bTaskTraceId = r.nextInt(1000);
        bTask.setTraceId(bTaskTraceId);
        bTask.setExeCutorName("BTask");
        bTask.setT(RandomStringUtils.random(3));
        pool.putTask(bTask);


    }

    public static void sleep(Integer time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

线程池参数配置

等待队列数=queuePoolSize=(corePoolSize / taskCose) * responseTime
          代入数据maxPoolSize=(80/5~10)*1=16~8,也就是说,queuePoolSize的取值范围是8~16,意思是等待队列里面的线程,可以等待8~16秒,超过了这个时间就需要开新的线程来执行。

最大线程池数=maxPoolSize=(max(taskSecond) - queuePoolSize) / (1 / taskCost)=(100-(8~6)) / (1/5~10) = (92~94) / ( 1/ 5~10) =(92~94)* (5~10)=(92*5)~(94*10)=460~940
           最大线程数=( 最大任务数-等待队列容量)/ 每个线程每秒处理能力

因为是最近有空了才想到是去封装一下的,还没有在工作中使用过