如何在 PowerShell 的 ForEach-Parallel 循环中添加原子计数器

4 浏览
0 Comments

如何在 PowerShell 的 ForEach-Parallel 循环中添加原子计数器

在这个问题中,解释了如何向一个并发的ThreadSafe集合Powershell:如何将结果添加到数组(ForEach-Object -Parallel)中。

我有一个更简单的用例,我只想增加一个单一的值(整数)。

在Powershell中是否可以使用某种原子整数数据类型来实现?

$myAtomicCounter = 0
$myItems | ForEach-Object -Parallel {
    #...其他工作
    $myAtomicCounter.ThreadSafeAdd(2)
    #...使用计数器的一些其他工作
}
Write-Host($myAtomicCounter)

0
0 Comments

在PowerShell中,当从多个线程更新单个值时,必须使用锁定机制,例如MutexSemaphoreSlim甚至Monitor.Enter,否则更新操作将不是线程安全的。同步哈希表不能确保更新键值的线程安全。

下面是一个简单的演示,证明了上述说法:

$sync = [hashtable]::Synchronized(@{ })
$attempts = 0
do {
    $sync['Value'] = 0
    $attempts++
    0..10 | ForEach-Object -Parallel {
        $sync = $using:sync
        Start-Sleep -Milliseconds 200
        $sync['Value']++
    } -ThrottleLimit 11
}
while ($sync['Value'] -eq 11)
"It took $attempts attempts to fail..."

假设我们有一个数组的数组:

$toProcess = 0..10 | ForEach-Object {
    , (Get-Random -Count (Get-Random -Minimum 5 -Maximum 10))
}

如果您想跟踪每个数组中的处理项,可以使用Mutex来完成:

$processedItems = [hashtable]::Synchronized(@{
    Lock    = [System.Threading.Mutex]::new()
    Counter = 0
})
$toProcess | ForEach-Object -Parallel {
    Start-Sleep (Get-Random -Maximum 5)
    $ref = $using:processedItems
    if($ref['Lock'].WaitOne()) {
        $ref['Counter'] += $_.Count
        $ref['Lock'].ReleaseMutex()
    }
}
$processedCount = ($toProcess | Write-Output | Measure-Object).Count
$processedItems['Counter'] -eq $processedCount

另一个线程安全递增计数器的示例使用Monitor.Enter和一个尝试类似于C# lock语句的自定义函数:

function lock {
    param(
        [Parameter(Mandatory)]
        [object] $Object,
        [Parameter(Mandatory)]
        [scriptblock] $ScriptBlock
    )
    try {
        [System.Threading.Monitor]::Enter($Object)
        & $ScriptBlock
    }
    finally {
        [System.Threading.Monitor]::Exit($Object)
    }
}
$utils = [hashtable]::Synchronized(@{
    LockFunc = $function:lock.ToString()
    Counter  = @(0)
})
$toProcess | ForEach-Object -Parallel {
    $utils = $using:utils
    $function:lock = $utils['LockFunc']
    Start-Sleep (Get-Random -Maximum 5)
    lock($utils['Counter'].SyncRoot) {
        $utils['Counter'][0] += $_.Count
    }
}
$processedCount = ($toProcess | Write-Output | Measure-Object).Count
$utils['Counter'][0] -eq $processedCount

在PowerShell中,更简单的方法是将并行循环的输出转到线性循环中,在该线性循环中可以安全地更新计数器,而无需关心线程安全性:

$counter = 0
$toProcess | ForEach-Object -Parallel {
    Start-Sleep (Get-Random -Maximum 5)
    $_
} | ForEach-Object {
    $counter += $_.Count
}
$processedCount = ($toProcess | Write-Output | Measure-Object).Count
$counter -eq $processedCount

至于C#的建议,使用Interlocked.Increment作为锁定机制。这在PowerShell中并不总是可靠的:

$i = [ref] 0
0..100 | ForEach-Object -Parallel {
    $i = $using:i
    Start-Sleep (Get-Random -Maximum 4)
    $null = [System.Threading.Interlocked]::Increment($i)
}

如果您想使用ConcurrentDictionary类,仍然需要使用锁。唯一变化的是您如何定义[...ConcurrentDictionary[string, object]]::new()而不是[hashtable]::Synchronized(@{...

请注意,ConcurrentDictionary提供了在使用类的特定方法时为您提供锁定的功能,但是对于单个键的递增,仍然需要锁定。

AddOrUpdate方法中,如果同时在不同的线程上调用AddOrUpdate,可能会多次调用addValueFactory,但是它的键/值对可能不会被添加到字典中的每个调用中。

为了递增单个值,您需要预先知道该值,并将+ 1添加到它。使用锁的原因很简单,2个或多个线程可以同时尝试读取该值,然后同时尝试更新该值,从而创建竞争条件。锁定确保在给定的时间段内只有1个线程可以读取和更新该值。

希望对您有所帮助!

0