Android平台多线程实现生产者-消费者模型

时间:2021-05-21 17:59:38

本示例利用线程池(ThreadPoolExecutor)运行消费者任务线程,并基于公平锁机制控制消费者线程的中止。公平锁相对于非公平锁在性能上会有所牺牲,但在执行下载大文件这类耗时任务时,能体现出其安全稳定的特性。

import android.app.Activity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.WeakHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Demo activity for a producer/consumer setup.
 *
 * <p>Each click on the "new task" view produces a {@link DownloadOptions} and submits a
 * {@link DownloadConsumer} to a thread pool. A consumer simulates a download by polling its
 * stop flag every 50 ms until either the simulated duration elapses or the flag is cleared.
 * Clicking "stop all" (or destroying the activity) clears the flag of every tracked task.
 */
public class MainActivity extends Activity {

    // Per-URL locks that keep two tasks for the same URL from running at the same time.
    // Read and written from pool threads, so every access is guarded by synchronized (locks).
    final Map<String, ReentrantLock> locks = new WeakHashMap<>();
    Executor executor;

    // Options of the tasks started so far, keyed by URL. Accessed only by setstatus and stopAll.
    final Map<String, DownloadOptions> allDown = new WeakHashMap<>();
    Random random = new Random();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        initExecutorIfNeed();
    }

    /**
     * Creates the thread pool on first use, or re-creates it after it has been shut down.
     *
     * <p>NOTE(review): the work queue is unbounded, so per the ThreadPoolExecutor contract the
     * pool never grows beyond its core size of 3; the maximum of 5 has no effect.
     */
    private void initExecutorIfNeed() {
        if (executor == null || ((ExecutorService) executor).isShutdown()) {
            executor = new ThreadPoolExecutor(3, 5, 100, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }
    }

    /**
     * Click handler: starts a new simulated download or stops all tracked downloads,
     * depending on which view was clicked.
     *
     * @param view the clicked view; only {@code R.id.newtask} and {@code R.id.stopall} are handled
     */
    public void setstatus(View view) {
        int id = view.getId();
        if (id == R.id.newtask) {
            // Producer side: create a task with a random URL suffix and a random duration
            // (0..99 seconds, expressed in milliseconds).
            initExecutorIfNeed();
            DownloadOptions downloadOptions = new DownloadOptions(true,
                    "downloadconsumer" + random.nextInt(100), random.nextInt(100) * 1000L);
            // NOTE(review): a URL collision replaces the earlier entry, so stopAll() can no
            // longer reach the earlier task's options.
            allDown.put(downloadOptions.url, downloadOptions);
            executor.execute(new DownloadConsumer(this, downloadOptions));
        } else if (id == R.id.stopall) {
            stopAll();
        }
    }

    /** Clears the run flag of every tracked task and forgets all of them. */
    private void stopAll() {
        for (DownloadOptions options : allDown.values()) {
            if (options != null) {
                options.set(false);
            }
        }
        allDown.clear();
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        stopAll();
        if (executor != null) {
            // Interrupts running consumers; they exit their polling loop on interruption.
            ((ExecutorService) executor).shutdownNow();
        }
        synchronized (locks) {
            locks.clear();
        }
    }

    /**
     * Returns the lock associated with {@code url}, creating it on first request.
     *
     * <p>Called from pool threads, so the lookup-then-insert is done under a monitor; otherwise
     * two tasks for the same URL could each create their own lock and run concurrently.
     *
     * @param url the task URL used as the map key
     * @return the lock shared by all tasks with this URL
     */
    ReentrantLock getReentrantLock(String url) {
        synchronized (locks) {
            ReentrantLock reentrantLock = locks.get(url);
            if (reentrantLock == null) {
                reentrantLock = new ReentrantLock();
                locks.put(url, reentrantLock);
            }
            return reentrantLock;
        }
    }

    /** Settings and run flag of one simulated download. */
    private static class DownloadOptions {
        // Run flag: true while the task may keep running, false once a stop was requested.
        private volatile boolean status;
        final String url;
        // Simulated download duration in milliseconds.
        final long sometimeForDownload;
        final ReentrantReadWriteLock lock;
        final ReentrantReadWriteLock.WriteLock writeLock;
        final ReentrantReadWriteLock.ReadLock readLock;

        public DownloadOptions(boolean status, String url, long sometimeForDownload) {
            this.status = status;
            this.url = url;
            this.sometimeForDownload = sometimeForDownload;
            // Fair ordering, so a stop request waiting for the write lock is not overtaken
            // by readers that arrive later.
            this.lock = new ReentrantReadWriteLock(true);
            this.writeLock = this.lock.writeLock();
            this.readLock = this.lock.readLock();
        }

        /** Returns the current run flag. */
        public boolean get() {
            return status;
        }

        /**
         * Sets the run flag under the write lock.
         *
         * <p>The request is always applied: it waits for the write lock instead of being
         * dropped when another writer currently holds it.
         *
         * @param status the new run flag; {@code false} asks the consumer to stop
         */
        public void set(boolean status) {
            Log.d("log", "try to get the writeLock " + status);
            this.writeLock.lock();
            try {
                this.status = status;
            } finally {
                this.writeLock.unlock();
            }
        }

        public ReentrantReadWriteLock.ReadLock getReadLock() {
            return readLock;
        }
    }

    /** Consumer task that simulates a download for the duration given in its options. */
    private static class DownloadConsumer implements Runnable {

        final MainActivity activity;
        // Options of the download this consumer executes.
        final DownloadOptions options;

        final String url;
        // Simulated download duration in milliseconds.
        final long sometimeForDownload;

        public DownloadConsumer(MainActivity activity, DownloadOptions options) {
            this.activity = activity;
            this.options = options;
            this.url = options.url;
            this.sometimeForDownload = options.sometimeForDownload;
        }

        /**
         * Holds the per-URL lock while polling the run flag every 50 ms. Ends when the
         * simulated duration has elapsed, the flag is cleared, or the thread is interrupted.
         */
        @Override
        public void run() {
            Log.d("log", "consumer for " + url + " ready go");
            if (activity == null) {
                return;
            }
            ReentrantLock lock = activity.getReentrantLock(url);
            if (lock.isLocked()) {
                Log.d("log", "ReentrantLock has been locked");
            }

            lock.lock();
            try {
                long starttime = System.currentTimeMillis();
                ReentrantReadWriteLock.ReadLock readLock = options.getReadLock();
                while ((System.currentTimeMillis() - starttime) < sometimeForDownload) {
                    boolean status;
                    readLock.lock();
                    try {
                        status = options.get();
                    } finally {
                        readLock.unlock();
                    }
                    if (!status) {
                        Log.d("log", "consumer for " + url + " received the stop order");
                        break;
                    }
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // The pool is shutting down: keep the interrupt status for the
                        // executor and stop instead of polling on until the time runs out.
                        Thread.currentThread().interrupt();
                        Log.d("log", "consumer for " + url + " was interrupted");
                        break;
                    }
                }
            } finally {
                lock.unlock();
            }

            Log.d("log", "consumer for " + url + " has been finished");
        }
    }


}

运行效果图:

Android平台多线程实现生产者-消费者模型Android平台多线程实现生产者-消费者模型


Android平台多线程实现生产者-消费者模型Android平台多线程实现生产者-消费者模型