diff --git a/datastore/datastore/storagebackend/postgresql/common.go b/datastore/datastore/storagebackend/postgresql/common.go index dc402da1..795e5d17 100644 --- a/datastore/datastore/storagebackend/postgresql/common.go +++ b/datastore/datastore/storagebackend/postgresql/common.go @@ -36,8 +36,8 @@ func getTimeFilter(tspec common.TemporalSpec) string { return fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND ")) } -// createObsQueryVals creates from polygon, circle, filter, and tspec values used for querying -// observations. +// createObsQueryVals creates from polygon, circle, camslRange, filter, and tspec values used for +// querying observations. // // Values to be used for query placeholders are appended to phVals. // @@ -49,13 +49,13 @@ func getTimeFilter(tspec common.TemporalSpec) string { // - nil, // otherwise (..., ..., ..., ..., error). func createObsQueryVals( - polygon *datastore.Polygon, circle *datastore.Circle, filter map[string]*datastore.Strings, - tspec common.TemporalSpec, phVals *[]interface{}) ( + polygon *datastore.Polygon, circle *datastore.Circle, camslRange string, + filter map[string]*datastore.Strings, tspec common.TemporalSpec, phVals *[]interface{}) ( string, string, string, string, error) { timeFilter := getTimeFilter(tspec) - geoFilter, err := getGeoFilter(polygon, circle, phVals) + geoFilter, err := getGeoFilter(polygon, circle, camslRange, phVals) if err != nil { return "", "", "", "", fmt.Errorf("getGeoFilter() failed: %v", err) } @@ -70,8 +70,15 @@ func createObsQueryVals( } } - int64MdataFilter := getInt64MdataFilter(filter, phVals) - stringMdataFilter := getStringMdataFilter(filter, phVals) + int64MdataFilter, err := getInt64MdataFilter(filter, phVals) + if err != nil { + return "", "", "", "", fmt.Errorf("getInt64MdataFilter() failed: %v", err) + } + + stringMdataFilter, err := getStringMdataFilter(filter, phVals) + if err != nil { + return "", "", "", "", fmt.Errorf("getStringMdataFilter() failed: %v", err) + } // --- END filters for reflectable metadata (of type int64 or string) ------------- diff --git a/datastore/datastore/storagebackend/postgresql/getlocations.go b/datastore/datastore/storagebackend/postgresql/getlocations.go index b9ff677e..2111211e 100644 --- a/datastore/datastore/storagebackend/postgresql/getlocations.go +++ b/datastore/datastore/storagebackend/postgresql/getlocations.go @@ -26,10 +26,9 @@ func getLocs( // get values needed for query phVals := []interface{}{} // placeholder values - timeFilter, geoFilter, int64MdataFilter, stringMdataFilter, - err := createObsQueryVals( - request.GetSpatialPolygon(), request.GetSpatialCircle(), request.GetFilter(), tspec, - &phVals) + timeFilter, geoFilter, int64MdataFilter, stringMdataFilter, err := createObsQueryVals( + request.GetSpatialPolygon(), request.GetSpatialCircle(), request.GetCamslRange(), + request.GetFilter(), tspec, &phVals) if err != nil { return nil, fmt.Errorf("createLocsQueryVals() failed: %v", err) } diff --git a/datastore/datastore/storagebackend/postgresql/getobservations.go b/datastore/datastore/storagebackend/postgresql/getobservations.go index 43527b35..56258da8 100644 --- a/datastore/datastore/storagebackend/postgresql/getobservations.go +++ b/datastore/datastore/storagebackend/postgresql/getobservations.go @@ -209,8 +209,8 @@ type filterInfo struct { // TODO: add filter info for non-reflectable types -// getPolygonFilter derives the expression used in a WHERE clause for selecting only -// points inside a polygon. +// getPolygonFilter derives the expression used in a WHERE clause for selecting only points inside +// a polygon. // // Values to be used for query placeholders are appended to phVals. // @@ -253,8 +253,8 @@ func getPolygonFilter(polygon *datastore.Polygon, phVals *[]interface{}) (string return whereExpr, nil } -// getCircleFilter derives the expression used in a WHERE clause for selecting only -// points inside a circle. +// getCircleFilter derives the expression used in a WHERE clause for selecting only points inside +// a circle. // // Values to be used for query placeholders are appended to phVals. // @@ -291,14 +291,34 @@ func getCircleFilter(circle *datastore.Circle, phVals *[]interface{}) (string, e return whereExpr, nil } -// getGeoFilter derives from polygon and circle the expression used in a WHERE clause for keeping -// observations inside both of these areas (i.e. in their intersection). +// getCamslRangeFilter derives the expression used in a WHERE clause for selecting only points +// within a camsl range. +// +// Values to be used for query placeholders are appended to phVals. +// +// Returns (expression, nil) upon success, otherwise (..., error). +func getCamslRangeFilter(camslRange string, phVals *[]interface{}) (string, error) { + + whereExpr := []string{} + + addWhereCondMatchAnyPatternForInt64("camsl", []string{camslRange}, &whereExpr, phVals, false) + + if len(whereExpr) == 0 { + return "TRUE", nil + } + // assert len(whereExpr) == 1 + return whereExpr[0], nil +} + +// getGeoFilter derives from polygon, circle, and camslRange the expression used in a WHERE clause +// for keeping observations inside these areas/ranges (i.e. in their intersection). // // Values to be used for query placeholders are appended to phVals. // // Returns (expression, nil) upon success, otherwise (..., error). func getGeoFilter( - polygon *datastore.Polygon, circle *datastore.Circle, phVals *[]interface{}) (string, error) { + polygon *datastore.Polygon, circle *datastore.Circle, + camslRange string, phVals *[]interface{}) (string, error) { var err error @@ -312,7 +332,12 @@ func getGeoFilter( return "", fmt.Errorf("getCircleFilter() failed: %v", err) } - return fmt.Sprintf("(%s) AND (%s)", polygonExpr, circleExpr), nil + camslRangeExpr, err := getCamslRangeFilter(camslRange, phVals) + if err != nil { + return "", fmt.Errorf("getCamslRangeFilter() failed: %v", err) + } + + return fmt.Sprintf("(%s) AND (%s) AND (%s)", polygonExpr, circleExpr, camslRangeExpr), nil } // scanObsRow scans all columns from the current result row in rows and converts to an ObsMetadata @@ -320,20 +345,22 @@ func getGeoFilter( // Returns (ObsMetadata object, time series ID, nil) upon success, otherwise (..., ..., error). func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) { var ( - tsID int64 - obsTimeInstant0 time.Time - pubTime0 sql.NullTime - value sql.NullString - point postgis.PointS + tsID int64 + obsTimeInstant time.Time + pubTime sql.NullTime + value sql.NullString + point postgis.PointS + camsl sql.NullInt32 ) // initialize colValPtrs with non-reflectable metadata colValPtrs := []interface{}{ &tsID, - &obsTimeInstant0, - &pubTime0, + &obsTimeInstant, + &pubTime, &value, &point, + &camsl, } // extend colValPtrs with reflectable metadata of type int64 @@ -362,12 +389,15 @@ func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) { }, }, Obstime: &datastore.ObsMetadata_ObstimeInstant{ - ObstimeInstant: timestamppb.New(obsTimeInstant0), + ObstimeInstant: timestamppb.New(obsTimeInstant), }, Value: value.String, } - if pubTime0.Valid { - obsMdata.Pubtime = timestamppb.New(pubTime0.Time) + if pubTime.Valid { + obsMdata.Pubtime = timestamppb.New(pubTime.Time) + } + if camsl.Valid { + obsMdata.Camsl = &camsl.Int32 } var err error @@ -444,8 +474,8 @@ func getObs( // get values needed for query phVals := []interface{}{} // placeholder values timeFilter, geoFilter, int64MdataFilter, stringMdataFilter, err := createObsQueryVals( - request.GetSpatialPolygon(), request.GetSpatialCircle(), request.GetFilter(), tspec, - &phVals) + request.GetSpatialPolygon(), request.GetSpatialCircle(), request.GetCamslRange(), + request.GetFilter(), tspec, &phVals) if err != nil { return nil, fmt.Errorf("createObsQueryVals() failed: %v", err) } @@ -471,6 +501,7 @@ func getObs( %s, point, %s, + %s, %s FROM observation JOIN time_series on observation.ts_id = time_series.id @@ -481,6 +512,7 @@ func getObs( distinctSpec, convertSelectCol(incFields, "pubtime", "observation."), convertSelectCol(incFields, "value", "observation."), + convertSelectCol(incFields, "camsl", "observation."), strings.Join(convOI64MC, ","), strings.Join(convOSMC, ","), timeFilter, diff --git a/datastore/datastore/storagebackend/postgresql/init.go b/datastore/datastore/storagebackend/postgresql/init.go index 98285e8c..e85b22a3 100644 --- a/datastore/datastore/storagebackend/postgresql/init.go +++ b/datastore/datastore/storagebackend/postgresql/init.go @@ -267,6 +267,7 @@ func init() { // automatically called once on program startup (on first import o supIncRespFields.Set(strings.TrimPrefix(f, "observation.")) } supIncRespFields.Set("geo_point") + supIncRespFields.Set("camsl") supIncRespFields.Set("obstime_instant") supIncRespFields.Set("pubtime") supIncRespFields.Set("value") diff --git a/datastore/datastore/storagebackend/postgresql/postgresql.go b/datastore/datastore/storagebackend/postgresql/postgresql.go index 0ae702ff..7aeea3c0 100644 --- a/datastore/datastore/storagebackend/postgresql/postgresql.go +++ b/datastore/datastore/storagebackend/postgresql/postgresql.go @@ -5,12 +5,13 @@ import ( "datastore/common" "datastore/datastore" "fmt" - _ "github.com/lib/pq" "log" "regexp" "strconv" "strings" "time" + + _ "github.com/lib/pq" ) // PostgreSQL is an implementation of the StorageBackend interface that @@ -192,20 +193,6 @@ func getTSColNamesUniqueCompl() []string { return result } -// createPlaceholders returns the list of n placeholder strings for -// values in a parameterized query, e.g. $1, to_timestamp($2), ..., $n. -// Items in formats must be strings containing exactly one "$%d" pattern, -// e.g. "$%d", "to_timestamp($%d)" etc. -func createPlaceholders(formats []string) []string { - phs := []string{} - for i, format := range formats { - index := i + 1 - ph := fmt.Sprintf(format, index) - phs = append(phs, ph) - } - return phs -} - // createSetFilter creates expression used in a WHERE clause for testing // if the value in column colName is included in a set of string values. // The filter is fully closed (--> return FALSE) if the set non-nil but empty. @@ -220,16 +207,24 @@ func createSetFilter(colName string, vals []string) string { // addWhereCondMatchAnyPatternForInt64 appends to whereExpr an expression of the form // "(cond1 OR cond2 OR ... OR condN)" where condi tests if the ith pattern in patterns matches -// colName assumed to be of type int64/BIGINT. A pattern of the form / generates -// a range filter directly on the int type where the from and to values are appended to phVals. -// Otherwise the function generates an expression where the pattern is matched agains a text-version -// of the int type in a case-insensitive way. In this case an asterisk in a pattern matches zero or -// more arbitrary characters, and patterns with '*' replaced with '%' are appended to phVals. +// colName assumed to be of integer type. A pattern of the form lo/hi, ../hi, lo/.., or ../.. +// generates a range filter directly on the int type where the integer lo and hi values are +// appended to phVals as appropriate, and the function returns nil. +// +// If none of the four patterns matched, there are two cases: +// +// Case 1: allowStringMatchFallback is true => the function generates an expression where the +// pattern is matched against a text-version of the int type in a case-insensitive way. In this +// case an asterisk in a pattern matches zero or more arbitrary characters, patterns with '*' +// replaced with '%' are appended to phVals, and the function returns nil. +// +// Case 2: allowStringMatchFallback is false => the function returns error. func addWhereCondMatchAnyPatternForInt64( - colName string, patterns []string, whereExpr *[]string, phVals *[]interface{}) { + colName string, patterns []string, whereExpr *[]string, phVals *[]interface{}, + allowStringMatchFallback bool) error { if (patterns == nil) || (len(patterns) == 0) { - return // nothing to do + return nil // nothing to do } // getInt64RangeBoth checks if ptn is of the form '/', in which case @@ -319,15 +314,20 @@ func addWhereCondMatchAnyPatternForInt64( // disable int range filtering, but note that we still don't want to fall // back to regular string matching! whereExprOR = append(whereExprOR, "TRUE") - } else { // fall back to regular string matching + } else if allowStringMatchFallback { // fall back to regular string matching index++ expr := fmt.Sprintf("(lower(%s::text) LIKE lower($%d))", colName, index) whereExprOR = append(whereExprOR, expr) *phVals = append(*phVals, strings.ReplaceAll(ptn, "*", "%")) + } else { + return fmt.Errorf( + "invalid int range pattern: %s; must be one of lo/hi, ../hi, lo/.., or ../..", ptn) } } *whereExpr = append(*whereExpr, fmt.Sprintf("(%s)", strings.Join(whereExprOR, " OR "))) + + return nil } // addWhereCondMatchAnyPatternForString appends to whereExpr an expression of the form @@ -336,10 +336,10 @@ func addWhereCondMatchAnyPatternForInt64( // pattern matches zero or more arbitrary characters. The patterns with '*' replaced with '%' are // appended to phVals. func addWhereCondMatchAnyPatternForString( - colName string, patterns []string, whereExpr *[]string, phVals *[]interface{}) { + colName string, patterns []string, whereExpr *[]string, phVals *[]interface{}, _ bool) error { if (patterns == nil) || (len(patterns) == 0) { - return + return nil } whereExprOR := []string{} @@ -353,6 +353,8 @@ func addWhereCondMatchAnyPatternForString( } *whereExpr = append(*whereExpr, fmt.Sprintf("(%s)", strings.Join(whereExprOR, " OR "))) + + return nil } // getInt64MdataFilterFromFilterInfos derives from filterInfos the expression used in a WHERE @@ -369,15 +371,20 @@ func addWhereCondMatchAnyPatternForString( // // Values to be used for query placeholders are appended to phVals. // -// Returns expression. +// Returns (expression, nil) on success, otherwise (..., error). func getMdataFilterFromFilterInfos( filterInfos []filterInfo, phVals *[]interface{}, - whereExprGenerator func(string, []string, *[]string, *[]interface{})) string { + whereExprGenerator func(string, []string, *[]string, *[]interface{}, bool) error, + allowStringMatchFallback bool) (string, error) { whereExprAND := []string{} for _, sfi := range filterInfos { - whereExprGenerator(sfi.colName, sfi.patterns, &whereExprAND, phVals) + if err := whereExprGenerator( + sfi.colName, sfi.patterns, &whereExprAND, phVals, + allowStringMatchFallback); err != nil { + return "", fmt.Errorf("whereExprGenerator() failed: %v", err) + } } whereExpr := "TRUE" // by default, don't filter @@ -385,7 +392,7 @@ func getMdataFilterFromFilterInfos( whereExpr = fmt.Sprintf("(%s)", strings.Join(whereExprAND, " AND ")) } - return whereExpr + return whereExpr, nil } // getMdataFilter creates from 'filter' the metadata filter used for querying observations or @@ -394,11 +401,13 @@ func getMdataFilterFromFilterInfos( // pbType2table defines field->table mapping for the type in question. // whereExprGenerator defines the expression at the lowest level for the type in question. // -// Returns a metadata filter for a 'WHERE ... AND ...' clause (possibly just 'TRUE'). +// Returns ((a metadata filter for a 'WHERE ... AND ...' clause (possibly just 'TRUE')), nil) +// on success, otherwise (..., error). func getMdataFilter( filter map[string]*datastore.Strings, phVals *[]interface{}, pbType2table map[string]string, - whereExprGenerator func(string, []string, *[]string, *[]interface{})) string { + whereExprGenerator func(string, []string, *[]string, *[]interface{}, bool) error, + allowStringMatchFallback bool) (string, error) { filterInfos := []filterInfo{} @@ -415,17 +424,26 @@ func getMdataFilter( } } - return getMdataFilterFromFilterInfos(filterInfos, phVals, whereExprGenerator) + whereExpr, err := getMdataFilterFromFilterInfos( + filterInfos, phVals, whereExprGenerator, allowStringMatchFallback) + if err != nil { + return "", fmt.Errorf("getMdataFilterFromFilterInfos() failed: %v", err) + } + return whereExpr, nil } // getInt64MdataFilter is a convenience wrapper around getMdataFilter for type int64. -func getInt64MdataFilter(filter map[string]*datastore.Strings, phVals *[]interface{}) string { - return getMdataFilter(filter, phVals, pbInt642table, addWhereCondMatchAnyPatternForInt64) +func getInt64MdataFilter( + filter map[string]*datastore.Strings, phVals *[]interface{}) (string, error) { + return getMdataFilter( + filter, phVals, pbInt642table, addWhereCondMatchAnyPatternForInt64, true) } // getStringMdataFilter is a convenience wrapper around getMdataFilter for type string. -func getStringMdataFilter(filter map[string]*datastore.Strings, phVals *[]interface{}) string { - return getMdataFilter(filter, phVals, pbString2table, addWhereCondMatchAnyPatternForString) +func getStringMdataFilter( + filter map[string]*datastore.Strings, phVals *[]interface{}) (string, error) { + return getMdataFilter( + filter, phVals, pbString2table, addWhereCondMatchAnyPatternForString, true) } // cleanup performs various cleanup tasks, like removing old observations from the database. diff --git a/datastore/datastore/storagebackend/postgresql/putobservations.examples.sql b/datastore/datastore/storagebackend/postgresql/putobservations.examples.sql index 1582e581..ffcc92e4 100644 --- a/datastore/datastore/storagebackend/postgresql/putobservations.examples.sql +++ b/datastore/datastore/storagebackend/postgresql/putobservations.examples.sql @@ -124,7 +124,6 @@ INSERT INTO observation (ts_id, history, processing_level, quality_code, - camsl, value) VALUES ($1, to_timestamp($2), @@ -135,8 +134,7 @@ VALUES ($1, $7, $8, $9, - $10, - $11), + $10), ($12, to_timestamp($13), $14, @@ -146,8 +144,7 @@ VALUES ($1, $18, $19, $20, - $21, - $22) + $21) ON CONFLICT ON CONSTRAINT observation_pkey DO UPDATE SET id = EXCLUDED.id, geo_point_id = EXCLUDED.geo_point_id, @@ -156,7 +153,6 @@ ON CONFLICT ON CONSTRAINT observation_pkey DO UPDATE history = EXCLUDED.history, processing_level = EXCLUDED.processing_level, quality_code = EXCLUDED.quality_code, - camsl = EXCLUDED.camsl, value = EXCLUDED.value WHERE observation.id IS DISTINCT FROM EXCLUDED.id OR observation.geo_point_id IS DISTINCT FROM EXCLUDED.geo_point_id @@ -165,4 +161,3 @@ WHERE observation.id IS DISTINCT FROM EXCLUDED.id OR observation.history IS DISTINCT FROM EXCLUDED.history OR observation.processing_level IS DISTINCT FROM EXCLUDED.processing_level OR observation.quality_code IS DISTINCT FROM EXCLUDED.quality_code - OR observation.camsl IS DISTINCT FROM EXCLUDED.camsl diff --git a/datastore/datastore/storagebackend/postgresql/putobservations.go b/datastore/datastore/storagebackend/postgresql/putobservations.go index 5d7c1c31..0c46bea2 100644 --- a/datastore/datastore/storagebackend/postgresql/putobservations.go +++ b/datastore/datastore/storagebackend/postgresql/putobservations.go @@ -5,12 +5,13 @@ import ( "datastore/common" "datastore/datastore" "fmt" - "github.com/cridenour/go-postgis" "log" "reflect" "slices" "strings" + "github.com/cridenour/go-postgis" + "github.com/lib/pq" "google.golang.org/grpc/codes" "google.golang.org/protobuf/types/known/timestamppb" @@ -294,7 +295,7 @@ func upsertTSs( log.Printf("In upsertTSs(): concurrency issue detected: 'len(tsIDmap)=%v', 'len(mapTScolVals)=%v', "+ "retrying db query...", len(tsIDmap), len(mapTScolVals)) } - return nil, fmt.Errorf("upsertTSs() failed: still concurrency issues afer 3 retries") + return nil, fmt.Errorf("still concurrency issues after 3 retries") } // getObsTime extracts the obs time from obsMdata. @@ -324,8 +325,9 @@ func getObsTime(obsMdata *datastore.ObsMetadata) (*timestamppb.Timestamp, error) // --- END a variant of getObsTime that also supports intervals --------------------------------- type GeoPoint struct { - lon float64 - lat float64 + lon float64 + lat float64 + camsl *int32 } // getGeoPointIDs returns a map of GeoPoint to ID of the row in table geo_point that matches point, @@ -337,21 +339,23 @@ func getGeoPointIDs(db *sql.DB, observations []*datastore.Metadata1) (map[GeoPoi phVals := []interface{}{} // Collect all unique points - // This looks like premature optimisation... but it is not. Postgres will throw error on duplicates in the INSERT + // This looks like premature optimisation... but it is not. Postgres will throw error on + // duplicates in the INSERT uniquePoints := map[GeoPoint]bool{} for _, obs := range observations { point := obs.GetObsMdata().GetGeoPoint() - uniquePoints[GeoPoint{point.Lon, point.Lat}] = true + uniquePoints[GeoPoint{point.Lon, point.Lat, obs.GetObsMdata().Camsl}] = true } index := 0 // Loop over unique points for point := range uniquePoints { - valsExpr0 := fmt.Sprintf(`(ST_MakePoint($%d, $%d)::geography)`, + valsExpr0 := fmt.Sprintf(`(ST_MakePoint($%d, $%d)::geography, $%d::integer)`, index+1, index+2, + index+3, ) - phVals0 := []interface{}{point.lon, point.lat} + phVals0 := []interface{}{point.lon, point.lat, point.camsl} valsExpr = append(valsExpr, valsExpr0) phVals = append(phVals, phVals0...) @@ -366,21 +370,21 @@ func getGeoPointIDs(db *sql.DB, observations []*datastore.Metadata1) (map[GeoPoi cmd := fmt.Sprintf(` WITH input_rows AS ( SELECT * FROM ( - (SELECT point FROM geo_point LIMIT 0) -- only copies column names and types + (SELECT point, camsl FROM geo_point LIMIT 0) -- only copies column names and types UNION ALL VALUES %s - ) t ORDER BY point -- ORDER BY for consistent order to avoid deadlocks + ) t ORDER BY point, camsl -- ORDER BY for consistent order to avoid deadlocks ) , ins AS ( - INSERT INTO geo_point (point) + INSERT INTO geo_point (point, camsl) SELECT * FROM input_rows - ON CONFLICT (point) DO NOTHING - RETURNING id, point + ON CONFLICT (point, camsl) DO NOTHING + RETURNING id, point, camsl ) - SELECT id, point FROM ins + SELECT id, point, camsl FROM ins UNION - SELECT c.id, point FROM input_rows - JOIN geo_point c USING (point) + SELECT c.id, point, camsl FROM input_rows + JOIN geo_point c USING (point, camsl) `, strings.Join(valsExpr, ",")) for range 3 { // try at most 3 times @@ -395,15 +399,20 @@ func getGeoPointIDs(db *sql.DB, observations []*datastore.Metadata1) (map[GeoPoi log.Printf("db.Query() failed: HINT: %v", e.Hint) } } - return nil, fmt.Errorf("tx.Query() failed: %v", err) + return nil, fmt.Errorf("db.Query() failed: %v", err) } gpIDmap := map[GeoPoint]int64{} var id int64 - var p postgis.PointS + var point postgis.PointS + var camsl0 sql.NullInt32 for rows.Next() { - rows.Scan(&id, &p) - gpIDmap[GeoPoint{p.X, p.Y}] = id + rows.Scan(&id, &point, &camsl0) + var camsl *int32 // default nil + if camsl0.Valid { + camsl = &camsl0.Int32 + } + gpIDmap[GeoPoint{point.X, point.Y, camsl}] = id } // Under concurrent load, if another process is adding the same entry, in which case this transaction @@ -418,12 +427,11 @@ func getGeoPointIDs(db *sql.DB, observations []*datastore.Metadata1) (map[GeoPoi log.Printf("In getGeoPointIDs(): concurrency issue detected: 'len(gpIDmap)=%v', 'len(uniquePoints)=%v', "+ "retrying db query...", len(gpIDmap), len(uniquePoints)) } - return nil, fmt.Errorf("getGeoPointIDs() failed: still concurrency issues afer 3 retries") + return nil, fmt.Errorf("still concurrency issues after 3 retries") } // createInsertVals generates from (tsID, obsTimes, gpIDs, and omds) two arrays: -// - in valsExpr: the list of arguments to the VALUES clause in the SQL INSERT -// statement, and +// - in valsExpr: the list of arguments to the VALUES clause in the SQL INSERT statement, and // - in phVals: the total, flat list of all placeholder values. func createInsertVals( tsInfos map[int64]tsInfo, valsExpr *[]string, phVals *[]interface{}) { @@ -433,7 +441,7 @@ func createInsertVals( obsTimes := tsInfo.obsTimes omds := tsInfo.omds gpIDs := tsInfo.gpIDs - for i := 0; i < len(*obsTimes); i++ { + for i := range *obsTimes { valsExpr0 := fmt.Sprintf(`( $%d, to_timestamp($%d), @@ -444,7 +452,6 @@ func createInsertVals( $%d, $%d, $%d, - $%d, $%d )`, index+1, @@ -457,7 +464,6 @@ func createInsertVals( index+8, index+9, index+10, - index+11, ) phVals0 := []interface{}{ @@ -470,7 +476,6 @@ func createInsertVals( (*omds)[i].GetHistory(), (*omds)[i].GetProcessingLevel(), (*omds)[i].GetQualityCode(), - (*omds)[i].GetCamsl(), (*omds)[i].GetValue(), } @@ -520,7 +525,6 @@ func upsertObs( history, processing_level, quality_code, - camsl, value) VALUES %s ON CONFLICT ON CONSTRAINT observation_pkey DO UPDATE @@ -532,7 +536,6 @@ func upsertObs( history = EXCLUDED.history, processing_level = EXCLUDED.processing_level, quality_code = EXCLUDED.quality_code, - camsl = EXCLUDED.camsl, value = EXCLUDED.value WHERE observation.id IS DISTINCT FROM EXCLUDED.id OR @@ -541,8 +544,7 @@ func upsertObs( observation.data_id IS DISTINCT FROM EXCLUDED.data_id OR observation.history IS DISTINCT FROM EXCLUDED.history OR observation.processing_level IS DISTINCT FROM EXCLUDED.processing_level OR - observation.quality_code IS DISTINCT FROM EXCLUDED.quality_code OR - observation.camsl IS DISTINCT FROM EXCLUDED.camsl + observation.quality_code IS DISTINCT FROM EXCLUDED.quality_code `, strings.Join(valsExpr, ",")) _, err := db.Exec(cmd, phVals...) @@ -627,7 +629,7 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) (codes. tsID := tsIDMap[key] point := obs.GetObsMdata().GetGeoPoint() - gpID := gpIDMap[GeoPoint{point.GetLon(), point.GetLat()}] + gpID := gpIDMap[GeoPoint{point.GetLon(), point.GetLat(), obs.GetObsMdata().Camsl}] var obsTimes []*timestamppb.Timestamp var gpIDs []int64 diff --git a/datastore/migrate/data/migrations/1747651104_camsl.down.sql b/datastore/migrate/data/migrations/1747651104_camsl.down.sql index ea394cda..46b08969 100644 --- a/datastore/migrate/data/migrations/1747651104_camsl.down.sql +++ b/datastore/migrate/data/migrations/1747651104_camsl.down.sql @@ -1 +1,3 @@ -ALTER TABLE observation DROP COLUMN camsl; +ALTER TABLE geo_point DROP CONSTRAINT geo_point_point_camsl_key; +ALTER TABLE geo_point ADD CONSTRAINT geo_point_point_key UNIQUE (point); +ALTER TABLE geo_point DROP COLUMN camsl; diff --git a/datastore/migrate/data/migrations/1747651104_camsl.up.sql b/datastore/migrate/data/migrations/1747651104_camsl.up.sql index 21cef76d..cb3ee8ae 100644 --- a/datastore/migrate/data/migrations/1747651104_camsl.up.sql +++ b/datastore/migrate/data/migrations/1747651104_camsl.up.sql @@ -1 +1,9 @@ -ALTER TABLE observation ADD COLUMN camsl BIGINT; +ALTER TABLE geo_point ADD COLUMN camsl INTEGER; + +-- drop UNIQUE constraint of 'point' column +-- WARNING: we assume that the constraint name is the correct one (it was never explicitly set) +ALTER TABLE geo_point DROP CONSTRAINT geo_point_point_key; + +-- and instead define a UNIQUE constraint on the combination of the 'point' and 'camsl' columns +-- (note how we name the new constraint explicitly this time!) +ALTER TABLE geo_point ADD CONSTRAINT geo_point_point_camsl_key UNIQUE NULLS NOT DISTINCT (point, camsl); diff --git a/protobuf/datastore.proto b/protobuf/datastore.proto index 2011812b..18db1b6e 100644 --- a/protobuf/datastore.proto +++ b/protobuf/datastore.proto @@ -136,25 +136,25 @@ message ObsMetadata { Point geo_point = 1 [json_name = "geo_point"]; Polygon geo_polygon = 2 [json_name = "geo_polygon"]; } - oneof obstime { - google.protobuf.Timestamp obstime_instant = 3 [json_name = "obstime_instant"]; - //TimeInterval obstime_interval = 4 [json_name = "obstime_interval"]; -- unsupported for now - } - google.protobuf.Timestamp pubtime = 5; - // --- END non-reflectable metadata ----------------- - - // --- BEGIN reflectable metadata (of type int64 or string) - string id = 6; - string data_id = 7 [json_name = "data_id"]; - string history = 8; - string processing_level = 9 [json_name = "processing_level"]; - int64 quality_code = 10 [json_name = "quality_code"]; - int64 camsl = 11; // centimeters above mean sea level + optional int32 camsl = 3; // centimeters above mean sea level // Case 1 - stationary data: camsl of the station ground position // Case 2 - mobile data: camsl of the sensor itself // NOTE: in Case 1, the absolute vertical location of the sensor would be camsl + level where // level is the height above ground (hag) of the sensor. (Typical values for level are the WMO // standard 2m for temperature and 10m for wind.) + oneof obstime { + google.protobuf.Timestamp obstime_instant = 4 [json_name = "obstime_instant"]; + //TimeInterval obstime_interval = 5 [json_name = "obstime_interval"]; -- unsupported for now + } + google.protobuf.Timestamp pubtime = 6; + // --- END non-reflectable metadata ----------------- + + // --- BEGIN reflectable metadata (of type int64 or string) + string id = 7; + string data_id = 8 [json_name = "data_id"]; + string history = 9; + string processing_level = 10 [json_name = "processing_level"]; + int64 quality_code = 11 [json_name = "quality_code"]; // --- END reflectable metadata ----------------- string value = 12; // obs value (not metadata in a strict sense) @@ -207,9 +207,25 @@ message GetObsRequest { // --- END temporal spec ---------------------------------- - // spatial filter + // --- BEGIN spatial filter --------------------------------------------- Polygon spatial_polygon = 3; // if specified, only observations in this polygon may be returned Circle spatial_circle = 4; // if specified, only observations in this circle may be returned + string camsl_range = 5; // if specified, only observations within this CAMSL (centimeters above + // mean sea level) range may be returned. + // Syntax: + // (NOTE: lo and hi are both int32) + // Case 1 - 'lo/hi': matches all V where lo <= V <= hi. + // Examples that match V = 10: '10/10', '-10/10', '9/10', '10/11'. + // + // Case 2 - '../hi': matches all V where V <= hi. + // Examples that match V = 10: '../10' and '../11'. + // + // Case 3 - 'lo/..': matches all V where lo <= V. + // Examples that match V = 10: '10/..', '9/..'. + // + // Case 4 - '../..': matches any V. NOTE: this is the default behavior and is thus never required + // to be explicitly specified (it is supported for completeness/consistency). + // --- END spatial filter --------------------------------------------- // search wrt. TSMetadata.links // TODO - needs special handling @@ -257,12 +273,12 @@ message GetObsRequest { // default behavior and is thus never required to be explicitly specified (it is // supported for completeness/consistency). // - // Case 2.5: If P is not any of the integer interval forms of Case 2.1..4, then the value + // Case 2.5: If P is none of the integer interval forms of Case 2.1..4, then the value // V of F is converted to a string and matched against P just like in Case 1. // Example: for F='level', V='10', the following values for P will match: // '10', '1*', '*0', '*'. // - map filter = 5; + map filter = 6; // --- END filter for reflectable metadata of type int64 or string ------------------------- @@ -274,12 +290,13 @@ message GetObsRequest { // 'history') // * 'links' (non-reflectable field in TSMedatata) // * 'geo_point' (non-reflectable field in ObsMetadata) + // * 'camsl' (--- '' ---) // * 'obstime_instant' (--- '' ---) // * 'pubtime' (--- '' ---) // * 'value' (non-reflection handleable string field in ObsMetadata) - repeated string included_response_fields = 6; + repeated string included_response_fields = 7; - // repeated string excluded_response_fields = 7; // TODO + // repeated string excluded_response_fields = 8; // TODO } message GetObsResponse { @@ -334,7 +351,8 @@ message GetLocsRequest { TimeInterval temporal_interval = 1; Polygon spatial_polygon = 2; Circle spatial_circle = 3; - map filter = 4; + string camsl_range = 4; + map filter = 5; } message LocMetadata {