为了让共享的数组,集合能够被多线程更新,我们现在(.net4.0之后)可以使用并发集合来实现这个功能。而System.Collections和System.Collections.Generic命名空间中所提供的经典列表,集合和数组都不是线程安全的,如果要使用,还需要添加代码来同步。
先看一个例子,通过并行循环向一个List<string>集合添加元素。因为List不是线程安全的,所以必须对Add方法加锁来串行化。
任务开始:
private static int NUM_AES_KEYS =;
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew();
for (int i = ; i < ; i++)
{
ParallelGennerateMD5Keys();
Console.WriteLine(_keyList.Count);
}
Console.WriteLine("结束时间:" + sw.Elapsed); Console.ReadKey();
}
private static List<string> _keyList; private static void ParallelGennerateMD5Keys()
{
_keyList=new List<string>(NUM_AES_KEYS);
Parallel.ForEach(Partitioner.Create(, NUM_AES_KEYS + ), range =>
{
var md5M = MD5.Create();
for (int i = range.Item1; i < range.Item2; i++)
{
byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
byte[] result = md5M.ComputeHash(data);
string hexString = ConverToHexString(result);
lock (_keyList)
{
_keyList.Add(hexString);
}
}
});
}
但如果我们去掉lock,得到的结果如下:
没有一次是满80000的。lock关键字创建了一个临界代码区,当一个任务进入之后,其他任务会被阻塞并等待进入。lock关键字引入了一定的开销,而且会降低可扩展性。对于这个问题,.Net4.0提供了System.Collections.Concurrent命名空间用于解决线程安全问题,它包含了5个集合:ConcurrentQueue<T>,ConcurrentStack<T>,ConcurrentBag<T>,BlockingCollection<T>,ConcurrentDictionary<TKey,TValue>。这些集合都在某种程度上使用了无锁技术,性能得到了提升。
ConcurrentQueue
一个FIFO(先进先出)的集合。支持多任务进并发行入队和出队操作。
ConcurrentQueue是完全无锁的,它是System.Collections.Queue的并发版本。提供三个主要的方法:
- Enqueue--将元素加入到队列尾部。
- TryDequeue--尝试删除队列头部元素。并将元素通过out参数返回。返回值为bool型,表示是否执行成功。
- TryPeek--尝试将队列头部元素通过out参数返回,但不会删除这个元素。返回值bool型,表示操作是否成功。
修改上面的代码:
private static ConcurrentQueue<string> _keyQueue;
private static void ParallelGennerateMD5Keys()
{
_keyQueue = new ConcurrentQueue<string>();
Parallel.ForEach(Partitioner.Create(, NUM_AES_KEYS + ), range =>
{
var md5M = MD5.Create();
for (int i = range.Item1; i < range.Item2; i++)
{
byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
byte[] result = md5M.ComputeHash(data);
string hexString = ConverToHexString(result);
_keyQueue.Enqueue(hexString);
}
});
}
结果如下:
可以看见,它的使用很简单,不用担心同步问题。接下我们通过生产者-消费者模式,对上面的问题进行改造,分解成两个任务。使用两个共享的ConcurrentQueue实例。_byteArraysQueue 和 _keyQueue ,ParallelGennerateMD5Keys 方法生产byte[],ConverKeysToHex方法去消费并产生key。
private static ConcurrentQueue<string> _keyQueue;
private static ConcurrentQueue<byte[]> _byteArraysQueue;
private static void ParallelGennerateMD5Keys(int maxDegree)
{
var parallelOptions = new ParallelOptions{MaxDegreeOfParallelism = maxDegree};
var sw = Stopwatch.StartNew();
_keyQueue = new ConcurrentQueue<string>();
Parallel.ForEach(Partitioner.Create(, NUM_AES_KEYS + ),parallelOptions, range =>
{
var md5M = MD5.Create();
for (int i = range.Item1; i < range.Item2; i++)
{
byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
byte[] result = md5M.ComputeHash(data);
_byteArraysQueue.Enqueue(result);
}
});
Console.WriteLine("MD5结束时间:" + sw.Elapsed);
} private static void ConverKeysToHex(Task taskProducer)
{
var sw = Stopwatch.StartNew();
while (taskProducer.Status == TaskStatus.Running || taskProducer.Status == TaskStatus.WaitingToRun || _byteArraysQueue.Count > )
{
byte[] result;
if (_byteArraysQueue.TryDequeue(out result))
{
string hexString = ConverToHexString(result);
_keyQueue.Enqueue(hexString);
}
}
Console.WriteLine("key结束时间:" + sw.Elapsed);
}
这次我修改了执行次数为180000
private static int NUM_AES_KEYS =180000;
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew();
_byteArraysQueue=new ConcurrentQueue<byte[]>();
_keyQueue=new ConcurrentQueue<string>(); //生产key 和 消费key的两个任务
var taskKeys = Task.Factory.StartNew(()=>ParallelGennerateMD5Keys(Environment.ProcessorCount - 1));
var taskHexString = Task.Factory.StartNew(()=>ConverKeysToHex(taskKeys)); string lastKey;
//隔半秒去看一次。
while (taskHexString.Status == TaskStatus.Running || taskHexString.Status == TaskStatus.WaitingToRun)
{
Console.WriteLine("_keyqueue的个数是{0},_byteArraysQueue的个数是{1}", _keyQueue.Count,_byteArraysQueue.Count);
if (_keyQueue.TryPeek(out lastKey))
{
// Console.WriteLine("第一个Key是{0}",lastKey);
}
Thread.Sleep();
}
//等待两个任务结束
Task.WaitAll(taskKeys, taskHexString); Console.WriteLine("结束时间:" + sw.Elapsed);
Console.WriteLine("key的总数是{0}" , _keyQueue.Count);
Console.ReadKey();
}
从结果可以发现,_bytaArraysQueue里面的byte[] 几乎是生产一个,就被消费一个。
理解生产者和消费者
使用ConcurrentQueue可以很容易的实现并行的生产者-消费者模式或多阶段的线性流水线。如下:
我们可以改造上面的main方法,让一半的线程用于生产,一半的线程用于消费。
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew();
_byteArraysQueue=new ConcurrentQueue<byte[]>();
_keyQueue=new ConcurrentQueue<string>(); var taskKeyMax = Environment.ProcessorCount/; var taskKeys = Task.Factory.StartNew(() => ParallelGennerateMD5Keys(taskKeyMax)); var taskHexMax = Environment.ProcessorCount - taskKeyMax;
var taskHexStrings=new Task[taskHexMax];
for (int i = ; i < taskHexMax; i++)
{
taskHexStrings[i] = Task.Factory.StartNew(() => ConverKeysToHex(taskKeys));
} Task.WaitAll(taskHexStrings); Console.WriteLine("结束时间:" + sw.Elapsed);
Console.WriteLine("key的总数是{0}" , _keyQueue.Count);
Console.ReadKey();
}
而这些消费者的结果又可以继续作为生产者,继续串联下去。
ConcurrentStack
一个LIFO(后进先出)的集合,支持多任务并发进行压入和弹出操作。它是完全无锁的。是System.Collections.Stack的并发版本。
它和ConcurrentQueue非常相似,区别在于使用了不同的方法名,更好的表示一个栈。ConcurrentStack主要提供了下面五个重要方法。
- Push:将元素添加到栈顶。
- TryPop:尝试删除栈顶部的元素,并通过out返回。返回值为bool,表示操作是否成功。
- TryPeek:尝试通过out返回栈顶部的元素,返回值为bool,表示是否成功。
- PushRange:一次将多个元素插入栈顶。
- TryPopRange:一次将多个元素从栈顶移除。
为了判断栈是否包含任意项,可以使用IsEmpty属性判断。
if(!_byteArraysStack.IsEmpty)
而使用Count方法,开销相对较大。另外我们可以将不安全的集合或数组转化为并发集合。下例将数组作为参数传入。操作上和List一样。
private static string[] _HexValues = {"AF", "BD", "CF", "DF", "DA", "FE", "FF", "FA"};
static void Main(string[] args)
{
var invalidHexStack = new ConcurrentStack<string>(_HexValues); while (!invalidHexStack.IsEmpty)
{
string value;
invalidHexStack.TryPop(out value);
Console.WriteLine(value);
}
}
反之,可以用CopyTo和ToArray方法将并发集合创建一个不安全集合。
ConcurrentBag
一个无序对象集合,在同一个线程添加元素(生产)和删除元素(消费)的场合下效率特别高,ConcurrentBag最大程度上减少了同步的需求以及同步带来的开销。然而它在生产线程和消费线程完全分开的情况下,效率低下。
它提供了3个重要方法
- Add--添加元素到无序组
- TryTake--尝试从无序组中删除一个元素,out返回。返回值bool 表示操作是否成功。
- TryPeek--尝试通过out返回一个参数。返回值bool 表示操作是否成功。
下面的实例中Main方法通过Parallel.Invoke并发的加载三个方法。有多个生产者和消费者。对应三个ConcurrentBag<string>:_sentencesBag,_capWrodsInSentenceBag和_finalSentencesBag。
- ProduceSentences 随机生产句子 (消费者)
- CapitalizeWordsInSentence 改造句子 (消费者/生产者)
- RemoveLettersInSentence 删除句子 (消费者)
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew(); _sentencesBag=new ConcurrentBag<string>();
_capWrodsInSentenceBag=new ConcurrentBag<string>();
_finalSentencesBag=new ConcurrentBag<string>(); _producingSentences = true; Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence); Console.WriteLine("_sentencesBag的总数是{0}", _sentencesBag.Count);
Console.WriteLine("_capWrodsInSentenceBag的总数是{0}", _capWrodsInSentenceBag.Count);
Console.WriteLine("_finalSentencesBag的总数是{0}", _finalSentencesBag.Count);
Console.WriteLine("总时间:{0}",sw.Elapsed);
Console.ReadKey();
}
private static ConcurrentBag<string> _sentencesBag;
private static ConcurrentBag<string> _capWrodsInSentenceBag;
private static ConcurrentBag<string> _finalSentencesBag; private static volatile bool _producingSentences = false;
private static volatile bool _capitalWords = false; private static void ProduceSentences()
{
string[] rawSentences =
{
"并发集合你可知",
"ConcurrentBag 你值得拥有",
"stoneniqiu",
"博客园",
".Net并发编程学习",
"Reading for you",
"ConcurrentBag 是个无序集合"
};
try
{
Console.WriteLine("ProduceSentences...");
_sentencesBag = new ConcurrentBag<string>();
var random = new Random();
for (int i = ; i < NUM_AES_KEYS; i++)
{
var sb = new StringBuilder();
sb.Append(rawSentences[random.Next(rawSentences.Length)]);
sb.Append(' ');
_sentencesBag.Add(sb.ToString());
} }
finally
{
_producingSentences = false;
}
} private static void CapitalizeWordsInSentence()
{
SpinWait.SpinUntil(() => _producingSentences); try
{
Console.WriteLine("CapitalizeWordsInSentence...");
_capitalWords = true;
while ((!_sentencesBag.IsEmpty)||_producingSentences)
{
string sentence;
if (_sentencesBag.TryTake(out sentence))
{
_capWrodsInSentenceBag.Add(sentence.ToUpper()+"stoneniqiu");
}
}
}
finally
{
_capitalWords = false;
}
} private static void RemoveLettersInSentence()
{
SpinWait.SpinUntil(() => _capitalWords);
Console.WriteLine("RemoveLettersInSentence...");
while (!_capWrodsInSentenceBag.IsEmpty || _capitalWords)
{
string sentence;
if (_capWrodsInSentenceBag.TryTake(out sentence))
{
_finalSentencesBag.Add(sentence.Replace("stonenqiu",""));
}
}
}
在CapitalizeWordsInSentence 方法中,使用SpinUntil方法并传入共享bool变量_producingSentences,当其为true的时候,SpinUnit方法会停止自旋。但协调多个生产者和消费者自旋并非最好的解决方案,我们可以使用BlockingCollection(下面会讲)来提升性能。
SpinWait.SpinUntil(() => _producingSentences);
另外两个用作标志的共享bool变量在声明的时候使用了volatile关键字。这样可以确保在不同的线程中进行访问的时候,可以得到这些变量的最新值。
private static volatile bool _producingSentences = false;
private static volatile bool _capitalWords = false;
BlockingCollection
与经典的阻塞队列数据结构类似,适用于多个任务添加和删除数据的生产者-消费者的情形。提供了阻塞和界限的能力。
BlockingCollection是对IProducerConsumerCollection<T>实例的一个包装。而这个接口继承于ICollection,IEnumerable<T>。前面的并发集合都继承了这个接口。因此这些集合都可以封装在BlockingCollection中。
将上面的例子换成BlockingCollection
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew(); _sentencesBC = new BlockingCollection<string>(NUM_SENTENCE);
_capWrodsInSentenceBC = new BlockingCollection<string>(NUM_SENTENCE);
_finalSentencesBC = new BlockingCollection<string>(NUM_SENTENCE); Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence); Console.WriteLine("_sentencesBag的总数是{0}", _sentencesBC.Count);
Console.WriteLine("_capWrodsInSentenceBag的总数是{0}", _capWrodsInSentenceBC.Count);
Console.WriteLine("_finalSentencesBag的总数是{0}", _finalSentencesBC.Count);
Console.WriteLine("总时间:{0}",sw.Elapsed);
Console.ReadKey();
} private static int NUM_SENTENCE = ;
private static BlockingCollection<string> _sentencesBC;
private static BlockingCollection<string> _capWrodsInSentenceBC;
private static BlockingCollection<string> _finalSentencesBC; private static volatile bool _producingSentences = false;
private static volatile bool _capitalWords = false; private static void ProduceSentences()
{
string[] rawSentences =
{
"并发集合你可知",
"ConcurrentBag 你值得拥有",
"stoneniqiu",
"博客园",
".Net并发编程学习",
"Reading for you",
"ConcurrentBag 是个无序集合"
}; Console.WriteLine("ProduceSentences...");
_sentencesBC = new BlockingCollection<string>();
var random = new Random();
for (int i = ; i < NUM_SENTENCE; i++)
{
var sb = new StringBuilder();
sb.Append(rawSentences[random.Next(rawSentences.Length)]);
sb.Append(' ');
_sentencesBC.Add(sb.ToString());
}
//让消费者知道,生产过程已经完成
_sentencesBC.CompleteAdding(); } private static void CapitalizeWordsInSentence()
{
Console.WriteLine("CapitalizeWordsInSentence...");
//生产者是否完成
while (!_sentencesBC.IsCompleted)
{
string sentence;
if (_sentencesBC.TryTake(out sentence))
{
_capWrodsInSentenceBC.Add(sentence.ToUpper() + "stoneniqiu");
}
}
//让消费者知道,生产过程已经完成
_capWrodsInSentenceBC.CompleteAdding();
} private static void RemoveLettersInSentence()
{
//SpinWait.SpinUntil(() => _capitalWords);
Console.WriteLine("RemoveLettersInSentence...");
while (!_capWrodsInSentenceBC.IsCompleted)
{
string sentence;
if (_capWrodsInSentenceBC.TryTake(out sentence))
{
_finalSentencesBC.Add(sentence.Replace("stonenqiu",""));
}
}
}
无需再使用共享的bool变量来同步。在操作结束后,调用CompeteAdding方法来告之下游的消费者。这个时候IsAddingComplete属性为true。
_sentencesBC.CompleteAdding();
而在生产者中也无需使用自旋了。可以判断IsCompleted属性。而当IsAddingComplete属性为true且集合为空的时候,IsCompleted才为true。这个时候就表示,生产者的元素已经被使用完了。这样代码也更简洁了。
while (!_sentencesBC.IsCompleted)
最后的结果要比使用ConcurrentBag快了0.8秒。一共是200w条数据,处理三次。
ConcurrentDictionary
与经典字典类似,提供了并发的键值访问。它对读操作是完全无锁的,在添加和修改的时候使用了细粒度的锁。是IDictionary的并发版本。
它提供最重要方法如下:
- AddOrUpdate--如果键不存在就添加一个键值对。如果键已经存在,就更新键值对。可以使用函数来生成或者更新键值对。需要在委托内添加同步代码来确保线程安全。
- GetEnumerator--返回遍历整个ConcurrentDictionary的枚举器,而且是线程安全的。
- GetOrAdd--如果键不存在就添加一个新键值对,如果存在就返回这个键现在的值,而不添加新值。
- TryAdd
- TryOrGetVaule
- TryRemove
- TryUpdate
下面的例子创建一个ConcurrentDictionary,然后不断的更新。lock关键字确保一次只有一个线程运行Update方法。
static void Main(string[] args)
{
Console.WriteLine("任务开始...");
var sw = Stopwatch.StartNew(); rectangInfoDic=new ConcurrentDictionary<string, RectangInfo>();
GenerateRectangles();
foreach (var keyValue in rectangInfoDic)
{
Console.WriteLine("{0},{1},更新次数{2}",keyValue.Key,keyValue.Value.Size,keyValue.Value.UpdateTimes);
} Console.WriteLine("总时间:{0}",sw.Elapsed);
Console.ReadKey();
} private static ConcurrentDictionary<string, RectangInfo> rectangInfoDic;
private const int MAX_RECTANGLES = ;
private static void GenerateRectangles()
{
Parallel.For(, MAX_RECTANGLES + , (i) =>
{
for (int j = ; j < ; j++)
{
var newkey = string.Format("Rectangle{0}", i%);
var rect = new RectangInfo(newkey, i, j);
rectangInfoDic.AddOrUpdate(newkey, rect, (key, existRect) =>
{
if (existRect != rect)
{
lock (existRect)
{
existRect.Update(rect.X,rect.Y);
}
return existRect;
}
return existRect;
});
} }); }
Rectangle:
public class RectangInfo:IEqualityComparer<RectangInfo>
{
public string Name { get; set; }
public int X { get; set; }
public int Y { get; set; }
public int UpdateTimes { get; set; } public int Size
{
get { return X*Y; }
} public DateTime LastUpdate { get; set; } public RectangInfo(string name,int x,int y)
{
Name = name;
X = x;
Y = y;
LastUpdate = DateTime.Now;
} public RectangInfo(string key) : this(key, , )
{
} public void Update(int x,int y)
{
X = x;
Y = y;
UpdateTimes++;
} public bool Equals(RectangInfo x, RectangInfo y)
{
return (x.Name == y.Name && x.Size == y.Size);
} public int GetHashCode(RectangInfo obj)
{
return obj.Name.GetHashCode();
}
}
本章学习了五种并发集合,熟悉了生产者-消费者的并发模型,我们可以使用并发集合来设计并优化流水线。希望本文对你有帮助。
阅读书籍:《C#并行编程高级教程》 。
【读书笔记】.Net并行编程(三)---并行集合的更多相关文章
-
.Net多线程 并行编程(三)---并行集合
为了让共享的数组,集合能够被多线程更新,我们现在(.net4.0之后)可以使用并发集合来实现这个功能. 而System.Collections和System.Collections.Generic命名 ...
-
.NET 并行编程&mdash;&mdash;数据并行
本文内容 并行编程 数据并行 环境 计算 PI 矩阵相乘 把目录中的全部图片复制到另一个目录 列出指定目录中的所有文件,包括其子目录 最近,对多线程编程,并行编程,异步编程,这三个概念有点晕了,之前我 ...
-
一、并行编程 - 数据并行 System.Threading.Tasks.Parallel 类
一.并行概念 1.并行编程 在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的.在TPL中,最基本的执行单元是task(中文可以理解为"任 ...
-
.NET 并行编程&mdash;&mdash;任务并行
本文内容 并行编程 任务并行 隐式创建和运行任务 显式创建和运行任务 任务 ID 任务创建选项 创建任务延续 创建分离的子任务 创建子任务 等待任务完成 组合任务 任务中的异常处理 取消任务 Task ...
-
【转载】MDX Step by Step 读书笔记(四) - Working with Sets (使用集合)
1. Set - 元组的集合,在 Set 中的元组用逗号分开,Set 以花括号括起来,例如: { ([Product].[Category].[Accessories]), ([Product].[ ...
-
Java并发编程的艺术读书笔记(2)-并发编程模型
title: Java并发编程的艺术读书笔记(2)-并发编程模型 date: 2017-05-05 23:37:20 tags: ['多线程','并发'] categories: 读书笔记 --- 1 ...
-
Java并发编程的艺术读书笔记(1)-并发编程的挑战
title: Java并发编程的艺术读书笔记(1)-并发编程的挑战 date: 2017-05-03 23:28:45 tags: ['多线程','并发'] categories: 读书笔记 --- ...
-
《Essential C++》读书笔记 之 C++编程基础
<Essential C++>读书笔记 之 C++编程基础 2014-07-03 1.1 如何撰写C++程序 头文件 命名空间 1.2 对象的定义与初始化 1.3 撰写表达式 运算符的优先 ...
-
C# 并行编程 之 并发集合 (.Net Framework 4.0)(转)
转载地址:http://blog.csdn.net/wangzhiyu1980/article/details/45497907 此文为个人学习<C#并行编程高级教程>的笔记,总结并调试了 ...
随机推荐
-
正则表达式 match 和 exec 比较
match 和 exec 主要有两点不同: 1.exec是正则表达式的方法,而不是字符串的方法,它的参数才是字符串,如下所示: var re=new RegExp(/\d/); re.exec( &q ...
-
使用 Entity Framework Core 时,通过代码自动 Migration
一 介绍 在使用 Entity Framework Core (下面就叫 EF Core 吧)进行开发时,如果模型有变动,我们要在用 EF Core 提供的命令行工具进行手工迁移,然后再运行程序.但是 ...
-
numpy下的flatten()函数用法
flatten是numpy.ndarray.flatten的一个函数,其官方文档是这样描述的: ndarray.flatten(order='C') Return a copy of the arra ...
-
JQ调用后台方法
首先,先在页面上创建一个asp按钮,添加点击事件,把要在前台调用的后台方法写在这个按钮的点击事件中: <span style="display:none;"><a ...
-
【Android Demo】通过WebService获取今日天气情况--转
因为本身是在搞.NET方面的东东,现在在学习Android,所以想实现Android通过WebService接口来获取数据,网上很多例子还有有问题的.参考:Android 通过WebService进行 ...
-
30多个Android 开发者工具 带你开发带你飞
文中部分工具是收费的,但是绝大多数都是免费的. FlowUp 这是一个帮助你跟踪app整体性能的工具,深入分析关键的性能数据如FPS, 内存, CPU, 磁盘, 等等.FlowUp根据用户数量收费. ...
-
一起来学Go --- (go的变量)
变量 变量是几乎所有编程语言中最基本的组成元素,从根本上说,变量相当于是一块数据存储空间的命名,程序可以通过定义一个变量来申请一块数据存储空间,之后可以通过引用变量名来使用这块存储空间.go语言中的变 ...
-
cocos2d-js(一)引擎的工作原理和文件的调用顺序
Cocos2d-js可以实现在网页上运行高性能的2D游戏,实现原理是通过HTML5的canvas标签,该引擎采用Javascript编写,并且有自己的一些语法,因为没有成熟的IDE,一般建立工程是通过 ...
-
DEV中右键菜单如何只在非空单元格上显示?
问题: 1. 开发时,我的winform程序中有很多gridview,我希望右键菜单只在我点击非空的行时才显示,点击其他空白区域时不显示: 2. 有一个树状导航图,treelist 中的节点都有右键菜 ...
-
sqlserver--install/uninstall
2017 express版本 安装: https://jingyan.baidu.com/article/76a7e409077997fc3a6e1559.html https://www.cnblo ...