From d29a815da780047f5f362497b12f233e17cd1124 Mon Sep 17 00:00:00 2001 From: Matthew Buckton Date: Mon, 9 Mar 2026 15:52:16 +1100 Subject: [PATCH 1/3] feat(schema): add support for importing and managing schema files from specified locations --- .../config/SchemaManagerConfig.java | 22 ++++++++ .../rest/schema/SchemaImportLocationDTO.java | 53 +++++++++++++++++++ .../rest/schema/SchemaManagerConfigDTO.java | 22 ++++++++ .../engine/schema/SchemaManager.java | 47 ++++++++++++++-- .../logging/ServerLogMessages.java | 9 +++- 5 files changed, 146 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java diff --git a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java index b1b9b58bf..d22be8afe 100644 --- a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java +++ b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java @@ -28,6 +28,7 @@ import io.mapsmessaging.utilities.configuration.ConfigurationManager; import java.io.IOException; +import java.util.List; public class SchemaManagerConfig extends SchemaManagerConfigDTO implements Config, ConfigManager { @@ -41,6 +42,27 @@ public SchemaManagerConfig(ConfigurationProperties configurationProperties) { case "maps" -> super.setRepositoryConfig(configureMaps( (ConfigurationProperties) configurationProperties.get("config"))); default -> super.setRepositoryConfig(new SimpleRepositoryConfigDTO()); } + Object locations = configurationProperties.get("importLocations"); + if(locations instanceof List list){ + for(Object location : list){ + if(location instanceof ConfigurationProperties props){ + loadLocations(props); + } + } + } + else if(locations instanceof ConfigurationProperties props){ + loadLocations(props); + } + setProtocPath(configurationProperties.getProperty("protocPath", null)); + } + + private void loadLocations(ConfigurationProperties props){ + if(props.containsKey("path") && props.containsKey("format")){ + SchemaImportLocationDTO locationDTO = new SchemaImportLocationDTO(); + locationDTO.setPath(props.getProperty("path")); + locationDTO.setFormat(props.getProperty("format")); + super.getImportLocations().add(locationDTO); + } } @Override diff --git a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java new file mode 100644 index 000000000..b40634908 --- /dev/null +++ b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java @@ -0,0 +1,53 @@ +/* + * + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.dto.rest.schema; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Schema( + title = "Schema Import Location", + description = "Defines a directory path and the schema format to load from that directory") +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SchemaImportLocationDTO { + + @Schema( + description = "Directory path containing schema files", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + example = "/opt/schema/protobuf" + ) + private String path; + + @Schema( + description = "Schema format contained in the directory", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + allowableValues = { + "protobuf", + "json" + }, + example = "protobuf" + ) + private String format; +} \ No newline at end of file diff --git a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java index 57f0f534b..06d8b17be 100644 --- a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java +++ b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java @@ -29,6 +29,9 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import java.util.ArrayList; +import java.util.List; + @Schema( title = "Schema Manager config", description = "Configures the schema manager on where it can find and store schemas") @@ -57,6 +60,25 @@ public class SchemaManagerConfigDTO extends BaseManagerConfigDTO { ) private RepositoryConfigDTO repositoryConfig; + + @Schema( + description = "Optional path to the protoc compiler executable. " + + "If not supplied the system PATH will be used.", + requiredMode = Schema.RequiredMode.NOT_REQUIRED, + nullable = true, + example = "/usr/local/bin/protoc" + ) + private String protocPath; + + @Schema( + description = "List of directories that are scanned for schema source files such as protobuf or JSON schemas. " + + "These schemas are loaded and exposed as built-in or well-known schemas.", + requiredMode = Schema.RequiredMode.NOT_REQUIRED, + nullable = true + ) + private List importLocations = new ArrayList<>(); + + public SchemaManagerConfigDTO() { super("SchemaManagerConfigDTO"); } diff --git a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java index 15b656e1d..3bca466af 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java +++ b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java @@ -25,31 +25,35 @@ import io.mapsmessaging.dto.rest.schema.FileRepositoryConfigDTO; import io.mapsmessaging.dto.rest.schema.MapsRepositoryConfigDTO; import io.mapsmessaging.dto.rest.schema.RepositoryConfigDTO; +import io.mapsmessaging.dto.rest.schema.SchemaImportLocationDTO; import io.mapsmessaging.dto.rest.system.Status; import io.mapsmessaging.dto.rest.system.SubSystemStatusDTO; +import io.mapsmessaging.logging.Logger; +import io.mapsmessaging.logging.LoggerFactory; import io.mapsmessaging.schemas.config.SchemaConfig; import io.mapsmessaging.schemas.config.SchemaResource; -import io.mapsmessaging.schemas.config.impl.JsonSchemaConfig; -import io.mapsmessaging.schemas.config.impl.NativeSchemaConfig; +import io.mapsmessaging.schemas.config.impl.*; import io.mapsmessaging.schemas.config.impl.NativeSchemaConfig.TYPE; -import io.mapsmessaging.schemas.config.impl.RawSchemaConfig; -import io.mapsmessaging.schemas.config.impl.XmlSchemaConfig; import io.mapsmessaging.schemas.formatters.MessageFormatter; import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; import io.mapsmessaging.schemas.repository.SchemaRepository; import io.mapsmessaging.schemas.repository.impl.FileSchemaRepository; import io.mapsmessaging.schemas.repository.impl.RestSchemaRepository; import io.mapsmessaging.schemas.repository.impl.SimpleSchemaRepository; +import io.mapsmessaging.schemas.tools.protobuf.ProtobufSchemaGenerator; import io.mapsmessaging.utilities.Agent; import io.mapsmessaging.utilities.configuration.ConfigurationManager; import lombok.Getter; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.*; import java.util.Map.Entry; -@SuppressWarnings("java:S6548") // yes it is a singleton +import static io.mapsmessaging.logging.ServerLogMessages.*; + +@SuppressWarnings("java:S6548") // yes, it is a singleton public class SchemaManager implements Agent { private static final String MONITOR = "monitor"; @@ -67,9 +71,12 @@ public static SchemaManager getInstance() { return Holder.INSTANCE; } + private final Logger logger = LoggerFactory.getLogger(getClass()); private final SchemaRepository repository; private final Map loadedFormatter; private final Map> pathMap; + private final List importLocations = new ArrayList<>(); + private final String protocPath; @Getter private long updateCount = 0; @@ -218,6 +225,7 @@ public String getDescription() { } public void start() { + logger.log(SCHEMA_MANAGER_STARTUP); SchemaConfig rawConfig = new RawSchemaConfig(); rawConfig.setUniqueId(DEFAULT_RAW_UUID); rawConfig.setVersion(1); @@ -264,6 +272,31 @@ public void start() { xmlSchemaConfig.setSchema(new JsonObject()); addSchema("$SYS", xmlSchemaConfig); + Path protoExec = protocPath == null? null: Path.of(protocPath); + + for(SchemaImportLocationDTO location: importLocations){ + if(location.getFormat().equalsIgnoreCase("json")){ + try { + JsonSchemaConfig jsonSchemaConfig1 = new JsonSchemaConfig(Path.of(location.getPath())); + addSchema(location.getPath(), jsonSchemaConfig1); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,jsonSchemaConfig1.getName(), jsonSchemaConfig1.getUniqueId(), jsonSchemaConfig1.getFormat()); + } catch (IOException e) { + logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); + } + } + else if(location.getFormat().equalsIgnoreCase("protobuf")){ + try { + List schemaList = ProtobufSchemaGenerator.loadSchemas(Path.of(location.getPath()), protoExec); + for(ProtoBufSchemaConfig schema: schemaList){ + addSchema(schema.getName(), schema); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,schema.getName(), schema.getUniqueId(), schema.getFormat()); + } + } catch (IOException e) { + logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); + } + } + } + // This ensures the factory is loaded MessageFormatterFactory.getInstance(); } @@ -288,9 +321,13 @@ private SchemaManager() { else{ buildTime = new SimpleSchemaRepository(); } + protocPath = config.getProtocPath(); repository = buildTime; loadedFormatter = new LinkedHashMap<>(); pathMap = new LinkedHashMap<>(); + if(config != null && config.getImportLocations() != null) { + importLocations.addAll(config.getImportLocations()); + } } @Override diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java index b687d9b09..cc8f5d348 100644 --- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java +++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java @@ -738,6 +738,13 @@ public enum ServerLogMessages implements LogMessage { COAP_BLOCK2_REQUEST(LEVEL.TRACE, SERVER_CATEGORY.PROTOCOL, "Block2 packet received, Message No: {}, Block Size: {}, has more:{}"), // + // + SCHEMA_MANAGER_STARTUP(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Schema Manager starting up"), + SCHEMA_MANAGER_LOADED_CONFIG(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Loaded configuration from file name:{} UUID:{} type:{}"), + SCHEMA_MANAGER_LOADED_CONFIG_FAILED(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Failed to loaded configuration from file {} of type:{}"), + // + + // DEVICE_SELECTOR_PARSER_EXCEPTION(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Selection {}, failed to parse with the following exception {}"), DEVICE_SCHEMA_UPDATED(LEVEL.WARN, SERVER_CATEGORY.DEVICE, "Device {} schema configuration updated"), @@ -748,8 +755,6 @@ public enum ServerLogMessages implements LogMessage { DEVICE_STOP(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Stopping device {}"), DEVICE_SCHEDULE_TASK_FAILED(LEVEL.WARN, SERVER_CATEGORY.DEVICE, "Scheduled task for device failed"), DEVICE_SCHEDULE_TASK_EXCCEDED_TIME(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Scheduled task exceeded time limit, took {}ms"), - - DEVICE_MANAGER_STARTUP(LEVEL.DEBUG, SERVER_CATEGORY.ENGINE, "Starting Device Manager"), DEVICE_MANAGER_STARTUP_FAILED(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Device Manager failed to start"), DEVICE_MANAGER_FAILED_TO_REGISTER(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Failed to register device"), From 17954517a53d883919fc2467bc92765a188a8b68 Mon Sep 17 00:00:00 2001 From: Matthew Buckton Date: Tue, 10 Mar 2026 11:02:43 +1100 Subject: [PATCH 2/3] fix(schema): ensure schema names are correctly loaded and logged instead of paths --- .../io/mapsmessaging/config/SchemaManagerConfig.java | 3 ++- .../dto/rest/schema/SchemaImportLocationDTO.java | 9 +++++++++ .../io/mapsmessaging/engine/schema/SchemaManager.java | 8 ++++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java index d22be8afe..c96118827 100644 --- a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java +++ b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java @@ -57,10 +57,11 @@ else if(locations instanceof ConfigurationProperties props){ } private void loadLocations(ConfigurationProperties props){ - if(props.containsKey("path") && props.containsKey("format")){ + if(props.containsKey("path") && props.containsKey("format") && props.containsKey("name")){ SchemaImportLocationDTO locationDTO = new SchemaImportLocationDTO(); locationDTO.setPath(props.getProperty("path")); locationDTO.setFormat(props.getProperty("format")); + locationDTO.setName(props.getProperty("name")); super.getImportLocations().add(locationDTO); } } diff --git a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java index b40634908..d7facb415 100644 --- a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java +++ b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java @@ -31,6 +31,15 @@ @AllArgsConstructor public class SchemaImportLocationDTO { + @Schema( + description = "Name to apply to the schema when loaded", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + example = "protobuf_schema_1" + ) + private String name; + + @Schema( description = "Directory path containing schema files", requiredMode = Schema.RequiredMode.REQUIRED, diff --git a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java index 3bca466af..598319337 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java +++ b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java @@ -278,8 +278,8 @@ public void start() { if(location.getFormat().equalsIgnoreCase("json")){ try { JsonSchemaConfig jsonSchemaConfig1 = new JsonSchemaConfig(Path.of(location.getPath())); - addSchema(location.getPath(), jsonSchemaConfig1); - logger.log(SCHEMA_MANAGER_LOADED_CONFIG,jsonSchemaConfig1.getName(), jsonSchemaConfig1.getUniqueId(), jsonSchemaConfig1.getFormat()); + addSchema(location.getName(), jsonSchemaConfig1); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), jsonSchemaConfig1.getUniqueId(), jsonSchemaConfig1.getFormat()); } catch (IOException e) { logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); } @@ -288,8 +288,8 @@ else if(location.getFormat().equalsIgnoreCase("protobuf")){ try { List schemaList = ProtobufSchemaGenerator.loadSchemas(Path.of(location.getPath()), protoExec); for(ProtoBufSchemaConfig schema: schemaList){ - addSchema(schema.getName(), schema); - logger.log(SCHEMA_MANAGER_LOADED_CONFIG,schema.getName(), schema.getUniqueId(), schema.getFormat()); + addSchema(location.getName(), schema); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), schema.getUniqueId(), schema.getFormat()); } } catch (IOException e) { logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); From 6e5bbae6bc52c22b663f0254450d78a8fb26e287 Mon Sep 17 00:00:00 2001 From: Matthew Buckton Date: Wed, 11 Mar 2026 16:01:24 +1100 Subject: [PATCH 3/3] refactor(server): expand schema bundles and resolve formatters via SchemaManager Update the server to support bundle-based schemas using the schema library. On schema load the server now: - detects protobuf descriptor bundles and JSON schemas with $defs - extracts child schemas from the bundle - registers parent and child schemas in the schema repository - links children to the parent bundle - resolves message formatters through SchemaManager using SchemaResolver This allows topics to bind directly to protobuf message types or JSON $defs entries while the formatter resolves the parent bundle schema at runtime. --- .../analytics/impl/StatisticalAnalyser.java | 7 +- .../io/mapsmessaging/api/message/Filter.java | 11 ++- .../io/mapsmessaging/api/message/Message.java | 3 +- .../transformers/JsonQueryTransformation.java | 2 +- .../schema/MessageSchemaToJsonBuilder.java | 2 +- .../engine/schema/SchemaManager.java | 75 +++++++++++++------ .../logging/ServerLogMessages.java | 3 +- .../protocol/impl/n2k/N2kProtocol.java | 2 +- .../cloudevent/pack/PackHelper.java | 2 +- .../messaging/impl/RestMessageListener.java | 2 +- .../JsonQueryTransformationTest.java | 3 +- 11 files changed, 75 insertions(+), 37 deletions(-) diff --git a/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java b/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java index f5d690880..d811376f7 100644 --- a/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java +++ b/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java @@ -31,6 +31,7 @@ import io.mapsmessaging.selector.IdentifierResolver; import org.jetbrains.annotations.NotNull; +import java.io.IOException; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -73,7 +74,11 @@ public Analyser create(@NotNull StatisticsConfigDTO config) { public Message ingest(@NotNull Message event) { if(formatter == null) { String schemaId = event.getSchemaId(); - formatter = SchemaManager.getInstance().getMessageFormatter(schemaId); + try { + formatter = SchemaManager.getInstance().getMessageFormatter(schemaId); + } catch (IOException e) { + // log + } } if(formatter != null) { IdentifierResolver resolver = formatter.parse(event.getOpaqueData()); diff --git a/src/main/java/io/mapsmessaging/api/message/Filter.java b/src/main/java/io/mapsmessaging/api/message/Filter.java index 5f27d5948..e121f2360 100644 --- a/src/main/java/io/mapsmessaging/api/message/Filter.java +++ b/src/main/java/io/mapsmessaging/api/message/Filter.java @@ -27,6 +27,7 @@ import io.mapsmessaging.selector.IdentifierResolver; import io.mapsmessaging.selector.operators.ParserExecutor; +import java.io.IOException; import java.util.List; @SuppressWarnings("java:S6548") // yes it is a singleton @@ -75,9 +76,13 @@ public static IdentifierResolver getTopicResolver(String topicName, Message mess } public static IdentifierResolver getResolver(String lookup, Message message) { - MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup); - if (formatter != null) { - return formatter.parse(message.getOpaqueData()); + try { + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup); + if (formatter != null) { + return formatter.parse(message.getOpaqueData()); + } + } catch (IOException e) { + // log } return null; } diff --git a/src/main/java/io/mapsmessaging/api/message/Message.java b/src/main/java/io/mapsmessaging/api/message/Message.java index 5ec6a25f3..5303da131 100644 --- a/src/main/java/io/mapsmessaging/api/message/Message.java +++ b/src/main/java/io/mapsmessaging/api/message/Message.java @@ -28,7 +28,6 @@ import io.mapsmessaging.location.LocationManager; import io.mapsmessaging.schemas.config.SchemaConfig; import io.mapsmessaging.schemas.formatters.MessageFormatter; -import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; import io.mapsmessaging.schemas.formatters.ParsedObject; import io.mapsmessaging.selector.IdentifierResolver; import io.mapsmessaging.storage.Storable; @@ -360,7 +359,7 @@ public Object get(String key) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); if(config != null){ try { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); parsedObject = formatter.parse(getOpaqueData()); } catch (IOException e) { parsedObject = null; diff --git a/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java b/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java index 9908fc279..f53663ebd 100644 --- a/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java +++ b/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java @@ -134,7 +134,7 @@ private MessageFormatter locateMessageFormatter(String source, ParsedMessage mes if (schemaId != null) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); try { - messageFormatter = MessageFormatterFactory.getInstance().getFormatter(config); + messageFormatter = SchemaManager.getInstance().getMessageFormatter(config); if (messageFormatter != null) { schemaMap.put(source, messageFormatter); } diff --git a/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java b/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java index 67a3886d4..a887af814 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java +++ b/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java @@ -36,7 +36,7 @@ public Message parse(Message message, String destinationName) throws Exception { if(schemaId != null && !destinationName.startsWith("$")) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); if(config != null) { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); if (formatter != null && !(formatter instanceof RawFormatter)) { byte[] data = pack(message, config, formatter); MessageBuilder messageBuilder = new MessageBuilder(); diff --git a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java index 598319337..e960e85f9 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java +++ b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java @@ -37,6 +37,7 @@ import io.mapsmessaging.schemas.formatters.MessageFormatter; import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; import io.mapsmessaging.schemas.repository.SchemaRepository; +import io.mapsmessaging.schemas.repository.SchemaResolver; import io.mapsmessaging.schemas.repository.impl.FileSchemaRepository; import io.mapsmessaging.schemas.repository.impl.RestSchemaRepository; import io.mapsmessaging.schemas.repository.impl.SimpleSchemaRepository; @@ -54,7 +55,7 @@ import static io.mapsmessaging.logging.ServerLogMessages.*; @SuppressWarnings("java:S6548") // yes, it is a singleton -public class SchemaManager implements Agent { +public class SchemaManager implements Agent, SchemaResolver { private static final String MONITOR = "monitor"; public static final UUID DEFAULT_RAW_UUID = UUID.fromString("10000000-0000-1000-a000-100000000000"); @@ -76,6 +77,7 @@ public static SchemaManager getInstance() { private final Map loadedFormatter; private final Map> pathMap; private final List importLocations = new ArrayList<>(); + private final Map preLoadedSchemas = new HashMap<>(); private final String protocPath; @Getter @@ -92,14 +94,6 @@ public SchemaConfig addSchema(String path, SchemaConfig schemaConfig) { } List list = pathMap.computeIfAbsent(path, k -> new ArrayList<>()); list.add(schemaConfig); - try { - if (!loadedFormatter.containsKey(schemaConfig.getUniqueId())) { - MessageFormatter messageFormatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); - loadedFormatter.put(schemaConfig.getUniqueId(), messageFormatter); - } - } catch (Exception e) { - // Unable to load the formatter - } updateCount++; return resource.getDefaultVersion(); } @@ -119,18 +113,17 @@ public SchemaConfig getDefaultSchemaByName(String name){ }; } - public MessageFormatter getMessageFormatter(String uniqueId) { + public MessageFormatter getMessageFormatter(SchemaConfig config) throws IOException{ + return getMessageFormatter(config.getUniqueId()); + } + + public MessageFormatter getMessageFormatter(String uniqueId) throws IOException { MessageFormatter formatter = loadedFormatter.get(uniqueId); if(formatter == null){ SchemaConfig schemaConfig = getSchema(uniqueId); if(schemaConfig != null) { - try { - formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); - loadedFormatter.put(schemaConfig.getUniqueId(), formatter); - } catch (IOException e) { - // log this - } - + formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig, this); + loadedFormatter.put(schemaConfig.getUniqueId(), formatter); } } return formatter; @@ -140,7 +133,6 @@ public List getMessageFormats() { return MessageFormatterFactory.getInstance().getFormatters(); } - public synchronized SchemaConfig getSchema(UUID uniqueId) { return getSchema(uniqueId.toString()); } @@ -153,6 +145,11 @@ public synchronized SchemaConfig getSchema(String uniqueId) { return null; } + + public synchronized SchemaConfig getSchemaByName(String name) { + return preLoadedSchemas.get(name); + } + public synchronized SchemaConfig locateSchema(String destinationName) { SchemaConfig config = SchemaLocationHelper.locateSchema(repository.getAllSchemas(), destinationName); if(config != null){ @@ -279,6 +276,8 @@ public void start() { try { JsonSchemaConfig jsonSchemaConfig1 = new JsonSchemaConfig(Path.of(location.getPath())); addSchema(location.getName(), jsonSchemaConfig1); + loadBundle(jsonSchemaConfig1); + preLoadedSchemas.put(location.getName(), jsonSchemaConfig1); logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), jsonSchemaConfig1.getUniqueId(), jsonSchemaConfig1.getFormat()); } catch (IOException e) { logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); @@ -289,6 +288,8 @@ else if(location.getFormat().equalsIgnoreCase("protobuf")){ List schemaList = ProtobufSchemaGenerator.loadSchemas(Path.of(location.getPath()), protoExec); for(ProtoBufSchemaConfig schema: schemaList){ addSchema(location.getName(), schema); + loadBundle(schema); + preLoadedSchemas.put(location.getName(), schema); logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), schema.getUniqueId(), schema.getFormat()); } } catch (IOException e) { @@ -296,12 +297,34 @@ else if(location.getFormat().equalsIgnoreCase("protobuf")){ } } } - // This ensures the factory is loaded MessageFormatterFactory.getInstance(); } + @Override + public SchemaConfig resolveParent(SchemaConfig schemaConfig) { + SchemaResource resource = repository.getResource(schemaConfig.getUniqueId()); + if(resource != null){ + return resource.getDefaultVersion(); + } + return null; + } + + + private void loadBundle(SchemaConfig schema){ + if(schema.isBundle()){ + for (SchemaConfig config : schema.getBundledSchemas()) { + logger.log(SCHEMA_MANAGER_LOADED_BUNDLED, config.getName(), schema.getName(), schema.getUniqueId(), schema.getFormat()); + repository.addVersion(config.getUniqueId(), config); + } + } + } + + private SchemaManager() { + loadedFormatter = new LinkedHashMap<>(); + pathMap = new LinkedHashMap<>(); + SchemaManagerConfig config; try { config = ConfigurationManager.getInstance().getConfiguration(SchemaManagerConfig.class); @@ -321,12 +344,16 @@ private SchemaManager() { else{ buildTime = new SimpleSchemaRepository(); } - protocPath = config.getProtocPath(); repository = buildTime; - loadedFormatter = new LinkedHashMap<>(); - pathMap = new LinkedHashMap<>(); - if(config != null && config.getImportLocations() != null) { - importLocations.addAll(config.getImportLocations()); + + if(config != null){ + protocPath = config.getProtocPath(); + if(config.getImportLocations() != null) { + importLocations.addAll(config.getImportLocations()); + } + } + else{ + protocPath = null; } } diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java index cc8f5d348..3cae27b34 100644 --- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java +++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java @@ -740,7 +740,8 @@ public enum ServerLogMessages implements LogMessage { // SCHEMA_MANAGER_STARTUP(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Schema Manager starting up"), - SCHEMA_MANAGER_LOADED_CONFIG(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Loaded configuration from file name:{} UUID:{} type:{}"), + SCHEMA_MANAGER_LOADED_CONFIG(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Loaded configuration from file name:{} UUID:{} type:{}"), + SCHEMA_MANAGER_LOADED_BUNDLED(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Loaded bundled schema {} from name:{} UUID:{} type:{} "), SCHEMA_MANAGER_LOADED_CONFIG_FAILED(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Failed to loaded configuration from file {} of type:{}"), // diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java index 1a0bb2248..4747fe593 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java @@ -96,7 +96,7 @@ else if(encodedDatabase != null && !encodedDatabase.isEmpty()) { rawTopicTemplate = ((N2KConfigDTO)protocolConfig).getUnknownPacketTopic().replace("{candevice}",endPoint.getName()); parseToJson =((N2KConfigDTO)protocolConfig).isParseToJson(); String inboundTopicName =((N2KConfigDTO)protocolConfig).getInboundTopicName(); - formatter = (CanbusFormatter) MessageFormatterFactory.getInstance().getFormatter(canbusSchema); + formatter = (CanbusFormatter) SchemaManager.getInstance().getMessageFormatter(canbusSchema); try { session = buildSession(endPoint.getName(), 10000); } catch (ExecutionException | TimeoutException e) { diff --git a/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java b/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java index 766da211f..497d23baf 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java +++ b/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java @@ -76,7 +76,7 @@ protected PackHelper(@NotNull Gson gson) { MessageFormatter formatter = null; if (schemaConfig != null) { try { - formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); + formatter = SchemaManager.getInstance().getMessageFormatter(schemaConfig); } catch (Exception ignore) { // if we can not parse to json, we fall through and base64 it } diff --git a/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java b/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java index 566435a43..d093bd197 100644 --- a/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java +++ b/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java @@ -236,7 +236,7 @@ private MessageDTO convert(MessageDTO messageDTO, MessageEvent message) { if(message.getMessage().getSchemaId() != null) { SchemaConfig config = SchemaManager.getInstance().getSchema(message.getMessage().getSchemaId()); try { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); if (formatter != null && !(formatter instanceof RawFormatter)) { JsonObject jsonObject = formatter.parseToJson(payload); JsonObject wrapper = new JsonObject(); diff --git a/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java b/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java index 949e7d4df..933bd2269 100644 --- a/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java +++ b/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java @@ -20,6 +20,7 @@ package io.mapsmessaging.api.transformers; import io.mapsmessaging.configuration.ConfigurationProperties; +import io.mapsmessaging.engine.schema.SchemaManager; import io.mapsmessaging.engine.transformers.TransformerManager; import io.mapsmessaging.schemas.formatters.MessageFormatter; import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; @@ -56,7 +57,7 @@ protected InterServerTransformation createTransformer() { private void setMessageFormatter(InterServerTransformation transformation) { try { MessageFormatter messageFormatter = - MessageFormatterFactory.getInstance().getFormatter(AbstractInterServerTransformationTest.JSON_SCHEMA_CONFIG); + SchemaManager.getInstance().getMessageFormatter(AbstractInterServerTransformationTest.JSON_SCHEMA_CONFIG); ((JsonQueryTransformation) transformation).schemaMap.put(AbstractInterServerTransformationTest.SOURCE, messageFormatter); } catch (IOException e) { throw new RuntimeException(e);