ScheduledExecutorService tasks run later than expected

Time: 2022-01-21 14:37:57

I'm running tasks periodically. To keep the intervals flexible, the next timeout is calculated at the end of each task, converted to a delay in milliseconds from Instant.now(), and scheduled using ScheduledExecutorService#schedule.

This code generally works fine (blue curve on the left), but on other days it does not.

It seems to me that things sometimes go wrong at startup (the machines are restarted every night). Although the program should, and does, correct itself, ScheduledExecutorService#schedule does not recover and the scheduled tasks keep running late. A full JVM restart seems to be the only fix.

My initial thought was that this was a bug and that, depending on the time of machine startup, things could go wrong. But the following log output shows that the problem is related to my usage of ScheduledExecutorService#schedule:

// Log time in GMT+2, other times are in GMT
// The following lines are written following system startup (all times are correct)
08 juin 00:08:49.993 [main] WARN  com.pgscada.webdyn.Webdyn - Scheduling next webdyn service time. Currently 2018-06-07T22:08:49.993Z, last connection null
08 juin 00:08:50.586 [main] INFO  com.pgscada.webdyn.Webdyn - The next data sample at 2018-06-07T22:10:00Z and the next FTP connection at 2018-06-07T22:30:00Z
08 juin 00:08:50.586 [main] WARN  com.pgscada.webdyn.Webdyn - Completed webdyn schedule in 9ms, next execution at 2018-06-07T22:10:00Z (in 69414 ms) will run as data-sample
// So we are expecting the next execution to occur at 00:10:00 (or in 69.4 seconds)
// Except that it runs at 00:11:21
08 juin 00:11:21.206 [pool-1-thread-4] INFO  com.pgscada.webdyn.Webdyn - Executing Webdyn service, isDataSample=true, isFtpConnection=false, nextTimeout=2018-06-07T22:10:00Z, lastFtpConnection=null
// But that's OK because it should correct itself
08 juin 00:13:04.151 [pool-1-thread-4] WARN  com.pgscada.webdyn.Webdyn - Scheduling next webdyn service time. Currently 2018-06-07T22:10:00Z, last connection null
08 juin 00:13:04.167 [pool-1-thread-4] INFO  com.pgscada.webdyn.Webdyn - The next data sample at 2018-06-07T22:20:00Z and the next FTP connection at 2018-06-07T22:30:00Z
08 juin 00:13:04.167 [pool-1-thread-4] WARN  com.pgscada.webdyn.Webdyn - Completed webdyn schedule in 0ms, next execution at 2018-06-07T22:20:00Z (in 415833 ms) will run as data-sample
// So now we are expecting the next execution to occur at 00:20:00 (or in 415.8 seconds)
// But it runs at 00:28:06
08 juin 00:28:06.145 [pool-1-thread-4] INFO  com.pgscada.webdyn.Webdyn - Executing Webdyn service, isDataSample=true, isFtpConnection=false, nextTimeout=2018-06-07T22:20:00Z, lastFtpConnection=null
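As a sanity check on the log above, the delays can be recomputed from the timestamps. The sketch below treats each log timestamp as the moment Instant.now() was called (an approximation), converts the GMT+2 log times to UTC, and uses the same Duration.between(...).toMillis() formula as the production code. The class and helper names (LogDelayCheck, local, requestedMillis, elapsedMillis) are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Locale;

public class LogDelayCheck {
    // Log timestamps are local time (GMT+2), so attach an explicit +02:00 offset before converting.
    static Instant local(String localTime) {
        return OffsetDateTime.parse(localTime + "+02:00").toInstant();
    }

    // Same formula as the production code: Duration.between(now, nextTimeout).toMillis().
    static long requestedMillis(Instant scheduledAt, Instant target) {
        return Duration.between(scheduledAt, target).toMillis();
    }

    // Wall-clock time that actually passed between scheduling and execution.
    static long elapsedMillis(Instant scheduledAt, Instant ranAt) {
        return Duration.between(scheduledAt, ranAt).toMillis();
    }

    static void report(String label, Instant scheduledAt, Instant target, Instant ranAt) {
        long requested = requestedMillis(scheduledAt, target);
        long elapsed = elapsedMillis(scheduledAt, ranAt);
        System.out.printf(Locale.ROOT, "%s: requested=%d ms, elapsed=%d ms, late by %d ms, ratio=%.2f%n",
                label, requested, elapsed, elapsed - requested, (double) elapsed / requested);
    }

    public static void main(String[] args) {
        // Scheduled at 00:08:50.586 for 22:10:00Z, executed at 00:11:21.206
        report("cycle 1",
                local("2018-06-08T00:08:50.586"),
                Instant.parse("2018-06-07T22:10:00Z"),
                local("2018-06-08T00:11:21.206"));
        // Scheduled at 00:13:04.167 for 22:20:00Z, executed at 00:28:06.145
        report("cycle 2",
                local("2018-06-08T00:13:04.167"),
                Instant.parse("2018-06-07T22:20:00Z"),
                local("2018-06-08T00:28:06.145"));
    }
}
```

This reproduces the two logged delays (69414 ms and 415833 ms) and shows that, by wall-clock time, the tasks actually fired after 150620 ms and 901978 ms, i.e. roughly 2.17 times the requested delay in both cycles.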

Below is the actual production code of the scheduling function.

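import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

ScheduledExecutorService EXECUTORS = Executors.newScheduledThreadPool(10);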


private void scheduleNextTimeout(Instant currentTime, Instant lastFtpConnection) {

    try {

        log.info("Scheduling next webdyn service time. Currently {}, last connection {}", currentTime, lastFtpConnection);

        // Parse config files first
        getConfigIni().parse();

        long time = System.nanoTime();
        final Instant earliestPossibleTimeout = Instant.now().plusSeconds(5); 

        Instant nextDataSample = nextTimeout(currentTime);

        if (nextDataSample.isBefore(earliestPossibleTimeout)) {
            final Instant oldTime = nextDataSample;
            nextDataSample = nextTimeout(earliestPossibleTimeout);
            log.warn("Next data sample was calculated to a time in the past '{}', resetting to a future time: {}", oldTime, nextDataSample);
        }

        Instant nextFtp = nextFtpConnection(currentTime, lastFtpConnection);

        if (nextFtp.isBefore(earliestPossibleTimeout)) {
            final Instant oldTime = nextFtp;
            nextFtp = nextFtpConnection(earliestPossibleTimeout, lastFtpConnection);
            log.warn("Next FTP connection was calculated to a time in the past '{}', resetting to a future time: {}", oldTime, nextFtp);
        }

        final boolean isFtpConnection = !nextDataSample.isBefore(nextFtp);
        final boolean isDataSample = !isFtpConnection || nextDataSample.equals(nextFtp);
        log.info("The next data sample at {} and the next FTP connection at {}", nextDataSample, nextFtp);

        final Instant nextTimeout = nextDataSample.isBefore(nextFtp) ? nextDataSample : nextFtp;
        final long millis = Duration.between(Instant.now(), nextTimeout).toMillis();
        EXECUTORS.schedule(() -> {
            log.info("Executing Webdyn service, isDataSample={}, isFtpConnection={}, nextTimeout={}, lastFtpConnection={}",
                    isDataSample, isFtpConnection, nextTimeout, lastFtpConnection);

            long tme = System.nanoTime();
            try {
                connect(isDataSample, isFtpConnection, nextTimeout, lastFtpConnection);
                log.warn("Completed webdyn service in {}ms", (System.nanoTime() - tme) / 1000000);
            } catch (final Throwable ex) {
                log.error("Failed webdyn service after {}ms : {}", (System.nanoTime() - tme) / 1000000, ex.getMessage(), ex);
            } finally {
                scheduleNextTimeout(nextTimeout, isFtpConnection ? nextTimeout : lastFtpConnection);
            }
        }, millis, TimeUnit.MILLISECONDS);

        log.warn("Completed webdyn schedule in {}ms, next execution at {} (in {} ms) will run as {}",
                (System.nanoTime() - time) / 1000000, nextTimeout, millis, isFtpConnection ? "ftp-connection" : "data-sample");

    } catch (final Throwable ex) {
        log.error("Fatal error in webdyn schedule : {}", ex.getMessage(), ex);
    }
}
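The production code calls nextTimeout(currentTime) without showing its body. As an illustration only, here is a minimal sketch of one way such a helper could work, assuming a fixed 10-minute data-sample period (inferred from the 22:10:00Z and 22:20:00Z times in the log; the real helper and its configuration are not shown in the question). It returns the first period boundary strictly after the given instant, then converts that absolute time into the relative delay that ScheduledExecutorService#schedule expects. The class name AlignedTimeout and the PERIOD_MILLIS constant are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class AlignedTimeout {
    // Hypothetical: a 10-minute sampling period, inferred from the times in the log.
    static final long PERIOD_MILLIS = Duration.ofMinutes(10).toMillis();

    // Returns the first period boundary strictly after 'current'.
    static Instant nextTimeout(Instant current) {
        long slot = Math.floorDiv(current.toEpochMilli(), PERIOD_MILLIS) + 1;
        return Instant.ofEpochMilli(slot * PERIOD_MILLIS);
    }

    // Relative delay for schedule(); zero or negative delays are treated by
    // ScheduledExecutorService as requests for immediate execution.
    static long delayMillis(Instant now, Instant target) {
        return Duration.between(now, target).toMillis();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-06-07T22:08:50.586Z");
        Instant next = nextTimeout(now);
        System.out.println(next + " in " + delayMillis(now, next) + " ms");
    }
}
```

Because the boundary is strictly after the input, feeding the previous target back in (as the finally block in the production code does with nextTimeout) moves on to the following slot instead of returning the same instant again.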

1 solution

#1

As stated in my comment below the question, the issue here is that there is a shared, mutable, non-thread-safe resource (the EXECUTORS attribute) modified by more than one thread: by the main thread at startup, and by whichever pool thread executes the task.

Note that even when only one thread accesses a shared resource at a time (simply because only one task runs at a time), you still need to think about concurrency. Without synchronization, the Java Memory Model does not guarantee that changes made by one thread will ever become visible to other threads, no matter how much later they run.
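A minimal sketch of the visibility point, using a classic stop-flag example rather than the scheduler code. The class and method names are hypothetical, and volatile is used here only because it is the smallest tool that establishes the needed ordering; synchronized getters and setters on the flag would work too. If volatile were removed, the Java Memory Model would allow the worker to keep reading a stale false and spin forever, even though only main ever writes the flag.

```java
public class VisibilityDemo {
    // Shared flag written by one thread and read by another. 'volatile' guarantees
    // that the worker's reads observe main's write; without it, the loop may never end.
    private static volatile boolean stop = false;

    static boolean runWorkerAndStop() throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!stop) {
                Thread.onSpinWait(); // busy-wait that only reads the shared flag
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if the worker never stops
        worker.start();

        Thread.sleep(100);      // let the worker spin for a while
        stop = true;            // volatile write, visible to the worker's subsequent reads
        worker.join(2_000);     // wait (bounded) for the worker to notice
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + runWorkerAndStop());
    }
}
```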

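So the solution would be to make the scheduleNextTimeout method synchronized. Releasing a monitor happens-before every subsequent acquisition of the same monitor, so whatever one thread writes inside the method is guaranteed to be visible to the next thread that enters it, rather than being kept local to the thread that made the change.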
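Here is a minimal sketch of that suggestion, assuming a simplified stand-in for the question's code: scheduleNext plays the role of scheduleNextTimeout, a fixed delay replaces the real timeout calculation, and completedRuns is a hypothetical piece of shared state. As in the original, the scheduled task calls back into the method from its finally block, so successive calls run on different pool threads; because the method is synchronized on the same object, each call sees what the previous one wrote. The lock is released as soon as schedule() returns, so the callback from the pool thread does not deadlock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SynchronizedRescheduler {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final CountDownLatch done = new CountDownLatch(1);
    private final int maxRuns;
    // Hypothetical shared, mutable state touched by whichever pool thread runs the task.
    private int completedRuns;

    SynchronizedRescheduler(int maxRuns) {
        this.maxRuns = maxRuns;
    }

    // Same shape as scheduleNextTimeout: decide what to do next, then schedule a task
    // whose finally block calls back into this method on a pool thread.
    synchronized void scheduleNext(long delayMillis) {
        if (completedRuns >= maxRuns) {
            done.countDown();
            executor.shutdown();
            return;
        }
        executor.schedule(() -> {
            try {
                recordRun();
            } finally {
                scheduleNext(delayMillis);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    private synchronized void recordRun() {
        completedRuns++;
    }

    synchronized int completedRuns() {
        return completedRuns;
    }

    boolean awaitDone(long timeoutMillis) throws InterruptedException {
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedRescheduler r = new SynchronizedRescheduler(5);
        r.scheduleNext(10);
        System.out.println("finished: " + r.awaitDone(5_000) + ", runs: " + r.completedRuns());
    }
}
```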

You could also use a synchronized block (synchronized on this) around only the part that accesses the shared resource, but since the system does not seem to be a heavy-duty one and the rest of the code does not seem to take long, there is no need for that.
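For comparison, a minimal sketch of the synchronized-block variant, under the same simplifying assumptions: the names are hypothetical, scheduledCount stands in for whatever shared state the real method touches, and the "expensive" work is reduced to clamping the delay. Work that touches no shared state runs without the lock; only the lines that read or write shared state sit inside synchronized (this).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockSynchronizedScheduler {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // Hypothetical shared, mutable state read and written on several threads.
    private long scheduledCount;

    void scheduleNext(long delayMillis, Runnable task) {
        // Work that touches no shared state (in the real code: parsing config,
        // computing the next timeout) runs without holding the lock.
        long delay = Math.max(0, delayMillis);

        // Only the part that touches shared state is guarded, synchronized on 'this'.
        synchronized (this) {
            scheduledCount++;
            executor.schedule(task, delay, TimeUnit.MILLISECONDS);
        }
    }

    synchronized long scheduledCount() {
        return scheduledCount;
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockSynchronizedScheduler s = new BlockSynchronizedScheduler();
        CountDownLatch ran = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            s.scheduleNext(10L * i, ran::countDown);
        }
        boolean allRan = ran.await(5, TimeUnit.SECONDS);
        s.shutdown();
        System.out.println("all ran: " + allRan + ", scheduled: " + s.scheduledCount());
    }
}
```

Narrowing the lock like this only pays off when the unguarded work is slow or contended; for a single low-frequency scheduler, synchronizing the whole method is simpler and equivalent in effect.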

Here is the gist of it, in a nice short article I learned from when I first ran into this sort of issue :) https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#jsr133

I'm glad I could help.
