记录一下最近这学期学习的sylar服务器框架项目,输出整理一下项目的结构,用到的知识和自己的体会
项目仓库地址
https://github.com/sylar-yin/sylar/
整理博客过程中参考的大佬资料链接:
============================================
基础介绍
HttpConnection模块能让服务器像客户端一样发送http请求报文的能力
该模块主要由URI模块,继承自SocketStream的HttpConnection类,HttpResult类以及HttpConnectionPool类组成
URI模块
定义了一个URI类,可以完成URI的解析,基于有限状态机ragel进行解析
/*
foo://user@sylar.com:8042/over/there?name=ferret#nose
\_/ \______________/\_________/ \_________/ \__/
| | | | |
scheme authority path query fragment
*/
// URI类
class Uri {
public:
typedef std::shared_ptr
<Uri> ptr;
static Uri::ptr Create(const std::string& uri); // 创建URI
Uri();
const std::string& getScheme() const { return m_scheme; }
const std::string& getUserinfo() const { return m_userinfo; }
const std::string& getHost() const { return m_host; }
const std::string& getPath() const;
const std::string& getQuery() const { return m_query; }
const std::string& getFragment() const { return m_fragment; }
int32_t getPort() const;
void setScheme(const std::string& v) { m_scheme = v; }
void setUserinfo(const std::string& v) { m_userinfo = v; }
void setHost(const std::string& v) { m_host = v; }
void setPath(const std::string& v) { m_path = v; }
void setQuery(const std::string& v) { m_query = v; }
void setFragment(const std::string& v) { m_fragment = v; }
void setPort(int32_t v) { m_port = v;}
std::ostream& dump(std::ostream& os) const; // 转字符串并插到流
std::string toString() const; // 转成字符串
Address::ptr createAddress() const; // 使用URI的host字段创建地址
private:
bool isDefaultPort() const; // 是否为默认端口(80 443)
private:
std::string m_scheme; // scheme
std::string m_userinfo; // 用户信息
std::string m_host; // 主机
std::string m_path; // 路径
std::string m_query; // 查询参数(?)
std::string m_fragment; // fragment(#)
int32_t m_port; // 端口
};
有限状态机源代码uri.rl如下
#include "uri.h"
#include
<sstream>
namespace sylar {
%%{
# See RFC 3986: http://www.ietf.org/rfc/rfc3986.txt
machine uri_parser;
gen_delims = ":" | "/" | "?" | "#" | "[" | "]" | "@";
sub_delims = "!" | "$" | "&" | "'" | "(" | ")" | "*" | "+" | "," | ";" | "=";
reserved = gen_delims | sub_delims;
unreserved = alpha | digit | "-" | "." | "_" | "~";
pct_encoded = "%" xdigit xdigit;
action marku { mark = fpc; }
action markh { mark = fpc; }
action save_scheme
{
uri->setScheme(std::string(mark, fpc - mark));
mark = NULL;
}
scheme = (alpha (alpha | digit | "+" | "-" | ".")*) >marku %save_scheme;
action save_port
{
if (fpc != mark) {
uri->setPort(atoi(mark));
}
mark = NULL;
}
action save_userinfo
{
if(mark) {
//std::cout << std::string(mark, fpc - mark) << std::endl;
uri->setUserinfo(std::string(mark, fpc - mark));
}
mark = NULL;
}
action save_host
{
if (mark != NULL) {
//std::cout << std::string(mark, fpc - mark) << std::endl;
uri->setHost(std::string(mark, fpc - mark));
}
}
userinfo = (unreserved | pct_encoded | sub_delims | ":")*;
dec_octet = digit | [1-9] digit | "1" digit{2} | 2 [0-4] digit | "25" [0-5];
IPv4address = dec_octet "." dec_octet "." dec_octet "." dec_octet;
h16 = xdigit{1,4};
ls32 = (h16 ":" h16) | IPv4address;
IPv6address = ( (h16 ":"){6} ls32) |
( "::" (h16 ":"){5} ls32) |
(( h16)? "::" (h16 ":"){4} ls32) |
(((h16 ":"){1} h16)? "::" (h16 ":"){3} ls32) |
(((h16 ":"){2} h16)? "::" (h16 ":"){2} ls32) |
(((h16 ":"){3} h16)? "::" (h16 ":"){1} ls32) |
(((h16 ":"){4} h16)? "::" ls32) |
(((h16 ":"){5} h16)? "::" h16 ) |
(((h16 ":"){6} h16)? "::" );
IPvFuture = "v" xdigit+ "." (unreserved | sub_delims | ":")+;
IP_literal = "[" (IPv6address | IPvFuture) "]";
reg_name = (unreserved | pct_encoded | sub_delims)*;
host = IP_literal | IPv4address | reg_name;
port = digit*;
authority = ( (userinfo %save_userinfo "@")? host >markh %save_host (":" port >markh %save_port)? ) >markh;
action save_segment
{
mark = NULL;
}
action save_path
{
//std::cout << std::string(mark, fpc - mark) << std::endl;
uri->setPath(std::string(mark, fpc - mark));
mark = NULL;
}
# pchar = unreserved | pct_encoded | sub_delims | ":" | "@";
# add (any -- ascii) support chinese
pchar = ( (any -- ascii ) | unreserved | pct_encoded | sub_delims | ":" | "@" ) ;
segment = pchar*;
segment_nz = pchar+;
segment_nz_nc = (pchar - ":")+;
action clear_segments
{
}
path_abempty = (("/" segment))? ("/" segment)*;
path_absolute = ("/" (segment_nz ("/" segment)*)?);
path_noscheme = segment_nz_nc ("/" segment)*;
path_rootless = segment_nz ("/" segment)*;
path_empty = "";
path = (path_abempty | path_absolute | path_noscheme | path_rootless | path_empty);
action save_query
{
//std::cout << std::string(mark, fpc - mark) << std::endl;
uri->setQuery(std::string(mark, fpc - mark));
mark = NULL;
}
action save_fragment
{
//std::cout << std::string(mark, fpc - mark) << std::endl;
uri->setFragment(std::string(mark, fpc - mark));
mark = NULL;
}
query = (pchar | "/" | "?")* >marku %save_query;
fragment = (pchar | "/" | "?")* >marku %save_fragment;
hier_part = ("//" authority path_abempty > markh %save_path) | path_absolute | path_rootless | path_empty;
relative_part = ("//" authority path_abempty) | path_absolute | path_noscheme | path_empty;
relative_ref = relative_part ( "?" query )? ( "#" fragment )?;
absolute_URI = scheme ":" hier_part ( "?" query )? ;
# Obsolete, but referenced from HTTP, so we translate
relative_URI = relative_part ( "?" query )?;
URI = scheme ":" hier_part ( "?" query )? ( "#" fragment )?;
URI_reference = URI | relative_ref;
main := URI_reference;
write data;
}%%
Uri::ptr Uri::Create(const std::string& uristr) {
Uri::ptr uri(new Uri);
int cs = 0;
const char* mark = 0;
%% write init;
const char *p = uristr.c_str();
const char *pe = p + uristr.size();
const char* eof = pe;
%% write exec;
if(cs == uri_parser_error) {
return nullptr;
} else if(cs >= uri_parser_first_final) {
return uri;
}
return nullptr;
}
Uri::Uri()
:m_port(0) {
}
bool Uri::isDefaultPort() const {
if(m_port == 0) {
return true;
}
if(m_scheme == "http"
|| m_scheme == "ws") {
return m_port == 80;
} else if(m_scheme == "https"
|| m_scheme == "wss") {
return m_port == 443;
}
return false;
}
const std::string& Uri::getPath() const {
static std::string s_default_path = "/";
return m_path.empty() ? s_default_path : m_path;
}
int32_t Uri::getPort() const {
if(m_port) {
return m_port;
}
if(m_scheme == "http"
|| m_scheme == "ws") {
return 80;
} else if(m_scheme == "https"
|| m_scheme == "wss") {
return 443;
}
return m_port;
}
std::ostream& Uri::dump(std::ostream& os) const {
os << m_scheme << "://"
<< m_userinfo
<< (m_userinfo.empty() ? "" : "@")
<< m_host
<< (isDefaultPort() ? "" : ":" + std::to_string(m_port))
<< getPath()
<< (m_query.empty() ? "" : "?")
<< m_query
<< (m_fragment.empty() ? "" : "#")
<< m_fragment;
return os;
}
std::string Uri::toString() const {
std::stringstream ss;
dump(ss);
return ss.str();
}
Address::ptr Uri::createAddress() const {
auto addr = Address::LookupAnyIPAddress(m_host);
if(addr) {
addr->setPort(getPort());
}
return addr;
}
}
HttpResult
该结构体用于存储HTTP响应结果
// HTTP响应结果类
struct HttpResult {
typedef std::shared_ptr
<HttpResult> ptr;
// 错误枚举
enum class Error {
OK = 0, // 正常
INVALID_URL = 1, // 非法URL
INVALID_HOST = 2, // 无法解析host
CONNECT_FAIL = 3, // 连接失败
SEND_CLOSE_BY_PEER = 4, // 连接被对端关闭
SEND_SOCKET_ERROR = 5, // 发送请求产生socket错误
TIMEOUT = 6, // 超时
CREATE_SOCK_ERROR = 7, // 创建socket失败
POOL_GET_CONNECTION = 8, // 从连接池中获取连接失败
POOL_INVALID_CONNECTION = 9 // 连接池无效的连接
};
HttpResult(int _result, HttpResponse::ptr _response, const std::string& _error)
:result(_result)
,response(_response)
,error(_error) {}
int result; // 错误码
HttpResponse::ptr response; // HTTP响应类
std::string error; // 错误描述
std::string toString() const; // 字符串方法
};
std::string HttpResult::toString() const {
std::stringstream ss;
ss << "[HttpResult result=" << result
<< " error=" << error
<< " response=" << (response ? response->toString() : "nullptr")
<< "]";
return ss.str();
}
HttpConnection
SocketStream的子类,提供了发送各种方法请求的接口
// HTTP连接类(继承自SocketStream)
class HttpConnection : public SocketStream {
friend class HttpConnectionPool;
public:
typedef std::shared_ptr
<HttpConnection> ptr;
// 发送GET请求(url字符串)
static HttpResult::ptr DoGet(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送GET请求(Uri指针)
static HttpResult::ptr DoGet(Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送POST请求(URL字符串)
static HttpResult::ptr DoPost(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送POST请求(Uri指针)
static HttpResult::ptr DoPost(Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送请求(URL字符串)
static HttpResult::ptr DoRequest(HttpMethod method
, const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送请求(Uri指针)
static HttpResult::ptr DoRequest(HttpMethod method
, Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
// 发送请求(HttpRequest指针)
static HttpResult::ptr DoRequest(HttpRequest::ptr req
, Uri::ptr uri
, uint64_t timeout_ms);
HttpConnection(Socket::ptr sock, bool owner = true);
~HttpConnection();
HttpResponse::ptr recvResponse(); // 接收响应报文
int sendRequest(HttpRequest::ptr req); // 发送请求报文
private:
uint64_t m_createTime = 0; // 创建时间
uint64_t m_request = 0; // 请求超时时间
};
HttpConnection::HttpConnection(Socket::ptr sock, bool owner)
:SocketStream(sock, owner) {
}
HttpConnection::~HttpConnection() {
SYLAR_LOG_DEBUG(g_logger) << "HttpConnection::~HttpConnection";
}
HttpResponse::ptr HttpConnection::recvResponse() {
// 创建解析器
HttpResponseParser::ptr parser(new HttpResponseParser);
uint64_t buff_size = HttpRequestParser::GetHttpRequestBufferSize();
// 智能指针接管
std::shared_ptr
<char> buffer(
new char[buff_size + 1], [](char* ptr){
delete[] ptr;
});
char* data = buffer.get();
int offset = 0;
// 解析工作循环
do {
int len = read(data + offset, buff_size - offset);
if(len <= 0) {
close();
return nullptr;
}
len += offset;
data[len] = '\0';
size_t nparse = parser->execute(data, len, false);
if(parser->hasError()) {
close();
return nullptr;
}
offset = len - nparse;
if(offset == (int)buff_size) {
close();
return nullptr;
}
if(parser->isFinished()) {
break;
}
} while(true);
auto client_parser = parser->getParser();
// 如果chunk
if(client_parser.chunked) {
std::string body;
int len = offset;
do {
do {
int rt = read(data + len, buff_size - len);
if(rt <= 0) {
close();
return nullptr;
}
len += rt;
data[len] = '\0';
size_t nparse = parser->execute(data, len, true);
if(parser->hasError()) {
close();
return nullptr;
}
len -= nparse;
if(len == (int)buff_size) {
close();
return nullptr;
}
} while(!parser->isFinished());
len -= 2;
SYLAR_LOG_INFO(g_logger) << "content_len=" << client_parser.content_len;
if(client_parser.content_len <= len) {
body.append(data, client_parser.content_len);
memmove(data, data + client_parser.content_len
, len - client_parser.content_len);
len -= client_parser.content_len;
} else {
body.append(data, len);
int left = client_parser.content_len - len;
while(left > 0) {
int rt = read(data, left > (int)buff_size ? (int)buff_size : left);
if(rt <= 0) {
close();
return nullptr;
}
body.append(data, rt);
left -= rt;
}
len = 0;
}
} while(client_parser.chunks_done);
parser->getData()->setBody(body);
} else { // 非chunk
int64_t length = parser->getContentLength();
if(length > 0) {
std::string body;
body.resize(length);
int len = 0;
if(length >= offset) {
memcpy(&body[0], data, length);
len = offset;
} else {
memcpy(&body[0], data, length);
len = length;
}
length -= offset;
if(length > 0) {
if(readFixSize(&body[len], length) <= 0) {
close();
return nullptr;
}
}
parser->getData()->setBody(body);
}
}
// 返回解析完毕数据
return parser->getData();
}
int HttpConnection::sendRequest(HttpRequest::ptr req) {
std::stringstream ss;
ss << *req;
std::string data = ss.str();
return writeFixSize(data.c_str(), data.size());
}
HttpResult::ptr HttpConnection::DoGet(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
Uri::ptr uri = Uri::Create(url);
if(!uri) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::INVALID_URL
, nullptr, "invalid url: " + url);
}
return DoGet(uri, timeout_ms, headers, body);
}
// ...
HttpConnectionPool
仅在长连接时有效,预备一池子的HttpConnection指针,复用http连接
// HTTP连接池(仅在Connection: keep-alive时有效)
class HttpConnectionPool {
public:
typedef std::shared_ptr
<HttpConnectionPool> ptr;
typedef Mutex MutexType;
HttpConnectionPool(const std::string& host
, const std::string& vhost
, uint32_t port
, uint32_t max_size
, uint32_t max_alive_time
, uint32_t max_request);
HttpConnection::ptr getConnection(); // 获得连接
HttpResult::ptr doGet(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doGet(Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doPost(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doPost(Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doRequest(HttpMethod method
, const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doRequest(HttpMethod method
, Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers = {}
, const std::string& body = "");
HttpResult::ptr doRequest(HttpRequest::ptr req
, uint64_t timeout_ms);
private:
static void ReleasePtr(HttpConnection* ptr, HttpConnectionPool* pool); // 释放连接
private:
std::string m_host; // 主机
std::string m_vhost;
uint32_t m_port; // 端口
uint32_t m_maxSize; // 连接最大数
uint32_t m_maxAliveTime; // 连接时长
uint32_t m_maxRequest; // 请求时长
MutexType m_mutex; // 互斥锁
std::list<HttpConnection*> m_conns; // HttpConnection指针链表
std::atomic
<int32_t> m_total = {0}; // 连接的数量
};
HttpConnectionPool::HttpConnectionPool(const std::string& host
, const std::string& vhost
, uint32_t port
, uint32_t max_size
, uint32_t max_alive_time
, uint32_t max_request)
:m_host(host)
,m_vhost(vhost)
,m_port(port)
,m_maxSize(max_size)
,m_maxAliveTime(max_alive_time)
,m_maxRequest(max_request) {
}
HttpConnection::ptr HttpConnectionPool::getConnection() {
uint64_t now_ms = sylar::GetCurrentMs();
std::vector<HttpConnection*> invalid_conns;
HttpConnection* ptr = nullptr;
Mutex::Lock lock(m_mutex);
// 若连接池不为空,取出一个connection
while(!m_conns.empty()) {
auto conn = *m_conns.begin();
m_conns.pop_front();
if(!conn->isConnected()) {
invalid_conns.push_back(conn);
continue;
}
if((conn->m_createTime + m_maxAliveTime) > now_ms) {
invalid_conns.push_back(conn);
continue;
}
ptr = conn;
break;
}
lock.unlock();
// 删除非法连接
for(auto i : invalid_conns) {
delete i;
}
m_total -= invalid_conns.size();
// 若没有连接
if(!ptr) {
// 创建地址
IPAddress::ptr addr = Address::LookupAnyIPAddress(m_host);
if(!addr) {
SYLAR_LOG_ERROR(g_logger) << "get addr fail: " << *addr;
return nullptr;
}
// 设置端口
addr->setPort(m_port);
Socket::ptr sock = Socket::CreateTCP(addr);
if(!sock) {
SYLAR_LOG_ERROR(g_logger) << "create sock fail: " << *addr;
return nullptr;
}
// 连接
if(!sock->connect(addr)) {
SYLAR_LOG_ERROR(g_logger) << "sock connect fail: " << *addr;
return nullptr;
}
ptr = new HttpConnection(sock);
m_total++;
}
return HttpConnection::ptr(ptr, std::bind(&HttpConnectionPool::ReleasePtr
, std::placeholders::_1, this));
}
void HttpConnectionPool::ReleasePtr(HttpConnection* ptr, HttpConnectionPool* pool) {
ptr->m_request++;
if(!ptr->isConnected()
|| ((ptr->m_createTime + pool->m_maxAliveTime) >= sylar::GetCurrentMs())
|| (ptr->m_request >= pool->m_maxRequest)) {
delete ptr;
pool->m_total--;
return;
}
MutexType::Lock lock(pool->m_mutex);
// 重新加入连接池
pool->m_conns.push_back(ptr);
}
HttpResult::ptr HttpConnectionPool::doGet(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
return doRequest(HttpMethod::GET, url, timeout_ms, headers, body);
}
HttpResult::ptr HttpConnectionPool::doGet(Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
std::stringstream ss;
ss << uri->getPath()
<< (uri->getQuery().empty() ? "" : "?")
<< uri->getQuery()
<< (uri->getFragment().empty() ? "" : "#")
<< uri->getFragment();
return doGet(ss.str(), timeout_ms, headers, body);
}
HttpResult::ptr HttpConnectionPool::doPost(const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
return doRequest(HttpMethod::POST, url, timeout_ms, headers, body);
}
// ...
HttpResult::ptr HttpConnectionPool::doRequest(HttpMethod method
, const std::string& url
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
HttpRequest::ptr req = std::make_shared
<HttpRequest>();
req->setPath(url);
req->setMethod(method);
req->setClose(false);
bool has_host = false;
for(auto& i : headers) {
if(strcasecmp(i.first.c_str(), "connection") == 0) {
if(strcasecmp(i.second.c_str(), "keep-alive") == 0) {
req->setClose(false);
}
continue;
}
if(!has_host && strcasecmp(i.first.c_str(), "host") == 0) {
has_host = !i.second.empty();
}
req->setHeader(i.first, i.second);
}
if(!has_host) {
if(m_vhost.empty()) {
req->setHeader("Host", m_host);
} else {
req->setHeader("Host", m_vhost);
}
}
req->setBody(body);
return doRequest(req, timeout_ms);
}
HttpResult::ptr HttpConnectionPool::doRequest(HttpMethod method
, Uri::ptr uri
, uint64_t timeout_ms
, const std::map<std::string, std::string>& headers
, const std::string& body) {
std::stringstream ss;
ss << uri->getPath()
<< (uri->getQuery().empty() ? "" : "?")
<< uri->getQuery()
<< (uri->getFragment().empty() ? "" : "#")
<< uri->getFragment();
return doRequest(method, ss.str(), timeout_ms, headers, body);
}
HttpResult::ptr HttpConnectionPool::doRequest(HttpRequest::ptr req
, uint64_t timeout_ms) {
auto conn = getConnection();
if(!conn) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::POOL_GET_CONNECTION
, nullptr, "pool host:" + m_host + " port:" + std::to_string(m_port));
}
auto sock = conn->getSocket();
if(!sock) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::POOL_INVALID_CONNECTION
, nullptr, "pool host:" + m_host + " port:" + std::to_string(m_port));
}
sock->setRecvTimeout(timeout_ms);
int rt = conn->sendRequest(req);
if(rt == 0) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::SEND_CLOSE_BY_PEER
, nullptr, "send request closed by peer: " + sock->getRemoteAddress()->toString());
}
if(rt < 0) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::SEND_SOCKET_ERROR
, nullptr, "send request socket error errno=" + std::to_string(errno)
+ " errstr=" + std::string(strerror(errno)));
}
auto rsp = conn->recvResponse();
if(!rsp) {
return std::make_shared
<HttpResult>((int)HttpResult::Error::TIMEOUT
, nullptr, "recv response timeout: " + sock->getRemoteAddress()->toString()
+ " timeout_ms:" + std::to_string(timeout_ms));
}
return std::make_shared
<HttpResult>((int)HttpResult::Error::OK, rsp, "ok");
}
测试
创建一个连接池,目标www.baidu.com的80端口,最大连接数10,最大连接时长30秒,单个连接最多复用5次
#include
<iostream>
#include "sylar/http/http_connection.h"
#include "sylar/log.h"
#include "sylar/iomanager.h"
static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();
void test_pool() {
sylar::http::HttpConnectionPool::ptr pool(new sylar::http::HttpConnectionPool(
"www.baidu.com", "", 80, 10, 1000 * 30, 5));
sylar::IOManager::GetThis()->addTimer(1000, [pool](){
auto r = pool->doGet("/", 300);
SYLAR_LOG_INFO(g_logger) << r->toString().substr(0, 500);
}, true);
}
int main(int argc, char** argv) {
sylar::IOManager iom(2);
iom.schedule(test_pool);
// iom.schedule(run);
return 0;
}
可以看到符合预期

总结
提供了http客户端功能




严肃困惑中……