这一小节我们基于已有的LSM Tree接口, 来设计一个兼容Redis协议的服务层, 使其能替代redis-server处理redis-cli的请求.
代码仓库:https://github.com/Vanilla-Beauty/tiny-lsm
欢迎点个Star
1 Resp协议简介
这里就不再对Redis本身的基础概念进行介绍了, 毕竟Redis是校招八股必背知识点, 大家想必都非常熟悉了. 但大多数朋友可能对Redis通信的Resp协议完全不熟悉, 这里简单介绍一下Resp协议.
Redis的RESP(REdis Serialization Protocol)是Redis客户端与服务器之间通信的协议, 也就是redis-cli和redis-server之间进行通信的协议,它简单、高效,支持多种数据类型。因此, 其只需要描述Redis中的数据类型和请求类型就可以了。
需要说明的是, Resp协议应该属于TCP这一层的协议, 其没有HTTP等协议的头, 实现时我们也不需要http层的框架
1.1 一个简单的案例
假设你在 redis-cli 中输入了以下命令:
redis-cli客户端会将该命令转换为 RESP 协议格式并发送给 Redis 服务器。具体表示如下:
这里其实显式地表达了换行符\r\n, 真实的内容是: *3\r\n$3\r\nSET\r\n$2\r\nk1\r\n$2\r\nv1\r\n
解释:
*3:表示这是一个包含 3 个元素的数组。
$3:表示第一个元素是一个长度为 3 的批量字符串(Bulk String),内容为 SET。
$2:表示第二个元素是一个长度为 2 的批量字符串,内容为 k1。
$2:表示第三个元素是一个长度为 2 的批量字符串,内容为 v1。
\r\n为不同字段之间的分隔符, 且不计入长度
Redis 服务器接收到上述请求后,执行 SET k1 v1 操作,并返回响应。例如:
解释:
+OK:表示这是一个简单字符串(Simple String),值为 OK,表示操作成功。
如果客户端发送的命令或参数有误,Redis 服务器可能会返回错误信息。例如:
解释:
-ERR:表示这是一个错误消息(Error),内容为 syntax error。
1.2 数据类型语法
通过之前的案例我们可以看到, RESP使用一些符号来对数据类型进行标记, 这里简单总结如下:
- 简单字符串(Simple Strings):以”+”开头,如
+OK\r\n。
- 错误(Errors):以”-“开头,如
-ERR unknown command\r\n。
- 整数(Integers):以”:”开头,如
:1000\r\n。
- 批量字符串(Bulk Strings):以”$”开头,如
$6\r\nfoobar\r\n。
- 数组(Arrays):以”*”开头,如
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n。
2 Redis实现思路
这里我们将利用自身的LSM Tree接口来设计一个兼容Redis协议的服务层, 使其能替代redis-server处理redis-cli的请求.
首先我们回顾一下我们的LSM Tree接口支持什么api:
Put(key, value): 将键值对插入到数据库中。
Get(key): 根据键获取对应的值。
Delete(key): 根据键删除对应的值。
Scan(start_key, end_key): 根据起始键和结束键范围获取键值对。(就是上一章实现的谓词查询, 这里的Scan是一个虚拟的接口)
然后我们想一下Redsis的不同数据结构的接口
- 字符串(String):和
LSM Tree一样, 我们也只需要实现Put(key, value)和Get(key)即可
- 列表(List):相当于不同字符串之间有连接
- 哈希(Hash):一个哈希的
key由很多个filed即value组成
- 集合(Set): 一个集合的
key由很多个member, 但不需要排序
- 有序集合(Sorted Set): 集合的
key由很多个member和score组成, 并且需要按照score排序
同时很多key还有一些基础属性, 最常用的就是TTL(过期时间), 当TTL过期时, 该key将不再存在, 我们也可以通过TTL来判断一个key是否过期, 也可以使用EXPIRE来设置一个key的过期时间
其实本质上, 由于我们的存储引擎是KV存储, 我们的哈希的所有数据都将作为基础的key和value进行存储, 这与Redis中的字符串是一致的。而List, Set, Sorted Set等数据结构就需要将多对key进行组合, 并且需要根据一定的规则进行排序, 而且回想我的LSM Tree接口支持的api, 核心思路就只有2类:
List, Set, Sorted Set等数据结构的key的value中要记录所管理成员的元信息
- 归属于某个大数据结构(
List, Set, Sorted Set等)的成员的key需要包含统一的前缀, 这样才能通过我的的存储引擎进行前缀查询
接下来, 本小节将基于上述的思路首先介绍Redis基础字符串(这一部分大概分2-3章吧, 内容有点多)
3 代码实现
3.1 基础封装
我们新建一个include/redis_wrapper/redis_wrapper.h和src/redis_wrapper/redis_wrapper.cpp文件来对我们的存储引擎进行封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
class RedisWrapper { private: std::unique_ptr<LSM> lsm; std::shared_mutex redis_mtx;
std::string set(std::vector<std::string> &args); std::string get(std::vector<std::string> &args); std::string incr(std::vector<std::string> &args); std::string decr(std::vector<std::string> &args); std::string expire(std::vector<std::string> &args); std::string del(std::vector<std::string> &args); std::string ttl(std::vector<std::string> &args);
std::string redis_incr(const std::string &key); std::string redis_decr(const std::string &key); std::string redis_expire(const std::string &key, std::string seconds_count); std::string redis_set(std::string &key, std::string &value); std::string redis_get(std::string &key); std::string redis_del(std::vector<std::string> &args); std::string redis_ttl(std::string &key);
... }
|
这里以redis_set为例, 我们很容易写出下面的代码:
1 2 3 4 5
| std::string RedisWrapper::redis_set(std::string &key, std::string &value) { std::unique_lock<std::shared_mutex> lock(redis_mtx); this->lsm->put(key, value); return "+OK\r\n"; }
|
但这样会有一个问题, 如果这个key原来就存在, 且是有ttl的呢? Redis默认的 SET key value 命令对已存在的 key 进行设置,Redis 会覆盖该 key 的值,并移除原有的 TTL,使该 key 成为一个持久化的 key(永不过期)。但上述代码则缺乏对TTL的考虑, 因此我们需要引入TTL。
3.2 TTL设计
首先我们的每个key都可能伴随着一个过期时间, 这个过期时间我们只需要为其加上一个固定的前缀, 就可以与真实的key关联起来, 称为expire_key, 同时其value直接为时间戳即可,如果这个key对应的expire_key不存在, 则表示该key没有过期时间.
另一方面, 原本的Redis支持后台线程定期清理 和 查询时延迟检查清理 2种检查TTL的方式, 这里为了实现简单, 我们选择查询时延迟检查清理的方式, 在后续查询或写入时检查expire_key的value, 如果发现value小于当前时间戳, 则进行相应的操作.
3.3 考虑了TTL的set/get
所以这里我们首选需要对set进行如下修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| inline std::string get_explire_key(const std::string &key) { return REDIS_EXPIRE_HEADER + key; }
std::string RedisWrapper::redis_set(std::string &key, std::string &value) { std::unique_lock<std::shared_mutex> lock(redis_mtx); this->lsm->put(key, value); std::string expire_key = get_explire_key(key); if (this->lsm->get(expire_key).has_value()) { this->lsm->remove(expire_key); } return "+OK\r\n"; }
|
在get时, 也需要检查TTL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| std::string RedisWrapper::redis_get(std::string &key) { std::shared_lock<std::shared_mutex> rlock(redis_mtx);
auto key_query = this->lsm->get(key);
std::string expire_key = get_explire_key(key); auto expire_query = this->lsm->get(expire_key);
if (key_query.has_value()) { if (expire_query.has_value()) {
if (is_expired(expire_query, nullptr)) { rlock.unlock(); std::unique_lock<std::shared_mutex> wlock(redis_mtx); this->lsm->remove(key); this->lsm->remove(expire_key); return "$-1\r\n"; } else { return "$" + std::to_string(key_query.value().size()) + "\r\n" + key_query.value() + "\r\n"; } } else { return "$" + std::to_string(key_query.value().size()) + "\r\n" + key_query.value() + "\r\n"; } } else { if (expire_query.has_value()) { rlock.unlock(); std::unique_lock<std::shared_mutex> wlock(redis_mtx); this->lsm->remove(expire_key); } } return "$-1\r\n"; }
|
这里还有一个易错点, 我们检查TTL和查询key都是读操作, 因此获取的是读锁, 但我们在由于TTL超时导致key不存在时, 需要清理expire_key和key, 这需要我们进行写锁的升级, 需要先释放读锁, 然获取写锁.
而且get操作比set要复杂, 具体的流程代码里面的注释很清楚了, 就不多说了
3.4 TTL/EXPIRE
这里的expire操作就是简单插入一个expire_key和value即可, value为当前时间戳加上seconds_count
1 2 3 4 5 6 7 8 9 10 11 12
| std::string RedisWrapper::redis_expire(const std::string &key, std::string seconds_count) { std::unique_lock<std::shared_mutex> lock(redis_mtx); std::string expire_key = get_explire_key(key);
auto expire_time_str = get_expire_time(seconds_count);
this->lsm->put(expire_key, expire_time_str);
return ":1\r\n"; }
|
TTL则是直接获取expire_key的value即可, 这里需要注意的是, 如果expire_key不存在, 则表示该key没有过期时间, 因此返回-1即可, 当expire_key存在, 则需要判断是否过期, 如果过期, 则返回-2表示key不存在, 如果没有过期, 则返回剩余的过期时间即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| std::string RedisWrapper::redis_ttl(std::string &key) { std::shared_lock<std::shared_mutex> lock(redis_mtx);
auto key_query = this->lsm->get(key);
std::string expire_key = get_explire_key(key); auto expire_query = this->lsm->get(expire_key);
if (key_query.has_value()) { if (expire_query.has_value()) { std::time_t now_time_t; if (is_expired(expire_query, &now_time_t)) { return ":-2\r\n"; } else { auto now = std::chrono::system_clock::now(); return ":" + std::to_string(std::stoll(expire_query.value()) - now_time_t) + "\r\n"; } } else { return ":-1\r\n"; } } else { return ":-1\r\n"; } }
|
其他的常规接口, 如incr, del等很简单, 逻辑也都差不多, 就不展开介绍了, 可以看源码
4 后端服务器实现
完成了基础的封装后, 我们需要设置一个网络服务器来接受redis-cli的请求, 这里我选择muduo网络库进行网络层的封装, 服务器的代码在server文件夹下
4.1 添加依赖
这里建议直接使用xmake提供的muduo库, 并定义server依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13
| add_requires("muduo")
target("server") set_kind("binary") add_files("server/src/*.cpp") add_deps("redis") add_includedirs("include", {public = true}) add_packages("muduo") set_targetdir("$(buildir)/bin")
|
如果不想使用xmake提供的muduo库, 也可以手动添加依赖, 具体可以参考muduo的文档: https://github.com/chenshuo/muduo-tutorial
4.2 Muduo 网络层封装
这里直接用代码简单介绍下muduo的使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| #include "../../include/redis_wrapper/redis_wrapper.h" #include "../include/handler.h" #include <cstddef> #include <iostream> #include <muduo/base/Logging.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h> #include <muduo/net/TcpConnection.h> #include <muduo/net/TcpServer.h> #include <string> #include <unordered_map> #include <vector>
using namespace muduo; using namespace muduo::net;
class RedisServer { public: RedisServer(EventLoop *loop, const InetAddress &listenAddr) : server_(loop, listenAddr, "RedisServer"), redis("example_db") { server_.setConnectionCallback( std::bind(&RedisServer::onConnection, this, std::placeholders::_1)); server_.setMessageCallback( std::bind(&RedisServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); }
void start() { server_.start(); }
void onConnection(const TcpConnectionPtr &conn) { if (conn->connected()) { LOG_INFO << "Connection from " << conn->peerAddress().toIpPort(); } else { LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort(); } }
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg(buf->retrieveAllAsString()); LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
std::string response = handleRequest(msg); conn->send(response); }
std::string handleRequest(const std::string &request) { size_t pos = 0;
if (request.empty() || request[pos] != '*') { return "-ERR Protocol error: expected '*'\r\n"; }
int numElements = 0; try { numElements = std::stoi(request.substr(pos + 1)); } catch (const std::exception &) { return "-ERR Protocol error: invalid number of elements\r\n"; } pos = request.find('\n', pos) + 1;
LOG_INFO << "request: " << request << '\n'; LOG_INFO << "Number of elements: " << numElements << '\n';
std::vector<std::string> args;
for (int i = 0; i < numElements; ++i) { if (pos >= request.size() || request[pos] != '$') { LOG_INFO << "pos = " << pos << ", i = " << i << ", last args = " << args.back() << '\n'; LOG_INFO << "-ERR Protocol error: expected '$'\r\n"; return "-ERR Protocol error: expected '$'\r\n"; }
int len = 0; std::string value_len; int next_n_pos; try { next_n_pos = request.find('\n', pos); len = std::stoi(request.substr(pos + 1)); } catch (const std::exception &) { LOG_INFO << "-ERR Protocol error: invalid bulk string length\r\n"; return "-ERR Protocol error: invalid bulk string length\r\n"; } pos = next_n_pos + 1; if (pos + len > request.size()) { LOG_INFO << "-ERR Protocol error: bulk string length exceeds request " "size\r\n"; return "-ERR Protocol error: bulk string length exceeds request " "size\r\n"; } args.push_back(request.substr(pos, len)); next_n_pos = request.find('\n', pos); pos = next_n_pos + 1; }
LOG_INFO << "Request: "; for (const auto &arg : args) { LOG_INFO << arg << " "; } LOG_INFO << '\n';
switch (string2Ops(args[0])) { return save_handler(redis); case OPS::SET: return set_handler(args, redis); case OPS::GET: return get_handler(args, redis); case OPS::DEL: return del_handler(args, redis); case OPS::INCR: return incr_handler(args, redis); case OPS::DECR: return decr_handler(args, redis); case OPS::EXPIRE: return expire_handler(args, redis); case OPS::TTL: return ttl_handler(args, redis); case OPS::HSET: } }
TcpServer server_; RedisWrapper redis; };
int main() { EventLoop loop; InetAddress listenAddr(6379); RedisServer server(&loop, listenAddr);
server.start(); loop.loop(); }
|
这里的handleRequest就是处理Resp协议的函数, 其目前只支持一些基础指令的解析, 这里的解析也是比较繁琐的, 不好说清楚, 有兴趣直接单步调试看看吧, 这里的RedisWrapper就是我们之前实现的Redis的封装, 这里的RedisWrapper就是我们之前实现的LSM Tree Engine的封装。这里重点介绍muduo的使用:
muduo 是一个基于 Reactor 模式的 C++ 网络库,由陈硕开发,广泛应用于高性能网络编程。它提供了简单易用的接口,能够帮助开发者快速构建高性能的 TCP 服务器。以下是如何使用 muduo 的基本步骤,基于你提供的代码进行说明。
引入 muduo 头文件
1 2 3 4 5
| #include <muduo/base/Logging.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h> #include <muduo/net/TcpConnection.h> #include <muduo/net/TcpServer.h>
|
这些头文件分别提供了日志、事件循环、网络地址、TCP 连接和 TCP 服务器的功能。
创建 EventLoop
EventLoop 是 muduo 的核心组件,负责事件循环和事件分发。每个 muduo 服务器都需要一个 EventLoop 对象。
loop.loop() 会启动事件循环,等待事件的发生。
定义服务器地址
使用 InetAddress 来指定服务器的监听地址和端口。例如,监听本地 6379 端口:
1
| InetAddress listenAddr(6379);
|
创建 TcpServer
TcpServer 是 muduo 提供的 TCP 服务器类。需要传入 EventLoop 和监听地址来初始化它:
1
| TcpServer server(&loop, listenAddr, "RedisServer");
|
- 第一个参数是
EventLoop 对象。
- 第二个参数是
InetAddress,表示服务器的监听地址。
- 第三个参数是服务器的名称(可选)。
这里的TcpServer放入了RedisServer类中,并在类的构造函数中初始化。
设置回调函数
muduo 是一个事件驱动的网络库,通过回调函数处理连接和消息事件。你需要为 TcpServer 设置以下回调函数:
连接回调
当有客户端连接或断开时,会触发连接回调函数:
1 2
| server.setConnectionCallback( std::bind(&RedisServer::onConnection, this, std::placeholders::_1));
|
onConnection 是一个成员函数,用于处理连接事件。
std::placeholders::_1 表示回调函数的第一个参数(TcpConnectionPtr)。
消息回调
当接收到客户端发送的数据时,会触发消息回调函数:
1 2 3
| server.setMessageCallback( std::bind(&RedisServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
|
onMessage 是一个成员函数,用于处理接收到的消息。
std::placeholders::_1 是 TcpConnectionPtr,表示连接对象。
std::placeholders::_2 是 Buffer*,表示接收到的数据缓冲区。
std::placeholders::_3 是 Timestamp,表示消息到达的时间。
实现回调函数
需要实现 onConnection 和 onMessage 回调函数。
(1) onConnection处理连接事件:
1 2 3 4 5 6 7
| void onConnection(const TcpConnectionPtr &conn) { if (conn->connected()) { LOG_INFO << "Connection from " << conn->peerAddress().toIpPort(); } else { LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort(); } }
|
conn->connected() 判断连接是建立还是断开。
conn->peerAddress().toIpPort() 获取客户端的 IP 和端口。
(2) onMessage处理接收到的消息:
1 2 3 4 5 6 7 8
| void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg(buf->retrieveAllAsString()); LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
std::string response = handleRequest(msg); conn->send(response); }
|
buf->retrieveAllAsString() 从缓冲区中提取所有数据并转换为字符串。
conn->send(response) 将响应发送给客户端。
启动服务器
调用 TcpServer::start() 启动服务器,然后进入事件循环:
1 2
| server.start(); loop.loop();
|
日志功能
muduo 提供了内置的日志功能,可以通过 LOG_INFO、LOG_ERROR 等宏记录日志:
1
| LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
|
5 小节
这章引入了一个简单的Redis服务器和基于Redis对LSM Tree的封装,Redis服务器和使用muduo库来处理TCP连接和消息; 封装的Redis_wrapper实现了一些基本的Redis命令,如GET、SET、DEL等。由于这一章还介绍了Resp和muduo, Redis命令的实现只包括了简单字符串和TTL, 后续章节将视线更复杂的set, hash, list, zset 等命令。