java 多线程处理一个list的集合

时间:2022-07-25 18:30:53
各位大神,我现在的一个List<Object>中有1w条记录,要把这个List遍历的的内容拼接起来。我启动5个线程同时处理,每个线程处理2000条记录。然后把5个线程处理的内容拼接起来形成一个总的list<Object>。 请大家给一个简单的demo。谢谢

8 个解决方案

#1


你去找个多线程下载的demo改下就可以
话说list的定位高效么?先转成数组再随机访问会不会好点

#2


多线程 同时操作一个数据源 要加锁  你这效率一点都不高

#3


你这个还需要用到call()方法吧,因为需要用到每个线程的返回值,而且要用到collections工具来实现安全模式,支持多线程访问。给你提供个方向,实现就不帮你了啊,还是有点麻烦呢

#4


个人觉得主要要处理的是线程间通信的问题,当启动5个线程时,要实时保证5个线程中读取的数据各不相同且连续,才能保证计算结果的正确性。
多线程计算可以使用Thread的join方法,保证所有线程计算完毕等待。
至于线程间通信,Java使用的是内存共享达到线程间通信,你可以使用同步的全局变量进行线程间的通信,这样可以使得同步范围最小化,提高效率。

#5


下面是实现代码,使用了Fork/Join框架。
Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;


public class FJTest {
private List<String> list;
public static void main(String[] args) {
System.out.println(new FJTest().run().length());
}

private void init(){
//模拟初始化1W条数据
list = new ArrayList<String>();
for (int i = 0; i < 10000; i++) {
list.add(i + "");
}
}

public String run(){
init();
ForkJoinPool pool = new ForkJoinPool(5);
Task task = new Task(list);
Future<String> result =  pool.submit(task);
String str = "";
try {
str = result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return str;
}

class Task extends RecursiveTask<String>{
private int size = 2000;
private List<String> data;
public Task(List<String> data){
this.data = data;
}
@Override
protected String compute() {
StringBuffer sb = new StringBuffer();
if(data.size() <= size){
System.out.println("******************************** size:" + data.size());
for (String str : data) {
sb.append(str);
}
}else{
//细分成小任务
List<Task> tasks = new ArrayList<FJTest.Task>();
for (int index = 0; index * size < data.size(); index++) {
Task task;
if((index + 1) * size > data.size()){
task = new Task(data.subList(index * size, data.size()));
}else{
task = new Task(data.subList(index * size, (index + 1) * size));
}
task.fork();
tasks.add(task);
}
for (Task task : tasks) {
sb.append(task.join());
}
}

return sb.toString();
}

}

}

#6


你这个问题要用到forkjoinpool  楼主可以查查  这个专门用于多线程一起协作完成一个任务

#7


利用Callable轻松实现:
public void dealListWithMutiThread(){
List<Object> list = new ArrayList<Object>(10000);
int index = 0;
ExecutorService ex = Executors.newFixedThreadPool(5);
int dealSize = 2000;
List<Future<List<Object>>> futures = new ArrayList<>(5);
                //分配
for(int i=0;i<5;i++,index+=dealSize){
int start = index;
if(start>=list.size()) break;
int end = start + dealSize;
end = end>list.size() ? list.size() : end;
futures.add(ex.submit(new Task(list,start,end)));
}
try {
//处理
List<Object>  result = new ArrayList<>();
for(Future<List<Object>> future : futures){
//合并操作
result.addAll(future.get());
}
} catch (Exception e) {
e.printStackTrace();
}
}

private class Task implements Callable<List<Object>>{

private List<Object> list;
private int start;
private int end;

public Task(List<Object> list,int start,int end){
this.list = list;
this.start = start;
this.end = end;
}

@Override
public List<Object> call() throws Exception {
Object obj = null;
List<Object> retList = new ArrayList<Object>();
for(int i=start;i<end;i++){
obj = list.get(i);
//你的处理逻辑
}
//返回处理结果
return retList;
}
}

#8


引用 5 楼 youshu2011 的回复:
下面是实现代码,使用了Fork/Join框架。
Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;


public class FJTest {
private List<String> list;
public static void main(String[] args) {
System.out.println(new FJTest().run().length());
}

private void init(){
//模拟初始化1W条数据
list = new ArrayList<String>();
for (int i = 0; i < 10000; i++) {
list.add(i + "");
}
}

public String run(){
init();
ForkJoinPool pool = new ForkJoinPool(5);
Task task = new Task(list);
Future<String> result =  pool.submit(task);
String str = "";
try {
str = result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return str;
}

class Task extends RecursiveTask<String>{
private int size = 2000;
private List<String> data;
public Task(List<String> data){
this.data = data;
}
@Override
protected String compute() {
StringBuffer sb = new StringBuffer();
if(data.size() <= size){
System.out.println("******************************** size:" + data.size());
for (String str : data) {
sb.append(str);
}
}else{
//细分成小任务
List<Task> tasks = new ArrayList<FJTest.Task>();
for (int index = 0; index * size < data.size(); index++) {
Task task;
if((index + 1) * size > data.size()){
task = new Task(data.subList(index * size, data.size()));
}else{
task = new Task(data.subList(index * size, (index + 1) * size));
}
task.fork();
tasks.add(task);
}
for (Task task : tasks) {
sb.append(task.join());
}
}

return sb.toString();
}

}

}


这个预期结果是字符串拼接1到10000,但是实际输出结果是48896.显然有问题啊

#1


你去找个多线程下载的demo改下就可以
话说list的定位高效么?先转成数组再随机访问会不会好点

#2


多线程 同时操作一个数据源 要加锁  你这效率一点都不高

#3


你这个还需要用到call()方法吧,因为需要用到每个线程的返回值,而且要用到collections工具来实现安全模式,支持多线程访问。给你提供个方向,实现就不帮你了啊,还是有点麻烦呢

#4


个人觉得主要要处理的是线程间通信的问题,当启动5个线程时,要实时保证5个线程中读取的数据各不相同且连续,才能保证计算结果的正确性。
多线程计算可以使用Thread的join方法,保证所有线程计算完毕等待。
至于线程间通信,Java使用的是内存共享达到线程间通信,你可以使用同步的全局变量进行线程间的通信,这样可以使得同步范围最小化,提高效率。

#5


下面是实现代码,使用了Fork/Join框架。
Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;


public class FJTest {
private List<String> list;
public static void main(String[] args) {
System.out.println(new FJTest().run().length());
}

private void init(){
//模拟初始化1W条数据
list = new ArrayList<String>();
for (int i = 0; i < 10000; i++) {
list.add(i + "");
}
}

public String run(){
init();
ForkJoinPool pool = new ForkJoinPool(5);
Task task = new Task(list);
Future<String> result =  pool.submit(task);
String str = "";
try {
str = result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return str;
}

class Task extends RecursiveTask<String>{
private int size = 2000;
private List<String> data;
public Task(List<String> data){
this.data = data;
}
@Override
protected String compute() {
StringBuffer sb = new StringBuffer();
if(data.size() <= size){
System.out.println("******************************** size:" + data.size());
for (String str : data) {
sb.append(str);
}
}else{
//细分成小任务
List<Task> tasks = new ArrayList<FJTest.Task>();
for (int index = 0; index * size < data.size(); index++) {
Task task;
if((index + 1) * size > data.size()){
task = new Task(data.subList(index * size, data.size()));
}else{
task = new Task(data.subList(index * size, (index + 1) * size));
}
task.fork();
tasks.add(task);
}
for (Task task : tasks) {
sb.append(task.join());
}
}

return sb.toString();
}

}

}

#6


你这个问题要用到forkjoinpool  楼主可以查查  这个专门用于多线程一起协作完成一个任务

#7


利用Callable轻松实现:
public void dealListWithMutiThread(){
List<Object> list = new ArrayList<Object>(10000);
int index = 0;
ExecutorService ex = Executors.newFixedThreadPool(5);
int dealSize = 2000;
List<Future<List<Object>>> futures = new ArrayList<>(5);
                //分配
for(int i=0;i<5;i++,index+=dealSize){
int start = index;
if(start>=list.size()) break;
int end = start + dealSize;
end = end>list.size() ? list.size() : end;
futures.add(ex.submit(new Task(list,start,end)));
}
try {
//处理
List<Object>  result = new ArrayList<>();
for(Future<List<Object>> future : futures){
//合并操作
result.addAll(future.get());
}
} catch (Exception e) {
e.printStackTrace();
}
}

private class Task implements Callable<List<Object>>{

private List<Object> list;
private int start;
private int end;

public Task(List<Object> list,int start,int end){
this.list = list;
this.start = start;
this.end = end;
}

@Override
public List<Object> call() throws Exception {
Object obj = null;
List<Object> retList = new ArrayList<Object>();
for(int i=start;i<end;i++){
obj = list.get(i);
//你的处理逻辑
}
//返回处理结果
return retList;
}
}

#8


引用 5 楼 youshu2011 的回复:
下面是实现代码,使用了Fork/Join框架。
Fork/Join框架是Java7提供的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;


public class FJTest {
private List<String> list;
public static void main(String[] args) {
System.out.println(new FJTest().run().length());
}

private void init(){
//模拟初始化1W条数据
list = new ArrayList<String>();
for (int i = 0; i < 10000; i++) {
list.add(i + "");
}
}

public String run(){
init();
ForkJoinPool pool = new ForkJoinPool(5);
Task task = new Task(list);
Future<String> result =  pool.submit(task);
String str = "";
try {
str = result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return str;
}

class Task extends RecursiveTask<String>{
private int size = 2000;
private List<String> data;
public Task(List<String> data){
this.data = data;
}
@Override
protected String compute() {
StringBuffer sb = new StringBuffer();
if(data.size() <= size){
System.out.println("******************************** size:" + data.size());
for (String str : data) {
sb.append(str);
}
}else{
//细分成小任务
List<Task> tasks = new ArrayList<FJTest.Task>();
for (int index = 0; index * size < data.size(); index++) {
Task task;
if((index + 1) * size > data.size()){
task = new Task(data.subList(index * size, data.size()));
}else{
task = new Task(data.subList(index * size, (index + 1) * size));
}
task.fork();
tasks.add(task);
}
for (Task task : tasks) {
sb.append(task.join());
}
}

return sb.toString();
}

}

}


这个预期结果是字符串拼接1到10000,但是实际输出结果是48896.显然有问题啊