带你快速搞定java并发库

时间:2022-11-20 19:00:09

一、总览

计算机程序 = 数据 + 算法。

并发编程的一切根本原因是为了保证数据的正确性,线程的效率性。

Java并发库共分为四个大的部分,如下图

带你快速搞定java并发库

Executor 和 future 是为了保证线程的效率性

Lock 和数据结构 是为了维持数据的一致性。

Java并发编程的时候,思考顺序为,

对自己的数据要么加锁。要么使用提供的数据结构,保证数据的安全性

调度线程的时候使用Executor更好的调度。

二、Executor总览

带你快速搞定java并发库

Executor 提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

相当于manager,老板让manager去执行一件任务,具体的是谁执行,什么时候执行,就不管了。

看上图的继承关系,介绍几个

带你快速搞定java并发库

内置的线程池基本上都在这里

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

带你快速搞定java并发库

三、继承结构

构造函数

包含一个定时的service

?
1
2
3
4
5
6
7
8
9
10
11
12
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
static class DelegatedScheduledExecutorService
        extends DelegatedExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService e;
    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        e = executor;
    }

四、怎么保证只有一个线程

定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //  延迟执行
    delayedExecute(t);
    return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 加入任务队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 确保执行
            ensurePrestart();
    }
}
// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

五、怎么保证时间可以定时执行

?
1
2
3
4
5
6
7
8
9
10
11
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

在每次执行的时候会把下一次执行的时间放进任务中

?
1
2
3
4
5
6
7
8
9
10
private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
 * Returns the trigger time of a delayed action.
 */
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //注意这里
            LockSupport.parkNanos(this, nanos);
        }
        else //注意这里
            LockSupport.park(this);
    }
}

总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。

注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常

六、使用

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
Runnable runnable1 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("11111111111111");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Runnable runnable2 = () -> {
    try {
        Thread.sleep(4000);
        System.out.println("222");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS);
single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);

11111111111111 222 11111111111111 222 11111111111111

在项目中要注意关闭线程池

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
actionService = Executors.newSingleThreadScheduledExecutor();
        actionService.scheduleWithFixedDelay(() -> {
            try {
                Thread.currentThread().setName("robotActionService");
                Integer robotId = robotQueue.poll();
                if (robotId == null) {
                    //    关闭线程池
                    actionService.shutdown();
                } else {
                    int aiLv = robots.get(robotId);
                    if (actionQueueMap.containsKey(aiLv)) {
                        ActionQueue actionQueue = actionQueueMap.get(aiLv);
                        actionQueue.doAction(robotId);
                    }
                }
            } catch (Exception e) {
                //    捕捉异常
                LOG.error("",e);
            }
        }, 1, 1, TimeUnit.SECONDS);

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注服务器之家的更多内容!

原文链接:https://gamwatcher.blog.csdn.net/article/details/88406100