Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement user-defined entity selection strategies in Presidio Structured #1319

Merged
merged 9 commits into from
Mar 20, 2024
21 changes: 21 additions & 0 deletions docs/structured/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,27 @@ A more detailed sample can be found here:

- <https://github.com/microsoft/presidio/blob/main/docs/samples/python/example_structured.ipynb>

#### Selection Strategy for Entity Detection in Tabular Data

- **Most Common (default):** Identifies the most frequently occurring PII entity in a data column or field.
- **Highest Confidence:** Selects the PII entity with the highest confidence score in a data column or field, irrespective of how often it occurs.
- **Mixed:** Combines both of the above strategies. It selects the entity with the highest confidence score if that score exceeds a specified threshold (controlled by `mixed_strategy_threshold`, which defaults to `0.5` and must be between 0 and 1); otherwise, it falls back to the most common entity.

##### Usage

Specify the `selection_strategy` and optionally the `mixed_strategy_threshold` in the `generate_analysis()` method:

```python
# Generate a tabular analysis using the most common strategy
tabular_analysis = PandasAnalysisBuilder().generate_analysis(sample_df)

# Generate a tabular analysis using the highest confidence strategy
tabular_analysis = PandasAnalysisBuilder().generate_analysis(sample_df, selection_strategy="highest_confidence")

# Generate a tabular analysis using the mixed strategy
tabular_analysis = PandasAnalysisBuilder().generate_analysis(sample_df, selection_strategy="mixed", mixed_strategy_threshold=0.75)
```

#### Future work

- Improve support for datasets with mixed free-text and structured data (e.g. some columns contain free text)
Expand Down
189 changes: 157 additions & 32 deletions presidio-structured/presidio_structured/analysis_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,26 @@ class TabularAnalysisBuilder(AnalysisBuilder):
class PandasAnalysisBuilder(TabularAnalysisBuilder):
"""Concrete configuration generator for tabular data."""

entity_selection_strategies = {"highest_confidence", "mixed", "most_common"}

def generate_analysis(
omri374 marked this conversation as resolved.
Show resolved Hide resolved
self,
df: DataFrame,
n: Optional[int] = None,
language: str = "en",
selection_strategy: str = "most_common",
mixed_strategy_threshold: float = 0.5,
) -> StructuredAnalysis:
"""
Generate a configuration from the given tabular data.

:param df: The input tabular data (dataframe).
:param n: The number of samples to be taken from the dataframe.
:param language: The language to be used for analysis.
:param selection_strategy: A string that specifies the entity selection strategy
('highest_confidence', 'mixed', or default to most common).
:param mixed_strategy_threshold: A float value for the threshold to be used in
the entity selection mixed strategy.
:return: A StructuredAnalysis object containing the analysis results.
"""
if not n:
Expand All @@ -179,7 +187,12 @@ def generate_analysis(

df = df.sample(n, random_state=123)

key_recognizer_result_map = self._generate_key_rec_results_map(df, language)
key_recognizer_result_map = self._generate_key_rec_results_map(
df,
language,
selection_strategy,
mixed_strategy_threshold
)

key_entity_map = {
key: result.entity_type
Expand All @@ -190,7 +203,11 @@ def generate_analysis(
return StructuredAnalysis(entity_mapping=key_entity_map)

def _generate_key_rec_results_map(
self, df: DataFrame, language: str
self,
df: DataFrame,
language: str,
selection_strategy: str = "most_common",
mixed_strategy_threshold: float = 0.5,
) -> Dict[str, RecognizerResult]:
"""
Find the most common entity in a dataframe column.
Expand All @@ -199,13 +216,19 @@ def _generate_key_rec_results_map(

:param df: The dataframe where entities will be searched.
:param language: Language to be used in the analysis engine.
:param selection_strategy: A string that specifies the entity selection strategy
('highest_confidence', 'mixed', or default to most common).
:param mixed_strategy_threshold: A float value for the threshold to be used in
the entity selection mixed strategy.
:return: A dictionary mapping column names to the most common RecognizerResult.
"""
column_analyzer_results_map = self._batch_analyze_df(df, language)
key_recognizer_result_map = {}
for column, analyzer_result in column_analyzer_results_map.items():
key_recognizer_result_map[column] = self._find_most_common_entity(
analyzer_result
key_recognizer_result_map[column] = self._find_entity_based_on_strategy(
analyzer_result,
selection_strategy,
mixed_strategy_threshold
)
return key_recognizer_result_map

Expand All @@ -230,44 +253,146 @@ def _batch_analyze_df(

return column_analyzer_results_map

def _find_most_common_entity(
self, analyzer_results: List[List[RecognizerResult]]
def _find_entity_based_on_strategy(
self,
analyzer_results: List[List[RecognizerResult]],
selection_strategy: str,
mixed_strategy_threshold: float
) -> RecognizerResult:
"""
Find the most common entity in a list of analyzer results for \
a dataframe column.

It takes the most common entity type and calculates the confidence score based
on the number of cells it appears in.

:param analyzer_results: List of lists of RecognizerResults for each \
cell in the column.
:return: A RecognizerResult with the most common entity type and the \
calculated confidence score.
Determine the most suitable entity based on the specified selection strategy.

:param analyzer_results: A nested list of RecognizerResult objects from the
analysis results.
:param selection_strategy: A string that specifies the entity selection strategy
('highest_confidence', 'mixed', or default to most common).
:return: A RecognizerResult object representing the selected entity based on the
given strategy.
"""
if selection_strategy not in self.entity_selection_strategies:
raise ValueError(
f"Unsupported entity selection strategy: {selection_strategy}."
)

if not any(analyzer_results):
return RecognizerResult(
entity_type=NON_PII_ENTITY_TYPE, start=0, end=1, score=1.0
)
return RecognizerResult(entity_type=NON_PII_ENTITY_TYPE, start=0, end=1,
score=1.0)

# Flatten the list of lists while keeping track of the cell index
flat_results = [
(cell_idx, res)
for cell_idx, cell_results in enumerate(analyzer_results)
for res in cell_results
]
flat_results = self._flatten_results(analyzer_results)

# Count the occurrences of each entity type in different cells
type_counter = Counter(res.entity_type for cell_idx, res in flat_results)
# Select the entity based on the desired strategy
miltonsim marked this conversation as resolved.
Show resolved Hide resolved
if selection_strategy == "highest_confidence":
return self._select_highest_confidence_entity(flat_results)
elif selection_strategy == "mixed":
return self._select_mixed_strategy_entity(flat_results,
mixed_strategy_threshold)

# Find the most common entity type based on the number of cells it appears in
most_common_type, _ = type_counter.most_common(1)[0]
return self._select_most_common_entity(flat_results)

# The score is the ratio of the most common entity type's count to the total
most_common_count = type_counter[most_common_type]
score = most_common_count / len(analyzer_results)
def _select_most_common_entity(self, flat_results):
    """
    Pick the entity type that occurs most often in the flattened results.

    The returned score is not a recognizer confidence: it is the winning entity
    type's occurrence count divided by the total number of flattened results.

    :param flat_results: A list of tuples containing index and RecognizerResult
    objects from the flattened analysis results. An empty list is not supported
    (indexing the most-common list fails).
    :return: A RecognizerResult whose entity type is the most frequent one and
    whose score is that type's share of all results.
    """
    occurrences = Counter(result.entity_type for _, result in flat_results)

    # most_common(1) yields a one-element list holding (entity_type, count).
    winning_type, winning_count = occurrences.most_common(1)[0]
    share = winning_count / len(flat_results)

    return RecognizerResult(entity_type=winning_type, start=0, end=1, score=share)

def _select_highest_confidence_entity(self, flat_results):
    """
    Pick the entity type that reached the highest confidence score.

    If several entity types share the top score, the one that reached it the
    most times wins; any remaining tie goes to the entity type that appears
    first in the aggregated scores.

    :param flat_results: A list of tuples containing index and RecognizerResult
    objects from the flattened analysis results.
    :return: A RecognizerResult for the winning entity type, carrying the top
    score.
    """
    scores_by_entity = self._aggregate_scores(flat_results)

    # Best score of each entity type, then the best of those overall.
    best_per_entity = [max(scores) for scores in scores_by_entity.values() if scores]
    top_score = max(best_per_entity)

    # Number of times each entity type hit the top score (only types that did).
    top_hits = {
        entity: scores.count(top_score)
        for entity, scores in scores_by_entity.items()
        if top_score in scores
    }

    # max() keeps the first maximal key, so ties resolve in insertion order.
    winner = max(top_hits, key=top_hits.get)

    return RecognizerResult(entity_type=winner, start=0, end=1, score=top_score)

def _select_mixed_strategy_entity(self, flat_results, mixed_strategy_threshold):
"""
Select an entity using a mixed strategy.

Chooses an entity based on the highest confidence score if it is above the
threshold. Otherwise, it defaults to the most common entity.

:param flat_results: A list of tuples containing index and RecognizerResult
objects from the flattened analysis results.
:return: A RecognizerResult object selected based on the mixed strategy.
"""
# Check if mixed strategy threshold is within the valid range
if not 0 <= mixed_strategy_threshold <= 1:
raise ValueError(
f"Invalid mixed strategy threshold: {mixed_strategy_threshold}."
)

score_aggregator = self._aggregate_scores(flat_results)

# Check if the highest score is greater than threshold and select accordingly
highest_score = max(max(scores) for scores in score_aggregator.values()
if scores)
if highest_score > mixed_strategy_threshold:
return self._select_highest_confidence_entity(flat_results)
else:
return self._select_most_common_entity(flat_results)

@staticmethod
def _aggregate_scores(flat_results):
"""
Aggregate the scores for each entity type from the flattened analysis results.

:param flat_results: A list of tuples containing index and RecognizerResult
objects from the flattened analysis results.
:return: A dictionary with entity types as keys and lists of scores as values.
"""
score_aggregator = {}
for _, res in flat_results:
if res.entity_type not in score_aggregator:
score_aggregator[res.entity_type] = []
score_aggregator[res.entity_type].append(res.score)
return score_aggregator

@staticmethod
def _flatten_results(analyzer_results):
"""
Flattens a nested lists of RecognizerResult objects into a list of tuples.

:param analyzer_results: A nested list of RecognizerResult objects from
the analysis results.
:return: A flattened list of tuples containing index and RecognizerResult
objects.
"""
return [(cell_idx, res) for cell_idx, cell_results in
enumerate(analyzer_results) for res in cell_results]
11 changes: 11 additions & 0 deletions presidio-structured/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ def sample_df():
}
return pd.DataFrame(data)

@pytest.fixture
def sample_df_strategy():
    """Return a three-row dataframe with name, email, city, state and postal code columns."""
    return pd.DataFrame(
        {
            "name": ["John Doe", "Jane Smith", "Alice Johnson"],
            "email": ["[email protected]", "[email protected]", "[email protected]"],
            "city": ["Anytown", "Somewhere", "Elsewhere"],
            "state": ["CA", "TX", "NY"],
            "postal_code": [12345, 67890, 11223],
        }
    )

@pytest.fixture
def sample_json():
Expand Down
41 changes: 39 additions & 2 deletions presidio-structured/tests/test_analysis_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,53 @@ def test_generate_analysis_tabular_with_invalid_sampling(
tabular_analysis_builder.generate_analysis(sample_df, n=-1)


def test_find_most_common_entity(tabular_analysis_builder, sample_df):
def test_find_most_common_entity(tabular_analysis_builder, sample_df_strategy):
key_recognizer_result_map = tabular_analysis_builder._generate_key_rec_results_map(
sample_df, "en"
sample_df_strategy, "en", selection_strategy = "most_common"
)
assert len(key_recognizer_result_map) == 5
assert key_recognizer_result_map["name"].entity_type == "PERSON"
assert key_recognizer_result_map["email"].entity_type == "URL"
assert key_recognizer_result_map["city"].entity_type == "LOCATION"
assert key_recognizer_result_map["postal_code"].entity_type == "NON_PII"

def test_find_highest_confidence_entity(tabular_analysis_builder, sample_df_strategy):
    """Check the entity type chosen per column by the 'highest_confidence' strategy."""
    expected_entity_types = {
        "name": "PERSON",
        "email": "EMAIL_ADDRESS",
        "city": "LOCATION",
        "postal_code": "NON_PII",
    }

    results = tabular_analysis_builder._generate_key_rec_results_map(
        sample_df_strategy, "en", selection_strategy="highest_confidence"
    )

    assert len(results) == 5
    for column, entity_type in expected_entity_types.items():
        assert results[column].entity_type == entity_type

def test_find_mixed_strategy_entity(tabular_analysis_builder, sample_df_strategy):
    """Check the entity type chosen per column by the 'mixed' strategy (default threshold)."""
    expected_entity_types = {
        "name": "PERSON",
        "email": "EMAIL_ADDRESS",
        "city": "LOCATION",
        "postal_code": "NON_PII",
    }

    results = tabular_analysis_builder._generate_key_rec_results_map(
        sample_df_strategy, "en", selection_strategy="mixed"
    )

    assert len(results) == 5
    for column, entity_type in expected_entity_types.items():
        assert results[column].entity_type == entity_type

def test_find_mixed_strategy_entity_with_custom_mixed_strategy_threshold(tabular_analysis_builder, sample_df):
    """Check the entity type chosen per column by the 'mixed' strategy with a 0.4 threshold."""
    expected_entity_types = {
        "name": "PERSON",
        "email": "EMAIL_ADDRESS",
        "phone": "PHONE_NUMBER",
    }

    results = tabular_analysis_builder._generate_key_rec_results_map(
        sample_df, "en", selection_strategy="mixed", mixed_strategy_threshold=0.4
    )

    assert len(results) == 3
    for column, entity_type in expected_entity_types.items():
        assert results[column].entity_type == entity_type

def test_find_entity_with_invalid_strategy_raises_exception(tabular_analysis_builder, sample_df_strategy):
    """An unknown selection strategy must raise ValueError naming the bad strategy."""
    selection_strategy = "invalid"

    # The call is expected to raise, so its return value is deliberately not bound.
    with pytest.raises(ValueError) as excinfo:
        tabular_analysis_builder._generate_key_rec_results_map(
            sample_df_strategy, "en", selection_strategy=selection_strategy
        )

    assert f"Unsupported entity selection strategy: {selection_strategy}." in str(excinfo.value)

def test_find_most_common_entity_with_empty_df(tabular_analysis_builder):
df = pd.DataFrame()
Expand Down
Loading