Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d32e3db
Implement The Binary Record Format and made Marshal and Unmarshal Fun…
Souvik606 Jun 4, 2026
6bd3d05
Implement the Batch Group Write Worker logic
Souvik606 Jun 5, 2026
e824395
Implement segment rotation logic
Souvik606 Jun 5, 2026
bd89933
Implement the replay logic to rebuild the memtable on server crash
Souvik606 Jun 5, 2026
91c9f7d
Add unit test cases for WAL phase and added concurrency tests for ski…
Souvik606 Jun 5, 2026
bfa0f94
Merge branch 'main' into feat/storage-engine/wal
Souvik606 Jun 5, 2026
1903a89
Fixed missing package bugg and panic on closing bugs
Souvik606 Jun 5, 2026
eabe20c
Fixed linting errors
Souvik606 Jun 5, 2026
f687e11
Document the code to explain the functions and record format
Souvik606 Jun 5, 2026
4e34f85
Fixed space formatting linting errors
Souvik606 Jun 5, 2026
96c242b
Add LF line ending and added few tests and bug fixes
Souvik606 Jun 5, 2026
5283cbd
Remove magic numbers
theMr17 Jun 6, 2026
26218d7
Fixed issues of race condition between different channels and file sy…
Souvik606 Jun 8, 2026
d4b949d
feat: implement Write-Ahead Log recovery with segment replay and corr…
theMr17 Jun 8, 2026
655a012
feat: implement WAL writer with concurrent-safe append and automatic …
theMr17 Jun 8, 2026
3b7d5f9
Refactor unit tests for WAL recovery, writer, and record serializatio…
theMr17 Jun 10, 2026
acaee83
Add ErrKeyTooLarge and ErrFrameTooLarge
theMr17 Jun 10, 2026
99413c3
Added ErrInvalidOpcode sentinel error
theMr17 Jun 10, 2026
05deb0c
Seed currentSizeBytes from disk when opening a segment
theMr17 Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions internal/storage/memtable/skiplist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package memtable
import (
"bytes"
"errors"
"fmt"
"sync"
"testing"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Expand Down Expand Up @@ -176,6 +178,104 @@ func TestSkipList_EmptyAndNil(t *testing.T) {
}
}

// TestSkipList_StrictConcurrency verifies that a single writer goroutine and
// multiple concurrent reader goroutines operating on the same key never observe
// a corrupted or malformed value, confirming read-write lock correctness.
func TestSkipList_StrictConcurrency(t *testing.T) {
skipList := NewSkipList(100000, 12)
var waitGroup sync.WaitGroup
key := []byte("shared-key")

waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for i := 0; i < 1000; i++ {
val := []byte(fmt.Sprintf("val-%d", i))
_ = skipList.Put(key, val)
}
}()

for r := 0; r < 5; r++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
for i := 0; i < 1000; i++ {
val, err := skipList.Get(key)
if err != nil && !errors.Is(err, ErrKeyNotFound) {
t.Errorf("unexpected error on Get: %v", err)
}
if err == nil {
if !bytes.HasPrefix(val, []byte("val-")) {
t.Errorf("corrupted value: %s", val)
}
}
}
}()
}

waitGroup.Wait()
}

// TestSkipList_Concurrency is a broad stress test that runs concurrent Puts,
// Deletes, and Iterator traversals across disjoint key ranges simultaneously,
// verifying that no deadlock, panic, or data corruption occurs under contention.
func TestSkipList_Concurrency(t *testing.T) {
skipList := NewSkipList(100000, 12)
var waitGroup sync.WaitGroup

for i := 0; i < 100; i++ {
waitGroup.Add(1)
go func(id int) {
defer waitGroup.Done()
key := []byte(fmt.Sprintf("key-%03d", id))
val := []byte(fmt.Sprintf("val-%03d", id))
if err := skipList.Put(key, val); err != nil {
t.Errorf("put failed for %s: %v", key, err)
}
}(i)
}

for i := 100; i < 200; i++ {
waitGroup.Add(1)
go func(id int) {
defer waitGroup.Done()
key := []byte(fmt.Sprintf("key-%03d", id))
if err := skipList.Delete(key); err != nil {
t.Errorf("delete failed for %s: %v", key, err)
}
}(i)
}

for i := 0; i < 20; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
iterator := skipList.NewIterator()
for iterator.Valid() {
_, _, _ = iterator.Next()
}
}()
}

waitGroup.Wait()

for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key-%03d", i))
expected := []byte(fmt.Sprintf("val-%03d", i))
got, err := skipList.Get(key)
if err != nil || !bytes.Equal(got, expected) {
t.Fatalf("unexpected state for %s: got (%q, %v), want (%q, nil)", key, got, err, expected)
}
}

for i := 100; i < 200; i++ {
key := []byte(fmt.Sprintf("key-%03d", i))
if _, err := skipList.Get(key); !errors.Is(err, ErrKeyNotFound) {
t.Fatalf("expected ErrKeyNotFound for deleted/tombstoned key %s, got %v", key, err)
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// TestSkipList_SortedOrder verifies that the iterator always returns keys in
// ascending lexicographic order, regardless of insertion order.
func TestSkipList_SortedOrder(t *testing.T) {
Expand Down
Empty file removed internal/storage/wal/.gitkeep
Empty file.
49 changes: 49 additions & 0 deletions internal/storage/wal/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Package wal implements a Write-Ahead Log (WAL) for penguin-db.
// It supports log appending, rotation, record serialization, and replay recovery.
package wal

import (
"errors"
"math"
)

var (
// ErrInvalidCRC is returned when a record's checksum does not match its payload.
ErrInvalidCRC = errors.New("corrupt wal record: crc32 mismatch")

// ErrTruncated is returned when the log record has an unexpected size or EOF is reached mid-record.
ErrTruncated = errors.New("corrupt wal record: truncated payload")

// ErrEmptyKey is returned when attempting to write a log record with a zero-length key.
ErrEmptyKey = errors.New("wal record rejected: key must not be empty")

// ErrInvalidOpcode is returned when a record carries an opcode that is not
// recognized by the WAL format. Persisting such a record would succeed but
// the entry would be silently skipped during recovery replay.
ErrInvalidOpcode = errors.New("wal record rejected: unrecognized opcode")

// ErrKeyTooLarge is returned when the key exceeds the maximum representable
// length (math.MaxUint16 bytes) in the on-disk frame format.
ErrKeyTooLarge = errors.New("wal record rejected: key length exceeds maximum of " + uitoa(math.MaxUint16) + " bytes")

// ErrFrameTooLarge is returned when the total serialized frame size exceeds
// the maximum representable size (math.MaxUint32 bytes) in the on-disk format.
ErrFrameTooLarge = errors.New("wal record rejected: frame size exceeds maximum of " + uitoa(math.MaxUint32) + " bytes")
)

// uitoa converts an unsigned integer to its decimal string representation.
// Used to embed numeric limits in error sentinel messages at init time without
// depending on strconv or fmt.
func uitoa(val uint64) string {
if val == 0 {
return "0"
}
var buf [20]byte // max uint64 is 20 digits
i := len(buf)
for val > 0 {
i--
buf[i] = byte(val%10) + '0'
val /= 10
}
return string(buf[i:])
}
126 changes: 126 additions & 0 deletions internal/storage/wal/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package wal

import (
"encoding/binary"
"hash/crc32"
"math"
)

const (
// OpcodePut represents a put/insert operation in the WAL.
OpcodePut uint8 = 0
// OpcodeDelete represents a delete operation in the WAL.
OpcodeDelete uint8 = 1
)

// Record represents a single change logged in the WAL, wrapping an operation
// type (Opcode), key, and value payload.
type Record struct {
Opcode uint8
Key []byte
Value []byte
}

// Define sizes for each field in the frame
const (
checksumSize = 4
frameSizeSize = 4
opcodeSize = 1
keyLengthSize = 2

// Fixed header size is the sum of all fixed-length fields
fixedHeaderSize = checksumSize + frameSizeSize + opcodeSize + keyLengthSize
)

// Define offsets for each field to eliminate magic slice indices
const (
checksumOffset = 0
frameSizeOffset = checksumOffset + checksumSize
opcodeOffset = frameSizeOffset + frameSizeSize
keyLengthOffset = opcodeOffset + opcodeSize
keyOffset = keyLengthOffset + keyLengthSize
)

// Marshal serializes the Record into a binary frame.
//
// Frame Layout:
// +-------------+-------------+----------+------------+-----------+-----------+
// | Checksum | Frame Size | Opcode | Key Length | Key | Value |
// | (4 bytes) | (4 bytes) | (1 byte) | (2 bytes) | (n bytes) | (m bytes) |
// +-------------+-------------+----------+------------+-----------+-----------+
//
// Note: The Checksum (CRC32) covers all bytes starting from the Frame Size.
//
// Marshal returns ErrKeyTooLarge if the key exceeds math.MaxUint16 bytes, or
// ErrFrameTooLarge if the total frame size exceeds math.MaxUint32 bytes, since
// these sizes cannot be represented in the on-disk field widths.
func (record *Record) Marshal() ([]byte, error) {
keyLen := len(record.Key)
valLen := len(record.Value)

if keyLen > math.MaxUint16 {
return nil, ErrKeyTooLarge
}

totalFrameSizeBytes := fixedHeaderSize + keyLen + valLen

if totalFrameSizeBytes > math.MaxUint32 {
return nil, ErrFrameTooLarge
}

frameBuffer := make([]byte, totalFrameSizeBytes)

binary.LittleEndian.PutUint32(frameBuffer[frameSizeOffset:opcodeOffset], uint32(totalFrameSizeBytes))
frameBuffer[opcodeOffset] = record.Opcode
binary.LittleEndian.PutUint16(frameBuffer[keyLengthOffset:keyOffset], uint16(keyLen))
Comment thread
coderabbitai[bot] marked this conversation as resolved.

copy(frameBuffer[keyOffset:], record.Key)
valueOffset := keyOffset + keyLen
copy(frameBuffer[valueOffset:], record.Value)

calculatedChecksum := crc32.ChecksumIEEE(frameBuffer[frameSizeOffset:])
binary.LittleEndian.PutUint32(frameBuffer[checksumOffset:frameSizeOffset], calculatedChecksum)

return frameBuffer, nil
}

// UnmarshalRecord deserializes a raw binary frame and reconstructs the original
// Record. It validates the data integrity using the CRC checksum and performs
// bounds checking on payload lengths.
func UnmarshalRecord(frameData []byte) (*Record, error) {
if len(frameData) < fixedHeaderSize {
return nil, ErrTruncated
}

storedChecksum := binary.LittleEndian.Uint32(frameData[checksumOffset:frameSizeOffset])
calculatedChecksum := crc32.ChecksumIEEE(frameData[frameSizeOffset:])

if storedChecksum != calculatedChecksum {
return nil, ErrInvalidCRC
}

extractedOpcode := frameData[opcodeOffset]
extractedKeyLength := binary.LittleEndian.Uint16(frameData[keyLengthOffset:keyOffset])

if len(frameData) < fixedHeaderSize+int(extractedKeyLength) {
return nil, ErrTruncated
}

extractedKey := make([]byte, extractedKeyLength)
copy(extractedKey, frameData[keyOffset:keyOffset+int(extractedKeyLength)])

extractedValueLength := len(frameData) - (fixedHeaderSize + int(extractedKeyLength))
var extractedValue []byte

if extractedValueLength > 0 {
extractedValue = make([]byte, extractedValueLength)
valueOffset := keyOffset + int(extractedKeyLength)
copy(extractedValue, frameData[valueOffset:])
}

return &Record{
Opcode: extractedOpcode,
Key: extractedKey,
Value: extractedValue,
}, nil
}
Loading