Skip to content

Commit 0ac1a3f

Browse files
committed
NIFI-4199: Consistent proxy support across components
- Applied ProxyConfigurationService to ElasticsearchHttp processors - ElasticsearchHttp processors now support SOCKS proxy, too
1 parent d0b19ea commit 0ac1a3f

2 files changed

Lines changed: 35 additions & 19 deletions

File tree

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ language governing permissions and limitations under the License. -->
5656
<groupId>org.apache.nifi</groupId>
5757
<artifactId>nifi-record</artifactId>
5858
</dependency>
59+
<dependency>
60+
<groupId>org.apache.nifi</groupId>
61+
<artifactId>nifi-proxy-configuration-api</artifactId>
62+
</dependency>
5963
<dependency>
6064
<groupId>org.apache.commons</groupId>
6165
<artifactId>commons-lang3</artifactId>

nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@
3333
import org.apache.nifi.processor.ProcessContext;
3434
import org.apache.nifi.processor.exception.ProcessException;
3535
import org.apache.nifi.processor.util.StandardValidators;
36+
import org.apache.nifi.proxy.ProxyConfiguration;
37+
import org.apache.nifi.proxy.ProxyConfigurationService;
3638
import org.apache.nifi.ssl.SSLContextService;
3739
import org.apache.nifi.util.StringUtils;
3840

3941
import javax.net.ssl.SSLContext;
4042
import java.io.IOException;
4143
import java.io.InputStream;
42-
import java.net.InetSocketAddress;
4344
import java.net.Proxy;
4445
import java.net.URL;
4546
import java.util.ArrayList;
@@ -141,6 +142,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper
141142
static {
142143
final List<PropertyDescriptor> properties = new ArrayList<>();
143144
properties.add(ES_URL);
145+
properties.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
144146
properties.add(PROXY_HOST);
145147
properties.add(PROXY_PORT);
146148
properties.add(PROXY_USERNAME);
@@ -164,28 +166,38 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE
164166
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
165167

166168
// Add a proxy if set
167-
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
168-
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
169-
if (proxyHost != null && proxyPort != null) {
170-
final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
169+
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
170+
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
171+
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
172+
if (proxyHost != null && proxyPort != null) {
173+
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
174+
componentProxyConfig.setProxyServerHost(proxyHost);
175+
componentProxyConfig.setProxyServerPort(proxyPort);
176+
componentProxyConfig.setProxyUserName(context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue());
177+
componentProxyConfig.setProxyUserPassword(context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
178+
return componentProxyConfig;
179+
}
180+
return ProxyConfiguration.DIRECT_CONFIGURATION;
181+
});
182+
183+
if (!Proxy.Type.DIRECT.equals(proxyConfig.getProxyType())) {
184+
final Proxy proxy = proxyConfig.createProxy();
171185
okHttpClient.proxy(proxy);
172-
}
173186

174-
final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
175-
final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
176-
177-
if (proxyUsername != null && proxyPassword != null){
178-
okHttpClient.proxyAuthenticator(new Authenticator() {
179-
@Override
180-
public Request authenticate(Route route, Response response) throws IOException {
181-
final String credential=Credentials.basic(proxyUsername, proxyPassword);
182-
return response.request().newBuilder()
183-
.header("Proxy-Authorization", credential)
184-
.build();
185-
}
186-
});
187+
if (proxyConfig.hasCredential()){
188+
okHttpClient.proxyAuthenticator(new Authenticator() {
189+
@Override
190+
public Request authenticate(Route route, Response response) throws IOException {
191+
final String credential=Credentials.basic(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
192+
return response.request().newBuilder()
193+
.header("Proxy-Authorization", credential)
194+
.build();
195+
}
196+
});
197+
}
187198
}
188199

200+
189201
// Set timeouts
190202
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
191203
okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);

0 commit comments

Comments
 (0)