diff --git a/gcp_variant_transforms/beam_io/vcf_parser.py b/gcp_variant_transforms/beam_io/vcf_parser.py index 3c13c7bcf..b1bdd0a97 100644 --- a/gcp_variant_transforms/beam_io/vcf_parser.py +++ b/gcp_variant_transforms/beam_io/vcf_parser.py @@ -549,6 +549,7 @@ def _convert_to_variant(self, record): self._verify_start_end(record) return Variant( reference_name=record.chrom.encode('utf-8'), + # record.pos is 1-based version of record.start (ie. record.start + 1). start=record.pos if self._use_1_based_coordinate else record.start, end=record.stop, reference_bases=self._convert_field(record.ref), diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index 581bd3ed3..bd71af3c2 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -53,6 +53,17 @@ class _ToVcfRecordCoder(coders.Coder): """Coder for encoding :class:`Variant` objects as VCF text lines.""" + def __init__(self, bq_uses_1_based_coordinate): + # type: (bool) -> None + """Initialize _ToVcfRecordCoder PTransform. + + Args: + bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ + 1-based (default) or 0-based. To find out examine start_position column + description. + """ + self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate + def encode(self, variant): # type: (Variant) -> str """Converts a :class:`Variant` object back to a VCF line.""" @@ -61,7 +72,9 @@ def encode(self, variant): encoded_calls = self._encode_variant_calls(variant, format_keys) columns = [ variant.reference_name, - None if variant.start is None else variant.start + 1, + (None if variant.start is None + else (variant.start if self.bq_uses_1_based_coordinate + else variant.start + 1)), ';'.join(variant.names), variant.reference_bases, ','.join(variant.alternate_bases), @@ -88,12 +101,15 @@ def _encode_value(self, value): def _encode_variant_info(self, variant): """Encodes the info of a :class:`Variant` for a VCF file line.""" encoded_infos = [] - # Set END in info if it doesn't match start+len(reference_bases). This is - # usually the case for non-variant regions. + start_0_based = (None if variant.start is None + else (variant.start - 1 if self.bq_uses_1_based_coordinate + else variant.start)) + # Set END in info if it doesn't match len(reference_bases)+start in 0-based + # coordinate system. This is usually the case for non-variant regions. if (variant.start is not None and variant.reference_bases and variant.end - and variant.start + len(variant.reference_bases) != variant.end): + and start_0_based + len(variant.reference_bases) != variant.end): encoded_infos.append('END=%d' % variant.end) # Set all other fields of info. for k, v in variant.info.iteritems(): @@ -255,7 +271,7 @@ def __init__(self, sample_name_encoding: specify how we want to encode sample_name mainly to deal with same sample_name used across multiple VCF files. use_1_based_coordinate: specify whether the coordinates should be stored - in BQ using 0 based exclusive (default) or 1 based inclusive coordinate. + in BQ using 0-based exclusive (default) or 1-based inclusive coordinate. """ self._input_files = input_files self._representative_header_lines = representative_header_lines @@ -330,7 +346,7 @@ def __init__( sample_name_encoding: specify how we want to encode sample_name mainly to deal with same sample_name used across multiple VCF files. use_1_based_coordinate: specify whether the coordinates should be stored - in BQ using 0 based exclusive (default) or 1 based inclusive coordinate. + in BQ using 0-based exclusive (default) or 1-based inclusive coordinate. """ super(ReadFromVcf, self).__init__(**kwargs) @@ -408,7 +424,7 @@ def __init__( sample_name_encoding: specify how we want to encode sample_name mainly to deal with same sample_name used across multiple VCF files. use_1_based_coordinate: specify whether the coordinates should be stored - in BQ using 0 based exclusive (default) or 1 based inclusive coordinate. + in BQ using 0-based exclusive (default) or 1-based inclusive coordinate. """ super(ReadAllFromVcf, self).__init__(**kwargs) source_from_file = partial( @@ -436,7 +452,8 @@ def __init__(self, file_path, num_shards=1, compression_type=CompressionTypes.AUTO, - headers=None): + headers=None, + bq_uses_1_based_coordinate=True): # type: (str, int, str, List[str]) -> None """Initialize a WriteToVcf PTransform. @@ -457,18 +474,22 @@ def __init__(self, headers: A list of VCF meta-information lines describing the at least the INFO and FORMAT entries in each record and a header line describing the column names. These lines will be written at the beginning of the file. + bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ + 1-based (default) or 0-based. To find out examine start_position column + description. """ self._file_path = file_path self._num_shards = num_shards self._compression_type = compression_type self._header = headers and '\n'.join([h.strip() for h in headers]) + '\n' + self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate def expand(self, pcoll): return pcoll | 'WriteToVCF' >> textio.WriteToText( self._file_path, append_trailing_newlines=False, num_shards=self._num_shards, - coder=_ToVcfRecordCoder(), + coder=_ToVcfRecordCoder(self.bq_uses_1_based_coordinate), compression_type=self._compression_type, header=self._header) @@ -476,8 +497,16 @@ def expand(self, pcoll): class _WriteVcfDataLinesFn(beam.DoFn): """A function that writes variants to one VCF file.""" - def __init__(self): - self._coder = _ToVcfRecordCoder() + def __init__(self, bq_uses_1_based_coordinate): + # type: (bool) -> None + """Initialize _WriteVcfDataLinesFn DoFn function. + + Args: + bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ + 1-based (default) or 0-based. To find out examine start_position column + description. + """ + self._coder = _ToVcfRecordCoder(bq_uses_1_based_coordinate) def process(self, (file_path, variants), *args, **kwargs): # type: (Tuple[str, List[Variant]]) -> None @@ -493,5 +522,17 @@ class WriteVcfDataLines(PTransform): writes `variants` to `file_path`. The PTransform `WriteToVcf` takes PCollection<`Variant`> as input, and writes all variants to the same file. """ + def __init__(self, bq_uses_1_based_coordinate): + # type: (bool) -> None + """Initialize WriteVcfDataLines PTransform. + + Args: + bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ + 1-based (default) or 0-based. To find out examine start_position column + description. + """ + self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate + def expand(self, pcoll): - return pcoll | 'WriteToVCF' >> beam.ParDo(_WriteVcfDataLinesFn()) + return pcoll | 'WriteToVCF' >> beam.ParDo(_WriteVcfDataLinesFn( + self.bq_uses_1_based_coordinate)) diff --git a/gcp_variant_transforms/beam_io/vcfio_test.py b/gcp_variant_transforms/beam_io/vcfio_test.py index b349b6900..ad774f02a 100644 --- a/gcp_variant_transforms/beam_io/vcfio_test.py +++ b/gcp_variant_transforms/beam_io/vcfio_test.py @@ -89,10 +89,10 @@ def _get_sample_variant_1(file_name='', use_1_based_coordinate=False, """ hash_name_method = _get_hashing_function(file_name, use_hashing) variant = vcfio.Variant( - reference_name='20', start=1234 if use_1_based_coordinate else 1233, - end=1234, reference_bases='C', - alternate_bases=['A', 'T'], names=['rs123', 'rs2'], quality=50, - filters=['PASS'], info={'AF': [0.5, 0.1], 'NS': 1, 'SVTYPE': ['BÑD']}) + reference_name='20', start=1233 + use_1_based_coordinate, end=1234, + reference_bases='C', alternate_bases=['A', 'T'], names=['rs123', 'rs2'], + quality=50, filters=['PASS'], + info={'AF': [0.5, 0.1], 'NS': 1, 'SVTYPE': ['BÑD']}) variant.calls.append( vcfio.VariantCall(sample_id=hash_name_method('Sample1'), genotype=[0, 0], info={'GQ': 48})) @@ -116,8 +116,8 @@ def _get_sample_variant_2(file_name='', use_1_based_coordinate=False, hash_name_method = _get_hashing_function(file_name, use_hashing) variant = vcfio.Variant( reference_name='19', - start=123 if use_1_based_coordinate else 122, end=125, - reference_bases='GTC', alternate_bases=[], names=['rs1234'], quality=40, + start=122 + use_1_based_coordinate, end=125, reference_bases='GTC', + alternate_bases=[], names=['rs1234'], quality=40, filters=['q10', 's50'], info={'NS': 2}) variant.calls.append( vcfio.VariantCall(sample_id=hash_name_method('Sample1'), genotype=[-1, 0], @@ -139,7 +139,7 @@ def _get_sample_variant_3(file_name='', use_1_based_coordinate=False, """ hash_name_method = _get_hashing_function(file_name, use_hashing) variant = vcfio.Variant( - reference_name='19', start=12 if use_1_based_coordinate else 11, end=12, + reference_name='19', start=11 + use_1_based_coordinate, end=12, reference_bases='C', alternate_bases=[''], quality=49, filters=['q10'], info={'AF': [0.5]}) variant.calls.append( @@ -152,11 +152,11 @@ def _get_sample_variant_3(file_name='', use_1_based_coordinate=False, return variant -def _get_sample_non_variant(): +def _get_sample_non_variant(use_1_based_coordinate=False): """Get sample non variant.""" non_variant = vcfio.Variant( - reference_name='19', start=1233, end=1236, reference_bases='C', - alternate_bases=[''], quality=50) + reference_name='19', start=1233 + use_1_based_coordinate, end=1236, + reference_bases='C', alternate_bases=[''], quality=50) non_variant.calls.append( vcfio.VariantCall(sample_id=hash_name('Sample1'), genotype=[0, 0], info={'GQ': 99})) @@ -859,10 +859,10 @@ def _assert_variant_lines_equal(self, actual, expected): # Compare the rest of the items ignoring order self.assertItemsEqual(actual_split[1:], expected_split[1:]) - def _get_coder(self): - return vcfio._ToVcfRecordCoder() + def _get_coder(self, bq_uses_1_based_coordinate=False): + return vcfio._ToVcfRecordCoder(bq_uses_1_based_coordinate) - def test_to_vcf_line(self): + def test_to_vcf_line_0_based(self): coder = self._get_coder() for variant, line in zip(self.variants, self.variant_lines): self._assert_variant_lines_equal( @@ -872,6 +872,21 @@ def test_to_vcf_line(self): self._assert_variant_lines_equal( coder.encode(empty_variant), empty_line) + def test_to_vcf_line_1_based(self): + coder = self._get_coder(bq_uses_1_based_coordinate=True) + variants = [ + _get_sample_variant_1(use_1_based_coordinate=True), + _get_sample_variant_2(use_1_based_coordinate=True), + _get_sample_variant_3(use_1_based_coordinate=True), + _get_sample_non_variant(use_1_based_coordinate=True)] + for variant, line in zip(variants, self.variant_lines): + self._assert_variant_lines_equal( + coder.encode(variant), line) + empty_variant = vcfio.Variant() + empty_line = '\t'.join(['.' for _ in range(9)]) + self._assert_variant_lines_equal( + coder.encode(empty_variant), empty_line) + def test_missing_info_key(self): coder = self._get_coder() variant = Variant() @@ -935,9 +950,29 @@ def test_triploid_genotype(self): self._assert_variant_lines_equal(coder.encode(variant), expected) - def test_write_dataflow(self): + def test_write_dataflow_0_based(self): pipeline = TestPipeline() pcoll = pipeline | beam.Create(self.variants, reshuffle=False) + _ = pcoll | 'Write' >> vcfio.WriteToVcf( + self.path, bq_uses_1_based_coordinate=False) + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with open(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + for actual, expected in zip(read_result, self.variant_lines): + self._assert_variant_lines_equal(actual, expected) + + def test_write_dataflow_1_based(self): + variants = [ + _get_sample_variant_1(use_1_based_coordinate=True), + _get_sample_variant_2(use_1_based_coordinate=True), + _get_sample_variant_3(use_1_based_coordinate=True), + _get_sample_non_variant(use_1_based_coordinate=True)] + pipeline = TestPipeline() + pcoll = pipeline | beam.Create(variants, reshuffle=False) _ = pcoll | 'Write' >> vcfio.WriteToVcf(self.path) pipeline.run() @@ -954,7 +989,8 @@ def test_write_dataflow_auto_compression(self): pcoll = pipeline | beam.Create(self.variants, reshuffle=False) _ = pcoll | 'Write' >> vcfio.WriteToVcf( self.path + '.gz', - compression_type=CompressionTypes.AUTO) + compression_type=CompressionTypes.AUTO, + bq_uses_1_based_coordinate=False) pipeline.run() read_result = [] @@ -972,7 +1008,8 @@ def test_write_dataflow_header(self): _ = pcoll | 'Write' >> vcfio.WriteToVcf( self.path + '.gz', compression_type=CompressionTypes.AUTO, - headers=headers) + headers=headers, + bq_uses_1_based_coordinate=False) pipeline.run() read_result = [] diff --git a/gcp_variant_transforms/bq_to_vcf.py b/gcp_variant_transforms/bq_to_vcf.py index 69d3e352d..2e4b4bf58 100644 --- a/gcp_variant_transforms/bq_to_vcf.py +++ b/gcp_variant_transforms/bq_to_vcf.py @@ -245,7 +245,7 @@ def _bigquery_to_vcf_shards( beam.Map(_pair_variant_with_key, known_args.number_of_bases_per_shard) | 'GroupVariantsByKey' >> beam.GroupByKey() | beam.ParDo(_get_file_path_and_sorted_variants, vcf_data_temp_folder) - | vcfio.WriteVcfDataLines()) + | vcfio.WriteVcfDataLines(known_args.bq_uses_1_based_coordinate)) def _get_schema(input_table): diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 35f566994..a2aa9a55b 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -637,6 +637,18 @@ def add_arguments(self, parser): 'extracted variants to have the same sample ordering (usually ' 'true for tables from single VCF file import).')) + parser.add_argument( + '--bq_uses_1_based_coordinate', + type='bool', default=True, nargs='?', const=True, + help=('Set to False, if --use_1_based_coordinate was set to False when ' + 'generating the BQ tables, and hence, start positions are ' + '0-based. By default, imported BQ tables use 1-based coordinate. ' + 'Please examine your table''s start_position column description ' + 'to find out whether your variant tables uses 0-based or 1-based ' + 'coordinate.')) + + + def validate(self, parsed_args, client=None): if not client: credentials = GoogleCredentials.get_application_default().create_scoped( diff --git a/gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_use_1_based_coordinate.vcf b/gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_use_1_based_coordinate.vcf new file mode 100644 index 000000000..61571fa17 --- /dev/null +++ b/gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_use_1_based_coordinate.vcf @@ -0,0 +1,16 @@ +##fileformat=VCFv4.3 +##INFO= +##INFO= +##INFO= +##INFO= +##INFO= +##INFO= +##FORMAT= +##FORMAT= +##FORMAT= +#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003 +19 1234566 microsat1 GTCT G,GTACT 50.0 PASS END=1234570;AA=G;NS=3;DP=9 GT:DP:GQ 0/1:4:35 0/2:2:17 1/1:3:40 +20 14369 rs6054257 G A 29.0 PASS END=14370;H2;NS=3;DB;DP=14;AF=0.5 GT:DP:GQ:HQ 0|0:1:48:51,51 1|0:8:48:51,51 1/1:5:43:. +20 17329 . T A 3.0 q10 END=17330;NS=3;DP=11;AF=0.017 GT:DP:GQ:HQ 0|0:3:49:58,50 0|1:5:3:65,3 0/0:3:41:. +20 1110695 rs6040355 A G,T 67.0 PASS END=1110696;AA=T;NS=2;DB;DP=10;AF=0.333,0.667 GT:DP:GQ:HQ 1|2:6:21:23,27 2|1:0:2:18,2 2/2:4:35:. +20 1230236 . T . 47.0 PASS END=1230237;AA=T;NS=3;DP=13 GT:DP:GQ:HQ 0|0:7:54:56,60 0|0:4:48:51,51 0/0:2:61:. diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/no_options.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/no_options.json index eea5b326a..b68f0c914 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/no_options.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/no_options.json @@ -3,6 +3,7 @@ "test_name": "bq-to-vcf-no-options", "input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix", "output_file_name": "bq_to_vcf_no_options.vcf", + "bq_uses_1_based_coordinate": false, "runner": "DirectRunner", "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/no_options.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_allow_incompatible_schema.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_allow_incompatible_schema.json index c32d05c44..eaa7f8f25 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_allow_incompatible_schema.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_allow_incompatible_schema.json @@ -4,6 +4,7 @@ "input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_2__suffix", "output_file_name": "bq_to_vcf_option_allow_incompatible_schema.vcf", "allow_incompatible_schema": true, + "bq_uses_1_based_coordinate": false, "runner": "DirectRunner", "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_allow_incompatible_schema.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_customized_export.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_customized_export.json index ccf0119bb..fdcbe3716 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_customized_export.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_customized_export.json @@ -5,6 +5,7 @@ "output_file_name": "bq_to_vcf_option_customized_export.vcf", "genomic_regions": "19:1234566-1234570 20:14369-17330", "sample_names": "NA00001 NA00003", + "bq_uses_1_based_coordinate": false, "runner": "DirectRunner", "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_customized_export.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_number_of_bases_per_shard.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_number_of_bases_per_shard.json index 20786523d..d471b8adc 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_number_of_bases_per_shard.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_number_of_bases_per_shard.json @@ -4,6 +4,7 @@ "input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.platinum_NA12877_hg38_10K_lines__suffix", "output_file_name": "bq_to_vcf_option_number_of_bases_per_shard.vcf", "number_of_bases_per_shard": 100000, + "bq_uses_1_based_coordinate": false, "runner": "DataflowRunner", "expected_output_file": "gs://gcp-variant-transforms-testfiles/bq_to_vcf_expected_output/platinum_NA12877_hg38_10K_lines_v2.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_preserve_sample_order.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_preserve_sample_order.json index 45842cc97..ecaef16f9 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_preserve_sample_order.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_preserve_sample_order.json @@ -4,6 +4,7 @@ "input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.merge_option_move_to_calls__suffix", "output_file_name": "bq_to_vcf_option_preserve_sample_order.vcf", "preserve_sample_order": false, + "bq_uses_1_based_coordinate": false, "runner": "DirectRunner", "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_preserve_sample_order.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_representative_header_file.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_representative_header_file.json index a660866ef..c98f6c432 100644 --- a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_representative_header_file.json +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_representative_header_file.json @@ -5,6 +5,7 @@ "representative_header_file": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf", "preserve_sample_order": true, "output_file_name": "bq_to_vcf_option_representative_header_file.vcf", + "bq_uses_1_based_coordinate": false, "runner": "DirectRunner", "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_representative_header_file.vcf" } diff --git a/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_use_1_based_coordinate.json b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_use_1_based_coordinate.json new file mode 100644 index 000000000..a3487020a --- /dev/null +++ b/gcp_variant_transforms/testing/integration/bq_to_vcf_tests/option_use_1_based_coordinate.json @@ -0,0 +1,10 @@ +[ + { + "test_name": "bq-to-vcf-option-use-1-based-coordinate", + "input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix", + "output_file_name": "bq_to_vcf_option_use_1_based_coordinate.vcf", + "bq_uses_1_based_coordinate": true, + "runner": "DirectRunner", + "expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_use_1_based_coordinate.vcf" + } +] diff --git a/gcp_variant_transforms/transforms/write_variants_to_shards.py b/gcp_variant_transforms/transforms/write_variants_to_shards.py index 4474b6702..9cb5ce294 100644 --- a/gcp_variant_transforms/transforms/write_variants_to_shards.py +++ b/gcp_variant_transforms/transforms/write_variants_to_shards.py @@ -37,7 +37,12 @@ def __init__(self, vcf_shards_output_dir, number_of_variants_per_shard): """ self._vcf_shards_output_dir = vcf_shards_output_dir self._number_of_variants_per_shard = number_of_variants_per_shard - self._coder = vcfio._ToVcfRecordCoder() + # Write shards as is. + # This piece of code is triggered from vcf_to_bq._shard_variants() which + # reads variants with the --use_1_based_coordinate=False option, regardless + # if the flag was passed with that value or not. Therefore, write the shards + # into BQ with 0-based coordinate assumption. + self._coder = vcfio._ToVcfRecordCoder(bq_uses_1_based_coordinate=False) self._sample_names = [] self._variant_lines = [] self._counter = 0