This repository has been archived by the owner on Jan 26, 2021. It is now read-only.

Integrate county level data (WIP) #52

Open · wants to merge 15 commits into master
18 changes: 18 additions & 0 deletions covid/constants.py
@@ -1 +1,19 @@
from enum import IntEnum

PATH_TO_SERVICE_ACCOUNT_KEY = "service-account-key.json"


# Define the frontend colors.
class Color(IntEnum):
GREEN = 0
YELLOW = 1
RED = 2
DARK_RED = 3


COLOR_NAME_MAP = {
Color.GREEN: "Green",
Color.YELLOW: "Yellow",
Color.RED: "Red",
Color.DARK_RED: "Dark Red",
}
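
Because Color is an IntEnum, its numeric ordering doubles as a severity ordering (GREEN < YELLOW < RED < DARK_RED); that is what lets transform.py below combine two color series with max to keep the worse of the two statuses. A quick illustration (assuming the covid package is importable):

```python
from covid.constants import COLOR_NAME_MAP, Color

# The integer values encode severity, so max() picks the worse status.
assert max(Color.YELLOW, Color.RED) == Color.RED

# COLOR_NAME_MAP converts the enum back into the display string used in the sheet.
print(COLOR_NAME_MAP[max(Color.GREEN, Color.DARK_RED)])  # -> "Dark Red"
```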
29 changes: 29 additions & 0 deletions covid/extract.py
@@ -4,6 +4,7 @@

import pandas as pd
import requests
from covidcountydata import Client
from df2gspread import gspread2df

import covid.extract_config.cdc_govcloud as cgc
@@ -29,6 +30,13 @@
CATEGORY_3_DATA_GOOGLE_SHEET_KEY = "1-BSd5eFbNsypygMkhuGX1OWoUsF2u4chpsu6aC4cgVo"
CATEGORY_3_HISTORICAL_DATA_TAB = "Historical Data"


# Define the URL for covidatlas.com, which we use for county-level data.
_COVID_ATLAS_TIME_SERIES_URL = "https://liproduction-reportsbucket-bhk8fnhv1s76.s3-us-west-1.amazonaws.com/v1/latest/timeseries.csv"

# Define the API key for covidcountydata.
_COVID_COUNTY_DATA_API_KEY = "AWWjV0egkRXe8GPm6JAB3NrUHRW3VDAe"

logger = logging.getLogger(__name__)


@@ -202,3 +210,24 @@ def extract_cdc_beds_historical_data(credentials):
cdc_historical_df = cdc_historical_df.set_index(STATE_FIELD)

return cdc_historical_df


def extract_covid_atlas_data():
"""This function pulls a time series of all county-level data available using covidatlas.com."""
# Pull all historical data available.
logger.info("Downloading Covid Atlas data (this may take a while).")
historical_df = pd.read_csv(_COVID_ATLAS_TIME_SERIES_URL)
logger.info("Finished downloading Covid Atlas data.")
return historical_df


def extract_covidcounty_data():
    """Pull the last two weeks of county-level COVID and population data from covidcountydata."""
    logger.info("Downloading county-level data.")
    c = Client(apikey=_COVID_COUNTY_DATA_API_KEY)
    # Only request the trailing two weeks of observations to keep the download small.
    two_weeks_ago = pd.Timestamp.utcnow() - pd.Timedelta("14D")
    # Restrict to county-level FIPS codes (integer codes above 1000) and include total population.
    c.covid_us(location=">1000", dt=f">{two_weeks_ago.date()}").demographics(
        variable="Total population"
    ).us_counties()
    covid_df = c.fetch()
    logger.info("Finished downloading county-level data.")
    return covid_df
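
For reference, a minimal sketch of how this extract step could feed the new transform step below; the run_county_pipeline name is hypothetical (the pipeline entry point is not part of this diff), but both imported functions are defined in this PR:

```python
from covid.extract import extract_covidcounty_data
from covid.transform import transform_county_data


def run_county_pipeline():
    # Pull roughly two weeks of county-level observations from covidcountydata.
    covidcounty_df = extract_covidcounty_data()
    # Reduce them to one summary row per county, with colors and lag columns.
    return transform_county_data(covidcounty_df=covidcounty_df)
```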
5 changes: 4 additions & 1 deletion covid/load_utils.py
@@ -1,8 +1,11 @@
import logging
import time

SECONDS_TO_SLEEP = 20

logger = logging.getLogger(__name__)


def sleep_and_log(seconds=SECONDS_TO_SLEEP):
print(f"Sleeping for {seconds} seconds between posts...")
logger.info(f"Sleeping for {seconds} seconds between posts...")
time.sleep(seconds)
383 changes: 383 additions & 0 deletions covid/test_fixtures/covidatlas_example_subset.csv

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions covid/test_fixtures/expected_county_df_example.csv

Large diffs are not rendered by default.

215 changes: 212 additions & 3 deletions covid/transform.py
@@ -1,7 +1,11 @@
import datetime
import logging

import numpy as np
import pandas as pd

from covid.constants import Color
from covid.constants import COLOR_NAME_MAP
from covid.extract import DATE_SOURCE_FIELD
from covid.extract import extract_state_population_data
from covid.extract import get_state_abbreviations_to_names
@@ -12,10 +16,13 @@
from covid.transform_utils import calculate_consecutive_boolean_series
from covid.transform_utils import calculate_consecutive_positive_or_negative_values
from covid.transform_utils import calculate_max_run_in_window
from covid.transform_utils import compute_lagged_frame
from covid.transform_utils import fit_and_predict_cubic_spline_in_r
from covid.transform_utils import generate_lag_column_name_formatter_and_column_names
from covid.transform_utils import generate_lags
from covid.transform_utils import get_color_series_from_range

logger = logging.getLogger(__name__)

# Define output field names.
# Criteria Category 1 Fields.
@@ -245,6 +252,64 @@
for criteria_field in _CDC_CRITERIA_6_STREAK_STATE_SUMMARY_FIELDS
]

# Define fields for our county-level data.
COUNTY_LAST_UPDATED_FIELD = "LAST UPDATED"
COUNTY_POPULATION_FIELD = "POPULATION"
COUNTY_STATE_FIELD = "STATE"
COUNTY_FIELD = "COUNTY"
COUNTY_FIPS_FIELD = "FIPS"
COUNTY_NEW_TESTS_FIELD = "NEW TESTS"
COUNTY_TESTED_3DCS_FIELD = "NEW TESTS (3DCS)"
COUNTY_TESTED_7DRA_FIELD = "NEW TESTS (7DRA)"
COUNTY_NEW_CASES_FIELD = "NEW CASES"
COUNTY_NEW_CASES_3DRA_FIELD = "NEW CASES (3DRA)"
COUNTY_NEW_CASES_3DCS_FIELD = "NEW CASES (3DCS)"
COUNTY_NEW_CASES_7DRA_FIELD = "NEW CASES (7DRA)"
COUNTY_NEW_CASES_PM_FIELD = "NEW CASES PER MILLION"
COUNTY_NEW_CASES_PM_3DRA_FIELD = "NEW CASES PER MILLION (3DRA)"
COUNTY_NEW_CASES_PM_3DCS_FIELD = "NEW CASES PER MILLION (3DCS)"
COUNTY_NEW_CASES_PM_7DRA_FIELD = "NEW CASES PER MILLION (7DRA)"
COUNTY_NEW_CASES_PM_COLOR_FIELD = "NEW CASES PER MILLION COLOR"
COUNTY_POSITIVITY_FIELD = "COVID+ RATE"
COUNTY_POSITIVITY_3DCS_FIELD = "COVID+ RATE (3DCS)"
COUNTY_POSITIVITY_3DRA_FIELD = "COVID+ RATE (3DRA)"
COUNTY_POSITIVITY_7DRA_FIELD = "COVID+ RATE (7DRA)"
COUNTY_POSITIVITY_COLOR_FIELD = "COVID+ COLOR"
COUNTY_COLOR_FIELD = "COUNTY COLOR"
_COUNTY_NUM_LAGS = 14
_, _COUNTY_NEW_CASES_LAG_FIELDS = generate_lag_column_name_formatter_and_column_names(
    column_name=COUNTY_NEW_CASES_7DRA_FIELD, num_lags=_COUNTY_NUM_LAGS
)

Contributor: FYI, there might be a better way to do this -- I wrote this but it still feels a little hacky to me, so I'm open to all other suggestions!

Contributor (author): Yeah I think I adopted the "unless I'm refactoring everything or it's necessary, don't change pattern" mindset. Maybe we can track a few issues on refactors we know we should do?

(
_,
_COUNTY_NEW_CASES_PM_LAG_FIELDS,
) = generate_lag_column_name_formatter_and_column_names(
column_name=COUNTY_NEW_CASES_PM_7DRA_FIELD, num_lags=_COUNTY_NUM_LAGS
)
_, _COUNTY_POSITIVITY_LAG_FIELDS = generate_lag_column_name_formatter_and_column_names(
column_name=COUNTY_POSITIVITY_7DRA_FIELD, num_lags=_COUNTY_NUM_LAGS
)
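
generate_lag_column_name_formatter_and_column_names comes from transform_utils and is not shown in this diff; judging from the " T-" suffix passed to compute_lagged_frame below, the lag columns presumably look like "NEW CASES (7DRA) T-0" through "NEW CASES (7DRA) T-13". A rough, hypothetical sketch of the helper's shape:

```python
def generate_lag_column_names_sketch(column_name, num_lags):
    # Illustration only -- the real helper lives in covid/transform_utils.py and may differ.
    formatter = column_name + " T-{}"
    column_names = [formatter.format(lag) for lag in range(num_lags)]
    return formatter, column_names
```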

# Define the list of columns for county-level data.
COUNTY_SUMMARY_COLUMNS = [
COUNTY_LAST_UPDATED_FIELD,
COUNTY_STATE_FIELD,
COUNTY_FIELD,
COUNTY_FIPS_FIELD,
COUNTY_COLOR_FIELD,
COUNTY_NEW_CASES_PM_COLOR_FIELD,
COUNTY_POSITIVITY_COLOR_FIELD,
*_COUNTY_NEW_CASES_LAG_FIELDS,
COUNTY_NEW_CASES_7DRA_FIELD,
*_COUNTY_NEW_CASES_PM_LAG_FIELDS,
COUNTY_NEW_CASES_PM_7DRA_FIELD,
*_COUNTY_POSITIVITY_LAG_FIELDS,
COUNTY_POSITIVITY_7DRA_FIELD,
COUNTY_NEW_TESTS_FIELD,
COUNTY_NEW_CASES_FIELD,
COUNTY_POPULATION_FIELD,
]


# Define the list of columns that should appear in the state summary tab.
STATE_SUMMARY_COLUMNS = [
@@ -434,6 +499,22 @@
LAST_UPDATED_FIELD,
]

# Define the upper bounds for each color for the new cases per million metric.
_NEW_CASES_PM_COLOR_DICT = {
    Color.GREEN: (0, 40),
    Color.YELLOW: (40, 80),
    Color.RED: (80, 150),
    Color.DARK_RED: (150, np.inf),
}

Contributor: Very nice!

# Define the upper bounds for each color for the positivity metric.
_POSITIVITY_COLOR_DICT = {
Color.GREEN: (0, 2),
Color.YELLOW: (2, 7),
Color.RED: (7, 10),
Color.DARK_RED: (10, np.inf),
}
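
get_color_series_from_range is imported from transform_utils and is not part of this diff; a minimal sketch of the binning these dicts appear to assume (each Color owns a half-open [low, high) interval) might look like:

```python
import numpy as np
import pandas as pd


def get_color_series_from_range_sketch(series, color_dict):
    # Hypothetical helper for illustration: map each value to the Color whose
    # [low, high) range contains it, leaving missing values as NaN.
    def to_color(value):
        if pd.isna(value):
            return np.nan
        for color, (low, high) in color_dict.items():
            if low <= value < high:
                return color
        return np.nan

    return series.map(to_color)
```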


def transform_covidtracking_data(covidtracking_df):
"""Transforms data from https://covidtracking.com/ and calculates CDC Criteria 1 (A, B, C, D) and 2 (A, B, C, D)."""
@@ -465,7 +546,7 @@ def transform_covidtracking_data(covidtracking_df):
state_population_data = extract_state_population_data()

for state in states:
print(f"Processing covid tracking data for state {state}...")
logger.info(f"Processing covid tracking data for state {state}...")

###### Calculate criteria category 1. ######
# Calculate new cases (raw).
@@ -987,7 +1068,7 @@ def transform_cdc_ili_data(ili_df):
ili_df[PERCENT_ILI] = ili_df[PERCENT_ILI].astype(float)

for state in states:
print(f"Processing CDC ILI data for state {state}...")
logger.info(f"Processing CDC ILI data for state {state}...")

###### Calculate criteria category 5. ######
# Calculate total cases (spline).
@@ -1153,7 +1234,7 @@ def transform_cdc_beds_data(cdc_beds_current_df, cdc_beds_historical_df):
states = cdc_df.index.get_level_values(STATE_FIELD).unique()
state_dfs = []
for state in states:
print(f"Transforming CDC beds data for state {state}...")
logger.info(f"Transforming CDC beds data for state {state}...")
state_df = cdc_df.xs(state, axis=0, level=STATE_FIELD)
state_df[MAX_INPATIENT_BED_OCCUPATION_7_DAYS] = (
state_df[INPATIENT_PERCENT_OCCUPIED]
@@ -1228,3 +1309,131 @@ def indication_of_rebound(series_):
indicator = "Rebound"

return indicator


def transform_county_data(covidcounty_df):
"""Takes a df with historical county-level observations and generates a df in our desired format with computed
columns.
"""
# Limit results to US counties.
county_df = covidcounty_df.copy()

# Rename primary columns.
county_df = county_df.rename(
columns={
"location": COUNTY_FIPS_FIELD,
"county_name": COUNTY_FIELD,
"dt": DATE_SOURCE_FIELD,
"state_name": COUNTY_STATE_FIELD,
"Total population": COUNTY_POPULATION_FIELD,
}
)

    # covidcountydata stores FIPS codes as integers, so zero-pad them back to 5-character strings.
county_df.loc[:, COUNTY_FIPS_FIELD] = county_df[COUNTY_FIPS_FIELD].transform(
lambda x: "{0:0>5}".format(x)
)
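    # For example, the integer FIPS code 1001 (Autauga County, AL) becomes the
    # canonical five-character string after padding:
    #   >>> "{0:0>5}".format(1001)
    #   '01001'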

# Set date index.
county_df.loc[:, COUNTY_LAST_UPDATED_FIELD] = county_df[DATE_SOURCE_FIELD]
county_df.loc[:, DATE_SOURCE_FIELD] = county_df[DATE_SOURCE_FIELD].astype(str)
county_df.loc[:, DATE_SOURCE_FIELD] = pd.to_datetime(county_df[DATE_SOURCE_FIELD])
county_df = county_df.set_index(DATE_SOURCE_FIELD)
county_df.index = county_df.index.tz_localize("UTC")

# Set FIPS index.
county_df = county_df.set_index(COUNTY_FIPS_FIELD, append=True)
county_df = county_df.sort_index(level=DATE_SOURCE_FIELD)

# Calculate the new cases and test fields by differencing the cumulative fields.
county_df.loc[:, COUNTY_NEW_CASES_FIELD] = (
county_df.loc[:, "cases_total"].groupby(COUNTY_FIPS_FIELD).diff(periods=1)
)
county_df.loc[:, COUNTY_NEW_TESTS_FIELD] = (
county_df.loc[:, "tests_total"].groupby(COUNTY_FIPS_FIELD).diff(periods=1)
)

# Calculate the new cases per million field by dividing by population and multiplying by 1MM.
county_df.loc[:, COUNTY_NEW_CASES_PM_FIELD] = (
county_df.loc[:, COUNTY_NEW_CASES_FIELD]
/ county_df.loc[:, COUNTY_POPULATION_FIELD]
) * 1e6

# Set tested field to NaN when test counts are 0 to avoid zero-division errors.
# TODO (pjsheehan): is there a better way to handle this case?
county_df.loc[
county_df[COUNTY_NEW_TESTS_FIELD] == 0.0, COUNTY_NEW_TESTS_FIELD
] = np.NaN
county_df.loc[:, COUNTY_POSITIVITY_FIELD] = (
county_df[COUNTY_NEW_CASES_FIELD] / county_df[COUNTY_NEW_TESTS_FIELD]
) * 100

Contributor: I'm surprised fit_and_predict_cubic_spline_in_r doesn't automatically return NaNs if its input data is NaN -- we could make that part of the function? Or this works just fine!

Contributor (author): It breaks if you have all-NaN I think. Like the fitting portion was breaking on me. I can investigate further if we want!

    # Calculate rolling averages for important fields
    rolling_avg_frame = (
        county_df.loc[
            :,
            [COUNTY_NEW_CASES_FIELD, COUNTY_NEW_CASES_PM_FIELD, COUNTY_NEW_TESTS_FIELD],
        ]
        .groupby(level=COUNTY_FIPS_FIELD)
        .apply(lambda x: x.rolling(window=7, min_periods=1).mean())
    )
rolling_avg_frame = rolling_avg_frame.add_suffix(" (7DRA)")
county_df = pd.concat([county_df, rolling_avg_frame], axis=1)

# Calculate the rolling average for positivity separately based on the average of tests and cases.
county_df.loc[:, COUNTY_POSITIVITY_7DRA_FIELD] = (
county_df[COUNTY_NEW_CASES_7DRA_FIELD] / county_df[COUNTY_TESTED_7DRA_FIELD]
) * 100
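    # Note: this is a ratio of averages, which weights each day by its test volume; it is not
    # the same as averaging the daily positivity values. E.g. cases [10, 90] and tests
    # [100, 300] give 50 / 200 * 100 = 25.0 here, versus mean(10%, 30%) = 20.0.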

    # Generate lags for important columns.
lag_fields = [
COUNTY_NEW_CASES_PM_7DRA_FIELD,
COUNTY_NEW_CASES_7DRA_FIELD,
COUNTY_POSITIVITY_7DRA_FIELD,
]
lag_frame = county_df.groupby(level=COUNTY_FIPS_FIELD).apply(
lambda x: compute_lagged_frame(
x,
num_periods=list(range(0, _COUNTY_NUM_LAGS)),
suffix=" T-",
subset=lag_fields,
)
)
county_df = pd.concat([county_df, lag_frame], axis=1)

# Calculate the 7-day rolling average color status.
cases_pm_color_series = get_color_series_from_range(
county_df[COUNTY_NEW_CASES_PM_7DRA_FIELD], _NEW_CASES_PM_COLOR_DICT,
)
county_df.loc[:, COUNTY_NEW_CASES_PM_COLOR_FIELD] = cases_pm_color_series.transform(
lambda x: COLOR_NAME_MAP.get(x)
)

positivity_color_series = get_color_series_from_range(
county_df[COUNTY_POSITIVITY_7DRA_FIELD], _POSITIVITY_COLOR_DICT,
)
county_df.loc[:, COUNTY_POSITIVITY_COLOR_FIELD] = positivity_color_series.transform(
lambda x: COLOR_NAME_MAP.get(x)
)

# Calculate the overall county color by taking the max of new cases pm and positivity.
county_color_series = cases_pm_color_series.combine(
positivity_color_series, func=max
)
county_df.loc[:, COUNTY_COLOR_FIELD] = county_color_series.transform(
lambda x: COLOR_NAME_MAP.get(x)
)

    # Restore FIPS and date as regular columns, leaving a default integer index.
county_df = county_df.reset_index(drop=False)

    # Reduce the dataset to the 2nd most recent observation for each county.
    # Note: this is because the most recent data from covidatlas is sometimes incorrect.
    county_df = county_df.sort_values(DATE_SOURCE_FIELD, ascending=False)
    # groupby.nth() is zero-indexed, so nth(1) selects the second most recent row per county.
    county_df = county_df.groupby(COUNTY_FIPS_FIELD).nth(1)
    county_df = county_df.reset_index(drop=False)

# Reduce the dataset to the summary columns.
county_df = county_df.loc[:, COUNTY_SUMMARY_COLUMNS]

return county_df
19 changes: 19 additions & 0 deletions covid/transform_test.py
@@ -0,0 +1,19 @@
import unittest

import pandas as pd
from pandas.testing import assert_frame_equal

from covid.transform import transform_county_data


class TransformTest(unittest.TestCase):
    # @unittest.skip("Lots of formatting to fix this, TODO (@patricksheehan)")
    def test_transform_county_data(self):
        covidatlas_df = pd.read_csv("test_fixtures/covidatlas_example_subset.csv")
        county_df = transform_county_data(covidcounty_df=covidatlas_df)
        expected_county_df = pd.read_csv(
            "test_fixtures/expected_county_df_example.csv",
        )
        assert_frame_equal(
            county_df, expected_county_df, check_dtype=False,
        )

Contributor: Love using test fixtures for this!