diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java index e3ff545b2fcf..9f9c6cbfdfbd 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java @@ -181,6 +181,18 @@ public BulkResponse indexDocuments(String index, List reco // TODO: Can we do something like this? private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteConfig config) { if (!config.hasPrimaryKey()) { + // Check for _id field if no primary key is configured + // Try uppercase first (Snowflake default), then lowercase fallback + JsonPointer idPtr = JsonPointer.valueOf("/_ID"); + var idNode = doc.getData().at(idPtr); + if (idNode.isMissingNode()) { + idPtr = JsonPointer.valueOf("/_id"); + idNode = doc.getData().at(idPtr); + } + if (!idNode.isMissingNode() && idNode.isValueNode()) { + log.debug("using _id field value for document id"); + return idNode.asText(); + } return UUID.randomUUID().toString(); } var optFirst = config.getPrimaryKey().stream().findFirst(); @@ -196,7 +208,19 @@ private String extractPrimaryKey(AirbyteRecordMessage doc, ElasticsearchWriteCon return pkNode.asText(); } } - log.warn("unable to extract primary key"); + log.warn("unable to extract primary key, checking for _id field"); + // Check for _id field as fallback before random UUID + // Try uppercase first (Snowflake default), then lowercase fallback + JsonPointer idPtr = JsonPointer.valueOf("/_ID"); + var idNode = doc.getData().at(idPtr); + if (idNode.isMissingNode()) { + idPtr = JsonPointer.valueOf("/_id"); + idNode = doc.getData().at(idPtr); + } + if (!idNode.isMissingNode() && idNode.isValueNode()) { + log.debug("using _id field value for document id as fallback"); + return idNode.asText(); + } return UUID.randomUUID().toString(); }