-
Notifications
You must be signed in to change notification settings - Fork 24
Fix pseudo-irregular time series direct reads #1781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,8 @@ | |
| import java.math.BigDecimal; | ||
| import java.math.BigInteger; | ||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import java.sql.Timestamp; | ||
| import java.time.Duration; | ||
|
|
@@ -59,6 +61,7 @@ | |
| import java.time.format.DateTimeFormatter; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Calendar; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Date; | ||
|
|
@@ -67,6 +70,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
|
|
@@ -177,9 +181,18 @@ public class TimeSeriesDaoImpl extends JooqDao<TimeSeries> implements TimeSeries | |
| private final Histogram getRequestedTimeSeriesResultsReturnedHistogram; | ||
| @NotNull | ||
| private final Histogram getRequestedTimeSeriesRequestWindowMillisHistogram; | ||
| @NotNull | ||
| private final MetricRegistry metrics; | ||
| private final boolean forceOldLrtsFormatting; | ||
|
|
||
| public TimeSeriesDaoImpl(DSLContext dsl, @NotNull MetricRegistry metrics) { | ||
| this(dsl, metrics, false); | ||
| } | ||
|
|
||
| private TimeSeriesDaoImpl(DSLContext dsl, @NotNull MetricRegistry metrics, boolean forceOldLrtsFormatting) { | ||
| super(dsl); | ||
| this.metrics = metrics; | ||
| this.forceOldLrtsFormatting = forceOldLrtsFormatting; | ||
|
|
||
| String className = this.getClass().getName(); | ||
| CacheStats stats = isVersionedCache.stats(); | ||
|
|
@@ -669,6 +682,50 @@ private TimeSeries buildTimeSeriesFromMetadata(Record tsMetadata, @Nullable Inte | |
|
|
||
| private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize, | ||
| @NotNull TimeSeriesRequestParameters requestParameters) { | ||
| if (isPseudoIrregularOldStyleLocalRegularId(requestParameters)) { | ||
| return getRequestedTimeSeriesDirectWithOldLrtsFormatting(page, pageSize, requestParameters); | ||
| } | ||
| return getRequestedTimeSeriesDirectForSession(page, pageSize, requestParameters); | ||
| } | ||
|
|
||
| private TimeSeries getRequestedTimeSeriesDirectWithOldLrtsFormatting(String page, int pageSize, | ||
| @NotNull TimeSeriesRequestParameters | ||
| requestParameters) { | ||
| return connectionResult(dsl, conn -> { | ||
| DSLContext oldLrtsDsl = DSL.using(conn, SQLDialect.ORACLE18C); | ||
| setOldLrtsFormatting(oldLrtsDsl); | ||
| TimeSeriesDaoImpl oldLrtsDao = new TimeSeriesDaoImpl(oldLrtsDsl, metrics, true); | ||
| return oldLrtsDao.getRequestedTimeSeriesDirectForSession(page, pageSize, requestParameters); | ||
| }); | ||
| } | ||
|
|
||
| private boolean isPseudoIrregularOldStyleLocalRegularId(@NotNull TimeSeriesRequestParameters requestParameters) { | ||
| if (!isOldStyleLocalRegularId(requestParameters.getNames())) { | ||
| return false; | ||
| } | ||
| return connectionResult(dsl, conn -> { | ||
| DSLContext oldLrtsDsl = DSL.using(conn, SQLDialect.ORACLE18C); | ||
| setOldLrtsFormatting(oldLrtsDsl); | ||
| TimeSeriesDaoImpl oldLrtsDao = new TimeSeriesDaoImpl(oldLrtsDsl, metrics, true); | ||
| DirectReadMetadata metadata = oldLrtsDao.fetchRequestedTimeSeriesMetadataRecord(requestParameters); | ||
| return metadata != null && metadata.intervalUtcOffset == UTC_OFFSET_IRREGULAR; | ||
| }); | ||
| } | ||
|
|
||
| private static void setOldLrtsFormatting(DSLContext oldLrtsDsl) { | ||
| CWMS_UTIL_PACKAGE.call_SET_SESSION_INFO(oldLrtsDsl.configuration(), | ||
| SESSION_USE_LRTS_ID_FORMAT, formatBool(false), REQUIRE_OLD_LRTS_ID_FORMAT); | ||
| } | ||
|
|
||
| private static boolean isOldStyleLocalRegularId(String tsId) { | ||
| String[] parts = splitTimeSeriesId(tsId); | ||
| String intervalPart = getTimeSeriesIdPart(parts, 3); | ||
| return intervalPart != null && intervalPart.startsWith("~"); | ||
| } | ||
|
|
||
| private TimeSeries getRequestedTimeSeriesDirectForSession(String page, int pageSize, | ||
| @NotNull TimeSeriesRequestParameters | ||
| requestParameters) { | ||
| String names = requestParameters.getNames(); | ||
| String office = requestParameters.getOffice(); | ||
| String requestedUnits = requestParameters.getUnits(); | ||
|
|
@@ -679,6 +736,10 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize, | |
| String cursor = null; | ||
| Timestamp tsCursor = null; | ||
|
|
||
| if (forceOldLrtsFormatting) { | ||
| setOldLrtsFormatting(dsl); | ||
| } | ||
|
|
||
| if (page != null && !page.isEmpty()) { | ||
| final String[] parts = CwmsDTOPaginated.decodeCursor(page); | ||
|
|
||
|
|
@@ -727,10 +788,13 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize, | |
|
|
||
| // Pagination happens after regular-interval gap rows are merged | ||
| // fetch the full raw window first | ||
| if (forceOldLrtsFormatting) { | ||
| setOldLrtsFormatting(dsl); | ||
| } | ||
| List<TimeSeries.Record> rawRows = fetchRequestedTimeSeriesRows(tsCode, metadataOfficeId, nativeUnits, | ||
| metadataUnits, requestParameters, includeEntryDate); | ||
| long effectiveIntervalOffset = intervalOffset; | ||
| if (isRegularSeries(intervalMinutes, intervalPart)) { | ||
| if (isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) { | ||
| effectiveIntervalOffset = resolveIntervalOffset(intervalOffset, timeZoneId, intervalPart, isLrts, rawRows); | ||
| } | ||
|
|
||
|
|
@@ -747,7 +811,7 @@ private TimeSeries getRequestedTimeSeriesDirect(String page, int pageSize, | |
| beginTime, | ||
| endTime, | ||
| metadataUnits, | ||
| resolveIntervalDuration(intervalMinutes, intervalPart), | ||
| resolveIntervalDuration(intervalMinutes, intervalOffset, intervalPart, isLrts), | ||
| verticalDatumInfo, | ||
| effectiveIntervalOffset, | ||
| timeZoneId, | ||
|
|
@@ -843,6 +907,11 @@ private List<TimeSeries.Record> fetchRequestedTimeSeriesRows(long tsCode, String | |
| String requestedUnits, | ||
| TimeSeriesRequestParameters requestParameters, | ||
| boolean includeEntryDate) { | ||
| if (forceOldLrtsFormatting) { | ||
| return fetchRequestedTimeSeriesRowsWithJdbc(tsCode, officeId, nativeUnits, requestedUnits, | ||
| requestParameters, includeEntryDate); | ||
| } | ||
|
|
||
| ZonedDateTime beginTime = requestParameters.getBeginTime(); | ||
| ZonedDateTime endTime = requestParameters.getEndTime(); | ||
| ZonedDateTime versionDate = requestParameters.getVersionDate(); | ||
|
|
@@ -890,6 +959,103 @@ private List<TimeSeries.Record> fetchRequestedTimeSeriesRows(long tsCode, String | |
| }); | ||
| } | ||
|
|
||
| private List<TimeSeries.Record> fetchRequestedTimeSeriesRowsWithJdbc( | ||
| long tsCode, String officeId, String nativeUnits, String requestedUnits, | ||
| TimeSeriesRequestParameters requestParameters, boolean includeEntryDate) { | ||
| return connectionResult(dsl, conn -> { | ||
| setOldLrtsFormatting(DSL.using(conn, SQLDialect.ORACLE18C)); | ||
| ZonedDateTime versionDate = requestParameters.getVersionDate(); | ||
| String sql = versionDate != null | ||
| ? buildVersionedRowsSql(includeEntryDate) | ||
| : buildMaxVersionRowsSql(includeEntryDate); | ||
| try (PreparedStatement statement = conn.prepareStatement(sql)) { | ||
| bindDirectRowQuery(statement, tsCode, officeId, nativeUnits, requestedUnits, | ||
| requestParameters, versionDate); | ||
| try (ResultSet resultSet = statement.executeQuery()) { | ||
| List<TimeSeries.Record> rows = new ArrayList<>(); | ||
| Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); | ||
| while (resultSet.next()) { | ||
| Timestamp dateTime = resultSet.getTimestamp(DATE_TIME, utcCalendar); | ||
| Double value = resultSet.getDouble(VALUE); | ||
| if (resultSet.wasNull()) { | ||
| value = null; | ||
| } | ||
| int qualityCode = resultSet.getInt("quality_norm"); | ||
| Timestamp dataEntryDate = includeEntryDate | ||
| ? resultSet.getTimestamp(DATA_ENTRY_DATE, utcCalendar) | ||
| : null; | ||
| if (includeEntryDate) { | ||
| rows.add(new TimeSeries.Record(dateTime, value, qualityCode, dataEntryDate)); | ||
| } else { | ||
| rows.add(new TimeSeries.Record(dateTime, value, qualityCode)); | ||
| } | ||
| } | ||
| return rows; | ||
| } | ||
| } catch (SQLException ex) { | ||
| throw new DataAccessException("Unable to fetch direct time series rows", ex); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private static String buildVersionedRowsSql(boolean includeEntryDate) { | ||
| return "select date_time," | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can build and return a JOOQ query. keeps the type safety. NOTE: some of those elements will still need to be text. NOTE2: since we have to do things in a loop anyways to build the output, it will likely be faster to do the normalization of the quality_code in Java. Moving between plsql and sql does cause a context switch which has some performance concerns. NOTE3: av_tsv_dqu already handles the unit conversions (that's what the U stands for in dqu), you just put |
||
| + " cwms_20.cwms_util.convert_units(value, unit_id, ?) value," | ||
| + " cwms_20.cwms_ts.normalize_quality(nvl(cast(quality_code as number), 5)) quality_norm," | ||
| + (includeEntryDate ? " data_entry_date" : " cast(null as timestamp) data_entry_date") | ||
| + " from cwms_20.av_tsv_dqu" | ||
| + " where aliased_item is null" | ||
| + " and ts_code = ?" | ||
| + " and office_id = ?" | ||
| + " and lower(unit_id) = lower(?)" | ||
| + " and date_time >= ?" | ||
| + " and date_time <= ?" | ||
| + " and start_date <= ?" | ||
| + " and end_date > ?" | ||
| + " and version_date = ?" | ||
| + " order by date_time"; | ||
| } | ||
|
|
||
| private static String buildMaxVersionRowsSql(boolean includeEntryDate) { | ||
| return "select date_time, value, quality_norm, data_entry_date from (" | ||
| + " select date_time," | ||
| + " cwms_20.cwms_util.convert_units(value, unit_id, ?) value," | ||
| + " cwms_20.cwms_ts.normalize_quality(nvl(cast(quality_code as number), 5)) quality_norm," | ||
| + (includeEntryDate ? " data_entry_date" : " cast(null as timestamp) data_entry_date") | ||
| + ", row_number() over (partition by date_time order by version_date desc, data_entry_date desc)" | ||
| + " version_rank" | ||
| + " from cwms_20.av_tsv_dqu" | ||
| + " where aliased_item is null" | ||
| + " and ts_code = ?" | ||
| + " and office_id = ?" | ||
| + " and lower(unit_id) = lower(?)" | ||
| + " and date_time >= ?" | ||
| + " and date_time <= ?" | ||
| + " and start_date <= ?" | ||
| + " and end_date > ?" | ||
| + ") where version_rank = 1" | ||
| + " order by date_time"; | ||
| } | ||
|
|
||
| private static void bindDirectRowQuery(PreparedStatement statement, long tsCode, String officeId, | ||
| String nativeUnits, String requestedUnits, | ||
| TimeSeriesRequestParameters requestParameters, | ||
| ZonedDateTime versionDate) throws SQLException { | ||
| Timestamp beginTimestamp = Timestamp.from(requestParameters.getBeginTime().toInstant()); | ||
| Timestamp endTimestamp = Timestamp.from(requestParameters.getEndTime().toInstant()); | ||
| statement.setString(1, requestedUnits); | ||
| statement.setLong(2, tsCode); | ||
| statement.setString(3, officeId); | ||
| statement.setString(4, nativeUnits); | ||
| statement.setTimestamp(5, beginTimestamp); | ||
| statement.setTimestamp(6, endTimestamp); | ||
| statement.setTimestamp(7, endTimestamp); | ||
| statement.setTimestamp(8, beginTimestamp); | ||
| if (versionDate != null) { | ||
| statement.setTimestamp(9, Timestamp.from(versionDate.toInstant())); | ||
| } | ||
| } | ||
|
|
||
| private ResultQuery<Record4<Timestamp, Double, BigDecimal, Timestamp>> buildVersionedRowsQuery( | ||
| AV_TSV_DQU view, | ||
| Field<Double> value, | ||
|
|
@@ -951,7 +1117,7 @@ private List<Timestamp> fetchExpectedRegularTimes(long intervalMinutes, long int | |
| TimeSeriesRequestParameters requestParameters, | ||
| List<TimeSeries.Record> rawRows) { | ||
| boolean shouldTrim = requestParameters.isShouldTrim(); | ||
| if (!isRegularSeries(intervalMinutes, intervalPart)) { | ||
| if (!isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) { | ||
| return Collections.emptyList(); | ||
| } | ||
| // Trimmed requests collapse to the observed data window | ||
|
|
@@ -1029,11 +1195,17 @@ private long resolveIntervalOffset(long intervalOffset, String timeZoneId, | |
| return (rawRows.get(0).getDateTime().getTime() - topOfInterval.getTime()) / TimeUnit.MINUTES.toMillis(1); | ||
| } | ||
|
|
||
| private boolean isRegularSeries(long intervalMinutes, String intervalPart) { | ||
| return intervalMinutes != 0L || isLocalRegularInterval(intervalPart); | ||
| private boolean isRegularSeries(long intervalMinutes, long intervalOffset, String intervalPart, boolean isLrts) { | ||
| return intervalOffset != UTC_OFFSET_IRREGULAR | ||
| && (intervalMinutes != 0L || (isLrts && isLocalRegularInterval(intervalPart))); | ||
| } | ||
|
|
||
| private Duration resolveIntervalDuration(long intervalMinutes, String intervalPart) { | ||
| private Duration resolveIntervalDuration(long intervalMinutes, long intervalOffset, | ||
| String intervalPart, boolean isLrts) { | ||
| if (!isRegularSeries(intervalMinutes, intervalOffset, intervalPart, isLrts)) { | ||
| return Duration.ZERO; | ||
| } | ||
|
|
||
| if (intervalMinutes != 0L) { | ||
| return Duration.ofMinutes(intervalMinutes); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why this cares about the LRTS formatting. There is a function that just tells you:
https://github.com/HydrologicEngineeringCenter/cwms-database/blob/648b849254db226bedd9d9a38e94628df28ec9d5/schema/src/cwms/cwms_ts_pkg.sql#L1064
use JOOQ to call (may already be a helper somewhere in CDA that caches the response.
Basic logic: