Java 并发优化

时间:2023-06-04 15:46:49

线程不安全

SimpleDateFormat不是线程安全的

SimpleDateThread

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date; public class SimpleDateThread extends Thread {
private SimpleDateFormat sdf;
private String dateString;
public SimpleDateThread(SimpleDateFormat sdf,String dateString){
this.sdf=sdf;
this.dateString=dateString;
}
public void run(){
try {
Date dateRef=sdf.parse(dateString);
String newDateString=sdf.format(dateRef).toString();
if(!newDateString.equals(dateString)){
System.out.println("ThreadName="+this.getName()+"报错了 日期字符串:"+dateString+"转换成的日期为:"+newDateString);
}else{
System.out.println("ThreadName="+this.getName()+"转换正确 日期字符串:"+dateString+"转换成的日期为:"+newDateString);
}
} catch (ParseException e) {
e.printStackTrace();
}
}
}

SimpleDateDemo

import java.text.SimpleDateFormat;

public class SimpleDateDemo {
public static void main(String[] args){
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
String[] dateStringArray=new String[]{"2000-01-01","2001-01-01","2002-01-01","2003-01-01","2004-01-01","2005-01-01","2006-01-01","2007-01-01","2008-01-01","2009-01-01"};
SimpleDateThread[] threadArray=new SimpleDateThread[10];
for(int i=0;i<10;i++){
threadArray[i]=new SimpleDateThread(sdf,dateStringArray[i]);
}
for(int i=0;i<10;i++){
threadArray[i].start();
}
}
}

运行结果

Java 并发优化

新增DateTools 类

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date; public class DateTools {
public static Date parse(String formatPattern,String dateString)throws ParseException{
return new SimpleDateFormat(formatPattern).parse(dateString);
}
public static String format(String formatPattern,Date date){
return new SimpleDateFormat(formatPattern).format(date).toString();
}
}

将SimpleDateThread中的代码替换

Date dateRef=DateTools.parse("yyyy-MM-dd",dateString);
String newDateString=DateTools.format("yyyy-MM-dd",dateRef).toString();

可解决线程不安全问题。

非阻塞队列

ConcurrentLinkedQueue

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue=new ConcurrentLinkedQueue<Integer>();
private static int count=2;
private static CountDownLatch latch=new CountDownLatch(count); public static void main(String[] args) throws InterruptedException {
long timeStart=System.currentTimeMillis();
ExecutorService es= Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for(int i=0;i<count;i++){
es.submit(new Poll());
}
latch.await();
System.out.println("cost time "+(System.currentTimeMillis()-timeStart)+"ms");
es.shutdown();
} public static void offer(){
for(int i=0;i<100000;i++){
queue.offer(i);
}
} static class Poll implements Runnable{ @Override
public void run() {
while (!queue.isEmpty()){
System.out.println(queue.poll());
}
latch.countDown();
}
}
}

阻塞队列

BlockingQueue

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque; public class BlockingQueueTest {
public class Basket{
BlockingQueue<String> basket=new LinkedBlockingDeque<>(3); public void produce() throws InterruptedException{
basket.put("An apple");
} public String consume() throws InterruptedException{
return basket.take();
}
}
public class Producer implements Runnable{
private String instance;
private Basket basket; public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
} @Override
public void run() {
try {
while (true){
System.out.println("生产者准备生产苹果: "+instance);
basket.produce();
System.out.println("!生产者生产苹果完毕: "+instance);
Thread.sleep(300);
}
} catch (InterruptedException e) {
System.out.println("Producer Interrupted");
}
}
} public class Consumer implements Runnable{
private String instance;
private Basket basket; public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
} @Override
public void run() {
try {
while (true){
System.out.println("消费者准备消费苹果: "+instance);
System.out.println(basket.consume());
System.out.println("!消费者消费苹果完毕: "+instance);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args){
BlockingQueueTest test=new BlockingQueueTest();
Basket basket=test.new Basket();
ExecutorService service= Executors.newCachedThreadPool();
Producer producer=test.new Producer("生产者001",basket);
Producer producer1=test.new Producer("生产者002",basket);
Consumer consumer=test.new Consumer("消费者001",basket);
service.submit(producer);
service.submit(producer1);
service.submit(consumer);
}
}

并发工具类

CyclicBarrier

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class CyclicBarrierDemo {
public static void main(String[] args) throws Exception{
CyclicBarrier barrier=new CyclicBarrier(3);
ExecutorService service= Executors.newFixedThreadPool(3);
service.submit(new Thread(new Runner(barrier, "1号选手")));
service.submit(new Thread(new Runner(barrier, "2号选手")));
service.submit(new Thread(new Runner(barrier, "3号选手")));
service.shutdown();
}
static class Runner implements Runnable{
private CyclicBarrier barrier;
private String name; public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
} @Override
public void run() {
try {
Thread.sleep(1000*(new Random()).nextInt(8));
System.out.println(name+" 准备好了...");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name+" 起跑!");
}
}
}

Java 并发优化

CountDownLatch

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class CountDownLatchDemo {
private static final int PLAYER_AMOUNT=5;
public static void main(String[] args){
CountDownLatchDemo demo=new CountDownLatchDemo();
demo.exec();
} public void exec(){
CountDownLatch begin=new CountDownLatch(1);
CountDownLatch end=new CountDownLatch(PLAYER_AMOUNT);
Player[] players=new Player[PLAYER_AMOUNT];
for(int i=0;i<PLAYER_AMOUNT;i++){
players[i]=new Player(i+1,begin,end);
}
ExecutorService service= Executors.newFixedThreadPool(PLAYER_AMOUNT);
System.out.println("Race begins!");
begin.countDown();
for(Player p:players){
service.execute(p);
}
try {
end.await();
} catch (Exception e) {
e.printStackTrace();
}finally {
System.out.println("Race ends!");
}
service.shutdown();
}
class Player implements Runnable{
private int id;
private CountDownLatch begin;
private CountDownLatch end; public Player(int id, CountDownLatch begin, CountDownLatch end) {
this.id = id;
this.begin = begin;
this.end = end;
} @Override
public void run() {
try {
begin.await();
Thread.sleep((long)(Math.random()*100));
System.out.println("Play"+id+" arrived.");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
end.countDown();
}
}
}
}

Java 并发优化

Semaphore

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class SemaphoreDemo {
public static void main(String[] args){
ExecutorService service= Executors.newCachedThreadPool();
final Semaphore semaphore=new Semaphore(5);
for(int i=0;i<20;i++){
final int no=i;
Runnable runnable=new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Accessing: "+no);
Thread.sleep((long)(Math.random()*10000));
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}