关注

Linux 线程同步与互斥(二):线程同步从条件变量到生产者消费者模型全解,原理 + 源码彻底吃透

在这里插入图片描述

🔥草莓熊Lotso:个人主页

❄️个人专栏: 《C++知识分享》 《Linux 入门到实践:零基础也能懂》

✨生活是默默的坚持,毅力是永久的享受!

🎬 博主简介:

在这里插入图片描述


文章目录


前言:

在 Linux 多线程编程中,我们通过互斥锁解决了临界资源的并发安全问题,但很多同学写的代码依然会出现问题:某个线程疯狂抢占锁,其他线程长期得不到执行造成饥饿问题;线程不断轮询判断临界资源状态,造成 CPU 资源的严重浪费;生产者和消费者强耦合,代码扩展性极差。这些问题的核心,就是只懂线程互斥,不懂线程同步。互斥保证了临界资源的访问安全,而同步则在安全的前提下,让多线程按照合理的顺序协同执行,既避免了饥饿,又最大化发挥了多线程的并发性能。本文将完整覆盖 Linux 线程同步的核心知识点,从同步的核心概念、条件变量的底层原理,到生产者消费者模型的两种经典实现,结合源码逐行拆解,帮你彻底搞懂 Linux 线程同步,不管是业务开发还是面试,都能轻松拿捏。


一. 为什么有了互斥锁,还需要线程同步?

在讲同步之前,我们先搞懂一个核心问题:互斥锁已经能保证临界资源的安全访问了,为什么还需要线程同步?

1.1 互斥锁的局限性:安全但不合理

我们用文档中经典的VIP 自习室例子来理解:

  • VIP 自习室一次只能进一个人,门口只有一把钥匙,这就是互斥锁
  • 你凌晨抢到钥匙进了自习室,学完出门刚挂上钥匙,立刻又抢了回来继续学,如此反复;
  • 其他排队的同学永远抢不到钥匙,只能一直等,这就是线程饥饿问题

互斥锁只保证了「同一时间只有一个线程访问临界资源」,但完全不保证访问的公平性和顺序性。在极端情况下,一个线程可以反复抢占锁,其他线程长期得不到 CPU 调度,代码虽然是线程安全的,但执行效率极低,完全浪费了多线程的并发优势。

在这里插入图片描述
在这里插入图片描述

1.2 线程同步的核心定义

  • 同步:在保证数据安全(互斥)的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,让多线程协同更合理、更高效。
  • 正确的说法(什么是线程同步,为什么需要同步):线程同步指的是线程间对数据资源进行获取,有可能在不满足访问资源条件的情况下访问资源而造成程序逻辑混乱,因此通过进行条件判断来决定线程在不能访问资源时休眠等待或满足资源后唤醒等待的线程的方式实现对资源访问的合理性

简单来说

  • 互斥:解决「能不能访问」的问题,保证临界资源的安全;
  • 同步:解决「什么时候访问」的问题,保证多线程的协同有序。

在这里插入图片描述


二. 线程同步的核心工具:条件变量 (Condition Variable)

2.1 条件变量是什么?

我们还是用自习室的例子来理解(下面还有别的例子):

  • 自习室门口新增了一个等待队列,没抢到钥匙的同学必须去队列排队,不能反复抢锁;
  • 自习室里的同学出来后,必须先通知队列里的第一个同学,而不是自己重新抢锁;
  • 这个「排队 + 通知」的机制,就是条件变量

从技术角度来说,条件变量是 POSIX 线程库提供的同步工具,它提供了线程等待线程唤醒两大核心能力:

  • 当线程访问临界资源的条件不满足时,就让线程进入阻塞等待状态,释放 CPU 资源;
  • 当其他线程将条件修改为满足时,主动唤醒等待的线程,让其继续执行。

在这里插入图片描述
在这里插入图片描述

2.2 条件变量核心 API

条件变量的操作接口都在<pthread.h>头文件中,编译时需要链接-lpthread库,核心 API 分为 4 类:

2.2.1 初始化条件变量

初始化有两种方式,和互斥锁完全对应:

// 方式1:静态初始化(全局/静态变量适用)
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// 方式2:动态初始化(局部变量适用)
int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);
  • 参数cond:要初始化的条件变量指针;
  • 参数attr:条件变量属性,填NULL表示使用默认属性;
  • 返回值:成功返回 0,失败返回错误码。

2.2.2 销毁条件变量

int pthread_cond_destroy(pthread_cond_t *cond);
  • 注意:用PTHREAD_COND_INITIALIZER静态初始化的条件变量,无需手动销毁;
  • 必须确保没有线程在该条件变量上等待时,再执行销毁。

2.2.3 等待条件满足(核心 API)

int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex);
  • 参数cond:要等待的条件变量;
  • 参数mutex:保护临界资源的互斥锁;
  • 核心功能:让调用线程进入阻塞等待状态,原子性地释放持有的互斥锁;当线程被唤醒时,会自动重新竞争互斥锁,竞争成功后才会返回

2.2.4 唤醒等待的线程

// 唤醒条件变量等待队列中的至少一个线程
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒条件变量等待队列中的所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);

2.2.5 条件变量的最基础demo以及源码解析(API初步用起来)

原生库版本(部分操作直接用的c++11)

#include <iostream>
#include <thread>
#include <unistd.h>
#include <pthread.h>
#include <string>

int tickets = 1000;
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;

void grabTicket(const std::string& name) 
{
    while (true) 
    {
        pthread_mutex_lock(&glock);
        pthread_cond_wait(&gcond, &glock);
        if (tickets > 0) 
        {
            usleep(1000);
            std::cout << name << " grabbed ticket, remaining: " << --tickets << std::endl;
            pthread_mutex_unlock(&glock);
        } else 
        {
            pthread_mutex_unlock(&glock);
            break;
        }
    }
}

int main() 
{
    const int THREAD_COUNT = 4;
    std::thread threads[THREAD_COUNT]; // 创建一个数组去存线线程
    
    // 创建线程
    for (int i = 0; i < THREAD_COUNT; i++) 
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads[i] = std::thread(grabTicket, name);
    }

    // 唤醒线程
    while(true)
    {
        std::cout << "wake up ..." << std::endl;
        pthread_cond_signal(&gcond); // 也可以去唤醒一批
        
		// 1. 单播唤醒:一次唤醒一个线程
        // pthread_cond_signal(&cond);
        // 2. 广播唤醒:一次唤醒所有等待的线程
        // pthread_cond_broadcast(&cond);
        sleep(1);
    }

    // 等待所有线程结束
    for (int i = 0; i < THREAD_COUNT; i++) 
    {
        threads[i].join();
    }
    return 0;
}

在这里插入图片描述
代码核心解析

  • 线程函数中,调用pthread_cond_wait前必须先持有互斥锁,符合使用规范;
  • pthread_cond_wait会自动释放锁,让其他线程有机会修改条件,唤醒后自动重新拿锁,保证了原子性;
  • pthread_cond_signal只会唤醒等待队列中的一个线程,而pthread_cond_broadcast会唤醒所有等待的线程,根据业务场景选择使用。

c++11版本

#include <iostream>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <string>

int tickets = 1000;
std::mutex glock;
std::condition_variable gcond;

void grabTicket(const std::string& name) 
{
    std::unique_lock<std::mutex> lock(glock); // 自动管理.跟锁守卫不太一样, 感兴趣可以查查
    while (true) 
    {
        gcond.wait(lock);                     // 等待被唤醒,自动释放锁
        if (tickets > 0) 
        {
            usleep(1000);
            std::cout << name << " grabbed ticket, remaining: " << --tickets << std::endl;
            // 继续循环,lock 仍被持有,下一次 wait 会再次释放锁
        } 
        else 
        {
            break;                            // 票已售罄,退出线程
        }
    }
}

int main() 
{
    const int THREAD_COUNT = 4;
    std::thread threads[THREAD_COUNT];

    // 创建线程
    for (int i = 0; i < THREAD_COUNT; i++) 
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads[i] = std::thread(grabTicket, name);
    }

    // 主线程每秒唤醒一个线程
    while (true)
    {
        std::cout << "wake up ..." << std::endl;
        gcond.notify_one();                   // 对应 pthread_cond_signal
        sleep(1);
    }

    // 以下代码永远不会执行(因为主线程无限循环),但保留原结构
    for (int i = 0; i < THREAD_COUNT; i++) 
    {
        threads[i].join();
    }
    return 0;
}

2.3 灵魂拷问:为什么 wait 必须传入互斥锁?(跟后面的BlcokQueue那块有关)

这是面试的高频考点,也是理解条件变量的核心,我们分两步拆解:
在这里插入图片描述

2.3.1 错误的设计:先解锁,再等待

很多同学会想:我先上锁判断条件不满足(其他线程拿不到了),先解锁,再等待不就行了?我们看这段错误代码:

// 错误示范!!!
pthread_mutex_lock(&mutex);
while (condition_is_false) 
{
    pthread_mutex_unlock(&mutex);
    // 【致命窗口】解锁之后、等待之前,条件可能已经满足,信号被彻底错过!
    pthread_cond_wait(&cond, &mutex); // 这里的mutex只是占位,无实际意义
    pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

致命问题:解锁和等待不是原子操作。在解锁之后、进入等待之前,其他线程可能已经修改了条件,发出了唤醒信号,但当前线程还没进入等待队列,这个信号就被永久错过了,线程会永远阻塞在 wait 调用中。

2.3.2 正确的原子性保证

pthread_cond_wait的核心设计,就是把 「释放互斥锁」和「进入等待状态」做成了原子操作

  • 线程调用 wait 时,会原子性地释放持有的互斥锁,让其他线程可以修改临界资源;
  • 当线程被唤醒时,会自动重新竞争互斥锁,只有竞争成功,才会从 wait 函数返回,继续访问临界资源。

这就彻底解决了信号丢失的问题,也保证了临界资源访问的安全性。

2.4 条件变量使用规范:为什么必须用 while 判断条件?(跟后面的BlcokQueue那块有关)

很多新手会用if判断条件是否满足,这是严重的错误,必须用 while 循环重新判断条件

// 正确规范:等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假) { // 绝对不能用if!!!
    pthread_cond_wait(cond, mutex);
}
// 条件满足,修改临界资源
pthread_mutex_unlock(&mutex);

// 正确规范:发送信号代码
pthread_mutex_lock(&mutex);
// 设置条件为真,修改临界资源
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

核心原因:伪唤醒问题

  • pthread_cond_wait可能在没有收到pthread_cond_signal/pthread_cond_broadcast的情况下异常返回(比如系统信号中断、多核 CPU 竞争、广播唤醒后条件被其他线程抢先修改),这就是伪唤醒
  • 如果用if判断,伪唤醒后线程会直接往下执行,此时条件依然不满足,就会访问非法的临界资源,造成程序崩溃;而用while循环,线程被唤醒后会重新判断条件,条件不满足就继续等待,彻底规避了伪唤醒的风险。

2.5 条件变量的 C++ 封装

我们把条件变量封装成一个通用的 Cond 类,配合之前封装的互斥锁使用,代码更简洁、更安全:要引入我们之前自己封装的 Mutex.hpp

  • Mutex.hpp
#ifndef MUTEX_HPP
#define MUTEX_HPP

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex()
    {
        pthread_mutex_init(&_lock, nullptr);
    }
    ~Mutex()
    {
        pthread_mutex_destroy(&_lock);
    }
    void Lock()
    {
        pthread_mutex_lock(&_lock);
    }
    void UnLock()
    {
        pthread_mutex_unlock(&_lock);
    }
    pthread_mutex_t* Origin()
    {
        return &_lock;
    }
private:
    pthread_mutex_t _lock;
};


class LockGuard
{
public:
    LockGuard(Mutex* lockptr) : _lockptr(lockptr)
    {
        _lockptr->Lock();
    }
    ~LockGuard()
    {
        _lockptr->UnLock();
    }
private:
    Mutex* _lockptr;
};
#endif
  • Cond.hpp
#ifndef COND_HPP
#define COND_HPP
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

class Cond 
{
public:
    Cond()
    {
        pthread_cond_init(&cond, nullptr);
    }
    void Wait(Mutex &mutex)
    {
        pthread_cond_wait(&cond, mutex.Origin());
    }
    void NotifyOne()
    {
        pthread_cond_signal(&cond);
    }
    void NotifyAll()
    {
        pthread_cond_broadcast(&cond);
    }
    ~Cond()
    {
        pthread_cond_destroy(&cond);
    }
private:
    pthread_cond_t cond;
};
#endif

封装设计要点

  • 不把互斥锁内置到 Cond 类中,避免耦合,因为一个互斥锁可以对应多个条件变量;
  • 封装了原生 API,屏蔽了底层细节,使用更简洁;
  • 利用 RAII 机制,构造时初始化,析构时销毁,避免资源泄漏。

在这里插入图片描述


三. 线程同步的经典范式:生产者消费者模型

生产者消费者模型是多线程编程中最经典、最常用的同步模型,也是面试的必考点,我们用「321 原则」就能彻底吃透它。

在这里插入图片描述

3.1 什么是生产者消费者模型?

我们用生活中的超市例子来理解:

  • 生产者:食品工厂,负责生产商品,不用关心谁来买、什么时候买;
  • 消费者:我们买东西的人,不用关心商品是谁生产的、怎么生产的;
  • 交易场所:超市,工厂把商品放到超市,我们从超市买商品。

对应到计算机领域:

  • 生产者线程:负责生产数据 / 任务,写入缓冲区;
  • 消费者线程:负责从缓冲区读取数据 / 任务,执行处理;
  • 交易场所:一块带同步机制的缓冲区(阻塞队列、环形队列等)。

在这里插入图片描述
在这里插入图片描述

3.2 核心记忆点:321 原则

这是面试时回答生产者消费者模型的万能框架,必须烂熟于心 (只是框架,给提供逻辑思路,大家可不要上去就直接321原则哈哈)

数字核心含义详细说明
3三种关系1. 生产者之间:互斥关系
2. 消费者之间:互斥关系
3. 生产者与消费者之间:互斥 + 同步关系
2两种角色生产者线程、消费者线程(两种角色可以有多个线程)
1一个交易场所线程间共享的缓冲区(核心是对这个缓冲区的互斥与同步控制)

3.3 该模型的三大核心优势

  • 解耦:生产者和消费者不直接通信,通过缓冲区交互,一方代码修改不会影响另一方,彻底解决了强耦合问题。
  • 支持并发(高效):生产者生产完数据直接放入缓冲区,不用等待消费者处理,继续生产;消费者同时从缓冲区取数据处理,生产和消费真正并行执行,最大化利用多核 CPU。
  • 支持忙闲不均:生产者短时间生产大量数据时,缓冲区可以暂存数据,消费者慢慢处理;生产者空闲时,缓冲区里的缓存数据也能让消费者持续有活干,避免了线程频繁阻塞唤醒。

在这里插入图片描述

3.4 基于阻塞队列 (BlockingQueue) 的模型实现

阻塞队列是实现生产者消费者模型最常用的数据结构,它的核心特性是:

  • 队列为空时,消费者取数据的操作会被阻塞,直到队列中有数据;
  • 队列满时,生产者放数据的操作会被阻塞,直到队列中有空闲位置。

我们基于 C++ 模板实现一个通用的阻塞队列,完整兼容单生产 / 单消费、多生产 / 多消费场景。然后后续我们再通过我们自己封装的互斥锁和条件变量来对这个代码进行改造

在这里插入图片描述

3.4.1 阻塞队列完整源码与逐行解析

我们这个阻塞队列是兼容单生产单消费和多生产多消费模型的,差别只是在于创建的线程而已:

  • 实际使用测试参考版
#ifndef BLOCKQUEUE_HPP
#define BLOCKQUEUE_HPP
#include <iostream>
#include <pthread.h>
#include <queue>

template <class T>
class BlockQueue 
{
public:
    BlockQueue(int cap = 5): _cap(cap), _consumer_sleep_cnt(0), _productor_sleep_cnt(0), _low_water(cap / 2), _high_water(cap / 3 * 2)
    {
        // 锁 && 两个条件变量 -- 初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond_consumer, nullptr);
        pthread_cond_init(&_cond_productor, nullptr);
    }
    void Push(const T& in)
    {
        // 生产者
        pthread_mutex_lock(&_mutex);
        // 1. 判断是否为满本质也是临界区
        // 2. 那么判断本身也一定是在加锁和解锁之间
        // 3. 那么判断结果的处理,也必然在临界区内部
        while(_queue.size() == _cap) // 选择while循环判断避免出现函数调用失败和伪唤醒的问题
        {
            // 4. 等待的本质, 核心操作就是把线程设挂起到cond的等待队列中
            // 结论:这个函数在底层会原子性的自动释放第二个参数对应的锁。不然我带着锁走,我的消费者永远申请不到锁,我也就不可能再被唤醒了,死锁了。
            // a.函数调用失败 b. 伪唤醒
            _productor_sleep_cnt++;
            pthread_cond_wait(&_cond_productor, &_mutex);
            _productor_sleep_cnt--;
            // 5. 当线程被唤醒时, 也必然是在临界区内部被唤醒
            // 结论:pthread_cond_wait对应的线程在被唤醒的时候, 自动重新申请锁,锁申请成功这个函数才会返回
        }
        _queue.push(in);
        // 满足条件去唤醒,但是我们为了方便实验观察就把这里先注释掉直接无脑唤醒
        // 生产者这里去唤醒消费者
        // if(_consumer_sleep_cnt > 0 && _queue.size() > _high_water)
        //     pthread_cond_signal(&_cond_consumer);
        pthread_cond_signal(&_cond_consumer); // 在解锁之前和之后都可以,因为都会去申请锁
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T* out)
    {
        // 消费者
        // 用的都是同一个锁
        pthread_mutex_lock(&_mutex);
        while(_queue.empty())
        {
            _consumer_sleep_cnt++;
            pthread_cond_wait(&_cond_consumer, &_mutex);
            _consumer_sleep_cnt--;
        }
        *out = _queue.front();
        _queue.pop();
        // 消费者这里去唤醒生产者
        // if(_productor_sleep_cnt > 0 && _queue.size() < _low_water)
        //     pthread_cond_signal(&_cond_productor);
        pthread_cond_signal(&_cond_productor);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        // 锁 && 两个条件变量 -- 销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_consumer);
        pthread_cond_destroy(&_cond_productor);
    }
private:
    int _cap; // 记录容量
    // 条件变量
    pthread_cond_t _cond_consumer; // empty
    pthread_cond_t _cond_productor; // full

    // 唤醒条件
    int _consumer_sleep_cnt;
    int _productor_sleep_cnt;

    // 高低水位
    unsigned int _low_water;
    unsigned int _high_water;

    std::queue<T> _queue;
    pthread_mutex_t _mutex;

};

#endif
  • 带注释版
// BlockQueue.hpp
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Lock.hpp"
#include "Cond.hpp"

template <typename T>
class BlockQueue
{
private:
    // 内部工具函数:判断队列是否满
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }

    // 内部工具函数:判断队列是否空
    bool IsEmpty()
    {
        return _block_queue.empty();
    }

public:
    // 构造函数:初始化队列容量、锁、条件变量
    BlockQueue(int cap) : _cap(cap)
    {
        _productor_wait_num = 0; // 生产者等待计数
        _consumer_wait_num = 0;  // 消费者等待计数
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr); // 生产者专属条件变量
        pthread_cond_init(&_consum_cond, nullptr);  // 消费者专属条件变量
    }

    // 生产者接口:数据入队
    void Enqueue(T &in)
    {
        // 1. 访问临界资源(队列)必须先加锁
        pthread_mutex_lock(&_mutex);
        
        // 2. 循环判断队列是否满,条件不满足则等待
        // 必须用while,防止伪唤醒
        while(IsFull())
        {
            _productor_wait_num++; // 记录等待的生产者数量
            // 队列满了,生产者等待,原子性释放锁
            pthread_cond_wait(&_product_cond, &_mutex);
            _productor_wait_num--; // 被唤醒后,等待计数减1
        }

        // 3. 条件满足,队列有空闲位置,执行生产
        _block_queue.push(in);
        std::cout << "生产数据:" << in << std::endl;

        // 4. 有消费者在等待,就唤醒消费者来消费
        if(_consumer_wait_num > 0)
            pthread_cond_signal(&_consum_cond);

        // 5. 解锁
        pthread_mutex_unlock(&_mutex);
    }

    // 消费者接口:数据出队
    void Pop(T *out)
    {
        // 1. 访问临界资源必须先加锁
        pthread_mutex_lock(&_mutex);

        // 2. 循环判断队列是否空,条件不满足则等待
        while(IsEmpty())
        {
            _consumer_wait_num++; // 记录等待的消费者数量
            // 队列空了,消费者等待,原子性释放锁
            pthread_cond_wait(&_consum_cond, &_mutex);
            _consumer_wait_num--; // 被唤醒后,等待计数减1
        }

        // 3. 条件满足,队列有数据,执行消费
        *out = _block_queue.front();
        _block_queue.pop();
        std::cout << "消费数据:" << *out << std::endl;

        // 4. 有生产者在等待,就唤醒生产者来生产
        if(_productor_wait_num > 0)
            pthread_cond_signal(&_product_cond);

        // 5. 解锁
        pthread_mutex_unlock(&_mutex);
    }

    // 析构函数:销毁锁和条件变量
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consum_cond);
    }

private:
    std::queue<T> _block_queue;    // 底层队列,临界资源
    int _cap;                       // 队列最大容量
    pthread_mutex_t _mutex;         // 保护队列的互斥锁
    pthread_cond_t _product_cond;   // 生产者条件变量:队列满了等待
    pthread_cond_t _consum_cond;    // 消费者条件变量:队列空了等待
    int _productor_wait_num;        // 等待中的生产者数量
    int _consumer_wait_num;         // 等待中的消费者数量
};
#endif

核心设计解析

  1. 双条件变量设计:生产者和消费者各用一个条件变量,避免了广播唤醒所有线程造成的无效竞争,性能更高。
  2. 等待计数优化:只有当确实有线程在等待时,才发送唤醒信号,避免了无效的系统调用。
  3. 模板化设计:支持任意类型的数据,不仅可以放 int 等基础类型,还可以放函数对象、任务类等,实现任务的生产消费。
  4. 完全兼容多生产多消费:因为队列的访问被互斥锁完全保护,不管多少个生产者、消费者线程,都能安全访问,代码无需任何修改。

3.4.2 单生产单消费 Demo

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "BlockQueue.hpp"

// 消费 - 处理int类型
void* Consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        // sleep(1);
        int data = 0;
        bq->Pop(&data);
        std::cout << "消费了数据: " << data << std::endl;
    }
    // return nullptr;
}

// 生产 - 处理int类型
void* Productor(void* args)
{
     BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
     int data = 10;
    while(true)
    {
        sleep(1);
        bq->Push(data);
        std::cout << "生产了数据: " << data << std::endl;
        ++data;
    }
    // return nullptr;
}

// 测试单生产者单消费者模型 - int类型
int main()
{
    // 1. 创建出一个交易场所
    BlockQueue<int> *bq = new BlockQueue<int>();
    // 2. 创建出两个角色
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

运行效果:生产者每秒生产一个数据,消费者同步消费,队列空时消费者阻塞,队列满时生产者阻塞,完美实现了同步与互斥。
在这里插入图片描述

3.4.3 多生产多消费 Demo

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "BlockQueue.hpp"
#include "Task.hpp"

// 消费 - 处理int类型
void* Consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        // sleep(1);
        int data = 0;
        bq->Pop(&data);
        std::cout << "消费了数据: " << data << std::endl;
    }
    // return nullptr;
}

// 生产 - 处理int类型
void* Productor(void* args)
{
     BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
     int data = 10;
    while(true)
    {
        sleep(1);
        bq->Push(data);
        std::cout << "生产了数据: " << data << std::endl;
        ++data;
    }
    // return nullptr;
}

// 测试多生产者多消费者模型 - int类型
int main()
{
    // 定义线程数量
    const int PRODUCER_COUNT = 2; // 2个生产者
    const int CONSUMER_COUNT = 3; // 3个消费者
    
    // 1. 创建出一个交易场所
    BlockQueue<int> *bq = new BlockQueue<int>();
    
    // 2. 创建出多个角色
    pthread_t producers[PRODUCER_COUNT];
    pthread_t consumers[CONSUMER_COUNT];
    
    // 创建生产者线程
    for (int i = 0; i < PRODUCER_COUNT; ++i) {
        pthread_create(&producers[i], nullptr, Productor, bq);
    }
    
    // 创建消费者线程
    for (int i = 0; i < CONSUMER_COUNT; ++i) {
        pthread_create(&consumers[i], nullptr, Consumer, bq);
    }

    // 等待所有线程结束
    for (int i = 0; i < PRODUCER_COUNT; ++i) {
        pthread_join(producers[i], nullptr);
    }
    for (int i = 0; i < CONSUMER_COUNT; ++i) {
        pthread_join(consumers[i], nullptr);
    }
    
    delete bq;
    return 0;
}
  • 测试结果会有点乱这里就不放了

3.4.4 传任务类 Demo

  • Task.hpp
#ifndef TASK_HPP
#define TASK_HPP

#include <iostream>
#include <functional>
#include <string>

class Task
{
public:
    using func_t = std::function<int(int, int, char)>;

    Task() : _a(0), _b(0), _op('+'), _result(0) {}
    Task(int a, int b, char op) : _a(a), _b(b), _op(op), _result(0) {}

    void Run()
    {
        switch (_op)
        {
        case '+': _result = _a + _b; break;
        case '-': _result = _a - _b; break;
        case '*': _result = _a * _b; break;
        case '/': _result = _b != 0 ? _a / _b : 0; break;
        default: _result = 0; break;
        }
    }
    std::string GetResult()
    {
        return std::to_string(_result);
    }
    std::string GetTaskRes()
    {
        Run();
        return std::to_string(_a) + " " + std::string(1, _op) + " " + 
               std::to_string(_b) + " = ";
    }

    int GetResult() const { return _result; }

private:
    int _a;
    int _b;
    char _op;
    int _result;
};

#endif

  • main.cc
// 消费 - 处理Task类型
void* Consumer(void* args)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        // sleep(1);
        Task task;
        bq->Pop(&task);
        std::cout << "消费了任务: " << task.GetTaskRes() << task.GetResult() << std::endl;
    }
    // return nullptr;
}

// 生产 - 处理Task类型
void* Productor(void* args)
{
     BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
    while(true)
    {
        sleep(1);
        // 生成随机任务
        int a = rand() % 100;
        int b = rand() % 100;
        char op = "+-*/"[rand() % 4];
        Task task(a, b, op);
        bq->Push(task);
        std::cout << "生产了任务: " << task.GetTaskRes() << "?" << std::endl;
    }
    // return nullptr;
}

// 测试Task任务的阻塞队列
int main()
{
    // 初始化随机数种子
    srand(time(nullptr));
    
    // 1. 创建出一个交易场所
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    // 2. 创建出两个角色
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    return 0;
}

在这里插入图片描述

3.5 BlockQueue_v2:我们用自己封装的条件变量和互斥锁替换一下

我们就用上面封装过的Mutex.hppCond.hpp来替换下,再来个BlockQueue.hpp

#ifndef BLOCKQUEUE_HPP
#define BLOCKQUEUE_HPP
#include <iostream>
#include <pthread.h>
#include <queue>

template <class T>
class BlockQueue 
{
public:
    BlockQueue(int cap = 5): _cap(cap), _consumer_sleep_cnt(0), _productor_sleep_cnt(0), _low_water(cap / 2), _high_water(cap / 3 * 2)
    {
        // 锁 && 两个条件变量 -- 初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond_consumer, nullptr);
        pthread_cond_init(&_cond_productor, nullptr);
    }
    void Push(const T& in)
    {
        // 生产者
        pthread_mutex_lock(&_mutex);
        // 1. 判断是否为满本质也是临界区
        // 2. 那么判断本身也一定是在加锁和解锁之间
        // 3. 那么判断结果的处理,也必然在临界区内部
        while(_queue.size() == _cap) // 选择while循环判断避免出现函数调用失败和伪唤醒的问题
        {
            // 4. 等待的本质, 核心操作就是把线程设挂起到cond的等待队列中
            // 结论:这个函数在底层会原子性的自动释放第二个参数对应的锁。不然我带着锁走,我的消费者永远申请不到锁,我也就不可能再被唤醒了,死锁了。
            // a.函数调用失败 b. 伪唤醒
            _productor_sleep_cnt++;
            pthread_cond_wait(&_cond_productor, &_mutex);
            _productor_sleep_cnt--;
            // 5. 当线程被唤醒时, 也必然是在临界区内部被唤醒
            // 结论:pthread_cond_wait对应的线程在被唤醒的时候, 自动重新申请锁,锁申请成功这个函数才会返回
        }
        _queue.push(in);
        // 满足条件去唤醒,但是我们为了方便实验观察就把这里先注释掉直接无脑唤醒
        // 生产者这里去唤醒消费者
        // if(_consumer_sleep_cnt > 0 && _queue.size() > _high_water)
        //     pthread_cond_signal(&_cond_consumer);
        pthread_cond_signal(&_cond_consumer); // 在解锁之前和之后都可以,因为都会去申请锁
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T* out)
    {
        // 消费者
        // 用的都是同一个锁
        pthread_mutex_lock(&_mutex);
        while(_queue.empty())
        {
            _consumer_sleep_cnt++;
            pthread_cond_wait(&_cond_consumer, &_mutex);
            _consumer_sleep_cnt--;
        }
        *out = _queue.front();
        _queue.pop();
        // 消费者这里去唤醒生产者
        // if(_productor_sleep_cnt > 0 && _queue.size() < _low_water)
        //     pthread_cond_signal(&_cond_productor);
        pthread_cond_signal(&_cond_productor);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        // 锁 && 两个条件变量 -- 销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_consumer);
        pthread_cond_destroy(&_cond_productor);
    }
private:
    int _cap; // 记录容量
    // 条件变量
    pthread_cond_t _cond_consumer; // empty
    pthread_cond_t _cond_productor; // full

    // 唤醒条件
    int _consumer_sleep_cnt;
    int _productor_sleep_cnt;

    // 高低水位
    unsigned int _low_water;
    unsigned int _high_water;

    std::queue<T> _queue;
    pthread_mutex_t _mutex;

};

#endif
  • 测试代码的话跟v1版本的是一样的其实,结果也都是差不多,这里就不反复演示了。

四. 更高效的同步工具:POSIX 信号量(下篇博客中更详细讲解)

4.1 信号量是什么?

信号量本质是一个原子操作的资源计数器,用来衡量临界资源的可用数量,实现对临界资源的预订机制

  • 信号量的值 > 0:表示当前有可用的临界资源;
  • 信号量的值 = 0:表示没有可用资源,线程会进入阻塞等待。

它提供了两个核心原子操作:

  • P 操作:等待资源,将信号量的值减 1,如果值 < 0,线程阻塞;
  • V 操作:释放资源,将信号量的值加 1,如果有线程在等待,唤醒其中一个。

互斥锁本质就是二元信号量(初始值为 1),同一时间只允许一个线程访问资源;而信号量可以实现多线程同时访问多个临界资源,更灵活。

在这里插入图片描述
在这里插入图片描述

4.2 信号量核心 API

信号量的接口在<semaphore.h>头文件中,编译时需要链接-lpthread库:

// 1. 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 参数:
// sem:要初始化的信号量指针
// pshared:0表示线程间共享,非0表示进程间共享
// value:信号量初始值(可用资源数量)

// 2. 销毁信号量
int sem_destroy(sem_t *sem);

// 3. P操作:等待资源,信号量-1
int sem_wait(sem_t *sem);

// 4. V操作:释放资源,信号量+1
int sem_post(sem_t *sem);

4.3 信号量的简单封装

// Sem.hpp
#pragma once
#include <semaphore.h>

class Sem
{
public:
    // 构造函数:初始化信号量
    Sem(int n)
    {
        sem_init(&_sem, 0, n);
    }

    // P操作:等待资源
    void P()
    {
        sem_wait(&_sem);
    }

    // V操作:释放资源
    void V()
    {
        sem_post(&_sem);
    }

    // 析构函数:销毁信号量
    ~Sem()
    {
        sem_destroy(&_sem);
    }

private:
    sem_t _sem;
};

五. 基于环形队列 + 信号量的生产者消费者模型(下篇博客中更详细讲解)

基于阻塞队列的模型用一把锁保护了整个队列,生产和消费无法真正并行;而基于环形队列 + 信号量的模型,能让生产和消费在不同位置并行执行,性能更高。

在这里插入图片描述

5.1 环形队列的核心原理

我们用数组模拟环形队列,通过模运算实现环形特性,核心规则:

  • productor_step记录生产者的写入位置,consumer_step记录消费者的读取位置;
  • head == tail时,队列要么为空,要么为满(判断需要区分);
  • 生产者不能超过消费者一圈,否则会覆盖未消费的数据;
  • 消费者不能超过生产者,否则会读取到无效数据。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


结尾:

🍓 我是草莓熊 Lotso!若这篇技术干货帮你打通了学习中的卡点:
👀 【关注】跟我一起深耕技术领域,从基础到进阶,见证每一次成长
❤️ 【点赞】让优质内容被更多人看见,让知识传递更有力量
⭐ 【收藏】把核心知识点、实战技巧存好,需要时直接查、随时用
💬 【评论】分享你的经验或疑问(比如曾踩过的技术坑?),一起交流避坑
🗳️ 【投票】用你的选择助力社区内容方向,告诉大家哪个技术点最该重点拆解
技术之路难免有困惑,但同行的人会让前进更有方向~愿我们都能在自己专注的领域里,一步步靠近心中的技术目标!

结语:本文从互斥锁的局限性出发,完整讲解了 Linux 线程同步的核心知识:线程同步的核心是在安全的前提下,让多线程有序协同,避免饥饿,提升效率;条件变量是实现同步的核心工具,重点掌握 wait 函数的原子性、while 循环防伪唤醒的使用规范;生产者消费者模型是同步的经典范式,用 321 原则就能彻底吃透,阻塞队列实现简单易用,环形队列 + 信号量实现并发性能更高;信号量本质是资源计数器,既能实现互斥,也能实现同步,比条件变量更灵活。线程同步是 Linux 多线程编程的核心,也是后端开发面试的重中之重。掌握了这些知识,你就能写出既安全又高效的多线程代码,不管是业务开发还是面试,都能游刃有余。如果本文对你有帮助,欢迎点赞、收藏、关注,后续会继续分享 Linux 系统编程、C++ 后端开发的更多硬核内容。

✨把这些内容吃透超牛的!放松下吧✨
ʕ˘ᴥ˘ʔ
づきらど

在这里插入图片描述

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/2503_91389547/article/details/160089883

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--