Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -111,15 +113,25 @@ public enum Attribute {
*/
FAILED;

private static final Logger logger = LogManager.getLogger(Attribute.class);

/**
* Read an Attribute from a StreamInput
* Read an Attribute from a StreamInput.
* Returns null for unrecognized attribute names to support forward compatibility
* during rolling upgrades where newer nodes may send attributes unknown to older nodes.
*
* @param in the StreamInput to read from
* @return Attribute
* @return Attribute, or null if the attribute name is not recognized
* @throws IOException IOException
*/
static Attribute readFromStream(final StreamInput in) throws IOException {
return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT));
String name = in.readString().toUpperCase(Locale.ROOT);
try {
return Attribute.valueOf(name);
} catch (IllegalArgumentException e) {
logger.debug("Ignoring unrecognized attribute [{}] from stream", name);
return null;
}
}

/**
Expand Down Expand Up @@ -205,6 +217,11 @@ public static Map<Attribute, Object> readAttributeMap(StreamInput in) throws IOE

for (int i = 0; i < size; i++) {
Attribute key = readFromStream(in);
if (key == null) {
// Unknown attribute — consume the value to keep stream position correct, then discard
in.readGenericValue();
continue;
}
Object value = readAttributeValue(in, key);
map.put(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.plugin.insights.rules.model;

import java.io.IOException;
import java.util.Map;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -137,4 +138,105 @@ public void testNewNodeWriteToOldNodeInvalidJson() throws IOException {
assertNotNull("Should not be null", result);
}

/**
* Test that readFromStream returns null for unrecognized attribute names
* instead of throwing IllegalArgumentException.
*/
public void testReadFromStreamUnknownAttribute() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.writeString("UNKNOWN_FUTURE_ATTRIBUTE");

StreamInput in = out.bytes().streamInput();
Attribute result = Attribute.readFromStream(in);
assertNull("Unrecognized attribute should return null", result);
}

/**
* Test that readFromStream still works for known attributes.
*/
public void testReadFromStreamKnownAttribute() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.writeString("source");

StreamInput in = out.bytes().streamInput();
Attribute result = Attribute.readFromStream(in);
assertEquals(Attribute.SOURCE, result);
}

/**
* Test that readAttributeMap skips unknown attributes while preserving known ones.
* Simulates a newer node sending attributes that this node doesn't recognize.
*/
public void testReadAttributeMapSkipsUnknownAttributes() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();

// Write a map with 3 entries: known, unknown, known
out.writeVInt(3);

// Entry 1: known attribute (SEARCH_TYPE)
out.writeString("search_type");
out.writeGenericValue("query_then_fetch");

// Entry 2: unknown attribute from a future version
out.writeString("some_future_attribute");
out.writeGenericValue("future_value");

// Entry 3: known attribute (TOTAL_SHARDS)
out.writeString("total_shards");
out.writeGenericValue(5);

StreamInput in = out.bytes().streamInput();
Map<Attribute, Object> result = Attribute.readAttributeMap(in);

assertEquals("Should contain only the 2 known attributes", 2, result.size());
assertEquals("query_then_fetch", result.get(Attribute.SEARCH_TYPE));
assertEquals(5, result.get(Attribute.TOTAL_SHARDS));
assertNull("Unknown attribute should not be in map", result.get(null));
}

/**
* Test that readAttributeMap handles a map where all attributes are unknown.
*/
public void testReadAttributeMapAllUnknown() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();

out.writeVInt(2);
out.writeString("future_attr_1");
out.writeGenericValue(true);
out.writeString("future_attr_2");
out.writeGenericValue("some_value");

StreamInput in = out.bytes().streamInput();
Map<Attribute, Object> result = Attribute.readAttributeMap(in);

assertTrue("Map should be empty when all attributes are unknown", result.isEmpty());
}

/**
* Test that stream position is correctly maintained after skipping unknown attributes,
* allowing subsequent reads to succeed.
*/
public void testStreamPositionAfterSkippingUnknownAttributes() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();

// Write attribute map with an unknown entry
out.writeVInt(2);
out.writeString("unknown_attr");
out.writeGenericValue("skip_me");
out.writeString("node_id");
out.writeGenericValue("node_123");

// Write additional data after the map
out.writeString("sentinel");

StreamInput in = out.bytes().streamInput();
Map<Attribute, Object> result = Attribute.readAttributeMap(in);

assertEquals(1, result.size());
assertEquals("node_123", result.get(Attribute.NODE_ID));

// Verify stream position is correct — we can still read the sentinel
assertEquals("sentinel", in.readString());
}

}
Loading