Skip to content

Commit

Permalink
GetTriples support for StatVar Observation node (#482)
Browse files Browse the repository at this point in the history
* GetTriples support for StatVar Observation node

* fix lint

* resolve comments

* lint
  • Loading branch information
shifucun authored Mar 24, 2021
1 parent 13274d1 commit daf8a94
Show file tree
Hide file tree
Showing 4 changed files with 997 additions and 82 deletions.
205 changes: 162 additions & 43 deletions internal/server/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package server
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"

Expand All @@ -33,11 +35,162 @@ const (
obsAncestorTypeComparedNode = "1"
)

type prop struct {
name string
isObj bool
}

var obsProps = []prop{
{"observationAbout", true},
{"variableMeasured", true},
{"value", false},
{"observationDate", false},
{"observationPeriod", false},
{"measurementMethod", true},
{"unit", true},
{"scalingFactor", false},
{"samplePopulation", true},
{"location", true},
}

func getObsTriplesSvObs(
ctx context.Context, s *Server, obsDcids []string) (map[string][]*Triple, error) {
dcidList := ""
for _, dcid := range obsDcids {
dcidList += fmt.Sprintf("\"%s\" ", dcid)
}
selectStatment := "SELECT ?o ?provenance "
tripleStatment := "?o typeOf StatVarObservation . ?o provenance ?provenance . "
for _, prop := range obsProps {
selectStatment += fmt.Sprintf("?%s ", prop.name)
tripleStatment += fmt.Sprintf("?o %s ?%s . ", prop.name, prop.name)
}
tripleStatment += fmt.Sprintf("?o dcid (%s)", dcidList)
sparql := fmt.Sprintf(
`%s
WHERE {
%s
}
`, selectStatment, tripleStatment,
)
resp, err := s.Query(ctx, &pb.QueryRequest{Sparql: sparql})
if err != nil {
return nil, err
}
result := map[string][]*Triple{}
for _, row := range resp.GetRows() {
dcid := row.GetCells()[0].Value
prov := row.GetCells()[1].Value
objDcids := []string{}
objTriples := map[string]*Triple{}
for i, prop := range obsProps {
objCell := row.GetCells()[i+2].Value
if objCell != "" {
if prop.isObj {
// The object is a node; need to fetch the name.
objDcid := objCell
objDcids = append(objDcids, objDcid)
objTriples[objDcid] = &Triple{
SubjectID: dcid,
Predicate: prop.name,
ObjectID: objDcid,
ProvenanceID: prov,
}
} else {
result[dcid] = append(result[dcid], &Triple{
SubjectID: dcid,
Predicate: prop.name,
ObjectValue: objCell,
ProvenanceID: prov,
})
}
}
}
nameNodes, err := getPropertyValuesHelper(ctx, s.store, objDcids, "name", true)
if err != nil {
return nil, err
}
for prop, nodes := range nameNodes {
if len(nodes) > 0 {
objTriples[prop].ObjectName = nodes[0].Value
}
}
// Sort the triples to get determinisic result.
keys := make([]string, 0)
for k := range objTriples {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
result[dcid] = append(result[dcid], objTriples[key])
}
}
return result, nil
}

func getObsTriplesPopObs(
ctx context.Context, s *Server, obsDcids []string) (map[string][]*Triple, error) {
result := map[string][]*Triple{}
for _, param := range []struct {
predKey, pred string
}{
{obsAncestorTypeObservedNode, "observedNode"},
{obsAncestorTypeComparedNode, "comparedNode"},
} {
rowList := buildObservedNodeKey(obsDcids, param.predKey)
baseDataMap, branchDataMap, err := bigTableReadRowsParallel(
ctx, s.store, rowList,
func(dcid string, raw []byte) (interface{}, error) {
return string(raw), nil
}, nil)
if err != nil {
return nil, err
}
// Map from observation dcid to observedNode dcid.
observedNodeMap := map[string]string{}
for _, dcid := range obsDcids {
if data, ok := branchDataMap[dcid]; ok {
observedNodeMap[dcid] = data.(string)
} else if data, ok := baseDataMap[dcid]; ok {
observedNodeMap[dcid] = data.(string)
}
}
// Get the observedNode names.
observedNodes := []string{}
for _, dcid := range observedNodeMap {
observedNodes = append(observedNodes, dcid)
}
nameRowList := buildPropertyValuesKey(observedNodes, "name", true)
nameNodes, err := readPropertyValues(ctx, s.store, nameRowList)
if err != nil {
return nil, err
}

for dcid, observedNode := range observedNodeMap {
if _, exist := result[dcid]; !exist {
result[dcid] = []*Triple{}
}
name := observedNode
if len(nameNodes[observedNode]) > 0 {
name = nameNodes[observedNode][0].Value
}
result[dcid] = append(result[dcid], &Triple{
SubjectID: dcid,
Predicate: param.pred,
ObjectID: observedNode,
ObjectName: name,
})
}
}
return result, nil
}

// GetTriples implements API for Mixer.GetTriples.
func (s *Server) GetTriples(ctx context.Context, in *pb.GetTriplesRequest) (
*pb.GetTriplesResponse, error) {
dcids := in.GetDcids()
limit := in.GetLimit()
svobsMode := s.metadata.SvObsMode

if len(dcids) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "Missing argument: dcids")
Expand Down Expand Up @@ -90,55 +243,21 @@ func (s *Server) GetTriples(ctx context.Context, in *pb.GetTriplesRequest) (

// Observation DCIDs.
if len(obsDcids) > 0 {
for _, param := range []struct {
predKey, pred string
}{
{obsAncestorTypeObservedNode, "observedNode"},
{obsAncestorTypeComparedNode, "comparedNode"},
} {
rowList := buildObservedNodeKey(obsDcids, param.predKey)
baseDataMap, branchDataMap, err := bigTableReadRowsParallel(
ctx, s.store, rowList,
func(dcid string, raw []byte) (interface{}, error) {
return string(raw), nil
}, nil)
var err error
var obsResult map[string][]*Triple
if svobsMode {
obsResult, err = getObsTriplesSvObs(ctx, s, obsDcids)
if err != nil {
return nil, err
}
// Map from observation dcid to observedNode dcid.
observedNodeMap := map[string]string{}
for _, dcid := range obsDcids {
if data, ok := branchDataMap[dcid]; ok {
observedNodeMap[dcid] = data.(string)
} else if data, ok := baseDataMap[dcid]; ok {
observedNodeMap[dcid] = data.(string)
}
}
// Get the observedNode names.
observedNodes := []string{}
for _, dcid := range observedNodeMap {
observedNodes = append(observedNodes, dcid)
}
nameRowList := buildPropertyValuesKey(observedNodes, "name", true)
nameNodes, err := readPropertyValues(ctx, s.store, nameRowList)
} else {
obsResult, err = getObsTriplesPopObs(ctx, s, obsDcids)
if err != nil {
return nil, err
}
for dcid, observedNode := range observedNodeMap {
if _, exist := resultsMap[dcid]; !exist {
resultsMap[dcid] = []*Triple{}
}
name := observedNode
if len(nameNodes[observedNode]) > 0 {
name = nameNodes[observedNode][0].Value
}
resultsMap[dcid] = append(resultsMap[dcid], &Triple{
SubjectID: dcid,
Predicate: param.pred,
ObjectID: observedNode,
ObjectName: name,
})
}
}
for k, v := range obsResult {
resultsMap[k] = append(resultsMap[k], v...)
}
}

Expand Down
Loading

0 comments on commit daf8a94

Please sign in to comment.