Note: I loved making the Redis example from this earlier post so much that I’ve decided to make a series of posts called “Rebuilding Redis”. Hope you enjoy! Another note: All source code for this portion of the Redis rebuild can be found here.
Introduction Link to heading
Put yourself in the following situation 🚗
It’s getting later on a Saturday afternoon. You’ve spent all day running around town: shopping for groceries, finding a great present for your friend’s birthday, and even picking up dog food for your pup. You’re exhausted, and you’re ready to get home and relax - all that’s left is to grab a few things from the hardware store.
So, you go into the hardware store, grab what you need, come back out and… is that… your car? Oh no. The door is open! You forgot to lock the door! Everything is gone. Your groceries, your dog food, your birthday present. You’ve been robbed!
After a few minutes of some choice words, and realizing there are no cameras around in this massive parking lot, you think to yourself “alright, it’s not the end of the world, I’ll just go grab everything again”. You’ll just go back to the stores, grab everything you need, and head home.
Only one problem: what was that birthday present again?
You can’t remember. It was a spur of the moment item that you had just found, and the stress of the situation is making it impossible to recall. If only you had written it down somewhere, and you could retrace your steps to figure out what you had bought…
If only you had a WAL.
What is a WAL? Link to heading
A Write-Ahead Log is a mechanism for ensuring durability: every change is recorded to a log on disk before it is applied to the database itself. Much like the situation above, when we encounter a failure state, we can resolve our problems by replaying the steps that got us there. It’s a simple concept, but it’s one that is incredibly powerful and is used in many databases today, such as PostgreSQL, SQLite, and MySQL’s InnoDB.
How does it work? Link to heading
Conceptually, the way a WAL works is quite simple - when you do an action, log what you did. Then, when you need to recover, replay the log of actions from a known state.
If we were to diagram out a traditional WAL, it would look like the following:
In this diagram, we have a WAL that is being written to by a database. The database writes each operation to the WAL first, and only then applies it to its own storage. When the database needs to recover, it can replay the WAL to get back to a known state.
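To make the “log first, then apply” rule concrete, here’s a minimal in-memory sketch (the `entry` and `walStore` types are made up for illustration; a real WAL would persist the log to disk):

```go
package main

import "fmt"

// entry is a logged operation: the command and its arguments.
type entry struct {
	op, key, value string
}

// walStore appends to its log before touching the in-memory map.
type walStore struct {
	log  []entry           // the write-ahead log (on disk in a real system)
	data map[string]string // the "database"
}

func (w *walStore) set(key, value string) {
	w.log = append(w.log, entry{"SET", key, value}) // log first...
	w.data[key] = value                             // ...then apply
}

// replay rebuilds the map from the log alone, as a recovery would.
func replay(log []entry) map[string]string {
	data := map[string]string{}
	for _, e := range log {
		if e.op == "SET" {
			data[e.key] = e.value
		}
	}
	return data
}

func main() {
	w := &walStore{data: map[string]string{}}
	w.set("present", "chess set")
	w.set("dogfood", "kibble")

	// Simulate losing the in-memory state and recovering from the log.
	recovered := replay(w.log)
	fmt.Println(recovered["present"]) // chess set
}
```

Because every write hits the log before the map, the log alone is always enough to reconstruct the state.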
But Redis is a bit different, in that what makes it fast is that it doesn’t write to disk on every operation. So how, then, can we guarantee recoverability if we don’t write to disk on every operation?
The Append-Only File (AOF) Link to heading
The Append-Only File is a mechanism that Redis uses to ensure that data is recoverable in the event of a failure. It works much like a WAL by writing every operation to a file, and then replaying that file on startup to get back to a known state.
However, unlike a WAL, there is no guarantee that the data is written to disk at the time of the operation. This means that, in the event of a failure, we could lose data that was written to the AOF but not yet written to disk.
If we move the arrow around in the above diagram, we can turn our WAL into an AOF:
Notice where the write is happening. This is the key difference between a WAL and an AOF - the WAL is written to before the database guarantees the write, while the AOF is written to during or after the database guarantees the write. As noted above, a failure can therefore lose data that reached the AOF but not yet the disk. However, given Redis’s most common use case of caching, sacrificing data guarantees for performance can be perfectly acceptable, and even desirable.
So how do we implement this in our Redis clone and keep the database fast? We’ll use the topic from our previous post - channels.
Implementing the AOF Link to heading
Let’s start by defining the basic parts of our persistence engine to get them in place. We’ll begin with the outline of the engine, which consists of a few parts:
- A log
- A snapshotter
- A logger
package main
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/gob"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
type PersistenceLog struct {
// The command used for the action
Command string
// The arguments used for the action
Arguments []string
// Timestamp of the operation
Timestamp time.Time
}
type PersistenceEngine struct {
// The path for a top level data directory
DataDir string
// The path to the append-only file directory
AOFDir string
// The path to the snapshot directory
SnapshotDir string
// Interval at which to run a snapshot
SnapshotInterval time.Duration
// Time snapshots and AOFs will be retained before deleting
SnapshotRetention time.Duration
// The time since the last database snapshot
LastSnapshotTime time.Time
// The time since the last log to the AOF
LastLogFlushTime time.Time
// Channel accepting AOF log messages
LogChannel chan PersistenceLog
// Signal whether to stop ingesting logs temporarily (like during a snapshot)
LogLock sync.Mutex
// The time interval between log writes to flush to storage
LogFlushInterval time.Duration
// Pointer to the in-memory data store
StoreRef *Store
// Pointer to log reference
LogRef []PersistenceLog
// Buffered writer to use for persisting logs to disk
AOFBufferedWriter *bufio.Writer
// Encoder for writing binary files to the log
AOFEncoder *gob.Encoder
// Underlying writer for persisting logs
AOFFile *os.File
}
// NewPersistenceEngine returns a new persistence engine
func NewPersistenceEngine(store *Store) *PersistenceEngine {
fmt.Println("Starting persistence engine")
return &PersistenceEngine{
DataDir: "_data",
AOFDir: "aof",
SnapshotDir: "snapshot",
SnapshotRetention: time.Minute * 30,
LastLogFlushTime: time.Now(),
SnapshotInterval: time.Minute * 1,
LogFlushInterval: time.Second * 5,
LogChannel: make(chan PersistenceLog, 1024), // Give the log channel a buffer to make sends non-blocking
LogRef: make([]PersistenceLog, 0, 1024),
StoreRef: store,
}
}
Now there’s a good amount on this struct here, but breaking it down, we can see that this consists of various settings pertaining to:
- How often to take a snapshot
- How often to rotate the AOF log
- References to the communication channels
- Holding references to other engines and stores
- Holding references to encoders and file buffers
With our struct and initializer in place, we can begin the logic for the process of the snapshotter and AOF log.
Performing Snapshots and Logging Link to heading
Much in the same manner that we did the expiry, we’ll be doing this passively on request.
We can start by setting up our main function which will start the engine, perform necessary setup on the filesystem for storage, and then begin listening on the channel:
// Start starts the persistence engine
func (p *PersistenceEngine) Start() {
p.ensureDirsExist()
p.RestoreSnapshot()
// Ingest logs from the channel on a background thread
go func() {
for {
// Check if we need to take a snapshot
if p.IsPendingSnapshot() {
p.PerformSnapshot()
}
// Check for incoming messages
toLog := <-p.LogChannel
if err := p.ProcessLog(&toLog); err != nil {
fmt.Println(err)
}
}
}()
}
func makeDirsIfNotExists(dirs []string) error {
for _, dir := range dirs {
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0700); err != nil {
fmt.Println(err)
return err
}
}
}
return nil
}
// ensureDirsExist ensures that the required directories exist on the filesystem
func (p *PersistenceEngine) ensureDirsExist() error {
// Ensure our data directories exists
dirs := []string{
p.DataDir,
filepath.Join(p.DataDir, p.SnapshotDir),
filepath.Join(p.DataDir, p.AOFDir),
}
if err := makeDirsIfNotExists(dirs); err != nil {
fmt.Println(err)
return err
}
return nil
}
With this in place, let’s implement the snapshot logic. At every iteration of the loop we check for a pending snapshot before blocking on the channel receive - notice how this ordering means we capture a new starting snapshot on startup by default.
// PerformSnapshot Performs a snapshot to the disk
func (p *PersistenceEngine) PerformSnapshot() error {
fmt.Println("performing snapshot")
// Create a buffer to hold the gob-encoded data
var gobBuffer bytes.Buffer
newSnapshotTime := time.Now()
// Create a new gob encoder and encode the map into the buffer
enc := gob.NewEncoder(&gobBuffer)
if err := enc.Encode(*p.StoreRef); err != nil {
fmt.Println("error encoding store:", err)
return err
}
// Create a file to write the compressed data
fileName := fmt.Sprintf("%d.gob.gz", newSnapshotTime.UnixMilli())
snapshotName := filepath.Join(p.DataDir, p.SnapshotDir, fileName)
file, err := os.Create(snapshotName)
if err != nil {
fmt.Println("error creating snapshot:", err)
return err
}
defer file.Close()
// Create a new gzip writer
gzipWriter := gzip.NewWriter(file)
defer gzipWriter.Close()
// Write the gob-encoded data to the gzip writer
if _, err := gzipWriter.Write(gobBuffer.Bytes()); err != nil {
fmt.Println("error writing compressed data to snapshot:", err)
return err
}
// Flush the gzip writer to ensure all data is written
if err := gzipWriter.Flush(); err != nil {
fmt.Println("error flushing snapshot to disk:", err)
return err
}
fmt.Printf("successfully wrote snapshot %d to disk\n", newSnapshotTime.UnixMilli())
// Successful snapshot! Rotate the AOF log and restart AOF ingestion from new starting point
p.LastSnapshotTime = newSnapshotTime
p.RotateLog()
return nil
}
To perform this snapshot, we’re doing a few things for persistence and recoverability:
- Encoding the data through the use of a gob encoder, so it can be read back into structs on restore
- Gzipping the file to take up less space on the filesystem.
To see how we use this in this context, let’s write the recovery portion.
// RestoreSnapshot restores the database from a given snapshot and AOF log
func (p *PersistenceEngine) RestoreSnapshot() error {
// Look for the most recent snapshot
files, err := os.ReadDir(filepath.Join(p.DataDir, p.SnapshotDir))
if err != nil {
fmt.Println("failed to read directory")
return err
}
// No snapshots yet (e.g. first run) - nothing to restore
if len(files) == 0 {
return nil
}
// Directory entries are already sorted by file name
for _, file := range files {
fmt.Printf("Found snapshot: %s\n", file.Name())
}
latestSnapshot := files[len(files)-1].Name()
fmt.Printf("restoring snapshot %s\n", latestSnapshot)
// Create the gzip reader
compressedFile, err := os.ReadFile(filepath.Join(p.DataDir, p.SnapshotDir, latestSnapshot))
if err != nil {
fmt.Println("failed to open snapshot")
return err
}
gzipReader, err := gzip.NewReader(bytes.NewBuffer(compressedFile))
if err != nil {
fmt.Println("failed to open gzip reader:", err)
return err
}
defer gzipReader.Close()
decompressedSnapshot, err := io.ReadAll(gzipReader)
if err != nil {
fmt.Println("failed to decompress snapshot", err)
}
dec := gob.NewDecoder(bytes.NewReader(decompressedSnapshot))
if err := dec.Decode(p.StoreRef); err != nil {
fmt.Println("Failed to decode snapshot gob: ", err)
}
// Replay the AOF for the selected snapshot
p.RestoreAOF(latestSnapshot)
return nil
}
In the recovery, you can see that we’re taking advantage of how we encoded this to disk: after decompressing, we can quite easily decode the snapshot straight back into memory. However, this only restores the snapshot itself - we can’t take a snapshot every time we write to the database, as that would be an enormous amount of storage and disk I/O. So, how do we recover state between the snapshots?
This is where the AOF steps in:
// Log to a file
func (p *PersistenceEngine) ProcessLog(log *PersistenceLog) error {
if log.Timestamp == (time.Time{}) {
log.Timestamp = time.Now()
}
// Log to our AOF
if err := p.AOFEncoder.Encode(log); err != nil {
return err
}
// Check if we need to flush to disk
flushAfterTimeIs := p.LastLogFlushTime.Add(p.LogFlushInterval)
if flushAfterTimeIs.Before(time.Now()) {
err := p.AOFBufferedWriter.Flush()
if err != nil {
fmt.Println("error flushing to disk:", err)
}
p.LastLogFlushTime = time.Now()
}
return nil
}
Luckily for us, using the gob package in Go is super simple: all we have to do is call Encode on each incoming log entry. However, notice that we’re not immediately writing our data to disk - instead, we flush to disk only once a set interval has passed. This minimizes disk writes when a large surge of writes comes in, so as not to hinder the performance of the writer under heavy load.
This is why the Redis AOF is an AOF and not a WAL - these writes are not guaranteed to be on disk at the time the database acknowledges them, so a crash can lose the most recent operations. But as mentioned earlier, given Redis’s purpose as a high-throughput caching layer, this is normally acceptable.
Now, for the restore, we’ll take the snapshot timestamp we used and replay back just the AOF written since that snapshot. As you can see two examples up, we call the RotateLog function when we create a new snapshot.
// RotateLog rotates the AOF log file to the newest snapshot time
func (p *PersistenceEngine) RotateLog() error {
// Flush any buffers, if they exist, and close
if p.AOFFile != nil && p.AOFBufferedWriter != nil && p.AOFEncoder != nil {
// Flush and close
if err := p.AOFBufferedWriter.Flush(); err != nil {
fmt.Println("failed to flush AOF log", err)
}
if err := p.AOFFile.Close(); err != nil {
fmt.Println("failed to close file", err)
}
p.AOFEncoder = nil
}
// Create the buffered writer and encoder
newFileName := filepath.Join(p.DataDir, p.AOFDir, strconv.FormatInt(p.LastSnapshotTime.UnixMilli(), 10))
file, err := os.Create(newFileName)
if err != nil {
return err
}
bufferedWriter := bufio.NewWriter(file)
enc := gob.NewEncoder(bufferedWriter)
// Make sure our engine is using the new writer
p.AOFFile = file
p.AOFBufferedWriter = bufferedWriter
p.AOFEncoder = enc
return nil
}
// RestoreAOF Loads an AOF file for replaying to the store
func (p *PersistenceEngine) RestoreAOF(snapshotFile string) error {
// remove the .gob.gz extension (TrimSuffix removes only the literal suffix, unlike Trim)
snapshotId := strings.TrimSuffix(snapshotFile, ".gob.gz")
filePath := filepath.Join(p.DataDir, p.AOFDir, snapshotId)
// Load the file for the necessary AOF
file, err := os.Open(filePath)
if err != nil {
fmt.Println("error opening aof for snapshot:", err)
return err
}
defer file.Close()
// Load them back into in-memory
dec := gob.NewDecoder(file)
// Replay the actions
var logsToReplay []PersistenceLog
for {
var log PersistenceLog
err := dec.Decode(&log)
if err != nil {
if err != io.EOF {
fmt.Println("encoundered error decoding aof: ", err)
}
break
}
fmt.Printf("restoring action: %v\n", log)
logsToReplay = append(logsToReplay, log)
}
// Run the replay
ReplayAOF(&logsToReplay)
return nil
}
// ReplayAOF replays the actions from an AOF back into the store
func ReplayAOF(logs *[]PersistenceLog) error {
for _, log := range *logs {
switch log.Command {
case "SET":
PerformSet(log.Arguments, nil)
case "DEL":
PerformDel(log.Arguments, nil)
default:
fmt.Printf("unknown command to restore: %s\n", log.Command)
}
}
return nil
}
All we have to do to restore the database is replay the encoded AOF file until we hit the end of the file, with each loop iteration replaying the action against the database. Quite a simple solution to handle the complex problem of state recoverability!
And that’s it - with this we have made a simple recoverability solution to give our Redis rewrite a way to recover from unexpected crashes.
All source code can be found for this portion of the solution here.