Skip to content

Commit

Permalink
saving changes
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Oct 16, 2023
1 parent e830db5 commit 644d2d1
Show file tree
Hide file tree
Showing 19 changed files with 534 additions and 187 deletions.
3 changes: 3 additions & 0 deletions flyteartifacts/artifact_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ artifactsServer:
database:
postgres:
dbname: artifacts
logger:
level: 5
show-source: true
9 changes: 3 additions & 6 deletions flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jinzhu/gorm v1.9.16
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.1
gorm.io/gorm v1.25.5
)
Expand All @@ -39,6 +39,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.7 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
Expand All @@ -54,17 +55,12 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.4.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lib/pq v1.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand All @@ -74,6 +70,7 @@ require (
github.com/ncw/swift v1.0.53 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down
98 changes: 1 addition & 97 deletions flyteartifacts/go.sum

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package db

import (
"gorm.io/gorm"
)

type ArtifactKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
}
type Artifact struct {
gorm.Model
// gatepr: this doesn't actually create a foreign key...
ArtifactKeyID uint
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
ExecutionName string `gorm:"type:varchar(255)"`
}

//type Artifact struct {
// gorm.Model
// // gatepr: this doesn't actually create a foreign key...
// ArtifactKeyID uint
// ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
// Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
// Partitions postgres.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`
//
// LiteralType []byte `gorm:"not null"`
// LiteralValue []byte `gorm:"not null"`
//
// Description string `gorm:"type:varchar(255)"`
// MetadataType string `gorm:"type:varchar(64)"`
// OffloadedUserMetadata string `gorm:"type:varchar(255)"`
//
// // Project/Domain assumed to always be the same as the Artifact
// ExecutionName string `gorm:"type:varchar(255)"`
// WorkflowProject string `gorm:"type:varchar(64)"`
// WorkflowDomain string `gorm:"type:varchar(64)"`
// WorkflowName string `gorm:"type:varchar(255)"`
// WorkflowVersion string `gorm:"type:varchar(255)"`
// TaskProject string `gorm:"type:varchar(64)"`
// TaskDomain string `gorm:"type:varchar(64)"`
// TaskName string `gorm:"type:varchar(255)"`
// TaskVersion string `gorm:"type:varchar(255)"`
// NodeID string `gorm:"type:varchar(64)"`
// // See Admin migration for note.
// // Here nullable in the case of workflow output.
// RetryAttempt *uint32
//}
30 changes: 30 additions & 0 deletions flyteartifacts/pkg/db/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package db

import (
"time"

"github.com/flyteorg/flyte/flytestdlib/promutils"
)

// Common metrics emitted by gormimpl repos.
type gormMetrics struct {
Scope promutils.Scope
CreateDuration promutils.StopWatch
GetDuration promutils.StopWatch
UpdateDuration promutils.StopWatch
SearchDuration promutils.StopWatch
}

func newMetrics(scope promutils.Scope) gormMetrics {
return gormMetrics{
Scope: scope,
CreateDuration: scope.MustNewStopWatch(
"create", "time taken to create a new entry", time.Millisecond),
GetDuration: scope.MustNewStopWatch(
"get", "time taken to get an entry", time.Millisecond),
UpdateDuration: scope.MustNewStopWatch(
"update", "time taken to update an entry", time.Millisecond),
SearchDuration: scope.MustNewStopWatch(
"search", "time taken for searching", time.Millisecond),
}
}
88 changes: 88 additions & 0 deletions flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package db

import (
"github.com/go-gormigrate/gormigrate/v2"
"gorm.io/gorm"
)

var Migrations = []*gormigrate.Migration{
{
ID: "2023-10-12-inits",
Migrate: func(tx *gorm.DB) error {
type ArtifactKey struct {
gorm.Model
Project string `gorm:"uniqueIndex:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}
type Artifact struct {
gorm.Model
ArtifactKeyID uint `gorm:"uniqueIndex:idx_pdnv"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"type:varchar(255);index:idx_artifact_version;uniqueIndex:idx_pdnv"`
ExecutionName string `gorm:"type:varchar(255)"`
}

err := tx.AutoMigrate(
&ArtifactKey{}, &Artifact{},
)
return err
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
"artifact_keys", "artifacts",
)
},
},
}

//var Migrations = []*gormigrate.Migration{
// {
// ID: "2023-10-12-inits",
// Migrate: func(tx *gorm.DB) error {
// type ArtifactKey struct {
// gorm.Model
// Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
// Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
// Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
// }
// type Artifact struct {
// gorm.Model
// ArtifactKeyID uint `gorm:"uniqueIndex:idx_pdnv"`
// ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
// Version string `gorm:"type:varchar(255);index:idx_artifact_version;uniqueIndex:idx_pdnv"`
// Partitions postgres.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`
//
// LiteralType []byte `gorm:"not null"`
// LiteralValue []byte `gorm:"not null"`
//
// Description string `gorm:"type:varchar(255)"`
// MetadataType string `gorm:"type:varchar(64)"`
// OffloadedUserMetadata string `gorm:"type:varchar(255)"`
//
// // Project/Domain assumed to always be the same as the Artifact
// ExecutionName string `gorm:"type:varchar(255)"`
// WorkflowProject string `gorm:"type:varchar(64)"`
// WorkflowDomain string `gorm:"type:varchar(64)"`
// WorkflowName string `gorm:"type:varchar(255)"`
// WorkflowVersion string `gorm:"type:varchar(255)"`
// TaskProject string `gorm:"type:varchar(64)"`
// TaskDomain string `gorm:"type:varchar(64)"`
// TaskName string `gorm:"type:varchar(255)"`
// TaskVersion string `gorm:"type:varchar(255)"`
// NodeID string `gorm:"type:varchar(64)"`
// // See Admin migration for note.
// // Here nullable in the case of workflow output.
// RetryAttempt *uint32
// }
// return tx.AutoMigrate(
// &ArtifactKey{}, &Artifact{},
// )
// },
// Rollback: func(tx *gorm.DB) error {
// return tx.Migrator().DropTable(
// "markers",
// )
// },
// },
//}
44 changes: 44 additions & 0 deletions flyteartifacts/pkg/db/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package db

import (
"context"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"gorm.io/gorm"
)

// RDSStorage should implement StorageInterface
type RDSStorage struct {
config database.DbConfig
db *gorm.DB
metrics gormMetrics
}

// WriteOne is a test function
func (r *RDSStorage) WriteOne(ctx context.Context, gormModel Artifact) (models.Artifact, error) {
timer := r.metrics.CreateDuration.Start()
logger.Debugf(ctx, "Attempt create artifact %s", gormModel.Version)
tx := r.db.Omit("id").Create(&gormModel)
timer.Stop()
if tx.Error != nil {
return models.Artifact{}, tx.Error
}
return models.Artifact{}, nil
}

func NewStorage(ctx context.Context, scope promutils.Scope) *RDSStorage {
dbCfg := database.GetConfig()
logConfig := logger.GetConfig()

db, err := database.GetDB(ctx, dbCfg, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
return &RDSStorage{
config: *dbCfg,
db: db,
metrics: newMetrics(scope.NewSubScope("rds")),
}
}
90 changes: 90 additions & 0 deletions flyteartifacts/pkg/db/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//go:build local_integration

package db

import (
"context"
"fmt"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/config/viper"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
"testing"
)

func TestWriteOne(t *testing.T) {
ctx := context.Background()
configAccessor := viper.NewAccessor(config.Options{
SearchPaths: []string{"/Users/ytong/go/src/github.com/flyteorg/flyte/flyteartifacts/sandbox.yaml"},
StrictMode: false,
})
err := configAccessor.UpdateConfig(ctx)

fmt.Println("Local integration testing using: ", configAccessor.ConfigFilesUsed())
scope := promutils.NewTestScope()
rds := NewStorage(ctx, scope)

gormA := Artifact{
ArtifactKey: ArtifactKey{
Project: "demotst",
Domain: "unit",
Name: "testname 1",
},
Version: "abc123/1/n0/4",
ExecutionName: "ddd",
}

a, err := rds.WriteOne(ctx, gormA)
assert.NoError(t, err)
fmt.Println(a, err)
}

//func TestWriteOne(t *testing.T) {
// ctx := context.Background()
// configAccessor := viper.NewAccessor(config.Options{
// SearchPaths: []string{"/Users/ytong/go/src/github.com/flyteorg/flyte/flyteartifacts/sandbox.yaml"},
// StrictMode: false,
// })
// err := configAccessor.UpdateConfig(ctx)
//
// fmt.Println("Local integration testing using: ", configAccessor.ConfigFilesUsed())
// scope := promutils.NewTestScope()
// rds := NewStorage(ctx, scope)
//
// one := uint32(1)
// pval1 := "51"
// p := map[string]*string{"area": &pval1}
//
// lt := &core.LiteralType{
// Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER},
// }
// lit := &core.Literal{
// Value: &core.Literal_Scalar{
// Scalar: &core.Scalar{
// Value: &core.Scalar_Primitive{
// &core.Primitive{
// Value: &core.Primitive_Integer{15},
// },
// },
// },
// },
// }
//
// ltBytes, err := proto.Marshal(lt)
// assert.NoError(t, err)
// litBytes, err := proto.Marshal(lit)
// assert.NoError(t, err)
//
// gormA := Artifact{
// ArtifactKey: ArtifactKey{
// Project: "demotst",
// Domain: "unit",
// Name: "testname 1",
// },
// Version: "abc123/1/n0/4",
// ExecutionName: "ddd",
// }
//
// a, err := rds.WriteOne(ctx, gormA)
// fmt.Println(a, err)
//}
Loading

0 comments on commit 644d2d1

Please sign in to comment.