多线程队列接收

发布时间 2023-03-30 23:50:40作者: 余生请多指教ANT
package org.example.file.mult;
//函数值接口
@FunctionalInterface
public interface FuncationCallback {
    void callback(String param);
}

 

回调接收

 

package org.example.file.mult;

import java.util.ArrayList;

public class FuncationCallbackImpl {
   //函数式 回调参数处理
    public FuncationCallbackImpl(ArrayList arrayList,  FuncationCallback funcationCallback) {
        arrayList.forEach(ele->{
            funcationCallback.callback(ele+"456789");
        });

    }
}

 

队列业务实现

 

package org.example.file.mult;


import org.apache.tomcat.util.threads.TaskThreadFactory;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Securite {
    //有界队列,根据实际业务设置即可
    public static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
   //静态线程池,一会多线程执行能用到,根据自己的机器性能配置即可
    public static Executor executor = new ThreadPoolExecutor(3, 10, 2000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TaskThreadFactory("测试队列", false, 7));

    public Securite() {
    }


    public void exec(Integer ele) {
        queue.offer(ele);
    }
    //全局静态 内存可见性常量,空值任务暂停使用
    public static volatile int a = 0;//刷回主内存

    //静态内部类,有利于在主程序空值进度
    public static class MultTask implements Runnable {


        private ArrayBlockingQueue<Integer> arrayBlockingQueue1;
        //线程-队列构造器  便于每个线程都能冲全局队列取值
        public MultTask(ArrayBlockingQueue<Integer> arrayBlockingQueue1) {
            this.arrayBlockingQueue1 = arrayBlockingQueue1;
        }


        @Override
        public void run() {
            //循环,这里要注意和arrayBlockingQueue1.take()配合使用,避免空悬打满cpu
            while (true) {
                try {
                    //当参数等于8时,后面的线程停止取队列的元素进行操作,来达到外界可控的目的
                    if (a == 8) {
                        System.out.println("开始终端了");
                        Thread.sleep(5000);
                        System.out.println("5秒后继续");
//                        a = 51;
                        return;
                    }
                    Integer take = arrayBlockingQueue1.take();
                    String name = Thread.currentThread().getName();
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(take);
                    //队列每次取值后再回调函数里处理后的值
                    new FuncationCallbackImpl(arrayList, new FuncationCallback() {
                        @Override
                        public void callback(String param) {
                            System.out.println("返回param:" + param);
                        }
                    });
                    //TODO 根据自己的业务进行后续处理
                    System.out.println(">>>>>>>>>>>>>>>>>>>>>:" + take + "<><><><><><><>:" + name);
                } catch (InterruptedException e) {
                }

            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Securite securite = new Securite();
        for (int i = 0; i < 10; i++) {
            if (i == 8) {
                a = 8;
            }
            securite.exec(i);
            executor.execute(new MultTask(queue));

        }
        System.out.println("10s后在运行一次");
        Thread.sleep(1500);
        securite.exec(10);
        executor.execute(new MultTask(queue));

    }
}