Skip to content
86 changes: 0 additions & 86 deletions parquet/internal/encoding/byte_stream_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,92 +88,6 @@ func encodeByteStreamSplitWidth8(data []byte, in []byte) {
}
}

// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data',
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least len(data) bytes.
func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) {
const width = 4
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
for element := 0; element < nValues; element++ {
out[width*element] = data[element]
out[width*element+1] = data[stride+element]
out[width*element+2] = data[2*stride+element]
out[width*element+3] = data[3*stride+element]
}
}

// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data',
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least len(data) bytes.
func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) {
const width = 8
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
for element := 0; element < nValues; element++ {
out[width*element] = data[element]
out[width*element+1] = data[stride+element]
out[width*element+2] = data[2*stride+element]
out[width*element+3] = data[3*stride+element]
out[width*element+4] = data[4*stride+element]
out[width*element+5] = data[5*stride+element]
out[width*element+6] = data[6*stride+element]
out[width*element+7] = data[7*stride+element]
}
}

// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) {
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
for stream := 0; stream < width; stream++ {
for element := 0; element < nValues; element++ {
encLoc := stride*stream + element
out[element][stream] = data[encLoc]
}
}
}

// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
for element := 0; element < nValues; element++ {
out[element][0] = data[element]
out[element][1] = data[stride+element]
}
}

// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
for element := 0; element < nValues; element++ {
out[element][0] = data[element]
out[element][1] = data[stride+element]
out[element][2] = data[stride*2+element]
out[element][3] = data[stride*3+element]
}
}

// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
for element := 0; element < nValues; element++ {
out[element][0] = data[element]
out[element][1] = data[stride+element]
out[element][2] = data[stride*2+element]
out[element][3] = data[stride*3+element]
out[element][4] = data[stride*4+element]
out[element][5] = data[stride*5+element]
out[element][6] = data[stride*6+element]
out[element][7] = data[stride*7+element]
}
}

func releaseBufferToPool(pooled *PooledBufferWriter) {
buf := pooled.buf
memory.Set(buf.Buf(), 0)
Expand Down
60 changes: 60 additions & 0 deletions parquet/internal/encoding/byte_stream_split_amd64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !noasm

package encoding

import (
"fmt"
"unsafe"

"github.com/apache/arrow-go/v18/parquet/internal/debug"
"golang.org/x/sys/cpu"
)

func init() {
if cpu.X86.HasAVX2 {
decodeByteStreamSplitBatchWidth4 = decodeByteStreamSplitBatchWidth4AVX2
decodeByteStreamSplitBatchWidth8 = decodeByteStreamSplitBatchWidth8AVX2
}
}

//go:noescape
func _decodeByteStreamSplitWidth4AVX2(data, out unsafe.Pointer, nValues, stride int)

//go:noescape
func _decodeByteStreamSplitWidth8AVX2(data, out unsafe.Pointer, nValues, stride int)

func decodeByteStreamSplitBatchWidth4AVX2(data []byte, nValues, stride int, out []byte) {
if nValues == 0 {
return
}
const width = 4
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues))
_decodeByteStreamSplitWidth4AVX2(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride)
}

func decodeByteStreamSplitBatchWidth8AVX2(data []byte, nValues, stride int, out []byte) {
if nValues == 0 {
return
}
const width = 8
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues))
_decodeByteStreamSplitWidth8AVX2(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride)
}
60 changes: 60 additions & 0 deletions parquet/internal/encoding/byte_stream_split_arm64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !noasm

package encoding

import (
"fmt"
"unsafe"

"github.com/apache/arrow-go/v18/parquet/internal/debug"
"golang.org/x/sys/cpu"
)

func init() {
if cpu.ARM64.HasASIMD {
decodeByteStreamSplitBatchWidth4 = decodeByteStreamSplitBatchWidth4NEON
decodeByteStreamSplitBatchWidth8 = decodeByteStreamSplitBatchWidth8NEON
}
}

//go:noescape
func _decodeByteStreamSplitWidth4NEON(data, out unsafe.Pointer, nValues, stride int)

//go:noescape
func _decodeByteStreamSplitWidth8NEON(data, out unsafe.Pointer, nValues, stride int)

func decodeByteStreamSplitBatchWidth4NEON(data []byte, nValues, stride int, out []byte) {
if nValues == 0 {
return
}
const width = 4
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues))
_decodeByteStreamSplitWidth4NEON(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride)
}

func decodeByteStreamSplitBatchWidth8NEON(data []byte, nValues, stride int, out []byte) {
if nValues == 0 {
return
}
const width = 8
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues))
_decodeByteStreamSplitWidth8NEON(unsafe.Pointer(&data[0]), unsafe.Pointer(&out[0]), nValues, stride)
}
140 changes: 140 additions & 0 deletions parquet/internal/encoding/byte_stream_split_decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
"fmt"
"unsafe"

"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/debug"
)

var (
decodeByteStreamSplitBatchWidth4 func(data []byte, nValues, stride int, out []byte) = decodeByteStreamSplitBatchWidth4Default
decodeByteStreamSplitBatchWidth8 func(data []byte, nValues, stride int, out []byte) = decodeByteStreamSplitBatchWidth8Default
)

// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data',
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least len(data) bytes.
func decodeByteStreamSplitBatchWidth4Default(data []byte, nValues, stride int, out []byte) {
const width = 4
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
// the beginning of the data slice can be truncated, but for valid encoding we need at least (width-1)*stride+nValues bytes
debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues))
s0 := data[:nValues]
s1 := data[stride : stride+nValues]
s2 := data[2*stride : 2*stride+nValues]
s3 := data[3*stride : 3*stride+nValues]
out = out[:width*nValues]
out32 := unsafe.Slice((*uint32)(unsafe.Pointer(&out[0])), nValues)
for i := range nValues {
out32[i] = uint32(s0[i]) | uint32(s1[i])<<8 | uint32(s2[i])<<16 | uint32(s3[i])<<24
}
}

// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data',
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least len(data) bytes.
func decodeByteStreamSplitBatchWidth8Default(data []byte, nValues, stride int, out []byte) {
const width = 8
debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))
debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues))
s0 := data[:nValues]
s1 := data[stride : stride+nValues]
s2 := data[2*stride : 2*stride+nValues]
s3 := data[3*stride : 3*stride+nValues]
s4 := data[4*stride : 4*stride+nValues]
s5 := data[5*stride : 5*stride+nValues]
s6 := data[6*stride : 6*stride+nValues]
s7 := data[7*stride : 7*stride+nValues]
out = out[:width*nValues]
out64 := unsafe.Slice((*uint64)(unsafe.Pointer(&out[0])), nValues)
for i := range nValues {
out64[i] = uint64(s0[i]) | uint64(s1[i])<<8 | uint64(s2[i])<<16 | uint64(s3[i])<<24 |
uint64(s4[i])<<32 | uint64(s5[i])<<40 | uint64(s6[i])<<48 | uint64(s7[i])<<56
}
}

// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) {
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
debug.Assert(len(data) >= (width-1)*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), (width-1)*stride+nValues))
for stream := 0; stream < width; stream++ {
for element := 0; element < nValues; element++ {
encLoc := stride*stream + element
out[element][stream] = data[encLoc]
}
}
}

// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
const width = 2
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
debug.Assert(len(data) >= stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), stride+nValues))
s0 := data[:nValues]
s1 := data[stride : stride+nValues]
for i := range nValues {
out16 := (*uint16)(unsafe.Pointer(&out[i][0]))
*out16 = uint16(s0[i]) | uint16(s1[i])<<8
}
}

// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
const width = 4
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
debug.Assert(len(data) >= 3*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 3*stride+nValues))
s0 := data[:nValues]
s1 := data[stride : stride+nValues]
s2 := data[stride*2 : stride*2+nValues]
s3 := data[stride*3 : stride*3+nValues]
for i := range nValues {
out32 := (*uint32)(unsafe.Pointer(&out[i][0]))
*out32 = uint32(s0[i]) | uint32(s1[i])<<8 | uint32(s2[i])<<16 | uint32(s3[i])<<24
}
}

// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data',
// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least nValues slices.
func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
const width = 8
debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))
debug.Assert(len(data) >= 7*stride+nValues, fmt.Sprintf("not enough data for decoding, data: %d bytes, expected at least: %d bytes", len(data), 7*stride+nValues))
s0 := data[:nValues]
s1 := data[stride : stride+nValues]
s2 := data[stride*2 : stride*2+nValues]
s3 := data[stride*3 : stride*3+nValues]
s4 := data[stride*4 : stride*4+nValues]
s5 := data[stride*5 : stride*5+nValues]
s6 := data[stride*6 : stride*6+nValues]
s7 := data[stride*7 : stride*7+nValues]
for i := range nValues {
out64 := (*uint64)(unsafe.Pointer(&out[i][0]))
*out64 = uint64(s0[i]) | uint64(s1[i])<<8 | uint64(s2[i])<<16 | uint64(s3[i])<<24 |
uint64(s4[i])<<32 | uint64(s5[i])<<40 | uint64(s6[i])<<48 | uint64(s7[i])<<56
}
}
Loading