记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会
项目仓库地址
https://github.com/sylar-yin/sylar/
整理博客过程中参考的大佬资料链接:
============================================
文章目录
基础介绍
该模块提供了线程类以及多种锁,信号量类,基于pthread等linux系统api封装
主要由下面几个类构成
- Semaphore 信号量类
- Mutex 互斥锁类
- RWMutex 读写锁类
- SpinLock 自旋锁类
- CASLock 原子锁类
- ScopedLockImpl 等RAII机制实现类
- Thread 线程类
Semaphore 信号量类
是对POSIX标准的信号量的封装,提供了构造,析构以及pv操作接口
信号量是一个计数器,用于实现进程之间的互斥和同步,操作分为两种:
- P操作(这里的wait):如果值大于0,就给值-1;如果值为0,就阻塞进程
- V操作(这里的notify):如果有其他进程因为等待自己而被阻塞,通知大家恢复运行;如果没有因为自己被阻塞的进程,就给值+1
PV操作均为原子操作(保证不会被调度策略打断)
// 信号量类
class Semaphore : Noncopyable {
public:
// 构造函数,初始化信号量值为count(不与其他进程共享)
Semaphore(uint32_t count = 0);
// 析构函数,调用sem_destory
~Semaphore();
// 获取信号量(wait)
void wait();
// 释放信号量(post)
void notify();
private:
// 信号量
sem_t m_semaphore;
};
下面是具体实现
Semaphore::Semaphore(uint32_t count) {
if(sem_init(&m_semaphore, 0, count)) { // sem_init返回非0表示失败
throw std::logic_error("sem_init error");
}
}
Semaphore::~Semaphore() {
sem_destroy(&m_semaphore);
}
void Semaphore::wait() {
if(sem_wait(&m_semaphore)) {
throw std::logic_error("sem_wait error");
}
}
void Semaphore::notify() {
if(sem_post(&m_semaphore)) {
throw std::logic_error("sem_post error");
}
}
可以通过使用信号量来编写并发代码,但是比较吃操作,所以sylar也封装了各种锁
Mutex 互斥锁类
可以实现最简单的加锁/解锁效果
// 互斥锁类
class Mutex : Noncopyable {
public:
// 使用RAII机制自动管理的互斥锁的别名
typedef ScopedLockImpl
<Mutex> Lock;
// 构造函数
Mutex() {
pthread_mutex_init(&m_mutex, nullptr);
}
// 析构函数
~Mutex() {
pthread_mutex_destroy(&m_mutex);
}
// 获取锁
void lock() {
pthread_mutex_lock(&m_mutex);
}
// 释放锁
void unlock() {
pthread_mutex_unlock(&m_mutex);
}
private:
// 互斥锁
pthread_mutex_t m_mutex;
};
RWMutex 读写锁类
与普通互斥锁相比,允许多个线程同时读,但只允许一个线程写
// 读写锁类
class RWMutex : Noncopyable {
public:
// 使用RAII机制自动管理的ReadLock的别名
typedef ReadScopedLockImpl
<RWMutex> ReadLock;
// 使用RAII机制自动管理的WriteLock的别名
typedef WriteScopedLockImpl
<RWMutex> WriteLock;
// 构造函数,初始化读写锁
RWMutex() {
pthread_rwlock_init(&m_lock, nullptr);
}
// 析构函数,销毁读写锁
~RWMutex() {
pthread_rwlock_destroy(&m_lock);
}
// 获取读锁
void rdlock() {
pthread_rwlock_rdlock(&m_lock);
}
// 获取写锁
void wrlock() {
pthread_rwlock_wrlock(&m_lock);
}
// 释放锁
void unlock() {
pthread_rwlock_unlock(&m_lock);
}
private:
// 读写锁
pthread_rwlock_t m_lock;
};
SpinLock 自旋锁类
使用忙等(busy wait)机制而不是直接让线程休眠,避免线程进入/退出睡眠状态的开销,但是会浪费一些CPU性能
// 自旋锁类(简单封装)
class Spinlock : Noncopyable {
public:
typedef ScopedLockImpl
<Spinlock> Lock;
Spinlock() {
pthread_spin_init(&m_mutex, 0);
}
~Spinlock() {
pthread_spin_destroy(&m_mutex);
}
void lock() {
pthread_spin_lock(&m_mutex);
}
void unlock() {
pthread_spin_unlock(&m_mutex);
}
private:
// 自旋锁
pthread_spinlock_t m_mutex;
};
CASLock 原子锁类
使用了C++标准库提供的原子操作,通过维护一个原子bool变量来支持线程之间的共享资源管理
// 原子锁类(check and set)
class CASLock : Noncopyable {
public:
typedef ScopedLockImpl
<CASLock> Lock;
CASLock() {
m_mutex.clear();
}
~CASLock() {
}
void lock() {
while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
}
void unlock() {
std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
}
private:
// 原子布尔类型
volatile std::atomic_flag m_mutex;
};
ScopedLockImpl 等RAII机制实现类
直接手动管理锁的获取/释放是十分愚蠢以及不安全的,我们使用C++的RAII机制来封装一遍以上的锁,这样随着这些封装类的构造/析构,就可以自动实现加锁/解锁,C++的标准库中也有类似的实现(std::lock_guard)
为此创建了三种模板类,分别用于普通锁,读写锁上写锁和读写锁上读锁,同时依旧保留了手动操作接口
// 作用域类RAII机制自动锁管理类(用于普通锁)
template <class T>
class ScopedLockImpl{
public:
// 构造函数,自动获取锁
ScopedLockImpl(T& mutex) // 必须按引用传递,否则不是一个锁毫无意义
: m_mutex(mutex) {
m_mutex.lock();
m_locked = true;
}
// 析构函数,自动释放锁
~ScopedLockImpl() {
unlock();
}
// 手动获取锁
void lock() {
if(!m_locked) {
m_mutex.lock();
m_locked = true;
}
}
// 手动释放锁
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex; // 互斥量,必须使用引用传递
bool m_locked;
};
// 作用域类RAII机制自动锁管理类(用于读锁)
template <class T>
class ReadScopedLockImpl{
public:
ReadScopedLockImpl(T& mutex)
: m_mutex(mutex) {
m_mutex.rdlock();
m_locked = true;
}
~ReadScopedLockImpl() {
unlock();
}
void lock() {
if(!m_locked) {
m_mutex.rdlock();
m_locked = true;
}
}
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex;
bool m_locked;
};
// 作用域类RAII机制自动锁管理类(用于写锁)
template <class T>
class WriteScopedLockImpl{
public:
WriteScopedLockImpl(T& mutex)
: m_mutex(mutex) {
m_mutex.wrlock();
m_locked = true;
}
~WriteScopedLockImpl() {
unlock();
}
void lock() {
if(!m_locked) {
m_mutex.wrlock();
m_locked = true;
}
}
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex;
bool m_locked;
};
在之前的各种锁类的名称空间中我们已经定义了这些包装类的别名,所以以后调用锁可以这样写:
{
sylar::Mutex::Lock lock(s_mutex);
// 这里就是临界区了
}
Thread 线程类
自己封装了pthread,提供了线程创建/管理等接口
成员变量存储了:
- 线程ID(使用syscall(SYS_gettid)来获取系统唯一线程ID)
- 线程本体(pthread_t)
- void()类型的线程执行函数(使用std::function包装)
- 线程名称
- 一个信号量
// 线程类
class Thread {
public:
typedef std::shared_ptr
<Thread> ptr;
// 构造函数,传入线程执行函数,线程名称,创造线程并wait直到线程初始化完毕
Thread(std::function<void()> cb, const std::string& name);
// 析构函数,将线程detach,线程结束后资源会自动回收
~Thread();
// 返回Thread对象的线程ID
pid_t getId() const { return m_id; }
// 返回Thread对象的线程名称
const std::string& getName() const { return m_name; }
// join方法(阻塞等待直到线程执行完成)
void join();
// 返回当前运行的线程的指针
static Thread* GetThis();
// 返回当前运行的线程的名称
static const std::string& GetName();
// 设置当前运行线程的名称
static void SetName(const std::string& name);
private: // 禁止默认拷贝
Thread(const Thread&) = delete;
Thread(const Thread&&) = delete;
Thread& operator=(const Thread&) = delete;
// 内部实现中真正传入pthread_create的函数(内部再回调m_cb())
static void* run(void* arg);
private:
pid_t m_id = -1; // 线程ID
pthread_t m_thread = 0; // 线程结构
std::function<void()> m_cb; // 线程执行函数,void()类型(传参使用lambda或std::bind)
std::string m_name; // 线程名称
Semaphore m_semaphore; // 信号量
};
同时在thread.cpp实现文件中创建两个静态线程局部变量
// 静态线程本地变量,储存指向当前线程的指针
static thread_local Thread* t_thread = nullptr;
// 静态线程本地变量,储存当前线程名称
static thread_local std::string t_thread_name = "UNKNOW";
以下是方法实现,利用了信号量等待机制来保证资源初始化:
Thread* Thread::GetThis() {
return t_thread;
}
const std::string& Thread::GetName() {
return t_thread_name;
}
void Thread::SetName(const std::string &name) {
if(name.empty()) {
return;
}
if(t_thread) {
t_thread->m_name = name;
}
t_thread_name = name;
}
Thread::Thread(std::function<void()> cb, const std::string& name)
: m_cb(cb)
, m_name(name) {
if(name.empty()) {
m_name = "UNKNOWN";
}
// 创建线程,传入run作为线程运行函数,并将this指针作为run的参数传入
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt =" << rt
<< " name=" << name;
throw std::logic_error("pthread_create error");
}
// 阻塞等待直到线程初始化完毕,线程函数跑起来,构造函数才返回
m_semaphore.wait();
}
Thread::~Thread() {
if(m_thread) {
pthread_detach(m_thread);
}
}
void Thread::join() {
if(m_thread) {
int rt = pthread_join(m_thread, nullptr);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt =" << rt
<< " name=" << m_name;
throw std::logic_error("pthread_join error");
}
// 句柄置空,防止重复join
m_thread = 0;
}
}
void *Thread::run(void *arg) {
// 取出参数,设置线程信息
Thread* thread = (Thread*)arg;
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();
// 设置名称
pthread_setname_np(pthread_self(), thread->m_name.substr(0,15).c_str());
// 取出m_cb,使用std::function的swap方法交换所有权,高效安全
std::function<void()> cb;
cb.swap(thread->m_cb);
// 通知构造函数结束
thread->m_semaphore.notify();
cb();
return 0;
}
Noncopyable 工具类
锁和线程实例自然不能复制,会引发资源管理紊乱,通过继承这个工具类来禁用拷贝,禁用赋值
#ifndef __SYLAR_NONCOPYABLE_H__
#define __SYLAR_NONCOPYABLE_H__
namespace sylar {
class Noncopyable {
public:
Noncopyable() = default;
~Noncopyable() = default;
Noncopyable(const Noncopyable&) = delete;
Noncopyable& operator=(const Noncopyable&) = delete;
};
}
#endif // __SYLAR_NONCOPYABLE_H__
测试
#include "sylar/sylar.h"
#include <unistd.h>
sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();
volatile int count = 0; // volatile阻止编译器优化
sylar::Mutex s_mutex;
void func1() {
SYLAR_LOG_INFO(g_logger) << "name: " << sylar::Thread::GetName()
<< " this.name: " << sylar::Thread::GetThis()->getName()
<< " id: " << sylar::GetThreadId()
<< " this.id: " << sylar::Thread::GetThis()->getId();
for(int i = 0; i < 100000; i++) {
sylar::Mutex::Lock lock(s_mutex);
++count;
}
}
int main(int argc, char** argv) {
SYLAR_LOG_INFO(g_logger) << "thread test begin";
std::vector<sylar::Thread::ptr> thrs;
for(int i = 0; i < 3; i++) {
sylar::Thread::ptr thr(new sylar::Thread(&func1, "name_" + std::to_string(i)));
thrs.push_back(thr);
}
for(size_t i = 0; i < thrs.size(); i++) {
thrs[i]->join();
}
SYLAR_LOG_INFO(g_logger) << "thread test end";
SYLAR_LOG_INFO(g_logger) << "count= " << count;
return 0;
}
可以看到测试结果符合预期

总结
通过封装实现了一套可用性高的线程并发编程工具,并与日志模块i整合,实现了线程安全

ദ്ദി˶•̀֊•́)✧