From d14f87f19180542562704c9a21943b2ed7ea4fae Mon Sep 17 00:00:00 2001 From: Sergey Zmeyev Date: Wed, 16 Jul 2025 04:42:56 -0700 Subject: [PATCH 1/2] elasticsearch destination add custom _id fallback for deduplication --- .../elasticsearch/ElasticsearchConnection.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java index e3ff545b2fcf..1f2a1740721c 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java @@ -181,6 +181,13 @@ public BulkResponse indexDocuments(String index, List reco // TODO: Can we do something like this? private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteConfig config) { if (!config.hasPrimaryKey()) { + // Check for _id field if no primary key is configured + JsonPointer idPtr = JsonPointer.valueOf("/_id"); + var idNode = doc.getData().at(idPtr); + if (!idNode.isMissingNode() && idNode.isValueNode()) { + log.debug("using _id field value for document id"); + return idNode.asText(); + } return UUID.randomUUID().toString(); } var optFirst = config.getPrimaryKey().stream().findFirst(); @@ -196,7 +203,14 @@ private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteCon return pkNode.asText(); } } - log.warn("unable to extract primary key"); + log.warn("unable to extract primary key, checking for _id field"); + // Check for _id field as fallback before random UUID + JsonPointer idPtr = JsonPointer.valueOf("/_id"); + var idNode = doc.getData().at(idPtr); + if (!idNode.isMissingNode() && idNode.isValueNode()) { + log.debug("using _id field value for document id as fallback"); + return idNode.asText(); + } return UUID.randomUUID().toString(); } From 17c39b8f28181e066ae31b4e2ce8166956e8df63 Mon Sep 17 00:00:00 2001 From: Sergey Zmeyev Date: Wed, 16 Jul 2025 05:04:39 -0700 Subject: [PATCH 2/2] elasticsearch destination handle _id casing --- .../elasticsearch/ElasticsearchConnection.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java index 1f2a1740721c..9f9c6cbfdfbd 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java @@ -182,8 +182,13 @@ public BulkResponse indexDocuments(String index, List reco private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteConfig config) { if (!config.hasPrimaryKey()) { // Check for _id field if no primary key is configured - JsonPointer idPtr = JsonPointer.valueOf("/_id"); + // Try uppercase first (Snowflake default), then lowercase fallback + JsonPointer idPtr = JsonPointer.valueOf("/_ID"); var idNode = doc.getData().at(idPtr); + if (idNode.isMissingNode()) { + idPtr = JsonPointer.valueOf("/_id"); + idNode = doc.getData().at(idPtr); + } if (!idNode.isMissingNode() && idNode.isValueNode()) { log.debug("using _id field value for document id"); return idNode.asText(); @@ -205,8 +210,13 @@ private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteCon } log.warn("unable to extract primary key, checking for _id field"); // Check for _id field as fallback before random UUID - JsonPointer idPtr = JsonPointer.valueOf("/_id"); + // Try uppercase first (Snowflake default), then lowercase fallback + JsonPointer idPtr = JsonPointer.valueOf("/_ID"); var idNode = doc.getData().at(idPtr); + if (idNode.isMissingNode()) { + idPtr = JsonPointer.valueOf("/_id"); + idNode = doc.getData().at(idPtr); + } if (!idNode.isMissingNode() && idNode.isValueNode()) { log.debug("using _id field value for document id as fallback"); return idNode.asText();