Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:finish extracting intent classes from adapt package #248

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 209 additions & 29 deletions ovos_workshop/intents.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import abc
import itertools
from os.path import exists
from threading import RLock
from typing import List, Tuple, Optional
import abc

from ovos_bus_client.message import Message, dig_for_message
from ovos_bus_client.util import get_mycroft_bus
from ovos_utils.log import LOG, log_deprecation

try:
# backwards compat isinstancechecks
from adapt.intent import IntentBuilder as _IB, Intent as _I
except ImportError:
# adapt is optional
_I = object
_IB = object


class _IntentMeta(abc.ABCMeta):
def __instancecheck__(self, instance):
return isinstance(instance, _I) or \
super().__instancecheck__(instance)


class Intent(_I, metaclass=_IntentMeta):
def __init__(self, name="", requires=None, at_least_one=None, optional=None):
check = super().__instancecheck__(instance)
if not check:
try:
# backwards compat isinstancechecks
from adapt.intent import Intent as _I
check = isinstance(instance, _I)
except ImportError:
pass
return check


class Intent(metaclass=_IntentMeta):
def __init__(self, name="", requires=None, at_least_one=None, optional=None, excludes=None):
"""Create Intent object

Args:
name(str): Name for Intent
requires(list): Entities that are required
Expand All @@ -34,52 +36,207 @@ def __init__(self, name="", requires=None, at_least_one=None, optional=None):
self.requires = requires or []
self.at_least_one = at_least_one or []
self.optional = optional or []
self.excludes = excludes or []

def validate(self, tags, confidence):
"""Using this method removes tags from the result of validate_with_tags

Returns:
intent(intent): Results from validate_with_tags
"""
if _I is not object:
return super().validate(tags, confidence)
raise NotImplementedError("please install adapt-parser")
intent, tags = self.validate_with_tags(tags, confidence)
return intent

def validate_with_tags(self, tags, confidence):
"""Validate whether tags has required entites for this intent to fire

Args:
tags(list): Tags and Entities used for validation
confidence(float): The weight associate to the parse result,
as indicated by the parser. This is influenced by a parser
that uses edit distance or context.

Returns:
intent, tags: Returns intent and tags used by the intent on
failure to meat required entities then returns intent with
confidence
of 0.0 and an empty list for tags.
"""
if _I is not object:
return super().validate_with_tags(tags, confidence)
raise NotImplementedError("please install adapt-parser")
result = {'intent_type': self.name}
intent_confidence = 0.0
local_tags = tags[:]
used_tags = []

# Check excludes first
for exclude_type in self.excludes:
exclude_tag, _canonical_form, _tag_confidence = \
self._find_first_tag(local_tags, exclude_type)
if exclude_tag:
result['confidence'] = 0.0
return result, []

for require_type, attribute_name in self.requires:
required_tag, canonical_form, tag_confidence = \
self._find_first_tag(local_tags, require_type)
if not required_tag:
result['confidence'] = 0.0
return result, []

result[attribute_name] = canonical_form
if required_tag in local_tags:
local_tags.remove(required_tag)
used_tags.append(required_tag)
intent_confidence += tag_confidence

if len(self.at_least_one) > 0:
best_resolution = self._resolve_one_of(local_tags, self.at_least_one)
if not best_resolution:
result['confidence'] = 0.0
return result, []
else:
for key in best_resolution:
# TODO: at least one should support aliases
result[key] = best_resolution[key][0].get('key')
intent_confidence += 1.0 * best_resolution[key][0]['entities'][0].get('confidence', 1.0)
used_tags.append(best_resolution[key][0])
if best_resolution in local_tags:
local_tags.remove(best_resolution[key][0])

for optional_type, attribute_name in self.optional:
optional_tag, canonical_form, tag_confidence = \
self._find_first_tag(local_tags, optional_type)
if not optional_tag or attribute_name in result:
continue
result[attribute_name] = canonical_form
if optional_tag in local_tags:
local_tags.remove(optional_tag)
used_tags.append(optional_tag)
intent_confidence += tag_confidence

total_confidence = (intent_confidence / len(tags) * confidence) \
if tags else 0.0

CLIENT_ENTITY_NAME = 'Client' # TODO - ??? what is this magic string

target_client, canonical_form, confidence = \
self._find_first_tag(local_tags, CLIENT_ENTITY_NAME)
Comment on lines +119 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Clarify the purpose of CLIENT_ENTITY_NAME and address the TODO comment

The constant CLIENT_ENTITY_NAME is set to 'Client', accompanied by a TODO comment questioning its purpose:

CLIENT_ENTITY_NAME = 'Client'  # TODO - ??? what is this magic string

It's important to address this TODO comment to improve code clarity. If CLIENT_ENTITY_NAME is necessary, please provide documentation explaining its role. If it's unused or obsolete, consider removing it to clean up the codebase.

Would you like assistance in determining the necessity of CLIENT_ENTITY_NAME and updating the code accordingly?


result['target'] = target_client.get('key') if target_client else None
result['confidence'] = total_confidence

return result, used_tags

@classmethod
def _resolve_one_of(cls, tags, at_least_one):
"""Search through all combinations of at_least_one rules to find a
combination that is covered by tags

Args:
tags(list): List of tags with Entities to search for Entities
at_least_one(list): List of Entities to find in tags

class _IntentBuilderMeta(abc.ABCMeta):
def __instancecheck__(self, instance):
return isinstance(instance, _IB) or \
super().__instancecheck__(instance)
Returns:
object:
returns None if no match is found but returns any match as an object
"""

for possible_resolution in cls._choose_1_from_each(at_least_one):
resolution = {}
pr = possible_resolution[:]
for entity_type in pr:
last_end_index = -1
if entity_type in resolution:
last_end_index = resolution[entity_type][-1].get('end_token')
tag, value, c = cls._find_first_tag(tags, entity_type,
after_index=last_end_index)
if not tag:
break
else:
if entity_type not in resolution:
resolution[entity_type] = []
resolution[entity_type].append(tag)
# Check if this is a valid resolution (all one_of rules matched)
if len(resolution) == len(possible_resolution):
return resolution

return None

@staticmethod
def _choose_1_from_each(lists):
"""
The original implementation here was functionally equivalent to
:func:`~itertools.product`, except that the former returns a generator
of lists, and itertools returns a generator of tuples. This is going to do
a light transform for now, until callers can be verified to work with
tuples.

Args:
A list of lists or tuples, expected as input to
:func:`~itertools.product`

Returns:
a generator of lists, see docs on :func:`~itertools.product`
"""
for result in itertools.product(*lists):
yield list(result)

JarbasAl marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
def _find_first_tag(tags, entity_type, after_index=-1):
"""Searches tags for entity type after given index

Args:
tags(list): a list of tags with entity types to be compared to
entity_type
entity_type(str): This is he entity type to be looking for in tags
after_index(int): the start token must be greater than this.

Returns:
( tag, v, confidence ):
tag(str): is the tag that matched
v(str): ? the word that matched?
confidence(float): is a measure of accuracy. 1 is full confidence
and 0 is none.
"""
for tag in tags:
for entity in tag.get('entities'):
for v, t in entity.get('data'):
if t.lower() == entity_type.lower() and \
(tag.get('start_token', 0) > after_index or \
tag.get('from_context', False)):
return tag, v, entity.get('confidence')

return None, None, None

class IntentBuilder(_IB, metaclass=_IntentBuilderMeta):

class _IntentBuilderMeta(abc.ABCMeta):
def __instancecheck__(self, instance):
check = super().__instancecheck__(instance)
if not check:
try:
# backwards compat isinstancechecks
from adapt.intent import IntentBuilder as _IB
check = isinstance(instance, _IB)
except ImportError:
pass
return check


class IntentBuilder(metaclass=_IntentBuilderMeta):
"""
IntentBuilder, used to construct intent parsers.

Attributes:
at_least_one(list): A list of Entities where one is required.
These are separated into lists so you can have one of (A or B) and
then require one of (D or F).
requires(list): A list of Required Entities
optional(list): A list of optional Entities
excludes(list): A list of forbidden Entities
name(str): Name of intent

Notes:
This is designed to allow construction of intents in one line.

Example:
IntentBuilder("Intent")\
.requires("A")\
Expand All @@ -90,21 +247,25 @@ class IntentBuilder(_IB, metaclass=_IntentBuilderMeta):
def __init__(self, intent_name):
"""
Constructor

Args:
intent_name(str): the name of the intents that this parser
parses/validates
"""
self.at_least_one = []
self.requires = []
self.excludes = []
self.optional = []
self.name = intent_name

def one_of(self, *args):
"""
The intent parser should require one of the provided entity types to
validate this clause.

Args:
args(args): *args notation list of entity names

Returns:
self: to continue modifications.
"""
Expand All @@ -114,10 +275,12 @@ def one_of(self, *args):
def require(self, entity_type, attribute_name=None):
"""
The intent parser should require an entity of the provided type.

Args:
entity_type(str): an entity type
attribute_name(str): the name of the attribute on the parsed intent.
Defaults to match entity_type.

Returns:
self: to continue modifications.
"""
Expand All @@ -126,14 +289,29 @@ def require(self, entity_type, attribute_name=None):
self.requires += [(entity_type, attribute_name)]
return self

def exclude(self, entity_type):
"""
The intent parser must not contain an entity of the provided type.

Args:
entity_type(str): an entity type

Returns:
self: to continue modifications.
"""
self.excludes.append(entity_type)
return self

def optionally(self, entity_type, attribute_name=None):
"""
Parsed intents from this parser can optionally include an entity of the
provided type.

Args:
entity_type(str): an entity type
attribute_name(str): the name of the attribute on the parsed intent.
Defaults to match entity_type.

Returns:
self: to continue modifications.
"""
Expand All @@ -145,10 +323,12 @@ def optionally(self, entity_type, attribute_name=None):
def build(self):
"""
Constructs an intent from the builder's specifications.

:return: an Intent instance.
"""
return Intent(self.name, self.requires,
self.at_least_one, self.optional)
self.at_least_one, self.optional,
self.excludes)


def to_alnum(skill_id: str) -> str:
Expand Down Expand Up @@ -512,10 +692,10 @@ def open_intent_envelope(message):
return Intent(intent_dict.get('name'),
intent_dict.get('requires'),
intent_dict.get('at_least_one'),
intent_dict.get('optional'))
intent_dict.get('optional'),
intent_dict.get('excludes'))


if __name__ == "__main__":
i1 = _I("a", [], [], []) # skills using adapt directly
assert isinstance(i1, Intent) # backwards compat via metaclass

Loading