如何在 PowerShell 的 ForEach-Parallel 循环中添加原子计数器
如何在 PowerShell 的 ForEach-Parallel 循环中添加原子计数器
在这个问题中,解释了如何向一个并发的ThreadSafe集合Powershell:如何将结果添加到数组(ForEach-Object -Parallel)中。
我有一个更简单的用例,我只想增加一个单一的值(整数)。
在Powershell中是否可以使用某种原子整数数据类型来实现?
$myAtomicCounter = 0 $myItems | ForEach-Object -Parallel { #...其他工作 $myAtomicCounter.ThreadSafeAdd(2) #...使用计数器的一些其他工作 } Write-Host($myAtomicCounter)
在PowerShell中,当从多个线程更新单个值时,必须使用锁定机制,例如Mutex
、SemaphoreSlim
甚至Monitor.Enter
,否则更新操作将不是线程安全的。同步哈希表不能确保更新键值的线程安全。
下面是一个简单的演示,证明了上述说法:
$sync = [hashtable]::Synchronized(@{ }) $attempts = 0 do { $sync['Value'] = 0 $attempts++ 0..10 | ForEach-Object -Parallel { $sync = $using:sync Start-Sleep -Milliseconds 200 $sync['Value']++ } -ThrottleLimit 11 } while ($sync['Value'] -eq 11) "It took $attempts attempts to fail..."
假设我们有一个数组的数组:
$toProcess = 0..10 | ForEach-Object { , (Get-Random -Count (Get-Random -Minimum 5 -Maximum 10)) }
如果您想跟踪每个数组中的处理项,可以使用Mutex
来完成:
$processedItems = [hashtable]::Synchronized(@{ Lock = [System.Threading.Mutex]::new() Counter = 0 }) $toProcess | ForEach-Object -Parallel { Start-Sleep (Get-Random -Maximum 5) $ref = $using:processedItems if($ref['Lock'].WaitOne()) { $ref['Counter'] += $_.Count $ref['Lock'].ReleaseMutex() } } $processedCount = ($toProcess | Write-Output | Measure-Object).Count $processedItems['Counter'] -eq $processedCount
另一个线程安全递增计数器的示例使用Monitor.Enter
和一个尝试类似于C# lock
语句的自定义函数:
function lock { param( [Parameter(Mandatory)] [object] $Object, [Parameter(Mandatory)] [scriptblock] $ScriptBlock ) try { [System.Threading.Monitor]::Enter($Object) & $ScriptBlock } finally { [System.Threading.Monitor]::Exit($Object) } } $utils = [hashtable]::Synchronized(@{ LockFunc = $function:lock.ToString() Counter = @(0) }) $toProcess | ForEach-Object -Parallel { $utils = $using:utils $function:lock = $utils['LockFunc'] Start-Sleep (Get-Random -Maximum 5) lock($utils['Counter'].SyncRoot) { $utils['Counter'][0] += $_.Count } } $processedCount = ($toProcess | Write-Output | Measure-Object).Count $utils['Counter'][0] -eq $processedCount
在PowerShell中,更简单的方法是将并行循环的输出转到线性循环中,在该线性循环中可以安全地更新计数器,而无需关心线程安全性:
$counter = 0 $toProcess | ForEach-Object -Parallel { Start-Sleep (Get-Random -Maximum 5) $_ } | ForEach-Object { $counter += $_.Count } $processedCount = ($toProcess | Write-Output | Measure-Object).Count $counter -eq $processedCount
至于C#的建议,使用Interlocked.Increment
作为锁定机制。这在PowerShell中并不总是可靠的:
$i = [ref] 0 0..100 | ForEach-Object -Parallel { $i = $using:i Start-Sleep (Get-Random -Maximum 4) $null = [System.Threading.Interlocked]::Increment($i) }
如果您想使用ConcurrentDictionary
类,仍然需要使用锁。唯一变化的是您如何定义[...ConcurrentDictionary[string, object]]::new()
而不是[hashtable]::Synchronized(@{...
。
请注意,ConcurrentDictionary
提供了在使用类的特定方法时为您提供锁定的功能,但是对于单个键的递增,仍然需要锁定。
在AddOrUpdate
方法中,如果同时在不同的线程上调用AddOrUpdate
,可能会多次调用addValueFactory
,但是它的键/值对可能不会被添加到字典中的每个调用中。
为了递增单个值,您需要预先知道该值,并将+ 1
添加到它。使用锁的原因很简单,2个或多个线程可以同时尝试读取该值,然后同时尝试更新该值,从而创建竞争条件。锁定确保在给定的时间段内只有1个线程可以读取和更新该值。
希望对您有所帮助!