Merge pull request #4 from databricks-industry-solutions/raven-eda-hl-segments

Raven eda hl segments
zavoraad authored May 22, 2024
2 parents 6e8b844 + 59ddfbe commit 333aca7
Showing 16 changed files with 1,207 additions and 160 deletions.
209 changes: 101 additions & 108 deletions README.md
[![CLOUD](https://img.shields.io/badge/CLOUD-ALL-blue?logo=googlecloud&style=for-the-badge)](https://cloud.google.com/databricks)
[![POC](https://img.shields.io/badge/POC-10_days-green?style=for-the-badge)](https://databricks.com/try-databricks)

# Business Problem

Working with various x12 EDI transactions in Spark on Databricks.

# Install

```bash
pip install git+https://github.com/databricks-industry-solutions/x12-edi-parser
```

# Run

## Reading in EDI Data

The default format is AnsiX12 (`*` as the element delimiter and `~` as the segment separator).

```python
from databricksx12 import *

#EDI format type
ediFormat = AnsiX12Delim #specifying the format of the data; AnsiX12Delim is the default if nothing is specified
#custom formats can also be specified (below is the same as AnsiX12Delim)
ediFormat = type("", (), dict({'SEGMENT_DELIM': '~', 'ELEMENT_DELIM': '*', 'SUB_DELIM': ':'}))

df = spark.read.text("sampledata/837/*txt", wholetext = True)

(df.rdd
  .map(lambda x: x.asDict().get("value"))
  .map(lambda x: EDI(x, delim_cls = ediFormat))
  .map(lambda x: {"transaction_count": x.num_transactions()})
).toDF().show()
"""
+-----------------+
|transaction_count|
+-----------------+
|                5|
|                1|
|                1|
|                1|
|                1|
+-----------------+
"""
```
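
To keep track of which file each count came from, the same pattern can be combined with `input_file_name()` (also used later in this README); a minimal sketch built only from the pieces shown above:

```python
from pyspark.sql.functions import input_file_name

#Carry the source filename through alongside each parsed EDI object
(df.withColumn("filename", input_file_name()).rdd
  .map(lambda x: (x.asDict().get("filename"), x.asDict().get("value")))
  .map(lambda x: {"filename": x[0], "transaction_count": EDI(x[1], delim_cls = ediFormat).num_transactions()})
).toDF().show()
```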

## Parsing Healthcare Transactions

Currently supports 837s. Records of each format type should be saved separately; e.g., do not mix 835s and 837s in the same `df.save()` command.
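
One way to honor that constraint is to keep each transaction type in its own source directory and persist each to its own table; a hypothetical sketch (the `sampledata/835` path and table names are illustrative, not part of this repo):

```python
#Read and save each format type separately -- never mix 835s and 837s in one save
df_837 = spark.read.text("sampledata/837/*txt", wholetext = True)
df_835 = spark.read.text("sampledata/835/*txt", wholetext = True)  #illustrative path

df_837.write.mode("overwrite").saveAsTable("edi_raw_837")
df_835.write.mode("overwrite").saveAsTable("edi_raw_835")
```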

### 837i and 837p sample data in Spark

```python
from databricksx12 import *
from databricksx12.hls import *
import json
from pyspark.sql.functions import input_file_name

hm = HealthcareManager()
df = spark.read.text("sampledata/837/*txt", wholetext = True)

rdd = (
  df.withColumn("filename", input_file_name()).rdd
    .map(lambda x: (x.asDict().get("filename"), x.asDict().get("value")))
    .map(lambda x: (x[0], EDI(x[1])))
    .map(lambda x: {**{'filename': x[0]}, **hm.to_json(x[1])})
    .map(lambda x: json.dumps(x))
)
claims = spark.read.json(rdd)

#Claim header table TODO

#Claim line table TODO
```
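
`claims` is an ordinary JSON-derived DataFrame, so it can be inspected and queried directly; a minimal sketch (the field names come from the sample JSON in the next section, and keys containing literal dots must be backtick-escaped):

```python
claims.printSchema()

#Top-level keys contain literal dots, so escape them with backticks
claims.select("filename", "`EDI.sender_tax_id`").show(truncate = False)

#Query the parsed claims with SQL
claims.createOrReplaceTempView("claims")
spark.sql("SELECT filename, count(1) AS num_records FROM claims GROUP BY filename").show()
```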

### Sample data outside of Spark


```python
from databricksx12 import *
from databricksx12.hls import *
import json

hm = HealthcareManager()
edi = EDI(open("sampledata/837/CHPW_Claimdata.txt", "rb").read().decode("utf-8"))

#Returns parsed claim data
hm.from_edi(edi)
#[<databricksx12.hls.claim.Claim837p object at 0x106e57b50>, <databricksx12.hls.claim.Claim837p object at 0x106e57c40>, <databricksx12.hls.claim.Claim837p object at 0x106e57eb0>, <databricksx12.hls.claim.Claim837p object at 0x106e57b20>, <databricksx12.hls.claim.Claim837p object at 0x106e721f0>]

#Print in json format
print(json.dumps(hm.to_json(edi), indent=4))
"""
{
    "EDI.sender_tax_id": "ZZ",
    "FuncitonalGroup": [
        {
            "FunctionalGroup.receiver": "123456789",
            "FunctionalGroup.sender": "CLEARINGHOUSE",
            "FunctionalGroup.transaction_datetime": "20180508:0833",
            "FunctionalGroup.transaction_type": "222",
            "Transactions": [
                {
                    "Transaction.transaction_type": "222",
                    "Claims": [
                        {
                            "submitter": {
                                "contact_name": "CLEARINGHOUSE CLIENT SERVICES",
                                "contacts": {
                                    "primary": [
                                        {
                                            "contact_method": "Telephone",
                                            "contact_number": "8005551212",
...
"""

#print the raw EDI segments of one claim
one_claim = hm.from_edi(edi)[0]
print("\n".join([y.data for y in one_claim.data]))
"""
BHT*0019*00*7349063984*20180508*0833*CH
...
HL*2*1*22*0
SBR*P*18**COMMUNITY HLTH PLAN OF WASH*****CI
NM1*IL*1*SUBSCRIBER*JOHN*J***MI*987321
N3*987 65TH PL
N4*VANCOUVER*WA*986640001
DMG*D8*19881225*M
NM1*PR*2*COMMUNITY HEALTH PLAN OF WASHINGTON*****PI*CHPWA
CLM*1805080AV3648339*20***57:B:1*Y*A*Y*Y
REF*D9*7349065509
HI*ABK:F1120
NM1*82*1*PROVIDER*JAMES****XX*1112223338
PRV*PE*PXC*261QR0405X
NM1*77*2*BH CLINIC OF VANCOUVER*****XX*1122334455
N3*12345 MAIN ST SUITE A1
N4*VANCOUVER*WA*98662
LX*1
SV1*HC:H0003*20*UN*1***1
DTP*472*D8*20180428
REF*6R*142671

...
"""
```
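
Each element returned by `from_edi` is a parsed claim object, so standard Python works on the result; a minimal sketch tallying the claim classes found in the file:

```python
from collections import Counter

#e.g. Counter({'Claim837p': 5}) for the sample file above
print(Counter(type(c).__name__ for c in hm.from_edi(edi)))
```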

> **Warning**
> Sections below this are under construction

## EDI as a Table for SQL

```python
"""
Look at the full data reference -> https://justransform.com/edi-essentials/edi-structure/
 (1) Including control header / ISA & IEA segments
"""
from databricksx12 import *
from pyspark.sql.functions import input_file_name

df = spark.read.text("sampledata/837/*txt", wholetext = True)

(df.withColumn("filename", input_file_name()).rdd
  .map(lambda x: (x.asDict().get("filename"), x.asDict().get("value")))
  .map(lambda x: (x[0], EDI(x[1])))
  .map(lambda x: [{**{"filename": x[0]}, **y} for y in x[1].toRows()])
  .flatMap(lambda x: x)
  .toDF()).show()

"""
+--------------------+----------+--------------------------+--------------+------------+-----------------------------+--------+
| row_data|row_number|segment_element_delim_char|segment_length|segment_name|segment_subelement_delim_char|filename|
+--------------------+----------+--------------------------+--------------+------------+-----------------------------+--------+
|ISA*00* ...| 0| *| 17| ISA| :|file:///|
|GS*HC*CLEARINGHOU...| 1| *| 9| GS| :|file:///|
|ST*837*000000001*...| 2| *| 4| ST| :|file:///|
|BHT*0019*00*73490...| 3| *| 7| BHT| :|file:///|
|NM1*41*2*CLEARING...| 4| *| 10| NM1| :|file:///|
|PER*IC*CLEARINGHO...| 5| *| 7| PER| :|file:///|
|NM1*40*2*12345678...| 6| *| 10| NM1| :|file:///|
...
"""
```
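
To actually query those rows with SQL, capture the flattened segments in a DataFrame and register it as a temp view; a minimal sketch reusing the pipeline above (the variable and view names are illustrative):

```python
segmentsDF = (df.withColumn("filename", input_file_name()).rdd
  .map(lambda x: (x.asDict().get("filename"), x.asDict().get("value")))
  .map(lambda x: (x[0], EDI(x[1])))
  .map(lambda x: [{**{"filename": x[0]}, **y} for y in x[1].toRows()])
  .flatMap(lambda x: x)
  .toDF())
segmentsDF.createOrReplaceTempView("edi_segments")

#e.g. count each segment type per file
spark.sql("""
SELECT filename, segment_name, count(1) AS segment_count
FROM edi_segments
GROUP BY filename, segment_name
ORDER BY filename, segment_count DESC
""").show()
```
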
#### Other EDI Parsing in Pyspark
```python
"""
...
"""
ediDF.show()
"""
...
"""
#show first line of each transaction
trxDF.filter(x.row_number == 0).show()
"""
...
"""
```
5 changes: 5 additions & 0 deletions databricksx12/__init__.py
@@ -1 +1,6 @@
```python
from .edi import *
from .format import *
from .functional import *
from .transaction import *
```


14 changes: 10 additions & 4 deletions databricksx12/edi.py
@@ -59,7 +59,12 @@ def num_transactions(self):
```python
    #
    def num_functional_groups(self):
        return len(self.segments_by_name("GE"))

    #
    # Maps a list of indexes [0,4,7] to a series of ranges -> [(0,4), (4,7)]
    #
    def _index_to_tuples(self, indexes):
        return list((zip(indexes, indexes[1:])))

    #
    # Return all segments associated with each functional group
```

@@ -124,6 +129,8 @@ def toRows(self):
```python
    def header(self):
        return self.data[0]


class Segment():

    #
```

@@ -173,8 +180,6 @@ def filter(self, value, element, sub_element, dne="na/dne"):
```python
        return self if value == self.get_element(element, sub_element, dne) else None


#
# Manage relationship hierarchy within EDI
#
```

@@ -223,7 +228,8 @@ def flatten(data = None):
```python
            }
        else:
            return EDIManager.class_metadata(data)


"""
from databricksx12.edi import *
...
"""
```
3 changes: 3 additions & 0 deletions databricksx12/hls/__init__.py
@@ -0,0 +1,3 @@
```python
from .healthcare import *
from .claim import *
from .loop import *
```
