Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 178 additions & 6 deletions cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
Expand All @@ -59,6 +61,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand All @@ -67,6 +70,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -177,9 +181,18 @@ public class TimeSeriesDaoImpl extends JooqDao<TimeSeries> implements TimeSeries
private final Histogram getRequestedTimeSeriesResultsReturnedHistogram;
@NotNull
private final Histogram getRequestedTimeSeriesRequestWindowMillisHistogram;
@NotNull
private final MetricRegistry metrics;
private final boolean forceOldLrtsFormatting;

public TimeSeriesDaoImpl(DSLContext dsl, @NotNull MetricRegistry metrics) {
this(dsl, metrics, false);
}

private TimeSeriesDaoImpl(DSLContext dsl, @NotNull MetricRegistry metrics, boolean forceOldLrtsFormatting) {
super(dsl);
this.metrics = metrics;
this.forceOldLrtsFormatting = forceOldLrtsFormatting;

String className = this.getClass().getName();
CacheStats stats = isVersionedCache.stats();
Expand Down Expand Up @@ -669,6 +682,50 @@ private TimeSeries buildTimeSeriesFromMetadata(Record tsMetadata, @Nullable Inte

private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize,
@NotNull TimeSeriesRequestParameters requestParameters) {
if (isPseudoIrregularOldStyleLocalRegularId(requestParameters)) {
return getRequestedTimeSeriesDirectWithOldLrtsFormatting(page, pageSize, requestParameters);
}
return getRequestedTimeSeriesDirectForSession(page, pageSize, requestParameters);
}

private TimeSeries getRequestedTimeSeriesDirectWithOldLrtsFormatting(String page, int pageSize,
@NotNull TimeSeriesRequestParameters
requestParameters) {
return connectionResult(dsl, conn -> {
DSLContext oldLrtsDsl = DSL.using(conn, SQLDialect.ORACLE18C);
setOldLrtsFormatting(oldLrtsDsl);
TimeSeriesDaoImpl oldLrtsDao = new TimeSeriesDaoImpl(oldLrtsDsl, metrics, true);
return oldLrtsDao.getRequestedTimeSeriesDirectForSession(page, pageSize, requestParameters);
});
}

private boolean isPseudoIrregularOldStyleLocalRegularId(@NotNull TimeSeriesRequestParameters requestParameters) {
if (!isOldStyleLocalRegularId(requestParameters.getNames())) {
return false;
}
return connectionResult(dsl, conn -> {
DSLContext oldLrtsDsl = DSL.using(conn, SQLDialect.ORACLE18C);
setOldLrtsFormatting(oldLrtsDsl);
TimeSeriesDaoImpl oldLrtsDao = new TimeSeriesDaoImpl(oldLrtsDsl, metrics, true);
DirectReadMetadata metadata = oldLrtsDao.fetchRequestedTimeSeriesMetadataRecord(requestParameters);
return metadata != null && metadata.intervalUtcOffset == UTC_OFFSET_IRREGULAR;
});
}

private static void setOldLrtsFormatting(DSLContext oldLrtsDsl) {
CWMS_UTIL_PACKAGE.call_SET_SESSION_INFO(oldLrtsDsl.configuration(),
SESSION_USE_LRTS_ID_FORMAT, formatBool(false), REQUIRE_OLD_LRTS_ID_FORMAT);
}

private static boolean isOldStyleLocalRegularId(String tsId) {
String[] parts = splitTimeSeriesId(tsId);
String intervalPart = getTimeSeriesIdPart(parts, 3);
return intervalPart != null && intervalPart.startsWith("~");
}

private TimeSeries getRequestedTimeSeriesDirectForSession(String page, int pageSize,
@NotNull TimeSeriesRequestParameters
requestParameters) {
String names = requestParameters.getNames();
String office = requestParameters.getOffice();
String requestedUnits = requestParameters.getUnits();
Expand All @@ -679,6 +736,10 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize,
String cursor = null;
Timestamp tsCursor = null;

if (forceOldLrtsFormatting) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this cares about the LRTS formatting. There is a function that just tells you:

https://github.com/HydrologicEngineeringCenter/cwms-database/blob/648b849254db226bedd9d9a38e94628df28ec9d5/schema/src/cwms/cwms_ts_pkg.sql#L1064

use JOOQ to call (may already be a helper somewhere in CDA that caches the response.

Basic logic:

  1. Is LRTS -> build expected times and map it out
  2. Is not LRTS -> just return the data.

setOldLrtsFormatting(dsl);
}

if (page != null && !page.isEmpty()) {
final String[] parts = CwmsDTOPaginated.decodeCursor(page);

Expand Down Expand Up @@ -727,10 +788,13 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize,

// Pagination happens after regular-interval gap rows are merged
// fetch the full raw window first
if (forceOldLrtsFormatting) {
setOldLrtsFormatting(dsl);
}
List<TimeSeries.Record> rawRows = fetchRequestedTimeSeriesRows(tsCode, metadataOfficeId, nativeUnits,
metadataUnits, requestParameters, includeEntryDate);
long effectiveIntervalOffset = intervalOffset;
if (isRegularSeries(intervalMinutes, intervalPart)) {
if (isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) {
effectiveIntervalOffset = resolveIntervalOffset(intervalOffset, timeZoneId, intervalPart, isLrts, rawRows);
}

Expand All @@ -747,7 +811,7 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize,
beginTime,
endTime,
metadataUnits,
resolveIntervalDuration(intervalMinutes, intervalPart),
resolveIntervalDuration(intervalMinutes, intervalOffset, intervalPart, isLrts),
verticalDatumInfo,
effectiveIntervalOffset,
timeZoneId,
Expand Down Expand Up @@ -843,6 +907,11 @@ private List<TimeSeries.Record> fetchRequestedTimeSeriesRows(long tsCode, String
String requestedUnits,
TimeSeriesRequestParameters requestParameters,
boolean includeEntryDate) {
if (forceOldLrtsFormatting) {
return fetchRequestedTimeSeriesRowsWithJdbc(tsCode, officeId, nativeUnits, requestedUnits,
requestParameters, includeEntryDate);
}

ZonedDateTime beginTime = requestParameters.getBeginTime();
ZonedDateTime endTime = requestParameters.getEndTime();
ZonedDateTime versionDate = requestParameters.getVersionDate();
Expand Down Expand Up @@ -890,6 +959,103 @@ private List<TimeSeries.Record> fetchRequestedTimeSeriesRows(long tsCode, String
});
}

private List<TimeSeries.Record> fetchRequestedTimeSeriesRowsWithJdbc(
long tsCode, String officeId, String nativeUnits, String requestedUnits,
TimeSeriesRequestParameters requestParameters, boolean includeEntryDate) {
return connectionResult(dsl, conn -> {
setOldLrtsFormatting(DSL.using(conn, SQLDialect.ORACLE18C));
ZonedDateTime versionDate = requestParameters.getVersionDate();
String sql = versionDate != null
? buildVersionedRowsSql(includeEntryDate)
: buildMaxVersionRowsSql(includeEntryDate);
try (PreparedStatement statement = conn.prepareStatement(sql)) {
bindDirectRowQuery(statement, tsCode, officeId, nativeUnits, requestedUnits,
requestParameters, versionDate);
try (ResultSet resultSet = statement.executeQuery()) {
List<TimeSeries.Record> rows = new ArrayList<>();
Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
while (resultSet.next()) {
Timestamp dateTime = resultSet.getTimestamp(DATE_TIME, utcCalendar);
Double value = resultSet.getDouble(VALUE);
if (resultSet.wasNull()) {
value = null;
}
int qualityCode = resultSet.getInt("quality_norm");
Timestamp dataEntryDate = includeEntryDate
? resultSet.getTimestamp(DATA_ENTRY_DATE, utcCalendar)
: null;
if (includeEntryDate) {
rows.add(new TimeSeries.Record(dateTime, value, qualityCode, dataEntryDate));
} else {
rows.add(new TimeSeries.Record(dateTime, value, qualityCode));
}
}
return rows;
}
} catch (SQLException ex) {
throw new DataAccessException("Unable to fetch direct time series rows", ex);
}
});
}

private static String buildVersionedRowsSql(boolean includeEntryDate) {
return "select date_time,"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can build and return a JOOQ query. keeps the type safety.

NOTE: some of those elements will still need to be text.

NOTE2: since we have to do things in a loop anyways to build the output, it will likely be faster to do the normalization of the quality_code in Java. Moving between plsql and sql does cause a context switch which has some performance concerns.

NOTE3: av_tsv_dqu already handles the unit conversions (that's what the U stands for in dqu), you just put units_id = ... in the where clause. Additional conversions are no-ops, but force the context switch.

+ " cwms_20.cwms_util.convert_units(value, unit_id, ?) value,"
+ " cwms_20.cwms_ts.normalize_quality(nvl(cast(quality_code as number), 5)) quality_norm,"
+ (includeEntryDate ? " data_entry_date" : " cast(null as timestamp) data_entry_date")
+ " from cwms_20.av_tsv_dqu"
+ " where aliased_item is null"
+ " and ts_code = ?"
+ " and office_id = ?"
+ " and lower(unit_id) = lower(?)"
+ " and date_time >= ?"
+ " and date_time <= ?"
+ " and start_date <= ?"
+ " and end_date > ?"
+ " and version_date = ?"
+ " order by date_time";
}

private static String buildMaxVersionRowsSql(boolean includeEntryDate) {
return "select date_time, value, quality_norm, data_entry_date from ("
+ " select date_time,"
+ " cwms_20.cwms_util.convert_units(value, unit_id, ?) value,"
+ " cwms_20.cwms_ts.normalize_quality(nvl(cast(quality_code as number), 5)) quality_norm,"
+ (includeEntryDate ? " data_entry_date" : " cast(null as timestamp) data_entry_date")
+ ", row_number() over (partition by date_time order by version_date desc, data_entry_date desc)"
+ " version_rank"
+ " from cwms_20.av_tsv_dqu"
+ " where aliased_item is null"
+ " and ts_code = ?"
+ " and office_id = ?"
+ " and lower(unit_id) = lower(?)"
+ " and date_time >= ?"
+ " and date_time <= ?"
+ " and start_date <= ?"
+ " and end_date > ?"
+ ") where version_rank = 1"
+ " order by date_time";
}

private static void bindDirectRowQuery(PreparedStatement statement, long tsCode, String officeId,
String nativeUnits, String requestedUnits,
TimeSeriesRequestParameters requestParameters,
ZonedDateTime versionDate) throws SQLException {
Timestamp beginTimestamp = Timestamp.from(requestParameters.getBeginTime().toInstant());
Timestamp endTimestamp = Timestamp.from(requestParameters.getEndTime().toInstant());
statement.setString(1, requestedUnits);
statement.setLong(2, tsCode);
statement.setString(3, officeId);
statement.setString(4, nativeUnits);
statement.setTimestamp(5, beginTimestamp);
statement.setTimestamp(6, endTimestamp);
statement.setTimestamp(7, endTimestamp);
statement.setTimestamp(8, beginTimestamp);
if (versionDate != null) {
statement.setTimestamp(9, Timestamp.from(versionDate.toInstant()));
}
}

private ResultQuery<Record4<Timestamp, Double, BigDecimal, Timestamp>> buildVersionedRowsQuery(
AV_TSV_DQU view,
Field<Double> value,
Expand Down Expand Up @@ -951,7 +1117,7 @@ private List<Timestamp> fetchExpectedRegularTimes(long intervalMinutes, long int
TimeSeriesRequestParameters requestParameters,
List<TimeSeries.Record> rawRows) {
boolean shouldTrim = requestParameters.isShouldTrim();
if (!isRegularSeries(intervalMinutes, intervalPart)) {
if (!isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) {
return Collections.emptyList();
}
// Trimmed requests collapse to the observed data window
Expand Down Expand Up @@ -1029,11 +1195,17 @@ private long resolveIntervalOffset(long intervalOffset, String timeZoneId,
return (rawRows.get(0).getDateTime().getTime() - topOfInterval.getTime()) / TimeUnit.MINUTES.toMillis(1);
}

private boolean isRegularSeries(long intervalMinutes, String intervalPart) {
return intervalMinutes != 0L || isLocalRegularInterval(intervalPart);
private boolean isRegularSeries(long intervalMinutes, long intervalOffset, String intervalPart, boolean isLrts) {
return intervalOffset != UTC_OFFSET_IRREGULAR
&& (intervalMinutes != 0L || (isLrts && isLocalRegularInterval(intervalPart)));
}

private Duration resolveIntervalDuration(long intervalMinutes, String intervalPart) {
private Duration resolveIntervalDuration(long intervalMinutes, long intervalOffset,
String intervalPart, boolean isLrts) {
if (!isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) {
return Duration.ZERO;
}

if (intervalMinutes != 0L) {
return Duration.ofMinutes(intervalMinutes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import cwms.cda.ApiServlet;
import cwms.cda.api.enums.VersionType;
import cwms.cda.data.dto.TimeSeries;
import cwms.cda.formatters.Formats;
Expand Down Expand Up @@ -159,6 +160,32 @@ void irregularReadMatchesRetrieveTs() throws Exception {
);
}

@Test
void pseudoIrregularReadWithLrtsHeaderMatchesRetrieveTs() throws Exception {
String seriesId = "ITPARPIRR.Flow.Inst.~15Minutes.0.BENCH";
Instant beginTime = Instant.parse("2024-01-05T12:00:00Z");
Instant endTime = Instant.parse("2024-01-05T13:00:00Z");
List<SeedRow> rows = List.of(
row("2024-01-05T12:00:00Z", 10.0, 0, "2024-01-06T00:00:00Z", null),
row("2024-01-05T12:17:00Z", 20.0, 0, "2024-01-06T00:01:00Z", null),
row("2024-01-05T12:45:00Z", 30.0, 0, "2024-01-06T00:02:00Z", null)
);
seedTimeSeries("ITPARPIRR", seriesId, rows, false, null);

List<TimeSeries.Record> expectedRows = fetchOracleRows(seriesId, "cfs", beginTime, endTime,
false, null);
TimeSeries actualResponse = fetchCdaRowsWithPageSize(seriesId, "cfs", beginTime, endTime,
1000, false, null, true, true);

assertEquals(expectedRows.size(), actualResponse.getTotal(), "total");
assertEquals(expectedRows.size(), actualResponse.getValues().size(), "values size");
assertEquals(Duration.ZERO, actualResponse.getInterval(), "interval");
assertEquals((long) Integer.MIN_VALUE, actualResponse.getIntervalOffset(), "interval offset");
for (int index = 0; index < expectedRows.size(); index++) {
assertRecordsEqual(expectedRows.get(index), actualResponse.getValues().get(index), index);
}
}

@Test
void dstWindowRegularReadMatchesRetrieveTs() throws Exception {
Instant dstStart = Instant.parse("2024-03-09T00:00:00Z");
Expand Down Expand Up @@ -420,8 +447,17 @@ private static void assertRecordsEqual(TimeSeries.Record expected, TimeSeries.Re

private static void seedTimeSeries(String locationId, String seriesId, List<SeedRow> rows,
boolean versioned) throws SQLException {
seedTimeSeries(locationId, seriesId, rows, versioned, 0);
}

private static void seedTimeSeries(String locationId, String seriesId, List<SeedRow> rows,
boolean versioned, Integer intervalOffset) throws SQLException {
createLocation(locationId, true, OFFICE);
createTimeseries(OFFICE, seriesId, 0);
if (intervalOffset != null) {
createTimeseries(OFFICE, seriesId, intervalOffset);
} else {
createTimeseries(OFFICE, seriesId);
}

CwmsDatabaseContainer<?> database = CwmsDataApiSetupCallback.getDatabaseLink();
database.connection(connection -> {
Expand Down Expand Up @@ -650,6 +686,14 @@ private static TimeSeries fetchCdaRowsWithPageSize(String seriesId, String units
Instant endTime, int pageSize, boolean includeEntryDate,
Instant versionDate, boolean trim)
throws Exception {
return fetchCdaRowsWithPageSize(seriesId, units, beginTime, endTime, pageSize, includeEntryDate,
versionDate, trim, null);
}

private static TimeSeries fetchCdaRowsWithPageSize(String seriesId, String units, Instant beginTime,
Instant endTime, int pageSize, boolean includeEntryDate,
Instant versionDate, boolean trim, Boolean lrtsFormatting)
throws Exception {
RequestSpecification request = given()
.log().ifValidationFails(LogDetail.ALL, true)
.accept(Formats.JSONV2)
Expand All @@ -661,6 +705,9 @@ private static TimeSeries fetchCdaRowsWithPageSize(String seriesId, String units
.queryParam(Controllers.PAGE_SIZE, pageSize)
.queryParam(Controllers.TRIM, trim)
.queryParam(Controllers.INCLUDE_ENTRY_DATE, includeEntryDate);
if (lrtsFormatting != null) {
request = request.header(ApiServlet.IS_NEW_LRTS, lrtsFormatting);
}
if (versionDate != null) {
request = request.queryParam(Controllers.VERSION_DATE, versionDate.toString());
}
Expand Down
Loading