Java并发库(十九):同步集合类的使用BlockingQueue、

时间:2021-10-15 18:17:48

深切怀念传智播客张孝祥老师,特将其代表作——Java并发库视频研读两遍,受益颇丰,记以后阅

19.java5同步集合类的应用

       传统集合实现同步的问题

       举了一个例子:Map集合线程不同步导致的问题。

       解决办法:使用同步的Map集合      使用集合工具类中的方法将不同步的集合转为同步的Collections.synchronizedMap(newMap())这个方法返回一个同步的集合

       publicstatic <K, V> Map<K, V> synchronizedMap(Map<K, V> m)

{return newSynchronizedMap<K, V>(m);}

SynchronizedMap类相当于一个代理类,通过查看源代码发现:该类中的所有方法都是直接返回:原Map集合方法调用后的结果,只是将返回结果的代码放在了同步代码块中以实现同步,构造是将同步锁默认置为当前对象。

HashSet与HashMap的关系与区别:

       HashSet是单列的,HashMap是双列的(键值对)

       关系:HashSet内部使用的是HashMap中的键,不考虑值。

查看HashSet的源代码发现其内部就是用HashMap实现的,只是没有使用HashMap的V,只使用了它的K。

 

JDK1.5中提供了并发 Collection:提供了设计用于多线程上下文中的 Collection 实现:

ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetCopyOnWriteArrayListCopyOnWriteArraySet。当期望许多线程访问一个给定 collection时,ConcurrentHashMap通常优于同步的 HashMapConcurrentSkipListMap通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList优于同步的 ArrayList

ConcurrentSkipListMap<K,V>映射可以根据键的自然顺序进行排序,也可以根据创建映射时所提供的Comparator 进行排序,具体取决于使用的构造方法。

ConcurrentSkipListSet<E>一个基于 ConcurrentSkipListMap 的可缩放并发 NavigableSet 实现。set 的元素可以根据它们的自然顺序进行排序,也可以根据创建 set 时所提供的Comparator 进行排序,具体取决于使用的构造方法

CopyOnWriteArrayList<E>ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。

这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。“

CopyOnWriteArraySet<E>对其所有操作使用内部 CopyOnWriteArrayList Set。因此,它共享以下相同的基本属性:

它最适合于具有以下特征的应用程序:set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。它是线程安全的。 因为通常需要复制整个基础数组,所以可变操作(add、set 和 remove 等等)的开销很大。 迭代器不支持可变 remove 操作。 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

 

       传统集合中存在的其它问题:对集合迭代时,不能对集合中的元素进行修改(添加、删除……),Java5中提供的并发集合就解决了这个问题。

 传统集合在迭代的时候修改抛异常的问题。

public class User implements Cloneable{
	private String name;
	private int age;
	
	public User(String name, int age) {
		this.name = name;
		this.age = age;
	}
	public boolean equals(Object obj) {
		if(this == obj) {
			return true;
		}
		if(!(obj instanceof User)) {
			return false;	
		}
		User user = (User)obj;
		//if(this.name==user.name && this.age==user.age)
		if(this.name.equals(user.name) 
			&& this.age==user.age) {
			return true;
		}
		else {
			return false;
		}
	}
	public int hashCode() {
		return name.hashCode() + age;
	}
	
	public String toString() {
		return "{name:'" + name + "',age:" + age + "}";
	}
	public Object clone()  {
		Object object = null;
		try {
			object = super.clone();
		} catch (CloneNotSupportedException e) {}
		return object;
	}
	public void setAge(int age) {
		this.age = age;
	}
	public String getName() {
		return name;
	}
} 
public class CollectionModifyExceptionTest {
	public static void main(String[] args) {
		Collection users = new CopyOnWriteArrayList();
<span style="white-space:pre">		</span>//<span style="font-family: Arial, Helvetica, sans-serif;">Collection users = new ArrayList();</span><span style="white-space:pre">
</span>
			
			//new ArrayList();
		users.add(new User("张三",28));	
		users.add(new User("李四",25));			
		users.add(new User("王五",31));	
		Iterator itrUsers = users.iterator();
		while(itrUsers.hasNext()){
			System.out.println("aaaa");
			User user = (User)itrUsers.next();
			if("李四".equals(user.getName())){
				users.remove(user);
				//itrUsers.remove();
			} else {
				System.out.println(user);				
			}
		}
	}
}



20. 空中网挑选实习生的面试题1

       现有的程序代码模拟产生了16个日志对象,并且需要运行16秒才能打印完这些日志,请在程序中增加4个线程去调用parseLog()方法来分头打印这16个日志对象,程序只需要运行4秒即可打印完这些日志对象。原始代码如下:

public class Test1

{

       publicstatic void main(String[] args)

       {

       SOP(begin:+sys.currentTimeMillis()/1000);

       //模拟处理16行日志,下面的代码产生16个日志对象,需运行16秒才能打印完

       //修改程序代码,开4个线程让这16个日志在4秒钟打完

       for (iint i=0; i<16; i++) //这行代码不能改动

       {

       final String log = “”+(i+1);   //这行代码不能改动

       {

       Test1.parseLog(log);

}

}

}

//parseLog方法内部代码不能改动

public staticvoid parseLog(String log)

{

       SOP(log+”:”+(sys.currentTimeMillis()/1000));

       try

       {

       Thread.sleep(1000);

}

       catch(InterruptedException e)

       {

       e.printStackTrace();

}

}

}

刚看到题目还想着很简单;直接在Test.parseLog(log)的地方new4个线程,都执行打印任务即可,仔细一看不行,在这里new4个线程的话就是16*4个线程了,所以要将线程在for循环外边创建出来,for内部将产生的日志对象装在一个共享变量里,在线程内部从共享变量中取数据打印。要考虑线程同步互斥问题,这个共享变量要具备同步功能,可以使用ArrayBlockingQueue这种阻塞式队列来存储日志对象。也可以使用普通集合,但拿数据要考虑同步问题,可能会浪费时间。

在for循环外部创建线程,定义共享变量

finalBlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);

       for(int i=0; i<4; i++)    创建4个线程

       {

       new Thread(new Runnable()

{

       public void run()

       {

       while (true)

       {     先从集合中取到日志对象,再打印

       String log =queue.take();     要处理异常

       parseLog(log);

}

}

}).start();

}

在16次for循环内部将产生的日志对象装入共享变量中

queue.put(log);      要处理异常

这道题只要用到同步集合来共享数据就可以了  List集合的Vector也可以

 public class Test {
    
    public static void main(String[] args){
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        for(int i=0;i<4;i++){
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            String log = queue.take();
                            parseLog(log);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
                
            }).start();
        }
        
        System.out.println("begin:"+(System.currentTimeMillis()/1000));
        /*模拟处理16行日志,下面的代码产生了16个日志对象,当前代码需要运行16秒才能打印完这些日志。
        修改程序代码,开四个线程让这16个对象在4秒钟打完。
        */
        for(int i=0;i<16;i++){  //这行代码不能改动
            final String log = ""+(i+1);//这行代码不能改动
            {
                    try {
                        queue.put(log);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                     //Test.parseLog(log);
            }
        }
    }
    
    //parseLog方法内部的代码不能改动
    public static void parseLog(String log){
        System.out.println(log+":"+(System.currentTimeMillis()/1000));
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        
    }
    
}

21. 空中网挑选实习生的面试题2

       现成程序中的Test类中的代码在不断地产生数据,然后交给TestDo.doSome()方法去处理。就好像生产者不断地产生数据,消费者不断地消费数据。请将程序改造成有10个线程来消费生产者产生的数据,这些消费者都调用TestDo.doSome()方法去进行处理,故每个消费者都需要一秒才能处理完,程序应保证这些消费者线程依次有序地消费数据,只有上一个消费者消费完后,下一个消费者才能消费数据,下一个消费者是谁都可以,但要保证这些消费者线程拿到的数据是有序的。原始代码如下:

public class Test2

{

       publicstatic void main(String[] args)

       {

       SOP(begin+sys.currentTimeMillis()/1000);

       for (int i=0; i<10; i++)  //这行不能改动

       {

       String input = i+””;       //这行不能改动

       String output =TeatDo.doSome(input);

       SOP(ThreadName+output);

}

}

}

//不能改动此TestDo类

class TestDo

{

       publicstatic String doSome(String input)

       {

       try

       {

       Thread.sleep(1000);

}

       catch (InterruptedException e)

       {

       e.printStackTrace();

}

String output = input + “:” + (Sys.currentTimeMillis());

return output;

}

}

       看这题和上一题差不多,也是数据共享问题了,弄一个同步集合存起来。

       用同样的方法一样解决 newArrayBlockingQueue()

张老师又讲了另一个同步队列:SynchronousQueue一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。

SynchronousQueue一个特殊队列,即便是空的也不能插入元素,也读不到元素,要往里边插入的时候如果没有读取操作,插入操作就会阻塞,等到有读取操作出现时,插入操作检测到了读取操作,才能把数据插入进去,而读取操作正好可以拿到刚刚插入进去的数据。就好比毒品买卖,我拿着毒品给谁呢,只有买毒品的人来了,才能立马给他,他也拿到了。与Exchanger类似,不过Exchanger是单对单的交换,SynchronousQueue可以多个抢数据,我拿着毒品等人来买,一下来了3个人买,谁抢到了就是谁的;或者我拿3包毒品,3个人同时每人一份。

这道题用synchronousQueue的话会一下子将10个数据全打印出来,因为10次循环一次放一个并没有人来取,所以没有放进去,后来一下10个线程来取数据,就一下放进去拿走了。我测试的时候没有这种情况,都是间隔一秒一秒的。测试后发现,将doSome处理后的结果存进去,就会有间隔,而直接存进去,取数据后再处理的话就是一下一片了。分析后知道:put时没有take,10个数据都在等待存入,如果存入的数据是doSome(input)的话,开始取数据时才开始执行doSome所以就会有间隔了。直接存数据,取出后在doSome就是一下拿到10个数据了。

要解决这个问题,可以使用厕所抢坑的方式解决,使用Semaphore来获取许可,每取一次数据释放一次即可。

final Semaphore x = new Semaphore(1);     一次一个

final SynchronousQueue queue = new SynchronousQueue();

每次取数据前都要先获取许可

x.acquire();

取完后释放许可

x.release();

这种方式与使用Lock方式一样

 public class Test {

    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(1);
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();
        for(int i=0;i<10;i++){
            new Thread(new Runnable(){
                @Override
                public void run() {    
                    try {
                        semaphore.acquire();
                        String input = queue.take();
                        String output = TestDo.doSome(input);
                        System.out.println(Thread.currentThread().getName()+ ":" + output);
                        semaphore.release();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }    
                }
            }).start();
        }
        
        System.out.println("begin:"+(System.currentTimeMillis()/1000));
        for(int i=0;i<10;i++){  //这行不能改动
            String input = i+"";  //这行不能改动
            try {
                queue.put(input);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

//不能改动此TestDo类
class TestDo {
    public static String doSome(String input){
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String output = input + ":"+ (System.currentTimeMillis() / 1000);
        return output;
    }
}

22. 空中网挑选实习生的面试题3

       现有程序同时启动了4个线程去调用TestDo.doSome(key, value)方法,由于TestDo.doSome(key, value)方法内的代码是先暂停1秒,然后再输出以秒为单位的当前时间值,所以,会打印出4个相同的时间值,如下所示:

              4:4:1258199615

              1:1:1258199615

              3:3:1258199615

              1:2:1258199615

       请修改代码,如果有几个线程调用TestDo.doSome(key,value)方法时,传递进去的key相等(equals比较为true),则这几个线程应互斥排队输出结果,即当有两个线程的key都是“1”时,她们中的一个要比另外其他线程晚1秒输出结果,如下所示:

              4:4:1258199615

              1:1:1258199615

              3:3:1258199615

              1:2:1258199616

       总之,当每个线程中指定的key相等时,这些相等key的线程应每隔一秒依次输出时间值(要用互斥),如果key不同,则并行执行(相互之间不互斥)。原始代码如下:

//不能改动此Test类

public class Test3 extends Thread

{

       privateTestDo testDo;

       privateString key;

       privateString value;

       publicTest3(String key, String key2, String value)

       {

       this.testDo = TestDo.getInstance();

       /*常量“1”和1”是同一个对象,下面这行代码就是要用“1+“”的

方式产生新的对象,以实现内容没有改变,仍然相等(都还为“1”),

但对象却不再是同一个的效果

*/

this.key = key + key2;

this.value = value;

}

       publicstatic void main(String[] args) throws InterruptedException

       {

       Test3 a = new Test3(“1”, “”, “1”);

       Test3 b = new Test3(“1”, “”, “2”);

       Test3 c = new Test3(“3”, “”, “3”);

       Test3 d = new Test3(“4”, “”, “4”);

       SOP(begin+:+sys.currentTimeMillis()/1000);

       a.start();

       b.start();

       c.start();

       d.start();

}

       publicvoid run()

       {

       testDo.doSome(key, value);

}

}

 

class TestDo

{

       privateTestDo(){}

       privatestatic TestDo _instance = new TestDo();

public staticTestDo getInstance()

       {

       return _instance;

}

public voiddoSome(Object key, String value)

{

       //此大括号内的代码是需要局部同步的代码,不能改动!

       {

       try

       {

       Thread.sleep(1000);

       SOP(key+”:”+value+”:”+sys.currentTimeMillis()/1000);

}

       catch (InterruptedExceptione)

       {

              e.printStackTrace();

}

}

}

}

       看完这道题第一个想法是在标记位置加上同步代码块,但是锁不好弄了,因为每次都新建了一个key对象来接受实际key,没法获取到实际key对象。     

       想到了Lock对象,所以建一个Lock对象,判断key的值是否和指定值“1“相同,如果相同就锁上,不同不管,finally里在解锁前进行判断,避免没上锁还要解锁发生问题。

Lock lock = newReentrantLock();

       publicvoid doSome(Object key, String value)

       {

              if(key.equals("1"))

                     lock.lock();

                     //System.out.println("OKOKOOK");

              //synchronized("1")

              try

              //此大括号内的代码是需要局部同步的代码,不能改动!

              {

                     try

                     {

                            Thread.sleep(1000);

                            System.out.println(key+":"+value+":"+System.currentTimeMillis()/1000);

                     }

                     catch(InterruptedException e)

                     {

                            e.printStackTrace();

                     }

              }finally

              {

                     if (key.equals("1"))

                            lock.unlock();

              }

       }

但上面的方式写死了,如果换2呢 还要改代码,现在要不管是什么只要相同都互斥,将这些加进来的key放到一个集合ArrayList中,每次传进来一个key,先把传进来的key作为锁对象,再判断这个对象有没有存在锁集合中,如果没有,就把它存进去,同时就用这个key做锁;如果已经存在了,就是说这个key已经做过锁对象了,就需要将以前做锁的那个对象拿出来,再让它来当锁,与传进来的key对象一样,这样就产生互斥效果了。

       需要注意:拿原来的锁对象时要迭代锁集合,因为有多个线程在运行,所以迭代时有可能出现其他线程的key没有做过锁,需要将它加到锁集合中,可是这时候这个线程还在迭代过程中,迭代时不能操作集合中的数据,就会发生异常。要解决这个问题,就需要用到同步集合了。CopyOnWriteArrayList

 

       使用ArrayList时就经常出异常,换CopyOnWriteArrayList后没有异常了

//将所有传过来的key都存起来

    //privateList<Object> keys = new ArrayList<Object>();

    private CopyOnWriteArrayList<Object>keys =newCopyOnWriteArrayList<Object>();

    publicvoiddoSome(Object key, String value)

    {

//以下代码就是为了找到同一个对象(而不是equals的对象)

        //先用这个key当锁,用过一次就存到集合中

        Objecto = key;

        //判断这个锁用过没有

        if (!keys.contains(o))

        {

            //如果这个key没有用过,就用它当锁,把它存到锁集合中

            keys.add(o);

        }

        else  //锁集合中已经有了这个key

        {

            //这个key已经当过锁了,就把它拿出来,还用它做锁,就和现在的key互斥了

            //因为不知道原来key的位置,所有需要进行遍历

            for(Iterator<Object> it =keys.iterator();it.hasNext();)

            {

                //当前遍历到的对象

                Objectoo = it.next();

                //如果找到了,就让它做锁

                if(oo.equals(o))

                {

                    o= oo;

                    break;//找到了,不用再循环了

                }                  

            }

            //o= keys.get(keys.indexOf(o));   //key和o不是同一个对象,拿不到

        }

       

        synchronized(o)

        //此大括号内的代码是需要局部同步的代码,不能改动!

        {

            try

            {

                Thread.sleep(1000);

                System.out.println(key+":"+value+":"+System.currentTimeMillis()/1000);

            }

            catch(InterruptedException e)

            {

                e.printStackTrace();

            }

        }

    }

 

a = “1”+””;

b = “1”+””;

a和b是同一个对象,常量相加equals为真 ==为假

 

Object o1 = new String("1");

       Object o2 = new String("1");

       System.out.println(o1==o2);//false

       System.out.println(o1.equals(o2));//true

       System.out.println(o1); //1

       System.out.println(o2); //1

       Object o3 = "1"+"";

       Object o4 = "1"+"";

       System.out.println(o3==o4);//true//编译器优化

       System.out.println(o3.equals(o4));//true

       Object o5 = "2"+"";

       Object o6 = get("2","");

       System.out.println(o5==o6);//false

       System.out.println(o5.equals(o6));//true

       System.out.println(o5+"__"+o6);//2__2

      

    publicstatic Object get(String a,String b)

    {

       return a+b;

    }

 





同一个对象、equals、并发集合在迭代中修改、添加

//不能改动此Test类    
public class Test extends Thread{
    
    private TestDo testDo;
    private String key;
    private String value;
    
    public Test(String key,String key2,String value){
        this.testDo = TestDo.getInstance();
        /*常量"1"和"1"是同一个对象,下面这行代码就是要用"1"+""的方式产生新的对象,
        以实现内容没有改变,仍然相等(都还为"1"),但对象却不再是同一个的效果*/
        this.key = key+key2;
/*        a = "1"+"";
        b = "1"+""
*/
        this.value = value;
    }


    public static void main(String[] args) throws InterruptedException{
        Test a = new Test("1","","1");
        Test b = new Test("1","","2");
        Test c = new Test("3","","3");
        Test d = new Test("4","","4");
        System.out.println("begin:"+(System.currentTimeMillis()/1000));
        a.start();
        b.start();
        c.start();
        d.start();

    }
    
    public void run(){
        testDo.doSome(key, value);
    }
}

class TestDo {

    private TestDo() {}
    private static TestDo _instance = new TestDo();    
    public static TestDo getInstance() {
        return _instance;
    }

    //private ArrayList keys = new ArrayList();
    private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
    public void doSome(Object key, String value) {
        Object o = key;
        if(!keys.contains(o)){
            keys.add(o);
        }else{

            for(Iterator iter=keys.iterator();iter.hasNext();){
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Object oo = iter.next();
                if(oo.equals(o)){
                    o = oo;
                    break;
                }
            }
        }
        synchronized(o)
        // 以大括号内的是需要局部同步的代码,不能改动!
        {
            try {
                Thread.sleep(1000);
                System.out.println(key+":"+value + ":"
                        + (System.currentTimeMillis() / 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}