多线程编程技术

多线程编程技术

多线程编程技术是一种程序设计方法,它允许程序的多个部分(线程)同时执行。多线程的优势在于能够更有效地利用处理器资源,提高程序的性能。多线程对于并行处理任务、提高响应性和处理器利用率等方面具有重要意义。

在C++中,可以使用C++11标准提供的库来实现多线程编程。下面是一个简单的C++多线程示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>

void print_hello() {
std::cout << "Hello from thread!" << std::endl;
}

int main() {
// 创建一个新的线程,执行print_hello函数
std::thread t(print_hello);

// 在主线程中打印一条消息
std::cout << "Hello from main thread!" << std::endl;

// 等待子线程执行完成
t.join();

return 0;
}


在这个示例中,我们首先定义了一个print_hello函数,用于在子线程中执行。在main函数中,我们创建了一个新的线程std::thread t(print_hello),它执行print_hello函数。然后,我们在主线程中打印一条消息,表示主线程的执行。最后,我们使用t.join()等待子线程执行完成,以确保子线程在程序结束前完成任务。

使用互斥量

下面是一个使用互斥量保护共享资源的多线程示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
int shared_counter = 0;

void increment(int num_increments) {
for (int i = 0; i < num_increments; ++i) {
std::unique_lock<std::mutex> lock(mtx);
++shared_counter;
lock.unlock();
}
}

int main() {
const int num_threads = 5;
const int num_increments = 1000;

std::vector<std::thread> threads;

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment, num_increments);
}

for (auto &t : threads) {
t.join();
}

std::cout << "Final value of shared_counter: " << shared_counter << std::endl;

return 0;
}


在这个示例中,我们创建了一个名为shared_counter的共享计数器,多个线程将同时访问它。为了避免竞争条件,我们使用互斥量mtx来保护对shared_counter的访问。increment函数中,我们在修改shared_counter前获取互斥量的锁,在修改shared_counter之后,我们立即释放锁。这样,就确保了在同一时刻只有一个线程能够访问shared_counter,从而避免了数据不一致和竞争条件。

main函数中,我们创建了一个std::vector类型的容器来存储多个线程对象。然后,我们创建了num_threads个线程,每个线程执行increment函数,并将num_increments作为参数传递。接下来,我们使用join函数等待所有线程执行完成。最后,我们输出shared_counter的最终值。

注意,这个示例仅用于说明如何使用互斥量保护共享资源。在实际应用中,你可能需要根据具体问题选择合适的同步原语,例如条件变量、信号量(semaphore)或原子操作(atomic operations)等。

这里是一个使用条件变量实现生产者-消费者模型的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;

void producer(int num_items) {
for (int i = 0; i < num_items; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i);
lock.unlock();

cv.notify_one();
}
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !data_queue.empty(); });

int data = data_queue.front();
data_queue.pop();

if (data == -1) {
break;
}

lock.unlock();

std::cout << "Consumed: " << data << std::endl;
}
}

int main() {
std::thread prod_thread(producer, 10);
std::thread cons_thread(consumer);

prod_thread.join();

{
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(-1);
}

cv.notify_one();

cons_thread.join();

return 0;
}


在这个示例中,我们创建了一个生产者线程和一个消费者线程。生产者线程向data_queue中添加数据,消费者线程从data_queue中取出并处理数据。我们使用条件变量cv来同步生产者和消费者线程。当生产者线程向队列中添加数据后,它会调用cv.notify_one()来唤醒等待中的消费者线程。消费者线程在cv.wait()中等待,直到队列非空。当队列为空时,消费者线程将一直等待,直到生产者线程向队列中添加新数据并发出通知。这种方式可以有效地协调生产者和消费者线程之间的工作,避免资源浪费。

在上面的生产者-消费者示例中,我们只使用了一个生产者线程和一个消费者线程。实际上,可以通过创建多个生产者和消费者线程来进一步提高处理能力。在这种情况下,需要确保多个线程之间的同步和资源访问安全。我们可以使用相同的互斥量和条件变量来实现这一目标。

以下是一个具有多个生产者和消费者线程的生产者-消费者示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;

void producer(int id, int num_items) {
for (int i = 0; i < num_items; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(i);
lock.unlock();

cv.notify_one();
std::cout << "Producer " << id << " produced: " << i << std::endl;
}
}

void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !data_queue.empty(); });

int data = data_queue.front();
data_queue.pop();

if (data == -1) {
data_queue.push(-1); // 将-1放回队列,以便其他消费者线程也能收到退出信号
break;
}

lock.unlock();

std::cout << "Consumer " << id << " consumed: " << data << std::endl;
}
}

int main() {
const int num_producers = 3;
const int num_consumers = 5;
const int num_items_per_producer = 10;

std::vector<std::thread> producer_threads;
std::vector<std::thread> consumer_threads;

for (int i = 0; i < num_producers; ++i) {
producer_threads.emplace_back(producer, i, num_items_per_producer);
}

for (int i = 0; i < num_consumers; ++i) {
consumer_threads.emplace_back(consumer, i);
}

for (auto &t : producer_threads) {
t.join();
}

{
std::unique_lock<std::mutex> lock(mtx);
for (int i = 0; i < num_consumers; ++i) {
data_queue.push(-1);
}
}

cv.notify_all();

for (auto &t : consumer_threads) {
t.join();
}

return 0;
}


在这个示例中,我们创建了num_producers个生产者线程和num_consumers个消费者线程。生产者线程将数据添加到data_queue中,消费者线程从data_queue中获取并处理数据。当所有生产者线程完成工作后,我们将-1添加到data_queue中,以通知消费者线程退出。这里我们为每个消费者线程都添加了一个-1,确保所有消费者线程都能收到退出信号。

请注意,多线程编程需要仔细考虑同步和资源访问安全,以避免潜在的问题,如竞争条件、死锁和数据不一致。在实际应用中,还需要根据具体需求选择合适的线程管理策略和线程池技术。线程池是一种管理线程的方式,它预先创建一定数量的线程,并在需要时将任务分配给空闲线程。这样可以减少线程创建和销毁的开销,提高程序的性能和响应性。

线程池

在C++中,可以使用第三方库(例如Intel TBB或Boost.Asio等)来实现线程池功能。也可以自己实现一个简单的线程池,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>

class ThreadPool {
public:
ThreadPool(size_t num_threads) : done(false) {
for (size_t i = 0; i < num_threads; ++i) {
worker_threads.emplace_back(&ThreadPool::worker, this);
}
}

~ThreadPool() {
done = true;
cv.notify_all();
for (auto &t : worker_threads) {
t.join();
}
}

template <typename Func, typename... Args>
auto enqueue(Func &&func, Args &&... args)
-> std::future<typename std::result_of<Func(Args...)>::type> {
using ReturnType = typename std::result_of<Func(Args...)>::type;
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<ReturnType> result = task->get_future();

{
std::unique_lock<std::mutex> lock(mtx);
tasks.emplace([task]() { (*task)(); });
}

cv.notify_one();
return result;
}

private:
void worker() {
while (!done) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return done || !tasks.empty(); });

if (done && tasks.empty()) {
return;
}

task = std::move(tasks.front());
tasks.pop();
}

task();
}
}

std::vector<std::thread> worker_threads;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
std::atomic_bool done;
};


这个简单的线程池实现提供了一个enqueue方法,可以将任务添加到线程池中。线程池中的工作线程在执行任务时,会等待任务队列中的新任务。当任务队列为空时,工作线程会阻塞等待,直到有新任务添加到队列中。当线程池析构时,所有工作线程将退出。

使用线程池可以简化多线程任务的管理,并提高程序性能。通过限制线程的数量,线程池可以帮助降低资源竞争和上下文切换开销。此外,线程池还可以根据系统负载动态调整线程数量,以适应不同的运行环境和任务需求。

以下是一个使用上述线程池实现的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <vector>
#include <chrono>

int long_running_task(int id) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Task " << id << " completed on thread " << std::this_thread::get_id() << std::endl;
return id * 2;
}

int main() {
ThreadPool thread_pool(4);
std::vector<std::future<int>> results;

for (int i = 0; i < 10; ++i) {
results.emplace_back(thread_pool.enqueue(long_running_task, i));
}

for (auto &result : results) {
std::cout << "Result: " << result.get() << std::endl;
}

return 0;
}


在这个示例中,我们创建了一个包含4个工作线程的线程池。我们将10个long_running_task任务添加到线程池中,并将返回的std::future对象存储在一个向量中。这些任务将由线程池中的工作线程执行。最后,我们等待所有任务完成,并输出结果。

请注意,在使用多线程和线程池时,始终要关注线程安全、资源竞争、死锁和性能等问题。在编写多线程代码时,务必谨慎地选择合适的同步原语,并在适当时机对共享资源进行保护。此外,在实际应用中,可能需要根据项目需求选择不同的线程池实现,以满足性能、灵活性和易用性等方面的需求。


多线程编程技术
https://qiangsun89.github.io/2023/04/24/多线程编程技术/
作者
Qiang Sun
发布于
2023年4月24日
许可协议