
Feature Request: Expose Local Iterator in Spark Connect Go Client (For Streaming Rows) #150

@caldempsey

The current Spark Connect Go client fetches all DataFrame rows at once through the Collect() method, which makes it impractical for very large result sets (GBs/TBs). The Scala Spark Connect client offers a toLocalIterator() method that streams rows incrementally, which lets external clients or services process large result sets efficiently. In my case, I have an API gateway that polls Spark Connect, which lets me distribute the results of my OLAP queries as a stream from fairly lightweight instances, even when multiple GB of rows are returned. Super useful!

Originally I wanted to build this gateway with the Spark Connect Go client, but found that it doesn't expose an equivalent of the Scala DataFrame's toLocalIterator() method, which returns an iterator that streams rows incrementally as Arrow batches arrive. Instead, every row is accumulated in memory before anything is returned, as is explicit in client/sql/dataframe.go:

```go
func (df *dataFrameImpl) Collect() ([]Row, error) {
	responseClient, err := df.sparkSession.executePlan(df.createPlan())
	if err != nil {
		return nil, fmt.Errorf("failed to execute plan: %w", err)
	}

	var schema *StructType
	var allRows []Row

	for {
		response, err := responseClient.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return allRows, nil
			} else {
				return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
			}
		}

		dataType := response.GetSchema()
		if dataType != nil {
			schema = convertProtoDataTypeToStructType(dataType)
			continue
		}

		arrowBatch := response.GetArrowBatch()
		if arrowBatch == nil {
			continue
		}

		rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
		if err != nil {
			return nil, err
		}

		if allRows == nil {
			allRows = make([]Row, 0, len(rowBatch))
		}
		allRows = append(allRows, rowBatch...)
	}

	return allRows, nil
}
```

The building blocks for a clear ToLocalIterator() implementation are already there. I'd avoid channels, so that call sites can decide for themselves whether they want the behaviour to be asynchronous, and implement something like the following, which tries to handle the Arrow resources gracefully out of the box:

```go
package sql

import (
	"errors"
	"fmt"
	"io"
	// Also needs the generated Spark Connect protobuf package, imported as
	// `proto` exactly as client/sql/dataframe.go already does.
)

// RowIterator streams rows batch by batch, keeping only the current batch
// in memory (roughly what the Scala client's toLocalIterator() does).
//
//   - At any time we hold one []Row, converted with the existing
//     readArrowBatchData helper.
//   - When that slice is exhausted, we fetch the next Arrow batch and
//     replace it. Next returns the row at the cursor (rowIdx) of the
//     current batch.
//   - A receive error is not swallowed: HasNext reports true and the
//     following Next call returns the error exactly once.
type RowIterator interface {
	HasNext() bool
	Next() (Row, error)
	Close() error
}

type rowIteratorImpl struct {
	responseClient proto.SparkConnectService_ExecutePlanClient
	schema         *StructType

	batch     []Row // current batch
	rowIdx    int   // cursor inside the current batch
	exhausted bool  // stream finished (EOF, error, or Close)
	err       error // pending error, returned by the next Next call
}

func (it *rowIteratorImpl) HasNext() bool {
	if it.err != nil {
		return true // let Next surface the pending error
	}
	if it.exhausted {
		return false
	}
	if it.rowIdx < len(it.batch) {
		return true
	}
	// Current batch consumed: try to pull the next non-empty batch.
	if err := it.fetchNextBatch(); err != nil {
		if errors.Is(err, io.EOF) {
			return false
		}
		it.err = err
		it.exhausted = true
		return true
	}
	return true
}

func (it *rowIteratorImpl) Next() (Row, error) {
	if !it.HasNext() {
		return nil, io.EOF
	}
	if it.err != nil {
		err := it.err
		it.err = nil // report once; HasNext is false afterwards
		return nil, err
	}
	r := it.batch[it.rowIdx]
	it.rowIdx++
	return r, nil
}

// Close stops iteration and drops the current batch. CloseSend only
// half-closes the client side of the gRPC stream; stopping the server early
// requires cancelling the context passed to the RPC, and executePlan as
// quoted above does not take one.
func (it *rowIteratorImpl) Close() error {
	it.exhausted = true
	it.err = nil
	it.batch = nil
	return it.responseClient.CloseSend()
}

// fetchNextBatch reads responses until it converts a non-empty Arrow batch.
func (it *rowIteratorImpl) fetchNextBatch() error {
	it.batch = nil
	it.rowIdx = 0

	for {
		resp, err := it.responseClient.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				it.exhausted = true
				return io.EOF
			}
			return fmt.Errorf("receive execute-plan response: %w", err)
		}

		// Schema message comes first (once).
		if sch := resp.GetSchema(); sch != nil && it.schema == nil {
			it.schema = convertProtoDataTypeToStructType(sch)
			continue
		}

		// Arrow batch: convert it into rows.
		if batch := resp.GetArrowBatch(); batch != nil {
			rows, err := readArrowBatchData(batch.Data, it.schema)
			if err != nil {
				return err
			}
			// Skip empty batches just in case.
			if len(rows) == 0 {
				continue
			}
			it.batch = rows
			return nil
		}
	}
}

// ToLocalIterator returns a streaming RowIterator for the DataFrame.
func (df *dataFrameImpl) ToLocalIterator() (RowIterator, error) {
	client, err := df.sparkSession.executePlan(df.createPlan())
	if err != nil {
		return nil, fmt.Errorf("execute plan: %w", err)
	}
	return &rowIteratorImpl{responseClient: client}, nil
}
```

Let me know what you think. I'm happy to open a PR and provide a production-ready version once we agree on the semantics of the interface. I'm also not sold on the ToLocalIterator name: it obscures the point that we strictly avoid pulling all the data into memory before enumerating it (which Arrow helps with a great deal), so I'd prefer ToStreamingIterator. I'd like to help the Spark Connect Go client move out of alpha; it already works pretty well as is.
