diff --git a/sdks/python/apache_beam/yaml/README.md b/sdks/python/apache_beam/yaml/README.md
index 3ba78784c997..62c0d0eea162 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -166,41 +166,42 @@ Here we read two sources, join them, and write two outputs.
 
 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv
 
-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
 
-  - type: Filter
-    name: FilterToBig
-    input: Sql
-    config:
-      language: python
-      keep: "col2 > 100"
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
+      config:
+        path: /path/to/all.json
 
-  - type: WriteToCsv
-    name: WriteBig
-    input: FilterToBig
-    config:
-      path: /path/to/big.csv
+    - type: Filter
+      name: FilterToBig
+      input: Sql
+      config:
+        language: python
+        keep: "col2 > 100"
+
+    - type: WriteToCsv
+      name: WriteBig
+      input: FilterToBig
+      config:
+        path: /path/to/big.csv
 ```
 
 One can, however, nest `chains` within a non-linear pipeline.
@@ -209,49 +210,50 @@ that has a single input and contains its own sink.
 
 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv
 
-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
 
-  - type: chain
-    name: ExtraProcessingForBigRows
-    input: Sql
-    transforms:
-      - type: Filter
-        config:
-          language: python
-          keep: "col2 > 100"
-      - type: Filter
-        config:
-          language: python
-          keep: "len(col1) > 10"
-      - type: Filter
-        config:
-          language: python
-          keep: "col1 > 'z'"
-    sink:
-      type: WriteToCsv
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
       config:
-        path: /path/to/big.csv
+        path: /path/to/all.json
+
+    - type: chain
+      name: ExtraProcessingForBigRows
+      input: Sql
+      transforms:
+        - type: Filter
+          config:
+            language: python
+            keep: "col2 > 100"
+        - type: Filter
+          config:
+            language: python
+            keep: "len(col1) > 10"
+        - type: Filter
+          config:
+            language: python
+            keep: "col1 > 'z'"
+      sink:
+        type: WriteToCsv
+        config:
+          path: /path/to/big.csv
 ```
 
 ## Windowing
@@ -329,25 +331,26 @@ a join per window.
 
 ```
 pipeline:
-  - type: ReadFromPubSub
-    name: ReadLeft
-    config:
-      topic: leftTopic
+  transforms:
+    - type: ReadFromPubSub
+      name: ReadLeft
+      config:
+        topic: leftTopic
 
-  - type: ReadFromPubSub
-    name: ReadRight
-    config:
-      topic: rightTopic
+    - type: ReadFromPubSub
+      name: ReadRight
+      config:
+        topic: rightTopic
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-    windowing:
-      type: fixed
-      size: 60
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
+      windowing:
+        type: fixed
+        size: 60
 ```
 
 For a transform with no inputs, the specified windowing is instead applied to
diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index ef0d9fe0f262..e784531d9be1 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-$schema: 'http://json-schema.org/schema#'
+$schema: 'http://json-schema.org/draft-07/schema#'
 $id: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml/pipeline.schema.yaml
 
 $defs:
@@ -115,6 +115,23 @@ $defs:
       - $ref: '#/$defs/nestedTransform'
       - $ref: '#/$defs/implicitInputOutputs'
 
+      - if:
+          not:
+            anyOf:
+              - properties: { type: { const: composite }}
+              - properties: { type: { const: chain }}
+        then:
+          properties:
+            type: {}
+            name: {}
+            input: {}
+            output: {}
+            windowing: {}
+            config: { type: object }
+            __line__: {}
+            __uuid__: {}
+          additionalProperties: false
+
   windowing: {} # TODO
 
   provider:
@@ -128,27 +145,38 @@
         properties: { __line__: {}}
         additionalProperties:
           type: string
+      config: { type: object }
+    additionalProperties: false
     required:
       - type
       - transforms
+      - config
 
 type: object
 properties:
   pipeline:
-    anyOf:
-      - type: array
-        items:
-          $ref: '#/$defs/transform'
-      - $ref: '#/$defs/transform'
+    allOf:
+      # These are the only top-level properties defined in pipeline.
       - type: object
         properties:
-          transforms:
-            type: array
-            items:
-              $ref: '#/$defs/transform'
+          type: { const: chain }
+          windowing:
+            $ref: '#/$defs/windowing'
+          transforms: {}
+          extra_transforms: {}
+          sink: {}
+          source: {}
           __line__: {}
           __uuid__: {}
         additionalProperties: false
+      # This defines the allowable contents of the attributes above.
+      - $ref: '#/$defs/nestedTransform'
+      # A chain-type transform, like a chain composite, must have implicit io.
+      - if:
+          properties: { type: { const: chain }}
+          required: [type]
+        then:
+          $ref: '#/$defs/implicitInputOutputs'
  providers:
    type: array
    items:
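
Reviewer note, not part of the diff: since the schema now pins JSON Schema draft-07 and adds `additionalProperties: false` constraints, the change can be exercised locally with the `jsonschema` package. Below is a minimal sketch, assuming a Beam checkout; the schema path and the sample spec are illustrative, not code from this PR.

```python
# Sketch: validate a Beam YAML pipeline spec against the tightened
# pipeline.schema.yaml. Assumes `pyyaml` and `jsonschema` are installed;
# the path is relative to a Beam source checkout (an assumption).
import yaml
import jsonschema

with open("sdks/python/apache_beam/yaml/pipeline.schema.yaml") as f:
    schema = yaml.safe_load(f)

# A spec in the new form, with transforms nested under `transforms:`.
pipeline_spec = yaml.safe_load("""
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv
    - type: WriteToJson
      name: WriteAll
      input: ReadLeft
      config:
        path: /path/to/all.json
""")

try:
    # draft-07 is supported natively by the jsonschema package.
    jsonschema.validate(pipeline_spec, schema)
    print("pipeline spec conforms to the schema")
except jsonschema.ValidationError as e:
    # Expected for specs in the old flat-list form, or for transforms
    # carrying properties now rejected by `additionalProperties: false`.
    print(f"schema violation: {e.message}")
```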