This repository was archived by the owner on Apr 23, 2026. It is now read-only.
forked from Signal65/elasticsearch-Copilot
-
Notifications
You must be signed in to change notification settings - Fork 1
Add mappings for enrich fields (#96056) #10
Open
MitchLewis930
wants to merge
1
commit into
pr_020_before
Choose a base branch
from
pr_020_after
base: pr_020_before
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 96056 | ||
| summary: Add mappings for enrich fields | ||
| area: Ingest Node | ||
| type: enhancement | ||
| issues: [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,15 +39,15 @@ | |
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.bytes.BytesArray; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.CheckedFunction; | ||
| import org.elasticsearch.common.util.Maps; | ||
| import org.elasticsearch.common.util.iterable.Iterables; | ||
| import org.elasticsearch.index.mapper.MapperService; | ||
| import org.elasticsearch.index.query.QueryBuilders; | ||
| import org.elasticsearch.index.reindex.BulkByScrollResponse; | ||
| import org.elasticsearch.index.reindex.ReindexRequest; | ||
| import org.elasticsearch.index.reindex.ScrollableHitSource; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.tasks.TaskCancelledException; | ||
| import org.elasticsearch.xcontent.ObjectPath; | ||
| import org.elasticsearch.xcontent.XContentBuilder; | ||
| import org.elasticsearch.xcontent.XContentType; | ||
| import org.elasticsearch.xcontent.json.JsonXContent; | ||
|
|
@@ -57,6 +57,7 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
@@ -180,9 +181,9 @@ static void validateMappings( | |
| } | ||
| // Validate the key and values | ||
| try { | ||
| validateField(mapping, policy.getMatchField(), true); | ||
| validateAndGetMappingTypeAndFormat(mapping, policy.getMatchField(), true); | ||
| for (String valueFieldName : policy.getEnrichFields()) { | ||
| validateField(mapping, valueFieldName, false); | ||
| validateAndGetMappingTypeAndFormat(mapping, valueFieldName, false); | ||
| } | ||
| } catch (ElasticsearchException e) { | ||
| throw new ElasticsearchException( | ||
|
|
@@ -194,11 +195,64 @@ static void validateMappings( | |
| } | ||
| } | ||
|
|
||
| private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) { | ||
| private record MappingTypeAndFormat(String type, String format) { | ||
|
|
||
| } | ||
|
|
||
| private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( | ||
| String fieldName, | ||
| EnrichPolicy policy, | ||
| boolean strictlyRequired, | ||
| List<Map<String, Object>> sourceMappings | ||
| ) { | ||
| var fieldMappings = sourceMappings.stream() | ||
| .map(mapping -> validateAndGetMappingTypeAndFormat(mapping, fieldName, strictlyRequired)) | ||
| .filter(Objects::nonNull) | ||
| .toList(); | ||
| Set<String> types = fieldMappings.stream().map(tf -> tf.type).collect(Collectors.toSet()); | ||
| if (types.size() > 1) { | ||
| if (strictlyRequired) { | ||
| throw new ElasticsearchException( | ||
| "Multiple distinct mapping types for field '{}' - indices({}) types({})", | ||
| fieldName, | ||
| Strings.collectionToCommaDelimitedString(policy.getIndices()), | ||
| Strings.collectionToCommaDelimitedString(types) | ||
| ); | ||
| } | ||
| return null; | ||
| } | ||
| if (types.isEmpty()) { | ||
| return null; | ||
| } | ||
| Set<String> formats = fieldMappings.stream().map(tf -> tf.format).filter(Objects::nonNull).collect(Collectors.toSet()); | ||
| if (formats.size() > 1) { | ||
| if (strictlyRequired) { | ||
| throw new ElasticsearchException( | ||
| "Multiple distinct formats specified for field '{}' - indices({}) format entries({})", | ||
| policy.getMatchField(), | ||
| Strings.collectionToCommaDelimitedString(policy.getIndices()), | ||
| Strings.collectionToCommaDelimitedString(formats) | ||
| ); | ||
| } | ||
| return null; | ||
| } | ||
| return new MappingTypeAndFormat(Iterables.get(types, 0), formats.isEmpty() ? null : Iterables.get(formats, 0)); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private static <T> T extractValues(Map<String, Object> properties, String path) { | ||
| return (T) properties.get(path); | ||
| } | ||
|
|
||
| private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat( | ||
| Map<String, Object> properties, | ||
| String fieldName, | ||
| boolean fieldRequired | ||
| ) { | ||
| assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty"; | ||
| String[] fieldParts = fieldName.split("\\."); | ||
| StringBuilder parent = new StringBuilder(); | ||
| Map<?, ?> currentField = properties; | ||
| Map<String, Object> currentField = properties; | ||
| boolean onRoot = true; | ||
| for (String fieldPart : fieldParts) { | ||
| // Ensure that the current field is of object type only (not a nested type or a non compound field) | ||
|
|
@@ -211,7 +265,7 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea | |
| type | ||
| ); | ||
| } | ||
| Map<?, ?> currentProperties = ((Map<?, ?>) currentField.get("properties")); | ||
| Map<String, Object> currentProperties = extractValues(currentField, "properties"); | ||
| if (currentProperties == null) { | ||
| if (fieldRequired) { | ||
| throw new ElasticsearchException( | ||
|
|
@@ -220,10 +274,10 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea | |
| onRoot ? "root" : parent.toString() | ||
| ); | ||
| } else { | ||
| return; | ||
| return null; | ||
| } | ||
| } | ||
| currentField = ((Map<?, ?>) currentProperties.get(fieldPart)); | ||
| currentField = extractValues(currentProperties, fieldPart); | ||
| if (currentField == null) { | ||
| if (fieldRequired) { | ||
| throw new ElasticsearchException( | ||
|
|
@@ -233,7 +287,7 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea | |
| onRoot ? "root" : parent.toString() | ||
| ); | ||
| } else { | ||
| return; | ||
| return null; | ||
| } | ||
| } | ||
| if (onRoot) { | ||
|
|
@@ -243,95 +297,70 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea | |
| } | ||
| parent.append(fieldPart); | ||
| } | ||
| } | ||
|
|
||
| private XContentBuilder resolveEnrichMapping(final EnrichPolicy enrichPolicy, final List<Map<String, Object>> mappings) { | ||
| if (EnrichPolicy.MATCH_TYPE.equals(enrichPolicy.getType())) { | ||
| return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false)); | ||
| } else if (EnrichPolicy.RANGE_TYPE.equals(enrichPolicy.getType())) { | ||
| return createRangeEnrichMappingBuilder(enrichPolicy, mappings); | ||
| } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(enrichPolicy.getType())) { | ||
| return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape")); | ||
| } else { | ||
| throw new ElasticsearchException("Unrecognized enrich policy type [{}]", enrichPolicy.getType()); | ||
| if (currentField == null) { | ||
| return null; | ||
| } | ||
| final String type = (String) currentField.getOrDefault("type", "object"); | ||
| final String format = (String) currentField.get("format"); | ||
| return new MappingTypeAndFormat(type, format); | ||
| } | ||
|
|
||
| private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy enrichPolicy, List<Map<String, Object>> mappings) { | ||
| String matchFieldPath = "properties." + enrichPolicy.getMatchField().replace(".", ".properties."); | ||
| List<Map<String, String>> matchFieldMappings = mappings.stream() | ||
| .map(map -> ObjectPath.<Map<String, String>>eval(matchFieldPath, map)) | ||
| .filter(Objects::nonNull) | ||
| .toList(); | ||
|
|
||
| Set<String> types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet()); | ||
| if (types.size() == 1) { | ||
| String type = types.iterator().next(); | ||
| if (type == null) { | ||
| // when no type is defined in a field mapping then it is of type object: | ||
| throw new ElasticsearchException( | ||
| "Field '{}' has type [object] which doesn't appear to be a range type", | ||
| enrichPolicy.getMatchField(), | ||
| type | ||
| ); | ||
| } | ||
| static final Set<String> RANGE_TYPES = Set.of("integer_range", "float_range", "long_range", "double_range", "ip_range", "date_range"); | ||
|
|
||
| switch (type) { | ||
| case "integer_range": | ||
| case "float_range": | ||
| case "long_range": | ||
| case "double_range": | ||
| case "ip_range": | ||
| return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); | ||
|
|
||
| // date_range types mappings allow for the format to be specified, should be preserved in the created index | ||
| case "date_range": | ||
| Set<String> formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet()); | ||
| if (formatEntries.size() == 1) { | ||
| return createEnrichMappingBuilder((builder) -> { | ||
| builder.field("type", type).field("doc_values", false); | ||
| String format = formatEntries.iterator().next(); | ||
| if (format != null) { | ||
| builder.field("format", format); | ||
| } | ||
| return builder; | ||
| }); | ||
| } | ||
| if (formatEntries.isEmpty()) { | ||
| // no format specify rely on default | ||
| return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unreachable null check is dead code — Low Severity. The |
||
| } | ||
| throw new ElasticsearchException( | ||
| "Multiple distinct date format specified for match field '{}' - indices({}) format entries({})", | ||
| enrichPolicy.getMatchField(), | ||
| Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), | ||
| (formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries) | ||
| ); | ||
|
|
||
| default: | ||
| static Map<String, Object> mappingForMatchField(EnrichPolicy policy, List<Map<String, Object>> sourceMappings) { | ||
| MappingTypeAndFormat typeAndFormat = validateAndGetMappingTypeAndFormat(policy.getMatchField(), policy, true, sourceMappings); | ||
| if (typeAndFormat == null) { | ||
| throw new ElasticsearchException( | ||
| "Match field '{}' doesn't have a correct mapping type for policy type '{}'", | ||
| policy.getMatchField(), | ||
| policy.getType() | ||
| ); | ||
| } | ||
| return switch (policy.getType()) { | ||
| case EnrichPolicy.MATCH_TYPE -> Map.of("type", "keyword", "doc_values", false); | ||
| case EnrichPolicy.GEO_MATCH_TYPE -> Map.of("type", "geo_shape"); | ||
| case EnrichPolicy.RANGE_TYPE -> { | ||
| if (RANGE_TYPES.contains(typeAndFormat.type) == false) { | ||
| throw new ElasticsearchException( | ||
| "Field '{}' has type [{}] which doesn't appear to be a range type", | ||
| enrichPolicy.getMatchField(), | ||
| type | ||
| policy.getMatchField(), | ||
| typeAndFormat.type | ||
| ); | ||
| } | ||
| Map<String, Object> mapping = Maps.newMapWithExpectedSize(3); | ||
| mapping.put("type", typeAndFormat.type); | ||
| mapping.put("doc_values", false); | ||
| if (typeAndFormat.format != null) { | ||
| mapping.put("format", typeAndFormat.format); | ||
| } | ||
| yield mapping; | ||
| } | ||
| } | ||
| if (types.isEmpty()) { | ||
| throw new ElasticsearchException( | ||
| "No mapping type found for match field '{}' - indices({})", | ||
| enrichPolicy.getMatchField(), | ||
| Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()) | ||
| ); | ||
| } | ||
| throw new ElasticsearchException( | ||
| "Multiple distinct mapping types for match field '{}' - indices({}) types({})", | ||
| enrichPolicy.getMatchField(), | ||
| Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()), | ||
| Strings.collectionToCommaDelimitedString(types) | ||
| ); | ||
| default -> throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); | ||
| }; | ||
| } | ||
|
|
||
| private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping) { | ||
| private XContentBuilder createEnrichMapping(List<Map<String, Object>> sourceMappings) { | ||
| Map<String, Map<String, Object>> fieldMappings = new HashMap<>(); | ||
| Map<String, Object> mappingForMatchField = mappingForMatchField(policy, sourceMappings); | ||
| for (String enrichField : policy.getEnrichFields()) { | ||
| if (enrichField.equals(policy.getMatchField())) { | ||
| mappingForMatchField = new HashMap<>(mappingForMatchField); | ||
| mappingForMatchField.remove("doc_values"); // enable doc_values | ||
| } else { | ||
| var typeAndFormat = validateAndGetMappingTypeAndFormat(enrichField, policy, false, sourceMappings); | ||
| if (typeAndFormat != null) { | ||
| Map<String, Object> mapping = Maps.newMapWithExpectedSize(3); | ||
| mapping.put("type", typeAndFormat.type); | ||
| if (typeAndFormat.format != null) { | ||
| mapping.put("format", typeAndFormat.format); | ||
| } | ||
| mapping.put("index", false); // disable index | ||
| fieldMappings.put(enrichField, mapping); | ||
| } | ||
| } | ||
| } | ||
| fieldMappings.put(policy.getMatchField(), mappingForMatchField); | ||
|
|
||
| // Enable _source on enrich index. Explicitly mark key mapping type. | ||
| try { | ||
| XContentBuilder builder = JsonXContent.contentBuilder(); | ||
|
|
@@ -347,9 +376,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuild | |
| builder.endObject(); | ||
| builder.startObject("properties"); | ||
| { | ||
| builder.startObject(policy.getMatchField()); | ||
| matchFieldMapping.apply(builder); | ||
| builder.endObject(); | ||
| builder.mapContents(fieldMappings); | ||
| } | ||
| builder.endObject(); | ||
| builder.startObject("_meta"); | ||
|
|
@@ -380,7 +407,7 @@ private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) { | |
| .put("index.warmer.enabled", false) | ||
| .build(); | ||
| CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); | ||
| createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings)); | ||
| createEnrichIndexRequest.mapping(createEnrichMapping(mappings)); | ||
| logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); | ||
| enrichOriginClient().admin() | ||
| .indices() | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong field name used in format error message
Low Severity
The error message for multiple distinct formats uses
`policy.getMatchField()` instead of `fieldName`. The method takes `fieldName` as a parameter and correctly uses it in the similar error for multiple types at line 217. Using the wrong variable causes the exception message to report an incorrect field name when a non-match field has conflicting formats.