Skip to content

Commit

Permalink
Add condition operations to KfDef for reconcile semantics (kubeflow#4166
Browse files Browse the repository at this point in the history
)

* add plugin cond

* add IsPluginFinished

* add cond ops

* finish condition ops

* add tests

* comment

* add tests

* remove non-standard name in condition

* add conditions

* remove name completely

* fix tests

* add test

* add fail check

* add checks for conflicting conditions

* get plugin by kind instead of name

* move plugin kind defs to v1beta1

* use kind instead of str

* use plugin kind

* update set plugin spec

* fix test

* define plugin condition formatter

* remove hardcoded plugin conds

* fix test

* add doc
  • Loading branch information
gabrielwen authored and k8s-ci-robot committed Sep 26, 2019
1 parent 6a0fd60 commit e38c6f3
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 32 deletions.
8 changes: 0 additions & 8 deletions bootstrap/pkg/apis/apps/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,6 @@ const (
EXISTING_ARRIKTO = "existing_arrikto"
)

// Plugin kind used starting from v1beta1
const (
AWS_PLUGIN_KIND = "KfAwsPlugin"
GCP_PLUGIN_KIND = "KfGcpPlugin"
MINIKUBE_PLUGIN_KIND = "KfMinikubePlugin"
EXISTING_ARRIKTO_PLUGIN_KIND = "KfExistingArriktoPlugin"
)

// PackageManagers
const (
KSONNET = "ksonnet"
Expand Down
20 changes: 10 additions & 10 deletions bootstrap/pkg/apis/apps/kfdef/kfloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ func isValidUrl(toTest string) bool {
}

// Simple mapping from plugin name to plugin kind in v1beta1. Ideally we should
// use plugin kind to find handler functions.
func alphaPluginNameToBetaKind(pluginName string) string {
mapping := map[string]string{
kftypesv3.AWS: kftypesv3.AWS_PLUGIN_KIND,
kftypesv3.GCP: kftypesv3.GCP_PLUGIN_KIND,
kftypesv3.MINIKUBE: kftypesv3.MINIKUBE_PLUGIN_KIND,
kftypesv3.EXISTING_ARRIKTO: kftypesv3.EXISTING_ARRIKTO_PLUGIN_KIND,
// use plugin kind to find handler functions. Only used for backward compatibility.
func alphaPluginNameToBetaKind(pluginName string) kfdefv1beta1.PluginKindType {
mapping := map[string]kfdefv1beta1.PluginKindType{
kftypesv3.AWS: kfdefv1beta1.AWS_PLUGIN_KIND,
kftypesv3.GCP: kfdefv1beta1.GCP_PLUGIN_KIND,
kftypesv3.MINIKUBE: kfdefv1beta1.MINIKUBE_PLUGIN_KIND,
kftypesv3.EXISTING_ARRIKTO: kfdefv1beta1.EXISTING_ARRIKTO_PLUGIN_KIND,
}

kind, ok := mapping[pluginName]
Expand Down Expand Up @@ -114,7 +114,7 @@ func copyGcpPluginSpec(from *kfdefv1alpha1.KfDef, to *kfdefv1beta1.KfDef) error
}

spec := kfgcp.GcpPluginSpec{}
if err := to.GetPluginSpec(kftypesv3.GCP, &spec); err != nil && !kfdefv1beta1.IsPluginNotFound(err) {
if err := to.GetPluginSpec(kfdefv1beta1.GCP_PLUGIN_KIND, &spec); err != nil && !kfdefv1beta1.IsPluginNotFound(err) {
return err
}
spec.Project = from.Spec.Project
Expand All @@ -125,15 +125,15 @@ func copyGcpPluginSpec(from *kfdefv1alpha1.KfDef, to *kfdefv1beta1.KfDef) error
spec.UseBasicAuth = from.Spec.UseBasicAuth
spec.SkipInitProject = from.Spec.SkipInitProject
spec.DeleteStorage = from.Spec.DeleteStorage
return to.SetPluginSpec(kftypesv3.GCP, spec)
return to.SetPluginSpec(kfdefv1beta1.GCP_PLUGIN_KIND, spec)
}

// Copy plugins configs.
func copyPlugins(from *kfdefv1alpha1.KfDef, to *kfdefv1beta1.KfDef) {
for _, plugin := range from.Spec.Plugins {
betaPlugin := kfdefv1beta1.Plugin{}
betaPlugin.Name = plugin.Name
betaPlugin.Kind = alphaPluginNameToBetaKind(plugin.Name)
betaPlugin.Kind = string(alphaPluginNameToBetaKind(plugin.Name))
if plugin.Spec != nil {
betaPlugin.Spec = &runtime.RawExtension{}
*betaPlugin.Spec = *plugin.Spec
Expand Down
144 changes: 133 additions & 11 deletions bootstrap/pkg/apis/apps/kfdef/v1beta1/application_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ const (

// Used for populating plugin missing errors and identifying those
// errors.
notFoundErrPrefix = "Missing plugin"
pluginNotFoundErrPrefix = "Missing plugin"

// Used for populating plugin missing errors and identifying those
// errors.
conditionNotFoundErrPrefix = "Missing condition"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -83,6 +87,16 @@ type NameValue struct {
Value string `json:"value,omitempty"`
}

type PluginKindType string

// Plugin kind used starting from v1beta1
const (
AWS_PLUGIN_KIND PluginKindType = "KfAwsPlugin"
GCP_PLUGIN_KIND PluginKindType = "KfGcpPlugin"
MINIKUBE_PLUGIN_KIND PluginKindType = "KfMinikubePlugin"
EXISTING_ARRIKTO_PLUGIN_KIND PluginKindType = "KfExistingArriktoPlugin"
)

// Plugin can be used to customize the generation and deployment of Kubeflow
type Plugin struct {
metav1.TypeMeta `json:",inline"`
Expand Down Expand Up @@ -150,6 +164,16 @@ const (
KfDegraded KfDefConditionType = "Degraded"
)

// Define plugin related conditions to be the format:
// - conditions for successful plugins: ${PluginKind}Succeeded
// - conditions for failed plugins: ${PluginKind}Failed
func GetPluginSucceededCondition(pluginKind PluginKindType) KfDefConditionType {
return KfDefConditionType(fmt.Sprintf("%vSucceeded", pluginKind))
}
func GetPluginFailedCondition(pluginKind PluginKindType) KfDefConditionType {
return KfDefConditionType(fmt.Sprintf("%vFailed", pluginKind))
}

type KfDefConditionReason string

const (
Expand All @@ -175,9 +199,9 @@ type KfDefCondition struct {
// GetPluginSpec will try to unmarshal the spec for the specified plugin to the supplied
// interface. Returns an error if the plugin isn't defined or if there is a problem
// unmarshaling it.
func (d *KfDef) GetPluginSpec(pluginName string, s interface{}) error {
func (d *KfDef) GetPluginSpec(pluginKind PluginKindType, s interface{}) error {
for _, p := range d.Spec.Plugins {
if p.Name != pluginName {
if p.Kind != string(pluginKind) {
continue
}

Expand All @@ -186,7 +210,7 @@ func (d *KfDef) GetPluginSpec(pluginName string, s interface{}) error {
specBytes, err := yaml.Marshal(p.Spec)

if err != nil {
msg := fmt.Sprintf("Could not marshal plugin %v args; error %v", pluginName, err)
msg := fmt.Sprintf("Could not marshal plugin %v args; error %v", pluginKind, err)
log.Errorf(msg)
return &kfapis.KfError{
Code: int(kfapis.INVALID_ARGUMENT),
Expand All @@ -197,7 +221,7 @@ func (d *KfDef) GetPluginSpec(pluginName string, s interface{}) error {
err = yaml.Unmarshal(specBytes, s)

if err != nil {
msg := fmt.Sprintf("Could not unmarshal plugin %v to the provided type; error %v", pluginName, err)
msg := fmt.Sprintf("Could not unmarshal plugin %v to the provided type; error %v", pluginKind, err)
log.Errorf(msg)
return &kfapis.KfError{
Code: int(kfapis.INTERNAL_ERROR),
Expand All @@ -209,12 +233,100 @@ func (d *KfDef) GetPluginSpec(pluginName string, s interface{}) error {

return &kfapis.KfError{
Code: int(kfapis.NOT_FOUND),
Message: fmt.Sprintf("%v %v", notFoundErrPrefix, pluginName),
Message: fmt.Sprintf("%v %v", pluginNotFoundErrPrefix, pluginKind),
}
}

// Sets condition and status to KfDef.
func (d *KfDef) SetCondition(condType KfDefConditionType,
status v1.ConditionStatus,
reason string,
message string) {
now := metav1.Now()
cond := KfDefCondition{
Type: condType,
Status: status,
LastUpdateTime: now,
LastTransitionTime: now,
Reason: reason,
Message: message,
}

for i := range d.Status.Conditions {
if d.Status.Conditions[i].Type != condType {
continue
}
if d.Status.Conditions[i].Status == status {
cond.LastTransitionTime = d.Status.Conditions[i].LastTransitionTime
}
d.Status.Conditions[i] = cond
return
}
d.Status.Conditions = append(d.Status.Conditions, cond)
}

// Gets condition from KfDef.
func (d *KfDef) GetCondition(condType KfDefConditionType) (*KfDefCondition, error) {
for i := range d.Status.Conditions {
if d.Status.Conditions[i].Type == condType {
return &d.Status.Conditions[i], nil
}
}
return nil, &kfapis.KfError{
Code: int(kfapis.NOT_FOUND),
Message: fmt.Sprintf("%v %v", conditionNotFoundErrPrefix, condType),
}
}

// Check if a plugin is finished.
func (d *KfDef) IsPluginFinished(pluginKind PluginKindType) bool {
condType := GetPluginSucceededCondition(pluginKind)
cond, err := d.GetCondition(condType)
if err != nil {
log.Warnf("error when getting condition info: %v", err)
return false
}
return cond.Status == v1.ConditionTrue
}

// Set a plugin as finished.
func (d *KfDef) SetPluginFinished(pluginKind PluginKindType, msg string) {
succeededCond := GetPluginSucceededCondition(pluginKind)
failedCond := GetPluginFailedCondition(pluginKind)
if _, err := d.GetCondition(failedCond); err == nil {
d.SetCondition(failedCond, v1.ConditionFalse,
"", "Reset to false as the plugin is set to be finished.")
}

d.SetCondition(succeededCond, v1.ConditionTrue, "", msg)
}

func (d *KfDef) IsPluginFailed(pluginKind PluginKindType) bool {
condType := GetPluginFailedCondition(pluginKind)
cond, err := d.GetCondition(condType)
if err != nil {
if IsConditionNotFound(err) {
return false
}
log.Warnf("error when getting condition info: %v", err)
return false
}
return cond.Status == v1.ConditionTrue
}

func (d *KfDef) SetPluginFailed(pluginKind PluginKindType, msg string) {
succeededCond := GetPluginSucceededCondition(pluginKind)
failedCond := GetPluginFailedCondition(pluginKind)
if _, err := d.GetCondition(succeededCond); err == nil {
d.SetCondition(succeededCond, v1.ConditionFalse,
"", "Reset to false as the plugin is set to be failed.")
}

d.SetCondition(failedCond, v1.ConditionTrue, "", msg)
}

// SetPluginSpec sets the requested parameter. The plugin is added if it doesn't already exist.
func (d *KfDef) SetPluginSpec(pluginName string, spec interface{}) error {
func (d *KfDef) SetPluginSpec(pluginKind PluginKindType, spec interface{}) error {
// Convert spec to RawExtension
r := &runtime.RawExtension{}

Expand Down Expand Up @@ -245,18 +357,19 @@ func (d *KfDef) SetPluginSpec(pluginName string, spec interface{}) error {
index := -1

for i, p := range d.Spec.Plugins {
if p.Name == pluginName {
if p.Kind == string(pluginKind) {
index = i
break
}
}

if index == -1 {
// Plugin in doesn't exist so add it
log.Infof("Adding plugin %v", pluginName)
log.Infof("Adding plugin %v", pluginKind)

p := Plugin{}
p.Name = pluginName
p.Name = string(pluginKind)
p.Kind = string(pluginKind)
d.Spec.Plugins = append(d.Spec.Plugins, p)

index = len(d.Spec.Plugins) - 1
Expand All @@ -271,5 +384,14 @@ func IsPluginNotFound(e error) bool {
return false
}
err, ok := e.(*kfapis.KfError)
return ok && err.Code == int(kfapis.NOT_FOUND) && strings.HasPrefix(err.Message, notFoundErrPrefix)
return ok && err.Code == int(kfapis.NOT_FOUND) && strings.HasPrefix(err.Message, pluginNotFoundErrPrefix)
}

func IsConditionNotFound(e error) bool {
if e == nil {
return false
}
err, ok := e.(*kfapis.KfError)
return ok && err.Code == int(kfapis.NOT_FOUND) &&
strings.HasPrefix(err.Message, conditionNotFoundErrPrefix)
}
Loading

0 comments on commit e38c6f3

Please sign in to comment.