diff --git a/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java b/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java index f5d690880..d811376f7 100644 --- a/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java +++ b/src/main/java/io/mapsmessaging/analytics/impl/StatisticalAnalyser.java @@ -31,6 +31,7 @@ import io.mapsmessaging.selector.IdentifierResolver; import org.jetbrains.annotations.NotNull; +import java.io.IOException; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -73,7 +74,11 @@ public Analyser create(@NotNull StatisticsConfigDTO config) { public Message ingest(@NotNull Message event) { if(formatter == null) { String schemaId = event.getSchemaId(); - formatter = SchemaManager.getInstance().getMessageFormatter(schemaId); + try { + formatter = SchemaManager.getInstance().getMessageFormatter(schemaId); + } catch (IOException e) { + // log + } } if(formatter != null) { IdentifierResolver resolver = formatter.parse(event.getOpaqueData()); diff --git a/src/main/java/io/mapsmessaging/api/message/Filter.java b/src/main/java/io/mapsmessaging/api/message/Filter.java index 5f27d5948..e121f2360 100644 --- a/src/main/java/io/mapsmessaging/api/message/Filter.java +++ b/src/main/java/io/mapsmessaging/api/message/Filter.java @@ -27,6 +27,7 @@ import io.mapsmessaging.selector.IdentifierResolver; import io.mapsmessaging.selector.operators.ParserExecutor; +import java.io.IOException; import java.util.List; @SuppressWarnings("java:S6548") // yes it is a singleton @@ -75,9 +76,13 @@ public static IdentifierResolver getTopicResolver(String topicName, Message mess } public static IdentifierResolver getResolver(String lookup, Message message) { - MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup); - if (formatter != null) { - return formatter.parse(message.getOpaqueData()); + try { + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup); + if (formatter != null) { + return formatter.parse(message.getOpaqueData()); + } + } catch (IOException e) { + // log } return null; } diff --git a/src/main/java/io/mapsmessaging/api/message/Message.java b/src/main/java/io/mapsmessaging/api/message/Message.java index 5ec6a25f3..5303da131 100644 --- a/src/main/java/io/mapsmessaging/api/message/Message.java +++ b/src/main/java/io/mapsmessaging/api/message/Message.java @@ -28,7 +28,6 @@ import io.mapsmessaging.location.LocationManager; import io.mapsmessaging.schemas.config.SchemaConfig; import io.mapsmessaging.schemas.formatters.MessageFormatter; -import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; import io.mapsmessaging.schemas.formatters.ParsedObject; import io.mapsmessaging.selector.IdentifierResolver; import io.mapsmessaging.storage.Storable; @@ -360,7 +359,7 @@ public Object get(String key) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); if(config != null){ try { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); parsedObject = formatter.parse(getOpaqueData()); } catch (IOException e) { parsedObject = null; diff --git a/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java b/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java index 9908fc279..f53663ebd 100644 --- a/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java +++ b/src/main/java/io/mapsmessaging/api/transformers/JsonQueryTransformation.java @@ -134,7 +134,7 @@ private MessageFormatter locateMessageFormatter(String source, ParsedMessage mes if (schemaId != null) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); try { - messageFormatter = MessageFormatterFactory.getInstance().getFormatter(config); + messageFormatter = SchemaManager.getInstance().getMessageFormatter(config); if (messageFormatter != null) { schemaMap.put(source, messageFormatter); } diff --git a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java index b1b9b58bf..c96118827 100644 --- a/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java +++ b/src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java @@ -28,6 +28,7 @@ import io.mapsmessaging.utilities.configuration.ConfigurationManager; import java.io.IOException; +import java.util.List; public class SchemaManagerConfig extends SchemaManagerConfigDTO implements Config, ConfigManager { @@ -41,6 +42,28 @@ public SchemaManagerConfig(ConfigurationProperties configurationProperties) { case "maps" -> super.setRepositoryConfig(configureMaps( (ConfigurationProperties) configurationProperties.get("config"))); default -> super.setRepositoryConfig(new SimpleRepositoryConfigDTO()); } + Object locations = configurationProperties.get("importLocations"); + if(locations instanceof List list){ + for(Object location : list){ + if(location instanceof ConfigurationProperties props){ + loadLocations(props); + } + } + } + else if(locations instanceof ConfigurationProperties props){ + loadLocations(props); + } + setProtocPath(configurationProperties.getProperty("protocPath", null)); + } + + private void loadLocations(ConfigurationProperties props){ + if(props.containsKey("path") && props.containsKey("format") && props.containsKey("name")){ + SchemaImportLocationDTO locationDTO = new SchemaImportLocationDTO(); + locationDTO.setPath(props.getProperty("path")); + locationDTO.setFormat(props.getProperty("format")); + locationDTO.setName(props.getProperty("name")); + super.getImportLocations().add(locationDTO); + } } @Override diff --git a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java new file mode 100644 index 000000000..d7facb415 --- /dev/null +++ b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaImportLocationDTO.java @@ -0,0 +1,62 @@ +/* + * + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.dto.rest.schema; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Schema( + title = "Schema Import Location", + description = "Defines a directory path and the schema format to load from that directory") +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SchemaImportLocationDTO { + + @Schema( + description = "Name to apply to the schema when loaded", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + example = "protobuf_schema_1" + ) + private String name; + + + @Schema( + description = "Directory path containing schema files", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + example = "/opt/schema/protobuf" + ) + private String path; + + @Schema( + description = "Schema format contained in the directory", + requiredMode = Schema.RequiredMode.REQUIRED, + nullable = false, + allowableValues = { + "protobuf", + "json" + }, + example = "protobuf" + ) + private String format; +} \ No newline at end of file diff --git a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java index 57f0f534b..06d8b17be 100644 --- a/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java +++ b/src/main/java/io/mapsmessaging/dto/rest/schema/SchemaManagerConfigDTO.java @@ -29,6 +29,9 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import java.util.ArrayList; +import java.util.List; + @Schema( title = "Schema Manager config", description = "Configures the schema manager on where it can find and store schemas") @@ -57,6 +60,25 @@ public class SchemaManagerConfigDTO extends BaseManagerConfigDTO { ) private RepositoryConfigDTO repositoryConfig; + + @Schema( + description = "Optional path to the protoc compiler executable. " + + "If not supplied the system PATH will be used.", + requiredMode = Schema.RequiredMode.NOT_REQUIRED, + nullable = true, + example = "/usr/local/bin/protoc" + ) + private String protocPath; + + @Schema( + description = "List of directories that are scanned for schema source files such as protobuf or JSON schemas. " + + "These schemas are loaded and exposed as built-in or well-known schemas.", + requiredMode = Schema.RequiredMode.NOT_REQUIRED, + nullable = true + ) + private List importLocations = new ArrayList<>(); + + public SchemaManagerConfigDTO() { super("SchemaManagerConfigDTO"); } diff --git a/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java b/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java index 67a3886d4..a887af814 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java +++ b/src/main/java/io/mapsmessaging/engine/schema/MessageSchemaToJsonBuilder.java @@ -36,7 +36,7 @@ public Message parse(Message message, String destinationName) throws Exception { if(schemaId != null && !destinationName.startsWith("$")) { SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId); if(config != null) { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); if (formatter != null && !(formatter instanceof RawFormatter)) { byte[] data = pack(message, config, formatter); MessageBuilder messageBuilder = new MessageBuilder(); diff --git a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java index 15b656e1d..e960e85f9 100644 --- a/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java +++ b/src/main/java/io/mapsmessaging/engine/schema/SchemaManager.java @@ -25,32 +25,37 @@ import io.mapsmessaging.dto.rest.schema.FileRepositoryConfigDTO; import io.mapsmessaging.dto.rest.schema.MapsRepositoryConfigDTO; import io.mapsmessaging.dto.rest.schema.RepositoryConfigDTO; +import io.mapsmessaging.dto.rest.schema.SchemaImportLocationDTO; import io.mapsmessaging.dto.rest.system.Status; import io.mapsmessaging.dto.rest.system.SubSystemStatusDTO; +import io.mapsmessaging.logging.Logger; +import io.mapsmessaging.logging.LoggerFactory; import io.mapsmessaging.schemas.config.SchemaConfig; import io.mapsmessaging.schemas.config.SchemaResource; -import io.mapsmessaging.schemas.config.impl.JsonSchemaConfig; -import io.mapsmessaging.schemas.config.impl.NativeSchemaConfig; +import io.mapsmessaging.schemas.config.impl.*; import io.mapsmessaging.schemas.config.impl.NativeSchemaConfig.TYPE; -import io.mapsmessaging.schemas.config.impl.RawSchemaConfig; -import io.mapsmessaging.schemas.config.impl.XmlSchemaConfig; import io.mapsmessaging.schemas.formatters.MessageFormatter; import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; import io.mapsmessaging.schemas.repository.SchemaRepository; +import io.mapsmessaging.schemas.repository.SchemaResolver; import io.mapsmessaging.schemas.repository.impl.FileSchemaRepository; import io.mapsmessaging.schemas.repository.impl.RestSchemaRepository; import io.mapsmessaging.schemas.repository.impl.SimpleSchemaRepository; +import io.mapsmessaging.schemas.tools.protobuf.ProtobufSchemaGenerator; import io.mapsmessaging.utilities.Agent; import io.mapsmessaging.utilities.configuration.ConfigurationManager; import lombok.Getter; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.*; import java.util.Map.Entry; -@SuppressWarnings("java:S6548") // yes it is a singleton -public class SchemaManager implements Agent { +import static io.mapsmessaging.logging.ServerLogMessages.*; + +@SuppressWarnings("java:S6548") // yes, it is a singleton +public class SchemaManager implements Agent, SchemaResolver { private static final String MONITOR = "monitor"; public static final UUID DEFAULT_RAW_UUID = UUID.fromString("10000000-0000-1000-a000-100000000000"); @@ -67,9 +72,13 @@ public static SchemaManager getInstance() { return Holder.INSTANCE; } + private final Logger logger = LoggerFactory.getLogger(getClass()); private final SchemaRepository repository; private final Map loadedFormatter; private final Map> pathMap; + private final List importLocations = new ArrayList<>(); + private final Map preLoadedSchemas = new HashMap<>(); + private final String protocPath; @Getter private long updateCount = 0; @@ -85,14 +94,6 @@ public SchemaConfig addSchema(String path, SchemaConfig schemaConfig) { } List list = pathMap.computeIfAbsent(path, k -> new ArrayList<>()); list.add(schemaConfig); - try { - if (!loadedFormatter.containsKey(schemaConfig.getUniqueId())) { - MessageFormatter messageFormatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); - loadedFormatter.put(schemaConfig.getUniqueId(), messageFormatter); - } - } catch (Exception e) { - // Unable to load the formatter - } updateCount++; return resource.getDefaultVersion(); } @@ -112,18 +113,17 @@ public SchemaConfig getDefaultSchemaByName(String name){ }; } - public MessageFormatter getMessageFormatter(String uniqueId) { + public MessageFormatter getMessageFormatter(SchemaConfig config) throws IOException{ + return getMessageFormatter(config.getUniqueId()); + } + + public MessageFormatter getMessageFormatter(String uniqueId) throws IOException { MessageFormatter formatter = loadedFormatter.get(uniqueId); if(formatter == null){ SchemaConfig schemaConfig = getSchema(uniqueId); if(schemaConfig != null) { - try { - formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); - loadedFormatter.put(schemaConfig.getUniqueId(), formatter); - } catch (IOException e) { - // log this - } - + formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig, this); + loadedFormatter.put(schemaConfig.getUniqueId(), formatter); } } return formatter; @@ -133,7 +133,6 @@ public List getMessageFormats() { return MessageFormatterFactory.getInstance().getFormatters(); } - public synchronized SchemaConfig getSchema(UUID uniqueId) { return getSchema(uniqueId.toString()); } @@ -146,6 +145,11 @@ public synchronized SchemaConfig getSchema(String uniqueId) { return null; } + + public synchronized SchemaConfig getSchemaByName(String name) { + return preLoadedSchemas.get(name); + } + public synchronized SchemaConfig locateSchema(String destinationName) { SchemaConfig config = SchemaLocationHelper.locateSchema(repository.getAllSchemas(), destinationName); if(config != null){ @@ -218,6 +222,7 @@ public String getDescription() { } public void start() { + logger.log(SCHEMA_MANAGER_STARTUP); SchemaConfig rawConfig = new RawSchemaConfig(); rawConfig.setUniqueId(DEFAULT_RAW_UUID); rawConfig.setVersion(1); @@ -264,11 +269,62 @@ public void start() { xmlSchemaConfig.setSchema(new JsonObject()); addSchema("$SYS", xmlSchemaConfig); + Path protoExec = protocPath == null? null: Path.of(protocPath); + + for(SchemaImportLocationDTO location: importLocations){ + if(location.getFormat().equalsIgnoreCase("json")){ + try { + JsonSchemaConfig jsonSchemaConfig1 = new JsonSchemaConfig(Path.of(location.getPath())); + addSchema(location.getName(), jsonSchemaConfig1); + loadBundle(jsonSchemaConfig1); + preLoadedSchemas.put(location.getName(), jsonSchemaConfig1); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), jsonSchemaConfig1.getUniqueId(), jsonSchemaConfig1.getFormat()); + } catch (IOException e) { + logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); + } + } + else if(location.getFormat().equalsIgnoreCase("protobuf")){ + try { + List schemaList = ProtobufSchemaGenerator.loadSchemas(Path.of(location.getPath()), protoExec); + for(ProtoBufSchemaConfig schema: schemaList){ + addSchema(location.getName(), schema); + loadBundle(schema); + preLoadedSchemas.put(location.getName(), schema); + logger.log(SCHEMA_MANAGER_LOADED_CONFIG,location.getName(), schema.getUniqueId(), schema.getFormat()); + } + } catch (IOException e) { + logger.log(SCHEMA_MANAGER_LOADED_CONFIG_FAILED, location.getPath(), location.getFormat(), e); + } + } + } // This ensures the factory is loaded MessageFormatterFactory.getInstance(); } + @Override + public SchemaConfig resolveParent(SchemaConfig schemaConfig) { + SchemaResource resource = repository.getResource(schemaConfig.getUniqueId()); + if(resource != null){ + return resource.getDefaultVersion(); + } + return null; + } + + + private void loadBundle(SchemaConfig schema){ + if(schema.isBundle()){ + for (SchemaConfig config : schema.getBundledSchemas()) { + logger.log(SCHEMA_MANAGER_LOADED_BUNDLED, config.getName(), schema.getName(), schema.getUniqueId(), schema.getFormat()); + repository.addVersion(config.getUniqueId(), config); + } + } + } + + private SchemaManager() { + loadedFormatter = new LinkedHashMap<>(); + pathMap = new LinkedHashMap<>(); + SchemaManagerConfig config; try { config = ConfigurationManager.getInstance().getConfiguration(SchemaManagerConfig.class); @@ -289,8 +345,16 @@ private SchemaManager() { buildTime = new SimpleSchemaRepository(); } repository = buildTime; - loadedFormatter = new LinkedHashMap<>(); - pathMap = new LinkedHashMap<>(); + + if(config != null){ + protocPath = config.getProtocPath(); + if(config.getImportLocations() != null) { + importLocations.addAll(config.getImportLocations()); + } + } + else{ + protocPath = null; + } } @Override diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java index b687d9b09..3cae27b34 100644 --- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java +++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java @@ -738,6 +738,14 @@ public enum ServerLogMessages implements LogMessage { COAP_BLOCK2_REQUEST(LEVEL.TRACE, SERVER_CATEGORY.PROTOCOL, "Block2 packet received, Message No: {}, Block Size: {}, has more:{}"), // + // + SCHEMA_MANAGER_STARTUP(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Schema Manager starting up"), + SCHEMA_MANAGER_LOADED_CONFIG(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Loaded configuration from file name:{} UUID:{} type:{}"), + SCHEMA_MANAGER_LOADED_BUNDLED(LEVEL.INFO, SERVER_CATEGORY.ENGINE, "Loaded bundled schema {} from name:{} UUID:{} type:{} "), + SCHEMA_MANAGER_LOADED_CONFIG_FAILED(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Failed to loaded configuration from file {} of type:{}"), + // + + // DEVICE_SELECTOR_PARSER_EXCEPTION(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Selection {}, failed to parse with the following exception {}"), DEVICE_SCHEMA_UPDATED(LEVEL.WARN, SERVER_CATEGORY.DEVICE, "Device {} schema configuration updated"), @@ -748,8 +756,6 @@ public enum ServerLogMessages implements LogMessage { DEVICE_STOP(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Stopping device {}"), DEVICE_SCHEDULE_TASK_FAILED(LEVEL.WARN, SERVER_CATEGORY.DEVICE, "Scheduled task for device failed"), DEVICE_SCHEDULE_TASK_EXCCEDED_TIME(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Scheduled task exceeded time limit, took {}ms"), - - DEVICE_MANAGER_STARTUP(LEVEL.DEBUG, SERVER_CATEGORY.ENGINE, "Starting Device Manager"), DEVICE_MANAGER_STARTUP_FAILED(LEVEL.WARN, SERVER_CATEGORY.ENGINE, "Device Manager failed to start"), DEVICE_MANAGER_FAILED_TO_REGISTER(LEVEL.INFO, SERVER_CATEGORY.DEVICE, "Failed to register device"), diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java index 1a0bb2248..4747fe593 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/n2k/N2kProtocol.java @@ -96,7 +96,7 @@ else if(encodedDatabase != null && !encodedDatabase.isEmpty()) { rawTopicTemplate = ((N2KConfigDTO)protocolConfig).getUnknownPacketTopic().replace("{candevice}",endPoint.getName()); parseToJson =((N2KConfigDTO)protocolConfig).isParseToJson(); String inboundTopicName =((N2KConfigDTO)protocolConfig).getInboundTopicName(); - formatter = (CanbusFormatter) MessageFormatterFactory.getInstance().getFormatter(canbusSchema); + formatter = (CanbusFormatter) SchemaManager.getInstance().getMessageFormatter(canbusSchema); try { session = buildSession(endPoint.getName(), 10000); } catch (ExecutionException | TimeoutException e) { diff --git a/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java b/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java index 766da211f..497d23baf 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java +++ b/src/main/java/io/mapsmessaging/network/protocol/transformation/cloudevent/pack/PackHelper.java @@ -76,7 +76,7 @@ protected PackHelper(@NotNull Gson gson) { MessageFormatter formatter = null; if (schemaConfig != null) { try { - formatter = MessageFormatterFactory.getInstance().getFormatter(schemaConfig); + formatter = SchemaManager.getInstance().getMessageFormatter(schemaConfig); } catch (Exception ignore) { // if we can not parse to json, we fall through and base64 it } diff --git a/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java b/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java index 566435a43..d093bd197 100644 --- a/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java +++ b/src/main/java/io/mapsmessaging/rest/api/impl/messaging/impl/RestMessageListener.java @@ -236,7 +236,7 @@ private MessageDTO convert(MessageDTO messageDTO, MessageEvent message) { if(message.getMessage().getSchemaId() != null) { SchemaConfig config = SchemaManager.getInstance().getSchema(message.getMessage().getSchemaId()); try { - MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config); + MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config); if (formatter != null && !(formatter instanceof RawFormatter)) { JsonObject jsonObject = formatter.parseToJson(payload); JsonObject wrapper = new JsonObject(); diff --git a/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java b/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java index 949e7d4df..933bd2269 100644 --- a/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java +++ b/src/test/java/io/mapsmessaging/api/transformers/JsonQueryTransformationTest.java @@ -20,6 +20,7 @@ package io.mapsmessaging.api.transformers; import io.mapsmessaging.configuration.ConfigurationProperties; +import io.mapsmessaging.engine.schema.SchemaManager; import io.mapsmessaging.engine.transformers.TransformerManager; import io.mapsmessaging.schemas.formatters.MessageFormatter; import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; @@ -56,7 +57,7 @@ protected InterServerTransformation createTransformer() { private void setMessageFormatter(InterServerTransformation transformation) { try { MessageFormatter messageFormatter = - MessageFormatterFactory.getInstance().getFormatter(AbstractInterServerTransformationTest.JSON_SCHEMA_CONFIG); + SchemaManager.getInstance().getMessageFormatter(AbstractInterServerTransformationTest.JSON_SCHEMA_CONFIG); ((JsonQueryTransformation) transformation).schemaMap.put(AbstractInterServerTransformationTest.SOURCE, messageFormatter); } catch (IOException e) { throw new RuntimeException(e);