Skip to content

Commit 63a290c

Browse files
authored
Add Arrow field mapping header for native load exports (#24)
1 parent 9824fd9 commit 63a290c

2 files changed

Lines changed: 168 additions & 16 deletions

File tree

services/cubejs/src/routes/loadExport.js

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ const ARROW_HEADERS = {
4040
"Content-Type": "application/vnd.apache.arrow.stream",
4141
"Content-Disposition": 'attachment; filename="query-result.arrow"',
4242
};
43+
const ARROW_FIELD_MAPPING_HEADER = "X-Synmetrix-Arrow-Field-Mapping";
44+
const ARROW_FIELD_MAPPING_ENCODING_HEADER =
45+
"X-Synmetrix-Arrow-Field-Mapping-Encoding";
46+
const ARROW_FIELD_MAPPING_ENCODING = "base64url-json";
4347
const CSV_STREAM_CHUNK_SIZE = 128 * 1024;
4448
const CLICKHOUSE_NULL_TOKEN_RE = /(?<=,|^)\\N(?=,|\r?\n|$)/g;
4549

@@ -167,15 +171,71 @@ function getAliasNameToMember(plan) {
167171
return plan?.streamingQuery?.aliasNameToMember || null;
168172
}
169173

170-
function canUseNativeArrowPassthrough(plan) {
174+
function getChangedAliasNameToMember(plan) {
171175
const aliasNameToMember = getAliasNameToMember(plan);
172-
if (!aliasNameToMember || Object.keys(aliasNameToMember).length === 0) {
173-
return true;
176+
if (!aliasNameToMember) return null;
177+
178+
const changedAliasNameToMember = Object.fromEntries(
179+
Object.entries(aliasNameToMember).filter(([alias, member]) => alias !== member)
180+
);
181+
182+
return Object.keys(changedAliasNameToMember).length > 0
183+
? changedAliasNameToMember
184+
: null;
185+
}
186+
187+
function getResponseHeader(res, name) {
188+
if (typeof res.getHeader === "function") {
189+
const value = res.getHeader(name);
190+
if (value != null) return value;
191+
}
192+
193+
if (typeof res.get === "function") {
194+
const value = res.get(name);
195+
if (value != null) return value;
174196
}
175197

176-
return Object.entries(aliasNameToMember).every(
177-
([alias, member]) => alias === member
198+
return res.headers?.[name] ?? res.headers?.[name.toLowerCase()];
199+
}
200+
201+
function appendResponseHeaderValues(res, name, values) {
202+
const existing = getResponseHeader(res, name);
203+
const currentValues = Array.isArray(existing)
204+
? existing.flatMap((value) => String(value).split(","))
205+
: typeof existing === "string"
206+
? existing.split(",")
207+
: [];
208+
const normalized = new Set(
209+
currentValues.map((value) => value.trim()).filter(Boolean).map((value) => value.toLowerCase())
178210
);
211+
const mergedValues = currentValues.map((value) => value.trim()).filter(Boolean);
212+
213+
for (const value of values) {
214+
const normalizedValue = value.toLowerCase();
215+
if (!normalized.has(normalizedValue)) {
216+
normalized.add(normalizedValue);
217+
mergedValues.push(value);
218+
}
219+
}
220+
221+
res.set(name, mergedValues.join(", "));
222+
}
223+
224+
function setNativeArrowFieldMappingHeaders(res, plan) {
225+
const aliasNameToMember = getChangedAliasNameToMember(plan);
226+
if (!aliasNameToMember) return;
227+
228+
const encodedMapping = Buffer.from(
229+
JSON.stringify(aliasNameToMember),
230+
"utf8"
231+
).toString("base64url");
232+
233+
res.set(ARROW_FIELD_MAPPING_HEADER, encodedMapping);
234+
res.set(ARROW_FIELD_MAPPING_ENCODING_HEADER, ARROW_FIELD_MAPPING_ENCODING);
235+
appendResponseHeaderValues(res, "Access-Control-Expose-Headers", [
236+
ARROW_FIELD_MAPPING_HEADER,
237+
ARROW_FIELD_MAPPING_ENCODING_HEADER,
238+
]);
179239
}
180240

181241
function rewriteNativeCsvHeader(line, aliasNameToMember) {
@@ -416,7 +476,6 @@ async function tryHandleLoadExport(req, res, cubejs, query, format) {
416476
if (
417477
format === "arrow"
418478
&& plan.capabilities.nativeArrowPassthrough
419-
&& canUseNativeArrowPassthrough(plan)
420479
&& isClickHouseContext(plan.context.securityContext)
421480
) {
422481
const nativeQuery = await prepareNativeClickHouseExport(plan);
@@ -425,6 +484,7 @@ async function tryHandleLoadExport(req, res, cubejs, query, format) {
425484
securityContext: plan.context.securityContext,
426485
});
427486
res.set(ARROW_HEADERS);
487+
setNativeArrowFieldMappingHeaders(res, plan);
428488
await executeNativeClickHouseArrow(
429489
res,
430490
nativeQuery.query,

services/cubejs/test/loadExport.test.js

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import { tableFromIPC } from "apache-arrow";
55

66
import { maybeHandleLoadExport } from "../src/routes/loadExport.js";
77

8+
function decodeArrowFieldMappingHeader(value) {
9+
return JSON.parse(Buffer.from(value, "base64url").toString("utf8"));
10+
}
11+
812
describe("maybeHandleLoadExport", () => {
913
it("streams CSV through the semantic export path when native passthrough is unavailable", async () => {
1014
const req = createRequest({
@@ -99,7 +103,7 @@ describe("maybeHandleLoadExport", () => {
99103
);
100104
});
101105

102-
it("streams Arrow through the semantic export path when native passthrough is unsafe", async () => {
106+
it("streams Arrow through the native ClickHouse path even when aliases are not semantic member names", async () => {
103107
const req = createRequest({
104108
format: "arrow",
105109
query: {
@@ -145,7 +149,7 @@ describe("maybeHandleLoadExport", () => {
145149
values: [],
146150
},
147151
driver: createClickHouseDriver({
148-
arrowChunks: [Buffer.from("native-arrow-should-not-be-used")],
152+
arrowChunks: [Buffer.from("native-arrow-stream-binary")],
149153
}),
150154
});
151155

@@ -157,15 +161,25 @@ describe("maybeHandleLoadExport", () => {
157161
res.headers["Content-Type"],
158162
"application/vnd.apache.arrow.stream"
159163
);
160-
161-
const parsed = tableFromIPC(res.binaryOutput);
162-
assert.deepEqual(parsed.toArray().map((row) => row.toJSON()), [
164+
assert.deepEqual(
165+
decodeArrowFieldMappingHeader(
166+
res.headers["X-Synmetrix-Arrow-Field-Mapping"]
167+
),
163168
{
164-
"Orders.city": "Reykjavik",
165-
"Orders.count": 2,
166-
"Orders.createdAt.day": Date.parse("2024-01-01T00:00:00.000Z"),
167-
},
168-
]);
169+
city_alias: "Orders.city",
170+
count_alias: "Orders.count",
171+
created_day_alias: "Orders.createdAt.day",
172+
}
173+
);
174+
assert.equal(
175+
res.headers["X-Synmetrix-Arrow-Field-Mapping-Encoding"],
176+
"base64url-json"
177+
);
178+
assert.match(
179+
res.headers["Access-Control-Expose-Headers"],
180+
/X-Synmetrix-Arrow-Field-Mapping/
181+
);
182+
assert.deepEqual(res.binaryOutput, Buffer.from("native-arrow-stream-binary"));
169183
});
170184

171185
it("streams Arrow through the native ClickHouse path when aliases are already semantic", async () => {
@@ -208,9 +222,87 @@ describe("maybeHandleLoadExport", () => {
208222
res.headers["Content-Type"],
209223
"application/vnd.apache.arrow.stream"
210224
);
225+
assert.equal(res.headers["X-Synmetrix-Arrow-Field-Mapping"], undefined);
211226
assert.deepEqual(res.binaryOutput, nativeArrowBuffer);
212227
});
213228

229+
it("streams Arrow through the semantic export path when native ClickHouse export is unavailable", async () => {
230+
const req = createRequest({
231+
format: "arrow",
232+
query: {
233+
dimensions: ["Orders.city"],
234+
measures: ["Orders.count"],
235+
timeDimensions: [{ dimension: "Orders.createdAt", granularity: "day" }],
236+
},
237+
});
238+
const res = new MockResponse();
239+
240+
const cubejs = createMockCube({
241+
dbType: "clickhouse",
242+
normalizedQuery: req.body.query,
243+
sqlQuery: {
244+
sql: ["SELECT city_alias, count_alias, created_day_alias FROM orders", []],
245+
aliasNameToMember: {
246+
city_alias: "Orders.city",
247+
count_alias: "Orders.count",
248+
created_day_alias: "Orders.createdAt.day",
249+
},
250+
},
251+
metaConfig: createMetaConfig({
252+
measures: [{ name: "Orders.count", type: "count" }],
253+
dimensions: [
254+
{ name: "Orders.city", type: "string" },
255+
{
256+
name: "Orders.createdAt",
257+
type: "time",
258+
granularities: [{ name: "day", title: "day", interval: "1 day" }],
259+
},
260+
{ name: "Orders.createdAt.day", type: "time" },
261+
],
262+
}),
263+
streamRows: [
264+
{
265+
"Orders.city": "Reykjavik",
266+
"Orders.count": "2",
267+
"Orders.createdAt.day": "2024-01-01T00:00:00.000Z",
268+
},
269+
],
270+
nativePreAggs: {
271+
preAggregationsTablesToTempTables: [
272+
[
273+
"orders_rollup",
274+
{
275+
lambdaTable: { name: "orders_lambda" },
276+
},
277+
],
278+
],
279+
values: [],
280+
},
281+
driver: createClickHouseDriver({
282+
arrowChunks: [Buffer.from("native-arrow-should-not-be-used")],
283+
}),
284+
});
285+
286+
await maybeHandleLoadExport(req, res, () => {
287+
throw new Error("next should not be called");
288+
}, cubejs);
289+
290+
assert.equal(
291+
res.headers["Content-Type"],
292+
"application/vnd.apache.arrow.stream"
293+
);
294+
assert.equal(res.headers["X-Synmetrix-Arrow-Field-Mapping"], undefined);
295+
296+
const parsed = tableFromIPC(res.binaryOutput);
297+
assert.deepEqual(parsed.toArray().map((row) => row.toJSON()), [
298+
{
299+
"Orders.city": "Reykjavik",
300+
"Orders.count": 2,
301+
"Orders.createdAt.day": Date.parse("2024-01-01T00:00:00.000Z"),
302+
},
303+
]);
304+
});
305+
214306
it("falls through to the buffered path when semantic streaming is unavailable", async () => {
215307
const req = createRequest({
216308
format: "csv",

0 commit comments

Comments
 (0)