diff --git a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java index 7944c04b4b..bc4d6f2dfa 100644 --- a/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java +++ b/cwms-data-api/src/main/java/cwms/cda/ApiServlet.java @@ -187,6 +187,8 @@ import io.javalin.http.Handler; import io.javalin.http.HttpResponseException; import io.javalin.http.JavalinServlet; +import cwms.cda.data.dto.csv.CwmsCsvDTO; +import cwms.cda.formatters.csv.CsvExampleGenerator; import io.javalin.plugin.openapi.OpenApiOptions; import io.javalin.plugin.openapi.OpenApiPlugin; import io.swagger.v3.oas.models.Components; @@ -194,6 +196,8 @@ import io.swagger.v3.oas.models.Operation; import io.swagger.v3.oas.models.PathItem; import io.swagger.v3.oas.models.info.Info; +import io.swagger.v3.oas.models.media.MediaType; +import io.swagger.v3.oas.models.responses.ApiResponse; import io.swagger.v3.oas.models.security.SecurityRequirement; import io.swagger.v3.oas.models.servers.Server; import java.io.IOException; @@ -224,6 +228,8 @@ import org.jooq.exception.DataAccessException; import org.owasp.html.HtmlPolicyBuilder; import org.owasp.html.PolicyFactory; +import io.github.classgraph.ClassGraph; +import io.github.classgraph.ScanResult; /** @@ -946,7 +952,37 @@ private void getOpenApiOptions(JavalinConfig config) { }); } }); - + Map> schemaToClass = new HashMap<>(); + try (ScanResult scanResult = new ClassGraph() + .acceptPackages("cwms.cda.data.dto") + .scan()) { + List> csvDtoClasses = scanResult.getClassesImplementing(CwmsCsvDTO.class.getName()) + .loadClasses(CwmsCsvDTO.class); + for (Class clazz : csvDtoClasses) { + schemaToClass.put(clazz.getSimpleName(), clazz); + } + } + api.getPaths().values().forEach(pathItem -> { + for (Operation op : pathItem.readOperations()) { + if (op.getResponses() != null) { + for (ApiResponse resp : op.getResponses().values()) { + if (resp.getContent() != null && resp.getContent().containsKey(Formats.CSV)) { + MediaType 
csvMedia = resp.getContent().get(Formats.CSV); + if (csvMedia.getSchema() != null && csvMedia.getSchema().get$ref() != null) { + String ref = csvMedia.getSchema().get$ref(); + String schemaName = ref.substring(ref.lastIndexOf('/') + 1); + @SuppressWarnings("unchecked") + Class> dtoClass = (Class>) schemaToClass.get(schemaName); + + if (dtoClass != null) { + csvMedia.setExample(CsvExampleGenerator.getExample(dtoClass)); + } + } + } + } + } + } + }); return api; }) .defaultDocumentation(doc -> { diff --git a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java index fef8f7322b..ed05a37267 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java @@ -40,7 +40,9 @@ import io.javalin.core.validation.Validator; import io.javalin.http.Context; import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -127,6 +129,8 @@ public final class Controllers { public static final String MATCH_NULL_PARENTS = "match-null-parents"; public static final String ENTITY_ID = "entity-id"; public static final String PARENT_ENTITY_ID = "parent-entity-id"; + public static final String INCLUDE_METADATA_AS_CSV_COMMENTS = "include-metadata-as-comments"; + public static final String INCLUDE_OPTIONAL_CSV_COLUMNS = "include-optional-csv-columns"; public static final String CREATE_AS_LRTS = "create-as-lrts"; public static final String STORE_RULE = "store-rule"; @@ -172,9 +176,8 @@ public final class Controllers { public static final String TS_IDS = "ts-ids"; public static final String EXAMPLE_DATE = "2021-06-10T13:00:00-07:00"; - public static final String DATE_FORMAT = "YYYY-MM-dd'T'hh:mm:ss[Z'['VV']']"; - public static final String TIME_FORMAT_DESC = "The format for this field is ISO 8601 extended" + - ", with 
optional offset and timezone, i.e., '" + DATE_FORMAT + "', e.g., '" + EXAMPLE_DATE + "'." ; + public static final String TIME_FORMAT_DESC = "The format for this field " + + "is ISO 8601 extended in UTC, e.g., 2026-06-18T19:42:00Z"; public static final String INCLUDE_ASSIGNED = "include-assigned"; public static final String ANY_MASK = "*"; @@ -215,6 +218,8 @@ public final class Controllers { public static final String AREA_UNIT = "area-unit"; public static final String STATION_UNIT = "station-unit"; public static final String STAGE_UNIT = "stage-unit"; + public static final String DATE_FORMAT = "date-format"; + public static final String DATE_FORMAT_PATTERN = "date-format-pattern"; public static final String TRIM = "trim"; public static final String DESIGNATOR = "designator"; public static final String DESIGNATOR_MASK = "designator-mask"; diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java index 5a977d9616..ad5aefbfe6 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java @@ -21,6 +21,10 @@ import cwms.cda.data.dao.VerticalDatum; import cwms.cda.data.dto.TimeSeries; import cwms.cda.formatters.ContentType; +import cwms.cda.formatters.DateFormatResolver; +import cwms.cda.formatters.DateFormat; +import cwms.cda.formatters.csv.CsvConfiguration; +import cwms.cda.data.dto.csv.TimeSeriesCsv; import cwms.cda.formatters.Formats; import cwms.cda.helpers.DateUtils; import io.javalin.apibuilder.CrudHandler; @@ -112,7 +116,7 @@ public class TimeSeriesController implements CrudHandler { private final MetricRegistry metrics; private final Histogram requestResultSize; - private static final int DEFAULT_PAGE_SIZE = 500; + static final int DEFAULT_PAGE_SIZE = 500; public TimeSeriesController(MetricRegistry metrics) { @@ -382,12 +386,26 @@ public void delete(@NotNull Context ctx, @NotNull 
String timeseries) { + "identifies where in the request you are. This is an opaque" + " value, and can be obtained from the 'next-page' value in " + "the response. Deprecated, use " + PAGE + " instead."), - @OpenApiParam(name = PAGE_SIZE, - type = Integer.class, + @OpenApiParam(name = PAGE_SIZE, type = Integer.class, description = "How many entries per page returned. " - + "Default " + DEFAULT_PAGE_SIZE + ". Use 0 to return an empty values array, " + + "For JSON/XML paging, this controls page size. " + + "For CSV, this controls the internal fetch batch size used while streaming a single response. " + + "CSV clients do not request subsequent pages. " + + "Default " + DEFAULT_PAGE_SIZE +". Use 0 to return an empty values array, " + "or -1 to return the entire window in one response without a next-page cursor. " - + "Values less than -1 are invalid.") + + "Values less than -1 are invalid."), + @OpenApiParam(name = INCLUDE_METADATA_AS_CSV_COMMENTS, type = Boolean.class, + description = "When true, include dataset metadata as csv header comments " + + "prepended with # (default is false)."), + @OpenApiParam(name = INCLUDE_OPTIONAL_CSV_COLUMNS, type = Boolean.class, + description = "When true, include optional columns (quality-code, data-entry-date) " + + "in the CSV response (default is false)."), + @OpenApiParam(name = DATE_FORMAT, + description = "Specifies the format of any dates in the response. " + + "Default is ISO8601-Instant. 
Other possibilities are epoch-millis, ISO8601-Offset, " + + "ISO8601-Local, date-only, and custom."), + @OpenApiParam(name = DATE_FORMAT_PATTERN, + description = "When date-format is set to 'custom', this parameter specifies the date format pattern.") }, responses = { @OpenApiResponse(status = STATUS_200, @@ -397,6 +415,7 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) { @OpenApiContent(from = TimeSeries.class, type = Formats.XMLV2), @OpenApiContent(from = TimeSeries.class, type = Formats.XML), @OpenApiContent(from = TimeSeries.class, type = Formats.JSON), + @OpenApiContent(from = TimeSeriesCsv.class, type= Formats.CSV), @OpenApiContent(from = TimeSeries.class, type = ""),}), @OpenApiResponse(status = STATUS_400, description = "Invalid parameter combination"), @OpenApiResponse(status = STATUS_404, description = "The provided combination of " @@ -444,8 +463,21 @@ public void getAll(@NotNull Context ctx) { Integer.class, DEFAULT_PAGE_SIZE, metrics, name(TimeSeriesController.class.getName(), GET_ALL))); + boolean includeMetadata = ctx.queryParamAsClass(INCLUDE_METADATA_AS_CSV_COMMENTS, Boolean.class) + .getOrDefault(false); + boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class) + .getOrDefault(false); + String dateFormatParam = ctx.queryParam(DATE_FORMAT); + String dateFormatPattern = ctx.queryParam(DATE_FORMAT_PATTERN); + String acceptHeader = ctx.header(Header.ACCEPT); ContentType contentType = Formats.parseHeaderAndQueryParm(acceptHeader, format, TimeSeries.class); + DateFormat dateFormat = DateFormatResolver.resolve(dateFormatParam, dateFormatPattern); + CsvConfiguration csvConfig = new CsvConfiguration.Builder() + .withMetadataIncluded(includeMetadata) + .withOptionalColumnsIncluded(includeOptionalColumns) + .withDateFormat(dateFormat) + .build(); String results; String version = contentType.getParameters().get(VERSION); @@ -471,6 +503,13 @@ public void getAll(@NotNull Context ctx) { 
.withShouldTrim(trim.getOrDefault(true)) .withIncludeEntryDate(includeEntryDate) .build(); + + // CSV: stream a single response; page-size is only internal batch size + if (Formats.CSV.equals(contentType.getType())) { + streamCsv(ctx, csvConfig, pageSize, dao, requestParameters); + return; + } + // Execute DAO call with a timeout so we can return a clearer message instead of a generic 500 int apiTimeoutMs = Integer.getInteger("cwms.cda.api.apiTimeoutMs", 45000); CompletableFuture daoFuture = CompletableFuture.supplyAsync( @@ -492,7 +531,7 @@ public void getAll(@NotNull Context ctx) { throw unwrapExecutionException(ex); } - if(datum != null) { //this will be null for non-elevation ts + if (datum != null) { //this will be null for non-elevation ts // user has requested a specific vertical datum VerticalDatum vd = VerticalDatum.valueOf(datum); // the users request ts = TimeSeriesVerticalDatumConverter.convertToVerticalDatum(ts, vd); @@ -512,6 +551,24 @@ public void getAll(@NotNull Context ctx) { ctx.header(Header.CONTENT_LENGTH, String.valueOf(bytes.length)); ctx.res.getOutputStream().write(bytes); } else { + String office = ctx.queryParam(OFFICE); + + // CSV: stream a single response; page-size is only internal batch size + if (Formats.CSV.equals(contentType.getType())) { + TimeSeriesRequestParameters requestParameters = new TimeSeriesRequestParameters.Builder() + .withNames(names) + .withOffice(office) + .withUnits(units) + .withBeginTime(beginZdt) + .withEndTime(endZdt) + .withShouldTrim(trim.getOrDefault(true)) + .withIncludeEntryDate(includeEntryDate) + .withVersionDate(versionDate) + .build(); + streamCsv(ctx, csvConfig, pageSize, dao, requestParameters); + return; + } + if (versionDate != null) { throw new IllegalArgumentException(String.format("Version date is only supported for:%s and %s", Formats.JSONV2, Formats.XMLV2)); @@ -526,7 +583,6 @@ public void getAll(@NotNull Context ctx) { format = "json"; } - String office = ctx.queryParam(OFFICE); results = 
dao.getTimeseries(format, names, office, units, datum, beginZdt, endZdt, tz); ctx.status(HttpServletResponse.SC_OK); @@ -536,6 +592,7 @@ public void getAll(@NotNull Context ctx) { ctx.header(Header.CONTENT_LENGTH, String.valueOf(bytes.length)); ctx.res.getOutputStream().write(bytes); } + addDeprecatedContentTypeWarning(ctx, contentType); } catch (NotFoundException e) { CdaError re = new CdaError("Not found."); @@ -554,6 +611,27 @@ public void getAll(@NotNull Context ctx) { } } + private void streamCsv(@NotNull Context ctx, CsvConfiguration csvConfig, int batchSize, TimeSeriesDao dao, TimeSeriesRequestParameters requestParameters) { + int csvBatchSize = validateCsvBatchSize(batchSize); + dao.streamRequestedTimeSeriesCsv( + requestParameters, + (stream, position, mediaType, totalLength) -> { + ctx.status(HttpServletResponse.SC_OK); + ctx.contentType(mediaType); + ctx.header(Header.CONTENT_TYPE, Formats.CSV + "; charset=UTF-8"); + ctx.header("X-Stream-Batch-Size", String.valueOf(batchSize)); + try (stream) { + IOUtils.copy(stream, ctx.res.getOutputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + csvConfig, + null, + csvBatchSize //page-size drives streaming chunk size + ); + } + static RuntimeException unwrapExecutionException(java.util.concurrent.ExecutionException ex) { Throwable cause = ex.getCause(); if (cause instanceof RuntimeException) { @@ -566,6 +644,13 @@ static RuntimeException unwrapExecutionException(java.util.concurrent.ExecutionE return new RuntimeException(cause); } + private int validateCsvBatchSize(int requestedPageSize) { + if (requestedPageSize <= 0) { + throw new IllegalArgumentException("For CSV streaming, page-size must be greater than 0."); + } + return requestedPageSize; + } + private void addLinkHeader(@NotNull Context ctx, TimeSeries ts, ContentType contentType) { try { // Send back the link to the next page in the response header diff --git 
a/cwms-data-api/src/main/java/cwms/cda/data/dao/CsvOnDemandInputStream.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/CsvOnDemandInputStream.java
new file mode 100644
index 0000000000..612022192f
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/CsvOnDemandInputStream.java
@@ -0,0 +1,218 @@
+package cwms.cda.data.dao;
+
+import com.google.common.flogger.FluentLogger;
+import cwms.cda.data.dto.csv.TimeSeriesCsv;
+import cwms.cda.data.dto.csv.TimeSeriesCsvRow;
+import cwms.cda.formatters.csv.CsvConfiguration;
+import cwms.cda.formatters.csv.CsvV1;
+import org.jooq.Cursor;
+import org.jooq.Record4;
+import org.jooq.exception.DataAccessException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * InputStream that renders CSV rows on demand from a jOOQ Cursor.
+ *
+ * <p>Rows are pulled from the cursor in batches of at most {@code rowsPerBuffer}, formatted with
+ * {@link CsvV1} and served from an internal byte buffer. The first batch is formatted with the
+ * caller's configuration; every later batch is formatted with metadata disabled and has its first
+ * line (the repeated column header) dropped. Closing the stream closes the cursor.
+ *
+ * <p>The class performs no synchronization of its own.
+ */
+final class CsvOnDemandInputStream extends InputStream {
+    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+    private static final byte[] EMPTY = new byte[0];
+    // rowsPerBuffer comes from the caller and may be very large, so the batch list is only
+    // pre-sized up to this many rows; it still grows on demand beyond that.
+    private static final int MAX_INITIAL_BATCH_CAPACITY = 1024;
+
+    private final Cursor<Record4<Timestamp, Double, BigDecimal, Timestamp>> cursor;
+    private final Iterator<Record4<Timestamp, Double, BigDecimal, Timestamp>> it;
+    private final CsvV1 csv;
+    private final String tsIdStr;
+    private final String officeId;
+    private final String units;
+    private final Timestamp versionTs;
+    private final int rowsPerBuffer;
+    private final CsvConfiguration csvConfiguration;
+    private final CsvConfiguration rowConfiguration;
+
+    private byte[] buffer = EMPTY;
+    private int bufPos = 0;
+    private boolean first = true;
+    private boolean closed = false;
+
+    /**
+     * Creates a stream over the given cursor.
+     *
+     * @param cursor open cursor yielding (date-time, value, quality code, data entry date) records;
+     *               closed by {@link #close()}
+     * @param csv formatter used to render each batch
+     * @param tsIdStr time series id written into each rendered container
+     * @param officeId office id written into each rendered container
+     * @param units units attached to every row
+     * @param versionTs version date, or {@code null} when there is none
+     * @param csvConfiguration configuration for the first batch; a copy with metadata disabled is
+     *                         used for the remaining batches
+     * @param rowsPerBuffer maximum rows rendered per buffer fill; {@code null} or a non-positive
+     *                      value is treated as 1
+     */
+    CsvOnDemandInputStream(Cursor<Record4<Timestamp, Double, BigDecimal, Timestamp>> cursor,
+                           CsvV1 csv,
+                           String tsIdStr,
+                           String officeId,
+                           String units,
+                           Timestamp versionTs,
+                           CsvConfiguration csvConfiguration,
+                           Integer rowsPerBuffer) {
+        this.cursor = cursor;
+        this.it = cursor.iterator();
+        this.csv = csv;
+        this.tsIdStr = tsIdStr;
+        this.officeId = officeId;
+        this.units = units;
+        this.versionTs = versionTs;
+        this.csvConfiguration = csvConfiguration;
+        this.rowConfiguration = new CsvConfiguration.Builder()
+                .from(csvConfiguration)
+                .withMetadataIncluded(false)
+                .build();
+        this.rowsPerBuffer = (rowsPerBuffer == null || rowsPerBuffer <= 0) ? 1 : rowsPerBuffer;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] one = new byte[1];
+        int r = read(one, 0, 1);
+        return r == -1 ? -1 : (one[0] & 0xFF);
+    }
+
+    /**
+     * Copies up to {@code len} rendered bytes into {@code b}, pulling further batches from the
+     * cursor as the internal buffer drains.
+     *
+     * @return the number of bytes copied, {@code 0} when {@code len} is zero, or {@code -1} once
+     *         the cursor is exhausted
+     * @throws IOException if the stream has been closed
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Stream closed");
+        }
+        if (b == null) {
+            throw new NullPointerException("Buffer is null");
+        }
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            // InputStream contract: a zero-length read returns 0, never end-of-stream.
+            return 0;
+        }
+
+        int totalCopied = 0;
+        while (len > 0) {
+            if (bufPos >= buffer.length && !fillBuffer()) {
+                break; // EOF
+            }
+
+            int toCopy = Math.min(len, buffer.length - bufPos);
+            System.arraycopy(buffer, bufPos, b, off, toCopy);
+            bufPos += toCopy;
+            off += toCopy;
+            len -= toCopy;
+            totalCopied += toCopy;
+        }
+
+        return totalCopied == 0 ? -1 : totalCopied;
+    }
+
+    /**
+     * Refills the buffer with the next non-empty rendered batch.
+     *
+     * @return {@code true} if the buffer now holds data, {@code false} once the cursor is exhausted
+     */
+    private boolean fillBuffer() {
+        // Reset first so that a failure while rendering cannot leave stale bytes readable.
+        buffer = EMPTY;
+        bufPos = 0;
+        // Keep pulling until a batch yields bytes; a batch that renders to nothing must not be
+        // mistaken for end-of-stream while rows remain in the cursor.
+        while (it.hasNext()) {
+            buffer = renderNextBatch();
+            if (buffer.length > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Reads up to {@code rowsPerBuffer} records from the cursor and renders them as UTF-8 CSV. */
+    private byte[] renderNextBatch() {
+        List<TimeSeriesCsvRow> batch =
+                new ArrayList<>(Math.min(rowsPerBuffer, MAX_INITIAL_BATCH_CAPACITY));
+        while (it.hasNext() && batch.size() < rowsPerBuffer) {
+            batch.add(toRow(it.next()));
+        }
+
+        TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+                .withTimeSeriesId(tsIdStr)
+                .withOfficeId(officeId)
+                .withVersionDate(versionTs == null ? null : versionTs.toInstant().toString())
+                .withRows(batch)
+                .build();
+
+        boolean firstBatch = first;
+        String rendered = csv.format(container, firstBatch ? csvConfiguration : rowConfiguration);
+        first = false;
+
+        if (!firstBatch) {
+            // Drop the repeated header line; cutting after '\n' also covers "\r\n" endings.
+            int headerEnd = rendered.indexOf('\n');
+            if (headerEnd != -1) {
+                rendered = rendered.substring(headerEnd + 1);
+            }
+        }
+        return rendered.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Maps one cursor record to a CSV row, passing null column values through as {@code null}. */
+    private TimeSeriesCsvRow toRow(Record4<Timestamp, Double, BigDecimal, Timestamp> r) {
+        Timestamp ts = r.value1();
+        Double val = r.value2();
+        BigDecimal qualityCode = r.value3();
+        Timestamp dataEntryDate = r.value4();
+
+        return new TimeSeriesCsvRow.Builder()
+                .withDateTime(ts == null ? null : ts.toInstant())
+                .withValue(val)
+                .withQualityCode(qualityCode == null ? null : qualityCode.intValue())
+                .withDataEntryDate(dataEntryDate == null ? null : dataEntryDate.toInstant())
+                .withUnits(units)
+                .build();
+    }
+
+    /** Closes the underlying cursor; a failure to close is logged rather than propagated. */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            cursor.close();
+        } catch (DataAccessException ex) {
+            logger.atWarning().withCause(ex).log("Error closing database cursor");
+        }
+    }
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/JooqDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/JooqDao.java index 2c862083cd..742887fe6a 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/JooqDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/JooqDao.java @@ -835,7 +835,7 @@ private static UnsupportedOperationException buildUnsupportedOperationException( return new UnsupportedOperationException("CWMS currently does not support the requested operation", cause); } - private static @Nullable String sanitizeOrNull(@Nullable String localizedMessage) { + protected static @Nullable String sanitizeOrNull(@Nullable String localizedMessage) { if (localizedMessage != null && !localizedMessage.isEmpty()) { int
length = localizedMessage.length(); PolicyFactory sanitizer = new HtmlPolicyBuilder().disallowElements("