记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会
项目仓库地址
https://github.com/sylar-yin/sylar/
整理博客过程中参考的大佬资料链接:
============================================
文章目录
基础介绍
IO协程调度模块在继承Scheduler的基础上,整合封装了Linux的epoll提供的I/O多路复用机制
epoll允许一个进程同时监控多个文件描述符,并在某个或某些文件描述符就绪时,通知进程。 这使得单线程的程序可以同时处理多个网络连接或其他 I/O 事件,从而提高程序的并发性能和资源利用率。
参考资料: https://blog.csdn.net/Long_xu/article/details/144163549
epoll底层使用了红黑树,速度极快,加上使用边缘(ET)触发模式更是快到起飞
sylar这里使用epoll,主要目的是让调度器能高效地调度I/O事件,通过为fd注册监听事件,监听文件描述符的状态(事先将fd设置为非阻塞),执行回调函数,能让线程不再忙等(busy wait)而是通过epoll_wait阻塞在管道描述符上,等待管道的可读事件;当有任务添加时,tickle被调用,管道被写入数据,idle协程检测到后退出,调度器执行调度
该模块只有一个类IOManager
IOManager相比Scheduler增加了为fd注册读/写事件回调函数的功能,这对服务器开发来说至关重要,可以让程序员使用同步的方式写异步的代码,非常舒服
IOManager类
继承自Scheduler类,新增了注册/删除IO事件功能,在名称空间内声明了FdContextfd上下文结构体用来封装注册的事件,重载了tickle,stopping,idle方法
成员变量存储了epoll的fd,管道的fd,用于记录待处理事件数的原子变量,一个FdContext类型的vector以读写锁
// IO协程调度器类(继承了调度器,定时器)
class IOManager : public Scheduler, public TimerManager {
public:
typedef std::shared_ptr
<IOManager> ptr;
typedef RWMutex RWMutexType;
// 事件类型枚举
enum Event {
NONE = 0x0, // 表示没有任何事件
READ = 0x1, // EPOLLIN,读事件,监听fd是否可以读取数据
WRITE = 0x4 // EPOLLOUT,写事件,监听fd是否可以写入数据
};
private:
// fd上下文结构体
struct FdContext {
typedef Mutex MutexType; // 线程安全
// 事件上下文结构体
struct EventContext {
Scheduler* scheduler = nullptr; // 事件执行的Scheduler
Fiber::ptr fiber; // 事件协程
std::function<void()> cb; // 事件的回调函数
};
EventContext& getContext(Event event); // 根据传入枚举类型返回对应事件上下文成员
void resetContext(EventContext& ctx); // 重置传入的EventContext结构体
void triggerEvent(Event event); // 触发指定事件并执行回调或恢复协程
EventContext read; // 读事件上下文
EventContext write; // 写事件上下文
int fd = 0; // fd本身
Event events = NONE; // 已经注册的事件
MutexType mutex; // 互斥锁
};
public:
// 构造函数
IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "");
// 析构函数
~IOManager();
// 向指定fd添加IO事件(READ/WRITE),并关联事件回调函数(0 success, -1 error)
int addEvent(int fd, Event event, std::function<void()> cb = nullptr);
// 删除指定fd上注册的指定IO事件(READ/WRITE)
bool delEvent(int fd, Event event);
// 取消指定fd上注册的指定IO事件(READ/WRITE)
bool cancelEvent(int fd, Event event);
// 取消指定fd上全部的IO事件 (READ & WRITE)
bool cancelAll(int fd);
// 返回当前协程的IOManager
static IOManager* GetThis();
protected:
void tickle() override; // 唤醒其他线程
bool stopping() override; // 返回是否满足停止条件
void idle() override; // 线程空闲时执行的函数
void onTimerInsertAtFront() override; // 调用tickle()
void contextResize(size_t size); // 初始化fd上下文结构体vector
bool stopping(uint64_t& timeout); // 返回是否满足停止条件(定时器相关)
private:
int m_epfd = 0; // epoll的fd
int m_tickleFds[2]; // 管道fd
std::atomic
<size_t> m_pendingEventCount = {0}; // 当前待处理的事件数
RWMutexType m_mutex; // 互斥锁
std::vector<FdContext*> m_fdContexts; // fd上下文结构体vector
};
构造函数
初始化了epoll以及管道fd,注册了监听管道读事件的epoll事件
// 构造函数
IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
: Scheduler(threads, use_caller, name) {
// 创建epoll
m_epfd = epoll_create(5000);
SYLAR_ASSERT(m_epfd > 0);
// 创建管道用于线程tickle
int rt = pipe(m_tickleFds);
SYLAR_ASSERT(!rt);
// 注册一个epoll_event(读事件,fd为pipefd[0])
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
event.events = EPOLLIN | EPOLLET; // 边缘触发模式
event.data.fd = m_tickleFds[0];
// 设置ticklefd[0]非阻塞
rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
SYLAR_ASSERT(!rt);
// 将该事件注册到epoll中
rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
SYLAR_ASSERT(!rt);
// 初始化fd上下文结构体vector
contextResize(32);
// 启动IOManager
start();
}
析构函数
释放资源
// 析构函数
IOManager::~IOManager() {
// 停止IOManager
stop();
// 关闭fd
close(m_epfd);
close(m_tickleFds[0]);
close(m_tickleFds[1]);
// 释放m_fdContexts内存
for(size_t i = 0; i < m_fdContexts.size(); i++) {
if(m_fdContexts[i]) {
delete m_fdContexts[i];
}
}
}
Event事件类型枚举
将事件分为读事件和写事件,枚举值与epoll函数接收的宏值相同
// 事件类型枚举
enum Event {
NONE = 0x0, // 表示没有任何事件
READ = 0x1, // EPOLLIN,读事件,监听fd是否可以读取数据
WRITE = 0x4 // EPOLLOUT,写事件,监听fd是否可以写入数据
};
FdContext结构体
对I/O事件进行封装,封装了fd,事件上下文等信息,提供了方法来设置/触发事件
// fd上下文结构体
struct FdContext {
typedef Mutex MutexType; // 线程安全
// 事件上下文结构体
struct EventContext {
Scheduler* scheduler = nullptr; // 事件执行的Scheduler
Fiber::ptr fiber; // 事件协程
std::function<void()> cb; // 事件的回调函数
};
EventContext& getContext(Event event); // 根据传入枚举类型返回对应事件上下文成员
void resetContext(EventContext& ctx); // 重置传入的EventContext结构体
void triggerEvent(Event event); // 触发指定事件并执行回调或恢复协程
EventContext read; // 读事件上下文
EventContext write; // 写事件上下文
int fd = 0; // fd本身
Event events = NONE; // 已经注册的事件
MutexType mutex; // 互斥锁
};
// 根据传入枚举类型返回对应事件上下文成员
IOManager::FdContext::EventContext& IOManager::FdContext::getContext(IOManager::Event event) {
switch(event) {
case IOManager::READ:
return read;
case IOManager::WRITE:
return write;
default:
SYLAR_ASSERT2(false, "getContext");
}
}
// 重置传入的EventContext结构体
void IOManager::FdContext::resetContext(IOManager::FdContext::EventContext& ctx) {
ctx.scheduler = nullptr;
ctx.fiber.reset();
ctx.cb = nullptr;
}
// 触发指定事件并执行回调或恢复协程
void IOManager::FdContext::triggerEvent(IOManager::Event event) {
SYLAR_ASSERT(events & event);
events = (Event)(events & ~event);
EventContext& ctx = getContext(event);
if(ctx.cb) {
ctx.scheduler->schedule(&ctx.cb);
} else {
ctx.scheduler->schedule(&ctx.fiber);
}
ctx.scheduler = nullptr;
return;
}
向fd注册事件
逻辑简单来说就是创建一个FdContext,向epoll注册事件,设置好FdContext的值,添加到IOManager内部的FdContexts数组中待执行,过程中维护一些变量
// 向指定fd添加IO事件(READ/WRITE),并关联事件回调函数(0 success, -1 error)
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) {
// 初始化一个FdContext
FdContext* fd_ctx = nullptr;
RWMutexType::ReadLock lock(m_mutex); // 线程安全(readlock)
// 从m_fdContexts中获取到对应fd的FdContext指针
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
lock.unlock();
} else { // fd不够就扩充vector
lock.unlock();
RWMutexType::WriteLock lock2(m_mutex);
contextResize(fd * 1.5);
fd_ctx = m_fdContexts[fd];
}
// 使用取出的fdContext的互斥锁
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
// 禁止重复添加同种事件
if(fd_ctx->events & event) {
SYLAR_LOG_ERROR(g_logger) << "addEvent assert fd=" << fd
<< " event=" << event
<< " fd_ctx.event=" << fd_ctx->events;
SYLAR_ASSERT(!(fd_ctx->events & event));
}
// 根据是否有已注册事件选择操作
int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
// 创建一个epoll_event,设置边缘触发模式,添加事件,设置fd
epoll_event epevent;
epevent.events = EPOLLET | fd_ctx->events | event;
epevent.data.ptr = fd_ctx;
// 注册epoll事件
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << "," << fd << "," << epevent.events << "):"
<< rt << ", (" << errno << ") (" << strerror(errno) << ")";
return -1;
}
// 待执行的事件数+1
m_pendingEventCount++;
// 更新fdContext里的已注册事件
fd_ctx->events = (Event)(fd_ctx->events | event);
// 获取fdContext里对应事件类型的事件上下文
FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
// 事件上下文应该是空的
SYLAR_ASSERT(!event_ctx.scheduler
&& !event_ctx.fiber
&& !event_ctx.cb);
// 设置事件的调度器
event_ctx.scheduler = Scheduler::GetThis();
// 若传入了回调函数
if(cb) {
// 设置事件的回调函数
event_ctx.cb.swap(cb);
} else {
// 设置事件的协程
event_ctx.fiber = Fiber::GetThis();
SYLAR_ASSERT2(event_ctx.fiber->getState() == Fiber::EXEC
,"state=" << event_ctx.fiber->getState());
}
return 0;
}
删除fd上指定类型的事件
流程与addEvent类似,先取出存放的FdContext,获取相关值,根据这些值来操作epoll
// 删除指定fd上注册的指定IO事件(READ/WRITE)
bool IOManager::delEvent(int fd, Event event) {
RWMutexType::ReadLock lock(m_mutex); // 线程安全
// 获取fd对应的FdContext
if((int)m_fdContexts.size() <= fd) {
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
// 使用FdContext的成员锁
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
// 若没有注册该类型事件,直接返回
if(!(fd_ctx->events & event)) {
return false;
}
// 获取删除指定事件后的位掩码
Event new_events = (Event)(fd_ctx->events & ~event);
// 若删除后还有剩事件,则是MOD,没剩下事件则是DEL
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// 创建epoll事件,设置边缘触发模式,事件类型,data指向fdContext
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;
// 对epoll进行操作(MOD/DEL)
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << "," << fd << "," << epevent.events << "):"
<< rt << ", (" << errno << ") (" << strerror(errno) << ")";
return false;
}
// 待处理事件数量-1
m_pendingEventCount--;
// 设置fdContext的已注册事件类型
fd_ctx->events = new_events;
// 重置fdContext的对应事件类型上下文成员
FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
fd_ctx->resetContext(event_ctx);
return true;
}
取消fd上指定类型的事件
逻辑与上面的函数差不多,区别在与cancelEvent最后会触发事件的回调
// 取消指定fd上注册的指定IO事件(READ/WRITE)
bool IOManager::cancelEvent(int fd, Event event) {
RWMutexType::ReadLock lock(m_mutex); // 线程安全
if((int)m_fdContexts.size() <= fd) {
return false;
}
// 获取对应fd的FdContext
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
// 使用fdContext的成员锁
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
if(!(fd_ctx->events & event)) {
return false;
}
// 获取新的事件类型位掩码,判断应执行的操作
Event new_events = (Event)(fd_ctx->events & ~event);
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// 创建新的epoll事件,并设置
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;
// 执行epoll操作
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << "," << fd << "," << epevent.events << "):"
<< rt << ", (" << errno << ") (" << strerror(errno) << ")";
return false;
}
// 直接触发对应fdContext的对应事件
fd_ctx->triggerEvent(event);
// 待处理事件数量-1
m_pendingEventCount--;
return true;
}
取消fd上所有类型的事件
和上面逻辑类似,区别是会取消所有类型的事件
// 取消指定fd上全部的IO事件 (READ & WRITE)
bool IOManager::cancelAll(int fd) {
RWMutexType::ReadLock lock(m_mutex);
if((int)m_fdContexts.size() <= fd) {
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
if(!fd_ctx->events) {
return false;
}
int op = EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = 0;
epevent.data.ptr = fd_ctx;
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << "," << fd << "," << epevent.events << "):"
<< rt << ", (" << errno << ") (" << strerror(errno) << ")";
return false;
}
if(fd_ctx->events & READ) {
fd_ctx->triggerEvent(READ);
m_pendingEventCount--;
}
if(fd_ctx->events & WRITE) {
fd_ctx->triggerEvent(WRITE);
m_pendingEventCount--;
}
SYLAR_ASSERT(fd_ctx->events == 0);
return true;
}
tickle方法的重载
通过往管道里写一个字节,唤醒正在epoll_wait的空闲线程
// 唤醒其他线程
void IOManager::tickle() {
if(!hasIdleThreads()) {
return;
}
// 向pipe里发送1字节数据,唤醒epoll_wait
int rt = write(m_tickleFds[1], "T", 1);
SYLAR_ASSERT(rt == 1);
}
stopping方法的重载
除了要满足Scheduler::stopping(),还必须满足待执行任务数量为0 以及 定时器为空(后续整合入的定时器模块)
// 返回是否满足停止条件
bool IOManager::stopping(uint64_t& timeout) {
// 获取下次任务执行时间
timeout = getNextTimer();
// 定时器为空,待执行任务为0,Scheduler可停止
return timeout == ~0ull
&& m_pendingEventCount == 0
&& Scheduler::stopping();
}
// 返回是否满足停止条件
bool IOManager::stopping() {
uint64_t timeout = 0;
return stopping(timeout);
}
idle方法的重载
会调用epoll_wait等待I/O事件,然后遍历返回的事件,取出FdContext并触发回调,然后更新epoll事件状态
// 线程空闲时执行的函数
void IOManager::idle() {
SYLAR_LOG_DEBUG(g_logger) << "idle";
// 创建长度为64的epoll_event数组并使用智能指针托管
epoll_event* events = new epoll_event[64]();
std::shared_ptr
<epoll_event> shared_events(events, [](epoll_event* ptr){
delete[] ptr;
});
while(true) {
// 检查是否需要停止IOManager
uint64_t next_timeout = 0;
if(stopping(next_timeout)) {
SYLAR_LOG_INFO(g_logger) << "name=" << getName() << " idle stopping exit";
break;
}
int rt = 0;
do {
static const int MAX_TIMEOUT = 3000; // 最大等待超时时间
// 处理下一个定时器超时时间
if(next_timeout != ~0ull) {
next_timeout = (int)next_timeout > MAX_TIMEOUT
? MAX_TIMEOUT : next_timeout;
} else {
next_timeout = MAX_TIMEOUT;
}
// 调用epoll_wait等待IO事件
rt = epoll_wait(m_epfd, events, 64, (int)next_timeout);
// 如果epoll被中断则回到循环头部继续等待
if(rt < 0 && errno == EINTR) {
} else { // 否则跳出循环
break;
}
} while(true);
std::vector<std::function<void()>> cbs;
listExpiredCb(cbs);
if(!cbs.empty()) {
// SYLAR_LOG_DEBUG(g_logger) << "on timer cbs.size=" << cbs.size();
schedule(cbs.begin(), cbs.end());
cbs.clear();
}
// 遍历epoll_wait返回的事件
for(int i = 0; i < rt; i++) {
// 取出epoll_event
epoll_event& event = events[i];
// 如果是ticklefd
if(event.data.fd == m_tickleFds[0]) {
uint8_t dummy[256];
// 读管道发来的数据,唤醒某个正在epoll_wait的线程
while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
continue;
}
// 取出event的FdContext
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->mutex); // 使用FdContext的成员锁
// 处理错误和挂起事件(同时设置读写事件)
if(event.events & (EPOLLERR | EPOLLHUP)) {
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
}
int real_event = NONE;
if(event.events & EPOLLIN) { // 读事件
real_event |= READ;
}
if(event.events & EPOLLOUT) { // 写事件
real_event |= WRITE;
}
// 空事件
if((fd_ctx->events & real_event) == NONE) {
continue;
}
// 获取剩余事件
int left_event = (fd_ctx->events & ~real_event);
// 判断是修改还是删除
int op = left_event ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// 设置为边缘触发
event.events = EPOLLET | left_event;
// 执行op操作
int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(rt2) {
SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", "
<< op << "," << fd_ctx->fd << "," << event.events << "):"
<< rt << ", (" << errno << ") (" << strerror(errno) << ")";
continue;
}
// 触发事件
if(real_event & READ) {
fd_ctx->triggerEvent(READ);
m_pendingEventCount--;
}
if(real_event & WRITE) {
fd_ctx->triggerEvent(WRITE);
m_pendingEventCount--;
}
}
// 返回调度器协程(使用裸指针,防止析构问题)
Fiber::ptr cur = Fiber::GetThis();
auto raw_ptr = cur.get();
cur.reset();
raw_ptr->swapOut();
}
}
测试
测试内容是注册两个网络IO事件,一个读一个写,理论上是先触发写事件回调,往socket写http报文;然后触发读事件,从socket中读出http响应报文
#include "sylar/sylar.h"
#include "sylar/iomanager.h"
#include
<iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
sylar::Logger::ptr g_logger(SYLAR_LOG_ROOT());
int sock = 0;
void test_fiber() {
SYLAR_LOG_INFO(g_logger) << "test_fiber sock=" << sock;
sock = socket(AF_INET, SOCK_STREAM, 0);
fcntl(sock, F_SETFL, O_NONBLOCK);
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(80);
inet_pton(AF_INET, "104.20.34.220", &addr.sin_addr.s_addr);
if(!connect(sock, (const sockaddr*)&addr, sizeof(addr))) {
} else if(errno == EINPROGRESS) {
SYLAR_LOG_INFO(g_logger) << "add event errno=" << errno << " " << strerror(errno);
sylar::IOManager::GetThis()->addEvent(sock, sylar::IOManager::READ, [](){
SYLAR_LOG_INFO(g_logger) << "read callback";
char temp[1024];
int rt = read(sock, temp, 1024);
if(rt >= 0) {
std::string ans(temp,rt);
SYLAR_LOG_INFO(g_logger) << "read:[" << ans << "]";
} else {
SYLAR_LOG_INFO(g_logger) << "read rt=" << rt;
}
});
sylar::IOManager::GetThis()->addEvent(sock, sylar::IOManager::WRITE, [](){
SYLAR_LOG_INFO(g_logger) << "write callback";
const char* req =
"GET / HTTP/1.1\r\n"
"Host: www.example.com\r\n"
"Connection: close\r\n"
"\r\n";
int rt = write(sock, req, strlen(req));
SYLAR_LOG_INFO(g_logger) << "write rt=" << rt;
});
} else {
SYLAR_LOG_INFO(g_logger) << "else " << errno << " " << strerror(errno);
}
}
void test1() {
std::cout << "EPOLLIN=" << EPOLLIN
<< " EPOLLOUT=" << EPOLLOUT << std::endl;
sylar::IOManager iom(2, false, "IOM");
iom.schedule(&test_fiber);
}
int main(int argc, char** argv) {
SYLAR_LOG_INFO(g_logger) << "test begin";
test1();
SYLAR_LOG_INFO(g_logger) << "test end";
return 0;
}
可以看到成功事件回调成功执行,符合预期

总结
在Scheduler的基础上实现了通过epoll实现事件并发处理的IOManager,为后续网络服务器高并发服务提供支持

厉害 ദ്ദി˶>ω<)✧