forked from Hydrospheredata/kubeflow-workshop
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline_recurring.py
149 lines (131 loc) · 5.35 KB
/
pipeline_recurring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import kfp.dsl as dsl
import kubernetes.client.models as k8s
import argparse
@dsl.pipeline(name="mnist", description="MNIST classifier")
def pipeline_definition(
    hydrosphere_address,
    mount_path="/storage",
    learning_rate="0.0005",
    epochs="3",
    batch_size="256",
    model_name="mnist",
    acceptable_accuracy="0.50",
):
    """Recurring MNIST retraining pipeline.

    Stages: sample production data -> train -> release to Hydrosphere ->
    deploy to stage -> test -> deploy to production.
    """
    # One PVC-backed volume is shared by every step that reads/writes data.
    claim = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="storage")
    shared_volume = k8s.V1Volume(name="storage", persistent_volume_claim=claim)
    shared_mount = k8s.V1VolumeMount(
        mount_path="{{workflow.parameters.mount-path}}", name="storage")

    def with_storage(op):
        # Attach the shared volume and its mount point to a step.
        op.add_volume(shared_volume)
        op.add_volume_mount(shared_mount)
        return op

    # 1. Make a sample of production data for retraining
    sample_op = with_storage(dsl.ContainerOp(
        name="sample",
        image="tidylobster/mnist-pipeline-sample:latest",  # <-- Replace with correct docker image
        file_outputs={"data_path": "/data_path.txt"},
        arguments=[
            "--mount-path", mount_path,
            "--hydrosphere-address", hydrosphere_address,
            "--model-name", model_name,
        ],
    ))

    # 2. Train and save a MNIST classifier using Tensorflow
    train_op = with_storage(dsl.ContainerOp(
        name="train",
        image="tidylobster/mnist-pipeline-train:latest",  # <-- Replace with correct docker image
        file_outputs={
            "accuracy": "/accuracy.txt",
            "model_path": "/model_path.txt",
        },
        command=[
            "python", "train-resnet.py",
            "--data-path", sample_op.outputs["data_path"],
            "--mount-path", mount_path,
            "--learning-rate", learning_rate,
            "--epochs", epochs,
            "--batch-size", batch_size
        ]
    ))
    train_op.after(sample_op)
    train_op.set_memory_request('1G')
    train_op.set_cpu_request('1')

    # 3. Release trained model to the cluster
    release_op = with_storage(dsl.ContainerOp(
        name="release",
        image="tidylobster/mnist-pipeline-release:latest",  # <-- Replace with correct docker image
        file_outputs={"model-version": "/model-version.txt"},
        arguments=[
            "--data-path", sample_op.outputs["data_path"],
            "--mount-path", mount_path,
            "--model-name", model_name,
            "--model-path", train_op.outputs["model_path"],
            "--accuracy", train_op.outputs["accuracy"],
            "--hydrosphere-address", hydrosphere_address,
            "--learning-rate", learning_rate,
            "--epochs", epochs,
            "--batch-size", batch_size,
        ]
    ))
    release_op.after(train_op)

    # 4. Deploy to stage application (no storage needed here)
    stage_op = dsl.ContainerOp(
        name="deploy_to_stage",
        image="tidylobster/mnist-pipeline-deploy-to-stage:latest",  # <-- Replace with correct docker image
        file_outputs={"stage-app-name": "/stage-app-name.txt"},
        arguments=[
            "--model-version", release_op.outputs["model-version"],
            "--hydrosphere-address", hydrosphere_address,
            "--model-name", model_name,
        ],
    )
    stage_op.after(release_op)

    # 5. Test the model against the staged application
    test_op = with_storage(dsl.ContainerOp(
        name="test",
        image="tidylobster/mnist-pipeline-test:latest",  # <-- Replace with correct docker image
        arguments=[
            "--stage-app-name", stage_op.outputs["stage-app-name"],
            "--data-path", sample_op.outputs["data_path"],
            "--mount-path", mount_path,
            "--hydrosphere-address", hydrosphere_address,
            "--acceptable-accuracy", acceptable_accuracy,
            "--model-name", model_name,
        ],
    ))
    test_op.after(stage_op)
    test_op.set_retry(3)

    # 6. Deploy to production application once the test step passes
    prod_op = dsl.ContainerOp(
        name="deploy_to_prod",
        image="tidylobster/mnist-pipeline-deploy-to-prod:latest",  # <-- Replace with correct docker image
        arguments=[
            "--model-version", release_op.outputs["model-version"],
            "--model-name", model_name,
            "--hydrosphere-address", hydrosphere_address
        ],
    )
    prod_op.after(test_op)
if __name__ == "__main__":
    import kfp.compiler as compiler
    import tarfile

    # Parse CLI arguments: the output archive name and the target namespace.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-f', '--file', help='New pipeline file', default="pipeline.tar.gz")
    parser.add_argument(
        '-n', '--namespace', help="Namespace, where kubeflow and serving are running", required=True)
    args = parser.parse_args()
    namespace, file = args.namespace, args.file

    # Compile the pipeline definition into an Argo workflow archive.
    compiler.Compiler().compile(pipeline_definition, file)

    # Unpack the archive with the stdlib instead of shelling out to tar.
    with tarfile.open(file) as archive:
        archive.extractall()

    # Patch namespace-dependent references directly in Python.
    # NOTE(fix): the previous code built `sed -i '' ...` f-strings and ran
    # them through str.split(), which passed the literal two-character
    # argument '' to sed (GNU sed rejects it; BSD sed creates a
    # "pipeline.yaml''" backup) and ignored non-zero exit codes, so the
    # manifest could silently remain unpatched. A plain string replace is
    # portable and fails loudly if the file is missing.
    yaml_path = "pipeline.yaml"
    with open(yaml_path, "r") as fh:
        manifest = fh.read()
    manifest = manifest.replace(
        "minio-service.kubeflow", f"minio-service.{namespace}")
    manifest = manifest.replace(
        "pipeline-runner", f"{namespace}-pipeline-runner")
    with open(yaml_path, "w") as fh:
        fh.write(manifest)