Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.mapsmessaging.selector.IdentifierResolver;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -73,7 +74,11 @@ public Analyser create(@NotNull StatisticsConfigDTO config) {
public Message ingest(@NotNull Message event) {
if(formatter == null) {
String schemaId = event.getSchemaId();
formatter = SchemaManager.getInstance().getMessageFormatter(schemaId);
try {
formatter = SchemaManager.getInstance().getMessageFormatter(schemaId);
} catch (IOException e) {
// log
}
}
if(formatter != null) {
IdentifierResolver resolver = formatter.parse(event.getOpaqueData());
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/mapsmessaging/api/message/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.mapsmessaging.selector.IdentifierResolver;
import io.mapsmessaging.selector.operators.ParserExecutor;

import java.io.IOException;
import java.util.List;

@SuppressWarnings("java:S6548") // yes it is a singleton
Expand Down Expand Up @@ -75,9 +76,13 @@ public static IdentifierResolver getTopicResolver(String topicName, Message mess
}

public static IdentifierResolver getResolver(String lookup, Message message) {
MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup);
if (formatter != null) {
return formatter.parse(message.getOpaqueData());
try {
MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(lookup);
if (formatter != null) {
return formatter.parse(message.getOpaqueData());
}
} catch (IOException e) {
// log
}
return null;
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/mapsmessaging/api/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.mapsmessaging.location.LocationManager;
import io.mapsmessaging.schemas.config.SchemaConfig;
import io.mapsmessaging.schemas.formatters.MessageFormatter;
import io.mapsmessaging.schemas.formatters.MessageFormatterFactory;
import io.mapsmessaging.schemas.formatters.ParsedObject;
import io.mapsmessaging.selector.IdentifierResolver;
import io.mapsmessaging.storage.Storable;
Expand Down Expand Up @@ -360,7 +359,7 @@ public Object get(String key) {
SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId);
if(config != null){
try {
MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config);
MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config);
parsedObject = formatter.parse(getOpaqueData());
} catch (IOException e) {
parsedObject = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private MessageFormatter locateMessageFormatter(String source, ParsedMessage mes
if (schemaId != null) {
SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId);
try {
messageFormatter = MessageFormatterFactory.getInstance().getFormatter(config);
messageFormatter = SchemaManager.getInstance().getMessageFormatter(config);
if (messageFormatter != null) {
schemaMap.put(source, messageFormatter);
}
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/mapsmessaging/config/SchemaManagerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.mapsmessaging.utilities.configuration.ConfigurationManager;

import java.io.IOException;
import java.util.List;

public class SchemaManagerConfig extends SchemaManagerConfigDTO implements Config, ConfigManager {

Expand All @@ -41,6 +42,28 @@ public SchemaManagerConfig(ConfigurationProperties configurationProperties) {
case "maps" -> super.setRepositoryConfig(configureMaps( (ConfigurationProperties) configurationProperties.get("config")));
default -> super.setRepositoryConfig(new SimpleRepositoryConfigDTO());
}
Object locations = configurationProperties.get("importLocations");
if(locations instanceof List list){
for(Object location : list){
if(location instanceof ConfigurationProperties props){
loadLocations(props);
}
}
}
else if(locations instanceof ConfigurationProperties props){
loadLocations(props);
}
setProtocPath(configurationProperties.getProperty("protocPath", null));
}

private void loadLocations(ConfigurationProperties props){
if(props.containsKey("path") && props.containsKey("format") && props.containsKey("name")){
SchemaImportLocationDTO locationDTO = new SchemaImportLocationDTO();
locationDTO.setPath(props.getProperty("path"));
locationDTO.setFormat(props.getProperty("format"));
locationDTO.setName(props.getProperty("name"));
super.getImportLocations().add(locationDTO);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
*
* Copyright [ 2024 - 2026 ] MapsMessaging B.V.
*
* Licensed under the Apache License, Version 2.0 with the Commons Clause
* (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://commonsclause.com/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mapsmessaging.dto.rest.schema;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Schema(
title = "Schema Import Location",
description = "Defines a directory path and the schema format to load from that directory")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SchemaImportLocationDTO {

@Schema(
description = "Name to apply to the schema when loaded",
requiredMode = Schema.RequiredMode.REQUIRED,
nullable = false,
example = "protobuf_schema_1"
)
private String name;


@Schema(
description = "Directory path containing schema files",
requiredMode = Schema.RequiredMode.REQUIRED,
nullable = false,
example = "/opt/schema/protobuf"
)
private String path;

@Schema(
description = "Schema format contained in the directory",
requiredMode = Schema.RequiredMode.REQUIRED,
nullable = false,
allowableValues = {
"protobuf",
"json"
},
example = "protobuf"
)
private String format;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

@Schema(
title = "Schema Manager config",
description = "Configures the schema manager on where it can find and store schemas")
Expand Down Expand Up @@ -57,6 +60,25 @@ public class SchemaManagerConfigDTO extends BaseManagerConfigDTO {
)
private RepositoryConfigDTO repositoryConfig;


@Schema(
description = "Optional path to the protoc compiler executable. " +
"If not supplied the system PATH will be used.",
requiredMode = Schema.RequiredMode.NOT_REQUIRED,
nullable = true,
example = "/usr/local/bin/protoc"
)
private String protocPath;

@Schema(
description = "List of directories that are scanned for schema source files such as protobuf or JSON schemas. " +
"These schemas are loaded and exposed as built-in or well-known schemas.",
requiredMode = Schema.RequiredMode.NOT_REQUIRED,
nullable = true
)
private List<SchemaImportLocationDTO> importLocations = new ArrayList<>();


public SchemaManagerConfigDTO() {
super("SchemaManagerConfigDTO");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Message parse(Message message, String destinationName) throws Exception {
if(schemaId != null && !destinationName.startsWith("$")) {
SchemaConfig config = SchemaManager.getInstance().getSchema(schemaId);
if(config != null) {
MessageFormatter formatter = MessageFormatterFactory.getInstance().getFormatter(config);
MessageFormatter formatter = SchemaManager.getInstance().getMessageFormatter(config);
if (formatter != null && !(formatter instanceof RawFormatter)) {
byte[] data = pack(message, config, formatter);
MessageBuilder messageBuilder = new MessageBuilder();
Expand Down
Loading