C++ Thread

Published 2023-12-21 00:45:39 · Author: K1øN
title: C++ thread
layout: page
categories: cpp


Recap

Hit rate, miss rate, hit time, and miss penalty are the key concepts for describing the performance and efficiency of a cache. Let me go through them one by one:

  1. Hit Rate

    • The hit rate is the probability that requested data is found in the cache, i.e. how often accesses hit.
    • Formula: hits / (hits + misses).
    • A high hit rate means most requests are served straight from the cache, so the cache is doing its job well.
  2. Miss Rate

    • The miss rate is the probability that requested data is not found in the cache, i.e. how often accesses miss.
    • Formula: misses / (hits + misses).
    • A low miss rate means the cache rarely has to go to main memory, which helps performance.
  3. Hit Time

    • The hit time is the time it takes to find the requested data in the cache and return it to the requester. It is usually very short, since caches are built for fast access.
    • A lower hit time reduces access latency and improves overall performance.
  4. Miss Penalty

    • The miss penalty is the time and cost of loading the requested data from main memory after a cache miss.
    • It is usually far larger than the hit time, because a fetch from main memory takes many more clock cycles.
    • Cache performance is strongly influenced by the miss penalty: the lower it is, the faster the system recovers when a miss does happen.

Reference: https://www.youtube.com/watch?v=pfIC-kle4b0

Concurrency

What is the difference between concurrency and parallelism?

  • Concurrency: multiple tasks that may have to run in some order, may depend on one another, may share resources, and may sometimes run at the same time;
    • Think of a group dinner: several people can talk at once (though two particular people talking over each other is sometimes bad), the food on the table is shared, and everyone competes for it;
  • Parallelism: all tasks can run at the same time;

Process and Thread

A process is, for example, one application running on the operating system; a single process can run multiple threads;

Resources shared by all threads:

  • shared libraries
  • Runtime heap
  • read/write data
  • read-only code/data
  • kernel context:
    • VM structures
    • descriptor table
    • brk pointer

Resources private to each thread:

  • Stack
  • Thread context
    • data registers
    • Condition codes
    • sp
    • pc

Each thread has its own thread ID (TID), its own logical control flow, ...

When to use threads?

  • heavy computation
    • using threads on the GPU for graphics
    • GPUs have 100s or 1000s of threads that are good for massively parallel tasks
  • threads to separate work
    • improves performance
    • can also simplify the logic of your program

Back to the topic: Threads in Modern C++

  • specifically, std::thread
  • or boost library ...

some_thread.join()

join makes the current thread wait for some_thread to finish before executing anything after the join() call;

A simple example:

#include <thread>
#include <iostream>

void test(int x) {
    std::cout << "hello thread 1" << std::endl;
    std::cout << "the value is " << x << std::endl;
}
auto main() -> int {
    std::thread myThread(&test, 100);
    myThread.join();
    std::cout << "hello main thread!" << std::endl;
}

main thread -> std::thread myThread(&test, 100)

(Both threads are executing concurrently)

myThread -> std::cout -> return

myThread.join(): hey main, you should wait for me to finish the work!

#include <thread>
#include <iostream>

auto main() -> int {
    auto lambda = [](int x) {
        std::cout << "hello from our thread!" << std::endl;
        std::cout << x << std::endl;
    };
    std::thread myThread(lambda, 100);
    myThread.join();
    std::cout << "hello main thread!" << std::endl;
}

Using lambda is the same as above.

What if we need 10 threads?

#include <thread>
#include <vector>
#include <iostream>

auto main() -> int {
    auto lambda = [](int x) {
        std::cout << "hello from our thread, TID: " << std::this_thread::get_id() << std::endl;
        std::cout << "Argument passed: " << x << std::endl;
    };
    std::vector<std::thread> threads;
    for(int i = 0; i < 10; i++) {
        threads.push_back(std::thread(lambda, i));
        threads[i].join();
    }
    std::cout << "hello main thread!" << std::endl;
}
hello from our thread, TID: 0x16b31f000
Argument passed: 0
hello from our thread, TID: 0x16b31f000
Argument passed: 1
hello from our thread, TID: 0x16b31f000
Argument passed: 2
hello from our thread, TID: 0x16b31f000
Argument passed: 3
hello from our thread, TID: 0x16b31f000
Argument passed: 4
hello from our thread, TID: 0x16b31f000
Argument passed: 5
hello from our thread, TID: 0x16b31f000
Argument passed: 6
hello from our thread, TID: 0x16b31f000
Argument passed: 7
hello from our thread, TID: 0x16b31f000
Argument passed: 8
hello from our thread, TID: 0x16b31f000
Argument passed: 9
hello main thread!

That said, this version gains nothing from threads: each thread is joined immediately after it is created, so they run one after another.

What if we do this instead:

#include <thread>
#include <vector>
#include <iostream>

auto main() -> int {
    auto lambda = [](int x) {
        std::cout << "hello from our thread, TID: " << std::this_thread::get_id() << std::endl;
        std::cout << "Argument passed: " << x << std::endl;
    };
    std::vector<std::thread> threads;
    for(int i = 0; i < 10; i++) {
        threads.push_back(std::thread(lambda, i));
    }

    for(auto& item : threads) {
        item.join();
    }
    std::cout << "hello main thread!" << std::endl;
}
kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
hello from our thread, TID: hello from our thread, TID: 0x16d11f000
Argument passed: 0
0x16d1ab000
Argument passed: 1
hello from our thread, TID: 0x16d3db000
hello from our thread, TID: hello from our thread, TID: 0x16d237000
0x16d2c3000
hello from our thread, TID: Argument passed: 3
hello from our thread, TID: 0x16d4f3000
Argument passed: 7
hello from our thread, TID: 0x16d57f000
Argument passed: 8
Argument passed: 5
Argument passed: 2
hello from our thread, TID: 0x16d34f000
Argument passed: 4
0x16d467000hello from our thread, TID: 0x16d60b000
Argument passed: 9

Argument passed: 6
hello main thread!

This is totally a mess!

C++20 - std::jthread

std::jthread: launches a thread and joins it automatically

join() is called when the jthread's dtor runs

(I could not verify this locally, because this clang++ toolchain does not ship std::jthread)

How do threads work together?

  • 1000 threads summing the sizes of the files in a directory;

For example,

#include <thread>
#include <vector>
#include <iostream>

static int shared_value = 0;
void increment_shared_value() {
    shared_value = shared_value + 1;
}

auto main() -> int {    
    std::vector<std::thread> threads;
    for(int i = 0; i < 1000; i++) {
        threads.push_back(std::thread(increment_shared_value));
    }

    for(auto& item : threads) {
        item.join();
    }
    std::cout << shared_value << std::endl;
    std::cout << "hello main thread!" << std::endl;
}
kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
1000
hello main thread!
kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
1000
hello main thread!
kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
997
hello main thread!
kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
999
hello main thread!

Run it a few more times and the result varies from run to run; the program has a data race, and such races are undefined behavior;

Pitfalls of concurrent programs

A thousand threads all touch the same shared_value at once -> this is what is called a data race;

  • We can use a lock to protect our data;
  • In C++, that lock is a mutex;
    • A mutex allows 'mutual exclusion' on a block of code;
    • so the operation is 'atomic' in the sense that only 1 thread can run the protected code while the lock is held;

The fix:

#include <mutex>
#include <thread>
#include <vector>
#include <iostream>

std::mutex gLock;
static int shared_value = 0;

void increment_shared_value() {
    gLock.lock();
    shared_value = shared_value + 1;
    // this region is called the 'critical section' that is protected by the lock
    gLock.unlock();
    // guarantees only one thread executes this block at a time
}

auto main() -> int {
    std::vector<std::thread> threads;
    for(int i = 0; i < 1000; i++) {
        threads.push_back(std::thread(increment_shared_value));
    }

    for(auto& item : threads) {
        item.join();
    }
    std::cout << shared_value << std::endl;
    std::cout << "hello main thread!" << std::endl;
}

With std::mutex, the result is now correct:

kionmiyasaka@macmini ~/D/p/test> cd "/Users/kionmiyasaka/Documents/projects/test/" && /opt/homebrew/opt/llvm/bin/clang++ -std=c++23 thread.cpp -o thread && "/Users/kionmiyasaka/Documents/projects/test/"thread
1000
hello main thread!

What happens if the lock is never returned? It is like throwing away the key to the room.

This is called deadlock.

void increment_shared_value() {
    gLock.lock();
    shared_value = shared_value + 1;
    // this region is called the 'critical section' that is protected by the lock
    // gLock.unlock();
    // Oops, never return the lock
}

This is going to hang the program: every other thread blocks forever, waiting for a lock that will never be released...

How do we deal with deadlock?

  • code review
    • ensure every lock has a matching unlock
  • static analysis tools

So let's make sure we have an unlock for every lock.

What if...

void increment_shared_value() {
    gLock.lock();
    try {
        shared_value = shared_value + 1;
        throw "Dangerous exception abort";
    } catch(...) {
        std::cout << "Handle exception by returning from thread\n";
        return;
        // Oops, forget to unlock too!
        // The safest thing when we catch an exception is to abort.
    }
    gLock.unlock();
}

虽然但是,我们还是有更好的办法——

Prefer lock_guard (C++11) over lock/unlock

void increment_shared_value() {
    std::lock_guard<std::mutex> lockGuard(gLock);    
    try {
        shared_value = shared_value + 1;
        throw "Dangerous exception abort";
    } catch(...) {
        std::cout << "Handle exception by returning from thread\n";
        return;
        // Oops, no need for unlock!
    }
}

std::lock_guard follows RAII: the mutex is released automatically when the guard goes out of scope, even on an early return. This is also a good example of RAII;


Prefer using std::scoped_lock (C++17) over std::lock_guard

  • An update to lock_guard, but can acquire multiple locks at once;

Prefer using std::atomic over using mutexes

In other words, lock-free code tends to beat lock-based code;

#include <atomic>
#include <thread>
#include <vector>
#include <iostream>

static std::atomic<int> shared_value{0};

void increment_shared_value() {
    shared_value++; // atomic increment, no mutex needed
}

auto main() -> int {    
    std::vector<std::thread> threads;
    for(int i = 0; i < 1000; i++) {
        threads.push_back(std::thread(increment_shared_value));
    }

    for(auto& item : threads) {
        item.join();
    }
    std::cout << shared_value << std::endl;
    std::cout << "hello main thread!" << std::endl;
}

After C++11, initialization of static local variables is thread-safe

  • Also, the initialization runs only once;

So we don't need to put locks around this kind of initialization ourselves;

A static global variable is likewise initialized only once, but accessing it afterwards is still not thread-safe;

What follows is still under construction:

Another example:


#include <thread>
#include <iostream>

using std::cout;
using std::endl;


int main() {
    int cnt = 0;
    int maxn = 100000;
    std::thread worker1([&]() {
        for(int i = 0; i < maxn; i++) {
            cnt ++;
        }
    } );

    std::thread worker2([&]() {
        for(int i = 0; i < maxn; i++) {
            cnt++;
        }
    });
    worker1.join();
    worker2.join();
    // cnt usually ends up below 2 * maxn: the two threads' unsynchronized
    // increments race with each other, and some updates are lost
    cout << cnt << endl;
    return 0;
}

Handling contention for data between cores

void test() {
    Timer            timer;
    std::atomic<int> a(-1);
    std::thread      t0([&]() { work(a); });
    std::thread      t1([&]() { work(a); });
    std::thread      t2([&]() { work(a); });
    std::thread      t3([&]() { work(a); });
    t0.join();
    t1.join();
    t2.join();
    t3.join();
}

Used this way, adding more cores actually makes it slower, because all the cores contend for the same atomic variable;

void test_maybe_better() {
    Timer            timer;
    std::atomic<int> a(-1);
    std::atomic<int> b(-1);
    std::atomic<int> c(-1);
    std::atomic<int> d(-1);
    std::thread      t0([&]() { work(a); });
    std::thread      t1([&]() { work(b); });
    std::thread      t2([&]() { work(c); });
    std::thread      t3([&]() { work(d); });
    t0.join();
    t1.join();
    t2.join();
    t3.join();
}

Here we try to avoid the contention by giving each thread its own atomic variable, but it does not really help: it is about as slow as before. The four atomics are laid out next to each other, so they land on the same cache line and the cores still fight over that line (false sharing).

The full test program:

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>


class Timer {
private:
    std::chrono::time_point<std::chrono::high_resolution_clock> start_time;

public:
    Timer() {
        start_time = std::chrono::high_resolution_clock::now();
    }
    ~Timer() {
        auto end_time = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
        std::cout << "elapsed time " << duration.count() << " us" << std::endl;
    }
};

void work(std::atomic<int>& a) {
    for(int i = 1; i < 10000; i++) {
        a++;
    }
}

void test_one_core() {
    Timer            timer;
    std::atomic<int> a(-1);
    work(a);
    work(a);
    work(a);
    work(a);
}

void test() {
    Timer            timer;
    std::atomic<int> a(-1);
    std::thread      t0([&]() { work(a); });
    std::thread      t1([&]() { work(a); });
    std::thread      t2([&]() { work(a); });
    std::thread      t3([&]() { work(a); });
    t0.join();
    t1.join();
    t2.join();
    t3.join();
}

void test_maybe_better() {
    Timer            timer;
    std::atomic<int> a(-1);
    std::atomic<int> b(-1);
    std::atomic<int> c(-1);
    std::atomic<int> d(-1);
    std::thread      t0([&]() { work(a); });
    std::thread      t1([&]() { work(b); });
    std::thread      t2([&]() { work(c); });
    std::thread      t3([&]() { work(d); });
    t0.join();
    t1.join();
    t2.join();
    t3.join();
}

int main() {
    test_one_core();
    test();
    test_maybe_better();
    return 0;
}