多线程 一个生产者 多个消费者 问题

时间:2022-10-16 19:36:47
有一个读取数据库的线程(product),一次读取一定数量数据放到内存列表里面,然后这个线程等待 ;有多个消费线程,来将这些数据同步到别的服务器上面去,然后更新数据库状态,当内存内的数据消化完了,唤醒product线程,product线程再去读同样数量的数据, 一直这样重复。下面是伪代码。
product

public class Product implements Runnable {

private volatile boolean running = true;
        
        /**
        * 用于每次读取出来的数据的消耗计数
        */
public static CountDownLatch countDownLatch = null;

public static final BlockingQueue<T> blockList = new LinkedBlockingQueue<T>();

@Override
public void run() {

while (running) {
try {
List<T> list = //数据库读取数据,状态0的数据 , 100条
if (list.size() == 0) {
//没有数据等待20秒
TimeUnit.SECONDS.sleep(20);
} else {
                                        countDownLatch = new CountDownLatch(list.size());
blockList.addAll(list);
countDownLatch.await();
}

} catch (InterruptedException e) {
// 不考虑被打断的情况

}
}


consume:

public class Consume implements Runnable {

private boolean running = true;

@Override
public void run() {
while (running) {
   try {
   T t= Product.blockList.take();
     if (running) {
  //更新数据库t数据的状态,更新为1
     }
  Product.countDownLatch.countDown();
            } catch (InterruptedException e) {// 忽略  }
}
}
}



就剩下91分啦,全上了,大家看看有啥问题不。 多线程 一个生产者 多个消费者 问题

8 个解决方案

#1


自顶沙发。。。

#2


一点不懂:多线程的我,没任何想法

#3


多线程 一个生产者 多个消费者 问题貌似没有什么问题,
一个简单的coutdown的应用吧,
我没实际中用过这个,
等大神来点评

#4


你的思路,已经可以实现了呀,有什么问题呢?


/**
 * 
 */
package com.zuxiang.countdown;

import java.util.concurrent.CountDownLatch;

/**
 * @author zuxiang
 *
 */
public class Product implements Runnable {  
  
    private CountDownLatch downLatch;  
      
    public Product(CountDownLatch downLatch){  
        this.downLatch = downLatch;  
    }  
      
    public void run() {  
        System.out.println("正等待数据消费完......");  
        try {  
            this.downLatch.await();  
        } catch (InterruptedException e) {  
        }  
        System.out.println("所有数据都消费完了,再到数据库查询数据 ....!");  
    }  
  
 
}  
/**
 * 
 */
package com.zuxiang.countdown;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zuxiang
 * 
 */
public class ProductToConsumptionTest {

public static void main(String[] args) {  
        ExecutorService executor = Executors.newCachedThreadPool();  
          
        Dao dao = new Dao();
        
        List<String> datas = dao.getData();
        

        
        if (datas != null && !datas.isEmpty()){
        
            CountDownLatch latch = new CountDownLatch(datas.size());  
            
            List<Consumption> consumptions = new ArrayList<Consumption>();  
      
            Consumption consumption;
            
            Product product = new Product(latch);  
        
         for (String str : datas){
        
         consumption = new Consumption(latch,str);  
         consumptions.add(consumption);
         }
        
         product = new Product(latch); 
        
         for (Consumption c : consumptions){
        
         executor.execute(c);
         }
        
         executor.execute(product);  
        
         executor.shutdown(); 
        
       
        }
        
        
        
          
        
}
}

/**
 * 
 */
package com.zuxiang.countdown;

import java.util.ArrayList;
import java.util.List;

/**
 * @author zuxiang
 *
 */
public class Dao {

   /**
     * 查询数据
     * @return
     */
    public List<String> getData(){
    
     List<String> list = new ArrayList<String>();
    
     for (int i = 0; i < 10 ; i++)
     {
     list.add("data---" + i);
     }
     System.out.println("数据库查询数据 完成!");
    
     return list;
    }
}
/**
 * 
 */
package com.zuxiang.countdown;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author zuxiang
 *
 */
public class Consumption implements Runnable{  
    
    private CountDownLatch downLatch;  
    private String name;  
      
    public Consumption(CountDownLatch downLatch, String name){  
        this.downLatch = downLatch;  
        this.name = name;  
    }  
      
    public void run() {  
        this.doConsumption();  
        try{  
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));  
        }catch(InterruptedException ie){  
        }  
        System.out.println( this.name + "数据同步到服务器完毕!");  
        this.downLatch.countDown();  
          
    }  
      
    private void doConsumption(){  
        System.out.println(this.name + "正在同步数据到服务器.....");  
    }  
      
}  



#5


有没有问题不是我们来看的,应该是你自己来测试来发现的。
你测试N次都没有问题的话,那基本就是没问题了。

#6


你这样效率很低的。操作系统原理你学过吗?你需要的实际上是PV操作。
Java的队列支持
wait();等待
notify();唤醒的
所以你的程序里其实缺少的是线程同步锁。

#7


引用 6 楼 supperman_009 的回复:
你这样效率很低的。操作系统原理你学过吗?你需要的实际上是PV操作。
Java的队列支持
wait();等待
notify();唤醒的
所以你的程序里其实缺少的是线程同步锁。

这个题目是不牵扯到效率问题的。
一分钟运算100次和1分钟运算10次效果是一样的。
这个题看重的是结果。

#8


>> 当内存内的数据消化完了,唤醒product线程

楼主我觉得你这样线性操作的话,两个线程意义就不大了,多线程如果不能同时操作的话,单线程不就够了?

#1


自顶沙发。。。

#2


一点不懂:多线程的我,没任何想法

#3


多线程 一个生产者 多个消费者 问题貌似没有什么问题,
一个简单的coutdown的应用吧,
我没实际中用过这个,
等大神来点评

#4


你的思路,已经可以实现了呀,有什么问题呢?


/**
 * 
 */
package com.zuxiang.countdown;

import java.util.concurrent.CountDownLatch;

/**
 * @author zuxiang
 *
 */
public class Product implements Runnable {  
  
    private CountDownLatch downLatch;  
      
    public Product(CountDownLatch downLatch){  
        this.downLatch = downLatch;  
    }  
      
    public void run() {  
        System.out.println("正等待数据消费完......");  
        try {  
            this.downLatch.await();  
        } catch (InterruptedException e) {  
        }  
        System.out.println("所有数据都消费完了,再到数据库查询数据 ....!");  
    }  
  
 
}  
/**
 * 
 */
package com.zuxiang.countdown;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zuxiang
 * 
 */
public class ProductToConsumptionTest {

public static void main(String[] args) {  
        ExecutorService executor = Executors.newCachedThreadPool();  
          
        Dao dao = new Dao();
        
        List<String> datas = dao.getData();
        

        
        if (datas != null && !datas.isEmpty()){
        
            CountDownLatch latch = new CountDownLatch(datas.size());  
            
            List<Consumption> consumptions = new ArrayList<Consumption>();  
      
            Consumption consumption;
            
            Product product = new Product(latch);  
        
         for (String str : datas){
        
         consumption = new Consumption(latch,str);  
         consumptions.add(consumption);
         }
        
         product = new Product(latch); 
        
         for (Consumption c : consumptions){
        
         executor.execute(c);
         }
        
         executor.execute(product);  
        
         executor.shutdown(); 
        
       
        }
        
        
        
          
        
}
}

/**
 * 
 */
package com.zuxiang.countdown;

import java.util.ArrayList;
import java.util.List;

/**
 * @author zuxiang
 *
 */
public class Dao {

   /**
     * 查询数据
     * @return
     */
    public List<String> getData(){
    
     List<String> list = new ArrayList<String>();
    
     for (int i = 0; i < 10 ; i++)
     {
     list.add("data---" + i);
     }
     System.out.println("数据库查询数据 完成!");
    
     return list;
    }
}
/**
 * 
 */
package com.zuxiang.countdown;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author zuxiang
 *
 */
public class Consumption implements Runnable{  
    
    private CountDownLatch downLatch;  
    private String name;  
      
    public Consumption(CountDownLatch downLatch, String name){  
        this.downLatch = downLatch;  
        this.name = name;  
    }  
      
    public void run() {  
        this.doConsumption();  
        try{  
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));  
        }catch(InterruptedException ie){  
        }  
        System.out.println( this.name + "数据同步到服务器完毕!");  
        this.downLatch.countDown();  
          
    }  
      
    private void doConsumption(){  
        System.out.println(this.name + "正在同步数据到服务器.....");  
    }  
      
}  



#5


有没有问题不是我们来看的,应该是你自己来测试来发现的。
你测试N次都没有问题的话,那基本就是没问题了。

#6


你这样效率很低的。操作系统原理你学过吗?你需要的实际上是PV操作。
Java的队列支持
wait();等待
notify();唤醒的
所以你的程序里其实缺少的是线程同步锁。

#7


引用 6 楼 supperman_009 的回复:
你这样效率很低的。操作系统原理你学过吗?你需要的实际上是PV操作。
Java的队列支持
wait();等待
notify();唤醒的
所以你的程序里其实缺少的是线程同步锁。

这个题目是不牵扯到效率问题的。
一分钟运算100次和1分钟运算10次效果是一样的。
这个题看重的是结果。

#8


>> 当内存内的数据消化完了,唤醒product线程

楼主我觉得你这样线性操作的话,两个线程意义就不大了,多线程如果不能同时操作的话,单线程不就够了?