std::atomic是什么?

40 浏览
0 Comments

std::atomic是什么?

我理解 std::atomic<> 是一个原子对象。但是有多大程度是原子的呢?据我理解,一个操作可以是原子的。那么,把一个对象变成原子对象到底是什么意思呢?例如,如果有两个线程同时执行以下代码:

a = a + 12;

那么整个操作(比如说 add_twelve_to(int))是原子的吗?还是对变量的更改是原子的(所以是 operator=())?

admin 更改状态以发布 2023年5月22日
0
0 Comments

std::atomic 存在是因为许多ISA直接支持它的硬件

C++标准中关于 std::atomic 的内容已经在其他答案中进行了分析。

那么现在让我们看看 std::atomic 编译成什么以获取不同的见解。

这个实验的主要结论是,现代CPU直接支持原子整数操作,例如x86中的LOCK前缀,而 std::atomic 基本上存在作为这些指令的可移植接口:x86汇编指令中的“lock”指令是什么意思? 在aarch64中,会使用 LDADD

这种支持允许使用比更一般的方法(如 std::mutex)更快的替代方案,后者可以使更复杂的多指令部分成为原子部分,而成本则比 std::atomic 更慢,因为 std::mutex 在Linux中会产生 futex 系统调用,而这比 std::atomic 发出的用户空间指令要慢得多,还可以参见:std::mutex是否创建一个屏障?

让我们来考虑以下多线程程序,它跨多个线程递增全局变量,并具有不同的同步机制,具体取决于使用哪个预处理器定义。

main.cpp

#include 
#include 
#include 
#include 
size_t niters;
#if STD_ATOMIC
std::atomic_ulong global(0);
#else
uint64_t global = 0;
#endif
void threadMain() {
    for (size_t i = 0; i < niters; ++i) {
#if LOCK
        __asm__ __volatile__ (
            "lock incq %0;"
            : "+m" (global),
              "+g" (i) // to prevent loop unrolling
            :
            :
        );
#else
        __asm__ __volatile__ (
            ""
            : "+g" (i) // to prevent he loop from being optimized to a single add
            : "g" (global)
            :
        );
        global++;
#endif
    }
}
int main(int argc, char **argv) {
    size_t nthreads;
    if (argc > 1) {
        nthreads = std::stoull(argv[1], NULL, 0);
    } else {
        nthreads = 2;
    }
    if (argc > 2) {
        niters = std::stoull(argv[2], NULL, 0);
    } else {
        niters = 10;
    }
    std::vector threads(nthreads);
    for (size_t i = 0; i < nthreads; ++i)
        threads[i] = std::thread(threadMain);
    for (size_t i = 0; i < nthreads; ++i)
        threads[i].join();
    uint64_t expect = nthreads * niters;
    std::cout << "expect " << expect << std::endl;
    std::cout << "global " << global << std::endl;
}

GitHub 上游

编译、运行和反汇编:

comon="-ggdb3 -O3 -std=c++11 -Wall -Wextra -pedantic main.cpp -pthread"
g++ -o main_fail.out                    $common
g++ -o main_std_atomic.out -DSTD_ATOMIC $common
g++ -o main_lock.out       -DLOCK       $common
./main_fail.out       4 100000
./main_std_atomic.out 4 100000
./main_lock.out       4 100000
gdb -batch -ex "disassemble threadMain" main_fail.out
gdb -batch -ex "disassemble threadMain" main_std_atomic.out
gdb -batch -ex "disassemble threadMain" main_lock.out

极有可能是 “错误” 的竞争条件输出为 main_fail.out

expect 400000
global 100000

而其他输出则是确定性 “正确” 的:

expect 400000
global 400000

main_fail.out 的反汇编:

   0x0000000000002780 <+0>:     endbr64 
   0x0000000000002784 <+4>:     mov    0x29b5(%rip),%rcx        # 0x5140 
   0x000000000000278b <+11>:    test   %rcx,%rcx
   0x000000000000278e <+14>:    je     0x27b4 
   0x0000000000002790 <+16>:    mov    0x29a1(%rip),%rdx        # 0x5138 
   0x0000000000002797 <+23>:    xor    %eax,%eax
   0x0000000000002799 <+25>:    nopl   0x0(%rax)
   0x00000000000027a0 <+32>:    add    $0x1,%rax
   0x00000000000027a4 <+36>:    add    $0x1,%rdx
   0x00000000000027a8 <+40>:    cmp    %rcx,%rax
   0x00000000000027ab <+43>:    jb     0x27a0 
   0x00000000000027ad <+45>:    mov    %rdx,0x2984(%rip)        # 0x5138 
   0x00000000000027b4 <+52>:    retq

main_std_atomic.out 的反汇编:

   0x0000000000002780 <+0>:     endbr64 
   0x0000000000002784 <+4>:     cmpq   $0x0,0x29b4(%rip)        # 0x5140 
   0x000000000000278c <+12>:    je     0x27a6 
   0x000000000000278e <+14>:    xor    %eax,%eax
   0x0000000000002790 <+16>:    lock addq $0x1,0x299f(%rip)        # 0x5138 
   0x0000000000002799 <+25>:    add    $0x1,%rax
   0x000000000000279d <+29>:    cmp    %rax,0x299c(%rip)        # 0x5140 
   0x00000000000027a4 <+36>:    ja     0x2790 
   0x00000000000027a6 <+38>:    retq   

main_lock.out 的反汇编:

Dump of assembler code for function threadMain():
   0x0000000000002780 <+0>:     endbr64 
   0x0000000000002784 <+4>:     cmpq   $0x0,0x29b4(%rip)        # 0x5140 
   0x000000000000278c <+12>:    je     0x27a5 
   0x000000000000278e <+14>:    xor    %eax,%eax
   0x0000000000002790 <+16>:    lock incq 0x29a0(%rip)        # 0x5138 
   0x0000000000002798 <+24>:    add    $0x1,%rax
   0x000000000000279c <+28>:    cmp    %rax,0x299d(%rip)        # 0x5140 
   0x00000000000027a3 <+35>:    ja     0x2790 
   0x00000000000027a5 <+37>:    retq

结论:

  • 非原子版本将全局变量保存到寄存器中并递增该寄存器。

    因此,在最后,非常可能使用相同的 “错误” 值 100000 将四次写入回全局。

  • std::atomic 编译成 lock addq。LOCK前缀使后续的 inc 原子地获取、修改和更新内存。

  • 我们明确的内联汇编LOCK前缀编译成几乎与 std::atomic 相同的东西,只是我们使用了 inc 而不是 add。不确定为什么GCC选择了 add,考虑到我们的INC生成了更小的一字节的解码。

ARMv8在新的CPU中可以使用LDAXR + STLXR或LDADD:如何在普通的C语言中启动线程? 在Ubuntu 19.10 AMD64,GCC 9.2.1,Lenovo ThinkPad P51上进行了测试。

0
0 Comments

每个std::atomic<>的实例化和完整特化都表示一种不会引起未定义行为的类型,多个线程可以同时对其进行操作(它们的实例),

原子类型的对象是唯一不受数据竞争影响的C++对象;也就是说,一个线程写入原子对象时,另一个线程从中读取是行为明确的。

此外,对原子对象的访问可能会建立线程间同步,并按照std::memory_order的规定对非原子内存访问进行排序。

std::atomic<>封装了早期C++ 11时期使用(例如)MSVC中的插入式函数或者GCC中的原子插入式函数执行的操作。

此外,std::atomic<>通过允许指定不同的内存顺序来给你更多的控制能力,从而指定同步和排序约束条件。如果你想了解更多关于C++ 11的原子和内存模型的内容,以下链接可能有所帮助:

请注意,对于典型用例,您可能会使用重载的算术运算符或另一组算术运算符:

std::atomic value(0);
value++; //This is an atomic op
value += 5; //And so is this

因为运算符语法不允许您指定内存顺序,所以这些操作将使用std::memory_order_seq_cst进行操作,这是C++11中所有原子操作的默认顺序。它保证了所有原子操作之间的顺序一致性(全局总排序)。

但在某些情况下,这可能不是必需的(且没有免费的午餐),因此您可能需要使用更明确的形式:

std::atomic value {0};
value.fetch_add(1, std::memory_order_relaxed); // Atomic, but there are no synchronization or ordering constraints
value.fetch_add(5, std::memory_order_release); // Atomic, performs 'release' operation

现在,看看你的例子:

a = a + 12;

它将不会评估为一个单一的原子操作:它将结果为a.load()(本身是原子的),然后将该值与12相加,最后将结果存储在a.store() (也是原子的)中。正如我之前指出的那样,这里将使用std::memory_order_seq_cst

然而,如果你写 a += 12 ,它将是原子操作(正如我之前指出的那样),并且大致相当于 a.fetch_add(12,std::memory_order_seq_cst)

至于你的评论:

int通常具有原子加载和存储。使用atomic <> 包装它的意义何在?

你的说法只适用于提供这种存储和/或加载原子性保证的体系结构。有些架构不会这样做。此外,通常要求必须在单词/双字对齐地址上执行操作才能保证原子性。 std::atomic <> 保证在每个平台上都是原子性的,无需其他要求。此外,它使您可以编写如下代码:

void* sharedData = nullptr;
std::atomic ready_flag = 0;
// Thread 1
void produce()
{
    sharedData = generateData();
    ready_flag.store(1, std::memory_order_release);
}
// Thread 2
void consume()
{
    while (ready_flag.load(std::memory_order_acquire) == 0)
    {
        std::this_thread::yield();
    }
    assert(sharedData != nullptr); // will never trigger
    processData(sharedData);
}

请注意,断言条件将始终为true(因此不会触发),因此您始终可以确保在while循环退出后,数据已准备好。这是因为:

  • store()到标志是在设置sharedData之后执行的(我们假定generateData()始终返回有用的东西,特别是从不返回 NULL ),并使用std::memory_order_release顺序:

memory_order_release

具有此内存顺序的存储操作执行释放操作:当前线程中的任何读取或写入都不能重新排序此存储之后。当前线程中的所有写入在获取相同的原子变量的其他线程看到。

  • while循环退出后,将使用sharedData,因此在从标志的load()返回非零值之后将使用load()load()使用std::memory_order_acquire顺序:

std::memory_order_acquire

具有此内存顺序的加载操作对受影响的内存位置执行获取操作:当前线程中的任何读取或写入都不能重新排序此加载之前。释放相同的原子变量的其他线程中的所有写入在当前线程中是可见的。

这使您可以精确控制同步,并允许您明确指定代码可能性/可能性/将要/不会如何行动。如果仅保证了原子性,这是不可能的。特别是当涉及非常有趣的同步模型时,例如释放-consuming顺序

0