多线程ManualResetEvent、等待所有线程

时间:2021-11-06 18:28:35

需求:成员A可能有几十个,我需要更新所有的A,然后根据A的数据,去更新成员B。

 

解决方案:思路是想通过多线程更新所有的A,然后通过等待线程来确定所有的A是否都更新完,最后更新B。

Member B = ....;//B成员的model
IList<Member> list = ......;//查出所有的A成员,装进list里。

ManualResetEvent[] manualEvents
= new ManualResetEvent[list.Count];
//更新所有的A成员
for (int i = 0; i < list.Count; i++)
{
manualEvents[i]
= new ManualResetEvent(false);//初始化的ManualResetEvent必须是false
model = new te();//目前我只知道此处只能传一个参数,所以为了传递多个参数,我写了个model,然后把要参数都放进model里
model.MRevent = manualEvents[i];
model.member
= list[i];//此处是我处理数据需要的参数,把成员的model传进去
ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateMember), model);
}
//等待所有线程执行完毕
WaitHandle.WaitAll(manualEvents);

//更新B成员
model = new te();
model.member
= B;
model.MRevent
= new ManualResetEvent(false);
UpdateMember(model);

处理数据的方法

public void UpdateMember(object tsmodel)
{
te stateInfo
= (te)tsmodel;
Member member = stateInfo.member;
//中间为处理数据更新成员数据
......

stateInfo.MRevent.Set();
//将事件状态设置为终止状态,允许一个或多个等待线程继续(我个人理解是调用此方法,及为该线程结束,ManualResetEvent应该变成true)
}

新建立的model

public class te
{
public Member member{ get; set; }
public ManualResetEvent MRevent { get; set; }//小于64线程的时候使用
public MutipleThreadResetEvent MTRevent { get; set; }//大于64线程的时候使用
}

 

此处出现了新问题:

当线程大于64条时,会报错。应该是WaitHandle.WaitAll(manualEvents);这等待的线程数不能大于64。

错误信息:waithandles 的数目必须少于或等于 64 个。

经过网上查询,找到的解决办法,原文地址http://www.cnblogs.com/xiaofengfeng/archive/2012/12/27/2836183.html

 

原理:封装一个ManualResetEvent对象,一个计数器current,提供SetOne和WaitAll方法;

主线程调用WaitAll方法使ManualResetEvent对象等待唤醒信号;

各个子线程调用setOne方法 ,setOne每执行一次current减1,直到current等于0时表示所有子线程执行完毕 ,调用ManualResetEvent的set方法,这时主线程可以执行WaitAll之后的步骤。

目标:减少ManualResetEvent对象的大量产生和使用的简单性。

/********************************************************************************
* Copyright © 2001 - 2010Comit. All Rights Reserved.
* 文件:MutipleThreadResetEvent.cs
* 作者:杨柳
* 日期:2010年11月13日
* 描述:封装 ManualResetEvent ,该类允许一次等待N(N>64)个事件执行完毕
*
* 解决问题:WaitHandle.WaitAll(evetlist)方法最大只能等待64个ManualResetEvent事件
* ********************************************************************************
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace TestMutipleThreadRestEvent
{
/// <summary>
/// 封装ManualResetEvent
/// </summary>
public class MutipleThreadResetEvent : IDisposable
{
private readonly ManualResetEvent done;
private readonly int total;
private long current;

/// <summary>
/// 构造函数
/// </summary>
/// <param name="total">需要等待执行的线程总数</param>
public MutipleThreadResetEvent(int total)
{
this.total = total;
current
= total;
done
= new ManualResetEvent(false);
}

/// <summary>
/// 唤醒一个等待的线程
/// </summary>
public void SetOne()
{
// Interlocked 原子操作类 ,此处将计数器减1
if (Interlocked.Decrement(ref current) == 0)
{
//当所以等待线程执行完毕时,唤醒等待的线程
done.Set();
}
}

/// <summary>
/// 等待所以线程执行完毕
/// </summary>
public void WaitAll()
{
done.WaitOne();
}

/// <summary>
/// 释放对象占用的空间
/// </summary>
public void Dispose()
{
((IDisposable)done).Dispose();
}
}

}

本质就是只通过1个ManualResetEvent 对象就可以实现同步N(N可以大于64)个线程

修改后的方法:

Member B = ......;//B成员的model
IList<Member> list = ......;//查出所有的A成员,装进list里。
//更所有A成员
using (var countdown = new MutipleThreadResetEvent(list.Count))
{
for (int i = 0; i < list.Count; i++)
{
model
= new te();
model.MTRevent
= countdown;
model.member
= list[i];
//开启N个线程,传递MutipleThreadResetEvent对象给子线程
ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateMember), model);
}

//等待所有线程执行完毕
countdown.WaitAll();
}
//更新B成员
using (var countdown = new MutipleThreadResetEvent(1))
{
model
= new te();
model.MTRevent
= countdown;
model.member
= B;
//开启N个线程,传递MutipleThreadResetEvent对象给子线程
ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateMember), model);
//等待所有线程执行完毕
countdown.WaitAll();
}

处理数据的方法需要小改一下:

public void UpdateMember(object tsmodel)
{
te stateInfo
= (te)tsmodel;
Member member
= stateInfo.member;
//中间为处理数据更新成员数据
......

stateInfo.MTRevent.SetOne();//此处调用封装的方法

}

此时,大于64条线程的可以使用了。