flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).evaluateAttributeExpressions().asInteger());
if (!flowFiles.isEmpty()) {
processFlowFiles(processContext, processSession, flowFiles);
}
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
index 0a4367d..9362cd2 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractToProcessor.java
@@ -33,7 +33,7 @@ public AbstractToProcessor() {
*/
public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("KAS URL")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
index d55ea56..a6044f8 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java
@@ -60,23 +60,28 @@ public ConvertToZTDF() {
}
/**
- * Property descriptor for the "Sign Assertions" feature in the ConvertToZTDF processor. This property allows specifying whether
- * the assertions should be signed or not. It is not a required property and defaults to "false".
- *
- * - Name: Sign Assertions
- * - Description: sign assertions
- * - Required: false
- * - Default Value: false
- * - Allowable Values: true, false
- * - Expression Language Supported: {@link ExpressionLanguageScope#VARIABLE_REGISTRY}
+ * Property descriptor for the "Enable Encryption" feature in the ConvertToZTDF processor.
+ * When false, the flow file passes through without TDF encryption (tag-only / ABAC-only mode).
+ * Required, defaults to "true". Supports ENVIRONMENT-scoped expression language.
*/
+ public static final PropertyDescriptor ENABLE_ENCRYPTION = new org.apache.nifi.components.PropertyDescriptor.Builder()
+ .name("Enable Encryption")
+ .description("When false, the flow file passes through without TDF encryption. " +
+ "Use this for ABAC policy enforcement only (tag-only mode), mirroring the " +
+ "GATEWAY_ABAC_ENCRYPT_EMAIL=0 behavior in the Virtru Gateway.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
public static final PropertyDescriptor SIGN_ASSERTIONS = new org.apache.nifi.components.PropertyDescriptor.Builder()
.name("Sign Assertions")
.description("sign assertions")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
/**
@@ -94,7 +99,7 @@ public ConvertToZTDF() {
.required(true)
.identifiesControllerService(PrivateKeyService.class)
.dependsOn(SIGN_ASSERTIONS, new AllowableValue("true"))
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
@@ -118,6 +123,7 @@ PrivateKeyService getPrivateKeyService(ProcessContext processContext) {
@Override
public List getSupportedPropertyDescriptors() {
List propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ propertyDescriptors.add(ENABLE_ENCRYPTION);
propertyDescriptors.add(PRIVATE_KEY_CONTROLLER_SERVICE);
propertyDescriptors.add(SIGN_ASSERTIONS);
return Collections.unmodifiableList(propertyDescriptors);
@@ -195,6 +201,11 @@ private void populateFieldFromMap(Map, ?> sourceMap, String key, Map, ?> des
*/
@Override
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List flowFiles) throws ProcessException {
+ Boolean encryptionEnabled = processContext.getProperty(ENABLE_ENCRYPTION).evaluateAttributeExpressions().asBoolean();
+ if (encryptionEnabled == null) {
+ throw new ProcessException("Enable Encryption property did not resolve to 'true' or 'false'");
+ }
+
SDK sdk = getTDFSDK(processContext);
for (final FlowFile flowFile : flowFiles) {
try {
@@ -203,6 +214,9 @@ void processFlowFiles(ProcessContext processContext, ProcessSession processSessi
//build baseline TDF Config options
List> configurationOptions = new ArrayList<>(Arrays.asList(Config.withKasInformation(kasInfoList.toArray(new Config.KASInfo[0])),
Config.withDataAttributes(dataAttributes.toArray(new String[0]))));
+ if (!encryptionEnabled) {
+ configurationOptions.add(cfg -> cfg.enableEncryption = false);
+ }
List nifiAssertionAttributeKeys = flowFile.getAttributes().keySet().stream().filter(x->x.startsWith(TDF_ASSERTION_PREFIX)).toList();
for(String nifiAssertionAttributeKey: nifiAssertionAttributeKeys) {
getLogger().debug(String.format("Adding assertion for NiFi attribute = %s", nifiAssertionAttributeKey));
diff --git a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java
index 74212c9..7fd17ca 100644
--- a/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java
+++ b/nifi-tdf-processors/src/main/java/io/opentdf/nifi/SimpleOpenTDFControllerService.java
@@ -37,7 +37,7 @@ public SimpleOpenTDFControllerService() {
.name("platform-endpoint")
.displayName("OpenTDF Platform ENDPOINT")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(false)
.description("OpenTDF Platform ENDPOINT in GRPC compatible format (no protocol prefix)")
@@ -55,7 +55,7 @@ public SimpleOpenTDFControllerService() {
.name("clientSecret")
.displayName("Client Secret")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.description("OpenTDF Platform Authentication Client Secret")
@@ -70,7 +70,7 @@ public SimpleOpenTDFControllerService() {
.name("clientId")
.displayName("Client ID")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(false)
.description("OpenTDF Platform Authentication Client ID")
diff --git a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 79acb08..6fc52e5 100644
--- a/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-tdf-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -1,2 +1,3 @@
io.opentdf.nifi.ConvertFromZTDF
io.opentdf.nifi.ConvertToZTDF
+io.opentdf.nifi.ABACEnforcement
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ABACEnforcementTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ABACEnforcementTest.java
new file mode 100644
index 0000000..43bc0f0
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/ABACEnforcementTest.java
@@ -0,0 +1,230 @@
+package io.opentdf.nifi;
+
+import com.google.common.util.concurrent.Futures;
+import io.opentdf.platform.authorization.AuthorizationServiceGrpc;
+import io.opentdf.platform.authorization.DecisionResponse;
+import io.opentdf.platform.authorization.GetDecisionsRequest;
+import io.opentdf.platform.authorization.GetDecisionsResponse;
+import io.opentdf.platform.sdk.SDK;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+class ABACEnforcementTest {
+
+ private static final String ENTITY_ID_VALUE = "nifi-service-account";
+ private static final String TDF_ATTR_FQN = "https://ns.example.mil/attr/classification/value/secret";
+ private static final byte[] PAYLOAD = "tactical message content".getBytes();
+
+ // ─── Mock inner class ─────────────────────────────────────────────────────
+
+ static class MockABACEnforcement extends ABACEnforcement {
+ SDK mockSDK;
+ @Override
+ SDK getTDFSDK(ProcessContext ctx) { return mockSDK; }
+ }
+
+ // ─── Fixtures ─────────────────────────────────────────────────────────────
+
+ private MockABACEnforcement processor;
+ private TestRunner runner;
+ private AuthorizationServiceGrpc.AuthorizationServiceFutureStub mockAuthStub;
+
+ @BeforeEach
+ void setup() throws Exception {
+ processor = new MockABACEnforcement();
+
+ SDK mockSDK = mock(SDK.class);
+ SDK.Services mockServices = mock(SDK.Services.class);
+ mockAuthStub = mock(AuthorizationServiceGrpc.AuthorizationServiceFutureStub.class);
+ when(mockSDK.getServices()).thenReturn(mockServices);
+ when(mockServices.authorization()).thenReturn(mockAuthStub);
+ processor.mockSDK = mockSDK;
+
+ runner = TestRunners.newTestRunner(processor);
+ Utils.setupTDFControllerService(runner);
+ runner.setProperty(ABACEnforcement.ENTITY_ID, ENTITY_ID_VALUE);
+ runner.setProperty(ABACEnforcement.ENTITY_TYPE, "CLIENT_ID");
+ }
+
+ // ─── PERMIT / DENY ───────────────────────────────────────────────────────
+
+ @Test
+ void permit_routesToPermitWithDecisionAttribute() {
+ stubAuthResponse(DecisionResponse.Decision.DECISION_PERMIT);
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ List permitted = runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT);
+ assertEquals(1, permitted.size());
+ assertEquals(0, runner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY).size());
+ assertEquals(0, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+
+ MockFlowFile ff = permitted.get(0);
+ assertEquals("PERMIT", ff.getAttribute("abac.decision"));
+ assertEquals(ENTITY_ID_VALUE, ff.getAttribute("abac.entity_id"));
+ assertEquals(TDF_ATTR_FQN, ff.getAttribute("abac.resource_attributes"));
+ assertNotNull(ff.getAttribute("abac.processing_time_ms"));
+ // Binary content must pass through unchanged
+ assertArrayEquals(PAYLOAD, ff.toByteArray());
+ }
+
+ @Test
+ void deny_routesToDenyWithDecisionAttribute() {
+ stubAuthResponse(DecisionResponse.Decision.DECISION_DENY);
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ List denied = runner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY);
+ assertEquals(1, denied.size());
+ assertEquals(0, runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).size());
+ assertEquals("DENY", denied.get(0).getAttribute("abac.decision"));
+ assertArrayEquals(PAYLOAD, denied.get(0).toByteArray());
+ }
+
+ @Test
+ void anyDenyInMultipleDecisions_overallDeny() {
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFuture(GetDecisionsResponse.newBuilder()
+ .addDecisionResponses(decisionOf(DecisionResponse.Decision.DECISION_PERMIT))
+ .addDecisionResponses(decisionOf(DecisionResponse.Decision.DECISION_DENY))
+ .build()));
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY).size());
+ }
+
+ // ─── Validation failures (fail-closed regardless of Fail Open setting) ───
+
+ @Test
+ void missingTdfAttribute_noDefault_routesToFailure() {
+ // No tdf_attribute on flow file, no Default Resource Attribute FQNs configured
+ runner.enqueue(PAYLOAD);
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+ assertEquals(0, runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).size());
+ verifyNoInteractions(mockAuthStub);
+ }
+
+ @Test
+ void blankTdfAttribute_noDefault_routesToFailure() {
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", " "));
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+ verifyNoInteractions(mockAuthStub);
+ }
+
+ @Test
+ void allBlankFqnsAfterSplit_routesToFailure() {
+ // All entries are blank after splitting — empty FQN list must never default-permit
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", " , , "));
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+ verifyNoInteractions(mockAuthStub);
+ }
+
+ @Test
+ void emptyDecisionList_routesToFailure() {
+ // Auth service returns a response with no decisions — ambiguous, treat as failure
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFuture(GetDecisionsResponse.newBuilder().build()));
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+ }
+
+ // ─── Default resource attributes ─────────────────────────────────────────
+
+ @Test
+ void defaultResourceAttributes_usedWhenTdfAttributeAbsent() {
+ stubAuthResponse(DecisionResponse.Decision.DECISION_PERMIT);
+ runner.setProperty(ABACEnforcement.DEFAULT_RESOURCE_ATTRIBUTES, TDF_ATTR_FQN);
+
+ // Flow file has no tdf_attribute — should fall back to the default
+ runner.enqueue(PAYLOAD);
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).size());
+ assertEquals(TDF_ATTR_FQN,
+ runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT)
+ .get(0).getAttribute("abac.resource_attributes"));
+ }
+
+ @Test
+ void tdfAttributeOverridesDefault_whenBothSet() {
+ stubAuthResponse(DecisionResponse.Decision.DECISION_PERMIT);
+ String defaultAttr = "https://ns.example.mil/attr/classification/value/unclassified";
+ runner.setProperty(ABACEnforcement.DEFAULT_RESOURCE_ATTRIBUTES, defaultAttr);
+
+ String flowFileAttr = TDF_ATTR_FQN;
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", flowFileAttr));
+ runner.run(1);
+
+ MockFlowFile ff = runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).get(0);
+ assertEquals(flowFileAttr, ff.getAttribute("abac.resource_attributes"),
+ "Flow file tdf_attribute must take precedence over default");
+ }
+
+ // ─── Remote call failure / Fail Open ─────────────────────────────────────
+
+ @Test
+ void authServiceException_failClosedDefault_routesToFailure() {
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenThrow(new RuntimeException("gRPC channel closed"));
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ assertEquals(1, runner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+ assertEquals(0, runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).size());
+ }
+
+ @Test
+ void authServiceTimeout_failOpen_routesToPermit() {
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFailedFuture(new TimeoutException("upstream timeout")));
+ runner.setProperty(ABACEnforcement.FAIL_OPEN, "true");
+
+ runner.enqueue(PAYLOAD, Map.of("tdf_attribute", TDF_ATTR_FQN));
+ runner.run(1);
+
+ List permitted = runner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT);
+ assertEquals(1, permitted.size());
+ assertEquals("PERMIT", permitted.get(0).getAttribute("abac.decision"));
+ assertNotNull(permitted.get(0).getAttribute("abac.error"),
+ "abac.error must be set on fail-open permit");
+ }
+
+ // ─── Helpers ─────────────────────────────────────────────────────────────
+
+ private void stubAuthResponse(DecisionResponse.Decision decision) {
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFuture(GetDecisionsResponse.newBuilder()
+ .addDecisionResponses(decisionOf(decision))
+ .build()));
+ }
+
+ private static DecisionResponse decisionOf(DecisionResponse.Decision decision) {
+ return DecisionResponse.newBuilder().setDecision(decision).build();
+ }
+}
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/JREAPCPipelineTest.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/JREAPCPipelineTest.java
new file mode 100644
index 0000000..7c6ff4d
--- /dev/null
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/JREAPCPipelineTest.java
@@ -0,0 +1,291 @@
+package io.opentdf.nifi;
+
+import com.google.common.util.concurrent.Futures;
+import io.opentdf.platform.authorization.AuthorizationServiceGrpc;
+import io.opentdf.platform.authorization.DecisionResponse;
+import io.opentdf.platform.authorization.GetDecisionsRequest;
+import io.opentdf.platform.authorization.GetDecisionsResponse;
+import io.opentdf.platform.sdk.SDK;
+import io.opentdf.platform.sdk.TDF;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * End-to-end pipeline test: ParseJREAPC → ABACEnforcement → ConvertToZTDF.
+ *
+ * Verifies that JREAP-C binary content is preserved byte-for-byte through
+ * the full classification-tagging and ABAC enforcement pipeline before the TDF
+ * wrapping step receives it. Two scenarios are covered:
+ *
+ * - PERMIT — message flows through all three stages; TDF receives original bytes.
+ * - DENY — message is stopped at ABACEnforcement and never reaches ConvertToZTDF.
+ *
+ */
+class JREAPCPipelineTest {
+
+ // ─── JREAP-C binary builder (32-byte big-endian header + payload) ─────────
+
+ private static final int HEADER_SIZE = 32;
+
+ /**
+ * Builds a synthetic JREAP-C message with the given word-type, classification
+ * code, and arbitrary payload bytes appended after the fixed header.
+ *
+ * Header layout (big-endian):
+ *
+ * offset len field
+ * 0 2 word type
+ * 2 1 classification code (0=UNCLASSIFIED, 1=CUI, 2=SECRET, 3=TOP SECRET)
+ * 3 1 flags (bit0=exercise, bit1=simulation)
+ * 4 4 sequence number
+ * 8 8 source address
+ * 16 8 destination address
+ * 24 4 timestamp (UNIX, 32-bit)
+ * 28 2 track number
+ * 30 2 reserved
+ *
+ */
+ static byte[] buildJreapCMessage(int wordType, int classCode, int seqNum,
+ long timestamp, int trackNumber, byte[] payload) {
+ ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + payload.length)
+ .order(ByteOrder.BIG_ENDIAN);
+ buf.putShort((short) wordType);
+ buf.put((byte) classCode);
+ buf.put((byte) 0x00); // flags
+ buf.putInt(seqNum);
+ buf.put(new byte[8]); // source address (zeroed)
+ buf.put(new byte[8]); // dest address (zeroed)
+ buf.putInt((int) timestamp);
+ buf.putShort((short) trackNumber);
+ buf.putShort((short) 0); // reserved
+ buf.put(payload);
+ return buf.array();
+ }
+
+ // ─── Mock inner classes ───────────────────────────────────────────────────
+
+ static class MockABACEnforcement extends ABACEnforcement {
+ SDK mockSDK;
+ @Override
+ SDK getTDFSDK(ProcessContext ctx) { return mockSDK; }
+ }
+
+ static class MockConvertToZTDF extends ConvertToZTDF {
+ SDK mockSDK;
+ TDF mockTDF;
+ @Override
+ SDK getTDFSDK(ProcessContext ctx) { return mockSDK; }
+ @Override
+ TDF getTDF() { return mockTDF; }
+ }
+
+ // ─── Tests ────────────────────────────────────────────────────────────────
+
+ /**
+ * Happy-path: SECRET JREAP-C message is parsed, ABAC returns PERMIT,
+ * and ConvertToZTDF receives the original binary unchanged.
+ */
+ @Test
+ void secretMessage_permitDecision_binaryPreservedThroughFullPipeline() throws Exception {
+ // Build a realistic JREAP-C message: J5.0 word type, SECRET, with a short payload
+ byte[] tacticalPayload = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+ byte[] jreapMsg = buildJreapCMessage(
+ 0x0500, // J5.0 word type
+ 2, // classification code 2 = SECRET
+ 42, // sequence number
+ 1700000000L, // timestamp
+ 7, // track number
+ tacticalPayload
+ );
+
+ // ── Stage 1: ParseJREAPC ─────────────────────────────────────────────
+ ParseJREAPC parseProcessor = new ParseJREAPC();
+ TestRunner parseRunner = TestRunners.newTestRunner(parseProcessor);
+ parseRunner.setProperty(ParseJREAPC.CLASSIFICATION_ATTRIBUTE_NAMESPACE,
+ "https://classification.example.mil/attr/level");
+
+ parseRunner.enqueue(jreapMsg);
+ parseRunner.run(1);
+
+ List parsedFiles = parseRunner.getFlowFilesForRelationship(ParseJREAPC.REL_SUCCESS);
+ assertEquals(1, parsedFiles.size(), "ParseJREAPC must route to success");
+ MockFlowFile parsedFF = parsedFiles.get(0);
+
+ // Verify classification was extracted and tdf_attribute was set
+ assertEquals("SECRET", parsedFF.getAttribute("jreapc.classification"));
+ assertEquals("https://classification.example.mil/attr/level/value/secret",
+ parsedFF.getAttribute("tdf_attribute"));
+ assertEquals("J5.0", parsedFF.getAttribute("jreapc.word_type"));
+ assertEquals("42", parsedFF.getAttribute("jreapc.sequence_number"));
+
+ // Binary must be unchanged at this stage
+ assertArrayEquals(jreapMsg, parsedFF.toByteArray(),
+ "ParseJREAPC must not modify binary content");
+
+ // ── Stage 2: ABACEnforcement (mocked: PERMIT) ────────────────────────
+ MockABACEnforcement abacProcessor = new MockABACEnforcement();
+
+ SDK mockAbacSDK = mock(SDK.class);
+ SDK.Services mockAbacServices = mock(SDK.Services.class);
+ AuthorizationServiceGrpc.AuthorizationServiceFutureStub mockAuthStub =
+ mock(AuthorizationServiceGrpc.AuthorizationServiceFutureStub.class);
+ when(mockAbacSDK.getServices()).thenReturn(mockAbacServices);
+ when(mockAbacServices.authorization()).thenReturn(mockAuthStub);
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFuture(GetDecisionsResponse.newBuilder()
+ .addDecisionResponses(DecisionResponse.newBuilder()
+ .setDecision(DecisionResponse.Decision.DECISION_PERMIT))
+ .build()));
+ abacProcessor.mockSDK = mockAbacSDK;
+
+ TestRunner abacRunner = TestRunners.newTestRunner(abacProcessor);
+ Utils.setupTDFControllerService(abacRunner);
+ abacRunner.setProperty(ABACEnforcement.ENTITY_ID, "nifi-pipeline-service");
+ abacRunner.setProperty(ABACEnforcement.ENTITY_TYPE, "CLIENT_ID");
+
+ // Carry all attributes and content forward from ParseJREAPC output
+ Map parsedAttrs = new HashMap<>(parsedFF.getAttributes());
+ abacRunner.enqueue(parsedFF.toByteArray(), parsedAttrs);
+ abacRunner.run(1);
+
+ List permittedFiles = abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT);
+ assertEquals(1, permittedFiles.size(), "ABACEnforcement must route SECRET to permit");
+ assertEquals(0, abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY).size());
+
+ MockFlowFile permittedFF = permittedFiles.get(0);
+ assertEquals("PERMIT", permittedFF.getAttribute("abac.decision"));
+ assertEquals("https://classification.example.mil/attr/level/value/secret",
+ permittedFF.getAttribute("abac.resource_attributes"));
+
+ // Binary still unchanged after ABAC decision
+ assertArrayEquals(jreapMsg, permittedFF.toByteArray(),
+ "ABACEnforcement must not modify binary content");
+
+ // ── Stage 3: ConvertToZTDF (mocked: capture bytes fed to TDF) ────────
+ MockConvertToZTDF tdfProcessor = new MockConvertToZTDF();
+
+ SDK mockTdfSDK = mock(SDK.class);
+ SDK.Services mockTdfServices = mock(SDK.Services.class);
+ SDK.KAS mockKAS = mock(SDK.KAS.class);
+ TDF mockTDF = mock(TDF.class);
+ when(mockTdfSDK.getServices()).thenReturn(mockTdfServices);
+ when(mockTdfServices.kas()).thenReturn(mockKAS);
+ tdfProcessor.mockSDK = mockTdfSDK;
+ tdfProcessor.mockTDF = mockTDF;
+
+ // Capture the exact bytes ConvertToZTDF passes to TDF.createTDF()
+ final byte[][] capturedInputBytes = {null};
+ doAnswer(inv -> {
+ java.io.InputStream is = inv.getArgument(0);
+ java.io.OutputStream os = inv.getArgument(1);
+ capturedInputBytes[0] = is.readAllBytes();
+ os.write(("WRAPPED:" + capturedInputBytes[0].length + "b").getBytes());
+ return null;
+ }).when(mockTDF).createTDF(any(), any(), any(), any(), any());
+
+ TestRunner tdfRunner = TestRunners.newTestRunner(tdfProcessor);
+ Utils.setupTDFControllerService(tdfRunner);
+ tdfRunner.setProperty(ConvertToZTDF.KAS_URL, "https://kas.example.mil");
+
+ Map permittedAttrs = new HashMap<>(permittedFF.getAttributes());
+ tdfRunner.enqueue(permittedFF.toByteArray(), permittedAttrs);
+ tdfRunner.run(1);
+
+ assertEquals(1, tdfRunner.getFlowFilesForRelationship(ConvertToZTDF.REL_SUCCESS).size(),
+ "ConvertToZTDF must succeed");
+
+ // ── Key assertion: TDF receives the original JREAP-C binary, byte-for-byte ──
+ assertNotNull(capturedInputBytes[0], "TDF.createTDF must have been called");
+ assertArrayEquals(jreapMsg, capturedInputBytes[0],
+ "ConvertToZTDF must feed the original JREAP-C binary to the TDF library unchanged");
+ }
+
+ /**
+ * Deny-path: SECRET JREAP-C message is parsed but ABAC returns DENY —
+ * the message must be stopped before reaching ConvertToZTDF.
+ */
+ @Test
+ void secretMessage_denyDecision_neverReachesTdf() throws Exception {
+ byte[] jreapMsg = buildJreapCMessage(0x0500, 2, 1, 1700000000L, 3,
+ "sensitive tactical data".getBytes());
+
+ // Stage 1: ParseJREAPC
+ ParseJREAPC parseProcessor = new ParseJREAPC();
+ TestRunner parseRunner = TestRunners.newTestRunner(parseProcessor);
+ parseRunner.setProperty(ParseJREAPC.CLASSIFICATION_ATTRIBUTE_NAMESPACE,
+ "https://classification.example.mil/attr/level");
+ parseRunner.enqueue(jreapMsg);
+ parseRunner.run(1);
+ MockFlowFile parsedFF = parseRunner.getFlowFilesForRelationship(ParseJREAPC.REL_SUCCESS).get(0);
+
+ // Stage 2: ABACEnforcement — DENY
+ MockABACEnforcement abacProcessor = new MockABACEnforcement();
+ SDK mockAbacSDK = mock(SDK.class);
+ SDK.Services mockAbacServices = mock(SDK.Services.class);
+ AuthorizationServiceGrpc.AuthorizationServiceFutureStub mockAuthStub =
+ mock(AuthorizationServiceGrpc.AuthorizationServiceFutureStub.class);
+ when(mockAbacSDK.getServices()).thenReturn(mockAbacServices);
+ when(mockAbacServices.authorization()).thenReturn(mockAuthStub);
+ when(mockAuthStub.getDecisions(any(GetDecisionsRequest.class)))
+ .thenReturn(Futures.immediateFuture(GetDecisionsResponse.newBuilder()
+ .addDecisionResponses(DecisionResponse.newBuilder()
+ .setDecision(DecisionResponse.Decision.DECISION_DENY))
+ .build()));
+ abacProcessor.mockSDK = mockAbacSDK;
+
+ TestRunner abacRunner = TestRunners.newTestRunner(abacProcessor);
+ Utils.setupTDFControllerService(abacRunner);
+ abacRunner.setProperty(ABACEnforcement.ENTITY_ID, "unauthorized-client");
+ abacRunner.setProperty(ABACEnforcement.ENTITY_TYPE, "CLIENT_ID");
+
+ abacRunner.enqueue(parsedFF.toByteArray(), new HashMap<>(parsedFF.getAttributes()));
+ abacRunner.run(1);
+
+ // Message must be on the deny relationship — not permit, not failure
+ assertEquals(1, abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY).size());
+ assertEquals(0, abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_PERMIT).size());
+ assertEquals(0, abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_FAILURE).size());
+
+ MockFlowFile deniedFF = abacRunner.getFlowFilesForRelationship(ABACEnforcement.REL_DENY).get(0);
+ assertEquals("DENY", deniedFF.getAttribute("abac.decision"));
+
+ // The denied flow file would NOT be fed to ConvertToZTDF — no wrapping stage reached.
+ // Binary content is still intact on the deny branch (for audit/quarantine downstream).
+ assertArrayEquals(jreapMsg, deniedFF.toByteArray(),
+ "Binary content must be intact on the deny branch");
+ }
+
+ /**
+ * Top-secret message: classification code 3 → tdf_attribute slug is top_secret.
+ */
+ @Test
+ void topSecretMessage_tdfAttributeSlug_isTopSecret() throws Exception {
+ byte[] jreapMsg = buildJreapCMessage(0x0300, 3, 1, 1700000000L, 0, new byte[]{0x55});
+
+ ParseJREAPC parseProcessor = new ParseJREAPC();
+ TestRunner parseRunner = TestRunners.newTestRunner(parseProcessor);
+ parseRunner.setProperty(ParseJREAPC.CLASSIFICATION_ATTRIBUTE_NAMESPACE,
+ "https://classification.example.mil/attr/level");
+ parseRunner.enqueue(jreapMsg);
+ parseRunner.run(1);
+
+ MockFlowFile parsedFF = parseRunner.getFlowFilesForRelationship(ParseJREAPC.REL_SUCCESS).get(0);
+ assertEquals("TOP SECRET", parsedFF.getAttribute("jreapc.classification"));
+ assertEquals("https://classification.example.mil/attr/level/value/top_secret",
+ parsedFF.getAttribute("tdf_attribute"));
+ assertArrayEquals(jreapMsg, parsedFF.toByteArray());
+ }
+}
diff --git a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java
index b505c18..65ce79d 100644
--- a/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java
+++ b/nifi-tdf-processors/src/test/java/io/opentdf/nifi/Utils.java
@@ -12,6 +12,7 @@
public class Utils {
static void setupTDFControllerService(TestRunner runner) throws Exception {
+ runner.setValidateExpressionUsage(false);
SimpleOpenTDFControllerService tdfControllerService = new SimpleOpenTDFControllerService();
Map controllerPropertyMap = new HashMap<>();
controllerPropertyMap.put(PLATFORM_ENDPOINT.getName(), "http://platform");
diff --git a/pom.xml b/pom.xml
index 9f34e47..c9e9ab6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,11 +29,11 @@
https://github.com/opentdf/nifi/tree/main/
- 1.23.1
+ 2.7.0
5.10.0
- 17
- 17
- 17
+ 21
+ 21
+ 21
.7
@@ -41,6 +41,8 @@
nifi-tdf-controller-services-api-nar
nifi-tdf-processors
nifi-tdf-nar
+ nifi-jreapc-processors
+ nifi-jreapc-nar
@@ -78,13 +80,13 @@
org.mockito
mockito-core
- 5.2.0
+ 5.14.0
test
org.mockito
mockito-junit-jupiter
- 5.2.0
+ 5.14.0
test
@@ -105,7 +107,7 @@
org.apache.nifi
nifi-nar-maven-plugin
- 1.5.1
+ 2.0.0
true
@@ -256,6 +258,10 @@
1
+
+ false