Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFD-3664: Pipeline job for SAMHSA tag backfill. #2506

Draft
wants to merge 37 commits into
base: feature/samhsa2.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c623eb5
Rebased commit for CCW RIF ingestion tag creation.
dondevun Nov 19, 2024
15a7958
Update apps/bfd-model/bfd-model-rif/src/main/java/gov/cms/bfd/model/r…
dondevun Nov 20, 2024
99a9bda
Refactor SamhsaUtil to combine a lot of the logic.
dondevun Nov 21, 2024
1d5206b
Refactor CCW Samhsa ingestion.
dondevun Nov 21, 2024
4e5c249
Fix broken table field in samhsa adapters.
dondevun Nov 21, 2024
ec53538
Refactor samhsa adapters
dondevun Nov 22, 2024
18754ba
Removed erroneous file
dondevun Nov 22, 2024
e2f5960
Refactored the way SAMHSA methods are executed.
dondevun Nov 25, 2024
88219b4
Removed unused documentation for throws.
dondevun Nov 25, 2024
da7848f
Added boolean on SamhsaUtil to indicate if a tag was created.
dondevun Nov 26, 2024
56351c9
Add javadoc for return statement in SamhsaUtil
dondevun Nov 26, 2024
c4a82dd
Create pipeline job to backfill SAMHSA data.
dondevun Dec 2, 2024
b5c3a2b
Add configuration for backfill pipeline.
dondevun Dec 3, 2024
69bc595
Move majority of CCW Samhsa tag processing to adapters.
dondevun Dec 3, 2024
42535d5
Rearrange methods in SamhsaUtil.
dondevun Dec 3, 2024
ead04dc
Update backfill query to ignore claims that already have tags.
dondevun Dec 4, 2024
67f26a4
Formatting changes
dondevun Dec 4, 2024
52da6dd
Merge branch 'feature/samhsa2.0' into BFD-3664
dondevun Dec 4, 2024
e978d0a
Fix formatting.
dondevun Dec 4, 2024
b3a9b15
Create ability for the backfill to restart where it left off if inter…
dondevun Dec 5, 2024
2ff48dc
Clean up switch statement in getClaimId methods.
dondevun Dec 5, 2024
87f2de9
Fixed checkstyle violation.
dondevun Dec 5, 2024
c84a48e
Create integration test for backfill service.
dondevun Dec 6, 2024
6ae26d1
Changes to Samhsa backfill service to reuse same entityManager throug…
dondevun Dec 6, 2024
a56f290
Update apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/…
dondevun Dec 6, 2024
85507f4
Fix bug in transaction usage
dondevun Dec 9, 2024
fda77ee
Each table will now be processed in its own thread.
dondevun Dec 10, 2024
e8d4868
Sum totals of all callable return values using lambdas
dondevun Dec 10, 2024
e11f2dc
Refactor backfill to use enums.
dondevun Dec 10, 2024
4d6954d
Using Query instead of NativeQuery
dondevun Dec 11, 2024
eb0f047
Testing omitting the not exists statement in the queries
dondevun Dec 11, 2024
068c4fc
Refactor code to query codes from tables directly.
dondevun Dec 13, 2024
9a6f719
Refactor CCW Samhsa backfill to use SQL queries.
dondevun Dec 16, 2024
ab6b9f7
Fix null value for appStateCcw
dondevun Dec 16, 2024
3373f25
Added new fields to progress table.
dondevun Dec 17, 2024
e17bfd3
Refactor backfill sql
dondevun Jan 7, 2025
b75b481
Refactor backfill query
dondevun Jan 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Tracks per-claim-table progress of the SAMHSA tag backfill job so that an
-- interrupted run can resume from where it left off instead of restarting.
CREATE TABLE IF NOT EXISTS ccw.samhsa_backfill_progress (
-- Name of the claim table being backfilled; one row per table.
claim_table CHARACTER VARYING,
-- Identifier of the last claim processed for this table; the resume point.
last_processed_claim CHARACTER VARYING,
PRIMARY KEY(claim_table)
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import gov.cms.bfd.pipeline.rda.grpc.source.StandardGrpcRdaSource;
import gov.cms.bfd.pipeline.sharedutils.IdHasher;
import gov.cms.bfd.pipeline.sharedutils.s3.S3ClientConfig;
import gov.cms.bfd.pipeline.sharedutils.samhsa.backfill.BackfillConfigOptions;
import gov.cms.bfd.sharedutils.config.AppConfigurationException;
import gov.cms.bfd.sharedutils.config.AwsClientConfig;
import gov.cms.bfd.sharedutils.config.BaseAppConfiguration;
Expand Down Expand Up @@ -375,12 +376,21 @@ public final class AppConfiguration extends BaseAppConfiguration {
public static final Set<String> MICROMETER_CW_ALLOWED_METRIC_NAMES =
Set.of("FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis");

/** Config value for SAMHSA backfill enabled. */
public static final String SSM_PATH_SAMHSA_BACKFILL_ENABLED = "rda/samhsa/backfill/enabled";

/** Config value for SAMHSA backfill batch size. */
public static final String SSM_PATH_SAMHSA_BACKFILL_BATCH_SIZE = "rda/samhsa/backfill/batch_size";

/**
* The CCW rif load options. This can be null if the CCW job is not configured, Optional is not
* Serializable.
*/
@Nullable private final CcwRifLoadOptions ccwRifLoadOptions;

/** Configuration for Backfill Config. */
@Nullable private final BackfillConfigOptions backfillConfigOptions;

/**
* The RDA rif load options. This can be null if the RDA job is not configured, Optional is not
* Serializable.
Expand All @@ -405,6 +415,8 @@ public final class AppConfiguration extends BaseAppConfiguration {
.put(
SSM_PATH_RDA_GRPC_SECONDS_BEFORE_CONNECTION_DROP,
String.valueOf(Duration.ofMinutes(4).toSeconds()))
.put(SSM_PATH_SAMHSA_BACKFILL_ENABLED, "false")
.put(SSM_PATH_SAMHSA_BACKFILL_BATCH_SIZE, String.valueOf(10000))
.build();

/**
 * Constructs a new {@link AppConfiguration} instance.
 *
 * @param metricOptions the metric reporting options
 * @param databaseOptions the database connection options
 * @param awsClientConfig used to configure AWS services
 * @param ccwRifLoadOptions the value to use for {@link #getCcwRifLoadOptions()}
 * @param rdaLoadOptions the value to use for {@link #getRdaLoadOptions()}
 * @param backFillConfigOptions the value to use for {@link #getBackfillConfigOptions()}
 */
private AppConfiguration(
    MetricOptions metricOptions,
    DatabaseOptions databaseOptions,
    AwsClientConfig awsClientConfig,
    @Nullable CcwRifLoadOptions ccwRifLoadOptions,
    @Nullable RdaLoadOptions rdaLoadOptions,
    @Nullable BackfillConfigOptions backFillConfigOptions) {
  super(metricOptions, databaseOptions, awsClientConfig);
  // These may legitimately be null when the corresponding job is not configured;
  // Optional is avoided for fields because it is not Serializable.
  this.ccwRifLoadOptions = ccwRifLoadOptions;
  this.rdaLoadOptions = rdaLoadOptions;
  this.backfillConfigOptions = backFillConfigOptions;
}

/**
Expand All @@ -445,6 +460,15 @@ public Optional<RdaLoadOptions> getRdaLoadOptions() {
return Optional.ofNullable(rdaLoadOptions);
}

/**
 * Returns the {@link #backfillConfigOptions}.
 *
 * @return the configured {@link BackfillConfigOptions}, or an empty {@link Optional} when the
 *     backfill job is not configured for this instance
 */
public Optional<BackfillConfigOptions> getBackfillConfigOptions() {
return Optional.ofNullable(backfillConfigOptions);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder(super.toString());
Expand Down Expand Up @@ -484,6 +508,26 @@ static ConfigLoader createConfigLoaderForTesting(Map<String, String> getenv) {
return ConfigLoader.builder().addMap(DEFAULT_CONFIG_VALUES).addMap(getenv).build();
}

/**
* Loads Backfill Configuration.
*
* @param config Config loader
* @param ccwPipelineEnabled True if the CCW pipeline is enabled in this instance.
* @return {@link BackfillConfigOptions}
*/
public static BackfillConfigOptions loadBackfillConfigOptions(
ConfigLoader config, boolean ccwPipelineEnabled) {
boolean enabled = config.booleanOption(SSM_PATH_SAMHSA_BACKFILL_ENABLED).orElse(false);
// We don't want to run if we're on a CCW Pipeline instance
if (!enabled || ccwPipelineEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit confusing that this runs on the RDA pipeline, but I understand the reasoning. Ideally this could run as its own pipeline, but that would increase the complexity a fair bit. I think this is fine for now and we can revisit once we're running in ECS.

return null;
}
int batchSize = config.intValue(SSM_PATH_SAMHSA_BACKFILL_BATCH_SIZE, 10000);
BackfillConfigOptions backfillConfigOptions =
BackfillConfigOptions.builder().enabled(enabled).batchSize(batchSize).build();
return backfillConfigOptions;
}

/**
* Load configuration variables using the provided {@link ConfigLoader} instance and build an
* {@link AppConfiguration} instance from them.
Expand All @@ -501,7 +545,6 @@ static AppConfiguration loadConfig(ConfigLoader config) {
int hicnHashIterations = config.positiveIntValue(SSM_PATH_HICN_HASH_ITERATIONS);
byte[] hicnHashPepper = config.hexBytes(SSM_PATH_HICN_HASH_PEPPER);
int hicnHashCacheSize = config.intValue(SSM_PATH_HICN_HASH_CACHE_SIZE);

final boolean idempotencyRequired = config.booleanValue(SSM_PATH_IDEMPOTENCY_REQUIRED);

final var benePerformanceSettings = loadBeneficiaryPerformanceSettings(config);
Expand All @@ -511,7 +554,6 @@ static AppConfiguration loadConfig(ConfigLoader config) {
Math.max(
benePerformanceSettings.getLoaderThreads(),
claimPerformanceSettings.getLoaderThreads());

MetricOptions metricOptions = loadMetricOptions(config);
DatabaseOptions databaseOptions = loadDatabaseOptions(config, maxLoaderThreads);

Expand All @@ -527,11 +569,17 @@ static AppConfiguration loadConfig(ConfigLoader config) {
claimPerformanceSettings);

CcwRifLoadOptions ccwRifLoadOptions = loadCcwRifLoadOptions(config, loadOptions);

RdaLoadOptions rdaLoadOptions = loadRdaLoadOptions(config, loadOptions.getIdHasherConfig());
BackfillConfigOptions backfillConfigOptions =
loadBackfillConfigOptions(config, ccwRifLoadOptions != null);
AwsClientConfig awsClientConfig = BaseAppConfiguration.loadAwsClientConfig(config);
return new AppConfiguration(
metricOptions, databaseOptions, awsClientConfig, ccwRifLoadOptions, rdaLoadOptions);
metricOptions,
databaseOptions,
awsClientConfig,
ccwRifLoadOptions,
rdaLoadOptions,
backfillConfigOptions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState;
import gov.cms.bfd.pipeline.sharedutils.PipelineJob;
import gov.cms.bfd.pipeline.sharedutils.s3.AwsS3ClientFactory;
import gov.cms.bfd.pipeline.sharedutils.samhsa.backfill.BackfillConfigOptions;
import gov.cms.bfd.pipeline.sharedutils.samhsa.backfill.SamhsaBackfillJob;
import gov.cms.bfd.sharedutils.config.AppConfigurationException;
import gov.cms.bfd.sharedutils.config.AwsClientConfig;
import gov.cms.bfd.sharedutils.config.ConfigException;
Expand Down Expand Up @@ -180,11 +182,9 @@ private void runPipeline() throws FatalAppException, SQLException, IOException {
LOGGER.warn("Invalid app configuration.", e);
throw new FatalAppException("Invalid app configuration", e, EXIT_CODE_BAD_CONFIG);
}

final MetricOptions metricOptions = appConfig.getMetricOptions();
final var appMeters = new CompositeMeterRegistry();
final var appMetrics = new MetricRegistry();

configureMetrics(metricOptions, configLoader, appMeters, appMetrics);

// Create a pooled data source for use by any registered jobs.
Expand Down Expand Up @@ -426,6 +426,32 @@ private boolean anySmokeTestFailed(List<PipelineJob> jobs) {
return anyTestFailed;
}

/**
 * Creates the pipeline job for the SAMHSA backfill.
 *
 * @param batchSize The query batch size.
 * @param appMeters The meter registry.
 * @param appMetrics The metrics registry.
 * @param pooledDataSource The Hikari data source.
 * @param clock Clock.
 * @return The Pipeline job.
 */
PipelineJob createBackfillJob(
    int batchSize,
    MeterRegistry appMeters,
    MetricRegistry appMetrics,
    HikariDataSource pooledDataSource,
    Clock clock) {
  // Build the application state inline; the job uses the RDA persistence unit
  // (presumably because the backfill shares that unit's entity mappings — confirm).
  return new SamhsaBackfillJob(
      new PipelineApplicationState(
          appMeters,
          appMetrics,
          pooledDataSource,
          PipelineApplicationState.RDA_PERSISTENCE_UNIT_NAME,
          clock),
      batchSize);
}

/**
* Create all pipeline jobs and return them in a list.
*
Expand Down Expand Up @@ -500,6 +526,21 @@ List<PipelineJob> createAllJobs(
} else {
LOGGER.info("RDA API jobs are not enabled in app configuration.");
}
final Optional<BackfillConfigOptions> backfillConfigOptions =
appConfig.getBackfillConfigOptions();
if (backfillConfigOptions.isPresent()) {
final var backfillJob =
createBackfillJob(
backfillConfigOptions.get().getBatchSize(),
appMeters,
appMetrics,
pooledDataSource,
clock);
jobs.add(backfillJob);
LOGGER.warn("Registered SAMHSA backfill job.");
} else {
LOGGER.info("SAMHSA backfill job is disabled.");
}
return jobs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest.DataSetManifestEntry;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest.PreValidationProperties;
import gov.cms.bfd.pipeline.sharedutils.IdHasher;
import gov.cms.bfd.pipeline.sharedutils.TransactionManager;
import gov.cms.bfd.pipeline.sharedutils.model.TagCode;
import gov.cms.bfd.pipeline.sharedutils.samhsa.backfill.SamhsaBackfillService;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;
import jakarta.persistence.EntityTransaction;
import jakarta.persistence.Query;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.Root;
Expand Down Expand Up @@ -134,6 +137,45 @@ public void loadSampleASamhsa() {
validateSamhsaTagsInDatabase(244, SnfTag.class);
}

/** Tests that SAMHSA tags are properly created by the backfill service for SAMHSA test claims. */
@Test
public void testSamhsaBackfill() {
  final var sampleResources =
      Arrays.asList(StaticRifResourceGroup.SAMPLE_A_SAMHSA.getResources());
  final var rifFiles =
      sampleResources.stream().map(StaticRifResource::toRifFile).collect(Collectors.toList());
  final var rifFilesEvent = new RifFilesEvent(Instant.now(), false, rifFiles);
  loadSample(
      sampleResources.get(0).getResourceUrl().toString(),
      CcwRifLoadTestUtils.getLoadOptions(),
      rifFilesEvent);

  // The pipeline load above already created SAMHSA tags; delete them so we can
  // verify that the backfill service recreates them from scratch.
  deleteSamhsaTags();
  for (Class<?> tagClass :
      List.of(
          CarrierTag.class,
          DmeTag.class,
          HhaTag.class,
          HospiceTag.class,
          InpatientTag.class,
          OutpatientTag.class,
          SnfTag.class)) {
    validateSamhsaTagsInDatabase(0, tagClass);
  }

  final var backfill =
      SamhsaBackfillService.createBackfillService(
          PipelineTestUtils.get().getPipelineApplicationState(), 100);
  backfill.startBackFill(true, false);

  // Expected per-table tag counts for the SAMPLE_A_SAMHSA fixture data.
  validateSamhsaTagsInDatabase(124, CarrierTag.class);
  validateSamhsaTagsInDatabase(124, DmeTag.class);
  validateSamhsaTagsInDatabase(124, HhaTag.class);
  validateSamhsaTagsInDatabase(124, HospiceTag.class);
  validateSamhsaTagsInDatabase(244, InpatientTag.class);
  validateSamhsaTagsInDatabase(238, OutpatientTag.class);
  validateSamhsaTagsInDatabase(244, SnfTag.class);
}

/** Runs {@link RifLoader} against the {@link StaticRifResourceGroup#SAMPLE_A} data. */
@Test
public void loadSampleA() {
Expand Down Expand Up @@ -1098,6 +1140,30 @@ private Stream<RifFile> editStreamToBeUpdate(Stream<RifFile> samplesStream) {
return editSamples(samplesStream, fileEditor);
}

/** Deletes SAMHSA tags from the test database. */
private void deleteSamhsaTags() {
  final var transactionManager =
      new TransactionManager(
          PipelineTestUtils.get().getPipelineApplicationState().getEntityManagerFactory());
  // One tag table per CCW claim type; all are cleared in a single transaction.
  final List<String> tagTables =
      List.of(
          "ccw.carrier_tags",
          "ccw.dme_tags",
          "ccw.hha_tags",
          "ccw.hospice_tags",
          "ccw.inpatient_tags",
          "ccw.outpatient_tags",
          "ccw.snf_tags");
  transactionManager.executeProcedure(
      entityManager -> {
        for (String table : tagTables) {
          // Table names are compile-time constants, so concatenation is safe here.
          entityManager.createNativeQuery("DELETE FROM " + table).executeUpdate();
        }
      });
}

/**
* Validates that the correct number of SAMHSA tags were created for a claim type.
*
Expand Down
4 changes: 4 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-shared-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,18 @@ public static void createSamhsaMap() throws IOException {
*
* @param claim The claim to process.
* @param entityManager the EntityManager used to persist the tag.
* @return true if a claim was persisted.
* @param <TClaim> Generic type of the claim.
*/
public <TClaim> void processRdaClaim(TClaim claim, EntityManager entityManager) {
public <TClaim> boolean processRdaClaim(TClaim claim, EntityManager entityManager) {
switch (claim) {
case RdaFissClaim fissClaim -> {
Optional<List<FissTag>> tags = Optional.of(checkAndProcessFissClaim(fissClaim));
persistTags(tags, entityManager);
return persistTags(tags, entityManager);
}
case RdaMcsClaim mcsClaim -> {
Optional<List<McsTag>> tags = Optional.of(checkAndProcessMcsClaim(mcsClaim));
persistTags(tags, entityManager);
return persistTags(tags, entityManager);
}
default -> throw new RuntimeException("Unknown claim type.");
}
Expand Down Expand Up @@ -161,7 +162,6 @@ public static <TTag> boolean persistTags(Optional<List<TTag>> tags, EntityManage
* @return true if a tag was persisted.
*/
public <TClaim> boolean processCcwClaim(TClaim claim, EntityManager entityManager) {
boolean persisted = false;
try {
SamhsaAdapterBase adapter =
switch (claim) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gov.cms.bfd.pipeline.sharedutils.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/** POJO to contain information about the tables to use in the backfill SQL queries. */
@Getter
@Setter
@AllArgsConstructor
public class TableEntry {
  /** The claim table. */
  private String claimTable;

  /**
   * The class of the claim. Declared as {@code Class<?>} rather than the raw type so callers get
   * no unchecked warnings.
   */
  private Class<?> claimClass;

  /** The tag table associated with this claim. */
  private String tagTable;

  /** The column that contains the claim id. */
  private String claimColumnName;
}
Loading