Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions conf/mdc/counter_weekly.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/bin/sh
#counter_weekly.sh

# This script iterates through all published Datasets in all Dataverses and calls the Make Data Count API to update their citations from DataCite
# Note: Requires curl and jq for parsing JSON responses form curl

# A recursive method to process each Dataverse
processDV () {
echo "Processing Dataverse ID#: $1"

#Call the Dataverse API to get the contents of the Dataverse (without credentials, this will only list published datasets and dataverses
DVCONTENTS=$(curl -s http://localhost:8080/api/dataverses/$1/contents)

# Iterate over all datasets, pulling the value of their DOIs (as part of the persistentUrl) from the json returned
for subds in $(echo "${DVCONTENTS}" | jq -r '.data[] | select(.type == "dataset") | .persistentUrl'); do

#The authority/identifier are preceded by a protocol/host, i.e. https://doi.org/
DOI=`expr "$subds" : '.*:\/\/\doi\.org\/\(.*\)'`

# Call the Dataverse API for this dataset and capture both the response and HTTP status code
HTTP_RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "http://localhost:8080/api/admin/makeDataCount/:persistentId/updateCitationsForDataset?persistentId=doi:$DOI")

# Extract the HTTP status code from the last line
HTTP_STATUS=$(echo "$HTTP_RESPONSE" | tail -n1)
# Extract the response body (everything except the last line)
RESPONSE_BODY=$(echo "$HTTP_RESPONSE" | sed '$d')

# Check the HTTP status code and report accordingly
case $HTTP_STATUS in
200)
# Successfully queued
# Extract status from the nested data object
STATUS=$(echo "$RESPONSE_BODY" | jq -r '.data.status')

# Extract message from the nested data object
if echo "$RESPONSE_BODY" | jq -e '.data.message' > /dev/null 2>&1 && [ "$(echo "$RESPONSE_BODY" | jq -r '.data.message')" != "null" ]; then
MESSAGE=$(echo "$RESPONSE_BODY" | jq -r '.data.message')
echo "[SUCCESS] doi:$DOI - $STATUS: $MESSAGE"
else
# If message is missing or null, just show the status
echo "[SUCCESS] doi:$DOI - $STATUS: Citation update queued"
fi
;;
400)
# Bad request
if echo "$RESPONSE_BODY" | jq -e '.message' > /dev/null 2>&1; then
ERROR=$(echo "$RESPONSE_BODY" | jq -r '.message')
echo "[ERROR 400] doi:$DOI - Bad request: $ERROR"
else
echo "[ERROR 400] doi:$DOI - Bad request"
fi
;;
404)
# Not found
if echo "$RESPONSE_BODY" | jq -e '.message' > /dev/null 2>&1; then
ERROR=$(echo "$RESPONSE_BODY" | jq -r '.message')
echo "[ERROR 404] doi:$DOI - Not found: $ERROR"
else
echo "[ERROR 404] doi:$DOI - Not found"
fi
;;
503)
# Service unavailable (queue full)
if echo "$RESPONSE_BODY" | jq -e '.message' > /dev/null 2>&1; then
ERROR=$(echo "$RESPONSE_BODY" | jq -r '.message')
echo "[ERROR 503] doi:$DOI - Service unavailable: $ERROR"
elif echo "$RESPONSE_BODY" | jq -e '.data.message' > /dev/null 2>&1; then
ERROR=$(echo "$RESPONSE_BODY" | jq -r '.data.message')
echo "[ERROR 503] doi:$DOI - Service unavailable: $ERROR"
else
echo "[ERROR 503] doi:$DOI - Service unavailable: Queue is full"
fi
;;
*)
# Other error
echo "[ERROR $HTTP_STATUS] doi:$DOI - Unexpected error"
echo "Response: $RESPONSE_BODY"
;;
esac

done

# Now iterate over any child Dataverses and recursively process them
for subdv in $(echo "${DVCONTENTS}" | jq -r '.data[] | select(.type == "dataverse") | .id'); do
echo $subdv
processDV $subdv
done

}

# Call the function on the root dataverse to start processing
processDV 1
7 changes: 7 additions & 0 deletions doc/release-notes/11777-MDC-citation-api-improvement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
The /api/admin/makeDataCount/{id}/updateCitationsForDataset endpoint, which allows citations for a dataset to be retrieved from DataCite, is often called periodically for all datasets. However, allowing calls for many datasets to be processed in parallel can cause performance problems in Dataverse and/or cause calls to DataCite to fail due to rate limiting. The existing implementation was also inefficient w.r.t. memory use when used on datasets with many (>~1K) files. This release configures Dataverse to queue calls to this api, processes them serially, adds optional throttling to avoid hitting DataCite rate limits and improves memory use.

New optional MPConfig setting:

dataverse.api.mdc.min-delay-ms - number of milliseconds to wait between calls to DataCite. A value of ~100 should conservatively address DataCite's current 3000/5 minute limit. A value of 250 may be required for their test service.

Backward compatibility: This api call is now asynchronous and will return an OK response when the call is queued or a 503 if the queue is full.
2 changes: 2 additions & 0 deletions doc/sphinx-guides/source/admin/make-data-count.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ The example :download:`counter_weekly.sh <../_static/util/counter_weekly.sh>` wi

Citations will be retrieved for each published dataset and recorded in the your Dataverse installation's database.

Note that the :ref:`dataverse.api.mdc.min-delay-ms` setting can be used to avoid getting rate-limit errors from DataCite.

For how to get the citations out of your Dataverse installation, see "Retrieving Citations for a Dataset" under :ref:`Dataset Metrics <dataset-metrics-api>` in the :doc:`/api/native-api` section of the API Guide.

Please note that while the Dataverse Software has a metadata field for "Related Dataset" this information is not currently sent as a citation to Crossref.
Expand Down
4 changes: 4 additions & 0 deletions doc/sphinx-guides/source/api/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ This API changelog is experimental and we would love feedback on its usefulness.
:local:
:depth: 1

v6.9
----
- The POST /api/admin/makeDataCount/{id}/updateCitationsForDataset processing is now asynchronous and the response no longer includes the number of citations. The response can be OK if the request is queued or 503 if the queue is full (default queue size is 1000).

v6.8
----

Expand Down
16 changes: 16 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3731,6 +3731,22 @@ Example:

Can also be set via any `supported MicroProfile Config API source`_, e.g. the environment variable ``DATAVERSE_CORS_HEADERS_EXPOSE``.


.. _dataverse.api.mdc.min-delay-ms:

dataverse.api.mdc.min-delay-ms
++++++++++++++++++++++++++++++

Minimum delay in milliseconds between Make Data Count (MDC) API requests from the /api/admin/makeDataCount/{id}/updateCitationsForDataset api.
This setting helps prevent overloading the MDC service by enforcing a minimum time interval between consecutive requests.
If a request arrives before this interval has elapsed since the previous request, it will be rate-limited.

Default: ``0`` (no delay enforced)

Example: ``dataverse.api.mdc.min-delay-ms=100`` (enforces a minimum 100ms delay between MDC API requests)

Can also be set via any `supported MicroProfile Config API source`_, e.g. the environment variable ``DATAVERSE_API_MDC_MIN_DELAY_MS``.

.. _feature-flags:

Feature Flags
Expand Down
172 changes: 142 additions & 30 deletions src/main/java/edu/harvard/iq/dataverse/api/MakeDataCountApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@
import java.net.URL;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import jakarta.annotation.Resource;
import jakarta.ejb.EJB;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
Expand Down Expand Up @@ -62,6 +68,13 @@ public class MakeDataCountApi extends AbstractApiBean {
@EJB
SystemConfig systemConfig;

// Inject the managed executor service provided by the container
@Resource(name = "concurrent/CitationUpdateExecutor")
private ManagedExecutorService executorService;

// Track the last execution time to implement rate limiting during Citation updates
private static final AtomicLong lastExecutionTime = new AtomicLong(0);

/**
* TODO: For each dataset, send the following:
*
Expand Down Expand Up @@ -141,69 +154,166 @@ public Response addUsageMetricsFromSushiReportAll(@QueryParam("reportOnDisk") St

@POST
@Path("{id}/updateCitationsForDataset")
public Response updateCitationsForDataset(@PathParam("id") String id) throws IOException {
public Response updateCitationsForDataset(@PathParam("id") String id) {
try {
Dataset dataset = findDatasetOrDie(id);
GlobalId pid = dataset.getGlobalId();
PidProvider pidProvider = PidUtil.getPidProvider(pid.getProviderId());
// First validate that the dataset exists and has a valid DOI
final Dataset dataset = findDatasetOrDie(id);
final GlobalId pid = dataset.getGlobalId();
final PidProvider pidProvider = PidUtil.getPidProvider(pid.getProviderId());

// Only supported for DOIs and for DataCite DOI providers
if(!DataCiteDOIProvider.TYPE.equals(pidProvider.getProviderType())) {
if (!DataCiteDOIProvider.TYPE.equals(pidProvider.getProviderType())) {
return error(Status.BAD_REQUEST, "Only DataCite DOI providers are supported");
}
String persistentId = pid.toString();

// DataCite wants "doi=", not "doi:".
String authorityPlusIdentifier = persistentId.replaceFirst("doi:", "");
// Request max page size and then loop to handle multiple pages
URL url = null;
// Submit the task to the managed executor service
Future<?> future;
try {
url = new URI(JvmSettings.DATACITE_REST_API_URL.lookup(pidProvider.getId()) +
"/events?doi=" +
authorityPlusIdentifier +
"&source=crossref&page[size]=1000&page[cursor]=1").toURL();
} catch (URISyntaxException e) {
//Nominally this means a config error/ bad DATACITE_REST_API_URL for this provider
logger.warning("Unable to create URL for " + persistentId + ", pidProvider " + pidProvider.getId());
return error(Status.INTERNAL_SERVER_ERROR, "Unable to create DataCite URL to retrieve citations.");
future = executorService.submit(() -> {
try {
// Apply rate limiting if enabled
applyRateLimit();

// Process the citation update
boolean success = processCitationUpdate(dataset, pid, pidProvider);

// Update the last execution time after processing
lastExecutionTime.set(System.currentTimeMillis());

if (success) {
logger.fine("Successfully processed citation update for dataset " + id);
} else {
logger.warning("Failed to process citation update for dataset " + id);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Error processing citation update for dataset " + id, e);
}
});

JsonObjectBuilder output = Json.createObjectBuilder();
output.add("status", "queued");
output.add("message", "Citation update for dataset " + id + " has been queued for processing");
return ok(output);
} catch (RejectedExecutionException ree) {
logger.warning("Citation update for dataset " + id + " was rejected: Queue is full");
return error(Status.SERVICE_UNAVAILABLE,
"Citation update service is currently at capacity. Please try again later.");
}
logger.fine("Retrieving Citations from " + url.toString());
boolean nextPage = true;
JsonArrayBuilder dataBuilder = Json.createArrayBuilder();
} catch (WrappedResponse wr) {
return wr.getResponse();
}
}

/**
* Apply rate limiting by waiting if necessary
*/
private void applyRateLimit() {
// Check if rate limiting is enabled
long minDelay = JvmSettings.API_MDC_UPDATE_MIN_DELAY_MS.lookupOptional(Long.class).orElse(0l);
if(minDelay ==0) {
return;
}
// Calculate how long to wait
long lastExecution = lastExecutionTime.get();
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastExecution;

// If not enough time has passed since the last execution, wait
if (lastExecution > 0 && elapsedTime < minDelay) {
long waitTime = minDelay - elapsedTime;
logger.fine("Rate limiting: waiting " + waitTime + " ms before processing next citation update");
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warning("Rate limiting sleep interrupted: " + e.getMessage());
}
}
}

/**
* Process the citation update for a dataset
* This method contains the logic that was previously in updateCitationsForDataset
* @return true if processing was successful, false otherwise
*/
private boolean processCitationUpdate(Dataset dataset, GlobalId pid, PidProvider pidProvider) {
String persistentId = pid.asRawIdentifier();

// Request max page size and then loop to handle multiple pages
URL url = null;
try {
url = new URI(JvmSettings.DATACITE_REST_API_URL.lookup(pidProvider.getId()) +
"/events?doi=" +
persistentId +
"&source=crossref&page[size]=1000&page[cursor]=1").toURL();
} catch (URISyntaxException | MalformedURLException e) {
//Nominally this means a config error/ bad DATACITE_REST_API_URL for this provider
logger.warning("Unable to create URL for " + persistentId + ", pidProvider " + pidProvider.getId());
return false;
}

logger.fine("Retrieving Citations from " + url.toString());
boolean nextPage = true;
JsonArrayBuilder dataBuilder = Json.createArrayBuilder();

try {
do {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
int status = connection.getResponseCode();
if (status != 200) {
logger.warning("Failed to get citations from " + url.toString());
connection.disconnect();
return error(Status.fromStatusCode(status), "Failed to get citations from " + url.toString());
return false;
}

JsonObject report;
try (InputStream inStream = connection.getInputStream()) {
report = JsonUtil.getJsonObject(inStream);
} finally {
connection.disconnect();
}

JsonObject links = report.getJsonObject("links");
JsonArray data = report.getJsonArray("data");
Iterator<JsonValue> iter = data.iterator();
while (iter.hasNext()) {
dataBuilder.add(iter.next());
JsonValue citationValue = iter.next();
JsonObject citation = (JsonObject) citationValue;

// Filter out relations we don't use (e.g. hasPart) to lower memory req. with many files
if (citation.containsKey("attributes")) {
JsonObject attributes = citation.getJsonObject("attributes");
if (attributes.containsKey("relation-type-id")) {
String relationshipType = attributes.getString("relation-type-id");

// Only add citations with relationship types we care about
if (DatasetExternalCitationsServiceBean.inboundRelationships.contains(relationshipType) ||
DatasetExternalCitationsServiceBean.outboundRelationships.contains(relationshipType)) {
dataBuilder.add(citationValue);
}
}
}
}

if (links.containsKey("next")) {
try {
url = new URI(links.getString("next")).toURL();
applyRateLimit();
} catch (URISyntaxException e) {
logger.warning("Unable to create URL from DataCite response: " + links.getString("next"));
return error(Status.INTERNAL_SERVER_ERROR, "Unable to retrieve all results from DataCite");
return false;
}
} else {
nextPage = false;
}

logger.fine("body of citation response: " + report.toString());
} while (nextPage == true);

JsonArray allData = dataBuilder.build();
List<DatasetExternalCitations> datasetExternalCitations = datasetExternalCitationsService.parseCitations(allData);

/*
* ToDo: If this is the only source of citations, we should remove all the existing ones for the dataset and repopulate them.
* As is, this call doesn't remove old citations if there are now none (legacy issue if we decide to stop counting certain types of citation
Expand All @@ -216,14 +326,16 @@ public Response updateCitationsForDataset(@PathParam("id") String id) throws IOE
datasetExternalCitationsService.save(dm);
}
}

JsonObjectBuilder output = Json.createObjectBuilder();
output.add("citationCount", datasetExternalCitations.size());
return ok(output);
} catch (WrappedResponse wr) {
return wr.getResponse();

logger.fine("Citation update completed for dataset " + dataset.getId() +
" with " + datasetExternalCitations.size() + " citations");
return true;
} catch (IOException e) {
logger.log(Level.WARNING, "Error processing citation update for dataset " + dataset.getId(), e);
return false;
}
}

@GET
@Path("{yearMonth}/processingState")
public Response getProcessingState(@PathParam("yearMonth") String yearMonth) {
Expand Down
Loading