diff --git a/gcf/custom_import_controller/custom_instance.go b/gcf/custom_import_controller/custom_instance.go index d4dc662..b2bfd57 100644 --- a/gcf/custom_import_controller/custom_instance.go +++ b/gcf/custom_import_controller/custom_instance.go @@ -109,7 +109,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error { } dataDirectory := fmt.Sprintf("%s/data/", importRootDir) - manifest, err := lib.GenerateManifest(ctx, bucket, dataDirectory) + manifest, err := GenerateManifest(ctx, bucket, dataDirectory) if err != nil { log.Fatalf("unable to generate manifest: %v", err) return err diff --git a/gcf/custom_import_controller/manifest.go b/gcf/custom_import_controller/manifest.go new file mode 100644 index 0000000..34b77e0 --- /dev/null +++ b/gcf/custom_import_controller/manifest.go @@ -0,0 +1,151 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package custom + +import ( + "context" + "fmt" + "log" + "path/filepath" + "strings" + + "cloud.google.com/go/storage" + "github.com/datacommonsorg/tools/gcf/lib" + pb "github.com/datacommonsorg/tools/gcf/proto" + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" +) + +// pathToDataFolder is the "gs://"-prefixed folder that contains raw data. 
+// For the folder structure that is expected, please see,
+// https://docs.datacommons.org/custom_dc/upload_data.html
+func GenerateManifest(ctx context.Context, bucket, pathToDataFolder string) (
+	*pb.DataCommonsManifest, error,
+) {
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// IMPORTANT NOTE: importGroupName has a character limit of 21
+	rootFolder := filepath.Base(
+		filepath.Dir(strings.TrimSuffix(pathToDataFolder, "/")))
+	if len(rootFolder) > 20 {
+		rootFolder = rootFolder[:20]
+	}
+	importGroupName := rootFolder
+
+	// Construct a list of Manifest imports and data sources as we
+	// walk through the data folder in GCS.
+	datasetSources := []*pb.DataCommonsManifest_DatasetSource{}
+	imports := []*pb.DataCommonsManifest_Import{}
+
+	// Find all sources; a dropped error here would silently yield an empty manifest.
+	sources, err := lib.FindFolders(ctx, client, bucket, pathToDataFolder)
+	if err != nil {
+		return nil, err
+	}
+	for _, source := range sources {
+		sourceName := filepath.Base(strings.TrimSuffix(source, "/"))
+		log.Printf("Found source: %s\n", sourceName)
+		datasetNames := []string{}
+
+		// Find file groups
+		fileGroups, err := lib.FindFolders(ctx, client, bucket, source)
+		if err != nil {
+			return nil, err
+		}
+
+		// Find all tmcf csvs in a file group
+		for _, fileGroup := range fileGroups {
+			datasetName := filepath.Base(strings.TrimSuffix(fileGroup, "/"))
+			datasetNames = append(datasetNames, datasetName)
+			log.Printf("Found dataset: %s\n", datasetName)
+			// all subfolders are tmcf csv files. 
+			tmcfCSVs, err := lib.FindFiles(ctx, client, bucket, fileGroup)
+			if err != nil {
+				return nil, err
+			}
+
+			var tmcfPath string
+			csvPaths := []string{}
+			for _, tmcfCSV := range tmcfCSVs {
+				if filepath.Ext(tmcfCSV) == ".tmcf" {
+					// There should only be 1 TMCF file
+					if len(tmcfPath) > 0 {
+						return nil, errors.Errorf("more than 1 tmcf file found in %s", fileGroup)
+					}
+					tmcfPath = lib.BigStorePath(bucket, tmcfCSV)
+					log.Printf("Found tmcf: %s\n", tmcfPath)
+					continue
+				}
+				if filepath.Ext(tmcfCSV) == ".csv" {
+					csvPath := lib.BigStorePath(bucket, tmcfCSV)
+					csvPaths = append(csvPaths, csvPath)
+					log.Printf("Found csv: %s\n", csvPath)
+					continue
+				}
+				// all other file types are ignored.
+			}
+
+			mcfProtoUrl := lib.BigStorePath(bucket, fmt.Sprintf("%s%s", fileGroup, "graph.tfrecord@*.gz"))
+
+			imports = append(imports, &pb.DataCommonsManifest_Import{
+				ImportName: proto.String(datasetName), // Use datasetName for import name.
+				Category: pb.DataCommonsManifest_STATS.Enum(),
+				ProvenanceUrl: proto.String("https://datacommons.org/"), // Dummy URL
+				McfProtoUrl: []string{mcfProtoUrl},
+				ImportGroups: []string{importGroupName},
+				ResolutionInfo: &pb.ResolutionInfo{UsesIdResolver: proto.Bool(true)},
+				DatasetName: proto.String(datasetName),
+				AutomatedMcfGenerationBy: proto.String(importGroupName),
+				Table: []*pb.ExternalTable{
+					{
+						MappingPath: proto.String(tmcfPath),
+						CsvPath: csvPaths,
+					},
+				},
+			})
+		}
+
+		ds := &pb.DataCommonsManifest_DatasetSource{
+			Url: proto.String("https://datacommons.org/"),
+			Name: proto.String(sourceName),
+			Datasets: []*pb.DataCommonsManifest_DatasetInfo{},
+		}
+		// Datasets starts empty, so append; indexed assignment would panic.
+		for _, datasetName := range datasetNames {
+			ds.Datasets = append(ds.Datasets, &pb.DataCommonsManifest_DatasetInfo{
+				Name: proto.String(datasetName),
+				Url: ds.Url, // Dummy URL
+			})
+		}
+		datasetSources = append(datasetSources, ds)
+	}
+
+	importGroups := []*pb.DataCommonsManifest_ImportGroup{
+		{
+			Name: proto.String(importGroupName),
+			IsCustomDc: proto.Bool(true),
+			Description: 
proto.String("Custom DC import group"), + }, + } + + return &pb.DataCommonsManifest{ + Import: imports, + DatasetSource: datasetSources, + ImportGroups: importGroups, + }, nil +} diff --git a/gcf/lib/gcs.go b/gcf/lib/gcs.go index 62e26c1..68a1b61 100644 --- a/gcf/lib/gcs.go +++ b/gcf/lib/gcs.go @@ -29,16 +29,12 @@ package lib import ( "context" - "fmt" "log" "path/filepath" "strings" "cloud.google.com/go/storage" "google.golang.org/api/iterator" - - pb "github.com/datacommonsorg/tools/gcf/proto" - "github.com/pkg/errors" ) // FindFolders find all the folders @@ -94,103 +90,3 @@ func FindFiles(ctx context.Context, c *storage.Client, bucket, pathPrefix string func BigStorePath(bucket, path string) string { return filepath.Join("/bigstore", bucket, path) } - -// pathToDataFolder is the "gs://"-prefixed folder that contains raw data. -// For the folder structure that is expected, please see, -// https://docs.datacommons.org/custom_dc/upload_data.html -func GenerateManifest(ctx context.Context, bucket, pathToDataFolder string) (*pb.DataCommonsManifest, error) { - client, err := storage.NewClient(ctx) - if err != nil { - return nil, err - } - - // IMPORTANT NOTE: importGroupName has a character limit of 21 - rootFolder := filepath.Base(filepath.Dir(strings.TrimSuffix(pathToDataFolder, "/"))) - if len(rootFolder) > 20 { - rootFolder = rootFolder[:20] - } - importGroupName := rootFolder - - // Construct a list of Manifest imports and data sources as we - // walk through the data folder in GCS. 
- dataSourceParams := []*DataSourceParams{} - importParams := []*ImportParams{} - - // Find all sources - sources, err := FindFolders(ctx, client, bucket, pathToDataFolder) - for _, source := range sources { - - sourceName := filepath.Base(strings.TrimSuffix(source, "/")) - log.Printf("Found source: %s\n", sourceName) - datasetNames := []string{} - - // Find file groups - fileGroups, err := FindFolders(ctx, client, bucket, source) - if err != nil { - return nil, err - } - - // Find all tmcf csvs in a file group - for _, fileGroup := range fileGroups { - - datasetName := filepath.Base(strings.TrimSuffix(fileGroup, "/")) - datasetNames = append(datasetNames, datasetName) - log.Printf("Found dataset: %s\n", datasetName) - - // all subfolders are tmcf csv files. - tmcfCSVs, err := FindFiles(ctx, client, bucket, fileGroup) - if err != nil { - return nil, err - } - - var tmcfPath string - csvPaths := []string{} - for _, tmcfCSV := range tmcfCSVs { - if filepath.Ext(tmcfCSV) == ".tmcf" { - // There should only be 1 TMCF file - if len(tmcfPath) > 0 { - return nil, errors.Errorf("more than 1 tmcf file found in %s", fileGroup) - } - tmcfPath = BigStorePath(bucket, tmcfCSV) - log.Printf("Found tmcf: %s\n", tmcfPath) - continue - } - if filepath.Ext(tmcfCSV) == ".csv" { - csvPath := BigStorePath(bucket, tmcfCSV) - csvPaths = append(csvPaths, csvPath) - log.Printf("Found csv: %s\n", csvPath) - continue - } - // all other file types are ignored. - } - - mcfProtoUrl := BigStorePath(bucket, fmt.Sprintf("%s%s", fileGroup, "graph.tfrecord@*.gz")) - - importParams = append(importParams, &ImportParams{ - ImportName: datasetName, // Use datasetName for import name. 
- ImportGroupName: importGroupName, - DatasetName: datasetName, - DataSourceName: sourceName, - MCFProtoURL: mcfProtoUrl, - TMCFPath: tmcfPath, - CSVPaths: csvPaths, - }) - - } - - dataSourceParams = append(dataSourceParams, &DataSourceParams{ - DataSourceName: sourceName, - URL: "https://datacommons.org/", - DatasetNames: datasetNames, - }) - - } - - manifestParams := &ManifestParams{ - ImportsParams: importParams, - DataSourcesParams: dataSourceParams, - ImportGroupName: importGroupName, - } - - return manifestParams.ManifestProto(), nil -} diff --git a/gcf/lib/manifest.go b/gcf/lib/manifest.go deleted file mode 100644 index 2538486..0000000 --- a/gcf/lib/manifest.go +++ /dev/null @@ -1,104 +0,0 @@ -package lib - -import ( - pb "github.com/datacommonsorg/tools/gcf/proto" - "google.golang.org/protobuf/proto" -) - -type ImportParams struct { - ImportName string - ImportGroupName string - DatasetName string - DataSourceName string - MCFProtoURL string - TMCFPath string - CSVPaths []string -} - -type DataSourceParams struct { - DataSourceName string - URL string - DatasetNames []string -} - -type ManifestParams struct { - ImportsParams []*ImportParams - DataSourcesParams []*DataSourceParams - ImportGroupName string -} - -func (ip *ImportParams) ImportProto() *pb.DataCommonsManifest_Import { - return &pb.DataCommonsManifest_Import{ - ImportName: proto.String(ip.ImportName), - Category: pb.DataCommonsManifest_STATS.Enum(), - ProvenanceUrl: proto.String("https://datacommons.org/"), // Dummy URL - McfProtoUrl: []string{ip.MCFProtoURL}, - ImportGroups: []string{ip.ImportGroupName}, - ResolutionInfo: &pb.ResolutionInfo{UsesIdResolver: proto.Bool(true)}, - DatasetName: proto.String(ip.DatasetName), - AutomatedMcfGenerationBy: proto.String(ip.ImportGroupName), - Table: []*pb.ExternalTable{ - &pb.ExternalTable{ - MappingPath: proto.String(ip.TMCFPath), - CsvPath: ip.CSVPaths, - }, - }, - } -} - -func (dsp *DataSourceParams) DataSourceProto() 
*pb.DataCommonsManifest_DatasetSource { - datasets := make([]*pb.DataCommonsManifest_DatasetInfo, len(dsp.DatasetNames)) - for i, datasetName := range dsp.DatasetNames { - datasets[i] = &pb.DataCommonsManifest_DatasetInfo{ - Name: proto.String(datasetName), - // Dummy URL - Url: proto.String(dsp.URL), - } - } - - return &pb.DataCommonsManifest_DatasetSource{ - Name: proto.String(dsp.DataSourceName), - // Dummy URL - Url: proto.String(dsp.URL), - Datasets: datasets, - } -} - -func NewManifestParams() *ManifestParams { - manifestParams := &ManifestParams{} - return manifestParams -} - -func (mp *ManifestParams) ImportsProto() []*pb.DataCommonsManifest_Import { - importsProto := make([]*pb.DataCommonsManifest_Import, len(mp.ImportsParams)) - for i, ip := range mp.ImportsParams { - importsProto[i] = ip.ImportProto() - } - return importsProto -} - -func (mp *ManifestParams) DataSourcesProto() []*pb.DataCommonsManifest_DatasetSource { - dataSourcesProto := make([]*pb.DataCommonsManifest_DatasetSource, len(mp.DataSourcesParams)) - for i, dsp := range mp.DataSourcesParams { - dataSourcesProto[i] = dsp.DataSourceProto() - } - return dataSourcesProto -} - -func (mp *ManifestParams) ImportGroupsProto() []*pb.DataCommonsManifest_ImportGroup { - return []*pb.DataCommonsManifest_ImportGroup{ - &pb.DataCommonsManifest_ImportGroup{ - Name: proto.String(mp.ImportGroupName), - IsCustomDc: proto.Bool(true), - Description: proto.String("Custom DC import group"), - }, - } -} - -func (mp *ManifestParams) ManifestProto() *pb.DataCommonsManifest { - return &pb.DataCommonsManifest{ - Import: mp.ImportsProto(), - DatasetSource: mp.DataSourcesProto(), - ImportGroups: mp.ImportGroupsProto(), - } -}