diff --git a/parquet/internal/encoding/byte_stream_split.go b/parquet/internal/encoding/byte_stream_split.go index fab61365..a9fcd53e 100644 --- a/parquet/internal/encoding/byte_stream_split.go +++ b/parquet/internal/encoding/byte_stream_split.go @@ -88,92 +88,6 @@ func encodeByteStreamSplitWidth8(data []byte, in []byte) { } } -// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data', -// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least len(data) bytes. -func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) { - const width = 4 - debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) - for element := 0; element < nValues; element++ { - out[width*element] = data[element] - out[width*element+1] = data[stride+element] - out[width*element+2] = data[2*stride+element] - out[width*element+3] = data[3*stride+element] - } -} - -// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data', -// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least len(data) bytes. 
-func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) { - const width = 8 - debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) - for element := 0; element < nValues; element++ { - out[width*element] = data[element] - out[width*element+1] = data[stride+element] - out[width*element+2] = data[2*stride+element] - out[width*element+3] = data[3*stride+element] - out[width*element+4] = data[4*stride+element] - out[width*element+5] = data[5*stride+element] - out[width*element+6] = data[6*stride+element] - out[width*element+7] = data[7*stride+element] - } -} - -// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data', -// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least nValues slices. -func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) { - debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) - for stream := 0; stream < width; stream++ { - for element := 0; element < nValues; element++ { - encLoc := stride*stream + element - out[element][stream] = data[encLoc] - } - } -} - -// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data', -// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least nValues slices. 
-func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { - debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) - for element := 0; element < nValues; element++ { - out[element][0] = data[element] - out[element][1] = data[stride+element] - } -} - -// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data', -// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least nValues slices. -func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { - debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) - for element := 0; element < nValues; element++ { - out[element][0] = data[element] - out[element][1] = data[stride+element] - out[element][2] = data[stride*2+element] - out[element][3] = data[stride*3+element] - } -} - -// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data', -// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. -// 'out' must have space for at least nValues slices. 
-func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { - debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) - for element := 0; element < nValues; element++ { - out[element][0] = data[element] - out[element][1] = data[stride+element] - out[element][2] = data[stride*2+element] - out[element][3] = data[stride*3+element] - out[element][4] = data[stride*4+element] - out[element][5] = data[stride*5+element] - out[element][6] = data[stride*6+element] - out[element][7] = data[stride*7+element] - } -} - func releaseBufferToPool(pooled *PooledBufferWriter) { buf := pooled.buf memory.Set(buf.Buf(), 0) diff --git a/parquet/internal/encoding/byte_stream_split_amd64.go b/parquet/internal/encoding/byte_stream_split_amd64.go new file mode 100644 index 00000000..9e029df8 --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_amd64.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build !noasm + +package encoding + +import ( + "fmt" + "unsafe" + + "github.com/apache/arrow-go/v18/parquet/internal/debug" + "golang.org/x/sys/cpu" +) + +func init() { + if cpu.X86.HasAVX2 { + decodeByteStreamSplitBatchWidth4 = decodeByteStreamSplitBatchWidth4AVX2 + decodeByteStreamSplitBatchWidth8 = decodeByteStreamSplitBatchWidth8AVX2 + } +} + +//go:noescape +func _decodeByteStreamSplitWidth4AVX2(data, out unsafe.Pointer, nValues, stride int) + +//go:noescape +func _decodeByteStreamSplitWidth8AVX2(data, out unsafe.Pointer, nValues, stride int) + +func decodeByteStreamSplitBatchWidth4AVX2(data []byte, nValues, stride int, out []byte) { + if nValues == 0 { + return + } + const width = 4 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues)) + _decodeByteStreamSplitWidth4AVX2(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride) +} + +func decodeByteStreamSplitBatchWidth8AVX2(data []byte, nValues, stride int, out []byte) { + if nValues == 0 { + return + } + const width = 8 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues)) + _decodeByteStreamSplitWidth8AVX2(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride) +} diff --git a/parquet/internal/encoding/byte_stream_split_arm64.go b/parquet/internal/encoding/byte_stream_split_arm64.go new file mode 100644 index 00000000..df4a841f --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_arm64.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !noasm + +package encoding + +import ( + "fmt" + "unsafe" + + "github.com/apache/arrow-go/v18/parquet/internal/debug" + "golang.org/x/sys/cpu" +) + +func init() { + if cpu.ARM64.HasASIMD { + decodeByteStreamSplitBatchWidth4 = decodeByteStreamSplitBatchWidth4NEON + decodeByteStreamSplitBatchWidth8 = decodeByteStreamSplitBatchWidth8NEON + } +} + +//go:noescape +func _decodeByteStreamSplitWidth4NEON(data, out unsafe.Pointer, nValues, stride int) + +//go:noescape +func _decodeByteStreamSplitWidth8NEON(data, out unsafe.Pointer, nValues, stride int) + +func decodeByteStreamSplitBatchWidth4NEON(data []byte, nValues, stride int, out []byte) { + if nValues == 0 { + return + } + const width = 4 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues)) + _decodeByteStreamSplitWidth4NEON(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride) +} + +func decodeByteStreamSplitBatchWidth8NEON(data []byte, nValues, stride int, out []byte) { 
+ if nValues == 0 { + return + } + const width = 8 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues)) + _decodeByteStreamSplitWidth8NEON(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride) +} diff --git a/parquet/internal/encoding/byte_stream_split_decode.go b/parquet/internal/encoding/byte_stream_split_decode.go new file mode 100644 index 00000000..7c3c910c --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_decode.go @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package encoding + +import ( + "fmt" + "unsafe" + + "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/internal/debug" +) + +var ( + decodeByteStreamSplitBatchWidth4 func(data []byte, nValues, stride int, out []byte) = decodeByteStreamSplitBatchWidth4Default + decodeByteStreamSplitBatchWidth8 func(data []byte, nValues, stride int, out []byte) = decodeByteStreamSplitBatchWidth8Default +) + +// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data', +// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth4Default(data []byte, nValues, stride int, out []byte) { + const width = 4 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + // the beginning of the data slice can be truncated, but for valid encoding we need at least (width-1)*stride+nValues bytes + debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues)) + s0 := data[:nValues] + s1 := data[stride : stride+nValues] + s2 := data[2*stride : 2*stride+nValues] + s3 := data[3*stride : 3*stride+nValues] + out = out[:width*nValues] + out32 := unsafe.Slice((*uint32)(unsafe.Pointer(&out[0])), nValues) + for i := range nValues { + out32[i] = uint32(s0[i]) | uint32(s1[i])<<8 | uint32(s2[i])<<16 | uint32(s3[i])<<24 + } +} + +// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data', +// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least len(data) bytes. 
+func decodeByteStreamSplitBatchWidth8Default(data []byte, nValues, stride int, out []byte) { + const width = 8 + debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data))) + debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues)) + s0 := data[:nValues] + s1 := data[stride : stride+nValues] + s2 := data[2*stride : 2*stride+nValues] + s3 := data[3*stride : 3*stride+nValues] + s4 := data[4*stride : 4*stride+nValues] + s5 := data[5*stride : 5*stride+nValues] + s6 := data[6*stride : 6*stride+nValues] + s7 := data[7*stride : 7*stride+nValues] + out = out[:width*nValues] + out64 := unsafe.Slice((*uint64)(unsafe.Pointer(&out[0])), nValues) + for i := range nValues { + out64[i] = uint64(s0[i]) | uint64(s1[i])<<8 | uint64(s2[i])<<16 | uint64(s3[i])<<24 | + uint64(s4[i])<<32 | uint64(s5[i])<<40 | uint64(s6[i])<<48 | uint64(s7[i])<<56 + } +} + +// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. 
+func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) { + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + debug.Assert(len(data) >= (width-1)*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), (width-1)*stride+nValues)) + for stream := 0; stream < width; stream++ { + for element := 0; element < nValues; element++ { + encLoc := stride*stream + element + out[element][stream] = data[encLoc] + } + } +} + +// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + const width = 2 + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + debug.Assert(len(data) >= stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), stride+nValues)) + s0 := data[:nValues] + s1 := data[stride : stride+nValues] + for i := range nValues { + out16 := (*uint16)(unsafe.Pointer(&out[i][0])) + *out16 = uint16(s0[i]) | uint16(s1[i])<<8 + } +} + +// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. 
+func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + const width = 4 + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues)) + s0 := data[:nValues] + s1 := data[stride : stride+nValues] + s2 := data[stride*2 : stride*2+nValues] + s3 := data[stride*3 : stride*3+nValues] + for i := range nValues { + out32 := (*uint32)(unsafe.Pointer(&out[i][0])) + *out32 = uint32(s0[i]) | uint32(s1[i])<<8 | uint32(s2[i])<<16 | uint32(s3[i])<<24 + } +} + +// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + const width = 8 + debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues)) + debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues)) + s0 := data[:nValues] + s1 := data[stride : stride+nValues] + s2 := data[stride*2 : stride*2+nValues] + s3 := data[stride*3 : stride*3+nValues] + s4 := data[stride*4 : stride*4+nValues] + s5 := data[stride*5 : stride*5+nValues] + s6 := data[stride*6 : stride*6+nValues] + s7 := data[stride*7 : stride*7+nValues] + for i := range nValues { + out64 := (*uint64)(unsafe.Pointer(&out[i][0])) + *out64 = uint64(s0[i]) | uint64(s1[i])<<8 | uint64(s2[i])<<16 | uint64(s3[i])<<24 | + uint64(s4[i])<<32 | uint64(s5[i])<<40 | uint64(s6[i])<<48 | 
uint64(s7[i])<<56 + } +} diff --git a/parquet/internal/encoding/byte_stream_split_decode_avx2_amd64.s b/parquet/internal/encoding/byte_stream_split_decode_avx2_amd64.s new file mode 100644 index 00000000..cde6a473 --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_decode_avx2_amd64.s @@ -0,0 +1,290 @@ +//+build !noasm !appengine +// AVX2 implementation with 256-bit vectors + +#include "textflag.h" + +// func _decodeByteStreamSplitWidth4AVX2(data, out unsafe.Pointer, nValues, stride int) +// +// AVX2 implementation with 256-bit vectors: +// - Processes suffix FIRST, then vectorized blocks +// - Uses 256-bit AVX2 vectors (32 bytes per register) +// - Processes 32 float32 values per block (128 bytes total) +// - Uses 2-stage byte unpacking hierarchy +TEXT ·_decodeByteStreamSplitWidth4AVX2(SB), NOSPLIT, $0-32 + MOVQ data+0(FP), SI // SI = data pointer + MOVQ out+8(FP), DI // DI = out pointer + MOVQ nValues+16(FP), CX // CX = nValues + MOVQ stride+24(FP), DX // DX = stride + + // Setup stream pointers + MOVQ SI, R9 // stream 0 + LEAQ (SI)(DX*1), R10 // stream 1 = data + stride + LEAQ (SI)(DX*2), R11 // stream 2 = data + 2*stride + LEAQ (R10)(DX*2), R12 // stream 3 = data + 3*stride + + // Calculate num_blocks = nValues / 32 + MOVQ CX, AX + SHRQ $5, AX // AX = num_blocks (divide by 32) + + // Calculate num_processed_elements = num_blocks * 32 + MOVQ AX, R13 + SHLQ $5, R13 // R13 = num_processed_elements + + // First handle suffix (elements beyond complete blocks) + MOVQ R13, R14 // R14 = i = num_processed_elements + JMP suffix_check_avx2 + +suffix_loop_avx2: + MOVBQZX (R9)(R14*1), BX // s0 + MOVBQZX (R10)(R14*1), R15 // s1 + SHLQ $8, R15 + ORQ R15, BX + MOVBQZX (R11)(R14*1), R15 // s2 + SHLQ $16, R15 + ORQ R15, BX + MOVBQZX (R12)(R14*1), R15 // s3 + SHLQ $24, R15 + ORQ R15, BX + MOVQ R14, AX + + SHLQ $2, AX // AX = i*4 + MOVL BX, (DI)(AX*1) // ← single 32-bit store + + INCQ R14 + +suffix_check_avx2: + CMPQ R14, CX + JL suffix_loop_avx2 + + // Check if we 
have blocks to process + TESTQ R13, R13 // Check if num_processed_elements > 0 + JZ done_avx2 + + // Process blocks with AVX2 + XORQ R14, R14 // R14 = block index i = 0 + SHRQ $5, R13 // R13 = num_blocks + +block_loop_avx2: + // Calculate offset for this block: i * 32 + MOVQ R14, AX + SHLQ $5, AX // AX = i * 32 + + // Load 32 bytes from each stream + // stage[0][j] = _mm256_loadu_si256(&data[i * 32 + j * stride]) + VMOVDQU (R9)(AX*1), Y0 // stage[0][0] from stream 0 + VMOVDQU (R10)(AX*1), Y1 // stage[0][1] from stream 1 + VMOVDQU (R11)(AX*1), Y2 // stage[0][2] from stream 2 + VMOVDQU (R12)(AX*1), Y3 // stage[0][3] from stream 3 + + // Stage 1: First level of byte interleaving + // stage[1][0] = _mm256_unpacklo_epi8(stage[0][0], stage[0][2]) + // stage[1][1] = _mm256_unpackhi_epi8(stage[0][0], stage[0][2]) + // stage[1][2] = _mm256_unpacklo_epi8(stage[0][1], stage[0][3]) + // stage[1][3] = _mm256_unpackhi_epi8(stage[0][1], stage[0][3]) + + VPUNPCKLBW Y2, Y0, Y4 // Y4 = unpacklo_epi8(Y0, Y2) + VPUNPCKHBW Y2, Y0, Y5 // Y5 = unpackhi_epi8(Y0, Y2) + VPUNPCKLBW Y3, Y1, Y6 // Y6 = unpacklo_epi8(Y1, Y3) + VPUNPCKHBW Y3, Y1, Y7 // Y7 = unpackhi_epi8(Y1, Y3) + + // Stage 2: Second level of byte interleaving + // stage[2][0] = _mm256_unpacklo_epi8(stage[1][0], stage[1][2]) + // stage[2][1] = _mm256_unpackhi_epi8(stage[1][0], stage[1][2]) + // stage[2][2] = _mm256_unpacklo_epi8(stage[1][1], stage[1][3]) + // stage[2][3] = _mm256_unpackhi_epi8(stage[1][1], stage[1][3]) + + VPUNPCKLBW Y6, Y4, Y0 // Y0 = unpacklo_epi8(Y4, Y6) + VPUNPCKHBW Y6, Y4, Y1 // Y1 = unpackhi_epi8(Y4, Y6) + VPUNPCKLBW Y7, Y5, Y2 // Y2 = unpacklo_epi8(Y5, Y7) + VPUNPCKHBW Y7, Y5, Y3 // Y3 = unpackhi_epi8(Y5, Y7) + + // Fix lane order: AVX2 unpacking operates within each 128-bit lane + // After two levels of unpacking, we have: + // Y0 = [bytes 0-7 of values 0-7 | bytes 0-7 of values 16-23] + // Y1 = [bytes 0-7 of values 8-15 | bytes 0-7 of values 24-31] + // Y2 = [same pattern for different byte positions] 
+ // Y3 = [same pattern for different byte positions] + // We need: [values 0-7 | values 8-15 | values 16-23 | values 24-31] + + VPERM2I128 $0x20, Y1, Y0, Y4 // Y4 = [Y0_low(0-7) | Y1_low(8-15)] + VPERM2I128 $0x31, Y1, Y0, Y5 // Y5 = [Y0_high(16-23) | Y1_high(24-31)] + VPERM2I128 $0x20, Y3, Y2, Y6 // Y6 = [Y2_low | Y3_low] + VPERM2I128 $0x31, Y3, Y2, Y7 // Y7 = [Y2_high | Y3_high] + + // Store results: out[(i * 4 + j) * 32] = stage[result][j] + // Calculate output base: i * 128 + MOVQ R14, AX + SHLQ $7, AX // AX = i * 128 + + VMOVDQU Y4, (DI)(AX*1) // Store at offset 0 + VMOVDQU Y6, 32(DI)(AX*1) // Store at offset 32 + VMOVDQU Y5, 64(DI)(AX*1) // Store at offset 64 + VMOVDQU Y7, 96(DI)(AX*1) // Store at offset 96 + + INCQ R14 + CMPQ R14, R13 + JL block_loop_avx2 + +done_avx2: + VZEROUPPER + RET + +// func _decodeByteStreamSplitWidth8AVX2(data, out unsafe.Pointer, nValues, stride int) +// +// AVX2 implementation for width=8 (float64/int64) with 256-bit vectors: +// - Processes suffix FIRST, then vectorized blocks +// - Uses 256-bit AVX2 vectors (32 bytes per register) +// - Processes 16 float64 values per block (128 bytes total) +// - Uses 3-stage byte unpacking hierarchy (for 8 streams) +TEXT ·_decodeByteStreamSplitWidth8AVX2(SB), NOSPLIT, $0-32 + MOVQ data+0(FP), SI // SI = data pointer + MOVQ out+8(FP), DI // DI = out pointer + MOVQ nValues+16(FP), CX // CX = nValues + MOVQ stride+24(FP), DX // DX = stride + + // Setup 8 stream pointers + MOVQ SI, R9 // stream 0 + LEAQ (SI)(DX*1), R10 // stream 1 = data + stride + LEAQ (SI)(DX*2), R11 // stream 2 = data + 2*stride + LEAQ (R10)(DX*2), R12 // stream 3 = data + 3*stride + LEAQ (SI)(DX*4), R13 // stream 4 = data + 4*stride + LEAQ (R10)(DX*4), R14 // stream 5 = data + 5*stride + LEAQ (R11)(DX*4), R15 // stream 6 = data + 6*stride + LEAQ (R12)(DX*4), BX // stream 7 = data + 7*stride + + // Calculate num_blocks = nValues / 16 + MOVQ CX, AX + SHRQ $4, AX // AX = num_blocks + + // Calculate num_processed_elements = 
num_blocks * 16 + MOVQ AX, R8 + SHLQ $4, R8 // R8 = num_processed_elements + + // First handle suffix (elements beyond complete blocks) + MOVQ R8, SI // SI = i = num_processed_elements + JMP suffix_check_w8_avx2 + +suffix_loop_w8_avx2: + // Load first byte (stream 0) and start accumulator + MOVBQZX (R9)(SI*1), DX // DX = s0[i] (lowest byte) + + // stream 1 << 8 + MOVBQZX (R10)(SI*1), AX + SHLQ $8, AX + ORQ AX, DX + + // stream 2 << 16 + MOVBQZX (R11)(SI*1), AX + SHLQ $16, AX + ORQ AX, DX + + // stream 3 << 24 + MOVBQZX (R12)(SI*1), AX + SHLQ $24, AX + ORQ AX, DX + + // stream 4 << 32 + MOVBQZX (R13)(SI*1), AX + SHLQ $32, AX + ORQ AX, DX + + // stream 5 << 40 + MOVBQZX (R14)(SI*1), AX + SHLQ $40, AX + ORQ AX, DX + + // stream 6 << 48 + MOVBQZX (R15)(SI*1), AX + SHLQ $48, AX + ORQ AX, DX + + // stream 7 << 56 + MOVBQZX (BX)(SI*1), AX + SHLQ $56, AX + ORQ AX, DX + + MOVQ SI, AX + SHLQ $3, AX // AX = i * 8 + MOVQ DX, (DI)(AX*1) + + INCQ SI + +suffix_check_w8_avx2: + CMPQ SI, CX + JL suffix_loop_w8_avx2 + + // Check if we have blocks to process + TESTQ R8, R8 // Check if num_processed_elements > 0 + JZ done_w8_avx2 + + // Process blocks with AVX2 + XORQ SI, SI // SI = block index i = 0 + SHRQ $4, R8 // R8 = num_blocks + +block_loop_w8_avx2: + // Calculate offset for this block: i * 16 + MOVQ SI, AX + SHLQ $4, AX // AX = i * 16 + + // Load 16 bytes from each of 8 streams (using 128-bit loads for narrower data) + // We load 16 bytes but will use AVX2 operations + VMOVDQU (R9)(AX*1), X0 // stream 0 + VMOVDQU (R10)(AX*1), X1 // stream 1 + VMOVDQU (R11)(AX*1), X2 // stream 2 + VMOVDQU (R12)(AX*1), X3 // stream 3 + VMOVDQU (R13)(AX*1), X4 // stream 4 + VMOVDQU (R14)(AX*1), X5 // stream 5 + VMOVDQU (R15)(AX*1), X6 // stream 6 + VMOVDQU (BX)(AX*1), X7 // stream 7 + + // Stage 1: First level of byte interleaving (pairs 0-4, 1-5, 2-6, 3-7) + VPUNPCKLBW X4, X0, X8 // X8 = unpacklo_epi8(X0, X4) + VPUNPCKHBW X4, X0, X9 // X9 = unpackhi_epi8(X0, X4) + VPUNPCKLBW X5, X1, X10 // X10 = 
unpacklo_epi8(X1, X5) + VPUNPCKHBW X5, X1, X11 // X11 = unpackhi_epi8(X1, X5) + VPUNPCKLBW X6, X2, X12 // X12 = unpacklo_epi8(X2, X6) + VPUNPCKHBW X6, X2, X13 // X13 = unpackhi_epi8(X2, X6) + VPUNPCKLBW X7, X3, X14 // X14 = unpacklo_epi8(X3, X7) + VPUNPCKHBW X7, X3, X15 // X15 = unpackhi_epi8(X3, X7) + + // Stage 2: Second level of byte interleaving + VPUNPCKLBW X12, X8, X0 // X0 = unpacklo_epi8(X8, X12) + VPUNPCKHBW X12, X8, X1 // X1 = unpackhi_epi8(X8, X12) + VPUNPCKLBW X13, X9, X2 // X2 = unpacklo_epi8(X9, X13) + VPUNPCKHBW X13, X9, X3 // X3 = unpackhi_epi8(X9, X13) + VPUNPCKLBW X14, X10, X4 // X4 = unpacklo_epi8(X10, X14) + VPUNPCKHBW X14, X10, X5 // X5 = unpackhi_epi8(X10, X14) + VPUNPCKLBW X15, X11, X6 // X6 = unpacklo_epi8(X11, X15) + VPUNPCKHBW X15, X11, X7 // X7 = unpackhi_epi8(X11, X15) + + // Stage 3: Third level of byte interleaving + VPUNPCKLBW X4, X0, X8 // X8 = unpacklo_epi8(X0, X4) + VPUNPCKHBW X4, X0, X9 // X9 = unpackhi_epi8(X0, X4) + VPUNPCKLBW X5, X1, X10 // X10 = unpacklo_epi8(X1, X5) + VPUNPCKHBW X5, X1, X11 // X11 = unpackhi_epi8(X1, X5) + VPUNPCKLBW X6, X2, X12 // X12 = unpacklo_epi8(X2, X6) + VPUNPCKHBW X6, X2, X13 // X13 = unpackhi_epi8(X2, X6) + VPUNPCKLBW X7, X3, X14 // X14 = unpacklo_epi8(X3, X7) + VPUNPCKHBW X7, X3, X15 // X15 = unpackhi_epi8(X3, X7) + + // Store results: out[(i * 8 + j) * 16] = result[j] + // Calculate output base: i * 128 + MOVQ SI, AX + SHLQ $7, AX // AX = i * 128 + + VMOVDQU X8, (DI)(AX*1) // Store at offset 0 + VMOVDQU X9, 16(DI)(AX*1) // Store at offset 16 + VMOVDQU X10, 32(DI)(AX*1) // Store at offset 32 + VMOVDQU X11, 48(DI)(AX*1) // Store at offset 48 + VMOVDQU X12, 64(DI)(AX*1) // Store at offset 64 + VMOVDQU X13, 80(DI)(AX*1) // Store at offset 80 + VMOVDQU X14, 96(DI)(AX*1) // Store at offset 96 + VMOVDQU X15, 112(DI)(AX*1) // Store at offset 112 + + INCQ SI + CMPQ SI, R8 + JL block_loop_w8_avx2 + +done_w8_avx2: + VZEROUPPER + RET diff --git a/parquet/internal/encoding/byte_stream_split_decode_neon_arm64.s 
b/parquet/internal/encoding/byte_stream_split_decode_neon_arm64.s new file mode 100644 index 00000000..838d8d86 --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_decode_neon_arm64.s @@ -0,0 +1,289 @@ +//+build !noasm !appengine +// NEON implementation following AVX2 algorithm structure with 128-bit vectors + +#include "textflag.h" + +// func _decodeByteStreamSplitWidth4NEON(data, out unsafe.Pointer, nValues, stride int) +// +// NEON implementation following the AVX2 algorithm structure: +// - Processes suffix FIRST, then vectorized blocks +// - Uses 128-bit NEON vectors (16 bytes per register) +// - Processes 16 float32 values per block (64 bytes total) +// - Uses 2-stage byte unpacking hierarchy +TEXT ·_decodeByteStreamSplitWidth4NEON(SB), NOSPLIT, $0-32 + MOVD data+0(FP), R0 // R0 = data pointer + MOVD out+8(FP), R1 // R1 = out pointer + MOVD nValues+16(FP), R2 // R2 = nValues + MOVD stride+24(FP), R3 // R3 = stride + + // Setup stream pointers + MOVD R0, R4 // R4 = stream 0 + ADD R3, R0, R5 // R5 = stream 1 = data + stride + ADD R3, R5, R6 // R6 = stream 2 = data + 2*stride + ADD R3, R6, R7 // R7 = stream 3 = data + 3*stride + + // Calculate num_blocks = nValues / 16 + LSR $4, R2, R8 // R8 = num_blocks (divide by 16) + + // Calculate num_processed_elements = num_blocks * 16 + LSL $4, R8, R9 // R9 = num_processed_elements + + // First handle suffix (elements beyond complete blocks) + MOVD R9, R10 // R10 = i = num_processed_elements + B suffix_check_neon + +suffix_loop_neon: + // Gather bytes: gathered_byte_data[b] = data[b * stride + i] + MOVBU (R4)(R10), R11 // byte from stream 0 + MOVBU (R5)(R10), R12 // byte from stream 1 + + // Calculate output offset: i * 4 + LSL $2, R10, R13 // R13 = i * 4 + ADD R1, R13, R14 // R14 = out + (i * 4) + + // Store gathered bytes + MOVB R11, (R14) + MOVB R12, 1(R14) + + MOVBU (R6)(R10), R11 // byte from stream 2 + MOVBU (R7)(R10), R12 // byte from stream 3 + + MOVB R11, 2(R14) + MOVB R12, 3(R14) + + ADD $1, R10, 
R10 + +suffix_check_neon: + CMP R2, R10 + BLT suffix_loop_neon + + // Check if we have blocks to process + CBZ R9, done_neon // if num_processed_elements == 0, done + + // Process blocks with NEON + MOVD $0, R10 // R10 = block index i = 0 + LSR $4, R9, R9 // R9 = num_blocks + +block_loop_neon: + // Calculate offset for this block: i * 16 + LSL $4, R10, R11 // R11 = i * 16 + + // Load 16 bytes from each stream - REVERSED for little-endian! + ADD R7, R11, R12 + VLD1 (R12), [V0.B16] // V0 = stream 3 (MSB in little-endian) + + ADD R6, R11, R12 + VLD1 (R12), [V1.B16] // V1 = stream 2 + + ADD R5, R11, R12 + VLD1 (R12), [V2.B16] // V2 = stream 1 + + ADD R4, R11, R12 + VLD1 (R12), [V3.B16] // V3 = stream 0 (LSB in little-endian) + + // Stage 1: Interleave like AVX2 VPUNPCKLBW/VPUNPCKHBW + VZIP1 V0.B16, V2.B16, V4.B16 // Interleave streams 0,2 low bytes + VZIP2 V0.B16, V2.B16, V5.B16 // Interleave streams 0,2 high bytes + VZIP1 V1.B16, V3.B16, V6.B16 // Interleave streams 1,3 low bytes + VZIP2 V1.B16, V3.B16, V7.B16 // Interleave streams 1,3 high bytes + + // Stage 2: Second level + VZIP1 V4.B16, V6.B16, V0.B16 + VZIP2 V4.B16, V6.B16, V1.B16 + VZIP1 V5.B16, V7.B16, V2.B16 + VZIP2 V5.B16, V7.B16, V3.B16 + + // Store results in sequential order + // Calculate output base: i * 64 + LSL $6, R10, R11 // R11 = i * 64 + + ADD R1, R11, R12 + VST1 [V0.B16], (R12) // Store at offset 0 + + ADD $16, R12 + VST1 [V1.B16], (R12) // Store at offset 16 + + ADD $16, R12 + VST1 [V2.B16], (R12) // Store at offset 32 + + ADD $16, R12 + VST1 [V3.B16], (R12) // Store at offset 48 + + ADD $1, R10, R10 + CMP R9, R10 + BLT block_loop_neon + +done_neon: + RET + +// func _decodeByteStreamSplitWidth8NEON(data, out unsafe.Pointer, nValues, stride int) +// +// NEON implementation for width=8 following AVX2 algorithm structure: +// - Processes suffix FIRST, then vectorized blocks +// - Uses 128-bit NEON vectors (16 bytes per register) +// - Processes 16 float64 values per block (128 bytes total) +// - 
Uses 3-stage byte unpacking hierarchy (for 8 streams) +TEXT ·_decodeByteStreamSplitWidth8NEON(SB), NOSPLIT, $0-32 + MOVD data+0(FP), R0 // R0 = data pointer + MOVD out+8(FP), R1 // R1 = out pointer + MOVD nValues+16(FP), R2 // R2 = nValues + MOVD stride+24(FP), R3 // R3 = stride + + // Setup 8 stream pointers + MOVD R0, R4 // R4 = stream 0 + ADD R3, R0, R5 // R5 = stream 1 = data + stride + ADD R3, R5, R6 // R6 = stream 2 = data + 2*stride + ADD R3, R6, R7 // R7 = stream 3 = data + 3*stride + LSL $2, R3, R8 // R8 = 4*stride + ADD R0, R8, R9 // R9 = stream 4 = data + 4*stride + ADD R5, R8, R10 // R10 = stream 5 = data + 5*stride + ADD R6, R8, R11 // R11 = stream 6 = data + 6*stride + ADD R7, R8, R12 // R12 = stream 7 = data + 7*stride + + // Calculate num_blocks = nValues / 16 + LSR $4, R2, R13 // R13 = num_blocks (divide by 16) + + // Calculate num_processed_elements = num_blocks * 16 + LSL $4, R13, R14 // R14 = num_processed_elements + + // First handle suffix (elements beyond complete blocks) + MOVD R14, R15 // R15 = i = num_processed_elements + B suffix_check_w8_neon + +suffix_loop_w8_neon: + // Calculate output offset: i * 8 + LSL $3, R15, R16 // R16 = i * 8 + ADD R1, R16, R17 // R17 = out + (i * 8) + + // Load and store bytes from all 8 streams + MOVBU (R4)(R15), R19 + MOVB R19, (R17) + + MOVBU (R5)(R15), R19 + MOVB R19, 1(R17) + + MOVBU (R6)(R15), R19 + MOVB R19, 2(R17) + + MOVBU (R7)(R15), R19 + MOVB R19, 3(R17) + + MOVBU (R9)(R15), R19 + MOVB R19, 4(R17) + + MOVBU (R10)(R15), R19 + MOVB R19, 5(R17) + + MOVBU (R11)(R15), R19 + MOVB R19, 6(R17) + + MOVBU (R12)(R15), R19 + MOVB R19, 7(R17) + + ADD $1, R15, R15 + +suffix_check_w8_neon: + CMP R2, R15 + BLT suffix_loop_w8_neon + + // Check if we have blocks to process + CBZ R14, done_w8_neon // if num_processed_elements == 0, done + + // Process blocks with NEON + MOVD $0, R15 // R15 = block index i = 0 + LSR $4, R14, R14 // R14 = num_blocks + +block_loop_w8_neon: + // Calculate offset for this block: i * 16 + 
LSL $4, R15, R16 // R16 = i * 16 + + // Load 16 bytes from each stream - REVERSED for little-endian! + // V0 = stream 7 (MSB), V7 = stream 0 (LSB) + ADD R12, R16, R17 + VLD1 (R17), [V0.B16] // V0 = stream 7 (MSB in little-endian) + + ADD R11, R16, R17 + VLD1 (R17), [V1.B16] // V1 = stream 6 + + ADD R10, R16, R17 + VLD1 (R17), [V2.B16] // V2 = stream 5 + + ADD R9, R16, R17 + VLD1 (R17), [V3.B16] // V3 = stream 4 + + ADD R7, R16, R17 + VLD1 (R17), [V4.B16] // V4 = stream 3 + + ADD R6, R16, R17 + VLD1 (R17), [V5.B16] // V5 = stream 2 + + ADD R5, R16, R17 + VLD1 (R17), [V6.B16] // V6 = stream 1 + + ADD R4, R16, R17 + VLD1 (R17), [V7.B16] // V7 = stream 0 (LSB in little-endian) + + // Stage 1: First level of byte interleaving (pairs 0-4, 1-5, 2-6, 3-7) + VZIP1 V0.B16, V4.B16, V8.B16 // Interleave streams 3,7 low + VZIP2 V0.B16, V4.B16, V9.B16 // Interleave streams 3,7 high + VZIP1 V1.B16, V5.B16, V10.B16 // Interleave streams 2,6 low + VZIP2 V1.B16, V5.B16, V11.B16 // Interleave streams 2,6 high + VZIP1 V2.B16, V6.B16, V12.B16 // Interleave streams 1,5 low + VZIP2 V2.B16, V6.B16, V13.B16 // Interleave streams 1,5 high + VZIP1 V3.B16, V7.B16, V14.B16 // Interleave streams 0,4 low + VZIP2 V3.B16, V7.B16, V15.B16 // Interleave streams 0,4 high + + // Stage 2: Second level of byte interleaving + VZIP1 V8.B16, V12.B16, V0.B16 + VZIP2 V8.B16, V12.B16, V1.B16 + VZIP1 V9.B16, V13.B16, V2.B16 + VZIP2 V9.B16, V13.B16, V3.B16 + VZIP1 V10.B16, V14.B16, V4.B16 + VZIP2 V10.B16, V14.B16, V5.B16 + VZIP1 V11.B16, V15.B16, V6.B16 + VZIP2 V11.B16, V15.B16, V7.B16 + + // Stage 3: Third level of byte interleaving + VZIP1 V0.B16, V4.B16, V8.B16 + VZIP2 V0.B16, V4.B16, V9.B16 + VZIP1 V1.B16, V5.B16, V10.B16 + VZIP2 V1.B16, V5.B16, V11.B16 + VZIP1 V2.B16, V6.B16, V12.B16 + VZIP2 V2.B16, V6.B16, V13.B16 + VZIP1 V3.B16, V7.B16, V14.B16 + VZIP2 V3.B16, V7.B16, V15.B16 + + // Store results: out[(i * 8 + j) * 16] = result[j] + // Calculate output base: i * 128 + LSL $7, R15, R16 // R16 = i * 128 + 
+ ADD R1, R16, R17 + VST1 [V8.B16], (R17) // Store at offset 0 + + ADD $16, R17 + VST1 [V9.B16], (R17) // Store at offset 16 + + ADD $16, R17 + VST1 [V10.B16], (R17) // Store at offset 32 + + ADD $16, R17 + VST1 [V11.B16], (R17) // Store at offset 48 + + ADD $16, R17 + VST1 [V12.B16], (R17) // Store at offset 64 + + ADD $16, R17 + VST1 [V13.B16], (R17) // Store at offset 80 + + ADD $16, R17 + VST1 [V14.B16], (R17) // Store at offset 96 + + ADD $16, R17 + VST1 [V15.B16], (R17) // Store at offset 112 + + ADD $1, R15, R15 + CMP R14, R15 + BLT block_loop_w8_neon + +done_w8_neon: + RET diff --git a/parquet/internal/encoding/byte_stream_split_decode_test.go b/parquet/internal/encoding/byte_stream_split_decode_test.go new file mode 100644 index 00000000..68436baf --- /dev/null +++ b/parquet/internal/encoding/byte_stream_split_decode_test.go @@ -0,0 +1,394 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package encoding
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"github.com/apache/arrow-go/v18/parquet"
+)
+
+func TestDecodeByteStreamSplitWidth4(t *testing.T) {
+	const width = 4
+
+	// Test various sizes including edge cases (sub-block, exact block multiples, block+remainder)
+	sizes := []int{1, 2, 7, 8, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 512, 1024}
+
+	for _, nValues := range sizes {
+		stride := nValues // streams are contiguous: stream k starts at byte offset k*stride
+		data := make([]byte, width*nValues)
+
+		// Initialize with predictable pattern
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)                // stream 0
+			data[stride+i] = byte((i + 1) % 256)   // stream 1
+			data[2*stride+i] = byte((i + 2) % 256) // stream 2
+			data[3*stride+i] = byte((i + 3) % 256) // stream 3
+		}
+
+		// Expected output: interleaved bytes
+		expected := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			expected[i*4] = byte(i % 256)
+			expected[i*4+1] = byte((i + 1) % 256)
+			expected[i*4+2] = byte((i + 2) % 256)
+			expected[i*4+3] = byte((i + 3) % 256)
+		}
+
+		out := make([]byte, width*nValues)
+		t.Run(fmt.Sprintf("nValues=%d", nValues), func(t *testing.T) {
+			decodeByteStreamSplitBatchWidth4(data, nValues, stride, out)
+			if !bytes.Equal(out, expected) {
+				for i := 0; i < len(expected); i++ { // report only the first diverging byte to keep output readable
+					if out[i] != expected[i] {
+						t.Errorf("First mismatch at index %d: got %d, want %d", i, out[i], expected[i])
+						break
+					}
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkDecodeByteStreamSplitBatchWidth4(b *testing.B) {
+	const width = 4
+	sizes := []int{8, 64, 512, 4096, 32768, 2097152, 16777216}
+
+	for _, nValues := range sizes {
+		stride := nValues
+		data := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)
+			data[stride+i] = byte((i + 1) % 256)
+			data[2*stride+i] = byte((i + 2) % 256)
+			data[3*stride+i] = byte((i + 3) % 256)
+		}
+		out := make([]byte, width*nValues)
+		b.SetBytes(int64(width * nValues)) // lets -bench report decode throughput in bytes/sec
+
+		b.Run(fmt.Sprintf("nValues=%d", nValues), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				decodeByteStreamSplitBatchWidth4(data, nValues, stride, out)
+			}
+		})
+	}
+}
+
+func TestDecodeByteStreamSplitWidth8(t *testing.T) {
+	const width = 8
+
+	// Test various sizes including edge cases
+	sizes := []int{1, 2, 7, 8, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 512, 1024}
+
+	for _, nValues := range sizes {
+		// Setup encoded data (byte stream split format)
+		stride := nValues
+		data := make([]byte, width*nValues)
+
+		// Initialize with predictable pattern
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)                // stream 0
+			data[stride+i] = byte((i + 1) % 256)   // stream 1
+			data[2*stride+i] = byte((i + 2) % 256) // stream 2
+			data[3*stride+i] = byte((i + 3) % 256) // stream 3
+			data[4*stride+i] = byte((i + 4) % 256) // stream 4
+			data[5*stride+i] = byte((i + 5) % 256) // stream 5
+			data[6*stride+i] = byte((i + 6) % 256) // stream 6
+			data[7*stride+i] = byte((i + 7) % 256) // stream 7
+		}
+
+		// Expected output: interleaved bytes
+		expected := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			expected[i*8] = byte(i % 256)
+			expected[i*8+1] = byte((i + 1) % 256)
+			expected[i*8+2] = byte((i + 2) % 256)
+			expected[i*8+3] = byte((i + 3) % 256)
+			expected[i*8+4] = byte((i + 4) % 256)
+			expected[i*8+5] = byte((i + 5) % 256)
+			expected[i*8+6] = byte((i + 6) % 256)
+			expected[i*8+7] = byte((i + 7) % 256)
+		}
+
+		t.Run(fmt.Sprintf("nValues=%d", nValues), func(t *testing.T) {
+			out := make([]byte, width*nValues)
+			decodeByteStreamSplitBatchWidth8(data, nValues, stride, out)
+			if !bytes.Equal(out, expected) {
+				t.Errorf("Reference implementation produced incorrect output")
+				for i := 0; i < len(expected); i++ {
+					if out[i] != expected[i] {
+						t.Errorf("First mismatch at index %d: got %d, want %d", i, out[i], expected[i])
+						break
+					}
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkDecodeByteStreamSplitBatchWidth8(b *testing.B) {
+	const width = 8
+	sizes := []int{8, 64, 512, 4096, 32768, 2097152, 16777216}
+
+	for _, nValues := range sizes {
+		stride := nValues
+		data := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)
+			data[stride+i] = byte((i + 1) % 256)
+			data[2*stride+i] = byte((i + 2) % 256)
+			data[3*stride+i] = byte((i + 3) % 256)
+			data[4*stride+i] = byte((i + 4) % 256)
+			data[5*stride+i] = byte((i + 5) % 256)
+			data[6*stride+i] = byte((i + 6) % 256)
+			data[7*stride+i] = byte((i + 7) % 256)
+		}
+		out := make([]byte, width*nValues)
+		b.SetBytes(int64(width * nValues))
+
+		b.Run(fmt.Sprintf("nValues=%d", nValues), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				decodeByteStreamSplitBatchWidth8(data, nValues, stride, out)
+			}
+		})
+	}
+}
+
+func TestDecodeByteStreamSplitFLBAWidth2(t *testing.T) {
+	const width = 2
+
+	// Test various sizes including edge cases
+	sizes := []int{1, 2, 7, 8, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 512, 1024}
+
+	for _, nValues := range sizes {
+		// Setup encoded data (byte stream split format)
+		stride := nValues
+		data := make([]byte, width*nValues)
+
+		// Initialize with predictable pattern
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)              // stream 0
+			data[stride+i] = byte((i + 1) % 256) // stream 1
+		}
+
+		// Expected output: FixedLenByteArray slices with interleaved bytes
+		expected := make([]parquet.FixedLenByteArray, nValues)
+		for i := 0; i < nValues; i++ {
+			expected[i] = make(parquet.FixedLenByteArray, width)
+			expected[i][0] = byte(i % 256)
+			expected[i][1] = byte((i + 1) % 256)
+		}
+
+		t.Run(fmt.Sprintf("nValues=%d", nValues), func(t *testing.T) {
+			out := make([]parquet.FixedLenByteArray, nValues)
+			for i := range out { // decoder fills each FLBA in place; the caller owns the allocations
+				out[i] = make(parquet.FixedLenByteArray, width)
+			}
+			decodeByteStreamSplitBatchFLBAWidth2(data, nValues, stride, out)
+			for i := 0; i < nValues; i++ {
+				if !bytes.Equal(out[i], expected[i]) {
+					t.Errorf("Reference implementation mismatch at index %d: got %v, want %v", i, out[i], expected[i])
+					break
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkDecodeByteStreamSplitBatchFLBAWidth2(b *testing.B) {
+	const width = 2
+	sizes := []int{8, 64, 512, 4096, 32768, 262144, 2097152, 16777216}
+
+	for _, nValues := range sizes {
+		stride := nValues
+		data := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)
+			data[stride+i] = byte((i + 1) % 256)
+		}
+		out := make([]parquet.FixedLenByteArray, nValues)
+		for i := range out {
+			out[i] = make(parquet.FixedLenByteArray, width)
+		}
+		b.SetBytes(int64(width * nValues))
+
+		b.Run(fmt.Sprintf("nValues=%d", nValues), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				decodeByteStreamSplitBatchFLBAWidth2(data, nValues, stride, out)
+			}
+		})
+	}
+}
+
+func TestDecodeByteStreamSplitFLBAWidth4(t *testing.T) {
+	const width = 4
+	// Test various sizes including edge cases and block boundaries
+	sizes := []int{1, 2, 7, 8, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 512, 1024}
+
+	for _, nValues := range sizes {
+		// Setup encoded data (byte stream split format)
+		stride := nValues
+		data := make([]byte, width*nValues)
+
+		// Initialize with predictable pattern
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)                // stream 0
+			data[stride+i] = byte((i + 1) % 256)   // stream 1
+			data[stride*2+i] = byte((i + 2) % 256) // stream 2
+			data[stride*3+i] = byte((i + 3) % 256) // stream 3
+		}
+
+		// Expected output: FixedLenByteArray slices with interleaved bytes
+		expected := make([]parquet.FixedLenByteArray, nValues)
+		for i := 0; i < nValues; i++ {
+			expected[i] = make(parquet.FixedLenByteArray, width)
+			expected[i][0] = byte(i % 256)
+			expected[i][1] = byte((i + 1) % 256)
+			expected[i][2] = byte((i + 2) % 256)
+			expected[i][3] = byte((i + 3) % 256)
+		}
+
+		t.Run(fmt.Sprintf("nValues=%d", nValues), func(t *testing.T) {
+			out := make([]parquet.FixedLenByteArray, nValues)
+			for i := range out {
+				out[i] = make(parquet.FixedLenByteArray, width)
+			}
+			decodeByteStreamSplitBatchFLBAWidth4(data, nValues, stride, out)
+			for i := 0; i < nValues; i++ {
+				if !bytes.Equal(out[i], expected[i]) {
+					t.Errorf("Reference implementation mismatch at index %d: got %v, want %v", i, out[i], expected[i])
+					break
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkDecodeByteStreamSplitBatchFLBAWidth4(b *testing.B) {
+	const width = 4
+	sizes := []int{8, 64, 512, 4096, 32768, 262144, 2097152, 16777216}
+
+	for _, nValues := range sizes {
+		stride := nValues
+		data := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)
+			data[stride+i] = byte((i + 1) % 256)
+			data[stride*2+i] = byte((i + 2) % 256)
+			data[stride*3+i] = byte((i + 3) % 256)
+		}
+		out := make([]parquet.FixedLenByteArray, nValues)
+		for i := range out {
+			out[i] = make(parquet.FixedLenByteArray, width)
+		}
+		b.SetBytes(int64(width * nValues))
+
+		b.Run(fmt.Sprintf("nValues=%d", nValues), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				decodeByteStreamSplitBatchFLBAWidth4(data, nValues, stride, out)
+			}
+		})
+	}
+}
+
+func TestDecodeByteStreamSplitFLBAWidth8(t *testing.T) {
+	const width = 8
+	// Test various sizes including edge cases and block boundaries
+	sizes := []int{1, 2, 7, 8, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 512, 1024}
+
+	for _, nValues := range sizes {
+		// Setup encoded data (byte stream split format)
+		stride := nValues
+		data := make([]byte, width*nValues)
+		// Initialize with predictable pattern
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)                // stream 0
+			data[stride+i] = byte((i + 1) % 256)   // stream 1
+			data[stride*2+i] = byte((i + 2) % 256) // stream 2
+			data[stride*3+i] = byte((i + 3) % 256) // stream 3
+			data[stride*4+i] = byte((i + 4) % 256) // stream 4
+			data[stride*5+i] = byte((i + 5) % 256) // stream 5
+			data[stride*6+i] = byte((i + 6) % 256) // stream 6
+			data[stride*7+i] = byte((i + 7) % 256) // stream 7
+		}
+		// Expected output: FixedLenByteArray slices with interleaved bytes
+		expected := make([]parquet.FixedLenByteArray, nValues)
+		for i := 0; i < nValues; i++ {
+			expected[i] = make(parquet.FixedLenByteArray, width)
+			expected[i][0] = byte(i % 256)
+			expected[i][1] = byte((i + 1) % 256)
+			expected[i][2] = byte((i + 2) % 256)
+			expected[i][3] = byte((i + 3) % 256)
+			expected[i][4] = byte((i + 4) % 256)
+			expected[i][5] = byte((i + 5) % 256)
+			expected[i][6] = byte((i + 6) % 256)
+			expected[i][7] = byte((i + 7) % 256)
+		}
+
+		t.Run(fmt.Sprintf("nValues=%d", nValues), func(t *testing.T) {
+			out := make([]parquet.FixedLenByteArray, nValues)
+			for i := range out {
+				out[i] = make(parquet.FixedLenByteArray, width)
+			}
+			decodeByteStreamSplitBatchFLBAWidth8(data, nValues, stride, out)
+			for i := 0; i < nValues; i++ {
+				if !bytes.Equal(out[i], expected[i]) {
+					t.Errorf("Reference implementation mismatch at index %d: got %v, want %v", i, out[i], expected[i])
+					break
+				}
+			}
+		})
+	}
+}
+
+func BenchmarkDecodeByteStreamSplitBatchFLBAWidth8(b *testing.B) {
+	const width = 8
+	sizes := []int{8, 64, 512, 4096, 32768, 262144, 2097152, 16777216}
+
+	for _, nValues := range sizes {
+		stride := nValues
+		data := make([]byte, width*nValues)
+		for i := 0; i < nValues; i++ {
+			data[i] = byte(i % 256)
+			data[stride+i] = byte((i + 1) % 256)
+			data[stride*2+i] = byte((i + 2) % 256)
+			data[stride*3+i] = byte((i + 3) % 256)
+			data[stride*4+i] = byte((i + 4) % 256)
+			data[stride*5+i] = byte((i + 5) % 256)
+			data[stride*6+i] = byte((i + 6) % 256)
+			data[stride*7+i] = byte((i + 7) % 256)
+		}
+		out := make([]parquet.FixedLenByteArray, nValues)
+		for i := range out {
+			out[i] = make(parquet.FixedLenByteArray, width)
+		}
+		b.SetBytes(int64(width * nValues))
+
+		b.Run(fmt.Sprintf("nValues=%d", nValues), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				decodeByteStreamSplitBatchFLBAWidth8(data, nValues, stride, out)
+			}
+		})
+	}
+}