Created
May 29, 2020 07:51
-
-
Save yusufsyaifudin/f404e784daa415b91ec7fb2fd7c36b68 to your computer and use it in GitHub Desktop.
raft.FSM using BadgerDB: https://github.com/yusufsyaifudin/raft-sample/blob/48254d0c71/fsm/badger.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fsm | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/dgraph-io/badger/v2" | |
"github.com/hashicorp/raft" | |
"io" | |
"os" | |
"strings" | |
) | |
// badgerFSM raft.FSM implementation using badgerDB | |
type badgerFSM struct { | |
db *badger.DB | |
} | |
// Apply log is invoked once a log entry is committed. | |
// It returns a value which will be made available in the | |
// ApplyFuture returned by Raft.Apply method if that | |
// method was called on the same Raft node as the FSM. | |
func (b badgerFSM) Apply(log *raft.Log) interface{} { | |
switch log.Type { | |
case raft.LogCommand: | |
var payload = CommandPayload{} | |
if err := json.Unmarshal(log.Data, &payload); err != nil { | |
_, _ = fmt.Fprintf(os.Stderr, "error marshalling store payload %s\n", err.Error()) | |
return nil | |
} | |
op := strings.ToUpper(strings.TrimSpace(payload.Operation)) | |
switch op { | |
case "SET": | |
return &ApplyResponse{ | |
Error: b.set(payload.Key, payload.Value), | |
Data: payload.Value, | |
} | |
case "GET": | |
data, err := b.get(payload.Key) | |
return &ApplyResponse{ | |
Error: err, | |
Data: data, | |
} | |
case "DELETE": | |
return &ApplyResponse{ | |
Error: b.delete(payload.Key), | |
Data: nil, | |
} | |
} | |
} | |
_, _ = fmt.Fprintf(os.Stderr, "not raft log command type\n") | |
return nil | |
} | |
// Snapshot will be called during make snapshot. | |
// Snapshot is used to support log compaction. | |
// No need to call snapshot since it already persisted in disk (using BadgerDB) when raft calling Apply function. | |
func (b badgerFSM) Snapshot() (raft.FSMSnapshot, error) { | |
return newSnapshotNoop() | |
} | |
// Restore is used to restore an FSM from a Snapshot. It is not called | |
// concurrently with any other command. The FSM must discard all previous | |
// state. | |
// Restore will update all data in BadgerDB | |
func (b badgerFSM) Restore(rClose io.ReadCloser) error { | |
defer func() { | |
if err := rClose.Close(); err != nil { | |
_, _ = fmt.Fprintf(os.Stdout, "[FINALLY RESTORE] close error %s\n", err.Error()) | |
} | |
}() | |
_, _ = fmt.Fprintf(os.Stdout, "[START RESTORE] read all message from snapshot\n") | |
var totalRestored int | |
decoder := json.NewDecoder(rClose) | |
for decoder.More() { | |
var data = &CommandPayload{} | |
err := decoder.Decode(data) | |
if err != nil { | |
_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error decode data %s\n", err.Error()) | |
return err | |
} | |
if err := b.set(data.Key, data.Value); err != nil { | |
_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error persist data %s\n", err.Error()) | |
return err | |
} | |
totalRestored++ | |
} | |
// read closing bracket | |
_, err := decoder.Token() | |
if err != nil { | |
_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error %s\n", err.Error()) | |
return err | |
} | |
_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] success restore %d messages in snapshot\n", totalRestored) | |
return nil | |
} | |
// NewBadger raft.FSM implementation using badgerDB | |
func NewBadger(badgerDB *badger.DB) raft.FSM { | |
return &badgerFSM{ | |
db: badgerDB, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment