Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
jozefKruszynski committed Nov 8, 2024
1 parent 3e30cb7 commit f372fd1
Showing 1 changed file with 148 additions and 102 deletions.
250 changes: 148 additions & 102 deletions custom_components/mass/intent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,163 @@
from . import MusicAssistantConfigEntry

INTENT_PLAY_MEDIA_ON_MEDIA_PLAYER = "MassPlayMediaOnMediaPlayer"
INTENT_PLAY_MEDIA_ASSIST = "MassPlayMediaAssist"
NAME_SLOT = "name"
AREA_SLOT = "area"
QUERY_SLOT = "query"
ARTIST_SLOT = "artist"
TRACK_SLOT = "track"
ALBUM_SLOT = "album"
RADIO_SLOT = "radio"
PLAYLIST_SLOT = "playlist"
DONT_STOP_SLOT = "dont_stop"
SLOT_VALUE = "value"


async def async_setup_intents(hass: HomeAssistant) -> None:
"""Set up the Music Assistant intents."""
intent.async_register(hass, MassPlayMediaOnMediaPlayerHandler(hass))
intent.async_register(hass, MassPlayMediaAssistHandler(hass))
if any(
config_entry.data.get(CONF_OPENAI_AGENT_ID)
for config_entry in hass.config_entries.async_entries(DOMAIN)
):
intent.async_register(hass, MassPlayMediaOnMediaPlayerHandler(hass))


class MassPlayMediaOnMediaPlayerHandler(intent.IntentHandler):
class MassIntentHandlerBase(intent.IntentHandler):
"""Base class for Mass intent handlers."""

async def _get_loaded_config_entry(self, hass: HomeAssistant) -> ConfigEntry:
"""Get the correct config entry."""
config_entries = hass.config_entries.async_entries(DOMAIN)
for config_entry in config_entries:
if config_entry.state == ConfigEntryState.LOADED:
return config_entry
raise intent.IntentHandleError("Music Assistant not loaded")

async def _async_get_matched_mass_player(
self, intent_obj: intent.Intent, slots: intent._SlotsType
) -> str:
name: str | None = slots.get(NAME_SLOT, {}).get(SLOT_VALUE)
if name == "all":
# Don't match on name if targeting all entities
name = None
area_name = slots.get(AREA_SLOT, {}).get(SLOT_VALUE)
state = await self._get_matched_state(intent_obj, name, area_name)
entity_registry = er.async_get(self.hass)
if entity := entity_registry.async_get(state.entity_id):
return entity.unique_id.split("mass_", 1)[1]
raise intent.IntentHandleError(
f"No entities matched for: name={name}, area_name={area_name}"
)

async def _get_media_items(
self, mass: MusicAssistantClient, media_id: str | list[str], media_type
) -> MediaItemType | list[MediaItemType]:
if isinstance(media_id, list):
return [
(
await mass.music.get_item_by_name(
item, media_type=MediaType(media_type)
)
).to_dict()
for item in media_id
]
return await mass.music.get_item_by_name(
media_id, media_type=MediaType(media_type)
)

async def _get_matched_state(
self, intent_obj: intent.Intent, name: str | None, area_name: str | None
) -> State:
mass_states = {
state
for state in intent_obj.hass.states.async_all(MEDIA_PLAYER_DOMAIN)
if state.attributes.get(ATTR_MASS_PLAYER_TYPE) is not None
}
states = list(
intent.async_match_states(
intent_obj.hass,
name=name,
area_name=area_name,
states=mass_states,
)
)
if not states:
raise intent.IntentHandleError(
f"No entities matched for: name={name}, area_name={area_name}"
)
if len(states) > 1:
raise intent.IntentHandleError(
f"Multiple entities matched for: name={name}, area_name={area_name}"
)
return states[0]


class MassPlayMediaAssistHandler(MassIntentHandlerBase):
"""Handle Assist Play Media intents."""

intent_type = INTENT_PLAY_MEDIA_ASSIST

def __init__(self, hass: HomeAssistant) -> None:
"""Initialize MassPlayMediaAssistHandler."""
self.hass = hass

slot_schema = {
vol.Any(NAME_SLOT, AREA_SLOT): cv.string,
vol.Optional(ARTIST_SLOT): cv.string,
vol.Optional(TRACK_SLOT): cv.string,
vol.Optional(ALBUM_SLOT): cv.string,
vol.Optional(RADIO_SLOT): cv.string,
vol.Optional(PLAYLIST_SLOT): cv.string,
vol.Optional(DONT_STOP_SLOT): cv.boolean,
}

async def async_handle(self, intent_obj: intent.Intent) -> intent.IntentResponse:
"""Handle the intent."""
# pylint: disable=too-many-locals
response = intent_obj.create_response()
slots = self.async_validate_slots(intent_obj.slots)
config_entry: MusicAssistantConfigEntry = await self._get_loaded_config_entry(
intent_obj.hass
)
mass = config_entry.runtime_data.mass
mass_player_id = await self._async_get_matched_mass_player(intent_obj, slots)
artist = slots.get(ARTIST_SLOT, {}).get(SLOT_VALUE, "")
track = slots.get(TRACK_SLOT, {}).get(SLOT_VALUE, "")
album = slots.get(ALBUM_SLOT, {}).get(SLOT_VALUE, "")
radio_mode = False
if track:
media_item = await mass.music.get_item_by_name(
track, artist=artist, album=album, media_type=MediaType.TRACK
)
elif album:
media_item = await mass.music.get_item_by_name(
album, artist=artist, media_type=MediaType.ALBUM
)
elif artist:
media_item = await mass.music.get_item_by_name(
artist, artist=artist, album=album, media_type=MediaType.ARTIST
)
else:
raise intent.IntentHandleError("No media item parsed from query")
try:
await mass.player_queues.play_media(
queue_id=mass_player_id,
media=(
media_item if isinstance(media_item, list) else media_item.to_dict()
),
radio_mode=radio_mode,
)
except MusicAssistantError as err:
raise intent.IntentHandleError(err.args[0] if err.args else "") from err

response.response_type = intent.IntentResponseType.ACTION_DONE
response.async_set_speech("Okay")
return response


class MassPlayMediaOnMediaPlayerHandler(MassIntentHandlerBase):
"""Handle PlayMediaOnMediaPlayer intents."""

intent_type = INTENT_PLAY_MEDIA_ON_MEDIA_PLAYER
Expand All @@ -53,21 +195,10 @@ def __init__(self, hass: HomeAssistant) -> None:
"""Initialize MassPlayMediaOnMediaPlayerHandler."""
self.hass = hass

@property
def slot_schema(self) -> dict | None:
"""Return a slot schema."""
slot_schema = {
vol.Any(NAME_SLOT, AREA_SLOT): cv.string,
vol.Optional(ARTIST_SLOT): cv.string,
vol.Optional(TRACK_SLOT): cv.string,
vol.Optional(ALBUM_SLOT): cv.string,
}
if any(
config_entry.data.get(CONF_OPENAI_AGENT_ID)
for config_entry in self.hass.config_entries.async_entries(DOMAIN)
):
slot_schema[vol.Optional(QUERY_SLOT)] = cv.string
return slot_schema
slot_schema = {
vol.Any(NAME_SLOT, AREA_SLOT): cv.string,
vol.Optional(QUERY_SLOT): cv.string,
}

async def async_handle(self, intent_obj: intent.Intent) -> intent.IntentResponse:
"""Handle the intent."""
Expand Down Expand Up @@ -98,25 +229,6 @@ async def async_handle(self, intent_obj: intent.Intent) -> intent.IntentResponse
media_type = json_payload.get(ATTR_MEDIA_TYPE)
media_item = await self._get_media_items(mass, media_id, media_type)
radio_mode = json_payload.get(ATTR_RADIO_MODE, False)
else:
artist = slots.get(ARTIST_SLOT, {}).get(SLOT_VALUE, "")
track = slots.get(TRACK_SLOT, {}).get(SLOT_VALUE, "")
album = slots.get(ALBUM_SLOT, {}).get(SLOT_VALUE, "")
if track:
media_item = await mass.music.get_item_by_name(
track, artist=artist, album=album, media_type=MediaType.TRACK
)
elif album:
media_item = await mass.music.get_item_by_name(
album, artist=artist, media_type=MediaType.ALBUM
)
elif artist:
media_item = await mass.music.get_item_by_name(
artist, artist=artist, album=album, media_type=MediaType.ARTIST
)
else:
raise intent.IntentHandleError("No media item parsed from query")

try:
await mass.player_queues.play_media(
queue_id=mass_player_id,
Expand All @@ -132,38 +244,6 @@ async def async_handle(self, intent_obj: intent.Intent) -> intent.IntentResponse
response.async_set_speech("Okay")
return response

async def _get_media_items(
self, mass: MusicAssistantClient, media_id: str | list[str], media_type
) -> MediaItemType | list[MediaItemType]:
if isinstance(media_id, list):
return [
(
await mass.music.get_item_by_name(
item, media_type=MediaType(media_type)
)
).to_dict()
for item in media_id
]
return await mass.music.get_item_by_name(
media_id, media_type=MediaType(media_type)
)

async def _async_get_matched_mass_player(
self, intent_obj: intent.Intent, slots: intent._SlotsType
) -> str:
name: str | None = slots.get(NAME_SLOT, {}).get(SLOT_VALUE)
if name == "all":
# Don't match on name if targeting all entities
name = None
area_name = slots.get(AREA_SLOT, {}).get(SLOT_VALUE)
state = await self._get_matched_state(intent_obj, name, area_name)
entity_registry = er.async_get(self.hass)
if entity := entity_registry.async_get(state.entity_id):
return entity.unique_id.split("mass_", 1)[1]
raise intent.IntentHandleError(
f"No entities matched for: name={name}, area_name={area_name}"
)

async def _async_query_ai(
self, intent_obj: intent.Intent, query: str, config_entry: ConfigEntry
) -> str:
Expand All @@ -179,37 +259,3 @@ async def _async_query_ai(
return_response=True,
)
return ai_response["response"]["speech"]["plain"]["speech"]

async def _get_loaded_config_entry(self, hass: HomeAssistant) -> ConfigEntry:
"""Get the correct config entry."""
config_entries = hass.config_entries.async_entries(DOMAIN)
for config_entry in config_entries:
if config_entry.state == ConfigEntryState.LOADED:
return config_entry
raise intent.IntentHandleError("Music Assistant not loaded")

async def _get_matched_state(
self, intent_obj: intent.Intent, name: str | None, area_name: str | None
) -> State:
mass_states = {
state
for state in intent_obj.hass.states.async_all(MEDIA_PLAYER_DOMAIN)
if state.attributes.get(ATTR_MASS_PLAYER_TYPE) is not None
}
states = list(
intent.async_match_states(
intent_obj.hass,
name=name,
area_name=area_name,
states=mass_states,
)
)
if not states:
raise intent.IntentHandleError(
f"No entities matched for: name={name}, area_name={area_name}"
)
if len(states) > 1:
raise intent.IntentHandleError(
f"Multiple entities matched for: name={name}, area_name={area_name}"
)
return states[0]

0 comments on commit f372fd1

Please sign in to comment.