记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会
项目仓库地址
https://github.com/sylar-yin/sylar/
整理博客过程中参考的大佬资料链接:
============================================
文章目录
基础介绍
该模块实现了一个多线程多协程的调度器,支持指定N个线程,运行M个协程(其实就是先指定调度器线程池的线程数,然后就能随意往里面schedule任务,支持传入 协程/函数 的 实例/指针),实现了协程切换的自动化
该模块只包含一个类Scheduler
Scheduler 协程调度器类
提供了构造N线程调度器(可选是否将本线程纳入线程池),停止/启动调度器,调取任务(可指定运行线程),使用迭代器批量调度任务等方法
为了方便实现,在类的名称空间内声明了任务结构体FiberAndThread,支持使用函数或已经包装好的协程构造,在调度器内部也使用该结构体作为函数参数
成员变量存储了互斥锁,线程池,任务队列以及线程ID,各种要维护的状态
// 协程调度器类
class Scheduler {
public:
typedef std::shared_ptr
<Scheduler> ptr;
typedef Mutex MutexType;
// 构造函数
Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "");
// 虚析构函数
virtual ~Scheduler();
// 返回调度器名称
const std::string& getName() const { return m_name; }
// 返回当前线程的调度器
static Scheduler* GetThis();
// 返回当前线程的主协程
static Fiber* GetMainFiber();
// 启动调度器
void start();
// 停止调度器
void stop();
// 调度协程
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(fc, thread);
}
if(need_tickle) {
tickle();
}
}
// 批量调度协程
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
while(begin != end) {
need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
begin++;
}
}
if(need_tickle) {
tickle();
}
}
protected:
// 唤醒空闲线程
virtual void tickle();
// 调度器核心调度逻辑,在每个线程池线程中都被 调用
void run();
// 返回bool值,检查调度器是否可以安全停止工作
virtual bool stopping();
// 调度器中的线程在空闲时执行的函数
virtual void idle();
// 设置自己为当前线程的调度器
void setThis();
// 返回是否有空闲线程
bool hasIdleThreads() { return m_idleThreadCount > 0; }
private:
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread = -1) {
// 检查任务队列中有任务
bool need_tickle = m_fibers.empty();
// 构造任务结构体
FiberAndThread ft(fc, thread);
// 如果任务结构体有效,添加任务至任务队列
if(ft.fiber || ft.cb) {
m_fibers.push_back(ft);
}
// 返回任务队列是否需要tickle
return need_tickle;
}
private:
// 任务结构体
struct FiberAndThread {
Fiber::ptr fiber; // 协程
std::function<void()> cb; // 协程执行函数
int thread; // 线程ID
// 各种构造函数
FiberAndThread(Fiber::ptr f, int thr)
: fiber(f), thread(thr) {}
FiberAndThread(Fiber::ptr* f, int thr)
: thread(thr) {
fiber.swap(*f);
}
FiberAndThread(std::function<void()> f, int thr)
: cb(f), thread(thr) {}
FiberAndThread(std::function<void()>* f, int thr)
: thread(thr) {
cb.swap(*f);
}
// 默认构造函数
FiberAndThread()
: thread(-1) {
}
// 重置
void reset() {
fiber = nullptr;
cb = nullptr;
thread = -1;
}
};
private:
MutexType m_mutex; // 互斥锁
std::vector<Thread::ptr> m_threads; // 线程池
std::list
<FiberAndThread> m_fibers; // 任务队列
Fiber::ptr m_rootFiber; // 调度协程(use_caller==true时有效)
std::string m_name; // 调度器名称
protected:
std::vector
<int> m_threadIds; // 线程ID数组
size_t m_threadCount = 0; // 线程数量
std::atomic
<size_t> m_activeThreadCount = {0}; // 工作线程数量
std::atomic
<size_t> m_idleThreadCount = {0}; // 空闲线程数量
bool m_stopping = true; // 是否正在停止
bool m_autoStop = false; // 是否自动停止
int m_rootThread = 0; // 主线程ID(use_caller==true时有效)
};
日志器和全局变量
声明日志器,以及两个静态全局线程局部变量来记录当前线程的调度器和主协程
static sylar::Logger::ptr g_logger = SYLAR_LOG_NAME("system"); // 日志器
static thread_local Scheduler* t_scheduler = nullptr; // 指向当前线程的调度器
static thread_local Fiber* t_fiber = nullptr; // 指向当前线程的主协程
// 返回当前协程的调度器
Scheduler* Scheduler::GetThis() {
return t_scheduler;
}
// 返回当前线程的主协程
Fiber* Scheduler::GetMainFiber() {
return t_fiber;
}
// 设置自己为当前线程的调度器
void Scheduler::setThis() {
t_scheduler = this;
}
构造函数
调用后会根据用户传入参数计算线程数量,用户可以选择是否将本线程也纳入线程池中
// 构造函数
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name)
: m_name(name) {
// 断言检查线程数量合法
SYLAR_ASSERT(threads > 0);
// 是否将协程调度器线程也纳入调度器
if(use_caller) {
// 构造当前线程的主协程
sylar::Fiber::GetThis();
// 线程池线程要数量-1,因为当前线程也纳入线程池
threads--;
// 断言检查当前线程未绑定其他调度器
SYLAR_ASSERT(GetThis() == nullptr);
// 设置当前线程的调度器为this
t_scheduler = this;
// 设置m_rootFiber为该复用线程的调度器协程(使用Fiber的构造函数参数指明该协程的特殊性)
m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, true));
// 将当前线程名称设置为调度器名称
sylar::Thread::SetName(m_name);
// 将线程主协程的值设为调度器协程
t_fiber = m_rootFiber.get();
// 设置调度器调用线程的ID
m_rootThread = sylar::GetThreadId();
// 将调用调度器的线程加入线程ID数组
m_threadIds.push_back(m_rootThread);
} else { // 若不使用
// 将rootThread设为-1
m_rootThread = -1;
}
// 设置调度线程数量
m_threadCount = threads;
}
析构函数
为虚函数,因为后续Scheduler会派生出IOManager类
// 虚析构函数
Scheduler::~Scheduler() {
// 断言检查调度器是否正在停止
SYLAR_ASSERT(m_stopping);
// 当前线程的调度器是否是正在析构的调度器
if(GetThis() == this) {
// 置空指针
t_scheduler = nullptr;
}
}
// 返回当前协程的调度器
Scheduler* Scheduler::GetThis() {
return t_scheduler;
}
// 返回当前线程的主协程
Fiber* Scheduler::GetMainFiber() {
return t_fiber;
}
启动调度器
调用后会初始化线程池,具体的初始化内容就是创建新线程,然后在线程上运行run()函数(协程调度逻辑就写在run()函数内),最后将线程加入线程池
// 启动调度器
void Scheduler::start() {
// 线程安全
MutexType::Lock lock(m_mutex);
// 若已经在运行直接返回
if(!m_stopping) {
return;
}
// 标记正在运行
m_stopping = false;
// 断言检查线程池是否为空
SYLAR_ASSERT(m_threads.empty());
// 初始化线程池
m_threads.resize(m_threadCount);
for(size_t i = 0; i < m_threadCount; i++) {
// 创建线程,并与Scheduler::run绑定,启动时执行调度逻辑
m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + "_" + std::to_string(i)));
// 添加到线程ID列表
m_threadIds.push_back(m_threads[i]->getId());
}
lock.unlock();
}
唤醒空闲线程
这里Scheduler只是提供了这个虚函数接口,并没有实际意义上实现功能,留给未来继承Scheduler的IOManager来实现
// 唤醒空闲线程
void Scheduler::tickle() {
SYLAR_LOG_INFO(g_logger) << "tickle";
}
idle函数
意味空闲,线程若没有任务要运行,就来执行运行着idle()函数的idle协程(idle_fiber)
可以看到这里的idle逻辑十分简单粗暴,直接将idle协程Yield了(然后线程又会回去重新寻找可执行任务)
这样做的原因是idle函数也是留给未来IOManager重载的函数,在IOManager里会实现更智能的idle逻辑,这里就先打一下日志直接切回去
// 调度器中的线程在空闲时执行的函数
void Scheduler::idle() {
SYLAR_LOG_INFO(g_logger) << "idle";
while(!stopping()) {
sylar::Fiber::YieldToHold();
}
}
判断是否可以停止调度器(内部方法)
只有当停止标志全部为true,任务队列为空,并且活跃线程数为0时才返回true
// 返回bool值,检查调度器是否可以安全停止
bool Scheduler::stopping() {
MutexType::Lock lock(m_mutex);
return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;
run方法(调度器核心逻辑)
简单来说就是先创建一个idle协程放着,然后去循环,每次循环都试图寻找可执行任务,如果找到了就分配线程执行它
期间维护着活跃线程数这个原子变量,如果需要tickle就tickle一下
如果没找到可执行任务,那么这个线程就去执行idle_fiber
(暂时忽略 set_hook_enable(true),这是后来hook模块完成后加上的)
// 调度器核心调度逻辑,在每个线程池线程中都被调用
void Scheduler::run() {
SYLAR_LOG_DEBUG(g_logger) << "run";
// 开启hook
set_hook_enable(true);
setThis(); // 设置this为当前线程的调度器
// 如果当前线程不是调用调度器的线程
if(sylar::GetThreadId() != m_rootThread) {
// 创建主协程并设置t_fiber(当前线程的主协程)
t_fiber = Fiber::GetThis().get();
}
// 创建idle协程
Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
// 创建回调函数协程
Fiber::ptr cb_fiber;
// 临时存储从任务队列中取出的任务
FiberAndThread ft;
// 任务调度的无限循环
while(true) {
ft.reset(); // 重置任务结构体
bool tickle_me = false; // 标记是否需要唤醒其他线程
bool is_active = false; // 标记是否有活跃的任务
{
MutexType::Lock lock(m_mutex); // 任务队列线程安全
// 遍历任务队列
auto it = m_fibers.begin();
while(it != m_fibers.end()) {
// 若任务指定的线程不是当前线程,设置tickleme,跳过该任务
if(it->thread != -1 && it->thread != sylar::GetThreadId()) {
it++;
tickle_me = true;
continue;
}
// 断言检查,确保任务中包含有效的协程或回调函数
SYLAR_ASSERT(it->fiber || it->cb);
// 如果包含协程并且协程正在执行,跳过该任务
if(it->fiber && it->fiber->getState() == Fiber::EXEC) {
it++;
continue;
}
// 找到可执行的任务
ft = *it; // 取出
m_fibers.erase(it++); // 从任务队列删除
m_activeThreadCount++; // 增加活跃线程数
is_active = true; // 标记存在活跃任务
break;
}
// 若找到了可执行任务,设置tickleme
tickle_me |= it != m_fibers.end();
}
// 若需要唤醒其他线程,tickle一下
if(tickle_me) {
tickle();
}
// 若取出的任务是协程,并且状态不是TERM或者和EXCEPT
if(ft.fiber && (ft.fiber->getState() != Fiber::TERM) && ft.fiber->getState() != Fiber::EXCEPT) {
ft.fiber->swapIn(); // 切换到该协程执行
m_activeThreadCount--; // 任务结束后,活跃线程数量-1
// 若协程在执行后状态为READY,重新将协程加入任务队列
if(ft.fiber->getState() == Fiber::READY) {
schedule(ft.fiber);
// 若协程在执行后状态不是TERM也不是EXCEPT,将协程状态设为HOLD
} else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {
ft.fiber->m_state = Fiber::HOLD;
}
ft.reset(); // 重置任务结构体
// 若取出的任务是回调函数
} else if(ft.cb) {
// 创建或复用cb_fiber来执行回调函数
if(cb_fiber) {
cb_fiber->reset(ft.cb);
} else {
cb_fiber.reset(new Fiber(ft.cb));
}
ft.reset(); // 重置任务结构体
cb_fiber->swapIn(); // 切换回调函数线程执行
m_activeThreadCount--; // 执行完毕,更新活跃线程数
// 若执行后协程状态为READY,重新调度
if(cb_fiber->getState() == Fiber::READY) {
schedule(cb_fiber);
cb_fiber.reset();
// 若执行后状态为TERM或EXCEPT,释放嗯协程资源
} else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {
cb_fiber->reset(nullptr); // reset,复用cb_fiber对象
// 其余情况设置为HOLD状态
} else {
cb_fiber->m_state = Fiber::HOLD;
cb_fiber.reset();
}
// 如果没有任务要执行
} else {
// 若标记过有活跃任务
if(is_active) {
m_activeThreadCount--; // 活跃线程数-1
continue; // 返回循环,取下一个任务
}
// 若idle协程结束,打印日志退出循环
if(idle_fiber->getState() == Fiber::TERM) {
SYLAR_LOG_INFO(g_logger) << "idle fiber term";
break;
}
m_idleThreadCount++; // 增加空闲线程数
idle_fiber->swapIn(); // 切换到空闲协程执行
m_idleThreadCount--; // 减少空闲线程数
// 若空闲协程的状态不是TERM或EXCEPT,设置为HOLD
if(idle_fiber->getState() != Fiber::TERM && idle_fiber->getState() != Fiber::EXCEPT) {
idle_fiber->m_state = Fiber::HOLD;
}
}
}
}
停止调度器
会将线程池中的任务消耗完之后再停止
// 停止调度器
void Scheduler::stop() { // 设置自动停止标志
m_autoStop = true;
if(m_rootFiber
&& m_threadCount == 0
&& (m_rootFiber->getState() == Fiber::TERM
|| m_rootFiber->getState() == Fiber::INIT)) {
// 打印日志并标记调度器已停止
SYLAR_LOG_INFO(g_logger) << this << " stopped";
m_stopping = true;
// 检查是否可安全停止,否则继续执行后续代码
if(stopping()) {
return;
}
}
if(m_rootThread != -1) { // 如果调度器线程也被加入调度
SYLAR_ASSERT(GetThis() == this); // 断言检查当前调度器实例是否为t_scheduler
} else { // 如果调度器线程没被加入调度
SYLAR_ASSERT(GetThis() != this); //
}
// 标记正在停止,唤醒所有线程
m_stopping = true;
for(size_t i = 0; i < m_threadCount; i++) {
tickle();
}
if(m_rootFiber) { // 若有rootFiber多tickle一次
tickle();
}
// 若有rootFiber,call一下清理未完成任务
if(m_rootFiber) {
if(!stopping()) {
m_rootFiber->call();
}
}
// 清理线程池中任务
std::vector<Thread::ptr> thrs;
{
MutexType::Lock lock(m_mutex);
thrs.swap(m_threads);
}
for(auto& i : thrs) {
i->join();
}
}
调度任务
可以看到就是将任务结构体push_back到线程池
// 调度协程
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(fc, thread);
}
if(need_tickle) {
tickle();
}
}
// 批量调度协程
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
while(begin != end) {
need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
begin++;
}
}
if(need_tickle) {
tickle();
}
}
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread = -1) {
// 检查任务队列中有任务
bool need_tickle = m_fibers.empty();
// 构造任务结构体
FiberAndThread ft(fc, thread);
// 如果任务结构体有效,添加任务至任务队列
if(ft.fiber || ft.cb) {
m_fibers.push_back(ft);
}
// 返回任务队列是否需要tickle
return need_tickle;
}
任务结构体设计
可以看到可以接收多种类型的参数进行构造,十分方便使用
// 任务结构体
struct FiberAndThread {
Fiber::ptr fiber; // 协程
std::function<void()> cb; // 协程执行函数
int thread; // 线程ID
// 各种构造函数
FiberAndThread(Fiber::ptr f, int thr)
: fiber(f), thread(thr) {}
FiberAndThread(Fiber::ptr* f, int thr)
: thread(thr) {
fiber.swap(*f);
}
FiberAndThread(std::function<void()> f, int thr)
: cb(f), thread(thr) {}
FiberAndThread(std::function<void()>* f, int thr)
: thread(thr) {
cb.swap(*f);
}
// 默认构造函数
FiberAndThread()
: thread(-1) {
}
// 重置
void reset() {
fiber = nullptr;
cb = nullptr;
thread = -1;
}
};
测试
#include "sylar/sylar.h"
sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();
void test_fiber() {
static int s_count = 5;
SYLAR_LOG_INFO(g_logger) << "test in fiber s_count= " << s_count;
sleep(1);
if(--s_count >= 0) {
sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetThreadId());
}
}
int main(int argc, char** argv) {
unlink("/home/ayanami/Rei/tmp/test_scheduler.txt");
g_logger->addAppender(sylar::FileLogAppender::ptr(new sylar::FileLogAppender("/home/ayanami/Rei/tmp/test_scheduler.txt")));
SYLAR_LOG_INFO(g_logger) << "main";
sylar::Scheduler sc(2, true, "test");
sc.start();
sleep(2);
SYLAR_LOG_INFO(g_logger) << "schedule";
sc.schedule(&test_fiber);
sc.stop();
SYLAR_LOG_INFO(g_logger) << "over";
return 0;
}
可以看到测试结果符合预期

总结
实现了一个在N线程上运行M协程的调度器,同时预留了子类可重载的方法

ദ്ദി˶ー̀֊ー́ )✧