TimelineServer: 代码介绍
头文件参考资料:C_POSIX_library
TimelineServer::Buffer
- 整体结构
- 用
std::vector<char>
存储字符 - 用两个指针
std::atomic<std::size_t>
记录读写位置
- 用
写入
在每次写入之前都先用
make_space
保证剩余可写空间足够- 在空间不足时用
std::copy
将存储的字符移动到读取过的地方 - 如果依然不够就用
std::vector::resize
申请足够的空间(此时不再移动存储的字符)
- 在空间不足时用
再用
std::copy
写入为
snprintf
函数预留了一个move_write_ptr
函数用来只移动写指针不写入内容,在调用该函数之前需要手动先调用一下make_space
来保证有足够的空间为文件读写(读取文件写入缓存)预留了
ssize_t read_fd(int fd, int* errno_)
函数,该函数有读写上限,读完之后需要再读一次,判断是否读完了
读取
- 为字符串读取预留了
string read(size_t len)
和string read_all()
函数 - 为文件读写(读取缓存写入文件)预留了
ssize_t write_fd(int fd, int* errno_)
函数
TimelineServer::LogQueue
整体结构是一个用 std::mutex
和 std::condition_variable
做了多线程优化的队列
最重要的两个函数:
push
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19template <class T>
bool LogQueue<T>::push(const T& item) {
{
std::unique_lock<std::mutex> locker(deq_mtx_);
while (deq_.size() >= capacity_) {
// condition_variable::wait
// 在阻塞之前会执行 locker.unlock
// 在被唤醒后会执行 locker.lock
cond_producer_.wait(locker);
if (is_closed_) {
return false;
}
}
deq_.push_back(item);
}
cond_consumer_.notify_one();
return true;
}pop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template <class T>
bool LogQueue<T>::pop(T& item) {
{
std::unique_lock<std::mutex> locker(deq_mtx_);
while (deq_.empty()) {
cond_consumer_.wait(locker);
if (is_closed_) {
return false;
}
}
item = deq_.front();
deq_.pop_front();
}
cond_producer_.notify_one();
return true;
}TimelineServer::Log
整体结构用 “单例模式”+”宏” 来实现,在使用前需要初始化
1
Log::get_instance()->init(LOG_LEVEL::ELL_INFO, "../log", ".log", 0);
分为同步/异步两种模式
同步模式 (max_capacity 参数设为 0)
在写入的时候直接执行
1
2fputs(buffer_.get_read_ptr(), fp_);
fflush(fp_);异步模式 (max_capacity 参数大于 0)
创建一个线程将日志从
LogQueue
写入文件1
2
3
4
5
6
7
8void Log::async_write() {
string log_message = "";
while (log_queue_->pop(log_message)) {
std::lock_guard<std::mutex> locker(fp_mtx_);
fputs(log_message.data(), fp_);
flush();
}
}写入日志的时候将日志写入
LogQueue
1
log_queue_->push(buffer_.read_all());
Log 宏
1
2
3
4
5
6
7
8
9
10
11
12
13// 使用 while(0) 把宏包起来,可以使其不受括号,分号等的影响
do { \
TimelineServer::Log* log = TimelineServer::Log::get_instance(); \
if (log->get_level() <= level) { \
log->write_buffer(level, format, ##__VA_ARGS__); \
} \
} while (0);
do { \
LOG_BASE(TimelineServer::LOG_LEVEL::ELL_DEBUG, format, ##__VA_ARGS__); \
} while (0);关于 Log 宏中为什么要用
do{}while(0);
给包起来, 参考资料: 参考资料|stackoverflow
简单来说是用来解决以下的问题的,
直接定义操作
或用{}包起来
或用if-else包起来
都不行
1
2
3
4 if (xxx)
MACROS(xxx);
else
func();最后关于 Log 宏中
##__VA_ARGS__
前面的##
,参考资料: 参考资料|stackoverflowNote: some compilers offer an extension that allows ## to appear after a comma and before VA_ARGS, in which case the ## does nothing when the variable arguments are present, but removes the comma when the variable arguments are not present: this makes it possible to define macros such as fprintf (stderr, format, ##VA_ARGS). This can also be achieved in a standard manner using VA_OPT, such as fprintf (stderr, format VA_OPT(, ) VA_ARGS). (since C++20)
简单来说就是用来去掉没用的逗号,如果没有这个 “##”,
LOG_INFO(“some info”) 必须写成 LOG_INFO(“%s”, “some info”),
进而被扩展成 log->write_buffer(level, %s, “some thing”);
否则会被扩展成 log->write_buffer(level, “some thing”, ); (多一个逗号)
TimelineServer::Timer
整体结构是一个用 std::vector<TimerNode>
为底层的小顶堆,同时用 std::unordered_map<timer_id, size_t>
存储定时器的 id
和存储位置的对应关系
主要 API:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 增加/删除定时器
void add_timer(timer_id id, int timeout, const timeout_cb& call_back);
void pop_timer();
// 调用某个定时器的回调函数/重新调整某个定时器的时间
void do_work(timer_id id);
void adjust(timer_id id, int timeout);
// 删除所有定时器/查询定时器是否到时间,激活并删除
void clear();
// 执行所有未执行的到时定时器
void tick();
// 获取距离下一个定时器触发的时间差(并执行tick)
int get_next_timeout();定时器节点
1
2
3
4
5
6struct TimerNode {
timer_id id;
time_stamp expires;
timeout_cb cb;
bool operator<(const TimerNode& t) { return expires < t.expires; }
};向上调整节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14void Timer::siftup_(size_t i) {
assert(i >= 0 && i < heap_.size());
// 二叉树中的父节点
size_t j = (i - 1) / 2;
while (j >= 0) {
// 父节点小于当前节点,停止传递
if (heap_[j] < heap_[i]) break;
// 当前节点小于父节点,则必然也小于兄弟节点
swap_(i, j);
i = j;
j = (i - 1) / 2;
}
}向下调整节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20bool Timer::siftdown_(size_t i, size_t n) {
assert(i >= 0 && i < heap_.size());
assert(n >= 0 && n <= heap_.size());
int init_pos = i;
// 二叉树中的左子结点
size_t j = i * 2 + 1;
while (j < n) {
// 查找子结点中较小的(否则交换后新节点不满足小于子结点的要求)
if (j + 1 < n && heap_[j + 1] < heap_[j]) j++;
// 当前节点小于所有子节点,停止传递
if (heap_[i] < heap_[j]) break;
swap_(i, j);
i = j;
j = i * 2 + 1;
}
// 返回是否进行过交换
return i > init_pos;
}调整节点
1
2
3
4
5
6
7
8
9
10
11
12void Timer::adjust(timer_id id, int timeout) {
// 保证该元素存在
assert(!heap_.empty() && timer_ref_.count(id) > 0);
// 调整触发时间
heap_[timer_ref_[id]].expires = Clock::now() + MS(timeout);
// 调整该元素位置(加一个限制位置,防止下标越界)
if (!siftdown_(timer_ref_[id], heap_.size())) {
// 节点无法向下调整,则向上调整
siftup_(timer_ref_[id]);
}
}TimelineServer::ThreadPool
使用一个任务队列存储待处理任务
启动多个线程从任务队列拿任务并执行
任务管理队列
1
2
3
4
5
6
7// 不同 Worker 共享的数据
struct Manager {
std::mutex mtx;
std::condition_variable cond;
bool is_closed;
std::queue<std::function<void()>> tasks;
};构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22explicit ThreadPool(size_t workers = 8)
: manager_(std::make_shared<Manager>()) {
assert(workers > 0);
for (size_t i = 0; i < workers; i++) {
std::thread([manager = manager_] {
std::unique_lock<std::mutex> locker(manager->mtx);
while (true) {
if (manager->is_closed) break;
if (!manager->tasks.empty()) {
auto task = std::move(manager->tasks.front());
manager->tasks.pop();
locker.unlock();
// 当且仅当执行任务的时候解开互斥锁
task();
locker.lock();
} else {
manager->cond.wait(locker);
}
}
}).detach();
}
}TimelineServer::SQLConnPool
SQLConnPool
:- 使用单例模式,负责管理与数据库的链接
sql::Connection* get_connect();
: 从链接池中获取一个链接void free_connect(sql::Connection* conn);
: 将链接还给链接池int get_free_connect_count();
: 获取可用链接数量
SQLConn
:- 对
sql::Connection*
的 RAII(Resource Acquisition Is Initialization) 包装 - 创建后自动从链接池中获取链接
- 析构时自动归还链接
bool is_valid()
: 判断链接是否可用
- 对
TimelineServer::Mux
对 epoll
系列函数的封装
// 以后可以尝试用不同系列的 IO 复用函数写一些不同 Mux 的实现
TimelineServer::HttpRequest
解析 Http 报文,提供查询 Header
和 Json
格式的报文体
- 解析报文: 用有限状态机依次解析请求行(请求地址),请求头和请求体
- 解析
Header
- 解析后存储在
std::unordered_map<string, string> header_;
中
- 解析后存储在
- 解析
Json
:- 需要在请求头中配置 “Content-Type: application/json”
- 解析后存储在
Json
对象中
查询接口
1
2
3
4
5
6
7
8
9
10
11const string query_header(const string& key) const;
const string query_header(const char* key) const;
const std::unordered_map<string, string> get_header() const {
return header_;
}
const Json query_post(const string& key) const;
const Json query_post(const char* key) const;
const Json get_post() const {
return post_;
}TimelineServer::HttpResponse
响应解析好的请求报文,优先响应动态请求(调用注册好的回调函数),再响应静态文件请求,并在解析后填充响应报文
回调函数定义:
1
typedef std::function<bool(const HttpRequest& request, Buffer& buffer)> router_cb;
- 响应接口:
void make_response(const HttpRequest& request, Buffer& buffer);
- 注册动态请求回调函数:
static bool register_dynamic_router(string& src, const router_cb& cb);
- 注册静态路由跳转:
static bool register_static_router(string& src, string& des);
TimelineServer::HttpConn
控制 HttpRequest
和 HttpResponse
完成网络请求
- 读取报文:
ssize_t read(int* errno_);
- 响应请求:
bool process();
- 写入链接:
write(int* errno_);
TimelineServer::Server
创建
Server
实例:1
2
3
4
5Server(int port, bool is_ET, int timeout_ms, bool linger_close,
const char* src_dir, const char* sql_host, int sql_port,
const char* sql_user, const char* sql_pwd, const char* sql_db_name,
int pool_sql_conn_num, int pool_thread_num, LOG_LEVEL log_level,
int log_queue_size);注册路由
1
2
3
4
5
6
7
8// 注册静态路由跳转
static bool register_static_router(string& src, string& des);
static bool register_static_router(const char* src, string& des);
static bool register_static_router(string& src, const char* des);
static bool register_static_router(const char* src, const char* des);
// 注册动态回调函数
static bool register_dynamic_router(string& src, const router_cb& cb);
static bool register_dynamic_router(const char* src, const router_cb& cb);**