Java:Async异步代码的实现方式

时间:2020-12-13 00:43:35

异步的八种实现方式

  • 线程Thread
  • Future
  • 异步框架CompletableFuture
  • Spring注解@Async
  • Spring ApplicationEvent事件
  • 消息队列
  • 第三方异步框架,比如Hutool的ThreadUtil
  • Guava异步

继承Thread

package com.example;

public class AsyncThread extends Thread {
    @Override
    public void run() {
        System.out.println("current thread name " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.start();
        // current thread name Thread-0
    }
}

使用线程池

package com.example;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncThread extends Thread {
    // 线程池
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    @Override
    public void run() {
        System.out.println("current thread name " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        executorService.submit(new AsyncThread());
        // current thread name pool-1-thread-1
        // 进程卡住,没有结束
    }
}

Future

可以获取执行结果

package com.example;

import java.util.concurrent.*;

public class FutureTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("start");
                Thread.sleep(3000);
                System.out.println("end");
                return "success";
            }
        });

        // 等待任务结束,获取返回值
        String result = future.get();
        System.out.println(result);
    }
}

CompletableFuture

package com.example;

import java.util.concurrent.*;

public class AsyncTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> result1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "success";
        });

        CompletableFuture<Void> result2 = result1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread().getName());
        });

        System.out.println(result1.get());
        System.out.println(result2.get());

        // 输出结果
        // ForkJoinPool.commonPool-worker-1
        // success
        // ForkJoinPool.commonPool-worker-1
        // null
    }
}

SpringBoot的@Async异步

推荐自定义线程池实现异步

项目结构

$ tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── demo
        │               ├── Application.java
        │               ├── config
        │               │   └── TaskPoolConfig.java
        │               ├── controller
        │               │   └── IndexController.java
        │               └── service
        │                   ├── AsyncService.java
        │                   └── impl
        │                       └── AsyncServiceImpl.java
        └── resources
            ├── application.yml
            ├── static
            └── templates

pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

TaskPoolConfig.java

package com.example.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;


/**
 * 线程池参数配置
 **/
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定义线程池
     **/
    @Bean
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);

        return executor;
    }
}

AsyncService.java

package com.example.demo.service;

public interface AsyncService {
    void sendEmail();
}

AsyncServiceImpl.java

package com.example.demo.service.impl;

import com.example.demo.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncServiceImpl implements AsyncService {
    @Async
    @Override
    public void sendEmail() {
        try {
            // 模拟耗时3秒
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            System.out.println("Email发送异常");
        }

        System.out.println("Email发送成功");
    }
}

IndexController.java

package com.example.demo.controller;

import com.example.demo.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IndexController {
    @Autowired
    private AsyncService asyncService;

    @GetMapping("/")
    public String index(){
        asyncService.sendEmail();
        return "success";
    }
}

Application.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Spring ApplicationEvent事件

基于上一节的异步配置,实现异步事件

$ tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── demo
        │               ├── Application.java
        │               ├── config
        │               │   └── TaskPoolConfig.java
        │               ├── controller
        │               │   └── IndexController.java
        │               ├── entity
        │               │   └── User.java
        │               ├── event
        │               │   └── AsyncSendEmailEvent.java
        │               └── listener
        │                   └── AsyncSendEmailListener.java
        └── resources
            ├── application.yml
            ├── static
            └── templates


User.java

package com.example.demo.entity;

import lombok.Data;

@Data
public class User {
    private Integer age;
    private String name;
}

AsyncSendEmailEvent.java

package com.example.demo.event;

import com.example.demo.entity.User;
import org.springframework.context.ApplicationEvent;

/**
 * 自定义事件
 */
public class AsyncSendEmailEvent extends ApplicationEvent {
    private User user;

    public AsyncSendEmailEvent(User user) {
        super(user);
        this.user = user;
    }

    public User getUser() {
        return this.user;
    }
}

AsyncSendEmailListener.java

package com.example.demo.listener;

import com.example.demo.event.AsyncSendEmailEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 事件监听器
 */
@Component
public class AsyncSendEmailListener implements ApplicationListener<AsyncSendEmailEvent> {
    @Async
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {

        System.out.println(event.getUser());

        try {
            Thread.sleep(3 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("邮件发送成功");
    }
}

IndexController.java

package com.example.demo.controller;

import com.example.demo.entity.User;
import com.example.demo.event.AsyncSendEmailEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class IndexController {
    @Autowired
    private ApplicationEventPublisher publisher;

    @GetMapping("/")
    public String index() {
        User user = new User();
        user.setName("Tom");
        user.setAge(20);

        // 发布事件
        publisher.publishEvent(new AsyncSendEmailEvent(user));
        return "success";
    }
}

参考