Note: I loved making the Redis example from this earlier post so much that I’ve decided to make a series of posts called “Rebuilding Redis”. Hope you enjoy! Another note: all source code for this portion of the Redis rebuild can be found here.

Introduction Link to heading

Put yourself in the following situation 🚗

It’s getting late on a Saturday afternoon. You’ve spent all day running around town: you shopped for groceries, found a great present for your friend’s birthday, and even managed to pick up dog food for your pup. You’re exhausted and ready to get home and relax - all that’s left is to grab a few things from the hardware store.

So, you go into the hardware store, grab what you need, come back out and… is that… your car? Oh no. The door is open! You forgot to lock the door! Everything is gone. Your groceries, your dog food, your birthday present. You’ve been robbed!

After a few minutes of some choice words, and realizing there are no cameras around in this massive parking lot, you think to yourself “alright, it’s not the end of the world, I’ll just go back to the stores, grab everything I need, and head home.”

Only one problem: what was that birthday present again?

You can’t remember. It was a spur of the moment item that you had just found, and the stress of the situation is making it impossible to recall. If only you had written it down somewhere, and you could retrace your steps to figure out what you had bought…

If only you had a WAL.

What is a WAL? Link to heading

A Write-Ahead Log is a mechanism for ensuring that data is recorded on disk before it is committed anywhere. Much like the situation above, when we encounter a failure state, we can resolve our problems by replaying the steps we took. It’s a simple concept, but it’s one that is incredibly powerful and is used in many databases today, such as PostgreSQL and SQLite.

How does it work? Link to heading

Conceptually, the way a WAL works is quite simple - when you do an action, log what you did. Then, when you need to recover, replay the log of actions from a known state.
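
To make this concrete, here’s a minimal, hypothetical sketch of the log-then-apply pattern in Go (the log lives in memory purely for illustration - a real WAL appends to durable storage before applying the write):

type walEntry struct {
	Command   string
	Arguments []string
}

// logThenApply records the intent in the log first, then performs the write,
// so a crash mid-write can always be recovered by replaying the log.
func logThenApply(log *[]walEntry, store map[string]string, e walEntry) {
	*log = append(*log, e) // 1. write ahead
	apply(store, e)        // 2. then apply
}

// replay rebuilds the store from a known state by reapplying every entry.
func replay(log []walEntry, store map[string]string) {
	for _, e := range log {
		apply(store, e)
	}
}

// apply performs a single entry against the store
func apply(store map[string]string, e walEntry) {
	if e.Command == "SET" && len(e.Arguments) == 2 {
		store[e.Arguments[0]] = e.Arguments[1]
	}
}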

If we were to diagram out a traditional WAL, it would look like the following:

flowchart LR
    C["Client"] -- "Sends Write" --> A["Application"]
    A -- "Writes" --> Log
    Log -- "Commits" --> Database
    A -- "Read" --> Database

In this diagram, the application writes each incoming change to the WAL first, and only then commits it to the database. When the database needs to recover, it can replay the WAL to get back to a known state.

But Redis is a bit different: a large part of what makes it fast is that it doesn’t write to disk on every operation. So how can we guarantee recoverability if we don’t write to disk on every operation?

The Append-Only File (AOF) Link to heading

The Append-Only File is a mechanism that Redis uses to ensure that data is recoverable in the event of a failure. It works much like a WAL by writing every operation to a file, and then replaying that file on startup to get back to a known state.

However, unlike a WAL, there is no guarantee that the data is on disk at the time of the operation. This means that, in the event of a failure, we could lose operations that were appended to the AOF buffer but not yet flushed to disk.

If we move the arrow around in the above diagram, we can turn our WAL into an AOF:

flowchart LR
    C["Client"] -- "Sends Write" --> A["Application"]
    A -- "Writes" --> DB["Database"]
    DB -- "Writes" --> Log["Log"]
    A -- "Read" --> DB

Notice where the write to the log is happening. This is the key difference between a WAL and an AOF - the WAL is written before the database guarantees the write, while the AOF is written during or after the database guarantees the write. So, as noted above, a failure can lose recently logged operations. However, given Redis’s most common use case of caching, this is perfectly acceptable, and sacrificing durability guarantees for performance is in fact often desired.

So how do we implement this in our Redis clone and keep the database fast? We’ll use the topic from our previous post - channels.

Implementing the AOF Link to heading

Let’s start by defining the basic parts of our persistence engine to get them in place. We’ll create the outline of the engine, which will consist of a few parts:

  • A log
  • A snapshotter
  • A logger

package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"encoding/gob"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"
)

type PersistenceLog struct {
	// The command used for the action
	Command string
	// The arguments used for the action
	Arguments []string
	// Timestamp of the operation
	Timestamp time.Time
}

type PersistenceEngine struct {
	// The path for a top level data directory
	DataDir string
	// The path to the append-only file directory
	AOFDir string
	// The path to the snapshot directory
	SnapshotDir string
	// Interval at which to run a snapshot
	SnapshotInterval time.Duration
	// Time snapshots and AOFs will be retained before deleting
	SnapshotRetention time.Duration
	// The time since the last database snapshot
	LastSnapshotTime time.Time
	// The time since the last log to the AOF
	LastLogFlushTime time.Time
	// Channel accepting AOF log messages
	LogChannel chan PersistenceLog
	// Signal whether to stop ingesting logs temporarily (like during a snapshot)
	LogLock sync.Mutex
	// The time interval between log writes to flush to storage
	LogFlushInterval time.Duration
	// Pointer to the in-memory data store
	StoreRef *Store
	// Pointer to log reference
	LogRef []PersistenceLog
	// Buffered writer to use for persisting logs to disk
	AOFBufferedWriter *bufio.Writer
	// Encoder for writing binary files to the log
	AOFEncoder *gob.Encoder
	// Underlying writer for persisting logs
	AOFFile *os.File
}

// NewPersistenceEngine returns a new persistence engine
func NewPersistenceEngine(store *Store) *PersistenceEngine {
	fmt.Println("Starting persistence engine")
	return &PersistenceEngine{
		DataDir:           "_data",
		AOFDir:            "aof",
		SnapshotDir:       "snapshot",
		SnapshotRetention: time.Minute * 30,
		LastLogFlushTime:  time.Now(),
		SnapshotInterval:  time.Minute * 1,
		LogFlushInterval:  time.Second * 5,
		LogChannel:        make(chan PersistenceLog, 1024), // Give the log channel a buffer so senders don't block
		LogRef:            make([]PersistenceLog, 0, 1024),
		StoreRef:          store,
	}
}

Now there’s a good amount on this struct, but breaking it down, we can see that it consists of various settings pertaining to:

  • How often to take a snapshot
  • How often to rotate the AOF log
  • References to the communication channels
  • Holding references to other engines and stores
  • Holding references to encoders and file buffers

With our struct and initializer in place, we can begin building the logic for the snapshotter and the AOF log.
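
As a quick preview of how this wires together (a hypothetical sketch - the Store type comes from the previous posts, Start is implemented below, and placing the send inside a SET handler is an assumption), the server constructs the engine, starts it, and pushes a PersistenceLog down the channel on every write:

// Hypothetical wiring sketch: Store comes from the previous post,
// and engine.Start (implemented below) must be running before we send.
var store Store
engine := NewPersistenceEngine(&store)
engine.Start()

// Inside a SET handler, record the operation on the channel:
engine.LogChannel <- PersistenceLog{
	Command:   "SET",
	Arguments: []string{"name", "gopher"},
	Timestamp: time.Now(),
}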

Performing Snapshots and Logging Link to heading

In much the same manner as we handled expiry, we’ll be doing this passively on request.

flowchart LR
    C["Client"] -- "Request" --> S["Server"]
    S --> psi{"Past Snapshot Interval?"}
    psi -- "Yes" --> ss["Start Snapshot"]
    psi -- "No" --> atl["Append to Log"]
    ss --> p["Pause AOF ingestion"]
    p --> ps["Write encoded snapshot"]
    ps --> rl["Rotate the AOF log"]
    rl --> rs["Resume ingestion"]

We can start by setting up our Start function, which will perform the necessary filesystem setup for storage, restore any previous state, and then begin listening on the channel:

// Start starts the persistence engine
func (p *PersistenceEngine) Start() {
	p.ensureDirsExist()
	p.RestoreSnapshot()
	// Ingest logs from the channel on a background thread
	go func() {
		for {
			// Check if we need to take a snapshot
			if p.IsPendingSnapshot() {
				p.PerformSnapshot()
			}
			// Check for incoming messages
			toLog := <-p.LogChannel
			if err := p.ProcessLog(&toLog); err != nil {
				fmt.Println(err)
			}
		}
	}()
}

func makeDirsIfNotExists(dirs []string) error {
	for _, dir := range dirs {
		if _, err := os.Stat(dir); os.IsNotExist(err) {
			if err := os.MkdirAll(dir, 0700); err != nil {
				fmt.Println(err)
				return err
			}
		}
	}

	return nil
}

// ensureDirsExist ensures that the required directories exist on the filesystem
func (p *PersistenceEngine) ensureDirsExist() error {
	// Ensure our data directories exist
	dirs := []string{
		p.DataDir,
		filepath.Join(p.DataDir, p.SnapshotDir),
		filepath.Join(p.DataDir, p.AOFDir),
	}

	if err := makeDirsIfNotExists(dirs); err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}

With this in place, let’s implement the snapshot logic. At every iteration of the read loop, we check for a pending snapshot before blocking on the channel receive. Notice how we purposely placed this check prior to the blocking read, so that we capture a new starting snapshot on startup by default.
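
The IsPendingSnapshot check isn’t shown above; a minimal sketch, assuming it’s just a comparison of the elapsed time against SnapshotInterval, might look like this:

// IsPendingSnapshot reports whether enough time has elapsed since the last
// snapshot for a new one to be due. LastSnapshotTime starts as the zero
// time, so this is always true on startup.
func (p *PersistenceEngine) IsPendingSnapshot() bool {
	return time.Since(p.LastSnapshotTime) >= p.SnapshotInterval
}

The snapshot itself looks like this: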

// PerformSnapshot performs a snapshot to the disk
func (p *PersistenceEngine) PerformSnapshot() error {
	fmt.Println("performing snapshot")
	// Create a buffer to hold the gob-encoded data
	var gobBuffer bytes.Buffer

	newSnapshotTime := time.Now()

	// Create a new gob encoder and encode the map into the buffer
	enc := gob.NewEncoder(&gobBuffer)
	if err := enc.Encode(*p.StoreRef); err != nil {
		fmt.Println("error encoding store:", err)
		return err
	}

	// Create a file to write the compressed data
	fileName := fmt.Sprintf("%d.gob.gz", newSnapshotTime.UnixMilli())
	snapshotName := filepath.Join(p.DataDir, p.SnapshotDir, fileName)
	file, err := os.Create(snapshotName)
	if err != nil {
		fmt.Println("error creating snapshot:", err)
		return err
	}
	defer file.Close()

	// Create a new gzip writer
	gzipWriter := gzip.NewWriter(file)
	defer gzipWriter.Close()

	// Write the gob-encoded data to the gzip writer
	if _, err := gzipWriter.Write(gobBuffer.Bytes()); err != nil {
		fmt.Println("error writing compressed data to snapshot:", err)
		return err
	}

	// Flush the gzip writer to ensure all data is written
	if err := gzipWriter.Flush(); err != nil {
		fmt.Println("error flushing snapshot to disk:", err)
		return err
	}

	fmt.Printf("successfully wrote snapshot %d to disk\n", newSnapshotTime.UnixMilli())

	// Successful snapshot! Rotate the AOF log and restart AOF ingestion from new starting point
	p.LastSnapshotTime = newSnapshotTime
	p.RotateLog()

	return nil
}

To perform this snapshot, we’re doing a few things for persistence and recoverability:

  • Encoding the data with a gob encoder so that it can be decoded back into structs on restore
  • Gzipping the file to take up less space on the filesystem.
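
If you haven’t used gob before, here’s a tiny standalone roundtrip (using the same packages we imported above) showing the encode-then-compress and decompress-then-decode symmetry we rely on:

// Encode a value straight through a gzip writer into a buffer...
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if err := gob.NewEncoder(gz).Encode(map[string]string{"name": "gopher"}); err != nil {
	panic(err)
}
gz.Close() // Close (not just Flush) writes the gzip footer

// ...then reverse the layers to get the value back.
gzr, err := gzip.NewReader(&buf)
if err != nil {
	panic(err)
}
restored := map[string]string{}
if err := gob.NewDecoder(gzr).Decode(&restored); err != nil {
	panic(err)
}
fmt.Println(restored["name"]) // prints "gopher"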

To see how we put this to use, let’s write the recovery portion.

// RestoreSnapshot restores the database from the most recent snapshot and its AOF log
func (p *PersistenceEngine) RestoreSnapshot() error {
	// Look for the most recent snapshot
	files, err := os.ReadDir(filepath.Join(p.DataDir, p.SnapshotDir))
	if err != nil {
		fmt.Println("failed to read directory")
		return err
	}

	// os.ReadDir returns entries sorted by filename, so the last entry is the newest snapshot
	for _, file := range files {
		fmt.Printf("Found snapshot: %s\n", file.Name())
	}

	// Nothing to restore on a fresh start
	if len(files) == 0 {
		fmt.Println("no snapshots found, skipping restore")
		return nil
	}

	latestSnapshot := files[len(files)-1].Name()
	fmt.Printf("restoring snapshot %s\n", latestSnapshot)

	// Read the compressed snapshot from disk
	compressedFile, err := os.ReadFile(filepath.Join(p.DataDir, p.SnapshotDir, latestSnapshot))
	if err != nil {
		fmt.Println("failed to open snapshot")
		return err
	}

	// Create the gzip reader over the compressed bytes
	gzipReader, err := gzip.NewReader(bytes.NewBuffer(compressedFile))
	if err != nil {
		fmt.Println("failed to open gzip reader:", err)
		return err
	}
	defer gzipReader.Close()

	decompressedSnapshot, err := io.ReadAll(gzipReader)
	if err != nil {
		fmt.Println("failed to decompress snapshot", err)
		return err
	}

	dec := gob.NewDecoder(bytes.NewReader(decompressedSnapshot))
	if err := dec.Decode(p.StoreRef); err != nil {
		fmt.Println("Failed to decode snapshot gob: ", err)
	}

	// Replay the AOF for the selected snapshot
	p.RestoreAOF(latestSnapshot)
	return nil
}

In the recovery, you can see that we’re taking advantage of how we encoded this to disk: after decompressing, we can quite easily replay the database back into memory. However, this only restores the snapshot - we can’t take a snapshot on every write to the database, as that would be an enormous amount of storage and disk I/O. So how do we recover state between the snapshots?

This is where the AOF steps in:

// ProcessLog encodes a single operation to the AOF, periodically flushing to disk
func (p *PersistenceEngine) ProcessLog(log *PersistenceLog) error {
	if log.Timestamp.IsZero() {
		log.Timestamp = time.Now()
	}

	// Log to our AOF
	if err := p.AOFEncoder.Encode(log); err != nil {
		fmt.Println("error encoding log to AOF:", err)
		return err
	}

	// Check if we need to flush to disk
	flushAfter := p.LastLogFlushTime.Add(p.LogFlushInterval)
	if flushAfter.Before(time.Now()) {
		if err := p.AOFBufferedWriter.Flush(); err != nil {
			fmt.Println("error flushing to disk:", err)
			return err
		}
		p.LastLogFlushTime = time.Now()
	}

	return nil
}

Luckily for us, the gob package in Go is super simple to use: all we have to do is call Encode on the incoming data. However, notice that we’re not immediately writing our data to disk - instead, we only flush once a set interval has passed. This minimizes disk writes when a large surge of operations comes in, so that heavy load doesn’t hinder the writer’s performance.

This is why the Redis AOF is an AOF and not a WAL - these writes are not guaranteed to be on disk when the database acknowledges them, so a crash can lose the most recent operations. But as mentioned earlier, for Redis’s purpose as a high-throughput caching layer, this is normally acceptable.
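
One caveat worth knowing: bufio’s Flush only hands the buffered bytes to the operating system; it does not force them onto the physical disk. If we wanted to inch closer to WAL-style durability, we could additionally fsync the underlying file at each flush, trading throughput for safety. A minimal sketch of that stronger flush:

// Flush the buffer to the OS, then fsync the file so the bytes reach
// stable storage - closer to a true WAL, at a real performance cost.
if err := p.AOFBufferedWriter.Flush(); err != nil {
	return err
}
if err := p.AOFFile.Sync(); err != nil {
	return err
}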

Now, for the restore, we’ll take the snapshot timestamp we used and replay just the AOF written since that snapshot. As you can see two examples up, we call the RotateLog function whenever we create a new snapshot.

// RotateLog rotates the AOF log file to the newest snapshot time
func (p *PersistenceEngine) RotateLog() error {
	// Flush any buffers, if they exist, and close
	if p.AOFFile != nil && p.AOFBufferedWriter != nil && p.AOFEncoder != nil {
		// Flush and close
		if err := p.AOFBufferedWriter.Flush(); err != nil {
			fmt.Println("failed to flush AOF log", err)
		}
		if err := p.AOFFile.Close(); err != nil {
			fmt.Println("failed to close file", err)
		}
		p.AOFEncoder = nil
	}

	// Create the buffered writer and encoder
	newFileName := filepath.Join(p.DataDir, p.AOFDir, strconv.FormatInt(p.LastSnapshotTime.UnixMilli(), 10))
	file, err := os.Create(newFileName)
	if err != nil {
		return err
	}
	bufferedWriter := bufio.NewWriter(file)
	enc := gob.NewEncoder(bufferedWriter)

	// Make sure our engine is using the new writer
	p.AOFFile = file
	p.AOFBufferedWriter = bufferedWriter
	p.AOFEncoder = enc

	return nil
}

// RestoreAOF loads the AOF file matching a snapshot for replaying to the store
func (p *PersistenceEngine) RestoreAOF(snapshotFile string) error {
	// remove the .gob.gz extension from the end
	snapshotId := strings.TrimSuffix(snapshotFile, ".gob.gz")
	filePath := filepath.Join(p.DataDir, p.AOFDir, snapshotId)
	// Load the file for the necessary AOF
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Println("error opening aof for snapshot:", err)
		return err
	}

	defer file.Close()

	// Load them back into in-memory
	dec := gob.NewDecoder(file)

	// Replay the actions
	var logsToReplay []PersistenceLog
	for {
		var log PersistenceLog
		err := dec.Decode(&log)
		if err != nil {
			if err != io.EOF {
				fmt.Println("encoundered error decoding aof: ", err)
			}
			break
		}
		fmt.Printf("restoring action: %v\n", log)
		logsToReplay = append(logsToReplay, log)
	}

	// Run the replay
	ReplayAOF(&logsToReplay)

	return nil
}

// ReplayAOF replays the actions from an AOF back into the in-memory store
func ReplayAOF(logs *[]PersistenceLog) error {
	for _, log := range *logs {
		switch log.Command {
		case "SET":
			PerformSet(log.Arguments, nil)
		case "DEL":
			PerformDel(log.Arguments, nil)
		default:
			fmt.Printf("unknown command to restore: %s\n", log.Command)
		}
	}

	return nil
}

All we have to do to restore the database is replay the encoded AOF file until we hit the end of the file, with each loop iteration replaying an action against the database. Quite a simple solution to the complex problem of state recoverability!
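
To exercise the replay in isolation, you could hand ReplayAOF a couple of hand-built entries (assuming the PerformSet and PerformDel handlers from the previous post are wired up to the store):

// Hypothetical replay of two hand-built log entries
logs := []PersistenceLog{
	{Command: "SET", Arguments: []string{"name", "gopher"}, Timestamp: time.Now()},
	{Command: "DEL", Arguments: []string{"name"}, Timestamp: time.Now()},
}
ReplayAOF(&logs) // applies the SET, then the DEL, against the store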

And that’s it - with this, our Redis rewrite has a simple persistence layer that lets it recover from unexpected crashes.

All source code can be found for this portion of the solution here.