Java并发编程(二) - 并发基础

时间:2021-12-23 18:04:20

Java并发编程(二) - 并发基础

1. 概述


这里讲一下Java并发编程的基础知识

2. 线程与进程


2.1 进程
百度百科中是这样介绍进程的:

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

2.2 线程
《Java编发编程的艺术》对线程的定义如下:

现代操作系统调度的最小单位是线程,也叫轻量级进程(Light Weight Process),在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。

3. Java线程的状态


Java中线程状态转换如下图(注:图来自《Java并发编程的艺术》):
Java并发编程(二) - 并发基础

注:阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块是的状态,但是阻塞在concurrent包中的Lock接口的线程状态是等待状态,因为Lock包中的接口的实现依赖于LockSupport类中的相关方法。

4. 线程优先级


4.1 定义
现代操作系统基本采用时分的形式调度运行线程,操作系统会分出一个个时间片,线程会分配到若干个时间片,当线程的时间片用完后就会发生线程调度,并等待着下次分配。线程分配到的时间片多少决定使用线程资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。

4.2 Java中的线程优先级
Java中可以通过setPriority(int)方法来设置线程优先级,线程优先级范围是1-10,级别越高优先级越高,默认优先级为5。

Java运行在JVM上,本质上还是要看操作系统。所以在设置优先级后,还是要看操作系统的调度,Java并发编程中设置优先级好像并没有什么用处

5. Daemon线程


Daemon线程又叫后台线程,它主要被用作程序中后台调度以及支持性工作,如:垃圾处理线程就是Daemon线程。在Java中通过调用setDaemon()方法可将线程设置为Daemon线程,如果前台线程(非Daemon线程)执行完毕后,其Daemon线程将直接退出。如下所示:

public class Test {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Daemon Thread finally");
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
        System.out.println("Main Thread End");
    }
}output:
Main Thread End

从上面我们可以看出:当Main线程退出后,Daemon线程直接退出,连finally块中语句都不会执行。

6. 进程间通信


6.1 Synchronized关键字
关键字Synchronized可以修饰方法或者代码块,它能确保多个线程在同一时刻,只能有一个线程处于方法或者代码块中,它保证可线程对变量访问的可见性以及互斥性。

6.2 等待/通知机制
等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。下面通过消费者/生产者来展示等待/通知机制:

public class Restaurant {
    static AtomicInteger id = new AtomicInteger(0);
    Meal meal = null;
    final Consumer consumer = new Consumer(this);
    final Producer producer = new Producer(this);

    public static void main(String[] args) throws InterruptedException {
        Restaurant restaurant = new Restaurant();
        new Thread(new Consumer(restaurant)).start();
        new Thread(new Producer(restaurant)).start();
    }
}

/** * 消费者 */
class Consumer implements Runnable {
    private final Restaurant restaurant;

    Consumer(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            synchronized (restaurant.consumer) {
                while (restaurant.meal == null) {
                    try {
                        restaurant.consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            synchronized (restaurant.producer) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer Eat Meal: " + restaurant.meal);
                restaurant.meal = null;
                restaurant.producer.notifyAll();
            }
        }
    }
}

/** * 生产者 */
class Producer implements Runnable {
    private final Restaurant restaurant;

    Producer(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            synchronized (restaurant.producer) {
                while (restaurant.meal != null) {
                    try {
                        restaurant.producer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            synchronized (restaurant.consumer) {
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                restaurant.meal = new Meal(Restaurant.id.incrementAndGet());
                System.out.println("Producer Make Meal: " + restaurant.meal);
                restaurant.consumer.notifyAll();
            }
        }
    }
}

class Meal {
    private final int id;

    Meal(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return String.valueOf(id);
    }
}

等待/通知机制的状态示意图(注:图来自《Java并发编程的艺术》):
Java并发编程(二) - 并发基础

等待/通知的通常写法:
等待方(消费者)

synchronized (对象) {
    while (条件) {
        对象.wait();
    }
    相应的处理逻辑
}

通知者(生产者)

synchronized (对象) {
    改变条件
    对象.notifyAll();
}

这里的等待/通知机制使用的是对象+synchronized的方式,还有一种就是condition+lock的方式,两种方式基本写法一致,下面就是condition+lock方式的消费者/生产者:

public class Restaurant1 {
    static AtomicInteger id = new AtomicInteger(0);
    final Lock lock = new ReentrantLock();
    final Condition eat = lock.newCondition();
    final Condition make = lock.newCondition();
    Meal meal = null;

    public static void main(String[] args) {
        Restaurant1 restaurant1 = new Restaurant1();
        new Thread(new Consumer1(restaurant1)).start();
        new Thread(new Producer1(restaurant1)).start();
    }
}

class Producer1 implements Runnable {

    private final Restaurant1 restaurant1;

    Producer1(Restaurant1 restaurant1) {
        this.restaurant1 = restaurant1;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                restaurant1.lock.lock();
                while (restaurant1.meal == null) {
                    try {
                        restaurant1.eat.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer Eat Meal: " + restaurant1.meal);
                restaurant1.meal = null;
                restaurant1.make.signalAll();
            } finally {
                restaurant1.lock.unlock();
            }
        }
    }
}

class Consumer1 implements Runnable {
    private final Restaurant1 restaurant1;

    Consumer1(Restaurant1 restaurant1) {
        this.restaurant1 = restaurant1;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                restaurant1.lock.lock();
                while (restaurant1.meal != null) {
                    try {
                        restaurant1.make.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                restaurant1.meal = new Meal(Restaurant1.id.incrementAndGet());
                System.out.println("Producer Make Meal: " + restaurant1.meal);
                restaurant1.eat.signalAll();
            } finally {
                restaurant1.lock.unlock();
            }
        }
    }
}

class Meal1 {
    private final int id;

    Meal1(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return String.valueOf(id);
    }
}

注:以上所有的等待/通知都是直接使用等待,还可以使用超时等待机制,方法基本同上,仅仅有一个超时等待的效果,这样就不会一直阻塞了。

7. Join线程


7.1 Join线程的概念
如果一个线程A执行了thread.join()语句,其含义是:当前线程A在thread.join()处阻塞直到thread线程执行终止。

7.2 Join线程的原理
Join线程使用到了等待/通知机制,以下是JDK1.8的代码:

public final synchronized void join(long var1) throws InterruptedException {
        long var3 = System.currentTimeMillis();
        long var5 = 0L;
        if(var1 < 0L) {
            throw new IllegalArgumentException("timeout value is negative");
        } else {
            if(var1 == 0L) {
                // 等待中
                while(this.isAlive()) {
                    this.wait(0L);
                }
            } else {
                // 等待中
                while(this.isAlive()) {
                    long var7 = var1 - var5;
                    if(var7 <= 0L) {
                        break;
                    }

                    this.wait(var7);
                    var5 = System.currentTimeMillis() - var3;
                }
            }

        }
    }

当该线程终止时,JVM会自动调用该线程自己的notifyAll()方法。

8. ThreadLocal


8.1 ThreadLocal的定义
ThreadLocal指的是线程局部变量,即每个线程都有一个以ThreadLocal对象为键,任意对象为值的Map存储结构。也就是说每个线程都可以根据一个ThreadLocal对象查询到绑定在该线程的一个值。

注:具体自己也不太理解ThreadLocal的意义,仅仅知道Spring使用到了ThreadLocal。还有自己之间写动态代理时写了一个事务处理机制,现在看起来写错了,因为一个插入操作,完全没有必要使用事务。

自己在网上搜了一下,根据自己的经验。随便说一下Spring关于事务处理机制(自己没有看过源码,仅仅说说我的大体思路):

  • 首先事务处理在service层。
  • 要使用事务处理,就必须是同一个Connection,也就是说后面所有有关的Dao使用的都是同一个Connection进行数据库操作。
  • Spring在处理多线程环境下通过ThreadLocal找到该线程下的Connection对象,然后使用它进行数据库操作。
  • Spring通过ThreadLocal在线程池中找到一个Connection对象。

注:上面也出现了一个问题就是一个线程仅仅只能有一个Connection,所以还是不太明白Spring怎么处理事务以及管理Connection的。

9. 应用


9.1 自己模拟上面所说的大概思路写的

public class ConnectionManager {
    private static ConnectionPool connectionPool = new ConnectionPool();

    private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
        @Override
        protected Connection initialValue() {
            return connectionPool.fetchConnection(1000);
        }
    };

    public static Connection getConnection() {
        return connectionHolder.get();
    }

    public static void releaseConnection() {
        connectionPool.releaseConnection(connectionHolder.get());
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Connection connection = ConnectionManager.getConnection();
                    System.out.println(connection);
                    releaseConnection();
                }
            });
        }
    }
}

/** * 连接池 */
class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();

    ConnectionPool() {
        this(10);
    }

    ConnectionPool(int initialSize) {
        initConnection(initialSize);
    }

    /** * 从连接池中获取Connection:超时等待 * @param mills 等待时间:单位毫秒 */
    Connection fetchConnection(long mills) {
        long future = System.currentTimeMillis() + mills;
        long remaining = mills;
        synchronized (pool) {
            while (pool.isEmpty() && remaining > 0) {
                try {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Connection result = null;
            if (!pool.isEmpty()) {
                result = pool.removeFirst();
            }
            return result;
        }
    }

    /** * 把该Connection放回连接池中 * @param connection connection */
    void releaseConnection(Connection connection) {
        synchronized (pool) {
            pool.addLast(connection);
            pool.notifyAll();
        }
    }

    /** * 初始化连接池 * @param initialSize 连接池大小 */
    private void initConnection(int initialSize) {
        for (int i = 0; i < initialSize; i++) {
            pool.addLast(JDBCUtil.createConnection());
        }
    }
}

/** * JDBC工具类:创建Connection */
class JDBCUtil {
    static Connection createConnection() {
        Connection connection = null;
        String driverClass = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/hibernate?useUnicode=true&characterEncoding=utf8";
        String user = "root";
        String password = "";
        try {
            Class.forName(driverClass);
            connection = DriverManager.getConnection(url, user, password);
            } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

9.2 一个简单的线程池

class ThreadPool<Job extends Runnable> {
    static final int THREAD_SIZE = 20;
    private ArrayList<Worker> workers = new ArrayList<>();
    private final LinkedList<Job> jobs = new LinkedList<>();

    ThreadPool() {
        this(THREAD_SIZE);
    }

    ThreadPool(int initialSize) {
        initWorker(initialSize);
    }

    /** * 初始化所有的线程 */
    private void initWorker(int initialSize) {
        for (int i = 0; i < initialSize; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker);
      workers.setCurrentThread(thread); thread.start(); } } /** * 执行任务 * @param job 任务 */ public void execute(Job job) { if (job != null) { synchronized (jobs) { jobs.addLast(job); notifyAll(); } } } /** * 关闭线程池 */ public void shutDown() { for (Worker worker : workers) { worker.cancel(); } } /** * Worker:执行任务 */ private class Worker implements Runnable { // 当前线程 private Thread currentThread;     
    public void setCurrentThread(Thread currentThread){
      this.currentThread = currentThread;
    }

@Override public void run() { while (true) { Job job = null; synchronized (jobs) { // 任务队列中没有任务,则等待 while (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException e) { return; } } job = jobs.removeFirst(); } // 执行任务 job.run(); } } /** * 结束该Worker */ public void cancel() { currentThread.interrupt(); } } }

10.References


《Java并发编程的艺术》
彻底理解ThreadLocal