线程池 队列生产者消费者模型实现

时间:2021-03-15 17:37:19
static {
        try {
            ExecutorService pool = null;
            try {
                // 创建一个可重用固定线程数的线程池
                pool = Executors.newFixedThreadPool(10);
                // 创建10个任务并执行
                for (int i = 0; i < 10; i++) {
                    Runnable r = new GenerateStaticHtml(infoIdQueue);
                    // 将线程放入池中进行执行
                    pool.execute(r);
                }
                // 关闭线程池
                pool.shutdown();
            } catch (Exception e) {
                pool.shutdown();
                e.printStackTrace();
            } finally {
                if (pool != null) {
                    pool.shutdown();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
//线程
/**
 * Worker task that repeatedly drains a shared queue of {@code GenerateInfo} items until
 * {@link #stop()} is called or the executing thread is interrupted.
 */
public class GenerateStaticHtml implements Runnable {
    /** Queue of pending items; supplied by the caller and polled by this worker. */
    private final ConcurrentLinkedQueue<GenerateInfo> infoIdQueue;
    /** Stop request flag; volatile so a write from another thread is seen by {@link #run()}. */
    private volatile boolean stop = false;

    /**
     * @param infoIdQueue the queue this worker takes its items from
     */
    public GenerateStaticHtml(ConcurrentLinkedQueue<GenerateInfo> infoIdQueue) {
        this.infoIdQueue = infoIdQueue;
    }

    /** Asks the worker to leave its loop once the current drain pass has finished. */
    public void stop() {
        this.stop = true;
    }

    /**
     * Drains the queue in passes. Exceptions thrown while handling one item are printed
     * and do not stop the worker.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted() && !stop) {
            GenerateInfo info;
            // Take the item and test it in one step. Checking isEmpty() first and polling
            // afterwards lets another consumer grab the last element in between, in which
            // case poll() returns null and the business logic would receive a null item.
            while ((info = infoIdQueue.poll()) != null) {
                try {
                    // Business logic for `info` goes here.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // NOTE: there is no pause here, so the outer loop spins continuously while
            // the queue is empty.
        }
    }

}
//由于上面的程序在队列为空时没有休眠,线程会一直空转(忙等待)并占用CPU,以下为改进方式
//方案一:适当休眠
try {
                // Pause for three seconds.
                Thread.sleep(3 * 1000);
            } catch (InterruptedException ex) {
                logger.error(logMsg(ex.getMessage()));
                // Catching InterruptedException clears the thread's interrupt flag; set it
                // again so a surrounding isInterrupted() check can still see the request.
                Thread.currentThread().interrupt();
            }
//方案二:去掉死循环,把每个需要处理的对象封装成一个任务(Runnable)提交到线程池,排队等待执行
/** Number of threads in the shared pool. */
private static final int POOL_THREAD_COUNT = 10;

/** Shared fixed-size thread pool. */
private static ExecutorService pool = Executors.newFixedThreadPool(POOL_THREAD_COUNT);

if (!infoIdQueue.contains(gInfo)) {
                    // Record the item in the queue, then submit one task for it to the pool.
                    // NOTE(review): contains() followed by offer() is not atomic; two callers
                    // could both pass the check for the same item - confirm whether that matters.
                    // NOTE(review): nothing in this snippet removes gInfo from the queue again -
                    // verify where (or whether) that happens.
                    infoIdQueue.offer(gInfo);
                    pool.execute(new GenerateStaticHtml(gInfo));
                }



/**
 * Task bound to a single {@code GenerateInfo}. Each instance is meant to be handed to an
 * executor and run once for the item it was constructed with.
 */
public class GenerateStaticHtml implements Runnable {

    /** The item this task was created for. */
    private final GenerateInfo info;

    /**
     * @param info the item to be handled by this task
     */
    public GenerateStaticHtml(GenerateInfo info) {
        this.info = info;
    }

    /**
     * Runs the business logic for the item. Any {@link Exception} is printed rather than
     * propagated, and a marker line is written to standard output afterwards.
     */
    @Override
    public void run() {
        try {
            // Business logic goes here.
        } catch (Exception failure) {
            failure.printStackTrace();
        }
        // Marker printed once the try/catch has completed.
        System.out.println("==");
    }

}
//部分源码分析
/**
 * Factory and utility methods for {@link Executor}, {@link
 * ExecutorService}, {@link ScheduledExecutorService}, {@link
 * ThreadFactory}, and {@link Callable} classes defined in this
 * package. This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Methods that create and return an {@link ExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a {@link ScheduledExecutorService}
 *        set up with commonly useful configuration settings.
 *   <li> Methods that create and return a "wrapped" ExecutorService, that
 *        disables reconfiguration by making implementation-specific methods
 *        inaccessible.
 *   <li> Methods that create and return a {@link ThreadFactory}
 *        that sets newly created threads to a known state.
 *   <li> Methods that create and return a {@link Callable}
 *        out of other closure-like forms, so they can be used
 *        in execution methods requiring <tt>Callable</tt>.
 * </ul>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        // Core and maximum pool size are both nThreads; the keep-alive time is zero.
        final long keepAliveMillis = 0L;
        // Work queue created without a capacity argument; it holds the submitted Runnables.
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        return new ThreadPoolExecutor(
                nThreads,
                nThreads,
                keepAliveMillis,
                TimeUnit.MILLISECONDS,
                workQueue);
    }

**

//LinkedBlockingQueue是一个基于链表节点的阻塞队列(blocking queue)实现,容量可选;不指定容量时默认为Integer.MAX_VALUE,相当于无界队列.线程池用它存放等待执行的任务(Runnable),而不是线程

**