Skip to content

Instantly share code, notes, and snippets.

@Shikugawa
Last active July 30, 2023 15:26
Show Gist options
  • Save Shikugawa/11fb6b1f8588e69d9b46ac2dfdf83b9d to your computer and use it in GitHub Desktop.
Save Shikugawa/11fb6b1f8588e69d9b46ac2dfdf83b9d to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"net/rpc"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type RaftLogEntry []byte
type RaftLog struct {
commitedHeadIdx int
appliedHeadIdx int
logEntries []RaftLogEntry // TODO: max length
mu sync.Mutex
}
func NewRaftLog() *RaftLog {
return &RaftLog{}
}
func (r *RaftLog) NewLogEntries(entries []RaftLogEntry) {
r.mu.Lock()
defer r.mu.Unlock()
r.logEntries = append(r.logEntries, entries...)
}
func (r *RaftLog) ToCommit() []RaftLogEntry {
r.mu.Lock()
defer r.mu.Unlock()
if r.commitedHeadIdx >= len(r.logEntries) {
return []RaftLogEntry{}
}
entries := r.logEntries[r.commitedHeadIdx:]
r.commitedHeadIdx = len(r.logEntries)
return entries
}
func (r *RaftLog) ToApply(len int) []RaftLogEntry {
entries := r.logEntries[r.appliedHeadIdx : r.appliedHeadIdx+len]
r.appliedHeadIdx += len
return entries
}
type RaftState struct {
term int
state Role
leader string
deadline time.Time
}
func NewRaftState() *RaftState {
return &RaftState{
term: 0,
state: Follower,
}
}
func (r *RaftState) UpdateDeadline() {
timeout := time.Duration((150 + rand.Intn(150+1)) * int(time.Millisecond))
r.deadline = time.Now().Add(timeout)
}
func (r *RaftState) ToFollower() {
log.Info("became to Follower")
r.state = Follower
r.UpdateDeadline()
}
func (r *RaftState) ToCandidate() {
log.Info("became to Candidate")
r.state = Candidate
r.UpdateDeadline()
}
func (r *RaftState) ToLeader() {
log.Info("became to Leader")
r.state = Leader
}
func (r *RaftState) IsLeader(hostname string) bool {
return hostname == r.leader
}
func (r *RaftState) SetLeader(leader string) {
r.leader = leader
}
type InflightVoteRequest struct {
members []string
req *VoteRequestRequest
resp chan *VoteRequestResponse
deadline time.Time
}
func NewInflightVoteRequest(members []string, req *VoteRequestRequest,
timeout time.Duration) *InflightVoteRequest {
return &InflightVoteRequest{
members: members,
req: req,
resp: make(chan *VoteRequestResponse, len(members)),
deadline: time.Now().Add(timeout),
}
}
func (i *InflightVoteRequest) Timeouted() bool {
// TODO: support timeout
return false
}
func (i *InflightVoteRequest) Finished() bool {
return i.Timeouted() || len(i.resp) >= len(i.members)
}
func (i *InflightVoteRequest) Elected() bool {
if !i.Finished() || i.Timeouted() {
return false
}
count := 0
for j := 0; j < len(i.members); j++ {
res := <-i.resp
if res.Vote {
count += 1
}
}
return count > (len(i.members)+1)/2
}
func (i *InflightVoteRequest) Start() {
for j := range i.members {
go func(k int) {
res, err := NewRaftClient(i.members[k]).VoteRequest(i.req)
if err != nil {
log.Error(err)
return
}
i.resp <- res
}(j)
}
}
type InflightAppendEntiriesRequest struct {
resp chan *AppendEntiriesResponse
members []string
req *AppendEntiriesRequest
deadline time.Time
}
func NewInflightAppendEntiriesRequest(hostname string, members []string,
req *AppendEntiriesRequest, timeout time.Duration) *InflightAppendEntiriesRequest {
return &InflightAppendEntiriesRequest{
resp: make(chan *AppendEntiriesResponse, len(members)),
members: members,
req: req,
deadline: time.Now().Add(timeout),
}
}
func (i *InflightAppendEntiriesRequest) Timeouted() bool {
// TODO: support timeout
return false
}
func (i *InflightAppendEntiriesRequest) Finished() bool {
return i.Timeouted() || len(i.resp) >= len(i.members)
}
func (i *InflightAppendEntiriesRequest) ShouldApply() bool {
if !i.Finished() || i.Timeouted() {
return false
}
count := 0
for j := 0; j < len(i.members); j++ {
res := <-i.resp
if res.Ack {
count += 1
}
}
return count > (len(i.members)+1)/2
}
func (i *InflightAppendEntiriesRequest) Start() {
for j := range i.members {
go func(k int) {
res, err := NewRaftClient(i.members[k]).AppendEntiriesRequest(i.req)
if err != nil {
log.Error(err)
return
}
i.resp <- res
}(j)
}
}
type RaftAgent struct {
state *RaftState
log *RaftLog
storage *InMemoryStorage
hostname string
members []string
inflightVoteRequest *InflightVoteRequest
inflightAppendEntiriesRequest *InflightAppendEntiriesRequest
}
func NewRaftAgent(state *RaftState, log *RaftLog, storage *InMemoryStorage,
hostname string, members []string) *RaftAgent {
return &RaftAgent{
state: state,
log: log,
storage: storage,
hostname: hostname,
members: members,
}
}
func (r *RaftAgent) doFollowerTask() {
if time.Now().After(r.state.deadline) {
r.state.term += 1
r.state.ToCandidate()
}
}
func (r *RaftAgent) doCandidateTask() {
if r.inflightVoteRequest != nil {
if !r.inflightVoteRequest.Finished() {
return
}
if r.inflightVoteRequest.Elected() {
r.state.ToLeader()
r.state.SetLeader(r.hostname)
} else {
r.state.ToFollower()
}
r.inflightVoteRequest = nil
return
}
req := VoteRequestRequest{
Term: r.state.term,
}
r.inflightVoteRequest = NewInflightVoteRequest(r.members, &req, 300*time.Millisecond)
r.inflightVoteRequest.Start()
}
func (r *RaftAgent) doLeaderTask() {
if r.inflightAppendEntiriesRequest != nil {
if !r.inflightAppendEntiriesRequest.Finished() {
return
}
if r.inflightAppendEntiriesRequest.ShouldApply() {
entries := r.log.ToApply(len(r.inflightAppendEntiriesRequest.req.Entries))
data := [][]byte{}
for i := range entries {
data = append(data, entries[i])
}
r.storage.Apply(data)
if len(data) != 0 {
log.Info("leader state", r.storage.data)
}
}
r.inflightAppendEntiriesRequest = nil
return
}
commitEntries := r.log.ToCommit()
req := AppendEntiriesRequest{
RequestFrom: r.hostname,
Entries: commitEntries,
}
r.inflightAppendEntiriesRequest = NewInflightAppendEntiriesRequest(
r.hostname, r.members, &req, 300*time.Millisecond)
r.inflightAppendEntiriesRequest.Start()
}
func (r *RaftAgent) Start() {
r.state.UpdateDeadline()
interval := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-interval.C:
if r.state.state == Follower {
r.doFollowerTask()
} else if r.state.state == Leader {
r.doLeaderTask()
} else if r.state.state == Candidate {
r.doCandidateTask()
}
}
}
}
type VoteRequestRequest struct {
Term int
}
type VoteRequestResponse struct {
Vote bool
}
type AppendEntiriesRequest struct {
RequestFrom string
Entries []RaftLogEntry
}
type AppendEntiriesResponse struct {
Ack bool
}
type RaftService struct {
state *RaftState
log *RaftLog
storage *InMemoryStorage
mu sync.Mutex
}
func NewRaftService(state *RaftState, log *RaftLog, storage *InMemoryStorage) *RaftService {
return &RaftService{
state: state,
log: log,
storage: storage,
}
}
func (r *RaftService) VoteRequest(req *VoteRequestRequest, res *VoteRequestResponse) error {
// 2台以上のCandidateが選出され、それぞれが同時にVoteRequestを送った場合、ここが同時に実行される可能性がある。
// しかし、それはつまり2台以上のCandidateに投票してしまうことを意味する。
// 2台以上のCandidateに投票してしまうFollowerが複数いた場合、Leaderが複数選択されてしまう可能性がある。
// そこで同時実行を防ぐためにロックを取っている。
r.mu.Lock()
defer r.mu.Unlock()
if r.state.state != Follower || req.Term <= r.state.term {
res.Vote = false
return nil
}
r.state.UpdateDeadline()
r.state.term = req.Term
res.Vote = true
return nil
}
func (r *RaftService) AppendEntiries(req *AppendEntiriesRequest, res *AppendEntiriesResponse) error {
if r.state.state == Leader {
// リーダーがAppendEntiriesを受け取った時、リーダーが複数いる可能性があるので、再選挙が必要
r.state.ToCandidate()
} else if r.state.state == Candidate {
r.state.ToCandidate()
} else { // Follower
entries := r.log.ToApply(len(r.log.logEntries) - r.log.appliedHeadIdx)
data := [][]byte{}
for i := range entries {
data = append(data, entries[i])
}
r.storage.Apply(data)
if len(data) != 0 {
log.Info("follower state", r.storage.data)
}
r.log.NewLogEntries(req.Entries)
r.log.ToCommit()
res.Ack = true
r.state.SetLeader(req.RequestFrom)
r.state.UpdateDeadline()
}
return nil
}
type RaftClient struct {
target string
}
func NewRaftClient(target string) *RaftClient {
return &RaftClient{
target: target,
}
}
func (r *RaftClient) VoteRequest(req *VoteRequestRequest) (*VoteRequestResponse, error) {
client, err := rpc.DialHTTP("tcp", r.target)
if err != nil {
return nil, err
}
defer client.Close()
var voteReqRes VoteRequestResponse
err = client.Call("RaftService.VoteRequest", req, &voteReqRes)
if err != nil {
return nil, err
}
return &voteReqRes, nil
}
func (r *RaftClient) AppendEntiriesRequest(req *AppendEntiriesRequest) (*AppendEntiriesResponse, error) {
client, err := rpc.DialHTTP("tcp", r.target)
if err != nil {
return nil, err
}
defer client.Close()
var res AppendEntiriesResponse
err = client.Call("RaftService.AppendEntiries", req, &res)
if err != nil {
return nil, err
}
return &res, nil
}
type Role string
const (
Follower Role = "Follower"
Candidate Role = "Candidate"
Leader Role = "Leader"
)
func startServer(state *RaftState, log *RaftLog, storage *InMemoryStorage) error {
rpc.Register(NewRaftService(state, log, storage))
rpc.HandleHTTP()
l, err := net.Listen("tcp", ":1234")
if err != nil {
return err
}
defer l.Close()
err = http.Serve(l, nil)
if err != nil {
return err
}
return nil
}
type InMemoryStorage struct {
data [][]byte
}
func NewInMemoryStore() *InMemoryStorage {
return &InMemoryStorage{}
}
func (i *InMemoryStorage) Apply(data [][]byte) {
i.data = append(i.data, data...)
}
type HTTPService struct {
hostname string
state *RaftState
log *RaftLog
}
func (s *HTTPService) helloHandler(w http.ResponseWriter, r *http.Request) {
if !s.state.IsLeader(s.hostname) {
// TODO: redirect
} else {
val := []byte{'a'}
val2 := []byte{'b'}
s.log.NewLogEntries([]RaftLogEntry{val, val2})
}
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
var hostname string
var members string
// Define flags and their usage descriptions
flag.StringVar(&hostname, "hostname", "", "hostname")
flag.StringVar(&members, "members", "", "member list")
// Parse the command-line flags
flag.Parse()
state := NewRaftState()
log := NewRaftLog()
storage := NewInMemoryStore()
go startServer(state, log, storage)
go func() {
service := &HTTPService{
state: state,
log: log,
hostname: hostname,
}
// Define the route and handler for the server
http.HandleFunc("/", service.helloHandler)
// Start the HTTP server
port := 8080
fmt.Printf("Starting server on port %d...\n", port)
err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
if err != nil {
fmt.Println("Error starting server:", err)
}
}()
time.Sleep(3 * time.Second)
membersList := strings.Split(members, ",")
agent := NewRaftAgent(state, log, storage, hostname, membersList)
agent.Start()
// TTODO: graceful
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment