Skip to content

Commit 451949c

Browse files
committed
Merge branch 'master' of https://github.com/apache/nifi into aws-web-api
2 parents 391a3b3 + 2799211 commit 451949c

7 files changed

Lines changed: 119 additions & 7 deletions

File tree

nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,8 @@ private static RecordFieldType getFieldType(final int sqlType) {
350350
return RecordFieldType.TIME;
351351
case Types.TIMESTAMP:
352352
case Types.TIMESTAMP_WITH_TIMEZONE:
353+
case -101: // Oracle's TIMESTAMP WITH TIME ZONE
354+
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
353355
return RecordFieldType.TIMESTAMP;
354356
}
355357

nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.nifi.controller.leader.election;
1818

19-
import java.util.HashMap;
20-
import java.util.Map;
2119
import org.apache.commons.lang3.StringUtils;
2220
import org.apache.curator.RetryPolicy;
2321
import org.apache.curator.framework.CuratorFramework;
@@ -36,6 +34,9 @@
3634
import org.slf4j.Logger;
3735
import org.slf4j.LoggerFactory;
3836

37+
import java.util.HashMap;
38+
import java.util.Map;
39+
3940
public class CuratorLeaderElectionManager implements LeaderElectionManager {
4041

4142
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
@@ -112,7 +113,7 @@ public synchronized void register(final String roleName, final LeaderElectionSta
112113
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
113114

114115
if (!isStopped()) {
115-
final ElectionListener electionListener = new ElectionListener(roleName, listener);
116+
final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
116117
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
117118
if (isParticipant) {
118119
leaderSelector.autoRequeue();
@@ -358,12 +359,14 @@ private class ElectionListener extends LeaderSelectorListenerAdapter implements
358359

359360
private final String roleName;
360361
private final LeaderElectionStateChangeListener listener;
362+
private final String participantId;
361363

362364
private volatile boolean leader;
363365

364-
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
366+
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
365367
this.roleName = roleName;
366368
this.listener = listener;
369+
this.participantId = participantId;
367370
}
368371

369372
public boolean isLeader() {
@@ -373,9 +376,37 @@ public boolean isLeader() {
373376
@Override
374377
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
375378
logger.info("{} Connection State changed to {}", this, newState.name());
379+
380+
if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
381+
if (leader == true) {
382+
logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
383+
}
384+
385+
leader = false;
386+
}
387+
376388
super.stateChanged(client, newState);
377389
}
378390

391+
/**
392+
* Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose
393+
* its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc.
394+
*
395+
* @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise
396+
*/
397+
private boolean verifyLeader() {
398+
final String leader = getLeader(roleName);
399+
if (leader == null) {
400+
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
401+
return false;
402+
}
403+
404+
final boolean match = leader.equals(participantId);
405+
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
406+
roleName, leader, participantId, match);
407+
return match;
408+
}
409+
379410
@Override
380411
public void takeLeadership(final CuratorFramework client) throws Exception {
381412
leader = true;
@@ -396,14 +427,40 @@ public void takeLeadership(final CuratorFramework client) throws Exception {
396427
// Curator API states that we lose the leadership election when we return from this method,
397428
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
398429
try {
399-
while (!isStopped()) {
430+
int failureCount = 0;
431+
int loopCount = 0;
432+
while (!isStopped() && leader) {
400433
try {
401434
Thread.sleep(100L);
402435
} catch (final InterruptedException ie) {
403436
logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName);
404437
Thread.currentThread().interrupt();
405438
return;
406439
}
440+
441+
if (leader && ++loopCount % 50 == 0) {
442+
// While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
443+
// seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
444+
// periodically to determine whether or not this node is still the elected leader.
445+
try {
446+
final boolean stillLeader = verifyLeader();
447+
failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
448+
449+
if (!stillLeader) {
450+
logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
451+
break;
452+
}
453+
} catch (final Exception e) {
454+
failureCount++;
455+
if (failureCount > 1) {
456+
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
457+
+ "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
458+
} else {
459+
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
460+
+ "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
461+
}
462+
}
463+
}
407464
}
408465
} finally {
409466
leader = false;

nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ public Connection updateConnection(final ConnectionDTO connectionDTO) {
567567

568568
// configure the connection
569569
configureConnection(connection, connectionDTO);
570+
group.onComponentModified();
570571

571572
// update the relationships if necessary
572573
if (!newProcessorRelationships.isEmpty()) {

nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,10 @@ private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, fina
339339
componentType, componentName);
340340

341341
addField(builder, "componentId", status.getId());
342+
addField(builder, "sourceId", status.getSourceId());
343+
addField(builder, "sourceName", status.getSourceName());
344+
addField(builder, "destinationId", status.getDestinationId());
345+
addField(builder, "destinationName", status.getDestinationName());
342346
addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes());
343347
addField(builder, "maxQueuedCount", status.getMaxQueuedCount());
344348
addField(builder, "queuedBytes", status.getQueuedBytes());

nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.UUID;
3232

3333
import javax.json.Json;
34+
import javax.json.JsonObject;
3435
import javax.json.JsonReader;
3536
import javax.json.JsonString;
3637

@@ -144,8 +145,11 @@ public void testConnectionStatus() throws IOException, InitializationException {
144145

145146
final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
146147
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
147-
JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled");
148+
JsonObject object = jsonReader.readArray().getJsonObject(0);
149+
JsonString backpressure = object.getJsonString("isBackPressureEnabled");
150+
JsonString source = object.getJsonString("sourceName");
148151
assertEquals("true", backpressure.getString());
152+
assertEquals("source", source.getString());
149153
}
150154

151155
@Test
@@ -318,6 +322,10 @@ public static ConnectionStatus generateConnectionStatus(String id, String namePr
318322
cStatus.setOutputCount(7);
319323
cStatus.setQueuedBytes(8l);
320324
cStatus.setQueuedCount(9);
325+
cStatus.setSourceId(id);
326+
cStatus.setSourceName("source");
327+
cStatus.setDestinationId(id);
328+
cStatus.setDestinationName("destination");
321329

322330
return cStatus;
323331
}

nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Set;
3535
import java.util.TimeZone;
36+
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738

3839
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -61,6 +62,7 @@
6162
import org.apache.nifi.serialization.RecordSetWriterFactory;
6263
import org.apache.nifi.serialization.record.RecordSchema;
6364
import org.apache.nifi.serialization.record.RecordSet;
65+
import org.apache.nifi.util.StopWatch;
6466
import org.apache.nifi.util.StringUtils;
6567

6668
import org.apache.solr.client.solrj.SolrQuery;
@@ -336,6 +338,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
336338
solrQuery.setParam("sort", sortClause.toString());
337339

338340
while (continuePaging.get()) {
341+
StopWatch timer = new StopWatch(true);
342+
339343
final QueryRequest req = new QueryRequest(solrQuery);
340344
if (isBasicAuthEnabled()) {
341345
req.setBasicAuthCredentials(getUsername(), getPassword());
@@ -385,8 +389,19 @@ public void process(final OutputStream out) throws IOException {
385389
}
386390
}
387391
});
392+
388393
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString());
389394
}
395+
396+
timer.stop();
397+
StringBuilder transitUri = new StringBuilder("solr://");
398+
transitUri.append(getSolrLocation());
399+
if (getSolrLocation().equals(SolrUtils.SOLR_TYPE_CLOUD.getValue())) {
400+
transitUri.append(":").append(context.getProperty(COLLECTION).evaluateAttributeExpressions().getValue());
401+
}
402+
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
403+
session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
404+
390405
session.transfer(flowFile, REL_SUCCESS);
391406
}
392407
continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue()));

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static java.sql.Types.SMALLINT;
4242
import static java.sql.Types.TIME;
4343
import static java.sql.Types.TIMESTAMP;
44+
import static java.sql.Types.TIMESTAMP_WITH_TIMEZONE;
4445
import static java.sql.Types.TINYINT;
4546
import static java.sql.Types.VARBINARY;
4647
import static java.sql.Types.VARCHAR;
@@ -356,7 +357,28 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou
356357
continue;
357358
}
358359

359-
final Object value = rs.getObject(i);
360+
Object value;
361+
362+
// If a Timestamp type, try getTimestamp() rather than getObject()
363+
if (javaSqlType == TIMESTAMP
364+
|| javaSqlType == TIMESTAMP_WITH_TIMEZONE
365+
// The following are Oracle-specific codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE. This would be better
366+
// located in the DatabaseAdapter interfaces, but some processors (like ExecuteSQL) use this method but don't specify a DatabaseAdapter.
367+
|| javaSqlType == -101
368+
|| javaSqlType == -102) {
369+
try {
370+
value = rs.getTimestamp(i);
371+
// Some drivers (like Derby) return null for getTimestamp() but return a Timestamp object in getObject()
372+
if (value == null) {
373+
value = rs.getObject(i);
374+
}
375+
} catch (Exception e) {
376+
// The cause of the exception is not known, but we'll fall back to call getObject() and handle any "real" exception there
377+
value = rs.getObject(i);
378+
}
379+
} else {
380+
value = rs.getObject(i);
381+
}
360382

361383
if (value == null) {
362384
rec.put(i - 1, null);
@@ -603,6 +625,9 @@ public static Schema createSchema(final ResultSet rs, AvroConversionOptions opti
603625
break;
604626

605627
case TIMESTAMP:
628+
case TIMESTAMP_WITH_TIMEZONE:
629+
case -101: // Oracle's TIMESTAMP WITH TIME ZONE
630+
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
606631
addNullableField(builder, columnName,
607632
u -> options.useLogicalTypes
608633
? u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()))

0 commit comments

Comments
 (0)