Authoring and Submitting Argo Workflows using Python
Python is a popular programming language for writing machine learning workflows on Kubernetes.
Out of the box, Argo does not provide first-class support for Python. Instead, we provide Java, Golang, and Python API clients.
However, this is not enough for most users. Many need an abstraction layer that adds components and use-case-specific features.
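To make the idea of an abstraction layer concrete, here is a minimal sketch of what one does under the hood: it turns ordinary Python calls into a Workflow manifest. The helpers container_template and steps_workflow are hypothetical and belong to no real library. The result is a plain dict that could be serialized to YAML or JSON and submitted with one of the API clients.

```python
import json


def container_template(name, image, command):
    """Build an Argo container template (hypothetical helper)."""
    return {"name": name, "container": {"image": image, "command": command}}


def steps_workflow(generate_name, entrypoint, templates, steps):
    """Wrap templates and (step_name, template_name) pairs into a Workflow.

    Argo's "steps" field is a list of lists: the outer list runs in sequence
    and each inner list runs in parallel. Here every step gets its own inner
    list, so the steps run one after another.
    """
    entry = {
        "name": entrypoint,
        "steps": [[{"name": step, "template": tmpl}] for step, tmpl in steps],
    }
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": generate_name},
        "spec": {"entrypoint": entrypoint, "templates": [entry, *templates]},
    }


hello = container_template("hello", "alpine:3.6", ["sh", "-c", "echo hello"])
manifest = steps_workflow("hello-", "main", [hello], [("say-hello", "hello")])
print(json.dumps(manifest, indent=2))
```

Real tools such as Couler and the KFP compiler add much more on top (naming, outputs, conditions), but the core job is the same translation from Python objects to this manifest shape.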
Here are two options for you today.
Couler
Couler is a popular project that lets you specify workflows in a platform-agnostic way. It primarily supports Argo Workflows, with plans to support Kubeflow and Airflow in the future.
Installation:
pip3 install git+https://github.com/couler-proj/couler
Example:
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter

def random_code():
    # This function body becomes the source of the generated script template.
    import random
    res = "heads" if random.randint(0, 1) == 0 else "tails"
    print(res)

def flip_coin():
    return couler.run_script(image="python:alpine3.6", source=random_code)

def heads():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']
    )

def tails():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']
    )

# The script's stdout is exposed as outputs.result, which the conditions compare.
result = flip_coin()
couler.when(couler.equal(result, "heads"), lambda: heads())
couler.when(couler.equal(result, "tails"), lambda: tails())
submitter = ArgoSubmitter()
couler.run(submitter=submitter)
This creates the following workflow:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: couler-example-
spec:
  templates:
    - name: couler-example
      steps:
        - - name: flip-coin-29
            template: flip-coin
        - - name: heads-31
            template: heads
            when: '{{steps.flip-coin-29.outputs.result}} == heads'
          - name: tails-32
            template: tails
            when: '{{steps.flip-coin-29.outputs.result}} == tails'
    - name: flip-coin
      script:
        name: ''
        image: 'python:alpine3.6'
        command:
          - python
        source: |
          import random
          res = "heads" if random.randint(0, 1) == 0 else "tails"
          print(res)
    - name: heads
      container:
        image: 'alpine:3.6'
        command:
          - sh
          - '-c'
          - echo "it was heads"
    - name: tails
      container:
        image: 'alpine:3.6'
        command:
          - sh
          - '-c'
          - echo "it was tails"
  entrypoint: couler-example
  ttlStrategy:
    secondsAfterCompletion: 600
  activeDeadlineSeconds: 300
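The branching in this workflow relies on two Argo behaviors: a script template's standard output becomes outputs.result, and each step's when expression is evaluated after its {{...}} placeholders are substituted. The toy sketch below mimics that for the simple "left == right" form used here. It is an illustration only, not Argo's actual expression evaluator, and the functions substitute and evaluate_when are hypothetical.

```python
import re


def substitute(expression, values):
    """Replace each {{placeholder}} token with its value from a dict."""
    return re.sub(r"\{\{\s*([^}]+?)\s*\}\}", lambda m: values[m.group(1)], expression)


def evaluate_when(expression, values):
    """Evaluate a simple 'left == right' condition after substitution.

    Only single equality comparisons are handled, with optional double
    quotes around either side; Argo supports a richer expression language.
    """
    left, right = substitute(expression, values).split("==")
    return left.strip().strip('"') == right.strip().strip('"')


# Pretend the flip-coin script printed "heads".
outputs = {"steps.flip-coin-29.outputs.result": "heads"}
for step, when in [
    ("heads-31", "{{steps.flip-coin-29.outputs.result}} == heads"),
    ("tails-32", "{{steps.flip-coin-29.outputs.result}} == tails"),
]:
    print(step, "runs" if evaluate_when(when, outputs) else "is skipped")
```

Because both conditional steps sit in the same parallel group, exactly one of them runs and the other is marked as skipped.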
KFP Compiler + Python Client
Argo Workflows is used as the engine for executing Kubeflow pipelines. You can define a Kubeflow pipeline and compile it directly to an Argo Workflow in Python.
Then you can use the Argo Python Client to submit the workflow to the Argo Server API.
This approach allows you to leverage existing Kubeflow components.
Installation:
pip3 install 'kfp<2'
pip3 install argo-workflows
Example:
import kfp


def flip_coin():
    # Writes "heads" or "tails" to /output; file_outputs exposes it as flip.output.
    return kfp.dsl.ContainerOp(
        name='Flip a coin',
        image='python:alpine3.6',
        command=['python', '-c', """
import random
res = "heads" if random.randint(0, 1) == 0 else "tails"
with open('/output', 'w') as f:
    f.write(res)
"""],
        file_outputs={'output': '/output'}
    )


def heads():
    return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'])


def tails():
    return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'])


@kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin')
def coin_flip_pipeline():
    flip = flip_coin()
    # Each Condition block compiles to a DAG task guarded by a "when" expression.
    with kfp.dsl.Condition(flip.output == 'heads'):
        heads()
    with kfp.dsl.Condition(flip.output == 'tails'):
        tails()


def main():
    # Compile the pipeline to an Argo Workflow YAML file next to this script.
    kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml")


if __name__ == '__main__':
    main()
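Running this script compiles the pipeline and writes the following workflow to a YAML file named after the script (for example, coin-flip.py.yaml for a script saved as coin-flip.py):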
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coin-flip-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name":
      "Coin-flip"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0}
spec:
  entrypoint: coin-flip
  templates:
  - name: coin-flip
    dag:
      tasks:
      - name: condition-1
        template: condition-1
        when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'
        dependencies: [flip-a-coin]
      - name: condition-2
        template: condition-2
        when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'
        dependencies: [flip-a-coin]
      - {name: flip-a-coin, template: flip-a-coin}
  - name: condition-1
    dag:
      tasks:
      - {name: heads, template: heads}
  - name: condition-2
    dag:
      tasks:
      - {name: tails, template: tails}
  - name: flip-a-coin
    container:
      command:
      - python
      - -c
      - "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\
        \nwith open('/output', 'w') as f:\n f.write(res) \n "
      image: python:alpine3.6
    outputs:
      parameters:
      - name: flip-a-coin-output
        valueFrom: {path: /output}
      artifacts:
      - {name: flip-a-coin-output, path: /output}
  - name: heads
    container:
      command: [sh, -c, echo "it was heads"]
      image: alpine:3.6
  - name: tails
    container:
      command: [sh, -c, echo "it was tails"]
      image: alpine:3.6
  arguments:
    parameters: []
  serviceAccountName: pipeline-runner
Note that this approach is unsupported by Kubeflow.
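Comparing the pipeline with the compiled YAML shows how names are derived: the op name "Flip a coin" becomes the template name flip-a-coin, and the file_outputs key "output" becomes an output parameter (and artifact) named flip-a-coin-output that reads from /output. The sketch below reproduces that mapping for this example. It is a simplified approximation written for illustration; sanitize_name and outputs_from_file_outputs are hypothetical and are not the KFP compiler's actual code, which handles more edge cases.

```python
import re


def sanitize_name(name):
    """Lower-case a display name and collapse non-alphanumeric runs into '-'."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def outputs_from_file_outputs(op_name, file_outputs):
    """Map KFP-style file_outputs onto an Argo template "outputs" section."""
    prefix = sanitize_name(op_name)
    parameters, artifacts = [], []
    for key, path in file_outputs.items():
        output_name = f"{prefix}-{sanitize_name(key)}"
        # Parameters are read from the file so downstream "when" conditions can use them.
        parameters.append({"name": output_name, "valueFrom": {"path": path}})
        # The same file is also exposed as an artifact under the same name.
        artifacts.append({"name": output_name, "path": path})
    return {"parameters": parameters, "artifacts": artifacts}


print(sanitize_name("Flip a coin"))
print(outputs_from_file_outputs("Flip a coin", {"output": "/output"}))
```

The same rule also explains why the pipeline named "Coin-flip" becomes the entrypoint template coin-flip.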
You can submit the above workflow using the client as follows:
import yaml
from argo.workflows.client import (ApiClient,
                                   WorkflowServiceApi,
                                   Configuration,
                                   V1alpha1WorkflowCreateRequest)

def main():
    # Connect to the Argo Server API (it listens on port 2746 by default).
    config = Configuration(host="http://localhost:2746")
    client = ApiClient(configuration=config)
    service = WorkflowServiceApi(api_client=client)
    with open("coin-flip.py.yaml") as f:
        manifest: dict = yaml.safe_load(f)
    # pipeline-runner is created by Kubeflow Pipelines installs; remove it elsewhere.
    del manifest['spec']['serviceAccountName']
    # Create the workflow in the "argo" namespace.
    service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest))

if __name__ == '__main__':
    main()
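If you would rather not depend on a generated client, the same submission can be made directly against the Argo Server REST API, which accepts a POST to /api/v1/workflows/{namespace} with a JSON body of the form {"workflow": ...}. The standard-library sketch below assumes an Argo Server reachable at http://localhost:2746 without authentication (add an Authorization header if your server requires one). The helpers build_create_request and submit are hypothetical, and the request is built separately from sending it so you can inspect it first.

```python
import json
import urllib.request


def build_create_request(manifest, namespace="argo", host="http://localhost:2746"):
    """Build (but do not send) a request that creates a workflow via the Argo Server."""
    manifest = dict(manifest)
    spec = dict(manifest.get("spec", {}))
    # Mirror the client example: drop the Kubeflow-specific service account.
    # pop() with a default avoids a KeyError when the field is absent.
    spec.pop("serviceAccountName", None)
    manifest["spec"] = spec
    body = json.dumps({"workflow": manifest}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host}/api/v1/workflows/{namespace}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit(request):
    """Send the request and return the created Workflow as a dict."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


manifest = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    "metadata": {"generateName": "coin-flip-"},
    "spec": {"entrypoint": "coin-flip", "serviceAccountName": "pipeline-runner"},
}
request = build_create_request(manifest)
print(request.get_method(), request.full_url)
# submit(request)  # uncomment to submit against a running Argo Server
```

Copying the manifest and its spec before removing the service account keeps the caller's dict unchanged, so the same loaded YAML can be reused, for example to submit into a Kubeflow namespace where pipeline-runner does exist.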