本示例使用线程池(ThreadPoolExecutor)运行消费者任务线程,并基于公平锁机制控制消费者线程的中断。公平锁相对于非公平锁会牺牲一部分性能,但在执行下载大文件这类耗时任务时,能让停止请求大致按到达顺序获得锁,不会被持续的读操作长期阻塞,从而体现出其安全、稳定的特性。
import android.app.Activity; import android.os.Bundle; import android.util.Log; import android.view.View; import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.WeakHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class MainActivity extends Activity { //防止重复任务锁 final Map<String, ReentrantLock> locks = new WeakHashMap<>(); Executor executor; final Map<String, DownloadOptions> allDown = new WeakHashMap<>(); Random random = new Random(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initExecutorIfNeed(); } //初始化线程容器 private void initExecutorIfNeed() { if (executor == null || ((ExecutorService) executor).isShutdown()) { executor = new ThreadPoolExecutor(3, 5, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } } public void setstatus(View view) { if (view.getId() == R.id.newtask) { //开启新任务,即生产者 initExecutorIfNeed(); DownloadOptions downloadOptions = new DownloadOptions(true, "downloadconsumer" + random .nextInt(100), new Random().nextInt(100) * 1000); allDown.put(downloadOptions.url, downloadOptions); executor.execute(new DownloadConsumer(this, downloadOptions)); } else if (view.getId() == R.id.stopall) { //停止所有任务 stopAll(); } } private void stopAll() { Iterator iterator = allDown.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, DownloadOptions> entry = (Map.Entry<String, DownloadOptions>) iterator.next(); entry.getValue().set(false); allDown.put(entry.getKey(), null); iterator.remove(); } } @Override protected void onDestroy() { super.onDestroy(); stopAll(); ((ExecutorService) executor).shutdownNow(); locks.clear(); } 
ReentrantLock getReentrantLock(String url) { ReentrantLock reentrantLock = locks.get(url); if (reentrantLock == null) { reentrantLock = new ReentrantLock(); locks.put(url, reentrantLock); } return reentrantLock; } private static class DownloadOptions { //控制下载任务的状态 private volatile boolean status; final String url; final long sometimeForDownload; final ReentrantReadWriteLock lock; final ReentrantReadWriteLock.WriteLock writeLock; final ReentrantReadWriteLock.ReadLock readLock; public DownloadOptions(boolean status, String url, long sometimeForDownload) { this.status = status; this.url = url; this.sometimeForDownload = sometimeForDownload; //利用公平锁,防止消费者线程无法被中途终止的情况 this.lock = new ReentrantReadWriteLock(true); this.writeLock = this.lock.writeLock(); this.readLock = this.lock.readLock(); } public boolean get() { return status; } public void set(boolean status) { if (this.lock.isWriteLocked()) { Log.d("log", "ReentrantReadWriteLock has been locked"); return; } Log.d("log", "try to get the writeLock " + status); this.writeLock.lock(); this.status = status; this.writeLock.unlock(); } public ReentrantReadWriteLock.ReadLock getReadLock() { return readLock; } } //消费者线程,用于执行下载任务 private static class DownloadConsumer implements Runnable { final MainActivity activity; //下载选项 final DownloadOptions options; final String url; //模拟下载所耗费的时间 final long sometimeForDownload; public DownloadConsumer(MainActivity activity, DownloadOptions options) { this.activity = activity; this.options = options; this.url = options.url; this.sometimeForDownload = options.sometimeForDownload; } @Override public void run() { Log.d("log", "consumer for " + url + " ready go"); if (activity == null) { return; } ReentrantLock lock = activity.getReentrantLock(url); if (lock.isLocked()) { Log.d("log", "ReentrantLock has been locked"); } lock.lock(); long starttime = System.currentTimeMillis(); try { while ((System.currentTimeMillis() - starttime) < sometimeForDownload) { ReentrantReadWriteLock.ReadLock 
readLock = options.getReadLock(); readLock.lock(); boolean status = options.get(); readLock.unlock(); if (!status) { Log.d("log", "consumer for " + url + " received the stop order"); break; } try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } Log.d("log", "consumer for " + url + " has been finished"); } } }
运行效果图: