Adjusted to allow for a more similar feel to Google Photos integration #16

Open · wants to merge 2 commits into main
57 changes: 32 additions & 25 deletions custom_components/immich/config_flow.py
@@ -14,7 +14,21 @@
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_validation as cv

from .const import CONF_WATCHED_ALBUMS, DOMAIN
from .const import (
CONF_WATCHED_ALBUMS,
DOMAIN,
CONF_CROP_MODE,
CONF_IMAGE_SELECTION_MODE,
CONF_UPDATE_INTERVAL,
CONF_UPDATE_INTERVAL_UNIT,
DEFAULT_CROP_MODE,
DEFAULT_IMAGE_SELECTION_MODE,
DEFAULT_UPDATE_INTERVAL,
DEFAULT_UPDATE_INTERVAL_UNIT,
CROP_MODES,
IMAGE_SELECTION_MODES,
UPDATE_INTERVAL_UNITS,
)
from .hub import CannotConnect, ImmichHub, InvalidAuth

_LOGGER = logging.getLogger(__name__)
@@ -26,13 +40,8 @@
}
)


async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the user input allows us to connect.

Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user.
"""

"""Validate the user input allows us to connect."""
url = url_normalize(data[CONF_HOST])
api_key = data[CONF_API_KEY]

@@ -45,17 +54,15 @@ async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str,
username = user_info["name"]
clean_hostname = urlparse(url).hostname

# Return info that you want to store in the config entry.
return {
"title": f"{username} @ {clean_hostname}",
"data": {CONF_HOST: url, CONF_API_KEY: api_key},
}


class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for immich."""

VERSION = 1
VERSION = 3

async def async_step_user(
self, user_input: dict[str, Any] | None = None
@@ -87,7 +94,6 @@ def async_get_options_flow(
"""Create the options flow."""
return OptionsFlowHandler(config_entry)


class OptionsFlowHandler(config_entries.OptionsFlow):
"""Immich options flow handler."""

@@ -102,34 +108,35 @@ async def async_step_init(
if user_input is not None:
return self.async_create_entry(title="", data=user_input)

# Get a connection to the hub in order to list the available albums
url = url_normalize(self.config_entry.data[CONF_HOST])
api_key = self.config_entry.data[CONF_API_KEY]
hub = ImmichHub(host=url, api_key=api_key)

if not await hub.authenticate():
raise InvalidAuth

# Get the list of albums and create a mapping of album id to album name
albums = await hub.list_all_albums()
album_map = {album["id"]: album["albumName"] for album in albums}

# Filter out any album ids that are no longer returned by the API
current_albums_value = [
album
for album in self.config_entry.options.get(CONF_WATCHED_ALBUMS, [])
if album in album_map
]

# Allow the user to select which albums they want to create entities for
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Required(
CONF_WATCHED_ALBUMS,
default=current_albums_value,
): cv.multi_select(album_map)
}
),
current_crop_mode = self.config_entry.options.get(CONF_CROP_MODE, DEFAULT_CROP_MODE)
current_image_selection_mode = self.config_entry.options.get(CONF_IMAGE_SELECTION_MODE, DEFAULT_IMAGE_SELECTION_MODE)
current_update_interval = self.config_entry.options.get(CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL)
current_update_interval_unit = self.config_entry.options.get(CONF_UPDATE_INTERVAL_UNIT, DEFAULT_UPDATE_INTERVAL_UNIT)

options_schema = vol.Schema(
{
vol.Required(CONF_CROP_MODE, default=current_crop_mode): vol.In(CROP_MODES),
vol.Required(CONF_IMAGE_SELECTION_MODE, default=current_image_selection_mode): vol.In(IMAGE_SELECTION_MODES),
vol.Required(CONF_UPDATE_INTERVAL, default=current_update_interval): vol.Coerce(int),
vol.Required(CONF_UPDATE_INTERVAL_UNIT, default=current_update_interval_unit): vol.In(UPDATE_INTERVAL_UNITS),
vol.Required(CONF_WATCHED_ALBUMS, default=current_albums_value): cv.multi_select(album_map),
}
)

return self.async_show_form(step_id="init", data_schema=options_schema)
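For context, a minimal sketch (not part of this diff) of how the new options could be read back elsewhere in the integration; the helper name and the seconds/minutes normalization are assumptions for illustration only.

# Sketch only: a hypothetical helper, living inside the integration package,
# that collects the new options with their defaults.
from datetime import timedelta

from .const import (
    CONF_CROP_MODE,
    CONF_IMAGE_SELECTION_MODE,
    CONF_UPDATE_INTERVAL,
    CONF_UPDATE_INTERVAL_UNIT,
    DEFAULT_CROP_MODE,
    DEFAULT_IMAGE_SELECTION_MODE,
    DEFAULT_UPDATE_INTERVAL,
    DEFAULT_UPDATE_INTERVAL_UNIT,
)


def read_slideshow_options(entry) -> dict:
    """Collect the slideshow options, normalizing the interval to a timedelta."""
    options = entry.options
    interval = options.get(CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL)
    unit = options.get(CONF_UPDATE_INTERVAL_UNIT, DEFAULT_UPDATE_INTERVAL_UNIT)
    # Normalize to seconds so callers only deal with one unit.
    seconds = interval * 60 if unit == "minutes" else interval
    return {
        "crop_mode": options.get(CONF_CROP_MODE, DEFAULT_CROP_MODE),
        "image_selection_mode": options.get(
            CONF_IMAGE_SELECTION_MODE, DEFAULT_IMAGE_SELECTION_MODE
        ),
        "update_interval": timedelta(seconds=seconds),
    }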
22 changes: 22 additions & 0 deletions custom_components/immich/const.py
@@ -1,4 +1,26 @@
"""Constants for the immich integration."""

import voluptuous as vol

DOMAIN = "immich"
CONF_WATCHED_ALBUMS = "watched_albums"

# Crop Mode Constants
CROP_MODES = ["Combine images", "Crop single image", "None"]
CONF_CROP_MODE = "crop_mode"
DEFAULT_CROP_MODE = "Combine images"

# Image Selection Constants
CONF_IMAGE_SELECTION_MODE = "image_selection_mode"
IMAGE_SELECTION_MODES = ["Random", "Sequential"]
DEFAULT_IMAGE_SELECTION_MODE = "Random"

# Update Interval Constants
CONF_UPDATE_INTERVAL = "update_interval"
CONF_UPDATE_INTERVAL_UNIT = "update_interval_unit"
DEFAULT_UPDATE_INTERVAL = 60 # in seconds
DEFAULT_UPDATE_INTERVAL_UNIT = "seconds"
UPDATE_INTERVAL_UNITS = ["seconds", "minutes"]

# Validation for update interval (min=1 second, max=24 hours)
UPDATE_INTERVAL_VALIDATOR = vol.All(vol.Coerce(int), vol.Range(min=1, max=86400))
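Note that UPDATE_INTERVAL_VALIDATOR is defined here, while the options schema above still uses a bare vol.Coerce(int) for CONF_UPDATE_INTERVAL. A small sketch of how the validator could be wired in, offered as a suggestion rather than as part of this diff (schema fragment only):

import voluptuous as vol

from .const import (
    CONF_UPDATE_INTERVAL,
    DEFAULT_UPDATE_INTERVAL,
    UPDATE_INTERVAL_VALIDATOR,
)

# Fragment of the options schema: the validator enforces the 1 second to
# 24 hour range instead of accepting any integer.
update_interval_field = {
    vol.Required(
        CONF_UPDATE_INTERVAL, default=DEFAULT_UPDATE_INTERVAL
    ): UPDATE_INTERVAL_VALIDATOR,
}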
122 changes: 122 additions & 0 deletions custom_components/immich/coordinator.py
@@ -0,0 +1,122 @@
from io import BytesIO
from typing import List, Optional, Tuple
import logging

import requests
from PIL import Image, ImageOps

# Disable Pillow's decompression-bomb guard so very large photos can be opened.
Image.MAX_IMAGE_PIXELS = None

_LOGGER = logging.getLogger(__name__)

# Global variable to store a held portrait image
held_portrait_image = None

def fetch_image_from_immich(image_url: str) -> Image.Image:
"""Fetches an image from the Immich API."""
# A timeout keeps a stalled request from blocking the slideshow indefinitely.
response = requests.get(image_url, timeout=30)

if response.status_code == 200:
img = Image.open(BytesIO(response.content))
img = ImageOps.exif_transpose(img)
return img
else:
raise Exception(f"Failed to fetch image from {image_url} (HTTP {response.status_code})")

def is_portrait(image: Image.Image) -> bool:
"""Check if the image is in portrait orientation."""
width, height = image.size
return height > width

def correct_image_orientation(image: Image.Image) -> Image.Image:
"""Correct the image orientation based on EXIF data."""
try:
exif = dict(image._getexif().items())
orientation = exif.get(274, 1) # 274 is the EXIF tag for orientation

if orientation == 1:
return image
elif orientation == 2:
return ImageOps.mirror(image)
elif orientation == 3:
return image.rotate(180, expand=True)
elif orientation == 4:
return ImageOps.mirror(image.rotate(180, expand=True))
elif orientation == 5:
return ImageOps.mirror(image.rotate(270, expand=True))
elif orientation == 6:
return image.rotate(270, expand=True)
elif orientation == 7:
return ImageOps.mirror(image.rotate(90, expand=True))
elif orientation == 8:
return image.rotate(90, expand=True)
else:
return image
except (AttributeError, KeyError, IndexError, TypeError):
# The image has no EXIF data, or the orientation tag is missing or unreadable.
_LOGGER.warning("EXIF data not available or incomplete. Using original image orientation.")
return image

def combine_portrait_images(images: List[Image.Image], width: int, height: int) -> Image.Image:
"""Combines two portrait images side-by-side into a single image, vertically centered."""
assert len(images) >= 2, "This function expects at least two images"

# Resize images to fit within half the width and full height
resized_images = [ImageOps.contain(img, (width // 2, height), Image.Resampling.LANCZOS) for img in images[:2]]

combined_image = Image.new('RGB', (width, height))

for i, img in enumerate(resized_images):
# Calculate vertical offset to center the image
y_offset = (height - img.height) // 2
combined_image.paste(img, (i * (width // 2), y_offset))

return combined_image

def process_single_image(image: Image.Image, width: int, height: int) -> Image.Image:
"""Process a single image, ensuring it's not cut off."""
return ImageOps.contain(image, (width, height), Image.Resampling.LANCZOS)

def process_images_for_slideshow(
image_bytes_list: List[bytes],
width: int,
height: int,
crop_mode: str = "Combine images",
image_selection_mode: str = "Random"
) -> Tuple[Optional[Image.Image], bool]:
"""
Processes images for the slideshow, applying crop or combining as needed.
Returns a tuple of (processed_image, is_combined); processed_image is None
when no suitable image is available yet.
"""
global held_portrait_image

images = [correct_image_orientation(Image.open(BytesIO(image_bytes))) for image_bytes in image_bytes_list]

_LOGGER.debug(f"Processing {len(images)} images. Crop mode: {crop_mode}, Selection mode: {image_selection_mode}")

for i, img in enumerate(images):
_LOGGER.debug(f"Image {i+1}: Size={img.size}, Mode={img.mode}, Format={img.format}, Orientation={'Portrait' if is_portrait(img) else 'Landscape'}")

if crop_mode == "Combine images":
portrait_images = [img for img in images if is_portrait(img)]

if held_portrait_image:
portrait_images.insert(0, held_portrait_image)

if len(portrait_images) >= 2:
held_portrait_image = None
return combine_portrait_images(portrait_images[:2], width, height), True
elif len(portrait_images) == 1:
held_portrait_image = portrait_images[0]
landscape_images = [img for img in images if not is_portrait(img)]
if landscape_images:
return process_single_image(landscape_images[0], width, height), False
else:
# If no landscape image is available, return None to indicate no image should be displayed
return None, False
else:
# Only landscape images available
return process_single_image(images[0], width, height), False
elif crop_mode == "Crop single image":
return ImageOps.fit(images[0], (width, height), Image.Resampling.LANCZOS), False
else: # "None" mode
return process_single_image(images[0], width, height), False
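A quick usage sketch for process_images_for_slideshow outside Home Assistant; the sample file names and the 1920x1080 target size are placeholders, not values used by the integration.

from pathlib import Path

# Sketch only: feed two portrait photos through the slideshow processor.
image_bytes_list = [
    Path(name).read_bytes() for name in ("portrait_a.jpg", "portrait_b.jpg")
]

frame, is_combined = process_images_for_slideshow(
    image_bytes_list,
    width=1920,
    height=1080,
    crop_mode="Combine images",
    image_selection_mode="Random",
)

if frame is None:
    # A lone portrait is being held until a second one arrives.
    print("No frame produced this cycle")
else:
    frame.save("slideshow_frame.png")
    print(f"Saved frame (combined two portraits: {is_combined})")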