在上一节中,我们介绍了 CURP 的核心理论。这一节将讨论如何基于现有的 Raft 实现扩展为 CURP。我们会分析现有代码结构,对比 Raft 和 CURP 的差异,并制定详细的实现计划。
1 为什么要基于 Raft 扩展?
在开始实现之前,一个重要的问题是:为什么不从头开始实现 CURP,而是基于 Raft 扩展?
1.1 复用已有成果
我们已经实现了一个功能完整的 Raft 共识协议,包含:
- 完整的选举机制
- 日志复制和提交
- 快照和日志压缩
- 成员变更支持
这些组件在 CURP 中同样需要,直接复用可以:
1.2 渐进式开发
基于 Raft 扩展允许我们:
- 先实现一个最小可用的 CURP 版本
- 逐步添加优化和高级特性
- 每个阶段都可以验证正确性
1.3 对比学习
通过对比 Raft 和 CURP 的实现,可以更深入理解:
- 两者的设计权衡
- CURP 优化的本质
- 分布式系统的核心挑战
2 现有代码分析
让我们先了解现有 Raft 实现的结构,这是扩展的基础。
2.1 项目结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| tiny-distributed-kv/ ├── proto/ # Protocol Buffer 定义 │ ├── raft.proto # Raft RPC 定义 │ └── node.proto # 节点间通信定义 │ ├── include/ # 头文件 │ ├── raft/ │ │ └── raft.h # Raft 核心状态机 │ ├── storage/ │ │ └── log_vec.h # 日志存储 │ ├── utils/ │ │ └── timer.h # 定时器 │ └── grpc/ │ └── grpc_server.h # gRPC 服务框架 │ ├── src/ # 源文件 │ ├── raft/ │ │ ├── raft.cpp # Raft 状态机实现 │ │ └── raft_rpc.cpp # Raft RPC 服务实现 │ ├── storage/ │ │ └── log_vec.cpp # 日志存储实现 │ ├── utils/ │ │ └── timer.cpp # 定时器实现 │ └── grpc/ │ └── grpc_server.cpp # gRPC 服务实现 │ └── test/ # 测试文件 └── dtest_raft.cpp # Raft 测试
|
2.2 核心组件分析
RaftNode 状态机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| class RaftNode { public: enum class RaftState { LEADER, FOLLOWER, CANDIDATE }; private: RaftState role_; int current_term_; int voted_for_; LogVec log_; std::vector<uint64_t> next_index_; std::vector<uint64_t> match_index_; uint64_t commit_index_; uint64_t last_applied_; std::unique_ptr<Timer> election_timer_; std::unique_ptr<Timer> heartbeat_timer_; public: void Start(); void Elect(); void SendHeartBeats(); void AppendEntries(); void RequestVote(); private: void applyCommittedEntries(); void becomeLeader(); void becomeFollower(); };
|
关键观察:
commit_index_ 跟踪已提交的日志索引
last_applied_ 跟踪已应用到状态机的日志索引
- Leader 维护
next_index_ 和 match_index_ 用于日志复制
日志存储 LogVec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class LogVec { public: struct Entry { uint64_t term; uint64_t index; std::vector<uint8_t> command; }; private: std::vector<Entry> entries_; std::string log_dir_; public: void Append(const Entry& entry); Entry Get(uint64_t index); void Truncate(uint64_t from_index); void Persist(); uint64_t LastLogIndex(); uint64_t LastLogTerm(); };
|
关键特性:
- 支持追加、随机访问、截断
- 持久化到磁盘,崩溃后可恢复
- 提供日志索引和任期查询
RPC 服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| service RaftService { rpc RequestVote(RequestVoteRequest) returns (RequestVoteReply); rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesReply); rpc InstallSnapshot(InstallSnapshotRequest) returns (InstallSnapshotReply); }
class RaftServiceImpl final : public RaftService::Service { private: std::shared_ptr<RaftNode> raft_node_; public: grpc::Status RequestVote( grpc::ServerContext* context, const RequestVoteRequest* request, RequestVoteReply* reply) override; grpc::Status AppendEntries( grpc::ServerContext* context, const AppendEntriesRequest* request, AppendEntriesReply* reply) override; };
|
2.3 Raft 写操作流程回顾
让我们回顾 Raft 的写操作流程,这对理解 CURP 的扩展至关重要:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 时间线(Raft 写操作): ──────────────────────────────────────────────────────────────→
Client Leader Follower 1 Follower 2 │ │ │ │ │─── 1. 写请求 ───────→│ │ │ │ │ │ │ │ │─── 2. AppendEntries ─────→│ │ │ │─────────────────────────→│ │ │ │ │ │ │ │←── 3. ACK ───────────────│ │ │ │←─────────────────────────│ │ │ │ │ │ │ │─── 4. 更新 commitIndex ─→│ (多数派确认) │ │ │ │ │ │ │─── 5. 应用到状态机 ─────→│ │ │ │ │ │ │←─── 6. 响应 ─────────│ │ │
|
关键点:
- 步骤 2-3 需要等待多数派确认(至少 1 RTT)
- 步骤 4 需要等待 Leader 自己的 AppendEntries 返回
- 步骤 6 响应客户端,总延迟 ≈ 2 RTT
3 Raft vs CURP 对比
现在让我们详细对比 Raft 和 CURP 的差异,理解需要做哪些修改。
3.1 架构对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| Raft 架构(容错 f=1): ───────────────────────────────────── ┌────────┐ │ Client │ └────────┘ ↓ ┌────────┐ ┌──────────┐ ┌──────────┐ │ Leader │←───→│ Follower │←───→│ Follower │ └────────┘ └──────────┘ └──────────┘ ↑ ↑ ↑ └──────────────┴────────────────┘ 同步复制(2 RTT)
CURP 架构(容错 f=1): ───────────────────────────────────── ┌────────┐ │ Client │ └────────┘ ↓ ↓ ↓ └──────→┌──────────┐ ↓ │ Witness │ ← 轻量级,只存请求 ↓ └──────────┘ ↓ ┌────────┐ ┌──────────┐ │ Master │────→│ Backup │ ← 完整数据副本 └────────┘ └──────────┘ ↑ ↑ └────────────────┘ 异步复制(不阻塞客户端)
|
3.2 角色对比
| Raft 角色 |
CURP 角色 |
主要变化 |
| Leader |
Master |
增加推测执行、管理未同步操作 |
| Follower |
Backup |
基本不变,接收异步同步 |
| - |
Witness |
新增,轻量级持久化存储 |
| Candidate |
- |
选举机制可复用 |
3.3 写路径对比
Raft 写路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Status RaftNode::Propose(const Command& cmd) { uint64_t index = log_.Append(cmd); for (auto& follower : followers_) { SendAppendEntries(follower, index); } while (!hasMajorityAck(index)) { } commit_index_ = index; Apply(cmd); return Status::OK; }
|
关键问题:步骤 3 是阻塞的,必须等待多数派确认。
CURP 写路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| Status CurpMaster::Propose(const Command& cmd) { if (!canSpeculativeExecute(cmd.key)) { return slowPathWrite(cmd); } unsynced_keys_.insert(cmd.key); Apply(cmd); return Status::OK; }
void CurpMaster::asyncSyncThread() { while (running_) { std::this_thread::sleep_for(kSyncInterval); auto to_sync = getUnsyncedOperations(); for (auto& backup : backups_) { SendSync(backup, to_sync); } onSyncComplete(to_sync); } }
|
关键区别:不等待 Backup,立即响应客户端。
4 实现计划详解
我们分四个阶段实现 CURP,每个阶段都是可验证的。
Phase 1: Witness 实现
Witness 是 CURP 的核心新组件,优先级最高。
4.1.1 Witness 的职责
Witness 需要实现以下功能:
| 功能 |
说明 |
复杂度 |
| record |
接收并存储客户端请求 |
核心 |
| 可交换性检查 |
拒绝冲突请求 |
核心 |
| gc |
清理已同步的请求 |
中等 |
| getRecoveryData |
恢复时返回所有请求 |
简单 |
| 持久化 |
崩溃后数据不丢失 |
可选(教学版) |
4.1.2 数据结构设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| #pragma once
#include <string> #include <vector> #include <unordered_map> #include <mutex> #include <chrono>
struct WitnessRecord { uint64_t rpc_id; std::string key; std::vector<uint8_t> request_data; std::chrono::steady_clock::time_point timestamp; };
class Witness { public: Witness(size_t max_records = 10000); enum class RecordResult { ACCEPTED, REJECTED_NOT_COMMUTATIVE, REJECTED_NO_SPACE, REJECTED_NOT_ACCEPTING }; RecordResult record( const std::string& master_id, const std::string& key, uint64_t rpc_id, const std::vector<uint8_t>& request_data ); void garbage_collect( const std::vector<std::pair<std::string, uint64_t>>& synced ); std::vector<WitnessRecord> get_recovery_data(); void stop_accepting(); void reset(); bool is_accepting() const { return accepting_; } size_t record_count() const { return records_.size(); } private: std::mutex mtx_; bool accepting_ = true; size_t max_records_; std::unordered_map<std::string, WitnessRecord> records_; bool hasConflict(const std::string& key) const; };
|
4.1.3 核心实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include "curp/witness.h"
Witness::RecordResult Witness::record( const std::string& master_id, const std::string& key, uint64_t rpc_id, const std::vector<uint8_t>& request_data ) { std::lock_guard<std::mutex> lock(mtx_); if (!accepting_) { return RecordResult::REJECTED_NOT_ACCEPTING; } if (records_.find(key) != records_.end()) { return RecordResult::REJECTED_NOT_COMMUTATIVE; } if (records_.size() >= max_records_) { return RecordResult::REJECTED_NO_SPACE; } records_[key] = WitnessRecord{ .rpc_id = rpc_id, .key = key, .request_data = request_data, .timestamp = std::chrono::steady_clock::now() }; return RecordResult::ACCEPTED; }
void Witness::garbage_collect( const std::vector<std::pair<std::string, uint64_t>>& synced ) { std::lock_guard<std::mutex> lock(mtx_); for (const auto& [key, rpc_id] : synced) { auto it = records_.find(key); if (it != records_.end() && it->second.rpc_id == rpc_id) { records_.erase(it); } } }
|
4.1.4 RPC 定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| syntax = "proto3";
package witness;
service WitnessService { rpc Record(RecordRequest) returns (RecordReply); rpc GarbageCollect(GCRequest) returns (GCReply); rpc GetRecoveryData(RecoveryRequest) returns (RecoveryData); rpc Stop(StopRequest) returns (StopReply); rpc Reset(ResetRequest) returns (ResetReply); }
message RecordRequest { string master_id = 1; string key = 2; uint64 rpc_id = 3; bytes request_data = 4; }
message RecordReply { bool accepted = 1; enum RejectReason { UNKNOWN = 0; NOT_COMMUTATIVE = 1; NO_SPACE = 2; NOT_ACCEPTING = 3; } RejectReason reason = 2; }
message GCRequest { repeated SyncedEntry entries = 1; }
message SyncedEntry { string key = 1; uint64 rpc_id = 2; }
message RecoveryRequest { string master_id = 1; }
message RecoveryData { repeated WitnessRecord records = 1; }
message WitnessRecord { string key = 1; uint64 rpc_id = 2; bytes request_data = 3; }
|
Phase 2: CurpMaster 实现
CurpMaster 是 CURP 的核心,负责推测执行和异步同步。
4.2.1 继承 RaftNode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| #pragma once
#include "../raft/raft.h" #include "witness.h" #include <unordered_set> #include <atomic>
class CurpMaster : public RaftNode { public: static std::shared_ptr<CurpMaster> Create( const std::vector<NodeConfig>& cluster_configs, const std::vector<NodeConfig>& witness_configs, const std::string& log_dir, uint64_t cur_node_id ); enum class WriteResult { SUCCESS_FAST, SUCCESS_SLOW, FAILED }; WriteResult proposeUpdate( const std::string& key, const std::vector<uint8_t>& value ); std::optional<std::vector<uint8_t>> read(const std::string& key); bool hasUnsyncedKey(const std::string& key) const; private: CurpMaster( const std::vector<NodeConfig>& cluster_configs, const std::vector<NodeConfig>& witness_configs, const std::string& log_dir, uint64_t cur_node_id ); std::vector<NodeConfig> witness_configs_; std::vector<std::unique_ptr<WitnessServiceStub>> witness_stubs_; std::unordered_set<std::string> unsynced_keys_; std::unordered_map<std::string, uint64_t> unsynced_rpc_ids_; mutable std::mutex unsynced_mtx_; std::thread sync_thread_; std::atomic<bool> running_{true}; std::condition_variable sync_cv_; bool canSpeculativeExecute(const std::string& key); void markUnsynced(const std::string& key, uint64_t rpc_id); void removeFromUnsynced(const std::string& key); void asyncSyncThread(); void syncToBackup(); void garbageCollectWitnesses(); WriteResult slowPathWrite( const std::string& key, const std::vector<uint8_t>& value ); };
|
4.2.2 写操作实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| #include "curp/curp_master.h"
CurpMaster::WriteResult CurpMaster::proposeUpdate( const std::string& key, const std::vector<uint8_t>& value ) { if (!canSpeculativeExecute(key)) { return slowPathWrite(key, value); } uint64_t rpc_id = nextRpcId(); markUnsynced(key, rpc_id); kv_store_.put(key, value); return WriteResult::SUCCESS_FAST; }
bool CurpMaster::canSpeculativeExecute(const std::string& key) { std::lock_guard<std::mutex> lock(unsynced_mtx_); return unsynced_keys_.find(key) == unsynced_keys_.end(); }
void CurpMaster::markUnsynced(const std::string& key, uint64_t rpc_id) { std::lock_guard<std::mutex> lock(unsynced_mtx_); unsynced_keys_.insert(key); unsynced_rpc_ids_[key] = rpc_id; }
CurpMaster::WriteResult CurpMaster::slowPathWrite( const std::string& key, const std::vector<uint8_t>& value ) { syncToBackup(); { std::lock_guard<std::mutex> lock(unsynced_mtx_); unsynced_keys_.clear(); unsynced_rpc_ids_.clear(); } kv_store_.put(key, value); syncToBackup(); return WriteResult::SUCCESS_SLOW; }
|
4.2.3 异步同步线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| void CurpMaster::asyncSyncThread() { while (running_) { std::unique_lock<std::mutex> lock(unsynced_mtx_); sync_cv_.wait_for(lock, std::chrono::milliseconds(10)); if (!running_) break; if (unsynced_keys_.empty()) continue; lock.unlock(); syncToBackup(); } }
void CurpMaster::syncToBackup() { std::vector<std::pair<std::string, uint64_t>> to_sync; { std::lock_guard<std::mutex> lock(unsynced_mtx_); for (const auto& key : unsynced_keys_) { to_sync.push_back({key, unsynced_rpc_ids_[key]}); } } if (to_sync.empty()) return; for (auto& backup : backups_) { SendAppendEntries(backup, to_sync); } garbageCollectWitnesses(to_sync); { std::lock_guard<std::mutex> lock(unsynced_mtx_); for (const auto& [key, _] : to_sync) { unsynced_keys_.erase(key); unsynced_rpc_ids_.erase(key); } } }
|
Phase 3: CurpClient 实现
CurpClient 负责并行发送请求到 Master 和 Witness。
4.3.1 客户端设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #pragma once
#include <string> #include <vector> #include <memory> #include <future>
struct CurpConfig { std::string master_addr; std::vector<std::string> witness_addrs; };
class CurpClient { public: CurpClient(const CurpConfig& config); ~CurpClient(); enum class WriteResult { SUCCESS, SLOW_SUCCESS, FAILED }; WriteResult write( const std::string& key, const std::vector<uint8_t>& value ); std::optional<std::vector<uint8_t>> read(const std::string& key); private: CurpConfig config_; std::atomic<uint64_t> next_rpc_id_{1}; std::unique_ptr<KvServiceStub> master_stub_; std::vector<std::unique_ptr<WitnessServiceStub>> witness_stubs_; bool recordToAllWitnesses( const std::string& key, uint64_t rpc_id, const std::vector<uint8_t>& request_data ); };
|
4.3.2 并行写实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| CurpClient::WriteResult CurpClient::write( const std::string& key, const std::vector<uint8_t>& value ) { uint64_t rpc_id = next_rpc_id_++; auto request_data = serializeRequest(key, value); auto master_future = std::async(std::launch::async, [&]() { return sendToMaster(key, value, rpc_id); }); auto witness_future = std::async(std::launch::async, [&]() { return recordToAllWitnesses(key, rpc_id, request_data); }); auto master_reply = master_future.get(); bool all_witness_accepted = witness_future.get(); if (master_reply.success) { if (all_witness_accepted && !master_reply.need_sync) { return WriteResult::SUCCESS; } else { return WriteResult::SLOW_SUCCESS; } } return WriteResult::FAILED; }
bool CurpClient::recordToAllWitnesses( const std::string& key, uint64_t rpc_id, const std::vector<uint8_t>& request_data ) { std::vector<std::future<bool>> futures; for (auto& stub : witness_stubs_) { futures.push_back(std::async(std::launch::async, [&]() { RecordRequest request; request.set_key(key); request.set_rpc_id(rpc_id); request.set_request_data(request_data.data(), request_data.size()); auto reply = stub->Record(request); return reply.accepted(); })); } for (auto& f : futures) { if (!f.get()) return false; } return true; }
|
Phase 4: 恢复机制
4.4.1 恢复流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| void CurpMaster::recover() { spdlog::info("Starting CURP recovery..."); spdlog::info("Phase 1: Recovering from Backup..."); recoverFromBackup(); spdlog::info("Phase 2: Replaying from Witnesses..."); for (auto& stub : witness_stubs_) { RecoveryRequest request; auto data = stub->GetRecoveryData(request); for (const auto& record : data.records()) { if (!alreadyExecuted(record.rpc_id())) { replayRequest(record); executed_rpc_ids_.insert(record.rpc_id()); } } break; } spdlog::info("Phase 3: Syncing to Backup..."); syncToBackup(); spdlog::info("Phase 4: Resetting Witnesses..."); for (auto& stub : witness_stubs_) { stub->Reset(ResetRequest()); } spdlog::info("CURP recovery completed."); }
|
5 文件结构规划
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| tiny-distributed-kv/ ├── proto/ │ ├── raft.proto # 已有 │ ├── node.proto # 已有 │ └── witness.proto # 新增 │ ├── include/ │ ├── raft/ # 已有,保持不变 │ ├── curp/ │ │ ├── witness.h # 新增 │ │ ├── curp_master.h # 新增 │ │ └── curp_client.h # 新增 │ └── grpc/ │ ├── grpc_server.h # 已有 │ └── witness_service_impl.h # 新增 │ ├── src/ │ ├── raft/ # 已有 │ ├── curp/ │ │ ├── witness.cpp # 新增 │ │ ├── curp_master.cpp # 新增 │ │ └── curp_client.cpp # 新增 │ └── grpc/ │ ├── grpc_server.cpp # 已有 │ └── witness_service_impl.cpp # 新增 │ └── test/ ├── dtest_curp.cpp # 新增:CURP 集成测试 └── dtest_witness.cpp # 新增:Witness 单元测试
|
6 实现顺序建议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| Phase 1: Witness(预计 2-3 天) ──────────────────────────────── ├── Day 1: proto/witness.proto + Witness 类框架 ├── Day 2: Witness 核心逻辑(record, gc) └── Day 3: Witness RPC 服务 + 单元测试
Phase 2: CurpMaster(预计 3-4 天) ──────────────────────────────── ├── Day 1: CurpMaster 框架 + 推测执行 ├── Day 2: 未同步操作管理 ├── Day 3: 异步同步线程 └── Day 4: 集成测试
Phase 3: CurpClient(预计 1-2 天) ──────────────────────────────── ├── Day 1: 客户端框架 + 并行发送 └── Day 2: 错误处理 + 测试
Phase 4: 恢复机制(预计 2-3 天) ──────────────────────────────── ├── Day 1: 恢复流程框架 ├── Day 2: RIFL 去重 └── Day 3: 恢复测试
|
7 教学版简化策略
作为教学版实现,我们可以做一些简化:
| 方面 |
完整实现 |
教学简化 |
影响 |
| RIFL |
完整的去重机制 |
简单的 RPC ID 集合 |
崩溃后可能重复执行(幂等操作无影响) |
| Witness 数量 |
f 个独立 Witness |
单个 Witness |
容错能力降低,但足够演示 |
| 批量同步 |
自适应批大小 |
固定周期同步 |
性能不是最优 |
| GC |
复杂策略 |
简单超时清理 |
可能内存占用稍高 |
| Witness 持久化 |
必须持久化 |
内存存储 |
崩溃后可能丢失 |
| 配置管理 |
动态配置 |
静态配置 |
无法动态扩缩容 |
这些简化不影响正确性验证,但降低了实现复杂度。
8 下一步
本节介绍了完整的实现计划。下一节将介绍 CURP 的测试框架设计,包括:
- 如何验证 CURP 的线性一致性
- 如何模拟网络故障和节点崩溃
- 如何测量快速路径成功率