diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 99cd26cc4098..321f35722a01 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1927,6 +1927,12 @@ def validate_transform(transform_id): raise ValueError( "Incompatible input coder %s and output coder %s for transform %s" % (transform_id, input_coder, output_coder)) + elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn: + if not transform_proto.inputs: + raise ValueError("Missing input for transform: %s" % transform_proto) + elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn: + if not transform_proto.inputs: + raise ValueError("Missing input for transform: %s" % transform_proto) for t in transform_proto.subtransforms: validate_transform(t)