[C++]sylar高性能服务器框架学习记录:线程模块

记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会

项目仓库地址

https://github.com/sylar-yin/sylar/

整理博客过程中参考的大佬资料链接:

============================================

基础介绍

该模块提供了线程类以及多种锁,信号量类,基于pthread等linux系统api封装

主要由下面几个类构成

  • Semaphore 信号量类
  • Mutex 互斥锁类
  • RWMutex 读写锁类
  • SpinLock 自旋锁类
  • CASLock 原子锁类
  • ScopedLockImpl 等RAII机制实现类
  • Thread 线程类

Semaphore 信号量类

是对POSIX标准的信号量的封装,提供了构造,析构以及pv操作接口

信号量是一个计数器,用于实现进程之间的互斥和同步,操作分为两种:

  • P操作(这里的wait):如果值大于0,就给值-1;如果值为0,就阻塞进程
  • V操作(这里的notify):如果有其他进程因为等待自己而被阻塞,通知大家恢复运行;如果没有因为自己被阻塞的进程,就给值+1

PV操作均为原子操作(保证不会被调度策略打断)

// 信号量类
class Semaphore : Noncopyable {
public:
    // 构造函数,初始化信号量值为count(不与其他进程共享)
    Semaphore(uint32_t count = 0);
    // 析构函数,调用sem_destory
    ~Semaphore();

    // 获取信号量(wait)
    void wait();
    // 释放信号量(post)
    void notify();
private:
    // 信号量
    sem_t m_semaphore;
};

下面是具体实现

Semaphore::Semaphore(uint32_t count) {
    if(sem_init(&m_semaphore, 0, count)) { // sem_init返回非0表示失败
        throw std::logic_error("sem_init error");
    }
}

Semaphore::~Semaphore() {
    sem_destroy(&m_semaphore);
}

void Semaphore::wait() {
    if(sem_wait(&m_semaphore)) {
        throw std::logic_error("sem_wait error");
    }
}

void Semaphore::notify() {
    if(sem_post(&m_semaphore)) {
        throw std::logic_error("sem_post error");
    }
}

可以通过使用信号量来编写并发代码,但是比较吃操作,所以sylar也封装了各种锁

Mutex 互斥锁类

可以实现最简单的加锁/解锁效果

// 互斥锁类
class Mutex : Noncopyable {
public:
    // 使用RAII机制自动管理的互斥锁的别名
    typedef ScopedLockImpl
<Mutex> Lock;
    // 构造函数
    Mutex() {
        pthread_mutex_init(&m_mutex, nullptr);
    }
    // 析构函数
    ~Mutex() {
        pthread_mutex_destroy(&m_mutex);
    }
    // 获取锁
    void lock() {
        pthread_mutex_lock(&m_mutex);
    }
    // 释放锁
    void unlock() {
        pthread_mutex_unlock(&m_mutex);
    }
private:
    // 互斥锁
    pthread_mutex_t m_mutex;
};

RWMutex 读写锁类

与普通互斥锁相比,允许多个线程同时读,但只允许一个线程写

// 读写锁类
class RWMutex : Noncopyable {
public:
    // 使用RAII机制自动管理的ReadLock的别名
    typedef ReadScopedLockImpl
<RWMutex> ReadLock;
    // 使用RAII机制自动管理的WriteLock的别名
    typedef WriteScopedLockImpl
<RWMutex> WriteLock;

    // 构造函数,初始化读写锁
    RWMutex() {
        pthread_rwlock_init(&m_lock, nullptr);
    }
    // 析构函数,销毁读写锁
    ~RWMutex() {
        pthread_rwlock_destroy(&m_lock);
    }
    // 获取读锁
    void rdlock() {
        pthread_rwlock_rdlock(&m_lock);
    }
    // 获取写锁
    void wrlock() {
        pthread_rwlock_wrlock(&m_lock);
    }
    // 释放锁
    void unlock() {
        pthread_rwlock_unlock(&m_lock);
    }
private:
    // 读写锁
    pthread_rwlock_t m_lock;
};

SpinLock 自旋锁类

使用忙等(busy wait)机制而不是直接让线程休眠,避免线程进入/退出睡眠状态的开销,但是会浪费一些CPU性能

// 自旋锁类(简单封装)
class Spinlock : Noncopyable { 
public: 
    typedef ScopedLockImpl
<Spinlock> Lock;
    Spinlock() {
        pthread_spin_init(&m_mutex, 0);
    }
    ~Spinlock() {
        pthread_spin_destroy(&m_mutex);
    }
    void lock() {
        pthread_spin_lock(&m_mutex);
    }
    void unlock() {
        pthread_spin_unlock(&m_mutex);
    }
private:
    // 自旋锁
    pthread_spinlock_t m_mutex;
};

CASLock 原子锁类

使用了C++标准库提供的原子操作,通过维护一个原子bool变量来支持线程之间的共享资源管理

// 原子锁类(check and set)
class CASLock : Noncopyable {
public:
    typedef ScopedLockImpl
<CASLock> Lock;
    CASLock() {
        m_mutex.clear();
    }
    ~CASLock() {

    }
    void lock() {
        while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
    }
    void unlock() {
        std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
    }
private:
    // 原子布尔类型
    volatile std::atomic_flag m_mutex;
};

ScopedLockImpl 等RAII机制实现类

直接手动管理锁的获取/释放是十分愚蠢以及不安全的,我们使用C++的RAII机制来封装一遍以上的锁,这样随着这些封装类的构造/析构,就可以自动实现加锁/解锁,C++的标准库中也有类似的实现(std::lock_guard)

为此创建了三种模板类,分别用于普通锁,读写锁上写锁和读写锁上读锁,同时依旧保留了手动操作接口

// 作用域类RAII机制自动锁管理类(用于普通锁)
template <class T>
class ScopedLockImpl{
public:
    // 构造函数,自动获取锁
    ScopedLockImpl(T& mutex)    // 必须按引用传递,否则不是一个锁毫无意义
        : m_mutex(mutex) {
            m_mutex.lock();
            m_locked = true;
    }
    // 析构函数,自动释放锁
    ~ScopedLockImpl() {
        unlock();
    }
    // 手动获取锁
    void lock() {
        if(!m_locked) {
            m_mutex.lock();
            m_locked = true;
        }
    }
    // 手动释放锁
    void unlock() {
        if(m_locked) {
            m_mutex.unlock();
            m_locked = false;
        }
    }

private:
    T& m_mutex;     // 互斥量,必须使用引用传递
    bool m_locked;
};

// 作用域类RAII机制自动锁管理类(用于读锁)
template <class T>
class ReadScopedLockImpl{
public:
    ReadScopedLockImpl(T& mutex)
        : m_mutex(mutex) {
            m_mutex.rdlock();
            m_locked = true;
    }

    ~ReadScopedLockImpl() {
        unlock();
    }

    void lock() {
        if(!m_locked) {
            m_mutex.rdlock();
            m_locked = true;
        }
    }

    void unlock() {
        if(m_locked) {
            m_mutex.unlock();
            m_locked = false;
        }
    }

private:
    T& m_mutex;
    bool m_locked;
};

// 作用域类RAII机制自动锁管理类(用于写锁)
template <class T>
class WriteScopedLockImpl{
public:
    WriteScopedLockImpl(T& mutex)
        : m_mutex(mutex) {
            m_mutex.wrlock();
            m_locked = true;
    }

    ~WriteScopedLockImpl() {
        unlock();
    }

    void lock() {
        if(!m_locked) {
            m_mutex.wrlock();
            m_locked = true;
        }
    }

    void unlock() {
        if(m_locked) {
            m_mutex.unlock();
            m_locked = false;
        }
    }

private:
    T& m_mutex;
    bool m_locked;
};

在之前的各种锁类的名称空间中我们已经定义了这些包装类的别名,所以以后调用锁可以这样写:

{
    sylar::Mutex::Lock lock(s_mutex);
    // 这里就是临界区了
}

Thread 线程类

自己封装了pthread,提供了线程创建/管理等接口

成员变量存储了:

  • 线程ID(使用syscall(SYS_gettid)来获取系统唯一线程ID)
  • 线程本体(pthread_t)
  • void()类型的线程执行函数(使用std::function包装)
  • 线程名称
  • 一个信号量
// 线程类
class Thread {
public:
    typedef std::shared_ptr
<Thread> ptr;
    // 构造函数,传入线程执行函数,线程名称,创造线程并wait直到线程初始化完毕
    Thread(std::function<void()> cb, const std::string& name);
    // 析构函数,将线程detach,线程结束后资源会自动回收
    ~Thread();

    // 返回Thread对象的线程ID
    pid_t getId() const { return m_id; }
    // 返回Thread对象的线程名称
    const std::string& getName() const { return m_name; }

    // join方法(阻塞等待直到线程执行完成)
    void join();

    // 返回当前运行的线程的指针
    static Thread* GetThis();
    // 返回当前运行的线程的名称
    static const std::string& GetName();
    // 设置当前运行线程的名称
    static void SetName(const std::string& name); 
private: // 禁止默认拷贝
    Thread(const Thread&) = delete;
    Thread(const Thread&&) = delete;
    Thread& operator=(const Thread&) = delete;

    // 内部实现中真正传入pthread_create的函数(内部再回调m_cb())
    static void* run(void* arg); 
private:
    pid_t m_id = -1;                // 线程ID
    pthread_t m_thread = 0;         // 线程结构
    std::function<void()> m_cb;     // 线程执行函数,void()类型(传参使用lambda或std::bind)
    std::string m_name;             // 线程名称

    Semaphore m_semaphore;          // 信号量
};

同时在thread.cpp实现文件中创建两个静态线程局部变量

// 静态线程本地变量,储存指向当前线程的指针
static thread_local Thread* t_thread = nullptr;
// 静态线程本地变量,储存当前线程名称
static thread_local std::string t_thread_name = "UNKNOW";

以下是方法实现,利用了信号量等待机制来保证资源初始化:

Thread* Thread::GetThis() {
    return t_thread;
}
const std::string& Thread::GetName() {
    return t_thread_name;    
}

void Thread::SetName(const std::string &name) {
    if(name.empty()) {
        return;
    }
    if(t_thread) {
        t_thread->m_name = name;
    }
    t_thread_name = name;
}

Thread::Thread(std::function<void()> cb, const std::string& name) 
        : m_cb(cb)
        , m_name(name) {
    if(name.empty()) {
        m_name = "UNKNOWN";     
    }
    // 创建线程,传入run作为线程运行函数,并将this指针作为run的参数传入
    int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
    if(rt) {
        SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt =" << rt
            << " name=" << name;
        throw std::logic_error("pthread_create error");
    }
    // 阻塞等待直到线程初始化完毕,线程函数跑起来,构造函数才返回
    m_semaphore.wait();
}

Thread::~Thread() {
    if(m_thread) {
        pthread_detach(m_thread);
    }
}

void Thread::join() {
    if(m_thread) {
        int rt = pthread_join(m_thread, nullptr);
        if(rt) {
            SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt =" << rt
                << " name=" << m_name;
            throw std::logic_error("pthread_join error");
        }
        // 句柄置空,防止重复join
        m_thread = 0;
    }
}

void *Thread::run(void *arg) {
    // 取出参数,设置线程信息
    Thread* thread = (Thread*)arg;
    t_thread = thread;
    t_thread_name = thread->m_name;
    thread->m_id = sylar::GetThreadId();
    // 设置名称
    pthread_setname_np(pthread_self(), thread->m_name.substr(0,15).c_str());

    // 取出m_cb,使用std::function的swap方法交换所有权,高效安全
    std::function<void()> cb;
    cb.swap(thread->m_cb); 

    // 通知构造函数结束
    thread->m_semaphore.notify();

    cb();
    return 0;
}

Noncopyable 工具类

锁和线程实例自然不能复制,会引发资源管理紊乱,通过继承这个工具类来禁用拷贝,禁用赋值

#ifndef __SYLAR_NONCOPYABLE_H__
#define __SYLAR_NONCOPYABLE_H__

namespace sylar {

class Noncopyable {
public:
    Noncopyable() = default;
    ~Noncopyable() = default;
    Noncopyable(const Noncopyable&) = delete;
    Noncopyable& operator=(const Noncopyable&) = delete;
};

}

#endif // __SYLAR_NONCOPYABLE_H__

测试

#include "sylar/sylar.h"
#include <unistd.h>

sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();

volatile int count = 0; // volatile阻止编译器优化
sylar::Mutex s_mutex;

void func1() {
    SYLAR_LOG_INFO(g_logger) << "name: " << sylar::Thread::GetName()
                             << " this.name: " << sylar::Thread::GetThis()->getName()
                             << " id: " << sylar::GetThreadId()
                             << " this.id: " << sylar::Thread::GetThis()->getId();
    for(int i = 0; i < 100000; i++) {
        sylar::Mutex::Lock lock(s_mutex);
        ++count;
    }
}

int main(int argc, char** argv) {
    SYLAR_LOG_INFO(g_logger) << "thread test begin";

    std::vector<sylar::Thread::ptr> thrs;
    for(int i = 0; i < 3; i++) {
        sylar::Thread::ptr thr(new sylar::Thread(&func1, "name_" + std::to_string(i)));
        thrs.push_back(thr);
    }

    for(size_t i = 0; i < thrs.size(); i++) {
        thrs[i]->join();
    }
    SYLAR_LOG_INFO(g_logger) << "thread test end";
    SYLAR_LOG_INFO(g_logger) << "count= " << count;

    return 0;
}

可以看到测试结果符合预期

image-20260206175845830

总结

通过封装实现了一套可用性高的线程并发编程工具,并与日志模块i整合,实现了线程安全

评论

  1. Sankkooos
    Android Firefox 134.0
    19 小时前
    2026-2-07 0:40:01

    ദ്ദി˶•̀֊•́)✧

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇