《实战Java高并发程序设计》读书笔记五

时间:2021-05-05 23:49:12

第五章 并行模式与算法《实战Java高并发程序设计》读书笔记五

1、单例模式

  • 是一种对象创建模式,用于产生一个对象的具体实例,它可以确保系统一个类只产生一个实例。
  • 对于频繁创建使用的对象可以省略new 操作花费的时间,可以减少系统开销。
  • 由于new 操作的次数减少,系统内存使用频率降低,这将减轻GC压力,缩短GC停顿时间。
  • 构造函数私有,instance对象需要是私有且静态的

2、不变模式

核心:一个对象一旦被创建,则它的内部状态将永远不会发生改变。

不变模式的实现:

  • 去除所有的setter方法以及所有修改自身属性的方法。
  • 属性设为私有,并且final标记,确保其不可修改。
  • 确保子类可以重载修改它的行为。
  • 有一个可以创建完整对象的构造函数。

使用不变模式的例子:元数据的包装类和String类

3、生产者消费者模式

  • 生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务,生产者和消费者之间则通过共享内存缓冲进行通信。
  • 生产者消费者模式中的内存缓存区,它做为生产者消费者之间的通信桥梁,避免了生产者和消费者之间的直接通信,从而将生产者和消费者进行解耦。
  • 缓冲区的主要功能是数据在多线程间的共享,此外通过该缓冲区可以缓解生产者和消费者间的性能差异。

    共享数据:

    package com.ecut.pattern;
    
    public class PCData {
    
        private final int intData;
    
        public PCData(String s) {
            intData = Integer.valueOf(s);
        }
    
        public PCData(int i) {
            intData = i;
        }
    
        @Override
        public String toString() {
    
            return "PCdata:" + intData;
        }
    }

    生产者:

    package com.ecut.pattern;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Producer implements Runnable {
    
        private volatile boolean isRunning = true;
    
        private AtomicInteger count = new AtomicInteger();
    
        private BlockingQueue<PCData> queue;
    
        public Producer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (isRunning) {
                PCData data = new PCData(count.incrementAndGet());
                if (!queue.offer(data)) {
                    System.out.println(Thread.currentThread().getName() + "failed to put data:" + data);
                } else {
                    System.out.println(Thread.currentThread().getName() + " product data :" + data);
                }
            }
        }
    
        public void stop() {
            isRunning = false;
        }
    }

    消费者:

    package com.ecut.pattern;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer implements Runnable {
    
        private BlockingQueue<PCData> queue;
    
        public Consumer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    PCData pcData = queue.take();
                    if (pcData != null) {
                        System.out.println(Thread.currentThread().getName() + " consumer data " + pcData);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    Main函数:

    package com.ecut.pattern;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<PCData> blockingQueue = new LinkedBlockingDeque<PCData>(10);
            Producer producer1 = new Producer(blockingQueue);
            Producer producer2 = new Producer(blockingQueue);
            Producer producer3 = new Producer(blockingQueue);
            Consumer consumer1 = new Consumer(blockingQueue);
            Consumer consumer2 = new Consumer(blockingQueue);
            Consumer consumer3 = new Consumer(blockingQueue);
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(producer1);
            executorService.execute(producer2);
            executorService.execute(producer3);
            executorService.execute(consumer1);
            executorService.execute(consumer2);
            executorService.execute(consumer3);
            Thread.sleep(10000);
            producer1.stop();
            producer2.stop();
            producer3.stop();
            executorService.shutdown();
        }
    }

 4、Future模式

  • Future模式的核心是异步调用,可以被调用者立即返回,让他在后台慢慢处理这个请求。调用者可以先处理其他任务。
  • RunnableFuture继承了Future和Runnable接口,其中run方法用于构造真实数据,它有一个具体实现类FutureTask类。FutureTask类内部run方法使调用Callable接口的run方法。
    package com.ecut.pattern;
    
    import java.util.concurrent.Callable;
    
    public class RealData implements Callable<String> {
        private String para;
    
        public RealData(String para) {
            this.para = para;
        }
    
        @Override
        public String call() throws Exception {
            StringBuffer stringBuffer = new StringBuffer();
            for(int i = 0 ; i< 10 ; i++){
                stringBuffer.append(para);
            }
            return stringBuffer.toString();
        }
    }

    FutureTest类:

    package com.ecut.pattern;
    
    import java.util.concurrent.*;
    
    public class FutureTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //构造futureTask
            FutureTask<String> futureTask = new FutureTask<String>(new RealData("a"));
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            //执行futureTask实际上是调用的RealData的call方法
            executorService.submit(futureTask);
            System.out.println("请求完毕!");
            //异步调用,因此这里可以进行其他的业务处理
            Thread.sleep(2000);
            //如果call方法没有执行完则依然等待
            System.out.println("数据为" + futureTask.get());
        }
    }

    运行结果如下:

    请求完毕!
    数据为aaaaaaaaaa

5、并行流水线

 并行流水线:将有依赖的操作分配在不同的线程进行计算。

package com.ecut.parallel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Pipeline {

    public static class Msg {
        public double i;
        public double j;
        public String orgStr = null;
    }

    public static class Plus implements Runnable {

        public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>();

        @Override
        public void run() {
            try {
                Msg msg = blockingQueue.take();
                msg.j = msg.i + msg.j;
                Multiply.blockingQueue.add(msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    public static class Multiply implements Runnable {

        public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>();

        @Override
        public void run() {
            try {
                Msg msg = blockingQueue.take();
                msg.j = msg.j * msg.i;
                Div.blockingQueue.add(msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Div implements Runnable {

        public static BlockingQueue<Msg> blockingQueue = new LinkedBlockingQueue<>();

        @Override
        public void run() {
            try {
                Msg msg = blockingQueue.take();
                msg.j = msg.j / 2;
                System.out.println(msg.orgStr + "=" + msg.j);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 1; i < 1000; i++) {
            for (int j = 1; j < 1000; j++) {
                Msg msg = new Msg();
                msg.i = i;
                msg.j = j;
                msg.orgStr = "((" + i + "+" + j + ")" + "*" + i + ")/2";
                Plus.blockingQueue.add(msg);
                new Thread(new Plus()).start();
                new Thread(new Multiply()).start();
                new Thread(new Div()).start();
            }
        }


    }
}

6、并行排序

例子:奇偶排序、希尔排序

源码地址:

https://github.com/SaberZheng/concurrent-test

转载请于明显处标明出处:

https://www.cnblogs.com/AmyZheng/p/10481967.html