Skip to content

Commit 65c9841

Browse files
committed
NIFI-4199: Consistent proxy support across components
- Added ProxyConfigurationService to manage centralized proxy configurations - Adopt ProxyConfigurationService at FTP and HTTP processors
1 parent ccedc71 commit 65c9841

23 files changed

Lines changed: 745 additions & 83 deletions

File tree

nifi-assembly/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,12 @@ language governing permissions and limitations under the License. -->
703703
<version>1.7.0-SNAPSHOT</version>
704704
<type>nar</type>
705705
</dependency>
706+
<dependency>
707+
<groupId>org.apache.nifi</groupId>
708+
<artifactId>nifi-proxy-configuration-nar</artifactId>
709+
<version>1.7.0-SNAPSHOT</version>
710+
<type>nar</type>
711+
</dependency>
706712
</dependencies>
707713
<profiles>
708714
<profile>

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@
6464
<groupId>org.apache.nifi</groupId>
6565
<artifactId>nifi-record-serialization-service-api</artifactId>
6666
</dependency>
67+
<dependency>
68+
<groupId>org.apache.nifi</groupId>
69+
<artifactId>nifi-proxy-configuration-api</artifactId>
70+
</dependency>
6771
<dependency>
6872
<groupId>org.apache.nifi</groupId>
6973
<artifactId>nifi-record</artifactId>

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.nifi.processor.ProcessContext;
3131
import org.apache.nifi.processors.standard.util.FileTransfer;
3232
import org.apache.nifi.processors.standard.util.FTPTransfer;
33+
import org.apache.nifi.proxy.ProxyConfigurationService;
3334

3435
// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
3536
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -63,6 +64,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
6364
properties.add(FTPTransfer.USE_COMPRESSION);
6465
properties.add(FTPTransfer.CONNECTION_MODE);
6566
properties.add(FTPTransfer.TRANSFER_MODE);
67+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
6668
properties.add(FTPTransfer.PROXY_TYPE);
6769
properties.add(FTPTransfer.PROXY_HOST);
6870
properties.add(FTPTransfer.PROXY_PORT);

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.nifi.processor.ProcessorInitializationContext;
3333
import org.apache.nifi.processors.standard.util.FTPTransfer;
3434
import org.apache.nifi.processors.standard.util.FileTransfer;
35+
import org.apache.nifi.proxy.ProxyConfigurationService;
3536

3637
@InputRequirement(Requirement.INPUT_FORBIDDEN)
3738
@Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@@ -76,6 +77,7 @@ protected void init(final ProcessorInitializationContext context) {
7677
properties.add(FTPTransfer.MAX_SELECTS);
7778
properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
7879
properties.add(FTPTransfer.USE_NATURAL_ORDERING);
80+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
7981
properties.add(FTPTransfer.PROXY_TYPE);
8082
properties.add(FTPTransfer.PROXY_HOST);
8183
properties.add(FTPTransfer.PROXY_PORT);

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import javax.net.ssl.SSLContext;
4343
import org.apache.commons.lang3.StringUtils;
4444
import org.apache.http.Header;
45-
import org.apache.http.HttpHost;
4645
import org.apache.http.HttpResponse;
4746
import org.apache.http.auth.AuthScope;
4847
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -93,13 +92,18 @@
9392
import org.apache.nifi.processor.ProcessorInitializationContext;
9493
import org.apache.nifi.processor.Relationship;
9594
import org.apache.nifi.processor.exception.ProcessException;
95+
import org.apache.nifi.processors.standard.util.HTTPUtils;
9696
import org.apache.nifi.security.util.KeyStoreUtils;
9797
import org.apache.nifi.processor.util.StandardValidators;
9898
import org.apache.nifi.ssl.SSLContextService;
9999
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
100100
import org.apache.nifi.util.StopWatch;
101101
import org.apache.nifi.util.Tuple;
102102

103+
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
104+
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
105+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
106+
103107
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
104108
@InputRequirement(Requirement.INPUT_FORBIDDEN)
105109
@CapabilityDescription("Fetches data from an HTTP or HTTPS URL and writes the data to the content of a FlowFile. Once the content has been fetched, the ETag and Last Modified "
@@ -195,18 +199,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
195199
.required(false)
196200
.identifiesControllerService(SSLContextService.class)
197201
.build();
198-
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
199-
.name("Proxy Host")
200-
.description("The fully qualified hostname or IP address of the proxy server")
201-
.required(false)
202-
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
203-
.build();
204-
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
205-
.name("Proxy Port")
206-
.description("The port of the proxy server")
207-
.required(false)
208-
.addValidator(StandardValidators.PORT_VALIDATOR)
209-
.build();
210202

211203
public static final String DEFAULT_COOKIE_POLICY_STR = "default";
212204
public static final String STANDARD_COOKIE_POLICY_STR = "standard";
@@ -268,6 +260,7 @@ protected void init(final ProcessorInitializationContext context) {
268260
properties.add(ACCEPT_CONTENT_TYPE);
269261
properties.add(FOLLOW_REDIRECTS);
270262
properties.add(REDIRECT_COOKIE_POLICY);
263+
properties.add(PROXY_CONFIGURATION_SERVICE);
271264
properties.add(PROXY_HOST);
272265
properties.add(PROXY_PORT);
273266
this.properties = Collections.unmodifiableList(properties);
@@ -315,13 +308,7 @@ protected Collection<ValidationResult> customValidate(final ValidationContext co
315308
.build());
316309
}
317310

318-
if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
319-
results.add(new ValidationResult.Builder()
320-
.explanation("Proxy Host was set but no Proxy Port was specified")
321-
.valid(false)
322-
.subject("Proxy server configuration")
323-
.build());
324-
}
311+
HTTPUtils.validateProxyProperties(context, results);
325312

326313
return results;
327314
}
@@ -456,22 +443,18 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
456443
final String password = context.getProperty(PASSWORD).getValue();
457444

458445
// set the credentials if appropriate
446+
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
447+
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
459448
if (username != null) {
460-
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
461449
if (password == null) {
462450
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
463451
} else {
464452
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
465453
}
466-
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
467454
}
468455

469456
// Set the proxy if specified
470-
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
471-
final String host = context.getProperty(PROXY_HOST).getValue();
472-
final int port = context.getProperty(PROXY_PORT).asInteger();
473-
clientBuilder.setProxy(new HttpHost(host, port));
474-
}
457+
HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
475458

476459
// create request
477460
final HttpGet get = new HttpGet(url);

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.nifi.processor.util.StandardValidators;
6060
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
6161
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
62+
import org.apache.nifi.proxy.ProxyConfiguration;
6263
import org.apache.nifi.ssl.SSLContextService;
6364
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
6465
import org.apache.nifi.stream.io.StreamUtils;
@@ -79,7 +80,6 @@
7980
import java.io.FileInputStream;
8081
import java.io.IOException;
8182
import java.io.InputStream;
82-
import java.net.InetSocketAddress;
8383
import java.net.Proxy;
8484
import java.net.Proxy.Type;
8585
import java.net.URL;
@@ -109,6 +109,7 @@
109109
import java.util.regex.Pattern;
110110

111111
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
112+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
112113

113114
@SupportsBatching
114115
@Tags({"http", "https", "rest", "client"})
@@ -423,6 +424,7 @@ public final class InvokeHTTP extends AbstractProcessor {
423424
PROP_ATTRIBUTES_TO_SEND,
424425
PROP_BASIC_AUTH_USERNAME,
425426
PROP_BASIC_AUTH_PASSWORD,
427+
PROXY_CONFIGURATION_SERVICE,
426428
PROP_PROXY_HOST,
427429
PROP_PROXY_PORT,
428430
PROP_PROXY_TYPE,
@@ -565,6 +567,8 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
565567
results.add(new ValidationResult.Builder().subject("SSL Context Service").valid(false).explanation("If Proxy Type is HTTPS, SSL Context Service must be set").build());
566568
}
567569

570+
ProxyConfiguration.validateProxyType(validationContext, results, Type.HTTP);
571+
568572
return results;
569573
}
570574

@@ -575,14 +579,30 @@ public void setUpClient(final ProcessContext context) throws IOException, Unreco
575579
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
576580

577581
// Add a proxy if set
578-
final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
579-
final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger();
580-
final String proxyType = context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue();
581-
boolean isHttpsProxy = false;
582-
if (proxyHost != null && proxyPort != null) {
583-
final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
582+
boolean isHttpsProxy = HTTPS.equals(context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue());
583+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
584+
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
585+
final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
586+
final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger();
587+
if (proxyHost != null && proxyPort != null) {
588+
componentProxyConfig.setProxyType(Type.HTTP);
589+
componentProxyConfig.setProxyServerHost(proxyHost);
590+
componentProxyConfig.setProxyServerPort(proxyPort);
591+
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
592+
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
593+
componentProxyConfig.setProxyUserName(proxyUsername);
594+
componentProxyConfig.setProxyUserPassword(proxyPassword);
595+
}
596+
return componentProxyConfig;
597+
});
598+
599+
final Proxy proxy = proxyConfig.createProxy();
600+
if (Type.HTTP.equals(proxy.type())) {
584601
okHttpClientBuilder.proxy(proxy);
585-
isHttpsProxy = HTTPS.equals(proxyType);
602+
if (proxyConfig.hasCredential()) {
603+
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
604+
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
605+
}
586606
}
587607

588608
// configure ETag cache if enabled
@@ -691,7 +711,6 @@ private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, SSLCo
691711

692712
private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) {
693713
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
694-
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
695714

696715
// If the username/password properties are set then check if digest auth is being used
697716
if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
@@ -706,23 +725,8 @@ private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessC
706725
com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
707726
final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
708727

709-
if(!proxyUsername.isEmpty()) {
710-
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
711-
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
712-
713-
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
714-
}
715-
716728
okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache));
717729
okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache));
718-
} else {
719-
// Add proxy authentication only
720-
if(!proxyUsername.isEmpty()) {
721-
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
722-
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
723-
724-
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
725-
}
726730
}
727731
}
728732

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.nifi.processor.ProcessContext;
3434
import org.apache.nifi.processors.standard.util.FileTransfer;
3535
import org.apache.nifi.processors.standard.util.FTPTransfer;
36+
import org.apache.nifi.proxy.ProxyConfigurationService;
3637

3738
@TriggerSerially
3839
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -79,6 +80,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
7980
properties.add(FTPTransfer.DATA_TIMEOUT);
8081
properties.add(FTPTransfer.CONNECTION_MODE);
8182
properties.add(FTPTransfer.TRANSFER_MODE);
83+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
8284
properties.add(FTPTransfer.PROXY_TYPE);
8385
properties.add(FTPTransfer.PROXY_HOST);
8486
properties.add(FTPTransfer.PROXY_PORT);

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.commons.io.IOUtils;
2020
import org.apache.http.Header;
2121
import org.apache.http.HttpException;
22-
import org.apache.http.HttpHost;
2322
import org.apache.http.HttpResponse;
2423
import org.apache.http.HttpResponseInterceptor;
2524
import org.apache.http.auth.AuthScope;
@@ -75,6 +74,7 @@
7574
import org.apache.nifi.processor.exception.ProcessException;
7675
import org.apache.nifi.processor.io.InputStreamCallback;
7776
import org.apache.nifi.processor.util.StandardValidators;
77+
import org.apache.nifi.processors.standard.util.HTTPUtils;
7878
import org.apache.nifi.security.util.CertificateUtils;
7979
import org.apache.nifi.security.util.KeyStoreUtils;
8080
import org.apache.nifi.ssl.SSLContextService;
@@ -127,6 +127,10 @@
127127
import java.util.concurrent.atomic.AtomicReference;
128128
import java.util.regex.Pattern;
129129

130+
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
131+
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
132+
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
133+
130134
@SupportsBatching
131135
@InputRequirement(Requirement.INPUT_REQUIRED)
132136
@Tags({"http", "https", "remote", "copy", "archive"})
@@ -243,18 +247,6 @@ public class PostHTTP extends AbstractProcessor {
243247
.required(false)
244248
.identifiesControllerService(SSLContextService.class)
245249
.build();
246-
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
247-
.name("Proxy Host")
248-
.description("The fully qualified hostname or IP address of the proxy server")
249-
.required(false)
250-
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
251-
.build();
252-
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
253-
.name("Proxy Port")
254-
.description("The port of the proxy server")
255-
.required(false)
256-
.addValidator(StandardValidators.PORT_VALIDATOR)
257-
.build();
258250
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
259251
.name("Content-Type")
260252
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
@@ -302,6 +294,7 @@ protected void init(final ProcessorInitializationContext context) {
302294
properties.add(DATA_TIMEOUT);
303295
properties.add(ATTRIBUTES_AS_HEADERS_REGEX);
304296
properties.add(USER_AGENT);
297+
properties.add(PROXY_CONFIGURATION_SERVICE);
305298
properties.add(PROXY_HOST);
306299
properties.add(PROXY_PORT);
307300
properties.add(CONTENT_TYPE);
@@ -328,14 +321,6 @@ protected Collection<ValidationResult> customValidate(final ValidationContext co
328321
.valid(false).subject("SSL Context").build());
329322
}
330323

331-
if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
332-
results.add(new ValidationResult.Builder()
333-
.explanation("Proxy Host was set but no Proxy Port was specified")
334-
.valid(false)
335-
.subject("Proxy server configuration")
336-
.build());
337-
}
338-
339324
boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
340325
int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
341326
boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet();
@@ -345,6 +330,8 @@ protected Collection<ValidationResult> customValidate(final ValidationContext co
345330
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build());
346331
}
347332

333+
HTTPUtils.validateProxyProperties(context, results);
334+
348335
return results;
349336
}
350337

@@ -535,22 +522,18 @@ public void process(final HttpResponse response, final HttpContext httpContext)
535522
final String username = context.getProperty(USERNAME).getValue();
536523
final String password = context.getProperty(PASSWORD).getValue();
537524
// set the credentials if appropriate
525+
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
526+
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
538527
if (username != null) {
539-
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
540528
if (password == null) {
541529
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
542530
} else {
543531
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
544532
}
545-
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
546533
}
547534

548535
// Set the proxy if specified
549-
if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
550-
final String host = context.getProperty(PROXY_HOST).getValue();
551-
final int port = context.getProperty(PROXY_PORT).asInteger();
552-
clientBuilder.setProxy(new HttpHost(host, port));
553-
}
536+
HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
554537

555538
client = clientBuilder.build();
556539

0 commit comments

Comments
 (0)