头文件参考资料:C_POSIX_library

TimelineServer::Buffer

  • 整体结构
    • std::vector<char> 存储字符
    • 用两个指针 std::atomic<std::size_t> 记录读写位置

写入

  • 在每次写入之前都先用 make_space 保证剩余可写空间足够

    • 在空间不足时用 std::copy 将存储的字符移动到读取过的地方
    • 如果依然不够就用 std::vector::resize 申请足够的空间(此时不再移动存储的字符)
  • 再用 std::copy 写入

  • snprintf 函数预留了一个 move_write_ptr 函数用来只移动写指针不写入内容,在调用该函数之前需要手动先调用一下 make_space 来保证有足够的空间

  • 为文件读写(读取文件写入缓存)预留了 ssize_t read_fd(int fd, int* errno_) 函数,该函数有读写上限,读完之后需要再读一次,判断是否读完了

读取

  • 为字符串读取预留了 string read(size_t len)string read_all() 函数
  • 为文件读写(读取缓存写入文件)预留了 ssize_t write_fd(int fd, int* errno_) 函数

TimelineServer::LogQueue

整体结构是一个用 std::mutexstd::condition_variable 做了多线程优化的队列

最重要的两个函数:

  • push

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    template <class T>
    bool LogQueue<T>::push(const T& item) {
    {
    std::unique_lock<std::mutex> locker(deq_mtx_);
    while (deq_.size() >= capacity_) {
    // condition_variable::wait
    // 在阻塞之前会执行 locker.unlock
    // 在被唤醒后会执行 locker.lock
    cond_producer_.wait(locker);
    if (is_closed_) {
    return false;
    }
    }
    deq_.push_back(item);
    }

    cond_consumer_.notify_one();
    return true;
    }
  • pop

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    template <class T>
    bool LogQueue<T>::pop(T& item) {
    {
    std::unique_lock<std::mutex> locker(deq_mtx_);
    while (deq_.empty()) {
    cond_consumer_.wait(locker);
    if (is_closed_) {
    return false;
    }
    }
    item = deq_.front();
    deq_.pop_front();
    }

    cond_producer_.notify_one();
    return true;
    }

    TimelineServer::Log

  • 整体结构用 “单例模式”+”宏” 来实现,在使用前需要初始化

    1
    Log::get_instance()->init(LOG_LEVEL::ELL_INFO, "../log", ".log", 0);
  • 分为同步/异步两种模式

    • 同步模式 (max_capacity 参数设为 0)

      • 在写入的时候直接执行

        1
        2
        fputs(buffer_.get_read_ptr(), fp_);
        fflush(fp_);
      • 异步模式 (max_capacity 参数大于 0)

        • 创建一个线程将日志从 LogQueue 写入文件

          1
          2
          3
          4
          5
          6
          7
          8
          void Log::async_write() {
          string log_message = "";
          while (log_queue_->pop(log_message)) {
          std::lock_guard<std::mutex> locker(fp_mtx_);
          fputs(log_message.data(), fp_);
          flush();
          }
          }
        • 写入日志的时候将日志写入 LogQueue

          1
          log_queue_->push(buffer_.read_all());
  • Log 宏

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 使用 while(0) 把宏包起来,可以使其不受括号,分号等的影响
    #define LOG_BASE(level, format, ...) \
    do { \
    TimelineServer::Log* log = TimelineServer::Log::get_instance(); \
    if (log->get_level() <= level) { \
    log->write_buffer(level, format, ##__VA_ARGS__); \
    } \
    } while (0);

    #define LOG_DEBUG(format, ...) \
    do { \
    LOG_BASE(TimelineServer::LOG_LEVEL::ELL_DEBUG, format, ##__VA_ARGS__); \
    } while (0);

    关于 Log 宏中为什么要用 do{}while(0); 给包起来, 参考资料: 参考资料|stackoverflow

简单来说是用来解决以下的问题的,
直接定义操作用{}包起来用if-else包起来 都不行

1
2
3
4
if (xxx)
MACROS(xxx);
else
func();

最后关于 Log 宏中 ##__VA_ARGS__ 前面的 ##,参考资料: 参考资料|stackoverflow

Note: some compilers offer an extension that allows ## to appear after a comma and before VA_ARGS, in which case the ## does nothing when the variable arguments are present, but removes the comma when the variable arguments are not present: this makes it possible to define macros such as fprintf (stderr, format, ##VA_ARGS). This can also be achieved in a standard manner using VA_OPT, such as fprintf (stderr, format VA_OPT(, ) VA_ARGS). (since C++20)

简单来说就是用来去掉没用的逗号,如果没有这个 “##”,
LOG_INFO(“some info”) 必须写成 LOG_INFO(“%s”, “some info”),
进而被扩展成 log->write_buffer(level, %s, “some thing”);
否则会被扩展成 log->write_buffer(level, “some thing”, ); (多一个逗号)

TimelineServer::Timer

整体结构是一个用 std::vector<TimerNode> 为底层的小顶堆,同时用 std::unordered_map<timer_id, size_t> 存储定时器的 id 和存储位置的对应关系

  • 主要 API:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 增加/删除定时器
    void add_timer(timer_id id, int timeout, const timeout_cb& call_back);
    void pop_timer();

    // 调用某个定时器的回调函数/重新调整某个定时器的时间
    void do_work(timer_id id);
    void adjust(timer_id id, int timeout);

    // 删除所有定时器/查询定时器是否到时间,激活并删除
    void clear();

    // 执行所有未执行的到时定时器
    void tick();

    // 获取距离下一个定时器触发的时间差(并执行tick)
    int get_next_timeout();
  • 定时器节点

    1
    2
    3
    4
    5
    6
    struct TimerNode {
    timer_id id;
    time_stamp expires;
    timeout_cb cb;
    bool operator<(const TimerNode& t) { return expires < t.expires; }
    };
  • 向上调整节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void Timer::siftup_(size_t i) {
    assert(i >= 0 && i < heap_.size());
    // 二叉树中的父节点
    size_t j = (i - 1) / 2;

    while (j >= 0) {
    // 父节点小于当前节点,停止传递
    if (heap_[j] < heap_[i]) break;
    // 当前节点小于父节点,则必然也小于兄弟节点
    swap_(i, j);
    i = j;
    j = (i - 1) / 2;
    }
    }
  • 向下调整节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    bool Timer::siftdown_(size_t i, size_t n) {
    assert(i >= 0 && i < heap_.size());
    assert(n >= 0 && n <= heap_.size());
    int init_pos = i;
    // 二叉树中的左子结点
    size_t j = i * 2 + 1;

    while (j < n) {
    // 查找子结点中较小的(否则交换后新节点不满足小于子结点的要求)
    if (j + 1 < n && heap_[j + 1] < heap_[j]) j++;
    // 当前节点小于所有子节点,停止传递
    if (heap_[i] < heap_[j]) break;
    swap_(i, j);
    i = j;
    j = i * 2 + 1;
    }

    // 返回是否进行过交换
    return i > init_pos;
    }
  • 调整节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    void Timer::adjust(timer_id id, int timeout) {
    // 保证该元素存在
    assert(!heap_.empty() && timer_ref_.count(id) > 0);
    // 调整触发时间
    heap_[timer_ref_[id]].expires = Clock::now() + MS(timeout);

    // 调整该元素位置(加一个限制位置,防止下标越界)
    if (!siftdown_(timer_ref_[id], heap_.size())) {
    // 节点无法向下调整,则向上调整
    siftup_(timer_ref_[id]);
    }
    }

    TimelineServer::ThreadPool

  • 使用一个任务队列存储待处理任务

  • 启动多个线程从任务队列拿任务并执行

  • 任务管理队列

    1
    2
    3
    4
    5
    6
    7
    // 不同 Worker 共享的数据
    struct Manager {
    std::mutex mtx;
    std::condition_variable cond;
    bool is_closed;
    std::queue<std::function<void()>> tasks;
    };
  • 构造函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    explicit ThreadPool(size_t workers = 8)
    : manager_(std::make_shared<Manager>()) {
    assert(workers > 0);
    for (size_t i = 0; i < workers; i++) {
    std::thread([manager = manager_] {
    std::unique_lock<std::mutex> locker(manager->mtx);
    while (true) {
    if (manager->is_closed) break;
    if (!manager->tasks.empty()) {
    auto task = std::move(manager->tasks.front());
    manager->tasks.pop();
    locker.unlock();
    // 当且仅当执行任务的时候解开互斥锁
    task();
    locker.lock();
    } else {
    manager->cond.wait(locker);
    }
    }
    }).detach();
    }
    }

    TimelineServer::SQLConnPool

  • SQLConnPool:

    • 使用单例模式,负责管理与数据库的链接
    • sql::Connection* get_connect();: 从链接池中获取一个链接
    • void free_connect(sql::Connection* conn);: 将链接还给链接池
    • int get_free_connect_count();: 获取可用链接数量
  • SQLConn:

    • sql::Connection* 的 RAII(Resource Acquisition Is Initialization) 包装
    • 创建后自动从链接池中获取链接
    • 析构时自动归还链接
    • bool is_valid(): 判断链接是否可用

TimelineServer::Mux

epoll 系列函数的封装

// 以后可以尝试用不同系列的 IO 复用函数写一些不同 Mux 的实现

TimelineServer::HttpRequest

解析 Http 报文,提供查询 HeaderJson 格式的报文体

  • 解析报文: 用有限状态机依次解析请求行(请求地址),请求头和请求体
  • 解析 Header
    • 解析后存储在 std::unordered_map<string, string> header_;
  • 解析 Json:
    • 需要在请求头中配置 “Content-Type: application/json”
    • 解析后存储在 Json 对象中
  • 查询接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    const string query_header(const string& key) const;
    const string query_header(const char* key) const;
    const std::unordered_map<string, string> get_header() const {
    return header_;
    }

    const Json query_post(const string& key) const;
    const Json query_post(const char* key) const;
    const Json get_post() const {
    return post_;
    }

    TimelineServer::HttpResponse

响应解析好的请求报文,优先响应动态请求(调用注册好的回调函数),再响应静态文件请求,并在解析后填充响应报文

  • 回调函数定义:

    1
    typedef std::function<bool(const HttpRequest& request, Buffer& buffer)> router_cb;
  • 响应接口: void make_response(const HttpRequest& request, Buffer& buffer);
  • 注册动态请求回调函数: static bool register_dynamic_router(string& src, const router_cb& cb);
  • 注册静态路由跳转: static bool register_static_router(string& src, string& des);

TimelineServer::HttpConn

控制 HttpRequestHttpResponse 完成网络请求

  • 读取报文: ssize_t read(int* errno_);
  • 响应请求: bool process();
  • 写入链接: write(int* errno_);

TimelineServer::Server

  • 创建 Server 实例:

    1
    2
    3
    4
    5
    Server(int port, bool is_ET, int timeout_ms, bool linger_close,
    const char* src_dir, const char* sql_host, int sql_port,
    const char* sql_user, const char* sql_pwd, const char* sql_db_name,
    int pool_sql_conn_num, int pool_thread_num, LOG_LEVEL log_level,
    int log_queue_size);
  • 注册路由

    1
    2
    3
    4
    5
    6
    7
    8
    // 注册静态路由跳转
    static bool register_static_router(string& src, string& des);
    static bool register_static_router(const char* src, string& des);
    static bool register_static_router(string& src, const char* des);
    static bool register_static_router(const char* src, const char* des);
    // 注册动态回调函数
    static bool register_dynamic_router(string& src, const router_cb& cb);
    static bool register_dynamic_router(const char* src, const router_cb& cb);**