引言
随着CPU多核的普及,编程时充分利用这个特性越显重要。首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的提供的来并行地进行填充,最后对比他们的性能。本文将深入分析并借机回答上篇9楼提出的问题,而System.Threading.Tasks分析,这个将推迟到.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(三)中介绍。内容如下:
- 1、Parallel Class
- 1.1、For方法
- 1.2、ForEach方法
- 1.3、Invoke方法
- 2、并发控制疑问?
- 2.1、使用Lock锁
- 2.2、使用PLINQ——用AsParallel
- 2.3、使用PLINQ——用ParallelEnumerable
- 2.4、使用Interlocked操作
- 2.5、使用Parallel.For的有Thread-Local变量重载函数
- 性能比较
1、Parallel Class
Parallel——这个类提供对通常操作(诸如for、foreach、执行语句块)基于库的数据并行替换。它只是命名空间的一个类,该命名空间中还包括很多其他的类。下面举个例子来说明如何使用Parallel.For(来自MSDN):
01 | using System.Threading.Tasks; |
02 | class Test |
03 | { |
04 | static int N = 1000; |
05 |
06 | static void TestMethod() |
07 | { |
08 | // Using a named method. |
09 | Parallel.For(0, N, Method2); |
10 |
11 | // Using an anonymous method. |
12 | Parallel.For(0, N, delegate ( int i) |
13 | { |
14 | // Do Work. |
15 | }); |
16 |
17 | // Using a lambda expression. |
18 | Parallel.For(0, N, i => |
19 | { |
20 | // Do Work. |
21 | }); |
22 | } |
23 |
24 | static void Method2( int i) |
25 | { |
26 | // Do work. |
27 | } |
28 | } |
上面这个例子简单易懂,上篇我们就是用的Parallel.For,这里就不解释了。其实Parallel类的方法主要分为下面三类:
- For方法
- ForEach方法
- Invoke方法
1.1、For方法
在里面执行的for循环可能并行地运行,它有12个重载。这12个重载中Int32参数和Int64参数的方法各为6个,下面以Int32为例列出:
- ,该方法对区间(fromInclusive,toExclusive)之间的迭代调用body表示的委托。body委托有一个迭代数次的int32参数,如果fromInclusive>=toExclusive,则不会执行任何迭代。
- ,该方法对区间(fromInclusive, toExclusive)之间的迭代调用body表示的委托。body委托有两个参数——表示迭代数次的int32参数、一个可用于过早地跳出循环的实例。如果fromInclusive>=toExclusive,则不会执行任何迭代。 调用通知For操作当前迭代之后的迭代不需要执行。然而,在此之前的迭代如果没有完成仍然需要执行。因此,调用类似于调用break跳出传统的for循环,不是break的原因是它不保证当前迭代之后的迭代绝对不会执行。 如果在当前迭代之前的迭代不必要执行,应该调用而不是。调用通知For循环放弃剩下的迭代,不管它是否在当前迭代之前或之后,因为所以要求的工作已经完成。然而,并不能保证这个。
- ,跟第一个方法类似,但它的区间是[fromInclusive, toExclusive)。
- ,跟第二个方法类似,单的区间是[fromInclusive, toExclusive)。
- ,它的迭代区间是[fromInclusive, toExclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localInit委托将在每个线程参与循环执行时调用,并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localFinally委托。每个线程执行在自己的loacl 状态上执行最后一个动作时,localFinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。
- ,跟上面的方法类似。
下面代码演示了方法(来自MSDN):
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | // Demonstrated features: |
13 | // CancellationTokenSource |
14 | // Parallel.For() |
15 | // ParallelOptions |
16 | // ParallelLoopResult |
17 | // Expected results: |
18 | // An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed. |
19 | // The order of execution of the iterations is undefined. |
20 | // The iteration when i=2 cancels the loop. |
21 | // Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order, |
22 | // it is impossible to say which will start/complete and which won't. |
23 | // At the end, an OperationCancelledException is surfaced. |
24 | // Documentation: |
25 | // (VS.100).aspx |
26 |
27 | static void Main( string [] args) |
28 | { |
29 | CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
30 | ParallelOptions options = new ParallelOptions(); |
31 | options.CancellationToken = cancellationSource.Token; |
32 | try |
33 | { |
34 | ParallelLoopResult loopResult = Parallel.For( |
35 | 0, |
36 | 10, |
37 | options, |
38 | (i, loopState) => |
39 | { |
40 | Console.WriteLine( "Start Thread={0}, i={1}" , Thread.CurrentThread.ManagedThreadId, i); |
41 |
42 | // Simulate a cancellation of the loop when i=2 |
43 | if (i == 2) |
44 | { |
45 | cancellationSource.Cancel(); |
46 | } |
47 |
48 | // Simulates a long execution |
49 | for ( int j = 0; j < 10; j++) |
50 | { |
51 | Thread.Sleep(1 * 200); |
52 |
53 | // check to see whether or not to continue |
54 | if (loopState.ShouldExitCurrentIteration) return ; |
55 | } |
56 |
57 | Console.WriteLine( "Finish Thread={0}, i={1}" , Thread.CurrentThread.ManagedThreadId, i); |
58 | } |
59 | ); |
60 | if (loopResult.IsCompleted) |
61 | { |
62 | Console.WriteLine( "All iterations completed successfully. THIS WAS NOT EXPECTED." ); |
63 | } |
64 | } |
65 | // No exception is expected in this example, but if one is still thrown from a task, |
66 | // it will be wrapped in AggregateException and propagated to the main thread. |
67 | catch (AggregateException e) |
68 | { |
69 | Console.WriteLine( "Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}" , e); |
70 | } |
71 | // Catching the cancellation exception |
72 | catch (OperationCanceledException e) |
73 | { |
74 | Console.WriteLine( "An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}" , e.ToString()); |
75 | } |
76 | } |
77 | } |
78 | } |
1.2、ForEach方法
在迭代中执行的foreach操作可能并行地执行,它有20个重载。这个方法太多,但用法大概跟For方法差不多,请自行参考。
1.3、Invoke方法
提供的每个动作可能并行地执行,它有2个重载。
- :actions是一个要执行的动作数组,这些动作可能并行地执行,但并不保证执行的顺序及一定并行执行。这个方法直到提供的所有操作完成时才返回,不管是否正常地完成或异常终止。
- :跟上面的方法类似,只是增加了一个parallelOptions参数,可以用户调用者取消整个操作。
例如下面代码执行了三个操作(来自MSDN):
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main() |
13 | { |
14 | try |
15 | { |
16 | Parallel.Invoke( |
17 | BasicAction, // Param #0 - static method |
18 | () => // Param #1 - lambda expression |
19 | { |
20 | Console.WriteLine( "Method=beta, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
21 | }, |
22 | delegate () // Param #2 - in-line delegate |
23 | { |
24 | Console.WriteLine( "Method=gamma, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
25 | } |
26 | ); |
27 | } |
28 | // No exception is expected in this example, but if one is still thrown from a task, |
29 | // it will be wrapped in AggregateException and propagated to the main thread. |
30 | catch (AggregateException e) |
31 | { |
32 | Console.WriteLine( "An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}" , e.InnerException.ToString()); |
33 | } |
34 | } |
35 |
36 | static void BasicAction() |
37 | { |
38 | Console.WriteLine( "Method=alpha, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
39 | } |
40 | } |
41 | } |
2、并发控制疑问?
有人提出以下疑问:“如果For里面的东西,对于顺序敏感的话,会不会有问题。并行处理的话,说到底应该是多线程。如果需要Lock住什么东西的话,应该怎么做呢?例如这个例子不是对数组填充,是对文件操作呢?对某个资源操作呢?”
关于对顺序敏感的话,也就是说该如何加锁来控制?下面我举个例子来说明:对1~1000求和。如果我们想那样简单地用Parallel.For,将会产生错误的结果,代码如下:
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops=0; |
15 | while (loops <= 100) |
16 | { |
17 | long sum = 0; |
18 | Parallel.For(1, 1001, delegate ( long i) |
19 | { |
20 | sum += i; |
21 | }); |
22 | System.Console.WriteLine(sum); |
23 | loops++; |
24 | } |
25 | } |
26 | } |
27 | } |
在上述代码中,为了校验正确性我进行了重复做了100次,得出如下结果:
图1、100次的前面部分结果
我们知道500500才是正确的答案,这说明Parallel.For不能保证对sum正确的并发执行,对此我们应该加上适当的控制,并借机来回答上面提出的如何加锁的问题。下面有几种方案可以解决这个问题:
2.1、使用Lock锁
这个我就不多解释了,直接上代码:
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops = 0; |
15 | object moniter = new object (); |
16 | while (loops <= 100) |
17 | { |
18 | long sum = 0; |
19 | Parallel.For(1, 1001, delegate ( long i) |
20 | { |
21 | lock (moniter) { sum += i; } |
22 | }); |
23 | System.Console.WriteLine(sum); |
24 | loops++; |
25 | } |
26 | } |
27 | } |
28 | } |
我们加上lock锁之后就会得出正确的结果。
2.2、使用PLINQ——用AsParallel
关于PLINQ,以后将会介绍到,这里不会详细介绍,感兴趣的自行查阅资料。代码如下:
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops = 0; |
15 | while (loops <= 100) |
16 | { |
17 | long sum = 0; |
18 | sum = Enumerable.Range(0, 1001).AsParallel().Sum(); |
19 | System.Console.WriteLine(sum); |
20 | loops++; |
21 | } |
22 | } |
23 | } |
24 | } |
运行可以得到正确的结果。
2.3、使用PLINQ——用ParallelEnumerable
这个也不多说,直接上代码,因为关于PLINQ将在以后详细介绍,感兴趣的自行查阅资料。
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops = 0; |
15 | while (loops <= 100) |
16 | { |
17 | long sum = 0; |
18 | sum = ParallelEnumerable.Range(0, 1001).Sum(); |
19 | System.Console.WriteLine(sum); |
20 | loops++; |
21 | } |
22 | } |
23 | } |
24 | } |
运行同样可以得到正确结果。
2.4、使用Interlocked操作
代码如下:
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops = 0; |
15 | while (loops <= 100) |
16 | { |
17 | long sum = 0; |
18 | Parallel.For(1, 1001, delegate ( long i) |
19 | { |
20 | Interlocked.Add( ref sum, i); |
21 | }); |
22 | System.Console.WriteLine(sum); |
23 | loops++; |
24 | } |
25 |
26 | } |
27 | } |
28 | } |
运行可以得到正确结果。
2.5、使用Parallel.For的有Thread-Local变量重载函数
这个方法已经在1.2中介绍,这里直接上代码,代码如下:
01 | using System; |
02 | using System.Collections.Generic; |
03 | using System.Linq; |
04 | using System.Text; |
05 | using System.Threading; |
06 | using System.Threading.Tasks; |
07 |
08 | namespace ConsoleApplication2 |
09 | { |
10 | class Program |
11 | { |
12 | static void Main( string [] args) |
13 | { |
14 | int loops = 0; |
15 | while (loops <= 100) |
16 | { |
17 | int sum = 0; |
18 | Parallel.For(0, 1001, () => 0, (i, state,subtotal) => |
19 | { |
20 | subtotal += i; |
21 | return subtotal; |
22 | }, |
23 | partial => Interlocked.Add( ref sum, partial)); |
24 |
25 | System.Console.WriteLine(sum); |
26 | loops++; |
27 | } |
28 |
29 | } |
30 | } |
31 | } |
运行可得正确结果。
3、性能比较
上面的解决方案那个比较好呢?请大家各抒己见!关于这个我已经测试了一下。
PS:感觉写这篇的时候很累,思绪也很乱,不知大家对这篇还满意(⊙_⊙)?有什么地方需要改进,或者说不易理解,或者哪个地方错了!