Skip to content

Commit

Permalink
Add --use_1_based_coordinate option to bq_to_vcf pipeline. (#636)
Browse files Browse the repository at this point in the history
  • Loading branch information
tneymanov authored Jul 9, 2020
1 parent e7dd6da commit cc653d7
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 30 deletions.
1 change: 1 addition & 0 deletions gcp_variant_transforms/beam_io/vcf_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def _convert_to_variant(self, record):
self._verify_start_end(record)
return Variant(
reference_name=record.chrom.encode('utf-8'),
# record.pos is 1-based version of record.start (ie. record.start + 1).
start=record.pos if self._use_1_based_coordinate else record.start,
end=record.stop,
reference_bases=self._convert_field(record.ref),
Expand Down
65 changes: 53 additions & 12 deletions gcp_variant_transforms/beam_io/vcfio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@
class _ToVcfRecordCoder(coders.Coder):
"""Coder for encoding :class:`Variant` objects as VCF text lines."""

def __init__(self, bq_uses_1_based_coordinate):
# type: (bool) -> None
"""Initialize _ToVcfRecordCoder PTransform.
Args:
bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ
1-based (default) or 0-based. To find out examine start_position column
description.
"""
self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate

def encode(self, variant):
# type: (Variant) -> str
"""Converts a :class:`Variant` object back to a VCF line."""
Expand All @@ -61,7 +72,9 @@ def encode(self, variant):
encoded_calls = self._encode_variant_calls(variant, format_keys)
columns = [
variant.reference_name,
None if variant.start is None else variant.start + 1,
(None if variant.start is None
else (variant.start if self.bq_uses_1_based_coordinate
else variant.start + 1)),
';'.join(variant.names),
variant.reference_bases,
','.join(variant.alternate_bases),
Expand All @@ -88,12 +101,15 @@ def _encode_value(self, value):
def _encode_variant_info(self, variant):
"""Encodes the info of a :class:`Variant` for a VCF file line."""
encoded_infos = []
# Set END in info if it doesn't match start+len(reference_bases). This is
# usually the case for non-variant regions.
start_0_based = (None if variant.start is None
else (variant.start - 1 if self.bq_uses_1_based_coordinate
else variant.start))
# Set END in info if it doesn't match len(reference_bases)+start in 0-based
# coordinate system. This is usually the case for non-variant regions.
if (variant.start is not None
and variant.reference_bases
and variant.end
and variant.start + len(variant.reference_bases) != variant.end):
and start_0_based + len(variant.reference_bases) != variant.end):
encoded_infos.append('END=%d' % variant.end)
# Set all other fields of info.
for k, v in variant.info.iteritems():
Expand Down Expand Up @@ -255,7 +271,7 @@ def __init__(self,
sample_name_encoding: specify how we want to encode sample_name mainly
to deal with same sample_name used across multiple VCF files.
use_1_based_coordinate: specify whether the coordinates should be stored
in BQ using 0 based exclusive (default) or 1 based inclusive coordinate.
in BQ using 0-based exclusive (default) or 1-based inclusive coordinate.
"""
self._input_files = input_files
self._representative_header_lines = representative_header_lines
Expand Down Expand Up @@ -330,7 +346,7 @@ def __init__(
sample_name_encoding: specify how we want to encode sample_name mainly
to deal with same sample_name used across multiple VCF files.
use_1_based_coordinate: specify whether the coordinates should be stored
in BQ using 0 based exclusive (default) or 1 based inclusive coordinate.
in BQ using 0-based exclusive (default) or 1-based inclusive coordinate.
"""
super(ReadFromVcf, self).__init__(**kwargs)

Expand Down Expand Up @@ -408,7 +424,7 @@ def __init__(
sample_name_encoding: specify how we want to encode sample_name mainly
to deal with same sample_name used across multiple VCF files.
use_1_based_coordinate: specify whether the coordinates should be stored
in BQ using 0 based exclusive (default) or 1 based inclusive coordinate.
in BQ using 0-based exclusive (default) or 1-based inclusive coordinate.
"""
super(ReadAllFromVcf, self).__init__(**kwargs)
source_from_file = partial(
Expand Down Expand Up @@ -436,7 +452,8 @@ def __init__(self,
file_path,
num_shards=1,
compression_type=CompressionTypes.AUTO,
headers=None):
headers=None,
bq_uses_1_based_coordinate=True):
# type: (str, int, str, List[str]) -> None
"""Initialize a WriteToVcf PTransform.
Expand All @@ -457,27 +474,39 @@ def __init__(self,
headers: A list of VCF meta-information lines describing the at least the
INFO and FORMAT entries in each record and a header line describing the
column names. These lines will be written at the beginning of the file.
bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ
1-based (default) or 0-based. To find out examine start_position column
description.
"""
self._file_path = file_path
self._num_shards = num_shards
self._compression_type = compression_type
self._header = headers and '\n'.join([h.strip() for h in headers]) + '\n'
self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate

def expand(self, pcoll):
return pcoll | 'WriteToVCF' >> textio.WriteToText(
self._file_path,
append_trailing_newlines=False,
num_shards=self._num_shards,
coder=_ToVcfRecordCoder(),
coder=_ToVcfRecordCoder(self.bq_uses_1_based_coordinate),
compression_type=self._compression_type,
header=self._header)


class _WriteVcfDataLinesFn(beam.DoFn):
"""A function that writes variants to one VCF file."""

def __init__(self):
self._coder = _ToVcfRecordCoder()
def __init__(self, bq_uses_1_based_coordinate):
# type: (bool) -> None
"""Initialize _WriteVcfDataLinesFn DoFn function.
Args:
bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ
1-based (default) or 0-based. To find out examine start_position column
description.
"""
self._coder = _ToVcfRecordCoder(bq_uses_1_based_coordinate)

def process(self, (file_path, variants), *args, **kwargs):
# type: (Tuple[str, List[Variant]]) -> None
Expand All @@ -493,5 +522,17 @@ class WriteVcfDataLines(PTransform):
writes `variants` to `file_path`. The PTransform `WriteToVcf` takes
PCollection<`Variant`> as input, and writes all variants to the same file.
"""
def __init__(self, bq_uses_1_based_coordinate):
# type: (bool) -> None
"""Initialize WriteVcfDataLines PTransform.
Args:
bq_uses_1_based_coordinate: specify whether the coordinates used to in BQ
1-based (default) or 0-based. To find out examine start_position column
description.
"""
self.bq_uses_1_based_coordinate = bq_uses_1_based_coordinate

def expand(self, pcoll):
return pcoll | 'WriteToVCF' >> beam.ParDo(_WriteVcfDataLinesFn())
return pcoll | 'WriteToVCF' >> beam.ParDo(_WriteVcfDataLinesFn(
self.bq_uses_1_based_coordinate))
69 changes: 53 additions & 16 deletions gcp_variant_transforms/beam_io/vcfio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def _get_sample_variant_1(file_name='', use_1_based_coordinate=False,
"""
hash_name_method = _get_hashing_function(file_name, use_hashing)
variant = vcfio.Variant(
reference_name='20', start=1234 if use_1_based_coordinate else 1233,
end=1234, reference_bases='C',
alternate_bases=['A', 'T'], names=['rs123', 'rs2'], quality=50,
filters=['PASS'], info={'AF': [0.5, 0.1], 'NS': 1, 'SVTYPE': ['BÑD']})
reference_name='20', start=1233 + use_1_based_coordinate, end=1234,
reference_bases='C', alternate_bases=['A', 'T'], names=['rs123', 'rs2'],
quality=50, filters=['PASS'],
info={'AF': [0.5, 0.1], 'NS': 1, 'SVTYPE': ['BÑD']})
variant.calls.append(
vcfio.VariantCall(sample_id=hash_name_method('Sample1'), genotype=[0, 0],
info={'GQ': 48}))
Expand All @@ -116,8 +116,8 @@ def _get_sample_variant_2(file_name='', use_1_based_coordinate=False,
hash_name_method = _get_hashing_function(file_name, use_hashing)
variant = vcfio.Variant(
reference_name='19',
start=123 if use_1_based_coordinate else 122, end=125,
reference_bases='GTC', alternate_bases=[], names=['rs1234'], quality=40,
start=122 + use_1_based_coordinate, end=125, reference_bases='GTC',
alternate_bases=[], names=['rs1234'], quality=40,
filters=['q10', 's50'], info={'NS': 2})
variant.calls.append(
vcfio.VariantCall(sample_id=hash_name_method('Sample1'), genotype=[-1, 0],
Expand All @@ -139,7 +139,7 @@ def _get_sample_variant_3(file_name='', use_1_based_coordinate=False,
"""
hash_name_method = _get_hashing_function(file_name, use_hashing)
variant = vcfio.Variant(
reference_name='19', start=12 if use_1_based_coordinate else 11, end=12,
reference_name='19', start=11 + use_1_based_coordinate, end=12,
reference_bases='C', alternate_bases=['<SYMBOLIC>'], quality=49,
filters=['q10'], info={'AF': [0.5]})
variant.calls.append(
Expand All @@ -152,11 +152,11 @@ def _get_sample_variant_3(file_name='', use_1_based_coordinate=False,
return variant


def _get_sample_non_variant():
def _get_sample_non_variant(use_1_based_coordinate=False):
"""Get sample non variant."""
non_variant = vcfio.Variant(
reference_name='19', start=1233, end=1236, reference_bases='C',
alternate_bases=['<NON_REF>'], quality=50)
reference_name='19', start=1233 + use_1_based_coordinate, end=1236,
reference_bases='C', alternate_bases=['<NON_REF>'], quality=50)
non_variant.calls.append(
vcfio.VariantCall(sample_id=hash_name('Sample1'), genotype=[0, 0],
info={'GQ': 99}))
Expand Down Expand Up @@ -859,10 +859,10 @@ def _assert_variant_lines_equal(self, actual, expected):
# Compare the rest of the items ignoring order
self.assertItemsEqual(actual_split[1:], expected_split[1:])

def _get_coder(self):
return vcfio._ToVcfRecordCoder()
def _get_coder(self, bq_uses_1_based_coordinate=False):
return vcfio._ToVcfRecordCoder(bq_uses_1_based_coordinate)

def test_to_vcf_line(self):
def test_to_vcf_line_0_based(self):
coder = self._get_coder()
for variant, line in zip(self.variants, self.variant_lines):
self._assert_variant_lines_equal(
Expand All @@ -872,6 +872,21 @@ def test_to_vcf_line(self):
self._assert_variant_lines_equal(
coder.encode(empty_variant), empty_line)

def test_to_vcf_line_1_based(self):
coder = self._get_coder(bq_uses_1_based_coordinate=True)
variants = [
_get_sample_variant_1(use_1_based_coordinate=True),
_get_sample_variant_2(use_1_based_coordinate=True),
_get_sample_variant_3(use_1_based_coordinate=True),
_get_sample_non_variant(use_1_based_coordinate=True)]
for variant, line in zip(variants, self.variant_lines):
self._assert_variant_lines_equal(
coder.encode(variant), line)
empty_variant = vcfio.Variant()
empty_line = '\t'.join(['.' for _ in range(9)])
self._assert_variant_lines_equal(
coder.encode(empty_variant), empty_line)

def test_missing_info_key(self):
coder = self._get_coder()
variant = Variant()
Expand Down Expand Up @@ -935,9 +950,29 @@ def test_triploid_genotype(self):

self._assert_variant_lines_equal(coder.encode(variant), expected)

def test_write_dataflow(self):
def test_write_dataflow_0_based(self):
pipeline = TestPipeline()
pcoll = pipeline | beam.Create(self.variants, reshuffle=False)
_ = pcoll | 'Write' >> vcfio.WriteToVcf(
self.path, bq_uses_1_based_coordinate=False)
pipeline.run()

read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'r') as f:
read_result.extend(f.read().splitlines())

for actual, expected in zip(read_result, self.variant_lines):
self._assert_variant_lines_equal(actual, expected)

def test_write_dataflow_1_based(self):
variants = [
_get_sample_variant_1(use_1_based_coordinate=True),
_get_sample_variant_2(use_1_based_coordinate=True),
_get_sample_variant_3(use_1_based_coordinate=True),
_get_sample_non_variant(use_1_based_coordinate=True)]
pipeline = TestPipeline()
pcoll = pipeline | beam.Create(variants, reshuffle=False)
_ = pcoll | 'Write' >> vcfio.WriteToVcf(self.path)
pipeline.run()

Expand All @@ -954,7 +989,8 @@ def test_write_dataflow_auto_compression(self):
pcoll = pipeline | beam.Create(self.variants, reshuffle=False)
_ = pcoll | 'Write' >> vcfio.WriteToVcf(
self.path + '.gz',
compression_type=CompressionTypes.AUTO)
compression_type=CompressionTypes.AUTO,
bq_uses_1_based_coordinate=False)
pipeline.run()

read_result = []
Expand All @@ -972,7 +1008,8 @@ def test_write_dataflow_header(self):
_ = pcoll | 'Write' >> vcfio.WriteToVcf(
self.path + '.gz',
compression_type=CompressionTypes.AUTO,
headers=headers)
headers=headers,
bq_uses_1_based_coordinate=False)
pipeline.run()

read_result = []
Expand Down
2 changes: 1 addition & 1 deletion gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def _bigquery_to_vcf_shards(
beam.Map(_pair_variant_with_key, known_args.number_of_bases_per_shard)
| 'GroupVariantsByKey' >> beam.GroupByKey()
| beam.ParDo(_get_file_path_and_sorted_variants, vcf_data_temp_folder)
| vcfio.WriteVcfDataLines())
| vcfio.WriteVcfDataLines(known_args.bq_uses_1_based_coordinate))


def _get_schema(input_table):
Expand Down
12 changes: 12 additions & 0 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,18 @@ def add_arguments(self, parser):
'extracted variants to have the same sample ordering (usually '
'true for tables from single VCF file import).'))

parser.add_argument(
'--bq_uses_1_based_coordinate',
type='bool', default=True, nargs='?', const=True,
help=('Set to False, if --use_1_based_coordinate was set to False when '
'generating the BQ tables, and hence, start positions are '
'0-based. By default, imported BQ tables use 1-based coordinate. '
'Please examine your table''s start_position column description '
'to find out whether your variant tables uses 0-based or 1-based '
'coordinate.'))



def validate(self, parsed_args, client=None):
if not client:
credentials = GoogleCredentials.get_application_default().create_scoped(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
##fileformat=VCFv4.3
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
19 1234566 microsat1 GTCT G,GTACT 50.0 PASS END=1234570;AA=G;NS=3;DP=9 GT:DP:GQ 0/1:4:35 0/2:2:17 1/1:3:40
20 14369 rs6054257 G A 29.0 PASS END=14370;H2;NS=3;DB;DP=14;AF=0.5 GT:DP:GQ:HQ 0|0:1:48:51,51 1|0:8:48:51,51 1/1:5:43:.
20 17329 . T A 3.0 q10 END=17330;NS=3;DP=11;AF=0.017 GT:DP:GQ:HQ 0|0:3:49:58,50 0|1:5:3:65,3 0/0:3:41:.
20 1110695 rs6040355 A G,T 67.0 PASS END=1110696;AA=T;NS=2;DB;DP=10;AF=0.333,0.667 GT:DP:GQ:HQ 1|2:6:21:23,27 2|1:0:2:18,2 2/2:4:35:.
20 1230236 . T . 47.0 PASS END=1230237;AA=T;NS=3;DP=13 GT:DP:GQ:HQ 0|0:7:54:56,60 0|0:4:48:51,51 0/0:2:61:.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"test_name": "bq-to-vcf-no-options",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix",
"output_file_name": "bq_to_vcf_no_options.vcf",
"bq_uses_1_based_coordinate": false,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/no_options.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_2__suffix",
"output_file_name": "bq_to_vcf_option_allow_incompatible_schema.vcf",
"allow_incompatible_schema": true,
"bq_uses_1_based_coordinate": false,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_allow_incompatible_schema.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"output_file_name": "bq_to_vcf_option_customized_export.vcf",
"genomic_regions": "19:1234566-1234570 20:14369-17330",
"sample_names": "NA00001 NA00003",
"bq_uses_1_based_coordinate": false,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_customized_export.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.platinum_NA12877_hg38_10K_lines__suffix",
"output_file_name": "bq_to_vcf_option_number_of_bases_per_shard.vcf",
"number_of_bases_per_shard": 100000,
"bq_uses_1_based_coordinate": false,
"runner": "DataflowRunner",
"expected_output_file": "gs://gcp-variant-transforms-testfiles/bq_to_vcf_expected_output/platinum_NA12877_hg38_10K_lines_v2.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.merge_option_move_to_calls__suffix",
"output_file_name": "bq_to_vcf_option_preserve_sample_order.vcf",
"preserve_sample_order": false,
"bq_uses_1_based_coordinate": false,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_preserve_sample_order.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"representative_header_file": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf",
"preserve_sample_order": true,
"output_file_name": "bq_to_vcf_option_representative_header_file.vcf",
"bq_uses_1_based_coordinate": false,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_representative_header_file.vcf"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"test_name": "bq-to-vcf-option-use-1-based-coordinate",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix",
"output_file_name": "bq_to_vcf_option_use_1_based_coordinate.vcf",
"bq_uses_1_based_coordinate": true,
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/option_use_1_based_coordinate.vcf"
}
]
Loading

0 comments on commit cc653d7

Please sign in to comment.