[C++]sylar高性能服务器框架学习记录:协程调度模块

记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会

项目仓库地址

https://github.com/sylar-yin/sylar/

整理博客过程中参考的大佬资料链接:

============================================

基础介绍

该模块实现了一个多线程多协程的调度器,支持指定N个线程,运行M个协程(其实就是先指定调度器线程池的线程数,然后就能随意往里面schedule任务,支持传入 协程/函数 的 实例/指针),实现了协程切换的自动化

该模块只包含一个类Scheduler

Scheduler 协程调度器类

提供了构造N线程调度器(可选是否将本线程纳入线程池),停止/启动调度器,调取任务(可指定运行线程),使用迭代器批量调度任务等方法

为了方便实现,在类的名称空间内声明了任务结构体FiberAndThread,支持使用函数或已经包装好的协程构造,在调度器内部也使用该结构体作为函数参数

成员变量存储了互斥锁,线程池,任务队列以及线程ID,各种要维护的状态

// 协程调度器类
class Scheduler {
public:
    typedef std::shared_ptr
<Scheduler> ptr;
    typedef Mutex MutexType;

    // 构造函数
    Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "");
    // 虚析构函数
    virtual ~Scheduler();

    // 返回调度器名称
    const std::string& getName() const { return m_name; }

    // 返回当前线程的调度器
    static Scheduler* GetThis();
    // 返回当前线程的主协程
    static Fiber* GetMainFiber();

    // 启动调度器
    void start();
    // 停止调度器
    void stop();

    // 调度协程
    template<class FiberOrCb>
    void schedule(FiberOrCb fc, int thread = -1) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            need_tickle = scheduleNoLock(fc, thread);
        }
        if(need_tickle) {
            tickle();
        }
    }

    // 批量调度协程
    template<class InputIterator>
    void schedule(InputIterator begin, InputIterator end) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            while(begin != end) {
                need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;   
                begin++;
            }
        }
        if(need_tickle) {
            tickle();
        }
    }
protected:
    // 唤醒空闲线程
    virtual void tickle();
    // 调度器核心调度逻辑,在每个线程池线程中都被   调用
    void run();
    // 返回bool值,检查调度器是否可以安全停止工作
    virtual bool stopping();
    // 调度器中的线程在空闲时执行的函数
    virtual void idle();
    // 设置自己为当前线程的调度器
    void setThis();
    // 返回是否有空闲线程
    bool hasIdleThreads() { return m_idleThreadCount > 0; }
private:
    template<class FiberOrCb>
    bool scheduleNoLock(FiberOrCb fc, int thread = -1) {
        // 检查任务队列中有任务
        bool need_tickle = m_fibers.empty();
        // 构造任务结构体
        FiberAndThread ft(fc, thread);
        // 如果任务结构体有效,添加任务至任务队列
        if(ft.fiber || ft.cb) {
            m_fibers.push_back(ft);
        }
        // 返回任务队列是否需要tickle
        return need_tickle;
    }
private:
    // 任务结构体
    struct FiberAndThread {
        Fiber::ptr fiber;           // 协程
        std::function<void()> cb;   // 协程执行函数
        int thread;                 // 线程ID

        // 各种构造函数
        FiberAndThread(Fiber::ptr f, int thr)
            : fiber(f), thread(thr) {}
        FiberAndThread(Fiber::ptr* f, int thr)
            : thread(thr) {
            fiber.swap(*f);
        }
        FiberAndThread(std::function<void()> f, int thr)
            : cb(f), thread(thr) {}

        FiberAndThread(std::function<void()>* f, int thr)
            : thread(thr) {
            cb.swap(*f);
        }

        // 默认构造函数
        FiberAndThread()
            : thread(-1) {
        }

        // 重置
        void reset() {
            fiber = nullptr;
            cb = nullptr;
            thread = -1;
        }
    };
private:
    MutexType m_mutex;                      // 互斥锁
    std::vector<Thread::ptr> m_threads;     // 线程池
    std::list
<FiberAndThread> m_fibers;     // 任务队列
    Fiber::ptr m_rootFiber;                 // 调度协程(use_caller==true时有效)
    std::string m_name;                     // 调度器名称

protected:
    std::vector
<int> m_threadIds;                   // 线程ID数组
    size_t m_threadCount = 0;                       // 线程数量
    std::atomic
<size_t> m_activeThreadCount = {0};  // 工作线程数量
    std::atomic
<size_t> m_idleThreadCount = {0};    // 空闲线程数量
    bool m_stopping = true;                         // 是否正在停止
    bool m_autoStop = false;                        // 是否自动停止
    int m_rootThread = 0;                           // 主线程ID(use_caller==true时有效)
};

日志器和全局变量

声明日志器,以及两个静态全局线程局部变量来记录当前线程的调度器和主协程

static sylar::Logger::ptr g_logger = SYLAR_LOG_NAME("system");  // 日志器

static thread_local Scheduler* t_scheduler = nullptr;           // 指向当前线程的调度器
static thread_local Fiber* t_fiber = nullptr;                   // 指向当前线程的主协程

// 返回当前协程的调度器
Scheduler* Scheduler::GetThis() {
    return t_scheduler;
}

// 返回当前线程的主协程
Fiber* Scheduler::GetMainFiber() {
    return t_fiber;
}

// 设置自己为当前线程的调度器
void Scheduler::setThis() {
    t_scheduler = this;
}

构造函数

调用后会根据用户传入参数计算线程数量,用户可以选择是否将本线程也纳入线程池中

// 构造函数
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name) 
    : m_name(name) {
    // 断言检查线程数量合法
    SYLAR_ASSERT(threads > 0);

    // 是否将协程调度器线程也纳入调度器
    if(use_caller) {
        // 构造当前线程的主协程
        sylar::Fiber::GetThis();
        // 线程池线程要数量-1,因为当前线程也纳入线程池
        threads--;

        // 断言检查当前线程未绑定其他调度器
        SYLAR_ASSERT(GetThis() == nullptr);
        // 设置当前线程的调度器为this
        t_scheduler = this; 

        // 设置m_rootFiber为该复用线程的调度器协程(使用Fiber的构造函数参数指明该协程的特殊性)
        m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, true));
        // 将当前线程名称设置为调度器名称
        sylar::Thread::SetName(m_name);

        // 将线程主协程的值设为调度器协程
        t_fiber = m_rootFiber.get();
        // 设置调度器调用线程的ID
        m_rootThread = sylar::GetThreadId();
        // 将调用调度器的线程加入线程ID数组
        m_threadIds.push_back(m_rootThread);
    } else {    // 若不使用
        // 将rootThread设为-1
        m_rootThread = -1;
    }
    // 设置调度线程数量
    m_threadCount = threads;
}

析构函数

为虚函数,因为后续Scheduler会派生出IOManager类

// 虚析构函数
Scheduler::~Scheduler() {
    // 断言检查调度器是否正在停止
    SYLAR_ASSERT(m_stopping);
    // 当前线程的调度器是否是正在析构的调度器
    if(GetThis() == this) {
        // 置空指针
        t_scheduler = nullptr;
    }
}

// 返回当前协程的调度器
Scheduler* Scheduler::GetThis() {
    return t_scheduler;
}

// 返回当前线程的主协程
Fiber* Scheduler::GetMainFiber() {
    return t_fiber;
}

启动调度器

调用后会初始化线程池,具体的初始化内容就是创建新线程,然后在线程上运行run()函数(协程调度逻辑就写在run()函数内),最后将线程加入线程池

// 启动调度器
void Scheduler::start() {
    // 线程安全
    MutexType::Lock lock(m_mutex);
    // 若已经在运行直接返回
    if(!m_stopping) {
        return;
    }
    // 标记正在运行
    m_stopping = false;
    // 断言检查线程池是否为空
    SYLAR_ASSERT(m_threads.empty());

    // 初始化线程池
    m_threads.resize(m_threadCount);
    for(size_t i = 0; i < m_threadCount; i++) {
        // 创建线程,并与Scheduler::run绑定,启动时执行调度逻辑
        m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + "_" + std::to_string(i)));
        // 添加到线程ID列表
        m_threadIds.push_back(m_threads[i]->getId());
    }
    lock.unlock();
}

唤醒空闲线程

这里Scheduler只是提供了这个虚函数接口,并没有实际意义上实现功能,留给未来继承Scheduler的IOManager来实现

// 唤醒空闲线程
void Scheduler::tickle() {
    SYLAR_LOG_INFO(g_logger) << "tickle";
}

idle函数

意味空闲,线程若没有任务要运行,就来执行运行着idle()函数的idle协程(idle_fiber)

可以看到这里的idle逻辑十分简单粗暴,直接将idle协程Yield了(然后线程又会回去重新寻找可执行任务)

这样做的原因是idle函数也是留给未来IOManager重载的函数,在IOManager里会实现更智能的idle逻辑,这里就先打一下日志直接切回去

// 调度器中的线程在空闲时执行的函数
void Scheduler::idle() {
    SYLAR_LOG_INFO(g_logger) << "idle";
    while(!stopping()) {
        sylar::Fiber::YieldToHold();
    }
}

判断是否可以停止调度器(内部方法)

只有当停止标志全部为true,任务队列为空,并且活跃线程数为0时才返回true

// 返回bool值,检查调度器是否可以安全停止           
bool Scheduler::stopping() {
    MutexType::Lock lock(m_mutex);
    return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;

run方法(调度器核心逻辑)

简单来说就是先创建一个idle协程放着,然后去循环,每次循环都试图寻找可执行任务,如果找到了就分配线程执行它

期间维护着活跃线程数这个原子变量,如果需要tickle就tickle一下

如果没找到可执行任务,那么这个线程就去执行idle_fiber

(暂时忽略 set_hook_enable(true),这是后来hook模块完成后加上的)

// 调度器核心调度逻辑,在每个线程池线程中都被调用
void Scheduler::run() {
    SYLAR_LOG_DEBUG(g_logger) << "run";

    // 开启hook
    set_hook_enable(true);

    setThis();  // 设置this为当前线程的调度器
    // 如果当前线程不是调用调度器的线程
    if(sylar::GetThreadId() != m_rootThread)  {
        // 创建主协程并设置t_fiber(当前线程的主协程)
        t_fiber = Fiber::GetThis().get();
    }

    // 创建idle协程
    Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
    // 创建回调函数协程
    Fiber::ptr cb_fiber;
    // 临时存储从任务队列中取出的任务
    FiberAndThread ft;

    // 任务调度的无限循环
    while(true) {
        ft.reset();             // 重置任务结构体
        bool tickle_me = false; // 标记是否需要唤醒其他线程
        bool is_active = false; // 标记是否有活跃的任务
        {
            MutexType::Lock lock(m_mutex);  // 任务队列线程安全

            // 遍历任务队列
            auto it = m_fibers.begin();
            while(it != m_fibers.end()) {
                // 若任务指定的线程不是当前线程,设置tickleme,跳过该任务
                if(it->thread != -1 && it->thread != sylar::GetThreadId()) {
                    it++;
                    tickle_me = true;
                    continue;
                }

                // 断言检查,确保任务中包含有效的协程或回调函数
                SYLAR_ASSERT(it->fiber || it->cb);
                // 如果包含协程并且协程正在执行,跳过该任务
                if(it->fiber && it->fiber->getState() == Fiber::EXEC)  {
                    it++;
                    continue;
                }

                // 找到可执行的任务
                ft = *it;               // 取出 
                m_fibers.erase(it++);   // 从任务队列删除
                m_activeThreadCount++;  // 增加活跃线程数
                is_active = true;       // 标记存在活跃任务
                break;
            }
            // 若找到了可执行任务,设置tickleme
            tickle_me |= it != m_fibers.end();
        }

        // 若需要唤醒其他线程,tickle一下
        if(tickle_me) {
            tickle();
        }

        // 若取出的任务是协程,并且状态不是TERM或者和EXCEPT
        if(ft.fiber && (ft.fiber->getState() != Fiber::TERM) && ft.fiber->getState() != Fiber::EXCEPT) {
            ft.fiber->swapIn();     // 切换到该协程执行
            m_activeThreadCount--;  // 任务结束后,活跃线程数量-1

            // 若协程在执行后状态为READY,重新将协程加入任务队列
            if(ft.fiber->getState() == Fiber::READY) {
                schedule(ft.fiber);
            // 若协程在执行后状态不是TERM也不是EXCEPT,将协程状态设为HOLD
            } else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {
                ft.fiber->m_state = Fiber::HOLD;
            }
            ft.reset(); // 重置任务结构体
        // 若取出的任务是回调函数
        } else if(ft.cb) {
            // 创建或复用cb_fiber来执行回调函数
            if(cb_fiber) {
                cb_fiber->reset(ft.cb);
            } else {
                cb_fiber.reset(new Fiber(ft.cb));
            }
            ft.reset();             // 重置任务结构体
            cb_fiber->swapIn();     // 切换回调函数线程执行
            m_activeThreadCount--;  // 执行完毕,更新活跃线程数
            // 若执行后协程状态为READY,重新调度
            if(cb_fiber->getState() == Fiber::READY) {
                schedule(cb_fiber);
                cb_fiber.reset();
            // 若执行后状态为TERM或EXCEPT,释放嗯协程资源
            } else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {
                cb_fiber->reset(nullptr);   // reset,复用cb_fiber对象
            // 其余情况设置为HOLD状态
            } else {
                cb_fiber->m_state = Fiber::HOLD;
                cb_fiber.reset();
            }
        // 如果没有任务要执行
        } else {
            // 若标记过有活跃任务
            if(is_active) {
                m_activeThreadCount--;  // 活跃线程数-1
                continue;               // 返回循环,取下一个任务
            }
            // 若idle协程结束,打印日志退出循环
            if(idle_fiber->getState() == Fiber::TERM) {
                SYLAR_LOG_INFO(g_logger) << "idle fiber term";
                break;
            }

            m_idleThreadCount++;    // 增加空闲线程数
            idle_fiber->swapIn();   // 切换到空闲协程执行
            m_idleThreadCount--;    // 减少空闲线程数

            // 若空闲协程的状态不是TERM或EXCEPT,设置为HOLD
            if(idle_fiber->getState() != Fiber::TERM && idle_fiber->getState() != Fiber::EXCEPT) {
                idle_fiber->m_state = Fiber::HOLD;
            }
        }
    }
}

停止调度器

会将线程池中的任务消耗完之后再停止

// 停止调度器
void Scheduler::stop() {    // 设置自动停止标志
    m_autoStop = true;
    if(m_rootFiber
            && m_threadCount == 0
            && (m_rootFiber->getState() == Fiber::TERM
                || m_rootFiber->getState() == Fiber::INIT)) {
        // 打印日志并标记调度器已停止
        SYLAR_LOG_INFO(g_logger) << this << " stopped";
        m_stopping = true; 

        // 检查是否可安全停止,否则继续执行后续代码
        if(stopping()) {
            return;
        }
    }

    if(m_rootThread != -1) {    // 如果调度器线程也被加入调度
        SYLAR_ASSERT(GetThis() == this);    // 断言检查当前调度器实例是否为t_scheduler
    } else {                    // 如果调度器线程没被加入调度
        SYLAR_ASSERT(GetThis() != this);    // 
    }

    // 标记正在停止,唤醒所有线程
    m_stopping = true;
    for(size_t i = 0; i < m_threadCount; i++) {
        tickle();
    }
    if(m_rootFiber) {   // 若有rootFiber多tickle一次
        tickle();
    }

    // 若有rootFiber,call一下清理未完成任务
    if(m_rootFiber) {
        if(!stopping()) {
            m_rootFiber->call();
        }
    }

    // 清理线程池中任务
    std::vector<Thread::ptr> thrs;
    {
        MutexType::Lock lock(m_mutex);
        thrs.swap(m_threads);
    }
    for(auto& i : thrs) {
        i->join();
    }
}

调度任务

可以看到就是将任务结构体push_back到线程池

    // 调度协程
    template<class FiberOrCb>
    void schedule(FiberOrCb fc, int thread = -1) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            need_tickle = scheduleNoLock(fc, thread);
        }
        if(need_tickle) {
            tickle();
        }
    }

    // 批量调度协程
    template<class InputIterator>
    void schedule(InputIterator begin, InputIterator end) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            while(begin != end) {
                need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;   
                begin++;
            }
        }
        if(need_tickle) {
            tickle();
        }
    }

    template<class FiberOrCb>
    bool scheduleNoLock(FiberOrCb fc, int thread = -1) {
        // 检查任务队列中有任务
        bool need_tickle = m_fibers.empty();
        // 构造任务结构体
        FiberAndThread ft(fc, thread);
        // 如果任务结构体有效,添加任务至任务队列
        if(ft.fiber || ft.cb) {
            m_fibers.push_back(ft);
        }
        // 返回任务队列是否需要tickle
        return need_tickle;
    }

任务结构体设计

可以看到可以接收多种类型的参数进行构造,十分方便使用

// 任务结构体
    struct FiberAndThread {
        Fiber::ptr fiber;           // 协程
        std::function<void()> cb;   // 协程执行函数
        int thread;                 // 线程ID

        // 各种构造函数
        FiberAndThread(Fiber::ptr f, int thr)
            : fiber(f), thread(thr) {}
        FiberAndThread(Fiber::ptr* f, int thr)
            : thread(thr) {
            fiber.swap(*f);
        }
        FiberAndThread(std::function<void()> f, int thr)
            : cb(f), thread(thr) {}

        FiberAndThread(std::function<void()>* f, int thr)
            : thread(thr) {
            cb.swap(*f);
        }

        // 默认构造函数
        FiberAndThread()
            : thread(-1) {
        }

        // 重置
        void reset() {
            fiber = nullptr;
            cb = nullptr;
            thread = -1;
        }
    };

测试

#include "sylar/sylar.h"

sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();

void test_fiber() {
    static int s_count = 5;
    SYLAR_LOG_INFO(g_logger) << "test in fiber s_count= " << s_count;

    sleep(1);

    if(--s_count >= 0) {
        sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetThreadId());
    }
}

int main(int argc, char** argv) {
    unlink("/home/ayanami/Rei/tmp/test_scheduler.txt");
    g_logger->addAppender(sylar::FileLogAppender::ptr(new sylar::FileLogAppender("/home/ayanami/Rei/tmp/test_scheduler.txt")));
    SYLAR_LOG_INFO(g_logger) << "main";
    sylar::Scheduler sc(2, true, "test");
    sc.start();
    sleep(2);
    SYLAR_LOG_INFO(g_logger) << "schedule"; 
    sc.schedule(&test_fiber);
    sc.stop();
    SYLAR_LOG_INFO(g_logger) << "over";
    return 0;
}   

可以看到测试结果符合预期

image-20260206222857248

总结

实现了一个在N线程上运行M协程的调度器,同时预留了子类可重载的方法

评论

  1. Sankkooos
    Android Firefox 134.0
    18 小时前
    2026-2-07 0:38:44

    ദ്ദി˶ー̀֊ー́ )✧

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇