diff --git a/README.md b/README.md index c3ebd9f..4227acb 100644 --- a/README.md +++ b/README.md @@ -64,10 +64,10 @@ filename from edi ) fgs ) trnx -) clms +) clms; --Create a "Claims Header" table - +drop table if exists claim_header; create table claim_header as select filename, tax_id, @@ -75,10 +75,26 @@ sender, transaction_type, clms.claim_header.*, clms.diagnosis.*, -clms.patient.*, clms.payer.*, -clms.providers.* -from stg_claims +clms.providers.*, + clms.patient.name as patient_name, + clms.patient.patient_relationship_cd, + clms.patient.street as patient_street, + clms.patient.city as patient_city, + clms.patient.zip as patient_zip, + clms.patient.dob as patient_dob, + clms.patient.dob_format as patient_dob_format, + clms.patient.gender_cd as patient_gender_cd, + clms.subscriber.subsciber_identifier, + clms.subscriber.name as subscriber_name, + clms.subscriber.subscriber_relationship_cd, + clms.subscriber.street as subscriber_street, + clms.subscriber.city as subscriber_city, + clms.subscriber.zip as subscriber_zip, + clms.subscriber.dob as subscriber_dob, + clms.subscriber.dob_format as subscriber_dob_format, + clms.subscriber.gender_cd as subscriber_gender_cd +from stg_claims; --Create a "Claim Line" table create table claim_line as @@ -97,6 +113,26 @@ from stg_claims ![image](images/claim_header.png?raw=true) ![image](images/claim_line.png?raw=true) +### 835 sample + +```python +df = spark.read.text("sampledata/835/*txt", wholetext = True) + +rdd = ( + df.withColumn("filename", input_file_name()).rdd + .map(lambda x: (x.asDict().get("filename"),x.asDict().get("value"))) + .map(lambda x: (x[0], EDI(x[1]))) + .map(lambda x: { **{'filename': x[0]}, **hm.to_json(x[1])} ) + .map(lambda x: json.dumps(x)) +) +claims = spark.read.json(rdd) + +#Create Claims tables from the EDI transactions +#... 
#
# Returns all segments matching segment_name
#
def segments_by_name(self, segment_name, range_start=-1, range_end=None, data=None):
    """Return the segments named ``segment_name`` inside an index window.

    :param segment_name: value compared against each segment's ``segment_name()``
    :param range_start: lowest index (inclusive) to consider
    :param range_end: highest index (inclusive) to consider; ``None`` means
        no upper bound
    :param data: segments to search; defaults to ``self.data``
    :return: list of matching segments, in their original order
    """
    if data is None:
        data = self.data
    # Compare with None explicitly: `range_end or len(data)` treated an
    # explicit upper bound of 0 as "no bound" and searched the whole list.
    last_idx = len(data) if range_end is None else range_end
    return [
        seg
        for i, seg in enumerate(data)
        if range_start <= i <= last_idx and seg.segment_name() == segment_name
    ]


#
# Returns a tuple of all segments matching segment_name and their index
#
def segments_by_name_index(self, segment_name, range_start=-1, range_end=None, data=None):
    """Return ``(index, segment)`` pairs for segments named ``segment_name``.

    Same window semantics as ``segments_by_name``; the index is the position
    of the segment inside ``data`` (or ``self.data`` when ``data`` is None).

    :return: list of ``(index, segment)`` tuples, in their original order
    """
    if data is None:
        data = self.data
    last_idx = len(data) if range_end is None else range_end
    return [
        (i, seg)
        for i, seg in enumerate(data)
        if range_start <= i <= last_idx and seg.segment_name() == segment_name
    ]


#
# Return the index of the first occurrence of the specified segment
#
def index_of_segment(self, segments, segment_name, search_start_idx=0):
    """Return the index of the first ``segment_name`` at or after a position.

    :param segments: sequence of segments to scan (``None``/empty -> not found)
    :param segment_name: value compared against each segment's ``segment_name()``
    :param search_start_idx: first index that is allowed to match
    :return: the matching index, or ``-1`` when nothing matches. Callers must
        check for ``-1`` before using the result as a slice bound.
    """
    # next() with a default replaces min() wrapped in a bare `except:`, which
    # also hid genuine errors raised by segment_name().
    return next(
        (
            i
            for i, seg in enumerate(segments or ())
            if i >= search_start_idx and seg.segment_name() == segment_name
        ),
        -1,
    )
#
# Base claim builder (transaction -> 1 or more claims)
#
class ClaimBuilder(EDI):
    """Split the segments of one transaction into per-claim objects.

    ``trnx_type_cls`` is the class instantiated for every claim found; its
    ``NAME`` attribute ('837I', '837P' or '835') selects the build strategy.
    """

    #
    # Given claim type (837i, 837p, etc), segments, and delim class, build claim level classes
    #
    def __init__(self, trnx_type_cls, trnx_data, delim_cls=AnsiX12Delim):
        self.data = trnx_data
        self.format_cls = delim_cls
        self.trnx_cls = trnx_type_cls
        self.loop = Loop(trnx_data)

    #
    # Index of the closest following segment among segment_names, or
    # len(self.data) when none of them occurs at/after search_start_idx
    #
    def _next_boundary(self, segment_names, search_start_idx=0):
        hits = [
            self.index_of_segment(self.data, name, search_start_idx)
            for name in segment_names
        ]
        # index_of_segment reports "not found" as -1; drop those so the
        # sentinel can never win min() or be used as a slice bound.
        return min((i for i in hits if i >= 0), default=len(self.data))

    #
    # Indexes of every segment called segment_name located after idx
    #
    def _indexes_after(self, segment_name, idx):
        return [i for i, _ in self.segments_by_name_index(segment_name) if i > idx]

    #
    # Builds a claim object from
    #
    # @param clm_segment - the claim segment of claim to build
    # @param idx - the index of the claim segment in the data
    #
    # @return the class containing the relevant claim information
    #
    def build_claim(self, clm_segment, idx):
        return self.trnx_cls(
            sender_receiver_loop=self.get_submitter_receiver_loop(idx),
            billing_loop=self.loop.get_loop_segments(idx, "2000A"),
            subscriber_loop=self.loop.get_loop_segments(idx, "2000B"),
            patient_loop=self.loop.get_loop_segments(idx, "2000C"),
            claim_loop=self.get_claim_loop(idx),
            sl_loop=self.get_service_line_loop(idx),  # service line loop
        )

    #
    # https://datainsight.health/edi/payments/dollars-separate/
    # trx_header_loop = 0000
    # payer_loop = 1000A
    # payee_loop = 1000B
    # clm_payment_loop = 2100
    # srv_payment_loop = 2110
    #
    # @param pay_segment - the CLP segment of the remittance to build
    # @param idx - the index of that CLP segment in the data
    #
    # @return the class containing the header, payer, payee and claim segments
    #
    def build_remittance(self, pay_segment, idx):
        # Where the claim section starts: first LX, or first CLP when the
        # transaction carries no LX header.
        claims_start = self._next_boundary(("LX", "CLP"))
        # First N1 opens the payer loop, the next N1 opens the payee loop.
        # Both are clamped to claims_start so a missing N1 yields empty loops
        # rather than a slice that runs to the end of the transaction.
        payer_idx = min(self._next_boundary(("N1",)), claims_start)
        payee_idx = min(self._next_boundary(("N1",), payer_idx + 1), claims_start)
        # The claim ends at the next LX, the next CLP or the SE trailer,
        # whichever comes first; a boundary that is absent is ignored.
        clm_end_idx = self._next_boundary(("LX", "CLP", "SE"), idx + 1)
        return self.trnx_cls(
            trx_header_loop=self.data[0:payer_idx],
            payer_loop=self.data[payer_idx:payee_idx],
            payee_loop=self.data[payee_idx:claims_start],
            clm_loop=self.data[idx:clm_end_idx],
        )

    #
    # Determine claim loop: starts at the clm index and ends at LX segment, or CLM segment, or end of data
    #
    def get_claim_loop(self, clm_idx):
        boundaries = self._indexes_after("LX", clm_idx) + self._indexes_after("CLM", clm_idx)
        return self.data[clm_idx:min(boundaries, default=len(self.data))]

    #
    # Service line loop: starts at the first LX after the clm index and ends
    # at the first SE after it (or at the end of data). Empty when there is
    # no LX after the claim.
    # NOTE(review): the slice is not cut at the next CLM/HL, so it can include
    # segments of later claims in the same transaction - confirm intended.
    #
    def get_service_line_loop(self, clm_idx):
        sl_start_indexes = self._indexes_after("LX", clm_idx)
        if not sl_start_indexes:
            return []
        sl_end_idx = min(self._indexes_after("SE", clm_idx), default=len(self.data))
        return self.data[min(sl_start_indexes):sl_end_idx]

    #
    # Submitter/receiver loop: from the last BHT before the claim up to (not
    # including) the last HL segment before the claim whose element 3 is '20'.
    # Returns [] when either boundary is missing.
    #
    def get_submitter_receiver_loop(self, clm_idx):
        bht_indexes = [
            i for i, _ in self.segments_by_name_index("BHT") if i < clm_idx
        ]
        hl_indexes = [
            i
            for i, seg in self.segments_by_name_index("HL")
            if i < clm_idx and seg.element(3) == '20'
        ]
        # max() of an empty list raises ValueError; treat it as "no loop".
        if not bht_indexes or not hl_indexes:
            return []
        return self.data[max(bht_indexes):max(hl_indexes)]

    #
    # Given transaction type, transaction segments, and delim info, build out claims in the transaction
    # @return a list of Claim for each "CLM" segment (837) or a list of
    #         remittances for each "CLP" segment (835); [] for other types
    #
    def build(self):
        if self.trnx_cls.NAME in ['837I', '837P']:
            return [
                self.build_claim(seg, i) for i, seg in self.segments_by_name_index("CLM")
            ]
        if self.trnx_cls.NAME == '835':
            return [
                self.build_remittance(seg, i) for i, seg in self.segments_by_name_index("CLP")
            ]
        # Unknown transaction type: return an empty list instead of None so
        # callers can always iterate the result.
        return []
'835': + return [ + self.build_remittance(seg, i) for i, seg in self.segments_by_name_index("CLP") + ] + # # Base claim class # @@ -30,8 +135,8 @@ def __init__( # # Return first segment found of name == name otherwise Segment.empty() # - def _first(self, segments, name): - return ([x for x in segments if x.segment_name() == name][0] if len([x for x in segments if x.segment_name() == name]) > 0 else Segment.empty()) + def _first(self, segments, name, start_index = 0): + return ([x for x in segments[start_index:] if x.segment_name() == name][0] if len([x for x in segments[start_index:] if x.segment_name() == name]) > 0 else Segment.empty()) def _populate_providers(self): return {"billing": self._billing_provider()} @@ -42,7 +147,6 @@ def _billing_provider(self): n4=self._first(self.billing_loop, "N4"), ref=self._first(self.billing_loop, "REF")) - def _populate_diagnosis(self): return DiagnosisIdentity([x for x in self.claim_loop if x.segment_name() == "HI"]) @@ -199,82 +303,119 @@ def _populate_sl_loop(self, missing=""): def _populate_patient_loop(self): pass + # -# Base claim builder (transaction -> 1 or more claims) +# 835 payment information +# https://datainsight.health/edi/payments/with-discount/ # +class Remittance(MedicalClaim): + NAME = "835" + + def __init__(self, + trx_header_loop, + payer_loop, + payee_loop, + clm_loop): + self.trx_header_loop = trx_header_loop + self.payer_loop = payer_loop + self.payee_loop = payee_loop + self.clm_loop = clm_loop + self.build() -class ClaimBuilder(EDI): - # - # Given claim type (837i, 837p, etc), segments, and delim class, build claim level classes - # - def __init__(self, trnx_type_cls, trnx_data, delim_cls=AnsiX12Delim): - self.data = trnx_data - self.format_cls = delim_cls - self.trnx_cls = trnx_type_cls - self.loop = Loop(trnx_data) + def build(self): + self.trx_header_info = self.populate_trx_loop() + self.payer_info = self.populate_payer_loop() + self.payee_info = self.populate_payee_loop() + self.clm_info = 
#
# 835 payment information
# https://datainsight.health/edi/payments/with-discount/
#
class Remittance(MedicalClaim):
    """One claim payment (CLP) of an 835 transaction and its surrounding loops.

    Each ``*_loop`` attribute is a list of segments; ``build()`` turns them
    into plain dictionaries that ``to_json()`` exposes.
    """

    NAME = "835"

    #
    # @param trx_header_loop - segments before the first N1
    # @param payer_loop - segments of the first N1 loop
    # @param payee_loop - segments of the second N1 loop
    # @param clm_loop - segments from the CLP up to the next claim boundary
    #
    def __init__(self,
                 trx_header_loop,
                 payer_loop,
                 payee_loop,
                 clm_loop):
        self.trx_header_loop = trx_header_loop
        self.payer_loop = payer_loop
        self.payee_loop = payee_loop
        self.clm_loop = clm_loop
        self.build()

    def build(self):
        self.trx_header_info = self.populate_trx_loop()
        self.payer_info = self.populate_payer_loop()
        self.payee_info = self.populate_payee_loop()
        self.clm_info = self.populate_claim_loop()

    #
    # Payer name, address and contact from the first N1/N3/N4/PER of the payer loop
    #
    def populate_payer_loop(self):
        n1 = self._first(self.payer_loop, "N1")
        n3 = self._first(self.payer_loop, "N3")
        n4 = self._first(self.payer_loop, "N4")
        per = self._first(self.payer_loop, "PER")
        return {
            'entity_id_cd': n1.element(1),
            'payer_name': n1.element(2),
            'payer_street': n3.element(1),
            'payer_city': n4.element(1),
            'payer_state': n4.element(2),
            'payer_zip': n4.element(3),
            'payer_contact_name': per.element(2),
            'payer_contact_function_cd': per.element(1),
            'payer_contact_number': per.element(4)
        }

    #
    # Payee name and identifier from the N1 segment of the payee loop
    #
    def populate_payee_loop(self):
        # Read the payee loop: the previous version read self.payer_loop here,
        # so every payee field repeated the payer's N1 values.
        n1 = self._first(self.payee_loop, "N1")
        return {
            'payee_name': n1.element(2),
            'payee_npi': n1.element(3),
            'payee_id_cd': n1.element(4)
        }

    #
    # Payment level information from the BPR segment of the transaction header
    #
    def populate_trx_loop(self):
        bpr = self._first(self.trx_header_loop, "BPR")
        return {
            'transaction_handling_cd': bpr.element(1),
            'pay_amt': bpr.element(2),
            'credit_debit_flag': bpr.element(3),
            'origin_company_id': bpr.element(10)
        }

    #
    # Claim level information (CLP, first NM1, PLB) plus one entry per SVC segment
    #
    def populate_claim_loop(self):
        clp = self._first(self.clm_loop, "CLP")
        nm1 = self._first(self.clm_loop, "NM1")
        plb = self._first(self.clm_loop, "PLB")
        svc_hits = self.segments_by_name_index(segment_name="SVC", data=self.clm_loop)
        # A service line runs up to the next SVC, or to the end of the claim
        # loop for the last one. (Mixing the -1 "not found" sentinel into
        # min() used to cut the final segment off the last service line.)
        svc_ends = [i for i, _ in svc_hits][1:] + [len(self.clm_loop)]
        return {
            'claim_id': clp.element(1),
            'claim_status_cd': clp.element(2),
            'claim_chrg_amt': clp.element(3),
            'claim_pay_amt': clp.element(4),
            'patient_pay_amt': clp.element(5),
            'claim_filing_cd': clp.element(6),
            'payer_claim_id': clp.element(7),
            'facility_type_cd': clp.element(8),
            'claim_freq_cd': clp.element(9),
            'patient_entity_id_cd': nm1.element(1),
            'entity_type_qualifier': nm1.element(2),
            'patient_last_nm': nm1.element(4),
            'patient_first_nm': nm1.element(5),
            'id_code_qualifier': nm1.element(9),
            'patient_id': nm1.element(10),
            'provider_adjustment_id': plb.element(1),
            'provider_adjustment_date': plb.element(2),
            'provider_adjustment_reason_cd': plb.element(3),
            'provider_adjustment_amt': plb.element(4),
            'claim_lines': [
                self.populate_claim_line(seg, i, end_idx)
                for (i, seg), end_idx in zip(svc_hits, svc_ends)
            ]
        }

    #
    # @param svc - the svc segment for the service rendered
    # @param idx - the index where the svc is found within self.clm_loop
    # @param svc_end_idx - index just past the last segment associated with the service
    #
    def populate_claim_line(self, svc, idx, svc_end_idx):
        # Only look at this service line's own segments, so a line without a
        # DTM/AMT no longer picks up the one belonging to a later line.
        line_segments = self.clm_loop[idx:svc_end_idx]
        dtm = self._first(line_segments, "DTM")
        amt = self._first(line_segments, "AMT")
        return {
            'prcdr_cd': svc.element(1),
            'chrg_amt': svc.element(2),
            'paid_amt': svc.element(3),
            'rev_cd': svc.element(4),
            'units': svc.element(5),
            'original_prcdr_cd': svc.element(6),
            'service_date_qualifier_cd': dtm.element(1),
            # 'service_date' appeared twice in the literal, so DTM element 3
            # silently replaced the date held in element 2.
            'service_date': dtm.element(2),
            'service_time': dtm.element(3),
            'service_adjustments': [
                adjustment
                for cas in self.segments_by_name("CAS", data=line_segments)
                for adjustment in self.populate_adjustment_groups(cas)
            ],
            'amt_qualifier_cd': amt.element(1),
            'service_line_amt': amt.element(2)
        }

    #
    # group adjustment logic
    #
    # Element 1 of a CAS segment is the group code shared by every adjustment
    # in the segment; reason/amount/quantity triplets start at element 2.
    # NOTE(review): layout taken from the X12 835 CAS definition - confirm
    # against the implementation guide.
    #
    def populate_adjustment_groups(self, cas):
        return [
            {
                'grp_cd': cas.element(1),
                'reason_cd': cas.element(i),
                'amount': cas.element(i + 1)
            }
            for i in range(2, cas.segment_len(), 3)
        ]

    def to_json(self):
        return {
            'payment': self.trx_header_info,
            'payer': self.payer_info,
            'payee': self.payee_info,
            'claim': self.clm_info
        }
notebook source +# MAGIC %md # 837I and 837P + +# COMMAND ---------- + +from databricksx12 import * +from databricksx12.hls import * +import json, os +from pyspark.sql.functions import input_file_name + + +hm = HealthcareManager() +df = spark.read.text("file:////Workspace/Repos/aaron.zavora@databricks.com/x12-edi-parser/sampledata/837/*txt", wholetext = True) + + +rdd = ( + df.withColumn("filename", input_file_name()).rdd + .map(lambda x: (x.asDict().get("filename"),x.asDict().get("value"))) + .map(lambda x: (x[0], EDI(x[1]))) + .map(lambda x: { **{'filename': x[0]}, **hm.to_json(x[1])} ) + .map(lambda x: json.dumps(x)) +) +claims = spark.read.json(rdd) + +# COMMAND ---------- + +claims.createOrReplaceTempView("edi") + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC select +# MAGIC `edi.sender_tax_id` as tax_id, +# MAGIC explode(`FuncitonalGroup`) as fg +# MAGIC from edi + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC --flatten EDI +# MAGIC drop table if exists stg_claims; +# MAGIC CREATE TABLE stg_claims +# MAGIC as +# MAGIC select clms, filename, tax_id, sender, transaction_type +# MAGIC from +# MAGIC ( +# MAGIC select *, explode(trnx.Claims) as clms +# MAGIC from +# MAGIC ( +# MAGIC select filename, tax_id, +# MAGIC fg.`FunctionalGroup.sender` as sender, +# MAGIC fg.`FunctionalGroup.transaction_type` as transaction_type, +# MAGIC explode(fg.`Transactions`) as trnx +# MAGIC from +# MAGIC ( +# MAGIC select +# MAGIC `edi.sender_tax_id` as tax_id, +# MAGIC explode(`FuncitonalGroup`) as fg, +# MAGIC filename +# MAGIC from edi +# MAGIC ) fgs +# MAGIC ) trnx +# MAGIC ) clms +# MAGIC + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from stg_claims limit 10 + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Claim Header Table + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC drop table if exists claim_header; +# MAGIC create table claim_header as +# MAGIC select filename, +# MAGIC tax_id, +# MAGIC sender, +# MAGIC transaction_type, +# MAGIC 
clms.claim_header.*, +# MAGIC clms.diagnosis.*, +# MAGIC clms.payer.*, +# MAGIC clms.providers.*, +# MAGIC clms.patient.name as patient_name, +# MAGIC clms.patient.patient_relationship_cd, +# MAGIC clms.patient.street as patient_street, +# MAGIC clms.patient.city as patient_city, +# MAGIC clms.patient.zip as patient_zip, +# MAGIC clms.patient.dob as patient_dob, +# MAGIC clms.patient.dob_format as patient_dob_format, +# MAGIC clms.patient.gender_cd as patient_gender_cd, +# MAGIC clms.subscriber.subsciber_identifier, +# MAGIC clms.subscriber.name as subscriber_name, +# MAGIC clms.subscriber.subscriber_relationship_cd, +# MAGIC clms.subscriber.street as subscriber_street, +# MAGIC clms.subscriber.city as subscriber_city, +# MAGIC clms.subscriber.zip as subscriber_zip, +# MAGIC clms.subscriber.dob as subscriber_dob, +# MAGIC clms.subscriber.dob_format as subscriber_dob_format, +# MAGIC clms.subscriber.gender_cd as subscriber_gender_cd +# MAGIC from stg_claims + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from claim_header limit 10 + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Claim Lines table + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC create table claim_line as +# MAGIC select filename, claim_id, cl.* +# MAGIC from ( +# MAGIC select filename, +# MAGIC clms.claim_header.claim_id, +# MAGIC explode(clms.claim_lines) as cl +# MAGIC from stg_claims +# MAGIC ) foo + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from claim_line limit 10; + +# COMMAND ---------- + +# MAGIC %md # 835 + +# COMMAND ---------- + +from databricksx12 import * +from databricksx12.hls import * +import json, os +from pyspark.sql.functions import input_file_name + +hm = HealthcareManager() +df = spark.read.text("file:////Workspace/Repos/aaron.zavora@databricks.com/x12-edi-parser/sampledata/835/*txt", wholetext = True) + + +rdd = ( + df.withColumn("filename", input_file_name()).rdd + .map(lambda x: (x.asDict().get("filename"),x.asDict().get("value"))) + .map(lambda x: 
(x[0], EDI(x[1]))) + .map(lambda x: { **{'filename': x[0]}, **hm.to_json(x[1])} ) + .map(lambda x: json.dumps(x)) +) +claims = spark.read.json(rdd) + +# COMMAND ---------- + +claims.createOrReplaceTempView("edi") + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC --flatten EDI +# MAGIC drop table if exists stg_remittance; +# MAGIC CREATE TABLE stg_remittance +# MAGIC as +# MAGIC select clms, filename, tax_id, sender, transaction_type +# MAGIC from +# MAGIC ( +# MAGIC select *, explode(trnx.Claims) as clms +# MAGIC from +# MAGIC ( +# MAGIC select filename, tax_id, +# MAGIC fg.`FunctionalGroup.sender` as sender, +# MAGIC fg.`FunctionalGroup.transaction_type` as transaction_type, +# MAGIC explode(fg.`Transactions`) as trnx +# MAGIC from +# MAGIC ( +# MAGIC select +# MAGIC `edi.sender_tax_id` as tax_id, +# MAGIC explode(`FuncitonalGroup`) as fg, +# MAGIC filename +# MAGIC from edi +# MAGIC ) fgs +# MAGIC ) trnx +# MAGIC ) clms + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from stg_remittance limit 10 +# MAGIC + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC drop table if exists remittance; +# MAGIC create table remittance as +# MAGIC select filename, +# MAGIC tax_id, +# MAGIC sender, +# MAGIC transaction_type, +# MAGIC clms.* +# MAGIC from stg_remittance + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC select * from remittance limit 10; + +# COMMAND ---------- + + diff --git a/sampledata/835/sample.txt b/sampledata/835/sample.txt new file mode 100644 index 0000000..93bc450 --- /dev/null +++ b/sampledata/835/sample.txt @@ -0,0 +1,67 @@ +ISA*00* *00* *ZZ*NVMED *ZZ*99999999*180613*1230*^*00501*100000300*0*P*:~ +GS*HP*NVMED*99999999*20180613*123021*100000300*X*005010X221A1~ +ST*835*0001~ +BPR*H*0*C*NON************20180615~ +TRN*1*100004762*1388600002~ +DTM*405*20180613~ +N1*PR*DIVISON OF HEALTH CARE FINANCING AND POLICY~ +N3*1100 East William Street Suite 101~ +N4*Carson*NV*89701~ +PER*BL*Nevada Medicaid*TE*8776383472*EM*nvmmis.edisupport@dxc.com~ 
+N1*PE*SUMMER*XX*6666666666~ +REF*TJ*111111111~ +LX*1~ +CLP*77777777*4*72232*0**MC*6666666666666~ +CAS*OA*147*50016*0~ +CAS*CO*26*22216*0~ +NM1*QC*1*TOM*SMITH****MR*77777777777~ +NM1*74*1*ALAN*PARKER****C*88888888888~ +NM1*PR*2*PACIFI*****PI* 9999~ +NM1*GB*1*BARRY*CARRY****MI*666666666~ +REF*EA*8888888~ +DTM*232*20180314~ +DTM*233*20180317~ +SE*22*0001~ +ST*835*0002~ +BPR*H*0*C*NON************20180615~ +TRN*1*100004765*5555555555~ +DTM*405*20180613~ +N1*PR*DIVISON OF HEALTH CARE FINANCING AND POLICY~ +N3*1100 East William Street Suite 101~ +N4*Carson*NV*89701~ +PER*BL*Nevada Medicaid*TE*8776383472*EM*nvmmis.edisupport@dxc.com~ +N1*PE*VALLEY*XX*6666666666~ +REF*TJ*530824679~ +LX*1~ +CLP*77777778*2*3002*0**MC*6666666666667~ +CAS*OA*176*3002*0~ +NM1*QC*1*BOB*THOMAS****MR*55555555555~ +NM1*74*1*ALAN*JACKSON****C*66666666666~ +REF*EA*8888888~ +DTM*232*20171001~ +DTM*233*20171002~ +CLP*77777779*4*41231.04*0**MC*6666666666668~ +CAS*OA*147*9365.04*0~ +CAS*CO*26*31866*0~ +NM1*QC*1*HELD*ALLEN****MR*77777777778~ +NM1*74*1*RYAN*LARRY****C*88888888889~ +NM1*PR*2*SENIOR*****PI* 8888~ +NM1*GB*1*MARY*JANE****MI*777777777~ +REF*EA*6047740~ +DTM*232*20180220~ +DTM*233*20180221~ +SE*29*0002~ +ST*835*0003~ +BPR*I*1812.27*C*CHK************20180727~ +TRN*1*000012382*5555555555~ +DTM*405*20180720~ +N1*PR*DIVISON OF HEALTH CARE FINANCING AND POLICY~ +N3*1100 East William Street Suite 101~ +N4*Carson*NV*89701~ +PER*BL*Nevada Medicaid*TE*8776383472*EM*nvmmis.edisupport@dxc.com~ +N1*PE*SILVER*XX*7777777777~ +REF*TJ*666666666~ +PLB*8888888888*20181231*CT:888888888*-1092.46*CT:888888888*-719.81*CS:8888888888887*-181.55*CS:8888888888887*181.55*CS:8888888888888*-130*CS:8888888888888*130~ +SE*12*0003~ +GE*3*100000300~ +IEA*1*100000301~ diff --git a/sampledata/835/sample_services.txt b/sampledata/835/sample_services.txt new file mode 100644 index 0000000..e83b49e --- /dev/null +++ b/sampledata/835/sample_services.txt @@ -0,0 +1,35 @@ +ISA*00* *00* *ZZ*NVMED 
*ZZ*99999999*180613*1230*^*00501*100000300*0*P*:~ +GS*HP*NVMED*99999999*20180613*123021*100000300*X*005010X221A1~ +ST*835*112233*005010X221A1~ +BPR*I*391.05*C*ACH*CCP*01*322271724*DA*203158175*8076853391**01*122000496*DA*7341099666*20120131~ +TRN*1*051036622050010*1262721578~ +N1*PR*BCBS DISNEY~ +N3*POBLADO RD~ +N4*LOS ANGELES*CA*9006~ +PER*BL*MICHAEL EISNER*TE*7145205060*EX*123*EM*edi@bcbsdisney.com~ +PER*IC**UR*www.bcbsdisney.com/policies.html~ +N1*PE*UCLA MEDICAL CENTER*XX*1215193883~ +LX*1001~ +CLP*ABC9001*1*225*200*5*12*1142381711242*22*1~ +CAS*CO*45*20~ +NM1*QC*1*MOUSE*MICKEY****MI*60345914A~ +SVC*HC:98765*150*145~ +DTM*472*20120124~ +CAS*PR*3*5~ +REF*0K*8910~ +SVC*HC:26591*75*75~ +DTM*472*20120124~ +LX*1002~ +CLP*ABC9002*1*225*195*10*12*1142381711242*22*1~ +CAS*CO*45*20~ +NM1*QC*1*DUCK*DONALD****MI*60345914B~ +SVC*HC:98765*150*140~ +DTM*472*20120124~ +CAS*PR*3*10~ +REF*0K*8910~ +SVC*HC:26591*75*75~ +DTM*472*20120124~ +PLB*1215193883*20121231*90*3.95~ +SE*31*112233~ +GE*3*100000300~ +IEA*1*100000301~ diff --git a/tests/test_claims.py b/tests/test_claims.py index 2dff588..38dd269 100644 --- a/tests/test_claims.py +++ b/tests/test_claims.py @@ -1,9 +1,13 @@ -from test_spark_base import * +from .test_spark_base import * +from .test_pyspark import * from databricksx12.hls import * from databricksx12 import * import unittest, re +from functools import reduce +from operator import add -class TestClaims(PySparkBaseTest): + +class TestClaims(PysparkBaseTest): def test_professional_service_lines(self): edi = EDI(open("sampledata/837/CC_837P_EDI.txt", "rb").read().decode("utf-8")) @@ -20,6 +24,6 @@ def test_institutional_service_lines(self): data = hm.from_edi(edi)[0] assert([y.to_dict().get("claim_line_number") for y in data.sl_info] == ['1', '2', '3', '4', '5', '6', '7', '8', '9']) assert([y.to_dict().get("revenue_cd") for y in data.sl_info] ==['0124', '0250', '0260', '0300', '0301', '0305', '0306', '0307', '0351']) - assert( 
sum([float(y.to_dict().get("line_chrg_amt")) for y in data.sl_info]) == 17166.7) + assert( reduce(add, [float(y.to_dict().get("line_chrg_amt")) for y in data.sl_info]) == 17166.7) diff --git a/tests/test_edi.py b/tests/test_edi.py index 801027c..7bbdc59 100644 --- a/tests/test_edi.py +++ b/tests/test_edi.py @@ -1,5 +1,5 @@ import unittest, re -from test_spark_base import * +from .test_spark_base import * from databricksx12.edi import * class TestEDI(PysparkBaseTest): diff --git a/tests/test_loop.py b/tests/test_loop.py index e0dcd13..215386f 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -1,4 +1,4 @@ -from test_spark_base import * +from .test_spark_base import * from databricksx12.hls.loop import * import unittest, re @@ -6,7 +6,7 @@ class TestLoop(PysparkBaseTest): data = open("sampledata/837/837p.txt", "rb").read().decode("utf-8") - loop = Loop(data) + loop = Loop([Segment(x) for x in re.split(r'~[\n]', data)][:-1]) # # Test Loop base info @@ -43,7 +43,7 @@ def test_loop_hierarchy(self): # def test_loop_hierarchy_child_codes(self): data = open("./sampledata/837/CHPW_Claimdata_edited.txt.tmp", "rb").read().decode("utf-8") - loop = Loop(data) + loop = Loop([Segment(x) for x in re.split(r'~[\n]', data)][:-1]) assert(loop.find_hl_codes(174, '22')['start_idx'] == 160) # @@ -51,7 +51,7 @@ def test_loop_hierarchy_child_codes(self): # def test_get_segments(self): data = open("./sampledata/837/CHPW_Claimdata_edited.txt.tmp", "rb").read().decode("utf-8") - loop = Loop(data) + loop = Loop([Segment(x) for x in re.split(r'~[\n]', data)][:-1]) assert(loop.get_loop(174, '2000A')['start_idx'] == 144 and loop.get_loop(174, '2000A')['end_idx'] == 153) assert( len(loop.get_loop_segments(174, '2000A')) == 153 - 144) assert(loop.get_loop_segments(174, '2000A')[0].element(0) == "HL") diff --git a/tests/test_pyspark.py b/tests/test_pyspark.py index b7fcd3e..170aafa 100644 --- a/tests/test_pyspark.py +++ b/tests/test_pyspark.py @@ -1,4 +1,4 @@ -from test_spark_base import * 
+from .test_spark_base import * from databricksx12.edi import * class TestPyspark(PysparkBaseTest): diff --git a/tests/test_segment.py b/tests/test_segment.py index 51ad0bd..c288270 100644 --- a/tests/test_segment.py +++ b/tests/test_segment.py @@ -1,5 +1,5 @@ import unittest, re -from test_spark_base import * +from .test_spark_base import * from databricksx12.edi import * class TestSegment(PysparkBaseTest): @@ -20,11 +20,11 @@ def test_sub_element_length(self): def test_get_elements(self): assert ( TestSegment.segments[0].element(0) == TestSegment.segments[0].element(0, 0) == TestSegment.segments[0].element(0, -1) == 'ISA' ) - assert ( TestSegment.segments[0].element(0, 1) == TestSegment.segments[0].element(0, 2) == 'na/dne' ) + assert ( TestSegment.segments[0].element(0, 1) == TestSegment.segments[0].element(0, 2) == '' ) assert ( TestSegment.segments[0].element(0, 1, dne='foobar') == TestSegment.segments[0].element(0, 2, dne='foobar') == 'foobar' ) assert ( TestSegment.segments[22].element(5) == '11:A:1' ) assert ( TestSegment.segments[22].element(5, 0) + ":" + TestSegment.segments[22].element(5, 1) + ":" + TestSegment.segments[22].element(5, 2) == '11:A:1' ) - assert ( TestSegment.segments[22].element(5, 3) == "na/dne" ) + assert ( TestSegment.segments[22].element(5, 3) == "" ) if __name__ == '__main__': unittest.main()