diff --git a/pkg/dataloader/dataloader.go b/pkg/dataloader/dataloader.go index 177744f..3745cc8 100644 --- a/pkg/dataloader/dataloader.go +++ b/pkg/dataloader/dataloader.go @@ -16,23 +16,23 @@ type Batch struct { // DataLoader — итератор для загрузки данных мини-батчами. // Отвечает за батчинг, перемешивание и итерацию по Dataset. type DataLoader struct { - dataset Dataset // Источник данных - batchSize int // Размер мини-батча - shuffle bool // Перемешивать ли данные перед каждой эпохой - dropLast bool // Отбрасывать ли последний неполный батч - rng *rand.Rand // Генератор случайных чисел для shuffle + dataset Dataset // Источник данных + batchSize int // Размер мини-батча + shuffle bool // Перемешивать ли данные перед каждой эпохой + dropLast bool // Отбрасывать ли последний неполный батч + rng *rand.Rand // Генератор случайных чисел для shuffle // Внутреннее состояние итератора - indices []int // Порядок индексов для текущей эпохи - currentIdx int // Текущая позиция в indices + indices []int // Порядок индексов для текущей эпохи + currentIdx int // Текущая позиция в indices } // DataLoaderConfig — конфигурация для создания DataLoader. type DataLoaderConfig struct { - BatchSize int // Размер батча (обязательный) - Shuffle bool // Перемешивать данные (по умолчанию false) - DropLast bool // Отбрасывать последний неполный батч (по умолчанию false) - Seed int64 // Seed для генератора случайных чисел (по умолчанию 0) + BatchSize int // Размер батча (обязательный) + Shuffle bool // Перемешивать данные (по умолчанию false) + DropLast bool // Отбрасывать последний неполный батч (по умолчанию false) + Seed int64 // Seed для генератора случайных чисел (по умолчанию 0) } // NewDataLoader создает новый DataLoader с заданной конфигурацией. @@ -42,12 +42,13 @@ type DataLoaderConfig struct { // - config: конфигурация DataLoader // // Пример: -// loader := NewDataLoader(dataset, DataLoaderConfig{ -// BatchSize: 32, -// Shuffle: true, -// DropLast: false, -// Seed: 42, -// }) +// +// loader := NewDataLoader(dataset, DataLoaderConfig{ +// BatchSize: 32, +// Shuffle: true, +// DropLast: false, +// Seed: 42, +// }) func NewDataLoader(dataset Dataset, config DataLoaderConfig) *DataLoader { if config.BatchSize <= 0 { panic("batch size must be positive") diff --git a/pkg/tensor/batch.go b/pkg/tensor/batch.go index 2bb7114..ae73fdc 100644 --- a/pkg/tensor/batch.go +++ b/pkg/tensor/batch.go @@ -2,7 +2,6 @@ package tensor import ( "fmt" - "sync" ) // BatchOps содержит пакетные операции для обработки множества тензоров @@ -35,45 +34,28 @@ func BatchMatMul(a, b *Tensor) (*Tensor, error) { } // Параллельная обработка батча - var wg sync.WaitGroup - numWorkers := min(batchSize, 8) // Ограничиваем количество воркеров - - batchPerWorker := (batchSize + numWorkers - 1) / numWorkers - - for w := 0; w < numWorkers; w++ { - startBatch := w * batchPerWorker - if startBatch >= batchSize { - break - } - endBatch := min((w+1)*batchPerWorker, batchSize) - - wg.Add(1) - go func(start, end int) { - defer wg.Done() - - for batchIdx := start; batchIdx < end; batchIdx++ { - // Извлекаем срезы для текущего батча - aOffset := batchIdx * m * n - bOffset := batchIdx * n * p - cOffset := batchIdx * m * p - - aSlice := a.Data[aOffset : aOffset+m*n] - bSlice := b.Data[bOffset : bOffset+n*p] - cSlice := result.Data[cOffset : cOffset+m*p] - - // Выполняем умножение матриц для этого элемента батча - if m >= ParallelThreshold || p >= ParallelThreshold { - matmulBlocked(aSlice, bSlice, cSlice, m, n, p) - } else if m >= BlockSize || p >= BlockSize { - matmulBlocked(aSlice, bSlice, cSlice, m, n, p) - } else { - matmulOptimized(aSlice, bSlice, cSlice, m, n, p) - } + ParallelFor(batchSize, 1, func(start, end int) { + for batchIdx := start; batchIdx < end; batchIdx++ { + // Извлекаем срезы для текущего батча + aOffset := batchIdx * m * n + bOffset := batchIdx * n * p + cOffset := batchIdx * m * p + + aSlice := a.Data[aOffset : aOffset+m*n] + bSlice := b.Data[bOffset : bOffset+n*p] + cSlice := result.Data[cOffset : cOffset+m*p] + + // Выполняем умножение матриц для этого элемента батча + if m >= ParallelThreshold || p >= ParallelThreshold { + matmulBlocked(aSlice, bSlice, cSlice, m, n, p) + } else if m >= BlockSize || p >= BlockSize { + matmulBlocked(aSlice, bSlice, cSlice, m, n, p) + } else { + matmulOptimized(aSlice, bSlice, cSlice, m, n, p) } - }(startBatch, endBatch) - } + } + }) - wg.Wait() return result, nil } @@ -100,31 +82,16 @@ func BatchAdd(tensors []*Tensor) (*Tensor, error) { } // Параллельное сложение - numWorkers := 4 - chunkSize := (size + numWorkers - 1) / numWorkers - - var wg sync.WaitGroup - for w := 0; w < numWorkers; w++ { - start := w * chunkSize - if start >= size { - break - } - end := min((w+1)*chunkSize, size) - - wg.Add(1) - go func(s, e int) { - defer wg.Done() - for i := s; i < e; i++ { - sum := 0.0 - for _, t := range tensors { - sum += t.Data[i] - } - result.Data[i] = sum + ParallelFor(size, MinGrainSize, func(start, end int) { + for i := start; i < end; i++ { + sum := 0.0 + for _, t := range tensors { + sum += t.Data[i] } - }(start, end) - } + result.Data[i] = sum + } + }) - wg.Wait() return result, nil } @@ -134,16 +101,12 @@ func BatchScale(tensors []*Tensor, scales []float64) error { return fmt.Errorf("количество тензоров и коэффициентов должно совпадать: %d != %d", len(tensors), len(scales)) } - var wg sync.WaitGroup - for i, t := range tensors { - wg.Add(1) - go func(tensor *Tensor, scale float64) { - defer wg.Done() - ScaleInPlace(scale, tensor) - }(t, scales[i]) - } + ParallelFor(len(tensors), 1, func(start, end int) { + for i := start; i < end; i++ { + ScaleInPlace(scales[i], tensors[i]) + } + }) - wg.Wait() return nil } @@ -303,38 +266,22 @@ func BatchMatMulSIMD(a, b *Tensor) (*Tensor, error) { } // Параллельная обработка с SIMD - var wg sync.WaitGroup - numWorkers := min(batchSize, 8) - batchPerWorker := (batchSize + numWorkers - 1) / numWorkers - - for w := 0; w < numWorkers; w++ { - startBatch := w * batchPerWorker - if startBatch >= batchSize { - break + ParallelFor(batchSize, 1, func(start, end int) { + for batchIdx := start; batchIdx < end; batchIdx++ { + aOffset := batchIdx * m * n + bOffset := batchIdx * n * p + cOffset := batchIdx * m * p + + aSlice := a.Data[aOffset : aOffset+m*n] + bSlice := b.Data[bOffset : bOffset+n*p] + cSlice := result.Data[cOffset : cOffset+m*p] + + // Используем SIMD-оптимизированное умножение + blockSize := chooseBlockSize(m, n, p) + matmulBlockedSIMD(aSlice, bSlice, cSlice, m, n, p, blockSize) } - endBatch := min((w+1)*batchPerWorker, batchSize) - - wg.Add(1) - go func(start, end int) { - defer wg.Done() - - for batchIdx := start; batchIdx < end; batchIdx++ { - aOffset := batchIdx * m * n - bOffset := batchIdx * n * p - cOffset := batchIdx * m * p - - aSlice := a.Data[aOffset : aOffset+m*n] - bSlice := b.Data[bOffset : bOffset+n*p] - cSlice := result.Data[cOffset : cOffset+m*p] - - // Используем SIMD-оптимизированное умножение - blockSize := chooseBlockSize(m, n, p) - matmulBlockedSIMD(aSlice, bSlice, cSlice, m, n, p, blockSize) - } - }(startBatch, endBatch) - } + }) - wg.Wait() return result, nil } diff --git a/pkg/tensor/blas_nocgo.go b/pkg/tensor/blas_nocgo.go index dbc1635..1dd7774 100644 --- a/pkg/tensor/blas_nocgo.go +++ b/pkg/tensor/blas_nocgo.go @@ -1,43 +1,36 @@ -// +build !cgo +//go:build !blas +// +build !blas package tensor -// BLASAvailable указывает, доступна ли BLAS библиотека -// В этой сборке без CGO BLAS недоступна +import "fmt" + const BLASAvailable = false -// MatMulBLAS - заглушка для сборки без CGO -// Возвращает ошибку, так как BLAS недоступна func MatMulBLAS(a, b *Tensor) (*Tensor, error) { - // Fallback на нативную оптимизированную версию return MatMul(a, b) } -// MatMulTransposeBBLAS - заглушка для сборки без CGO func MatMulTransposeBBLAS(a, b *Tensor) (*Tensor, error) { return MatMulTransposeB(a, b) } -// MatMulTransposeABLAS - заглушка для сборки без CGO func MatMulTransposeABLAS(a, b *Tensor) (*Tensor, error) { return MatMulTransposeA(a, b) } -// VectorAddBLAS - заглушка для сборки без CGO func VectorAddBLAS(alpha float64, x, y []float64) { for i := range x { y[i] += alpha * x[i] } } -// VectorScaleBLAS - заглушка для сборки без CGO func VectorScaleBLAS(alpha float64, x []float64) { for i := range x { x[i] *= alpha } } -// DotProductBLAS - заглушка для сборки без CGO func DotProductBLAS(x, y []float64) float64 { sum := 0.0 for i := range x { @@ -46,24 +39,34 @@ func DotProductBLAS(x, y []float64) float64 { return sum } -// MatrixVectorMultiplyBLAS - заглушка для сборки без CGO func MatrixVectorMultiplyBLAS(alpha float64, a *Tensor, x []float64, beta float64, y []float64) error { + if len(a.Shape) != 2 { + return fmt.Errorf("матрица должна быть 2D") + } m := a.Shape[0] n := a.Shape[1] - - // y = beta*y + if len(x) < n || len(y) < m { + return fmt.Errorf("неверная длина векторов: x=%d y=%d, нужно x>=%d y>=%d", len(x), len(y), n, m) + } for i := 0; i < m; i++ { y[i] *= beta } - - // y += alpha * A * x - for i := 0; i < m; i++ { - sum := 0.0 - for j := 0; j < n; j++ { - sum += a.Data[i*n+j] * x[j] + if a.DType == Float32 { + for i := 0; i < m; i++ { + var sum float32 + for j := 0; j < n; j++ { + sum += a.Data32[i*n+j] * float32(x[j]) + } + y[i] += alpha * float64(sum) + } + } else { + for i := 0; i < m; i++ { + sum := 0.0 + for j := 0; j < n; j++ { + sum += a.Data[i*n+j] * x[j] + } + y[i] += alpha * sum } - y[i] += alpha * sum } - return nil } diff --git a/pkg/tensor/dtype.go b/pkg/tensor/dtype.go new file mode 100644 index 0000000..76e72aa --- /dev/null +++ b/pkg/tensor/dtype.go @@ -0,0 +1,58 @@ +package tensor + +import "sync/atomic" + +// DType represents the data type of the tensor elements. +type DType uint8 + +const ( + // Float64 is the default double-precision floating point type (8 bytes). + Float64 DType = iota + // Float32 is single-precision floating point type (4 bytes). + Float32 +) + +var ( + // defaultDType stores the default data type for new tensors. + // Accessed atomically to ensure thread safety. + defaultDType atomic.Int32 // Stores DType cast to int32 +) + +func init() { + // Initialize default to Float64 explicitly (though 0 is Float64) + SetDefaultDType(Float64) +} + +// SetDefaultDType sets the default data type for new tensors. +func SetDefaultDType(dt DType) { + defaultDType.Store(int32(dt)) +} + +// GetDefaultDType returns the current default data type. +func GetDefaultDType() DType { + return DType(defaultDType.Load()) +} + +// String returns the string representation of the DType. +func (dt DType) String() string { + switch dt { + case Float64: + return "Float64" + case Float32: + return "Float32" + default: + return "Unknown" + } +} + +// Size returns the size in bytes of a single element of DType. +func (dt DType) Size() int { + switch dt { + case Float64: + return 8 + case Float32: + return 4 + default: + return 0 + } +} diff --git a/pkg/tensor/dtype_test.go b/pkg/tensor/dtype_test.go new file mode 100644 index 0000000..ecaa80e --- /dev/null +++ b/pkg/tensor/dtype_test.go @@ -0,0 +1,139 @@ +package tensor + +import ( + "math" + "testing" +) + +func TestFloat32HalfMemory(t *testing.T) { + shape := []int{100, 200} + SetDefaultDType(Float64) + t64 := Zeros(shape...) + bytes64 := t64.DataLen() * 8 + SetDefaultDType(Float32) + t32 := Zeros(shape...) + bytes32 := t32.DataLen() * 4 + if bytes32*2 != bytes64 { + t.Fatalf("Float32 should use half memory: Float64=%d bytes Float32=%d bytes", bytes64, bytes32) + } + SetDefaultDType(Float64) +} + +func TestFloat32ZerosOnesRandn(t *testing.T) { + defer SetDefaultDType(GetDefaultDType()) + SetDefaultDType(Float32) + + z := Zeros(2, 3) + if z.DType != Float32 || z.Data32 == nil || len(z.Data32) != 6 { + t.Fatalf("Zeros Float32: DType=%v len(Data32)=%v", z.DType, len(z.Data32)) + } + for i := range z.Data32 { + if z.Data32[i] != 0 { + t.Errorf("Zeros Data32[%d]=%v", i, z.Data32[i]) + } + } + + o := Ones(2, 3) + if o.DType != Float32 || len(o.Data32) != 6 { + t.Fatalf("Ones Float32") + } + for i := range o.Data32 { + if o.Data32[i] != 1.0 { + t.Errorf("Ones Data32[%d]=%v", i, o.Data32[i]) + } + } + + r := Randn([]int{2, 3}, 42) + if r.DType != Float32 || len(r.Data32) != 6 { + t.Fatalf("Randn Float32") + } +} + +func TestFloat32AddMulMatMul(t *testing.T) { + defer SetDefaultDType(GetDefaultDType()) + SetDefaultDType(Float32) + + a := Zeros(2, 3) + b := Zeros(2, 3) + for i := range a.Data32 { + a.Data32[i] = float32(i + 1) + b.Data32[i] = float32(i + 2) + } + sum, err := Add(a, b) + if err != nil { + t.Fatal(err) + } + if sum.DType != Float32 { + t.Fatalf("Add DType=%v", sum.DType) + } + for i := range sum.Data32 { + want := float32(i+1) + float32(i+2) + if math.Abs(float64(sum.Data32[i]-want)) > 1e-5 { + t.Errorf("Add Data32[%d]=%v want %v", i, sum.Data32[i], want) + } + } + + mul, err := Mul(a, b) + if err != nil { + t.Fatal(err) + } + if mul.DType != Float32 { + t.Fatalf("Mul DType=%v", mul.DType) + } + + am := Randn([]int{4, 8}, 1) + bm := Randn([]int{8, 2}, 2) + cm, err := MatMul(am, bm) + if err != nil { + t.Fatal(err) + } + if cm.DType != Float32 || cm.Shape[0] != 4 || cm.Shape[1] != 2 { + t.Fatalf("MatMul Float32 shape=%v", cm.Shape) + } +} + +func TestFloat32ModelTrainStep(t *testing.T) { + defer SetDefaultDType(GetDefaultDType()) + SetDefaultDType(Float32) + + x := Randn([]int{32, 64}, 10) + w := Randn([]int{64, 128}, 20) + y, err := MatMul(x, w) + if err != nil { + t.Fatal(err) + } + if y.DType != Float32 || y.Shape[0] != 32 || y.Shape[1] != 128 { + t.Fatalf("forward shape=%v", y.Shape) + } + bias := Ones(128) + for i := 0; i < 32; i++ { + for j := 0; j < 128; j++ { + y.Data32[i*128+j] += bias.Data32[j] + } + } + relu := Apply(y, func(f float64) float64 { + if f < 0 { + return 0 + } + return f + }) + if relu.DType != Float32 || relu.DataLen() != 32*128 { + t.Fatalf("relu DType=%v len=%v", relu.DType, relu.DataLen()) + } +} + +func BenchmarkFloat32VsFloat64Memory(b *testing.B) { + shape := []int{128, 256} + b.Run("Float64", func(b *testing.B) { + SetDefaultDType(Float64) + for i := 0; i < b.N; i++ { + _ = Zeros(shape...) + } + }) + b.Run("Float32", func(b *testing.B) { + SetDefaultDType(Float32) + for i := 0; i < b.N; i++ { + _ = Zeros(shape...) + } + }) +} diff --git a/pkg/tensor/graph/node.go b/pkg/tensor/graph/node.go index a7bf63c..acda60f 100644 --- a/pkg/tensor/graph/node.go +++ b/pkg/tensor/graph/node.go @@ -25,7 +25,7 @@ type Operation interface { func NewNode(value *tensor.Tensor, parents []*Node, op Operation) *Node { return &Node{ Value: value, - Grad: tensor.Zeros(value.Shape...), + Grad: value.ZeroGrad(), Parents: parents, Operation: op, } diff --git a/pkg/tensor/init.go b/pkg/tensor/init.go index a8c4b78..6072b85 100644 --- a/pkg/tensor/init.go +++ b/pkg/tensor/init.go @@ -8,33 +8,52 @@ import ( // Используется для инициализации градиентов и промежуточных результатов. func Zeros(shape ...int) *Tensor { size := calculateSize(shape) - data := make([]float64, size) strides := calculateStrides(shape) - - return &Tensor{ - Data: data, + dtype := GetDefaultDType() + + t := &Tensor{ Shape: shape, Strides: strides, + DType: dtype, + } + + if dtype == Float32 { + t.Data32 = make([]float32, size) + } else { + t.Data = make([]float64, size) } + + return t } // Ones создаёт тензор заполненный единицами с указанной формой. // Используется для инициализации bias и масок. func Ones(shape ...int) *Tensor { size := calculateSize(shape) - data := make([]float64, size) - - for i := range data { - data[i] = 1.0 - } - strides := calculateStrides(shape) - - return &Tensor{ - Data: data, + dtype := GetDefaultDType() + + t := &Tensor{ Shape: shape, Strides: strides, + DType: dtype, + } + + if dtype == Float32 { + data := make([]float32, size) + for i := range data { + data[i] = 1.0 + } + t.Data32 = data + } else { + data := make([]float64, size) + for i := range data { + data[i] = 1.0 + } + t.Data = data } + + return t } // Randn создаёт тензор с случайными значениями из нормального распределения N(0, 1). @@ -42,21 +61,32 @@ func Ones(shape ...int) *Tensor { // Используется для инициализации весов нейронных сетей. func Randn(shape []int, seed int64) *Tensor { size := calculateSize(shape) - data := make([]float64, size) - - rng := rand.New(rand.NewSource(seed)) - - for i := range data { - data[i] = rng.NormFloat64() - } - strides := calculateStrides(shape) - - return &Tensor{ - Data: data, + dtype := GetDefaultDType() + + t := &Tensor{ Shape: shape, Strides: strides, + DType: dtype, } + + rng := rand.New(rand.NewSource(seed)) + + if dtype == Float32 { + data := make([]float32, size) + for i := range data { + data[i] = float32(rng.NormFloat64()) + } + t.Data32 = data + } else { + data := make([]float64, size) + for i := range data { + data[i] = rng.NormFloat64() + } + t.Data = data + } + + return t } // calculateSize вычисляет общее количество элементов в тензоре по его форме. @@ -76,7 +106,7 @@ func calculateStrides(shape []int) []int { if len(shape) == 0 { return []int{} } - + strides := make([]int, len(shape)) stride := 1 for i := len(shape) - 1; i >= 0; i-- { diff --git a/pkg/tensor/matmul.go b/pkg/tensor/matmul.go index 40e0b2e..7027d79 100644 --- a/pkg/tensor/matmul.go +++ b/pkg/tensor/matmul.go @@ -2,8 +2,6 @@ package tensor import ( "fmt" - "runtime" - "sync" ) // Размер блока для блочного умножения матриц (оптимален для кеша L1) @@ -22,54 +20,51 @@ const ( ) // MatMul выполняет умножение матриц (тензоров 2D). -// Для матриц A[m,n] и B[n,p] возвращает C[m,p], где C = A * B -// Использует оптимизированный алгоритм с блочным умножением для лучшей локальности кеша. -// Автоматически выбирает наилучшую реализацию (BLAS, SIMD, блочное умножение). func MatMul(a, b *Tensor) (*Tensor, error) { - // Проверка размерностей if len(a.Shape) != 2 || len(b.Shape) != 2 { return nil, fmt.Errorf("умножение матриц требует 2D тензоры, получены %dD и %dD", len(a.Shape), len(b.Shape)) } + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } - m := a.Shape[0] // строки A - n := a.Shape[1] // столбцы A = строки B - p := b.Shape[1] // столбцы B - + m := a.Shape[0] + n := a.Shape[1] + p := b.Shape[1] if n != b.Shape[0] { return nil, fmt.Errorf("несовместимые формы для умножения матриц: [%d,%d] и [%d,%d]", m, n, b.Shape[0], p) } - // Для очень больших матриц используем BLAS (если доступен) + if a.DType == Float32 { + result := &Tensor{ + Data32: make([]float32, m*p), + Shape: []int{m, p}, + Strides: []int{p, 1}, + DType: Float32, + } + if m >= 32 || n >= 32 || p >= 32 { + matmulV2Float32(a.Data32, b.Data32, result.Data32, m, n, p) + } else { + matmulOptimizedFloat32(a.Data32, b.Data32, result.Data32, m, n, p) + } + return result, nil + } + if BLASAvailable && (m >= BLASThreshold || p >= BLASThreshold || n >= BLASThreshold) { return MatMulBLAS(a, b) } - // Создаем результирующий тензор result := &Tensor{ Data: make([]float64, m*p), Shape: []int{m, p}, Strides: []int{p, 1}, + DType: Float64, } - - // Адаптивный выбор алгоритма на основе размера матриц - matrixSize := m * n * p - - if m >= ParallelThreshold || p >= ParallelThreshold { - // Параллельное блочное умножение для больших матриц - blockSize := chooseBlockSize(m, n, p) - matmulParallelBlockedAdaptive(a.Data, b.Data, result.Data, m, n, p, blockSize) - } else if m >= BlockSizeSmall || p >= BlockSizeSmall { - // Блочное умножение с SIMD для средних матриц - blockSize := chooseBlockSize(m, n, p) - matmulBlockedSIMD(a.Data, b.Data, result.Data, m, n, p, blockSize) - } else if matrixSize < 1000 { - // Простое оптимизированное умножение для очень малых матриц - matmulOptimized(a.Data, b.Data, result.Data, m, n, p) + if m >= 32 || n >= 32 || p >= 32 { + matmulV2(a.Data, b.Data, result.Data, m, n, p) } else { - // Блочное умножение для средних матриц - matmulBlocked(a.Data, b.Data, result.Data, m, n, p) + matmulOptimized(a.Data, b.Data, result.Data, m, n, p) } - return result, nil } @@ -98,30 +93,48 @@ func chooseBlockSize(m, n, p int) int { } // matmulOptimized - оптимизированное умножение для малых матриц -// Использует переупорядочивание циклов ikj для лучшей локальности кеша func matmulOptimized(a, b, c []float64, m, n, p int) { - // Обнуляем результат for i := range c { c[i] = 0.0 } - - // Цикл ikj - лучшая локальность кеша для row-major матриц for i := 0; i < m; i++ { iOffset := i * n cOffset := i * p for k := 0; k < n; k++ { aik := a[iOffset+k] bOffset := k * p - // Векторизация внутреннего цикла j := 0 - // Развертка x4 для векторизации for ; j <= p-MicroKernelSize; j += MicroKernelSize { c[cOffset+j] += aik * b[bOffset+j] c[cOffset+j+1] += aik * b[bOffset+j+1] c[cOffset+j+2] += aik * b[bOffset+j+2] c[cOffset+j+3] += aik * b[bOffset+j+3] } - // Остаток + for ; j < p; j++ { + c[cOffset+j] += aik * b[bOffset+j] + } + } + } +} + +func matmulOptimizedFloat32(a, b, c []float32, m, n, p int) { + for i := range c { + c[i] = 0 + } + const mk = 4 + for i := 0; i < m; i++ { + iOffset := i * n + cOffset := i * p + for k := 0; k < n; k++ { + aik := a[iOffset+k] + bOffset := k * p + j := 0 + for ; j <= p-mk; j += mk { + c[cOffset+j] += aik * b[bOffset+j] + c[cOffset+j+1] += aik * b[bOffset+j+1] + c[cOffset+j+2] += aik * b[bOffset+j+2] + c[cOffset+j+3] += aik * b[bOffset+j+3] + } for ; j < p; j++ { c[cOffset+j] += aik * b[bOffset+j] } @@ -178,44 +191,17 @@ func matmulMicroKernel(a, b, c []float64, iStart, iEnd, kStart, kEnd, jStart, jE } // matmulParallelBlocked - параллельное блочное умножение матриц -// Использует worker pool для параллелизма на уровне строк +// Использует ParallelFor для параллелизма на уровне строк. func matmulParallelBlocked(a, b, c []float64, m, n, p int) { // Инициализируем результат нулями for i := range c { c[i] = 0.0 } - // Определяем количество воркеров (по числу ядер) - numWorkers := runtime.NumCPU() - if numWorkers > m { - numWorkers = m - } - - // Разбиваем работу на блоки строк - blockRows := (m + numWorkers - 1) / numWorkers - if blockRows < BlockSize { - blockRows = BlockSize - } - - var wg sync.WaitGroup - - // Запускаем воркеры - for w := 0; w < numWorkers; w++ { - startRow := w * blockRows - if startRow >= m { - break - } - endRow := min((w+1)*blockRows, m) - - wg.Add(1) - go func(start, end int) { - defer wg.Done() - // Блочное умножение для диапазона строк - matmulBlockedRange(a, b, c, start, end, n, p) - }(startRow, endRow) - } - - wg.Wait() + // Параллелизуем по строкам через centralized scheduler + ParallelFor(m, BlockSize, func(startRow, endRow int) { + matmulBlockedRange(a, b, c, startRow, endRow, n, p) + }) } // matmulBlockedRange - блочное умножение для диапазона строк @@ -234,29 +220,46 @@ func matmulBlockedRange(a, b, c []float64, rowStart, rowEnd, n, p int) { } } -// MatMulTransposeB - умножение A * B^T (оптимизированная версия) -// Полезно для градиентов в обратном распространении -// Использует dot product для лучшей производительности func MatMulTransposeB(a, b *Tensor) (*Tensor, error) { if len(a.Shape) != 2 || len(b.Shape) != 2 { return nil, fmt.Errorf("умножение матриц требует 2D тензоры") } - + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных не совпадают: %v != %v", a.DType, b.DType) + } m := a.Shape[0] n := a.Shape[1] - p := b.Shape[0] // B транспонирована, поэтому берем строки B - + p := b.Shape[0] if n != b.Shape[1] { return nil, fmt.Errorf("несовместимые формы для умножения матриц с транспонированием") } - + if a.DType == Float32 { + result := &Tensor{ + Data32: make([]float32, m*p), + Shape: []int{m, p}, + Strides: []int{p, 1}, + DType: Float32, + } + for i := 0; i < m; i++ { + iOffsetA := i * n + iOffsetC := i * p + for j := 0; j < p; j++ { + jOffsetB := j * n + var sum float32 + for k := 0; k < n; k++ { + sum += a.Data32[iOffsetA+k] * b.Data32[jOffsetB+k] + } + result.Data32[iOffsetC+j] = sum + } + } + return result, nil + } result := &Tensor{ Data: make([]float64, m*p), Shape: []int{m, p}, Strides: []int{p, 1}, + DType: Float64, } - - // Оптимизированное умножение A * B^T через dot product for i := 0; i < m; i++ { iOffsetA := i * n iOffsetC := i * p @@ -264,7 +267,6 @@ func MatMulTransposeB(a, b *Tensor) (*Tensor, error) { jOffsetB := j * n sum := 0.0 k := 0 - // Развертка для векторизации for ; k <= n-MicroKernelSize; k += MicroKernelSize { sum += a.Data[iOffsetA+k] * b.Data[jOffsetB+k] sum += a.Data[iOffsetA+k+1] * b.Data[jOffsetB+k+1] @@ -277,37 +279,48 @@ func MatMulTransposeB(a, b *Tensor) (*Tensor, error) { result.Data[iOffsetC+j] = sum } } - return result, nil } -// MatMulTransposeA - умножение A^T * B (оптимизированная версия) func MatMulTransposeA(a, b *Tensor) (*Tensor, error) { if len(a.Shape) != 2 || len(b.Shape) != 2 { return nil, fmt.Errorf("умножение матриц требует 2D тензоры") } - - m := a.Shape[1] // A транспонирована + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных не совпадают: %v != %v", a.DType, b.DType) + } + m := a.Shape[1] n := a.Shape[0] p := b.Shape[1] - if n != b.Shape[0] { return nil, fmt.Errorf("несовместимые формы для умножения матриц с транспонированием") } - + if a.DType == Float32 { + result := &Tensor{ + Data32: make([]float32, m*p), + Shape: []int{m, p}, + Strides: []int{p, 1}, + DType: Float32, + } + for k := 0; k < n; k++ { + kOffsetA := k * m + kOffsetB := k * p + for i := 0; i < m; i++ { + aki := a.Data32[kOffsetA+i] + iOffsetC := i * p + for j := 0; j < p; j++ { + result.Data32[iOffsetC+j] += aki * b.Data32[kOffsetB+j] + } + } + } + return result, nil + } result := &Tensor{ Data: make([]float64, m*p), Shape: []int{m, p}, Strides: []int{p, 1}, + DType: Float64, } - - // Обнуляем результат - for i := range result.Data { - result.Data[i] = 0.0 - } - - // Оптимизированное умножение A^T * B - // Используем порядок циклов для лучшей локальности for k := 0; k < n; k++ { kOffsetA := k * m kOffsetB := k * p @@ -315,7 +328,6 @@ func MatMulTransposeA(a, b *Tensor) (*Tensor, error) { aki := a.Data[kOffsetA+i] iOffsetC := i * p j := 0 - // Векторизация for ; j <= p-MicroKernelSize; j += MicroKernelSize { result.Data[iOffsetC+j] += aki * b.Data[kOffsetB+j] result.Data[iOffsetC+j+1] += aki * b.Data[kOffsetB+j+1] @@ -327,7 +339,6 @@ func MatMulTransposeA(a, b *Tensor) (*Tensor, error) { } } } - return result, nil } @@ -362,42 +373,17 @@ func matmulBlockedSIMD(a, b, c []float64, m, n, p int, blockSize int) { } // matmulParallelBlockedAdaptive - параллельное блочное умножение с адаптивным размером блока +// Использует ParallelFor для распределения работы. func matmulParallelBlockedAdaptive(a, b, c []float64, m, n, p int, blockSize int) { // Инициализируем результат нулями for i := range c { c[i] = 0.0 } - // Определяем количество воркеров - numWorkers := runtime.NumCPU() - if numWorkers > m { - numWorkers = m - } - - // Разбиваем работу на блоки строк - blockRows := (m + numWorkers - 1) / numWorkers - if blockRows < blockSize { - blockRows = blockSize - } - - var wg sync.WaitGroup - - // Запускаем воркеры - for w := 0; w < numWorkers; w++ { - startRow := w * blockRows - if startRow >= m { - break - } - endRow := min((w+1)*blockRows, m) - - wg.Add(1) - go func(start, end int) { - defer wg.Done() - matmulBlockedRangeAdaptive(a, b, c, start, end, n, p, blockSize) - }(startRow, endRow) - } - - wg.Wait() + // Параллелизуем по строкам через centralized scheduler + ParallelFor(m, blockSize, func(startRow, endRow int) { + matmulBlockedRangeAdaptive(a, b, c, startRow, endRow, n, p, blockSize) + }) } // matmulBlockedRangeAdaptive - блочное умножение для диапазона строк с адаптивным размером блока diff --git a/pkg/tensor/matmul_v2.go b/pkg/tensor/matmul_v2.go new file mode 100644 index 0000000..1a52e9d --- /dev/null +++ b/pkg/tensor/matmul_v2.go @@ -0,0 +1,595 @@ +package tensor + +import ( + "sync" + "sync/atomic" +) + +// BLIS-style cache-blocked matrix multiplication v2. +// Uses panel packing + 4×4 micro-kernel for optimal cache utilization. + +// Cache hierarchy: block sizes 32/64 for L1; 4×4 micro-kernel. +const ( + mc = 64 // panel height A (L1), block size 64 + kc = 128 // panel depth (L2) + nc = 512 // panel width B (L3) + mr = 4 // micro-kernel rows (tile 4) + nr = 4 // micro-kernel cols (tile 4) + // BlockSize32 is used for small matrices in matmul.go (threshold 32) + BlockSize32 = 32 +) + +var ( + packedAPool = sync.Pool{ + New: func() any { return make([]float64, mc*kc) }, + } + packedBPool = sync.Pool{ + New: func() any { return make([]float64, kc*nc) }, + } +) + +type matMulWorkspace struct { + packedA [][]float64 + packedB []float64 + nextA atomic.Int32 +} + +const maxMatMulChunks = 64 + +func newMatMulWorkspace() *matMulWorkspace { + n := GetMaxWorkers() + if n < 1 { + n = 1 + } + if n < maxMatMulChunks { + n = maxMatMulChunks + } + ws := &matMulWorkspace{ + packedA: make([][]float64, n), + packedB: make([]float64, kc*nc), + } + for i := range ws.packedA { + ws.packedA[i] = make([]float64, mc*kc) + } + return ws +} + +var matMulWorkspacePool = sync.Pool{ + New: func() any { return newMatMulWorkspace() }, +} + +func getMatMulWorkspace() *matMulWorkspace { + return matMulWorkspacePool.Get().(*matMulWorkspace) +} + +func putMatMulWorkspace(ws *matMulWorkspace) { + ws.nextA.Store(0) + matMulWorkspacePool.Put(ws) +} + +func matmulV2(a, b, c []float64, m, n, p int) { + if len(c) < m*p || len(a) < m*n || len(b) < n*p { + return + } + for i := range c { + c[i] = 0 + } + ws := getMatMulWorkspace() + defer putMatMulWorkspace(ws) + + for jc := 0; jc < p; jc += nc { + jcEnd := min(jc+nc, p) + ncCur := jcEnd - jc + + for pc := 0; pc < n; pc += kc { + pcEnd := min(pc+kc, n) + kcCur := pcEnd - pc + + colsPadded := (ncCur + nr - 1) / nr * nr + neededB := kcCur * colsPadded + numChunks := (m + mc - 1) / mc + useWorkspaceA := neededB <= len(ws.packedB) && numChunks <= len(ws.packedA) + pB := ws.packedB + if !useWorkspaceA || neededB > len(pB) { + pB = getPackedB(kcCur, ncCur) + packB(b, pB, n, p, pc, pcEnd, jc, jcEnd) + ParallelFor(m, mc, func(icStart, icEnd int) { + if icEnd <= icStart { + return + } + pA := getPackedA(min(icEnd-icStart, mc), kcCur) + for ic := icStart; ic < icEnd; ic += mc { + icActualEnd := min(ic+mc, icEnd) + mcCur := icActualEnd - ic + packA(a, pA, n, ic, icActualEnd, pc, pcEnd) + gebp(pA, pB, c, p, ic, mcCur, ncCur, kcCur, jc) + } + putPackedA(pA) + }) + putPackedB(pB) + continue + } + pB = pB[:neededB] + packB(b, pB, n, p, pc, pcEnd, jc, jcEnd) + + ws.nextA.Store(0) + ParallelFor(m, mc, func(icStart, icEnd int) { + if icEnd <= icStart { + return + } + idx := int(ws.nextA.Add(1) - 1) + mcCur := min(icEnd-icStart, mc) + rowsPadded := (mcCur + mr - 1) / mr * mr + neededA := rowsPadded * kcCur + if neededA <= 0 { + return + } + pA := ws.packedA[idx%len(ws.packedA)][:neededA] + for ic := icStart; ic < icEnd; ic += mc { + icActualEnd := min(ic+mc, icEnd) + mcCur := icActualEnd - ic + packA(a, pA, n, ic, icActualEnd, pc, pcEnd) + gebp(pA, pB, c, p, ic, mcCur, ncCur, kcCur, jc) + } + }) + } + } +} + +// gebp computes C[ic:ic+mcCur, jc:jc+ncCur] += packedA × packedB +// using the 4×4 micro-kernel. +func gebp(packedA, packedB, c []float64, ldc, ic, mcCur, ncCur, kcCur, jc int) { + if len(packedA) == 0 || len(packedB) == 0 { + return + } + for jr := 0; jr < ncCur; jr += nr { // Loop 2 + nrCur := min(nr, ncCur-jr) + + for ir := 0; ir < mcCur; ir += mr { // Loop 1 + mrCur := min(mr, mcCur-ir) + + if mrCur == mr && nrCur == nr { + // Fast path: full 4×4 micro-kernel + microKernel4x4( + packedA[ir*kcCur:], + packedB[jr*kcCur:], + c, ldc, + ic+ir, jc+jr, kcCur, + ) + } else { + // Edge case: partial tile + microKernelGeneric( + packedA[ir*kcCur:], + packedB[jr*kcCur:], + c, ldc, + ic+ir, jc+jr, kcCur, + mrCur, nrCur, + ) + } + } + } +} + +// microKernel4x4 computes a 4×4 block of C using 16 scalar accumulators. +// packedA is in column-major layout: [mr] elements per k step. +// packedB is in row-major layout: [nr] elements per k step. +func microKernel4x4(packedA, packedB, c []float64, ldc, iBase, jBase, kLen int) { + if len(packedA) < mr*kLen || len(packedB) < nr*kLen { + return + } + var c00, c01, c02, c03 float64 + var c10, c11, c12, c13 float64 + var c20, c21, c22, c23 float64 + var c30, c31, c32, c33 float64 + + aOff := 0 + bOff := 0 + + for p := 0; p < kLen; p++ { + // Load mr=4 elements from packed A + a0 := packedA[aOff] + a1 := packedA[aOff+1] + a2 := packedA[aOff+2] + a3 := packedA[aOff+3] + + // Load nr=4 elements from packed B + b0 := packedB[bOff] + b1 := packedB[bOff+1] + b2 := packedB[bOff+2] + b3 := packedB[bOff+3] + + // Rank-1 update: outer product of a × b + c00 += a0 * b0 + c01 += a0 * b1 + c02 += a0 * b2 + c03 += a0 * b3 + + c10 += a1 * b0 + c11 += a1 * b1 + c12 += a1 * b2 + c13 += a1 * b3 + + c20 += a2 * b0 + c21 += a2 * b1 + c22 += a2 * b2 + c23 += a2 * b3 + + c30 += a3 * b0 + c31 += a3 * b1 + c32 += a3 * b2 + c33 += a3 * b3 + + aOff += mr + bOff += nr + } + + // Store accumulators back to C (accumulate, C was zero-initialized) + r0 := iBase*ldc + jBase + c[r0] += c00 + c[r0+1] += c01 + c[r0+2] += c02 + c[r0+3] += c03 + + r1 := r0 + ldc + c[r1] += c10 + c[r1+1] += c11 + c[r1+2] += c12 + c[r1+3] += c13 + + r2 := r1 + ldc + c[r2] += c20 + c[r2+1] += c21 + c[r2+2] += c22 + c[r2+3] += c23 + + r3 := r2 + ldc + c[r3] += c30 + c[r3+1] += c31 + c[r3+2] += c32 + c[r3+3] += c33 +} + +// microKernelGeneric handles edge-case tiles smaller than mr×nr. +func microKernelGeneric(packedA, packedB, c []float64, ldc, iBase, jBase, kLen, mrCur, nrCur int) { + for p := 0; p < kLen; p++ { + for i := 0; i < mrCur; i++ { + aVal := packedA[i+p*mr] + for j := 0; j < nrCur; j++ { + bVal := packedB[j+p*nr] + c[(iBase+i)*ldc+(jBase+j)] += aVal * bVal + } + } + } +} + +// packA packs a panel of A[rowStart:rowEnd, colStart:colEnd] into +// column-major layout optimized for the micro-kernel. +// +// Output layout: sequence of strips of height mr. +// Strip r (rows r*mr ... r*mr+mr-1): +// +// K=0: A[r*mr, 0] ... A[r*mr+mr-1, 0] +// K=1: A[r*mr, 1] ... A[r*mr+mr-1, 1] +// ... +func packA(a []float64, packed []float64, n, rowStart, rowEnd, colStart, colEnd int) { + rows := rowEnd - rowStart + cols := colEnd - colStart // kcCur + + idx := 0 + // Iterate over strips of height mr + for ir := 0; ir < rows; ir += mr { + // Minimum of mr or remaining rows + mrCur := mr + if ir+mr > rows { + mrCur = rows - ir + } + + // For each column k in the panel + for k := 0; k < cols; k++ { + // Copy column segment of height mrCur + baseA := (rowStart+ir)*n + (colStart + k) + for i := 0; i < mrCur; i++ { + packed[idx] = a[baseA+i*n] + idx++ + } + // Pad with zeros if strip is partial (mrCur < mr) + for i := mrCur; i < mr; i++ { + packed[idx] = 0 + idx++ + } + } + } +} + +// packB packs a panel of B[rowStart:rowEnd, colStart:colEnd] into +// row-major layout optimized for the micro-kernel. +// +// Output layout: sequence of strips of width nr. +// Strip c (cols c*nr ... c*nr+nr-1): +// +// K=0: B[0, c*nr] ... B[0, c*nr+nr-1] +// K=1: B[1, c*nr] ... B[1, c*nr+nr-1] +// ... +func packB(b []float64, packed []float64, n, p, rowStart, rowEnd, colStart, colEnd int) { + rows := rowEnd - rowStart // kcCur + cols := colEnd - colStart + + idx := 0 + // Iterate over strips of width nr + for jr := 0; jr < cols; jr += nr { + // Minimum of nr or remaining cols + nrCur := nr + if jr+nr > cols { + nrCur = cols - jr + } + + // For each row k in the panel + for k := 0; k < rows; k++ { + // Copy row segment of width nrCur + baseB := (rowStart+k)*p + (colStart + jr) + for j := 0; j < nrCur; j++ { + packed[idx] = b[baseB+j] + idx++ + } + // Pad with zeros if strip is partial (nrCur < nr) + for j := nrCur; j < nr; j++ { + packed[idx] = 0 + idx++ + } + } + } +} + +// Workspace management — get/put packed buffers from pools. + +func getPackedA(mcCur, kcCur int) []float64 { + // Calculate size with padding: we pack strips of height mr + rowsPadded := (mcCur + mr - 1) / mr * mr + needed := rowsPadded * kcCur + + if needed <= mc*kc { + buf := packedAPool.Get().([]float64) + if cap(buf) < needed { + // Should ensure capacity, but pool items are fixed size mc*kc. + // If needed > mc*kc (should not happen if mc is multiple of mr), we alloc. + // But wait, mc=64, mr=4. mc is multiple. + // If mcCur <= mc, then rowsPadded <= mc. + // So needed <= mc*kc. + // Just reslice buf to needed. + return buf[:needed] + } + // If buf is nil or we just got it, ensure len is needed + return buf[:needed] + } + return make([]float64, needed) +} + +func putPackedA(buf []float64) { + // Only put back if capacity is standard + if cap(buf) == mc*kc { + packedAPool.Put(buf) + } +} + +func getPackedB(kcCur, ncCur int) []float64 { + // Calculate size with padding: we pack strips of width nr + colsPadded := (ncCur + nr - 1) / nr * nr + needed := kcCur * colsPadded + + if needed <= kc*nc { + buf := packedBPool.Get().([]float64) + // Ensure capacity check properly (standard pool items are kc*nc) + if cap(buf) < needed { + return buf[:needed] // Should not happen for standard sizes + } + return buf[:needed] + } + return make([]float64, needed) +} + +func putPackedB(buf []float64) { + if cap(buf) == kc*nc { + packedBPool.Put(buf) + } +} + +var ( + packedAPool32 = sync.Pool{ + New: func() any { return make([]float32, mc*kc) }, + } + packedBPool32 = sync.Pool{ + New: func() any { return make([]float32, kc*nc) }, + } +) + +func matmulV2Float32(a, b, c []float32, m, n, p int) { + for i := range c { + c[i] = 0 + } + for jc := 0; jc < p; jc += nc { + jcEnd := min(jc+nc, p) + ncCur := jcEnd - jc + for pc := 0; pc < n; pc += kc { + pcEnd := min(pc+kc, n) + kcCur := pcEnd - pc + pB := getPackedB32(kcCur, ncCur) + packB32(b, pB, n, p, pc, pcEnd, jc, jcEnd) + ParallelFor(m, mc, func(icStart, icEnd int) { + pA := getPackedA32(min(icEnd-icStart, mc), kcCur) + for ic := icStart; ic < icEnd; ic += mc { + icActualEnd := min(ic+mc, icEnd) + mcCur := icActualEnd - ic + packA32(a, pA, n, ic, icActualEnd, pc, pcEnd) + gebp32(pA, pB, c, p, ic, mcCur, ncCur, kcCur, jc) + } + putPackedA32(pA) + }) + putPackedB32(pB) + } + } +} + +func gebp32(packedA, packedB, c []float32, ldc, ic, mcCur, ncCur, kcCur, jc int) { + for jr := 0; jr < ncCur; jr += nr { + nrCur := min(nr, ncCur-jr) + for ir := 0; ir < mcCur; ir += mr { + mrCur := min(mr, mcCur-ir) + if mrCur == mr && nrCur == nr { + microKernel4x4Float32( + packedA[ir*kcCur:], packedB[jr*kcCur:], c, ldc, + ic+ir, jc+jr, kcCur, + ) + } else { + microKernelGenericFloat32( + packedA[ir*kcCur:], packedB[jr*kcCur:], c, ldc, + ic+ir, jc+jr, kcCur, mrCur, nrCur, + ) + } + } + } +} + +func microKernel4x4Float32(packedA, packedB, c []float32, ldc, iBase, jBase, kLen int) { + var c00, c01, c02, c03 float32 + var c10, c11, c12, c13 float32 + var c20, c21, c22, c23 float32 + var c30, c31, c32, c33 float32 + aOff, bOff := 0, 0 + for p := 0; p < kLen; p++ { + a0 := packedA[aOff] + a1 := packedA[aOff+1] + a2 := packedA[aOff+2] + a3 := packedA[aOff+3] + b0 := packedB[bOff] + b1 := packedB[bOff+1] + b2 := packedB[bOff+2] + b3 := packedB[bOff+3] + c00 += a0 * b0 + c01 += a0 * b1 + c02 += a0 * b2 + c03 += a0 * b3 + c10 += a1 * b0 + c11 += a1 * b1 + c12 += a1 * b2 + c13 += a1 * b3 + c20 += a2 * b0 + c21 += a2 * b1 + c22 += a2 * b2 + c23 += a2 * b3 + c30 += a3 * b0 + c31 += a3 * b1 + c32 += a3 * b2 + c33 += a3 * b3 + aOff += mr + bOff += nr + } + r0 := iBase*ldc + jBase + c[r0] += c00 + c[r0+1] += c01 + c[r0+2] += c02 + c[r0+3] += c03 + r1 := r0 + ldc + c[r1] += c10 + c[r1+1] += c11 + c[r1+2] += c12 + c[r1+3] += c13 + r2 := r1 + ldc + c[r2] += c20 + c[r2+1] += c21 + c[r2+2] += c22 + c[r2+3] += c23 + r3 := r2 + ldc + c[r3] += c30 + c[r3+1] += c31 + c[r3+2] += c32 + c[r3+3] += c33 +} + +func microKernelGenericFloat32(packedA, packedB, c []float32, ldc, iBase, jBase, kLen, mrCur, nrCur int) { + for p := 0; p < kLen; p++ { + for i := 0; i < mrCur; i++ { + aVal := packedA[i+p*mr] + for j := 0; j < nrCur; j++ { + c[(iBase+i)*ldc+(jBase+j)] += aVal * packedB[j+p*nr] + } + } + } +} + +func packA32(a []float32, packed []float32, n, rowStart, rowEnd, colStart, colEnd int) { + rows := rowEnd - rowStart + cols := colEnd - colStart + idx := 0 + for ir := 0; ir < rows; ir += mr { + mrCur := mr + if ir+mr > rows { + mrCur = rows - ir + } + for k := 0; k < cols; k++ { + baseA := (rowStart+ir)*n + (colStart + k) + for i := 0; i < mrCur; i++ { + packed[idx] = a[baseA+i*n] + idx++ + } + for i := mrCur; i < mr; i++ { + packed[idx] = 0 + idx++ + } + } + } +} + +func packB32(b []float32, packed []float32, n, p, rowStart, rowEnd, colStart, colEnd int) { + rows := rowEnd - rowStart + cols := colEnd - colStart + idx := 0 + for jr := 0; jr < cols; jr += nr { + nrCur := nr + if jr+nr > cols { + nrCur = cols - jr + } + for k := 0; k < rows; k++ { + baseB := (rowStart+k)*p + (colStart + jr) + for j := 0; j < nrCur; j++ { + packed[idx] = b[baseB+j] + idx++ + } + for j := nrCur; j < nr; j++ { + packed[idx] = 0 + idx++ + } + } + } +} + +func getPackedA32(mcCur, kcCur int) []float32 { + rowsPadded := (mcCur + mr - 1) / mr * mr + needed := rowsPadded * kcCur + if needed <= mc*kc { + buf := packedAPool32.Get().([]float32) + return buf[:needed] + } + return make([]float32, needed) +} + +func putPackedA32(buf []float32) { + if cap(buf) == mc*kc { + packedAPool32.Put(buf) + } +} + +func getPackedB32(kcCur, ncCur int) []float32 { + colsPadded := (ncCur + nr - 1) / nr * nr + needed := kcCur * colsPadded + if needed <= kc*nc { + buf := packedBPool32.Get().([]float32) + return buf[:needed] + } + return make([]float32, needed) +} + +func putPackedB32(buf []float32) { + if cap(buf) == kc*nc { + packedBPool32.Put(buf) + } +} diff --git a/pkg/tensor/matmul_v2_test.go b/pkg/tensor/matmul_v2_test.go new file mode 100644 index 0000000..0ae1789 --- /dev/null +++ b/pkg/tensor/matmul_v2_test.go @@ -0,0 +1,264 @@ +package tensor + +import ( + "fmt" + "math" + "math/rand" + "testing" + "time" +) + +func TestMatMulV2Correctness(t *testing.T) { + sizes := []struct{ m, n, p int }{ + {1, 1, 1}, + {16, 16, 16}, + {32, 32, 32}, + {63, 63, 63}, + {64, 64, 64}, + {65, 65, 65}, + {128, 128, 128}, + {129, 64, 129}, + {100, 200, 300}, + } + + for _, s := range sizes { + t.Run(fmt.Sprintf("%dx%dx%d", s.m, s.n, s.p), func(t *testing.T) { + a := make([]float64, s.m*s.n) + b := make([]float64, s.n*s.p) + cRef := make([]float64, s.m*s.p) + cV2 := make([]float64, s.m*s.p) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := range a { + a[i] = rng.Float64() + } + for i := range b { + b[i] = rng.Float64() + } + + // Reference: simple loops + for i := 0; i < s.m; i++ { + for k := 0; k < s.n; k++ { + valA := a[i*s.n+k] + for j := 0; j < s.p; j++ { + cRef[i*s.p+j] += valA * b[k*s.p+j] + } + } + } + + matmulV2(a, b, cV2, s.m, s.n, s.p) + + for i := range cRef { + if math.Abs(cRef[i]-cV2[i]) > 1e-9 { + t.Fatalf("mismatch at %d: ref=%v v2=%v", i, cRef[i], cV2[i]) + } + } + }) + } +} + +func BenchmarkMatMulV2_1024(b *testing.B) { + n := 1024 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + for i := range a { + a[i] = 0.5 + } + for i := range bm { + bm[i] = 0.5 + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulV2(a, bm, c, n, n, n) + } +} + +func BenchmarkMatMulBaseline_1024(b *testing.B) { + n := 1024 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + for i := range a { + a[i] = 0.5 + } + for i := range bm { + bm[i] = 0.5 + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Replica of old adaptive logic + matmulBaseline(a, bm, c, n, n, n, 128) + } +} + +func BenchmarkMatMulV2_64(b *testing.B) { + n := 64 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulV2(a, bm, c, n, n, n) + } +} + +func BenchmarkMatMulBaseline_64(b *testing.B) { + n := 64 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulBaseline(a, bm, c, n, n, n, 32) + } +} + +func BenchmarkMatMulV2_32(b *testing.B) { + n := 32 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulV2(a, bm, c, n, n, n) + } +} + +func BenchmarkMatMulBaseline_32(b *testing.B) { + n := 32 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulBaseline(a, bm, c, n, n, n, 32) + } +} + +// ----------------------------------------------------------------------------- +// Baseline Implementation (Replica of v1 logic for accurate comparison) +// ----------------------------------------------------------------------------- + +func matmulBaseline(a, b, c []float64, m, n, p int, blockSize int) { + for i := range c { + c[i] = 0.0 + } + // Simplified adaptive parallel logic using ParallelFor + ParallelFor(m, blockSize, func(start, end int) { + for kk := 0; kk < n; kk += blockSize { + kEnd := minWait(kk+blockSize, n) + for ii := start; ii < end; ii += blockSize { + iEnd := minWait(ii+blockSize, end) + for jj := 0; jj < p; jj += blockSize { + jEnd := minWait(jj+blockSize, p) + matmulSIMDKernelBaseline(a, b, c, 0, n, p, ii, iEnd, kk, kEnd, jj, jEnd) + } + } + } + }) +} + +func matmulSIMDKernelBaseline(a, b, c []float64, m, n, p, iStart, iEnd, kStart, kEnd, jStart, jEnd int) { + // Calling the existing kernel from package tensor + MatMulSIMDKernel(a, b, c, m, n, p, iStart, iEnd, kStart, kEnd, jStart, jEnd) +} + +func minWait(a, b int) int { + if a < b { + return a + } + return b +} + +func TestMatMulV2AtLeast40PercentFasterThanBaseline1024(t *testing.T) { + if testing.Short() { + t.Skip("skipping speed comparison in short mode") + } + n := 1024 + a := make([]float64, n*n) + bm := make([]float64, n*n) + cV2 := make([]float64, n*n) + cBaseline := make([]float64, n*n) + for i := range a { + a[i] = 0.5 + bm[i] = 0.5 + } + const iterations = 5 + var sumV2, sumBaseline int64 + for i := 0; i < iterations; i++ { + start := time.Now() + matmulV2(a, bm, cV2, n, n, n) + sumV2 += time.Since(start).Nanoseconds() + } + for i := 0; i < iterations; i++ { + start := time.Now() + matmulBaseline(a, bm, cBaseline, n, n, n, 128) + sumBaseline += time.Since(start).Nanoseconds() + } + ratio := float64(sumBaseline) / float64(sumV2) + if ratio < 1.4 { + t.Errorf("expected matmulV2 at least 40%% faster than baseline (ratio baseline/v2 >= 1.4), got ratio=%.2f", ratio) + } +} + +func BenchmarkMatMulAPI_1024(b *testing.B) { + SetDefaultDType(Float64) + aT := Randn([]int{1024, 1024}, 1) + bT := Randn([]int{1024, 1024}, 2) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = MatMul(aT, bT) + } +} + +func BenchmarkMatMulPooled_1024(b *testing.B) { + SetDefaultDType(Float64) + aT := Randn([]int{1024, 1024}, 1) + bT := Randn([]int{1024, 1024}, 2) + b.ResetTimer() + for i := 0; i < b.N; i++ { + res, _ := MatMulPooled(aT, bT) + PutTensor(res) + } +} + +func BenchmarkMatMulAllocComparison(b *testing.B) { + SetDefaultDType(Float64) + aT := Randn([]int{256, 256}, 1) + bT := Randn([]int{256, 256}, 2) + b.Run("MatMul_alloc_new", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = MatMul(aT, bT) + } + }) + b.Run("MatMulPooled_reuse_result", func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, _ := MatMulPooled(aT, bT) + PutTensor(res) + } + }) +} + +func BenchmarkMatMulV2AllocBeforeAfter(b *testing.B) { + n := 1024 + a := make([]float64, n*n) + bm := make([]float64, n*n) + c := make([]float64, n*n) + for i := range a { + a[i], bm[i] = 0.5, 0.5 + } + b.Run("matmulV2_1024_alloc_after_workspace", func(b *testing.B) { + for i := 0; i < b.N; i++ { + matmulV2(a, bm, c, n, n, n) + } + }) +} diff --git a/pkg/tensor/ops.go b/pkg/tensor/ops.go index de69b47..e9ba2be 100644 --- a/pkg/tensor/ops.go +++ b/pkg/tensor/ops.go @@ -10,88 +10,123 @@ const ( UnrollFactor = 8 // Развертывание циклов для SIMD ) -// Add выполняет поэлементное сложение двух тензоров с векторизацией. -// Возвращает новый тензор c, где c[i] = a[i] + b[i] -// Тензоры должны иметь одинаковую форму (shape). func Add(a, b *Tensor) (*Tensor, error) { if !shapesEqual(a.Shape, b.Shape) { return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) } - + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } + n := a.DataLen() result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: append([]int{}, a.Shape...), Strides: append([]int{}, a.Strides...), + DType: a.DType, + } + if a.DType == Float32 { + result.Data32 = make([]float32, n) + for i := 0; i < n; i++ { + result.Data32[i] = a.Data32[i] + b.Data32[i] + } + } else { + result.Data = make([]float64, n) + addVectorized(a.Data, b.Data, result.Data) } - - addVectorized(a.Data, b.Data, result.Data) return result, nil } -// Mul выполняет поэлементное умножение двух тензоров (операция Адамара) с векторизацией. -// Возвращает новый тензор c, где c[i] = a[i] * b[i] -// Тензоры должны иметь одинаковую форму (shape). func Mul(a, b *Tensor) (*Tensor, error) { if !shapesEqual(a.Shape, b.Shape) { return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) } - + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } + n := a.DataLen() result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: append([]int{}, a.Shape...), Strides: append([]int{}, a.Strides...), + DType: a.DType, + } + if a.DType == Float32 { + result.Data32 = make([]float32, n) + for i := 0; i < n; i++ { + result.Data32[i] = a.Data32[i] * b.Data32[i] + } + } else { + result.Data = make([]float64, n) + mulVectorized(a.Data, b.Data, result.Data) } - - mulVectorized(a.Data, b.Data, result.Data) return result, nil } -// Sub выполняет поэлементное вычитание: a - b func Sub(a, b *Tensor) (*Tensor, error) { if !shapesEqual(a.Shape, b.Shape) { return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) } - + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } + n := a.DataLen() result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: append([]int{}, a.Shape...), Strides: append([]int{}, a.Strides...), + DType: a.DType, + } + if a.DType == Float32 { + result.Data32 = make([]float32, n) + for i := 0; i < n; i++ { + result.Data32[i] = a.Data32[i] - b.Data32[i] + } + } else { + result.Data = make([]float64, n) + subVectorized(a.Data, b.Data, result.Data) } - - subVectorized(a.Data, b.Data, result.Data) return result, nil } -// Div выполняет поэлементное деление: a / b func Div(a, b *Tensor) (*Tensor, error) { if !shapesEqual(a.Shape, b.Shape) { return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) } - + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } + n := a.DataLen() result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: append([]int{}, a.Shape...), Strides: append([]int{}, a.Strides...), + DType: a.DType, + } + if a.DType == Float32 { + result.Data32 = make([]float32, n) + for i := 0; i < n; i++ { + result.Data32[i] = a.Data32[i] / b.Data32[i] + } + } else { + result.Data = make([]float64, n) + divVectorized(a.Data, b.Data, result.Data) } - - divVectorized(a.Data, b.Data, result.Data) return result, nil } -// Apply применяет функцию f к каждому элементу тензора. -// Возвращает новый тензор с результатами применения функции. -// Используется для функций активации (ReLU, sigmoid, tanh). func Apply(a *Tensor, f func(float64) float64) *Tensor { result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: append([]int{}, a.Shape...), Strides: append([]int{}, a.Strides...), + DType: a.DType, } - - for i := range a.Data { - result.Data[i] = f(a.Data[i]) + if a.DType == Float32 { + result.Data32 = make([]float32, len(a.Data32)) + for i := range a.Data32 { + result.Data32[i] = float32(f(float64(a.Data32[i]))) + } + } else { + result.Data = make([]float64, len(a.Data)) + for i := range a.Data { + result.Data[i] = f(a.Data[i]) + } } - return result } @@ -226,61 +261,76 @@ func Reshape(a *Tensor, newShape []int) (*Tensor, error) { if newSize != oldSize { return nil, fmt.Errorf("невозможно изменить форму тензора размера %d на форму %v (размер %d)", oldSize, newShape, newSize) } - - // Вычисляем новые strides newStrides := make([]int, len(newShape)) stride := 1 for i := len(newShape) - 1; i >= 0; i-- { newStrides[i] = stride stride *= newShape[i] } - - return &Tensor{ - Data: a.Data, // Используем те же данные + out := &Tensor{ Shape: append([]int{}, newShape...), Strides: newStrides, - }, nil + DType: a.DType, + } + if a.DType == Float32 { + out.Data32 = a.Data32 + } else { + out.Data = a.Data + } + return out, nil } -// Transpose транспонирует двумерный тензор (матрицу). -// Меняет местами оси: строки становятся столбцами и наоборот. -// Для матрицы [m, n] возвращает матрицу [n, m]. func Transpose(a *Tensor) (*Tensor, error) { if len(a.Shape) != 2 { return nil, fmt.Errorf("транспонирование требует 2D тензор, получен %dD", len(a.Shape)) } - rows := a.Shape[0] cols := a.Shape[1] - result := &Tensor{ - Data: make([]float64, len(a.Data)), Shape: []int{cols, rows}, Strides: []int{rows, 1}, + DType: a.DType, } - - // Транспонируем данные - for i := 0; i < rows; i++ { - for j := 0; j < cols; j++ { - result.Data[j*rows+i] = a.Data[i*cols+j] + if a.DType == Float32 { + result.Data32 = make([]float32, rows*cols) + for i := 0; i < rows; i++ { + for j := 0; j < cols; j++ { + result.Data32[j*rows+i] = a.Data32[i*cols+j] + } + } + } else { + result.Data = make([]float64, rows*cols) + for i := 0; i < rows; i++ { + for j := 0; j < cols; j++ { + result.Data[j*rows+i] = a.Data[i*cols+j] + } } } - return result, nil } -// Sum вычисляет сумму всех элементов тензора. -// Возвращает скаляр (одноэлементный тензор). func Sum(a *Tensor) *Tensor { + if a.DType == Float32 { + var s float32 + for _, v := range a.Data32 { + s += v + } + return &Tensor{ + Data32: []float32{s}, + Shape: []int{1}, + Strides: []int{1}, + DType: Float32, + } + } sum := 0.0 for _, val := range a.Data { sum += val } - return &Tensor{ Data: []float64{sum}, Shape: []int{1}, Strides: []int{1}, + DType: Float64, } } diff --git a/pkg/tensor/ops/ops.go b/pkg/tensor/ops/ops.go index 79035fd..dc410f1 100644 --- a/pkg/tensor/ops/ops.go +++ b/pkg/tensor/ops/ops.go @@ -1,209 +1,42 @@ package ops -import ( - "fmt" - "math" - - "github.com/Hirogava/Go-NN-Learn/pkg/tensor" -) - -// Add выполняет поэлементное сложение двух тензоров. -// Возвращает новый тензор C, где C[i] = A[i] + B[i] -// Тензоры должны иметь одинаковую форму (Shape). -func Add(a, b *tensor.Tensor) (*tensor.Tensor, error) { - if !shapesEqual(a.Shape, b.Shape) { - return nil, fmt.Errorf("shapes must match: %v vs %v", a.Shape, b.Shape) - } - - result := &tensor.Tensor{ - Data: make([]float64, len(a.Data)), - Shape: append([]int{}, a.Shape...), - Strides: append([]int{}, a.Strides...), - } - - for i := 0; i < len(a.Data); i++ { - result.Data[i] = a.Data[i] + b.Data[i] - } - - return result, nil -} - -// Mul выполняет поэлементное умножение двух тензоров (операция Адамара). -// Возвращает новый тензор C, где C[i] = A[i] * B[i] -// Тензоры должны иметь одинаковую форму (Shape). -func Mul(a, b *tensor.Tensor) (*tensor.Tensor, error) { - if !shapesEqual(a.Shape, b.Shape) { - return nil, fmt.Errorf("shapes must match: %v vs %v", a.Shape, b.Shape) - } - - result := &tensor.Tensor{ - Data: make([]float64, len(a.Data)), - Shape: append([]int{}, a.Shape...), - Strides: append([]int{}, a.Strides...), - } - - for i := 0; i < len(a.Data); i++ { - result.Data[i] = a.Data[i] * b.Data[i] - } - - return result, nil -} - -// Apply применяет функцию f к каждому элементу тензора. -// Возвращает новый тензор с результатами применения функции. -// Используется для функций активации (ReLU, Sigmoid, Tanh). -func Apply(a *tensor.Tensor, f func(float64) float64) *tensor.Tensor { - result := &tensor.Tensor{ - Data: make([]float64, len(a.Data)), - Shape: append([]int{}, a.Shape...), - Strides: append([]int{}, a.Strides...), - } - - for i := 0; i < len(a.Data); i++ { - result.Data[i] = f(a.Data[i]) - } - - return result -} - -// shapesEqual проверяет равенство форм двух тензоров. -func shapesEqual(a, b []int) bool { - if len(a) != len(b) { - return false - } - for i := range a { - if a[i] != b[i] { - return false - } - } - return true -} - -// Reshape изменяет форму тензора без изменения данных. -// Возвращает новый тензор с новой формой, используя те же данные. -// Общее количество элементов должно совпадать. -func Reshape(a *tensor.Tensor, newShape []int) (*tensor.Tensor, error) { - // Вычисляем общее количество элементов в новой форме - newSize := 1 - for _, dim := range newShape { - if dim <= 0 { - return nil, fmt.Errorf("invalid dimension: %d", dim) - } - newSize *= dim - } - - // Проверяем, что размер совпадает - oldSize := 1 - for _, dim := range a.Shape { - oldSize *= dim - } - - if newSize != oldSize { - return nil, fmt.Errorf("cannot reshape tensor of size %d into shape %v (size %d)", oldSize, newShape, newSize) - } - - // Вычисляем новые strides - newStrides := make([]int, len(newShape)) - stride := 1 - for i := len(newShape) - 1; i >= 0; i-- { - newStrides[i] = stride - stride *= newShape[i] - } - - return &tensor.Tensor{ - Data: a.Data, // Используем те же данные - Shape: append([]int{}, newShape...), - Strides: newStrides, - }, nil -} - -// Transpose транспонирует двумерный тензор (матрицу). -// Меняет местами оси: строки становятся столбцами и наоборот. -// Для матрицы [m, n] возвращает матрицу [n, m]. -func Transpose(a *tensor.Tensor) (*tensor.Tensor, error) { - if len(a.Shape) != 2 { - return nil, fmt.Errorf("transpose requires 2D tensor, got %dD", len(a.Shape)) - } - - rows := a.Shape[0] - cols := a.Shape[1] - - result := &tensor.Tensor{ - Data: make([]float64, len(a.Data)), - Shape: []int{cols, rows}, - Strides: []int{rows, 1}, - } - - // Транспонируем данные - for i := 0; i < rows; i++ { - for j := 0; j < cols; j++ { - result.Data[j*rows+i] = a.Data[i*cols+j] - } - } - - return result, nil -} - -// Sum вычисляет сумму всех элементов тензора. -// Возвращает скаляр (одноэлементный тензор). -func Sum(a *tensor.Tensor) *tensor.Tensor { - sum := 0.0 - for _, val := range a.Data { - sum += val - } - - return &tensor.Tensor{ - Data: []float64{sum}, - Shape: []int{1}, - Strides: []int{1}, - } -} - -// Exp применяет экспоненциальную функцию e^x к каждому элементу тензора. -// Возвращает новый тензор с результатами применения exp. -func Exp(a *tensor.Tensor) *tensor.Tensor { - return Apply(a, math.Exp) -} - -// Log применяет натуральный логарифм ln(x) к каждому элементу тензора. -// Возвращает новый тензор с результатами применения log. -func Log(a *tensor.Tensor) *tensor.Tensor { - return Apply(a, math.Log) -} +import "github.com/Hirogava/Go-NN-Learn/pkg/tensor" func Max(a *tensor.Tensor) *tensor.Tensor { if len(a.Data) == 0 { - panic("Cannot compute max of empty tensor") + return tensor.Zeros(1) } - maxVal := a.Data[0] - for _, val := range a.Data { - if val > maxVal { - maxVal = val + m := a.Data[0] + for _, v := range a.Data[1:] { + if v > m { + m = v } } - return &tensor.Tensor{Data: []float64{maxVal}, Shape: []int{1}} + result := tensor.Zeros(1) + result.Data[0] = m + return result } -func Sub(a *tensor.Tensor, scalar *tensor.Tensor) *tensor.Tensor { - if len(scalar.Data) != 1 { - panic("Sub expects a scalar tensor as the second argument") - } +func Sub(a, b *tensor.Tensor) *tensor.Tensor { result := tensor.Zeros(a.Shape...) - for i := range a.Data { - result.Data[i] = a.Data[i] - scalar.Data[0] + bVal := b.Data[0] + for i, v := range a.Data { + result.Data[i] = v - bVal } return result } -func Div(a *tensor.Tensor, other *tensor.Tensor) *tensor.Tensor { - if len(a.Shape) != 2 || len(other.Shape) != 1 || a.Shape[0] != other.Shape[0] { - panic("Invalid shapes for division") - } - rows, cols := a.Shape[0], a.Shape[1] +func Div(a, b *tensor.Tensor) *tensor.Tensor { result := tensor.Zeros(a.Shape...) + rows := a.Shape[0] + cols := 1 + if len(a.Shape) > 1 { + cols = a.Shape[1] + } for i := 0; i < rows; i++ { for j := 0; j < cols; j++ { - result.Data[i*cols+j] = a.Data[i*cols+j] / other.Data[i] + idx := i*a.Strides[0] + j*a.Strides[1] + result.Data[idx] = a.Data[idx] / b.Data[i] } } return result diff --git a/pkg/tensor/pool.go b/pkg/tensor/pool.go index 715d93e..949e2ac 100644 --- a/pkg/tensor/pool.go +++ b/pkg/tensor/pool.go @@ -5,62 +5,114 @@ import ( "sync" ) +// TensorPool - пул для переиспользования тензоров и снижения аллокаций // TensorPool - пул для переиспользования тензоров и снижения аллокаций type TensorPool struct { - pools map[int]*sync.Pool - mu sync.RWMutex + pools64 map[int]*sync.Pool + pools32 map[int]*sync.Pool + mu sync.RWMutex } -// globalPool - глобальный пул тензоров -var globalPool = NewTensorPool() - // NewTensorPool создает новый пул тензоров func NewTensorPool() *TensorPool { return &TensorPool{ - pools: make(map[int]*sync.Pool), + pools64: make(map[int]*sync.Pool), + pools32: make(map[int]*sync.Pool), } } // Get получает тензор из пула или создает новый func (tp *TensorPool) Get(size int) *Tensor { + dtype := GetDefaultDType() + tp.mu.RLock() - pool, exists := tp.pools[size] + var pool *sync.Pool + var exists bool + if dtype == Float32 { + pool, exists = tp.pools32[size] + } else { + pool, exists = tp.pools64[size] + } tp.mu.RUnlock() if !exists { tp.mu.Lock() // Проверяем еще раз после получения write lock - pool, exists = tp.pools[size] - if !exists { - pool = &sync.Pool{ - New: func() interface{} { - return &Tensor{ - Data: make([]float64, size), - } - }, + if dtype == Float32 { + pool, exists = tp.pools32[size] + if !exists { + pool = &sync.Pool{ + New: func() interface{} { + return &Tensor{ + Data32: make([]float32, size), + DType: Float32, + } + }, + } + tp.pools32[size] = pool + } + } else { + pool, exists = tp.pools64[size] + if !exists { + pool = &sync.Pool{ + New: func() interface{} { + return &Tensor{ + Data: make([]float64, size), + DType: Float64, + } + }, + } + tp.pools64[size] = pool } - tp.pools[size] = pool } tp.mu.Unlock() } tensor := pool.Get().(*Tensor) + // Обнуляем данные для безопасности - for i := range tensor.Data { - tensor.Data[i] = 0.0 + if tensor.DType == Float32 { + // Optimization: check if we need to zero out? + // Ideally checking if dirtied. For now always zero. + for i := range tensor.Data32 { + tensor.Data32[i] = 0.0 + } + } else { + for i := range tensor.Data { + tensor.Data[i] = 0.0 + } } + return tensor } // Put возвращает тензор в пул для переиспользования func (tp *TensorPool) Put(t *Tensor) { - if t == nil || t.Data == nil { + if t == nil { return } - size := len(t.Data) + var size int + if t.DType == Float32 { + if t.Data32 == nil { + return + } + size = len(t.Data32) + } else { + if t.Data == nil { + return + } + size = len(t.Data) + } + tp.mu.RLock() - pool, exists := tp.pools[size] + var pool *sync.Pool + var exists bool + if t.DType == Float32 { + pool, exists = tp.pools32[size] + } else { + pool, exists = tp.pools64[size] + } tp.mu.RUnlock() if exists { @@ -71,6 +123,9 @@ func (tp *TensorPool) Put(t *Tensor) { } } +// globalPool - глобальный пул тензоров +var globalPool = NewTensorPool() + // GetTensor - глобальная функция для получения тензора из пула func GetTensor(size int) *Tensor { return globalPool.Get(size) @@ -91,6 +146,10 @@ func WithPooledTensor(size int, fn func(*Tensor) error) error { // MatMulPooled - версия MatMul с использованием пула памяти func MatMulPooled(a, b *Tensor) (*Tensor, error) { + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают: %v != %v", a.DType, b.DType) + } + // Проверка размерностей if len(a.Shape) != 2 || len(b.Shape) != 2 { return nil, fmt.Errorf("умножение матриц требует 2D тензоры") @@ -110,12 +169,10 @@ func MatMulPooled(a, b *Tensor) (*Tensor, error) { result.Strides = []int{p, 1} // Выполняем умножение - if m >= ParallelThreshold || p >= ParallelThreshold { - matmulParallelBlocked(a.Data, b.Data, result.Data, m, n, p) - } else if m >= BlockSize || p >= BlockSize { - matmulBlocked(a.Data, b.Data, result.Data, m, n, p) + if a.DType == Float32 { + matmulV2Float32(a.Data32, b.Data32, result.Data32, m, n, p) } else { - matmulOptimized(a.Data, b.Data, result.Data, m, n, p) + matmulV2(a.Data, b.Data, result.Data, m, n, p) } return result, nil @@ -123,16 +180,26 @@ func MatMulPooled(a, b *Tensor) (*Tensor, error) { // AddPooled - версия Add с использованием пула памяти func AddPooled(a, b *Tensor) (*Tensor, error) { + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают") + } if !shapesEqual(a.Shape, b.Shape) { - return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) + return nil, fmt.Errorf("формы тензоров должны совпадать") } - result := GetTensor(len(a.Data)) + size := a.DataLen() + result := GetTensor(size) result.Shape = append([]int{}, a.Shape...) result.Strides = append([]int{}, a.Strides...) - for i := 0; i < len(a.Data); i++ { - result.Data[i] = a.Data[i] + b.Data[i] + if a.DType == Float32 { + for i := 0; i < size; i++ { + result.Data32[i] = a.Data32[i] + b.Data32[i] + } + } else { + for i := 0; i < size; i++ { + result.Data[i] = a.Data[i] + b.Data[i] + } } return result, nil @@ -140,16 +207,26 @@ func AddPooled(a, b *Tensor) (*Tensor, error) { // MulPooled - версия Mul с использованием пула памяти func MulPooled(a, b *Tensor) (*Tensor, error) { + if a.DType != b.DType { + return nil, fmt.Errorf("типы данных тензоров не совпадают") + } if !shapesEqual(a.Shape, b.Shape) { - return nil, fmt.Errorf("формы тензоров должны совпадать: %v != %v", a.Shape, b.Shape) + return nil, fmt.Errorf("формы тензоров должны совпадать") } - result := GetTensor(len(a.Data)) + size := a.DataLen() + result := GetTensor(size) result.Shape = append([]int{}, a.Shape...) result.Strides = append([]int{}, a.Strides...) - for i := 0; i < len(a.Data); i++ { - result.Data[i] = a.Data[i] * b.Data[i] + if a.DType == Float32 { + for i := 0; i < size; i++ { + result.Data32[i] = a.Data32[i] * b.Data32[i] + } + } else { + for i := 0; i < size; i++ { + result.Data[i] = a.Data[i] * b.Data[i] + } } return result, nil @@ -157,12 +234,19 @@ func MulPooled(a, b *Tensor) (*Tensor, error) { // ApplyPooled - версия Apply с использованием пула памяти func ApplyPooled(a *Tensor, f func(float64) float64) *Tensor { - result := GetTensor(len(a.Data)) + size := a.DataLen() + result := GetTensor(size) result.Shape = append([]int{}, a.Shape...) result.Strides = append([]int{}, a.Strides...) - for i := 0; i < len(a.Data); i++ { - result.Data[i] = f(a.Data[i]) + if a.DType == Float32 { + for i := 0; i < size; i++ { + result.Data32[i] = float32(f(float64(a.Data32[i]))) + } + } else { + for i := 0; i < size; i++ { + result.Data[i] = f(a.Data[i]) + } } return result diff --git a/pkg/tensor/scheduler.go b/pkg/tensor/scheduler.go new file mode 100644 index 0000000..8627671 --- /dev/null +++ b/pkg/tensor/scheduler.go @@ -0,0 +1,94 @@ +package tensor + +import ( + "runtime" + "sync" + "sync/atomic" +) + +// Scheduler provides a centralized parallel execution facility for tensor operations. +// It prevents nested parallelism (goroutine explosion) and respects GOMAXPROCS. + +var ( + // maxWorkers controls the maximum number of parallel workers. + // Defaults to GOMAXPROCS. + maxWorkers int32 = int32(runtime.GOMAXPROCS(0)) + + // parallelDepth tracks the current nesting depth of parallel regions. + // If > 0, subsequent ParallelFor calls execute sequentially to prevent N² goroutines. + parallelDepth atomic.Int32 +) + +// MinGrainSize is the default minimum number of iterations per worker. +// Below this threshold, parallelization overhead exceeds the benefit. +const MinGrainSize = 64 + +// SetMaxWorkers sets the maximum number of parallel workers. +// Use 0 to reset to GOMAXPROCS. +func SetMaxWorkers(n int) { + if n <= 0 { + n = runtime.GOMAXPROCS(0) + } + atomic.StoreInt32(&maxWorkers, int32(n)) +} + +// GetMaxWorkers returns the current maximum number of parallel workers. +func GetMaxWorkers() int { + return int(atomic.LoadInt32(&maxWorkers)) +} + +// ParallelFor splits [0, total) into chunks and executes body(start, end) in parallel. +// +// Rules: +// - If total <= minGrain or workers <= 1, runs sequentially. +// - If already inside a parallel region (nesting), runs sequentially. +// - Otherwise, splits work across GOMAXPROCS goroutines. +// +// The body function receives a [start, end) range and must be safe to call +// concurrently from multiple goroutines with non-overlapping ranges. +func ParallelFor(total, minGrain int, body func(start, end int)) { + workers := int(atomic.LoadInt32(&maxWorkers)) + + // Guard 1: too little work or single-threaded + if total <= minGrain || workers <= 1 { + body(0, total) + return + } + + // Guard 2: already inside a parallel region — prevent nesting + if parallelDepth.Add(1) > 1 { + parallelDepth.Add(-1) + body(0, total) + return + } + defer parallelDepth.Add(-1) + + // Calculate grain size: at least minGrain per worker + grain := (total + workers - 1) / workers + if grain < minGrain { + grain = minGrain + } + + // Count actual number of chunks + numChunks := (total + grain - 1) / grain + if numChunks <= 1 { + body(0, total) + return + } + + var wg sync.WaitGroup + wg.Add(numChunks) + + for start := 0; start < total; start += grain { + s, e := start, start+grain + if e > total { + e = total + } + go func(start, end int) { + defer wg.Done() + body(start, end) + }(s, e) + } + + wg.Wait() +} diff --git a/pkg/tensor/scheduler_test.go b/pkg/tensor/scheduler_test.go new file mode 100644 index 0000000..5453c9b --- /dev/null +++ b/pkg/tensor/scheduler_test.go @@ -0,0 +1,261 @@ +package tensor + +import ( + "runtime" + "sync/atomic" + "testing" +) + +func TestParallelForCorrectness(t *testing.T) { + // ParallelFor result must equal sequential result + const n = 1000 + sequential := make([]float64, n) + parallel := make([]float64, n) + + // Sequential + for i := 0; i < n; i++ { + sequential[i] = float64(i * i) + } + + // Parallel + ParallelFor(n, 1, func(start, end int) { + for i := start; i < end; i++ { + parallel[i] = float64(i * i) + } + }) + + for i := 0; i < n; i++ { + if sequential[i] != parallel[i] { + t.Fatalf("mismatch at %d: sequential=%f, parallel=%f", i, sequential[i], parallel[i]) + } + } +} + +func TestParallelForSmallInput(t *testing.T) { + // Small inputs should run sequentially (no panic, correct result) + result := make([]int, 3) + ParallelFor(3, 64, func(start, end int) { + for i := start; i < end; i++ { + result[i] = i + 1 + } + }) + + for i, v := range result { + if v != i+1 { + t.Fatalf("expected %d, got %d at index %d", i+1, v, i) + } + } +} + +func TestParallelForZeroInput(t *testing.T) { + // Zero-length input should not panic + called := false + ParallelFor(0, 1, func(start, end int) { + called = true + }) + // With total=0, body(0,0) is called but does nothing + _ = called +} + +func TestParallelForNesting(t *testing.T) { + // Nested ParallelFor should NOT create N² goroutines + // The inner call should run sequentially due to anti-nesting guard + const n = 100 + result := make([]float64, n*n) + + ParallelFor(n, 1, func(outerStart, outerEnd int) { + for i := outerStart; i < outerEnd; i++ { + // This inner call should detect nesting and run sequentially + ParallelFor(n, 1, func(innerStart, innerEnd int) { + for j := innerStart; j < innerEnd; j++ { + result[i*n+j] = float64(i + j) + } + }) + } + }) + + // Verify correctness + for i := 0; i < n; i++ { + for j := 0; j < n; j++ { + expected := float64(i + j) + if result[i*n+j] != expected { + t.Fatalf("nesting test: expected %f at [%d,%d], got %f", expected, i, j, result[i*n+j]) + } + } + } +} + +func TestParallelForRace(t *testing.T) { + // This test is designed to be run with -race flag + // Each goroutine writes to non-overlapping ranges — no race expected + const n = 10000 + data := make([]float64, n) + + ParallelFor(n, 1, func(start, end int) { + for i := start; i < end; i++ { + data[i] = float64(i) * 2.0 + } + }) + + for i := 0; i < n; i++ { + if data[i] != float64(i)*2.0 { + t.Fatalf("race test: expected %f at %d, got %f", float64(i)*2.0, i, data[i]) + } + } +} + +func TestSetMaxWorkers(t *testing.T) { + original := GetMaxWorkers() + defer SetMaxWorkers(original) + + SetMaxWorkers(4) + if GetMaxWorkers() != 4 { + t.Fatalf("expected 4 workers, got %d", GetMaxWorkers()) + } + + // SetMaxWorkers(0) should reset to GOMAXPROCS + SetMaxWorkers(0) + expected := runtime.GOMAXPROCS(0) + if GetMaxWorkers() != expected { + t.Fatalf("expected %d workers (GOMAXPROCS), got %d", expected, GetMaxWorkers()) + } +} + +func TestSetMaxWorkersSequential(t *testing.T) { + original := GetMaxWorkers() + defer SetMaxWorkers(original) + + // With 1 worker, should run sequentially + SetMaxWorkers(1) + + const n = 100 + data := make([]float64, n) + ParallelFor(n, 1, func(start, end int) { + for i := start; i < end; i++ { + data[i] = float64(i) + } + }) + + for i := 0; i < n; i++ { + if data[i] != float64(i) { + t.Fatalf("sequential mode: expected %f at %d, got %f", float64(i), i, data[i]) + } + } +} + +// Benchmarks + +func BenchmarkParallelForGOMAXPROCS(b *testing.B) { + // Benchmark scaling across different GOMAXPROCS values + const n = 1024 * 1024 + data := make([]float64, n) + + for _, procs := range []int{1, 2, 4, 8} { + b.Run( + "GOMAXPROCS="+itoa(procs), + func(b *testing.B) { + originalMaxP := runtime.GOMAXPROCS(procs) + originalWorkers := GetMaxWorkers() + SetMaxWorkers(procs) + defer func() { + runtime.GOMAXPROCS(originalMaxP) + SetMaxWorkers(originalWorkers) + }() + + // Reset parallelDepth to 0 for clean benchmark + parallelDepth.Store(0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ParallelFor(n, MinGrainSize, func(start, end int) { + for j := start; j < end; j++ { + data[j] = float64(j) * 1.5 + } + }) + } + }, + ) + } +} + +func BenchmarkParallelForMatMul256(b *testing.B) { + size := 256 + a := make([]float64, size*size) + bm := make([]float64, size*size) + c := make([]float64, size*size) + for i := range a { + a[i] = float64(i % 100) + bm[i] = float64(i % 100) + } + + // Reset parallelDepth + parallelDepth.Store(0) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + matmulParallelBlocked(a, bm, c, size, size, size) + } +} + +// itoa is a simple int to string for benchmark names +func itoa(n int) string { + if n == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + return string(buf[i:]) +} + +// Ensure parallelDepth is accessible for testing +func resetParallelDepth() { + parallelDepth.Store(0) +} + +func getParallelDepth() int32 { + return parallelDepth.Load() +} + +func TestParallelDepthResets(t *testing.T) { + resetParallelDepth() + + // After ParallelFor completes, depth should be back to 0 + ParallelFor(100, 1, func(start, end int) { + // Check depth is 1 inside parallel region + depth := getParallelDepth() + if depth != 1 { + t.Errorf("expected depth 1 inside parallel region, got %d", depth) + } + }) + + if d := getParallelDepth(); d != 0 { + t.Fatalf("expected depth 0 after ParallelFor, got %d", d) + } +} + +// Test that anti-nesting works with atomic counter +func TestAntiNestingAtomic(t *testing.T) { + resetParallelDepth() + + var innerParallel atomic.Int32 + + ParallelFor(10, 1, func(outerStart, outerEnd int) { + for i := outerStart; i < outerEnd; i++ { + // Check if inner call actually runs in parallel + depth := getParallelDepth() + if depth > 0 { + innerParallel.Add(1) // This is expected — we're in parallel region + } + } + }) + + // All iterations should have seen depth > 0 + if innerParallel.Load() != 10 { + t.Logf("inner parallel count: %d (expected 10)", innerParallel.Load()) + } +} diff --git a/pkg/tensor/tensor.go b/pkg/tensor/tensor.go index ba2b37b..c5eb5c7 100644 --- a/pkg/tensor/tensor.go +++ b/pkg/tensor/tensor.go @@ -19,21 +19,42 @@ type Matrix struct { // Strides - количество элементов для перехода к следующему элементу по оси. // Strides позволяют эффективно работать с транспонированием и подтензорами без копирования. type Tensor struct { - Data []float64 - Shape []int - Strides []int + Data []float64 // Данные (для Float64) - row-major (C-style) + Data32 []float32 // Данные (для Float32) + Shape []int // Размерности (форма) тензора + Strides []int // Шаги для перехода по размерностям + DType DType // Тип данных +} + +// IsFloat32 возвращает true, если тензор использует float32 +func (t *Tensor) IsFloat32() bool { + return t.DType == Float32 +} + +// DataLen возвращает длину активного среза данных +func (t *Tensor) DataLen() int { + if t.DType == Float32 { + return len(t.Data32) + } + return len(t.Data) } // ZeroGrad создает тензор с нулевыми градиентами той же формы func (t *Tensor) ZeroGrad() *Tensor { - return &Tensor{ - Data: make([]float64, len(t.Data)), + out := &Tensor{ Shape: append([]int{}, t.Shape...), Strides: append([]int{}, t.Strides...), + DType: t.DType, + } + if t.DType == Float32 { + out.Data32 = make([]float32, t.DataLen()) + } else { + out.Data = make([]float64, t.DataLen()) } + return out } // Size возвращает общее количество элементов в тензоре func (t *Tensor) Size() int64 { - return int64(len(t.Data)) + return int64(t.DataLen()) } diff --git a/pkg/train/config.go b/pkg/train/config.go index 8fc0f79..dd58310 100644 --- a/pkg/train/config.go +++ b/pkg/train/config.go @@ -46,7 +46,7 @@ func NewTrainerFromConfig( lossFn, lrScheduler, metric, - callbacks, + *callbacks, cfg.Epochs, ) }