项目仓库地址 https://github.com/sylar-yin/sylar/
基础介绍
完成sylar C++ Server Framework基础模块之后,又新增了websocket支持,环境变量模块,JSON支持,现在使用sylar框架搭建一个基于websocket的在线多人网页聊天室,样式如下:
访问这里加入聊天室:http://ayanami.blue:8080/html/index.html



项目结构
bin目录下存放可执行文件,yaml配置文件,日志文件以及网页html文件
build目录为构建目录
cmake目录存放自定义的cmake函数工具
src目录存放源码

源码解析
按照功能可分为5部分:
- 服务器资源请求路由ResourceServlet
- 聊天协议ChatMessage
- 聊天功能路由ChatWSServlet
- 聊天服务器ChatServer
- main函数
ResourceServlet
继承自Servlet类,重载了handle方法,用于解析路径返回对应路径文件资源
class ResourceServlet : public sylar::http::Servlet {
public:
typedef std::shared_ptr
<ResourceServlet> ptr;
ResourceServlet(const std::string& path);
virtual int32_t handle(sylar::http::HttpRequest::ptr request
, sylar::http::HttpResponse::ptr response
, sylar::http::HttpSession::ptr session) override;
private:
std::string m_path;
};
ResourceServlet::ResourceServlet(const std::string& path)
:Servlet("ResourceServlet")
,m_path(path){
}
int32_t ResourceServlet::handle(sylar::http::HttpRequest::ptr request
, sylar::http::HttpResponse::ptr response
, sylar::http::HttpSession::ptr session) {
// 获取文件资源请求路径
auto path = m_path + "/" + request->getPath();
SYLAR_LOG_INFO(g_logger) << "handle path=" << path;
if(path.find("..") != std::string::npos) {
response->setBody("invalid path");
response->setStatus(sylar::http::HttpStatus::NOT_FOUND);
return 0;
}
// 获取服务器文件资源
std::ifstream ifs(path);
if(!ifs) {
response->setBody("invalid path");
response->setStatus(sylar::http::HttpStatus::NOT_FOUND);
return 0;
}
std::stringstream ss;
std::string line;
while(std::getline(ifs, line)) {
ss << line << std::endl;
}
// 写入响应报文
response->setBody(ss.str());
response->setHeader("content-type", "text/html;charset=utf-8");
return 0;
}
ChatMessage
聊天协议,使用JOSN字符串表示消息报文,在类中使用std::map来存储信息,使用时调用toString方法转成JSON字符串
class ChatMessage {
public:
typedef std::shared_ptr
<ChatMessage> ptr;
static ChatMessage::ptr Create(const std::string& v);
ChatMessage();
std::string get(const std::string& name);
void set(const std::string& name, const std::string& val);
std::string toString() const;
private:
std::map<std::string, std::string> m_datas;
};
ChatMessage::ptr ChatMessage::Create(const std::string& v) {
Json::Value json;
if(!sylar::JsonUtil::FromString(json, v)) {
return nullptr;
}
ChatMessage::ptr rt(new ChatMessage);
auto names = json.getMemberNames();
for(auto& i : names) {
rt->m_datas[i] = json[i].asString();
}
return rt;
}
ChatMessage::ChatMessage() {
}
std::string ChatMessage::get(const std::string& name) {
auto it = m_datas.find(name);
return it == m_datas.end() ? "" : it->second;
}
void ChatMessage::set(const std::string& name, const std::string& val) {
m_datas[name] = val;
}
std::string ChatMessage::toString() const {
Json::Value json;
for(auto& i : m_datas) {
json[i.first] = i.second;
}
return sylar::JsonUtil::ToString(json);
}
ChatWSServlet
继承自WSServlet类,重载了onConnect,onClose函数,对应连接建立/关闭逻辑;也重载了handle函数,该函数解析聊天JSON字符串报文,根据type的值选择不同的处理逻辑
全局上开了一个std::map<std::string, sylar::http::WSSession::ptr> m_sessions来存储已连接的会话
聊天消息报文分为以下几类:
- 登录消息
- 退出消息
- WebSocket ping pong 心跳处理
- 发送消息
class ChatWSServlet : public sylar::http::WSServlet {
public:
typedef std::shared_ptr
<ChatWSServlet> ptr;
ChatWSServlet();
virtual int32_t onConnect(sylar::http::HttpRequest::ptr header
,sylar::http::WSSession::ptr session) override;
virtual int32_t onClose(sylar::http::HttpRequest::ptr header
,sylar::http::WSSession::ptr session) override;
virtual int32_t handle(sylar::http::HttpRequest::ptr header
,sylar::http::WSFrameMessage::ptr msg
,sylar::http::WSSession::ptr session) override;
};
ChatWSServlet::ChatWSServlet()
:sylar::http::WSServlet("chat_servlet") {
}
int32_t ChatWSServlet::onConnect(sylar::http::HttpRequest::ptr header
,sylar::http::WSSession::ptr session) {
SYLAR_LOG_INFO(g_logger) << "onConnect " << session;
return 0;
}
int32_t ChatWSServlet::onClose(sylar::http::HttpRequest::ptr header,
sylar::http::WSSession::ptr session) {
auto id = header->getHeader("$id");
SYLAR_LOG_INFO(g_logger) << "onClose " << session << " id=" << id;
if(!id.empty()) {
session_del(id);
ChatMessage::ptr nty(new ChatMessage);
nty->set("type", "user_leave");
nty->set("time", sylar::Time2Str());
nty->set("name", id);
session_notify(nty, nullptr);
notify_online_count();
}
return 0;
}
int32_t ChatWSServlet::handle(sylar::http::HttpRequest::ptr header,
sylar::http::WSFrameMessage::ptr msgx,
sylar::http::WSSession::ptr session) {
SYLAR_LOG_DEBUG(g_logger) << "handle " << session
<< " opcode=" << msgx->getOpcode()
<< " data=" << msgx->getData();
auto msg = ChatMessage::Create(msgx->getData());
auto id = header->getHeader("$id");
// 解析失败,直接认为连接异常
if(!msg) {
if(!id.empty()) {
sylar::RWMutex::WriteLock lock(m_mutex);
m_sessions.erase(id);
}
return 1;
}
auto type = msg->get("type");
/********************* 心跳处理 *********************/
if(type == "ping") {
ChatMessage::ptr pong(new ChatMessage);
pong->set("type", "pong");
pong->set("time", sylar::Time2Str());
return SendMessage(session, pong);
}
ChatMessage::ptr rsp(new ChatMessage);
/********************* 登录 *********************/
if(type == "login_request") {
rsp->set("type", "login_response");
auto name = msg->get("name");
if(name.empty()) {
rsp->set("result", "400");
rsp->set("msg", "name is null");
return SendMessage(session, rsp);
}
if(!id.empty()) {
rsp->set("result", "401");
rsp->set("msg", "logined");
return SendMessage(session, rsp);
}
if(session_exists(name)) {
rsp->set("result", "402");
rsp->set("msg", "name exists");
return SendMessage(session, rsp);
}
id = name;
header->setHeader("$id", id);
session_add(id, session);
notify_online_count();
rsp->set("result", "200");
rsp->set("msg", "ok");
ChatMessage::ptr nty(new ChatMessage);
nty->set("type", "user_enter");
nty->set("time", sylar::Time2Str());
nty->set("name", id);
session_notify(nty, session);
return SendMessage(session, rsp);
}
/********************* 退出登录(新增) *********************/
if(type == "logout_request") {
rsp->set("type", "logout_response");
if(id.empty()) {
rsp->set("result", "403");
rsp->set("msg", "not login");
return SendMessage(session, rsp);
}
// 从会话表中移除
{
sylar::RWMutex::WriteLock lock(m_mutex);
m_sessions.erase(id);
}
notify_online_count();
// 通知其他用户
ChatMessage::ptr nty(new ChatMessage);
nty->set("type", "user_leave");
nty->set("time", sylar::Time2Str());
nty->set("name", id);
session_notify(nty, session);
// 清空 header 中的 id
header->delHeader("$id");
rsp->set("result", "200");
rsp->set("msg", "ok");
// 给自己一个响应(客户端收到后主动 close)
return SendMessage(session, rsp);
}
/********************* 发消息 *********************/
if(type == "send_request") {
rsp->set("type", "send_response");
auto m = msg->get("msg");
if(m.empty()) {
rsp->set("result", "500");
rsp->set("msg", "msg is null");
return SendMessage(session, rsp);
}
if(id.empty()) {
rsp->set("result", "501");
rsp->set("msg", "not login");
return SendMessage(session, rsp);
}
rsp->set("result", "200");
rsp->set("msg", "ok");
ChatMessage::ptr nty(new ChatMessage);
nty->set("type", "msg");
nty->set("time", sylar::Time2Str());
nty->set("name", id);
nty->set("msg", m);
session_notify(nty, nullptr);
return SendMessage(session, rsp);
}
// 未知类型
SYLAR_LOG_WARN(g_logger) << "unknown message type=" << type;
return 0;
}
添加/删除会话,广播消息,广播在线人数功能:
bool session_exists(const std::string& id) {
SYLAR_LOG_INFO(g_logger) << "session_exists id=" << id;
sylar::RWMutex::ReadLock lock(m_mutex);
auto it = m_sessions.find(id);
return it != m_sessions.end();
}
void session_add(const std::string& id, sylar::http::WSSession::ptr session) {
SYLAR_LOG_INFO(g_logger) << "session_add id=" << id;
sylar::RWMutex::WriteLock lock(m_mutex);
m_sessions[id] = session;
}
void session_del(const std::string& id) {
SYLAR_LOG_INFO(g_logger) << "session_add del=" << id;
sylar::RWMutex::WriteLock lock(m_mutex);
m_sessions.erase(id);
}
int32_t SendMessage(sylar::http::WSSession::ptr session
, ChatMessage::ptr msg) {
SYLAR_LOG_DEBUG(g_logger) << msg->toString() << " - " << session;
return session->sendMessage(msg->toString()) > 0 ? 0: 1;
}
void session_notify(ChatMessage::ptr msg, sylar::http::WSSession::ptr session = nullptr) {
sylar::RWMutex::ReadLock lock(m_mutex);
auto sessions = m_sessions;
lock.unlock();
for(auto& i : sessions) {
if(i.second == session) {
continue;
}
SendMessage(i.second, msg);
}
}
void notify_online_count() {
size_t cnt = 0;
{
sylar::RWMutex::ReadLock lock(m_mutex);
cnt = m_sessions.size();
}
ChatMessage::ptr nty(new ChatMessage);
nty->set("type", "online_count");
nty->set("count", std::to_string(cnt));
session_notify(nty, nullptr); // 广播给所有在线用户
}
http服务器的端口和websocket服务器的端口注册了配置项
static sylar::ConfigVar
<uint16_t>::ptr g_http_server_port =
sylar::Config::Lookup("http_server.port", (uint16_t)(8080),
"http server port");
static sylar::ConfigVar
<uint16_t>::ptr g_websocket_server_port =
sylar::Config::Lookup("websocket_server.port", (uint16_t)(8072),
"websocket server port");
ChatServer
包含了一个http服务器和一个websocket服务器,http服务器用于监听建立连接并升级连接,连接升级之后使用websocket服务器通信
class ChatServer {
public:
typedef std::shared_ptr
<ChatServer> ptr;
ChatServer(int thread_count = 4, int argc = 1, char** argv = nullptr);
void start();
private:
sylar::http::HttpServer::ptr m_http_server;
sylar::http::WSServer::ptr m_ws_server;
};
ChatServer::ChatServer(int thread_count, int argc, char** argv) {
SYLAR_LOG_INFO(g_logger) << "===============Server Initializing===============";
// 初始化http服务器
m_http_server = sylar::http::HttpServer::ptr(new sylar::http::HttpServer(true));
auto http_addr = sylar::Address::LookupAnyIPAddress("0.0.0.0:8080");
http_addr->setPort(g_http_server_port->getValue());
if(!m_http_server->bind(http_addr)) {
SYLAR_LOG_ERROR(g_logger) << "http_server bind fail";
SYLAR_ASSERT2(false, "http_server bind fail");
}
auto dispatch = m_http_server->getServletDispatch();
dispatch->addGlobServlet(
"/html/*",
sylar::http::ResourceServlet::ptr(
new sylar::http::ResourceServlet(
sylar::EnvMgr::GetInstance()->getCwd()
)
)
);
// 初始化websocket服务器
m_ws_server = sylar::http::WSServer::ptr(new sylar::http::WSServer());
auto ws_addr = sylar::Address::LookupAnyIPAddress("0.0.0.0:8072");
ws_addr->setPort(g_websocket_server_port->getValue());
if(!m_ws_server->bind(ws_addr)) {
SYLAR_LOG_ERROR(g_logger) << "ws_server bind fail";
SYLAR_ASSERT2(false, "ws_server bind fail");
}
auto ws_dispatch = m_ws_server->getWSServletDispatch();
chat::ChatWSServlet::ptr servlet(new chat::ChatWSServlet());
ws_dispatch->addServlet(
"/sylar/chat",
std::bind(
&chat::ChatWSServlet::handle,
servlet,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3
),
std::bind(
&chat::ChatWSServlet::onConnect,
servlet,
std::placeholders::_1,
std::placeholders::_2
),
std::bind(
&chat::ChatWSServlet::onClose,
servlet,
std::placeholders::_1,
std::placeholders::_2
)
);
}
void ChatServer::start() {
m_http_server->start();
SYLAR_LOG_INFO(g_logger) << "HTTP Server stated";
m_ws_server->start();
SYLAR_LOG_INFO(g_logger) << "WebSocket Server started";
SYLAR_LOG_INFO(g_logger) << "===============Server Is Running===============";
}
main
可以使用命令行参数
- -s 终端前台运行
- -d 守护进程后台运行
- -p 打印帮助内容
启动后会先从conf目录加载所有配置文件,再解析命令行参数选择启动模式
int server_main(int argc, char** argv) {
sylar::IOManager iom(4, true, "chat");
chat::ChatServer::ptr server(
new chat::ChatServer(4, argc, argv)
);
server->start();
return 0;
}
int main(int argc, char** argv) {
sylar::EnvMgr::GetInstance()->addHelp("s", "start with terminal");
sylar::EnvMgr::GetInstance()->addHelp("d", "run as daemon");
sylar::EnvMgr::GetInstance()->addHelp("p", "print help");
if(!sylar::EnvMgr::GetInstance()->init(argc, argv)) {
sylar::EnvMgr::GetInstance()->printHelp();
return 0;
}
sylar::Config::LoadFromConfDir("conf");
if(sylar::EnvMgr::GetInstance()->has("p")) {
sylar::EnvMgr::GetInstance()->printHelp();
return 0;
}
if(sylar::EnvMgr::GetInstance()->has("d")) {
return sylar::start_daemon(argc, argv, server_main, true);
return 0;
}
if(sylar::EnvMgr::GetInstance()->has("s")) {
return sylar::start_daemon(argc, argv, server_main, false);
}
sylar::EnvMgr::GetInstance()->printHelp();
return 0;
}
配置文件
log.yml
logs:
- name: system
level: info
appenders:
- type: FileLogAppender
file: system.log
- name: server
level: info
appenders:
- type: StdoutLogAppender
- type: FileLogAppender
file: server.log
- name: root
level: info
server.yml
tcp_server:
read_timeout: 120000
http_server:
port: 8080
websocket_server:
port: 8072
前端
这里只展示javascript部分
<script>
let websocket = null;
let logged_in = false;
let my_name = "";
let heartbeat_timer = null;
let closing_by_logout = false;
let online_count = 0;
const HEARTBEAT_INTERVAL = 30000;
function startHeartbeat() {
stopHeartbeat();
heartbeat_timer = setInterval(() => {
if (websocket && websocket.readyState === WebSocket.OPEN) {
websocket.send(JSON.stringify({ type: "ping" }));
}
}, HEARTBEAT_INTERVAL);
}
function stopHeartbeat() {
if (heartbeat_timer) {
clearInterval(heartbeat_timer);
heartbeat_timer = null;
}
}
function resetUIToLogin() {
stopHeartbeat();
logged_in = false;
online_count = 0;
document.getElementById("login-mask").classList.remove("hidden");
document.getElementById("send-btn").disabled = true;
document.getElementById("user-info").classList.add("hidden");
document.getElementById("online-count").style.display = "none";
}
function addMessage(type, content, meta = "") {
const box = document.getElementById("message");
const div = document.createElement("div");
div.className = "msg " + type;
if (meta) {
const m = document.createElement("div");
m.className = "meta";
m.innerText = meta;
div.appendChild(m);
}
const text = document.createElement("div");
text.innerText = content;
div.appendChild(text);
box.appendChild(div);
box.scrollTop = box.scrollHeight;
}
function onSocketClosed() {
websocket = null;
if (!closing_by_logout) {
resetUIToLogin();
addMessage("system", "连接已断开");
}
closing_by_logout = false;
}
function logout() {
if (!websocket || websocket.readyState !== WebSocket.OPEN) return;
closing_by_logout = true;
resetUIToLogin();
addMessage("system", "你已退出聊天室");
websocket.send(JSON.stringify({ type: "logout_request" }));
setTimeout(() => {
websocket && websocket.close(1000, "logout");
}, 50);
}
function login() {
const name = document.getElementById("tname").value.trim();
if (!name) return;
my_name = name;
websocket = new WebSocket("ws://localhost:8072/sylar/chat");
websocket.onopen = () => {
websocket.send(JSON.stringify({
type: "login_request",
name
}));
};
websocket.onmessage = e => {
let o;
try {
o = JSON.parse(e.data);
} catch (err) {
return;
}
if (o.type === "pong") return;
if (o.type === "login_response") {
if (o.result == 200) {
logged_in = true;
document.getElementById("login-mask").classList.add("hidden");
document.getElementById("send-btn").disabled = false;
document.getElementById("user-name").innerText = my_name;
document.getElementById("user-info").classList.remove("hidden");
document.getElementById("online-count").style.display = "inline-block";
addMessage("system", "登录成功");
startHeartbeat();
} else {
addMessage("system", o.msg);
}
} else if (o.type === "online_count") {
online_count = parseInt(o.count, 10) || 0;
document.getElementById("online-count").innerText = `🟢 在线 ${online_count} 人`;
} else if (o.type === "user_enter") {
addMessage("system", `${o.name} 加入聊天室`);
} else if (o.type === "user_leave") {
addMessage("system", `${o.name} 离开聊天室`);
} else if (o.type === "msg") {
const cls = o.name === my_name ? "self" : "other";
addMessage(cls, o.msg, o.name + " · " + o.time);
}
};
websocket.onclose = onSocketClosed;
websocket.onerror = onSocketClosed;
}
function sendmsg() {
const input = document.getElementById("msg");
const text = input.value.trim();
if (!text || !logged_in) return;
websocket.send(JSON.stringify({
type: "send_request",
msg: text
}));
input.value = "";
}
window.onload = () => {
document.getElementById("tname").addEventListener("keydown", e => {
if (e.key === "Enter") login();
});
document.getElementById("msg").addEventListener("keydown", e => {
if (e.key === "Enter") sendmsg();
});
};
</script>
测试
http://ayanami.blue:8080/html/index.html
聊天室已经部署到本网站服务器的8080端口




!?强强?!
!?强强?!