第五章 并行模式与算法
1、单例模式
- 是一种对象创建模式,用于产生一个对象的具体实例,它可以确保系统一个类只产生一个实例。
- 对于频繁创建使用的对象可以省略new 操作花费的时间,可以减少系统开销。
- 由于new 操作的次数减少,系统内存使用频率降低,这将减轻GC压力,缩短GC停顿时间。
- 构造函数私有,instance对象需要是私有且静态的
2、不变模式
核心:一个对象一旦被创建,则它的内部状态将永远不会发生改变。
不变模式的实现:
- 去除所有的setter方法以及所有修改自身属性的方法。
- 属性设为私有,并且final标记,确保其不可修改。
- 确保子类可以重载修改它的行为。
- 有一个可以创建完整对象的构造函数。
使用不变模式的例子:元数据的包装类和String类
3、生产者消费者模式
- 生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务,生产者和消费者之间则通过共享内存缓冲进行通信。
- 生产者消费者模式中的内存缓存区,它做为生产者消费者之间的通信桥梁,避免了生产者和消费者之间的直接通信,从而将生产者和消费者进行解耦。
- 缓冲区的主要功能是数据在多线程间的共享,此外通过该缓冲区可以缓解生产者和消费者间的性能差异。
共享数据:
package com.ecut.pattern; public class PCData { private final int intData; public PCData(String s) { intData = Integer.valueOf(s); } public PCData(int i) { intData = i; } @Override public String toString() { return "PCdata:" + intData; } }
生产者:
package com.ecut.pattern; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private volatile boolean isRunning = true; private AtomicInteger count = new AtomicInteger(); private BlockingQueue<PCData> queue; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { while (isRunning) { PCData data = new PCData(count.incrementAndGet()); if (!queue.offer(data)) { System.out.println(Thread.currentThread().getName() + "failed to put data:" + data); } else { System.out.println(Thread.currentThread().getName() + " product data :" + data); } } } public void stop() { isRunning = false; } }
消费者:
package com.ecut.pattern; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue<PCData> queue; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { try { while (true) { PCData pcData = queue.take(); if (pcData != null) { System.out.println(Thread.currentThread().getName() + " consumer data " + pcData); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
Main函数:
package com.ecut.pattern; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> blockingQueue = new LinkedBlockingDeque<PCData>(10); Producer producer1 = new Producer(blockingQueue); Producer producer2 = new Producer(blockingQueue); Producer producer3 = new Producer(blockingQueue); Consumer consumer1 = new Consumer(blockingQueue); Consumer consumer2 = new Consumer(blockingQueue); Consumer consumer3 = new Consumer(blockingQueue); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(producer1); executorService.execute(producer2); executorService.execute(producer3); executorService.execute(consumer1); executorService.execute(consumer2); executorService.execute(consumer3); Thread.sleep(10000); producer1.stop(); producer2.stop(); producer3.stop(); executorService.shutdown(); } }
4、Future模式
- Future模式的核心是异步调用,可以被调用者立即返回,让他在后台慢慢处理这个请求。调用者可以先处理其他任务。
- RunnableFuture继承了Future和Runnable接口,其中run方法用于构造真实数据,它有一个具体实现类FutureTask类。FutureTask类内部run方法使调用Callable接口的run方法。
package com.ecut.pattern; import java.util.concurrent.Callable; public class RealData implements Callable<String> { private String para; public RealData(String para) { this.para = para; } @Override public String call() throws Exception { StringBuffer stringBuffer = new StringBuffer(); for(int i = 0 ; i< 10 ; i++){ stringBuffer.append(para); } return stringBuffer.toString(); } }
FutureTest类:
package com.ecut.pattern; import java.util.concurrent.*; public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //构造futureTask FutureTask<String> futureTask = new FutureTask<String>(new RealData("a")); ExecutorService executorService = Executors.newSingleThreadExecutor(); //执行futureTask实际上是调用的RealData的call方法 executorService.submit(futureTask); System.out.println("请求完毕!"); //异步调用,因此这里可以进行其他的业务处理 Thread.sleep(2000); //如果call方法没有执行完则依然等待 System.out.println("数据为" + futureTask.get()); } }
运行结果如下:
请求完毕! 数据为aaaaaaaaaa
5、并行流水线
并行流水线:将有依赖的操作分配在不同的线程进行计算。
package com.ecut.parallel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Pipeline { public static class Msg { public double i; public double j; public String orgStr = null; } public static class Plus implements Runnable { public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>(); @Override public void run() { try { Msg msg = blockingQueue.take(); msg.j = msg.i + msg.j; Multiply.blockingQueue.add(msg); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class Multiply implements Runnable { public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>(); @Override public void run() { try { Msg msg = blockingQueue.take(); msg.j = msg.j * msg.i; Div.blockingQueue.add(msg); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class Div implements Runnable { public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>(); @Override public void run() { try { Msg msg = blockingQueue.take(); msg.j = msg.j / 2; System.out.println(msg.orgStr + "=" + msg.j); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { for (int i = 1; i < 1000; i++) { for (int j = 1; j < 1000; j++) { Msg msg = new Msg(); msg.i = i; msg.j = j; msg.orgStr = "((" + i + "+" + j + ")" + "*" + i + ")/2"; Plus.blockingQueue.add(msg); new Thread(new Plus()).start(); new Thread(new Multiply()).start(); new Thread(new Div()).start(); } } } }
6、并行排序
例子:奇偶排序、希尔排序
源码地址:
https://github.com/SaberZheng/concurrent-test
转载请于明显处标明出处: