Java高并发程序设计笔记8之线程池

时间:2022-12-03 17:57:03
       简单的线程池实现。网上有好多demo,至于为什么需要线程池,由于每次系统调用都会创建一个线程的话,系统的开销比较大
,如果用一个线程池来复用线程的话,可以有效避免系统的开销

JDK内置线程池

Java高并发程序设计笔记8之线程池

线程池的种类
newFixedThreadPool 
newSingleThreadExecutor 
newCachedThreadPool
newScheduledThreadPool

下面是简单的列子

package com.cosmistar.jk;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by Administrator on 2016/11/20.
*/
public class ThreadPoolDemo {
public static void main(String args[]){
// ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorService threadPool = Executors.newSingleThreadExecutor();
TaskInfo t1 = new TaskInfo("MT1");
TaskInfo t2 = new TaskInfo("MT2");
TaskInfo t3 = new TaskInfo("MT3");
threadPool.execute(t1);
threadPool.execute(t2);
threadPool.execute(t3);
}
}
class TaskInfo implements Runnable{
private String name;
public TaskInfo(String name){
this.name = name;
}
public void run(){
System.out.println("\n========="+name+"开始执行===========");
for(int i = 0;i<30;i++){
System.out.println("["+name+"_"+i+"]");
}
System.out.println("\n========="+name+"执行结束===========");
}
}

扩展和增强线程池(回调接口)
beforeExecute 
afterExecute
terminated

ForkJoin

使用接口
RecursiveAction
RecursiveTask

Java高并发程序设计笔记8之线程池

下面转载博客http://blog.csdn.net/qq58831588/article/details/45045363

我们创建了ForkJoinPool对象,和一个将在线程池中执行的ForkJoinTask的子类。我们使用了无参数的构造ForkJoinPool对象,因此它将执行默认的配置。创建一个线程数等于计算机CPU数目的线程池,创建好ForkJoinPool对象之后,那些线程也创建就绪了,在线程池中等待任务的到达,然后开始执行。
由于Task类继承了RecursiveAction类,因此不返回结果。在这里,我们使用了推荐的结构来实现任务。如果任务需要更新大于10个产品,它将拆分这些元素为两部分,创建两个任务,并将拆分的部分相应地分配给新创建的任务。通过使用Task类的First和last属性,来获知任务将要更新的产品列表所在的位置范围。我们已经使用first和last属性,来操作产品列表中仅有的一份副本,而没有为每一个任务去创建不同的产品列表。
调用invokeAll()方法来执行一个主任务所创建的多个子任务。这是一个同步调用,这个任务将等待子任务完成,然后继续执行(或者是结束)。当一个主任务等待它的子任务时,执行这个任务的工作者线程接受另一个等待执行的任务并开始执行。正因为有了这个行为,所以说Fork/join框架提供了一种比Runnable和Callable对象更加高效的任务管理机制.
ForkJoinTask类的invokeAll()方法是执行器框架和Fork/join框架之间的主要差异之一。在执行器框架中,所有的任务必须发送给执行器,线程池中包含了待执行方法的任务,任务的控制也是在线程池中进行的,我们在Task类中使用了invokeAll()方法,Task继承了RecursiveAction类,而RecursiveAction继承了ForkJoinTask类。

举个例子

实现一项更新产品价格的任务。最初的任务将负责更新列表中的所有元素。我们使用10来做为参考大小,如果一个任务需要更新大于10个元素,它会将这个列表分解成两部分,然后分别创建两个任务用来更新各自部分的产品价格。

package com.xingfu.wx_1;  
/**
* 存储产品的名称和价格
* @author W,x
* @version 创建时间:2015年4月14日 下午4:10:26
*
*/
public class Product {

private String name;
private double price;
public Product(String name, double price) {
super();
this.name = name;
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}

}
创建一个ProductListGenerator的类,用来生成一个随机产品列表

package com.xingfu.wx_1;    import java.util.ArrayList;  import java.util.List;    /**  *   随机生成产品列表  * @author W,x  * @version 创建时间:2015年4月14日 下午4:11:50  *   */  public class ProductListGenerator {      public List<Product> generate(int size){          List<Product>ret=new ArrayList<Product>();          Product product;          for(int i=0;i<size;i++){              product=new Product("Product "+i, 10);              ret.add(product);          }          return ret;      }        }  
创建一个Task的类,并继承RecursiveAction类
package com.xingfu.wx_1;    import java.util.List;  import java.util.concurrent.RecursiveAction;    import com.xingfu.wx.Utils;    /**  *  * @author W,x  * @version 创建时间:2015年4月14日 下午4:14:06  *   */  public class Task extends RecursiveAction{        /**      *       */      private static final long serialVersionUID = 5622480283632727215L;        private List<Product> products;            private int first;            private int last;            /**      * 声明一个increment的私有double属性,用来存储产品价格的增加额      */      private double increment;                  public Task(List<Product> products, int first, int last, double increment) {          super();          this.products = products;          this.first = first;          this.last = last;          this.increment = increment;      }          @Override      protected void compute() {          if(last-first<10){              updatePrices();          }else{              int middle=(last+first)/2;              Utils.print("Task : Pending tasks:%s\n",getQueuedTaskCount());              Task t1=new Task(products,first,middle+1,increment);              Task t2=new Task(products, middle+1,last,increment);              invokeAll(t1,t2);          }      }              private void updatePrices() {          for(int i=first;i<last;i++){              Product product=products.get(i);              product.setPrice(product.getPrice()*(i+increment));          }      }          }  
package com.xingfu.wx_1;    import java.util.List;  import java.util.concurrent.ForkJoinPool;  import java.util.concurrent.TimeUnit;    import com.xingfu.wx.Utils;    /**  *   * @author W,x  * @version 创建时间:2015年4月14日 下午4:19:34  *   */  public class Main {      public static void main(String[] args) {          ProductListGenerator generator = new ProductListGenerator();          //创建10000个产品的列表          List<Product> products = generator.generate(10000);          //创建一个新的Task对象用来更新列表中的所有产品,参数first0,参数last为产品列表的大小。10000          Task task = new Task(products, 0, products.size(), 0.20);          ForkJoinPool pool = new ForkJoinPool();          pool.execute(task);          do {              Utils.print("Main : Thread Count: %d\n", pool.getActiveThreadCount());              Utils.print("Main: Thread Steal:%d\n", pool.getStealCount());              Utils.print("Main: Parallelism:%d\n", pool.getParallelism());              try {                  TimeUnit.MILLISECONDS.sleep(5);              } catch (InterruptedException e) {                  e.printStackTrace();              }          } while (!task.isDone());          // 关闭线程池          pool.shutdown();          // 检查任务已经完成并且没有错误          if (task.isCompletedNormally()) {              Utils.print("Main:The process has completed normally.\n");          }          for (int i = 0; i < products.size(); i++) {              Product product = products.get(i);              if (product.getPrice() != 12) {                  Utils.print("Product %s :%f\n", product.getName(), product.getPrice());                }          }          System.out.println("Main: End of the program.\n");      }  }