tAdd code - sraft - simple raft implementation HTML git clone https://git.parazyd.org/sraft DIR Log DIR Files DIR Refs DIR README --- DIR commit db0dad77964a9efc1f9301f33aa185a4857dc216 HTML Author: parazyd <parazyd@dyne.org> Date: Thu, 31 Mar 2022 02:00:00 +0200 Add code Diffstat: A .gitignore | 2 ++ A Cargo.toml | 20 ++++++++++++++++++++ A README.md | 14 ++++++++++++++ A examples/peer.rs | 55 +++++++++++++++++++++++++++++++ A rustfmt.toml | 9 +++++++++ A src/lib.rs | 463 +++++++++++++++++++++++++++++++ A src/method.rs | 40 +++++++++++++++++++++++++++++++ 7 files changed, 603 insertions(+), 0 deletions(-) --- DIR diff --git a/.gitignore b/.gitignore t@@ -0,0 +1,2 @@ +/target +Cargo.lock DIR diff --git a/Cargo.toml b/Cargo.toml t@@ -0,0 +1,20 @@ +[package] +name = "sraft" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-channel = "1.6.1" +async-std = {version = "1.11.0", features = ["attributes"]} +borsh = "0.9.3" +futures = "0.3.21" +lazy_static = "1.4.0" +log = "0.4.16" +rand = "0.8.5" + +[dev-dependencies] +async-executor = "1.4.1" +clap = {version = "3.1.6", features = ["derive"]} +easy-parallel = "3.2.0" +simplelog = "0.12.0-alpha1" +smol = "1.2.5" DIR diff --git a/README.md b/README.md t@@ -0,0 +1,14 @@ +sraft +===== + +Simple Raft consensus implementation. + +``` +$ cargo build --example peer +$ ./target/debug/examples/peer -p 127.0.0.1:13002 -p 127.0.0.1:13003 -i 1 -l 127.0.0.1:13001 +$ ./target/debug/examples/peer -p 127.0.0.1:13001 -p 127.0.0.1:13001 -i 2 -l 127.0.0.1:13002 +$ ./target/debug/examples/peer -p 127.0.0.1:13001 -p 127.0.0.1:13002 -i 3 -l 127.0.0.1:13003 +``` + +Try stopping and starting certain nodes to see how new leaders are +elected. DIR diff --git a/examples/peer.rs b/examples/peer.rs t@@ -0,0 +1,55 @@ +use std::net::SocketAddr; + +use async_executor::Executor; +use async_std::sync::Arc; +use clap::Parser; +use easy_parallel::Parallel; +use simplelog::{ColorChoice, Config, LevelFilter, TermLogger, TerminalMode}; + +use sraft::{Raft, RaftRpc}; + +#[derive(Parser)] +struct Args { + #[clap(long, short)] + peer: Vec<SocketAddr>, + + #[clap(long, short)] + id: u64, + + #[clap(long, short)] + listen: SocketAddr, +} + +#[async_std::main] +async fn main() { + let args = Args::parse(); + + TermLogger::init(LevelFilter::Debug, Config::default(), TerminalMode::Mixed, ColorChoice::Auto) + .unwrap(); + + let mut raft = Raft::new(args.id); + for (k, v) in args.peer.iter().enumerate() { + raft.peers.insert(k as u64, *v); + } + + let raft_rpc = RaftRpc(args.listen); + + let ex = Arc::new(Executor::new()); + let (_signal, shutdown) = async_channel::unbounded::<()>(); + + Parallel::new() + .each(0..4, |_| smol::future::block_on(ex.run(shutdown.recv()))) + // + .add(|| { + smol::future::block_on(async move { + raft_rpc.start().await; + }); + Ok(()) + }) + // + .finish(|| { + smol::future::block_on(async move { + raft.start().await; + }) + }); +} DIR diff --git a/rustfmt.toml b/rustfmt.toml t@@ -0,0 +1,9 @@ +reorder_imports = true +imports_granularity = "Crate" +use_small_heuristics = "Max" +comment_width = 100 +wrap_comments = false +binop_separator = "Back" +trailing_comma = "Vertical" +trailing_semicolon = false +use_field_init_shorthand = true DIR diff --git a/src/lib.rs b/src/lib.rs t@@ -0,0 +1,463 @@ +use std::{collections::HashMap, io, net::SocketAddr, time::Duration}; + +use async_channel::{Receiver, Sender}; +use async_std::{ + io::{ReadExt, WriteExt}, + net::{TcpListener, TcpStream}, + stream::StreamExt, + sync::Mutex, + task, +}; +use borsh::{BorshDeserialize, BorshSerialize}; +use futures::{select, FutureExt}; +use lazy_static::lazy_static; +use log::{debug, error}; +use rand::Rng; + +mod method; +use crate::method::{HeartbeatArgs, HeartbeatReply, RaftMethod, VoteArgs, VoteReply}; + +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)] +pub struct LogEntry { + log_term: u64, + log_index: u64, + log_data: Vec<u8>, +} + +pub struct LogStore(pub Vec<LogEntry>); + +impl LogStore { + fn get_last_index(&self) -> u64 { + let rlen = self.0.len(); + if rlen == 0 { + return 0 + } + + self.0[rlen - 1].log_index + } +} + +lazy_static! { + pub static ref LOG_STORE: Mutex<LogStore> = Mutex::new(LogStore(vec![])); + // This is used for heartbeats + pub static ref HEARTBEAT_CHAN: (Sender<bool>, Receiver<bool>) = async_channel::unbounded(); + // This is used to let our node know when it has become a leader + pub static ref TOLEADER_CHAN: (Sender<bool>, Receiver<bool>) = async_channel::unbounded(); + + pub static ref STATE: Mutex<State> = Mutex::new(State::new()); +} + +#[derive(Default)] +pub struct State { + pub current_term: u64, + pub voted_for: u64, + pub vote_count: u64, + + pub commit_index: u64, + pub _last_applied: u64, + + pub next_index: Vec<u64>, + pub match_index: Vec<u64>, +} + +impl State { + pub fn new() -> Self { + Self { + current_term: 0, + voted_for: 0, + vote_count: 0, + commit_index: 0, + _last_applied: 0, + next_index: vec![], + match_index: vec![], + } + } +} + +pub enum Role { + Follower, + Candidate, + Leader, +} + +pub struct Raft { + pub peers: HashMap<u64, SocketAddr>, + node_id: u64, + role: Role, +} + +impl Raft { + pub fn new(node_id: u64) -> Self { + Self { peers: Default::default(), node_id, role: Role::Follower } + } + + pub async fn start(&mut self) { + debug!("Raft::start()"); + self.role = Role::Follower; + + let mut state = STATE.lock().await; + state.current_term = 0; + state.voted_for = 0; + drop(state); + + let mut rng = rand::thread_rng(); + + loop { + let delay = Duration::from_millis(rng.gen_range(0..200) + 300); + + match self.role { + Role::Follower => { + select! { + _ = HEARTBEAT_CHAN.1.recv().fuse() => { + debug!("[FOLLOWER] Raft::start(): follower_{} got heartbeat", self.node_id); + } + _ = task::sleep(delay).fuse() => { + debug!("[FOLLOWER] Raft::start(): follower_{} timeout", self.node_id); + self.role = Role::Candidate; + } + } + } + + Role::Candidate => { + debug!("[CANDIDATE] Raft::start(): peer_{} is now a candidate", self.node_id); + let mut state = STATE.lock().await; + state.current_term += 1; + state.voted_for = self.node_id; + state.vote_count = 1; + drop(state); + + // TODO: In background + debug!("[CANDIDATE] Raft::start(): broadcasting request_vote"); + self.broadcast_request_vote().await; + + select! { + _ = task::sleep(delay).fuse() => { + debug!("[CANDIDATE] Raft::start(): Timeout as candidate, becoming a follower"); + self.role = Role::Follower; + } + _ = TOLEADER_CHAN.1.recv().fuse() => { + debug!("[CANDIDATE] Raft::start(): We are now the leader"); + self.role = Role::Leader; + + let mut state = STATE.lock().await; + state.next_index = vec![1_u64; self.peers.len()]; + state.match_index = vec![0_u64; self.peers.len()]; + drop(state); + + // TODO: In background + let t = task::spawn(async { + let mut i = 0; + loop { + debug!("[CANDIDATE] Raft::start(): Appending data in bg loop"); + i += 1; + let state = STATE.lock().await; + let logentry = LogEntry { + log_term: state.current_term, + log_index: i, + log_data: format!("user send: {}", i).as_bytes().to_vec(), + }; + drop(state); + + debug!("[CANDIDATE] Raft::start(): Acquiring logstore lock in bg loop"); + let mut logstore = LOG_STORE.lock().await; + logstore.0.push(logentry); + drop(logstore); + debug!("[CANDIDATE] Raft::start(): Dropped logstore lock in bg loop"); + task::sleep(Duration::from_secs(3)).await; + } + }); + } + } + } + + Role::Leader => { + debug!("[LEADER] Raft::start(): Broadcasting heartbeat as leader"); + self.broadcast_heartbeat().await; + task::sleep(Duration::from_millis(100)).await; + } + } + } + } + + async fn broadcast_request_vote(&mut self) { + debug!("Raft::broadcast_request_vote()"); + let state = STATE.lock().await; + let args = VoteArgs { term: state.current_term, candidate_id: self.node_id }; + drop(state); + + // TODO: Do this concurrently + for i in self.peers.clone() { + debug!("Raft::broadcast_request_vote(): Sending req to peer {}", i.1); + match self.send_request_vote(i.0, args.clone()).await { + Ok(v) => debug!("Raft::broadcast_request_vote(): Got reply: {:?}", v), + Err(e) => { + error!("Raft::broadcast_request_vote(): Failed vote to peer {}, ({})", i.1, e); + continue + } + }; + } + } + + async fn send_request_vote( + &mut self, + node_id: u64, + args: VoteArgs, + ) -> Result<VoteReply, io::Error> { + debug!("Raft::send_request_vote()"); + let addr = self.peers[&node_id]; + + let method = RaftMethod::Vote(args); + let payload = method.try_to_vec().unwrap(); + + debug!("Raft::send_request_vote(): Connecting to peer_{}", node_id); + let mut stream = TcpStream::connect(addr).await?; + debug!("Raft::send_request_vote(): Writing to stream"); + stream.write_all(&payload).await?; + debug!("Raft::send_request_vote(): Wrote to stream"); + + debug!("Raft::send_request_vote(): Reading from stream"); + let mut buf = vec![0_u8; 4096]; + stream.read(&mut buf).await?; + debug!("Raft::send_request_vote(): Read from stream"); + + let reply = try_from_slice_unchecked::<VoteReply>(&buf)?; + let mut state = STATE.lock().await; + if reply.term > state.current_term { + debug!("Raft::send_request_vote(): reply.term > state.current_term"); + state.current_term = reply.term; + state.voted_for = 0; + drop(state); + self.role = Role::Follower; + return Ok(reply) + } + drop(state); + + if reply.vote_granted { + debug!("Raft::send_request_vote(): reply.vote_granted == true"); + let mut state = STATE.lock().await; + state.vote_count += 1; + drop(state); + } + + let state = STATE.lock().await; + if state.vote_count >= (self.peers.len() / 2 + 1).try_into().unwrap() { + debug!("Raft::send_request_vote(): Elected for leader"); + TOLEADER_CHAN.0.send(true).await.unwrap(); + } + drop(state); + + Ok(reply) + } + + async fn broadcast_heartbeat(&mut self) { + debug!("[LEADER] Raft::broadcast_heartbeat()"); + + for i in self.peers.clone() { + let state = STATE.lock().await; + let mut args = HeartbeatArgs { + term: state.current_term, + leader_id: self.node_id, + prev_log_index: 0, + prev_log_term: 0, + entries: vec![], + leader_commit: state.commit_index, + }; + + let prev_log_index = state.next_index[i.0 as usize] - 1; + drop(state); + + debug!("[LEADER] Raft::broadcast_heartbeat(): Acquiring lock on LOG_STORE"); + let logstore = LOG_STORE.lock().await; + if logstore.get_last_index() > prev_log_index { + args.prev_log_index = prev_log_index; + args.prev_log_term = logstore.0[prev_log_index as usize].log_term; + args.entries = logstore.0[prev_log_index as usize..].to_vec(); + drop(logstore); + debug!("[LEADER] Raft::broadcast_heartbeat(): Dropped lock on LOG_STORE"); + debug!("[LEADER] Raft::broadcast_heartbeat(): Send entries: {:?}", args.entries); + } + + // TODO: Run in background + match self.send_heartbeat(i.0, args).await { + Ok(v) => debug!("[LEADER] Raft::broadcast_heartbeat(): Got reply: {:?}", v), + Err(e) => { + error!( + "[LEADER] Raft::broadcast_heartbeat(): Failed heartbeat to peer_{} ({})", + i.0, e + ); + continue + } + }; + } + } + + async fn send_heartbeat( + &mut self, + node_id: u64, + args: HeartbeatArgs, + ) -> Result<HeartbeatReply, io::Error> { + debug!("Raft::send_heartbeat({}, {:?}", node_id, args); + let addr = self.peers[&node_id]; + + let method = RaftMethod::Heartbeat(args); + let payload = method.try_to_vec()?; + + debug!("Raft::send_heartbeat(): Connecting to peer_{}", node_id); + let mut stream = TcpStream::connect(addr).await?; + debug!("Raft::send_heartbeat(): Writing to stream"); + stream.write_all(&payload).await?; + debug!("Raft::send_heartbeat(): Wrote to stream"); + + debug!("Raft::send_heartbeat(): Reading from stream"); + let mut buf = vec![0_u8; 4096]; + stream.read(&mut buf).await?; + debug!("Raft::send_heartbeat(): Read from stream"); + + let reply = try_from_slice_unchecked::<HeartbeatReply>(&buf)?; + + let mut state = STATE.lock().await; + if reply.success { + debug!("Raft::send_heartbeat(): Got success reply"); + if reply.next_index > 0 { + state.next_index[node_id as usize] = reply.next_index; + state.match_index[node_id as usize] = reply.next_index - 1; + } + } else if reply.term > state.current_term { + debug!("Raft::send_heartbeat(): reply.term > state.current_term"); + state.current_term = reply.term; + state.voted_for = 0; + self.role = Role::Follower; + } + drop(state); + + Ok(reply) + } +} + +pub struct RaftRpc(pub SocketAddr); + +impl RaftRpc { + pub async fn start(&self) { + debug!("RaftRpc::start()"); + + debug!("RaftRpc::start(): Binding to {}", self.0); + let listener = TcpListener::bind(self.0).await.unwrap(); + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + debug!("RaftRpc::start(): Got RPC request"); + let stream = stream.unwrap(); + let (reader, writer) = &mut (&stream, &stream); + + debug!("RaftRpc::start(): Reading from reader..."); + let mut buf = vec![0_u8; 4096]; + reader.read(&mut buf).await.unwrap(); + debug!("RaftRpc::start(): Read from reader"); + + match try_from_slice_unchecked::<RaftMethod>(&buf).unwrap() { + RaftMethod::Vote(args) => { + debug!("RaftRpc::start(): Got RaftMethod::Vote"); + let reply = self.request_vote(args).await; + let payload = reply.try_to_vec().unwrap(); + + debug!("RaftRpc::start(): Vote: Writing to writer..."); + writer.write_all(&payload).await.unwrap(); + debug!("RaftRpc::start(): Vote: Wrote to writer"); + } + + RaftMethod::Heartbeat(args) => { + debug!("RaftRpc::start(): Got RaftMethod::Heartbeat"); + let reply = self.heartbeat(args).await; + let payload = reply.try_to_vec().unwrap(); + + debug!("RaftRpc::start(): Heartbeat: Writing to writer..."); + writer.write_all(&payload).await.unwrap(); + debug!("RaftRpc::start(): Heartbeat: Wrote to writer"); + } + } + } + } + + async fn request_vote(&self, args: VoteArgs) -> VoteReply { + debug!("RaftRpc::request_vote()"); + let mut reply = VoteReply { term: 0, vote_granted: false }; + + debug!("RaftRpc::request_vote(): Acquiring state lock"); + let mut state = STATE.lock().await; + debug!("RaftRpc::request_vote(): Got lock"); + + if args.term < state.current_term { + reply.term = state.current_term; + drop(state); + reply.vote_granted = false; + return reply + } + + if state.voted_for == 0 { + state.current_term = args.term; + state.voted_for = args.candidate_id; + drop(state); + reply.term = args.term; + reply.vote_granted = true; + return reply + } + + drop(state); + reply + } + + async fn heartbeat(&self, args: HeartbeatArgs) -> HeartbeatReply { + debug!("RaftRpc::heartbeat()"); + let mut reply = HeartbeatReply { success: false, term: 0, next_index: 0 }; + + debug!("RaftRpc::heartbeat(): Acquiring state lock"); + let state = STATE.lock().await; + debug!("RaftRpc::heartbeat(): Got state lock"); + let current_term = state.current_term; + drop(state); + debug!("RaftRpc::heartbeat(): Dropped state lock"); + + if args.term < current_term { + reply.success = false; + reply.term = current_term; + return reply + } + + debug!("RaftRpc::heartbeat(): Sending to channel"); + HEARTBEAT_CHAN.0.send(true).await.unwrap(); + debug!("RaftRpc::heartbeat(): Sent to channel"); + + if args.entries.is_empty() { + reply.success = true; + reply.term = current_term; + return reply + } + + debug!("RaftRpc::heartbeat(): Acquiring logstore lock"); + let mut logstore = LOG_STORE.lock().await; + debug!("RaftRpc::heartbeat(): Got logstore lock"); + if args.prev_log_index > logstore.get_last_index() { + reply.success = false; + reply.term = current_term; + reply.next_index = logstore.get_last_index() + 1; + drop(logstore); + return reply + } + + logstore.0.extend_from_slice(&args.entries); + reply.next_index = logstore.get_last_index() + 1; + drop(logstore); + debug!("RaftRpc::heartbeat(): Dropped logstore lock"); + + reply.success = true; + reply.term = current_term; + + reply + } +} + +fn try_from_slice_unchecked<T: BorshDeserialize>(data: &[u8]) -> Result<T, io::Error> { + let mut data_mut = data; + let result = T::deserialize(&mut data_mut)?; + Ok(result) +} DIR diff --git a/src/method.rs b/src/method.rs t@@ -0,0 +1,40 @@ +use borsh::{BorshDeserialize, BorshSerialize}; + +use crate::LogEntry; + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub enum RaftMethod { + Vote(VoteArgs), + Heartbeat(HeartbeatArgs), +} + +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)] +pub struct VoteArgs { + pub term: u64, + pub candidate_id: u64, +} + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub struct VoteReply { + pub term: u64, + pub vote_granted: bool, +} + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub struct HeartbeatArgs { + pub term: u64, + pub leader_id: u64, + + pub prev_log_index: u64, + pub prev_log_term: u64, + + pub entries: Vec<LogEntry>, + pub leader_commit: u64, +} + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub struct HeartbeatReply { + pub success: bool, + pub term: u64, + pub next_index: u64, +}