Skip to content

Commit

Permalink
KF-3620 custom Prometheus exporter (#162)
Browse files Browse the repository at this point in the history
* integrate prometheus exporter with mlflow-server as a second container
* add tests
  • Loading branch information
misohu authored Jun 27, 2023
1 parent 7404d58 commit 3a8ce66
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 19 deletions.
10 changes: 10 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ options:
The nodeport for MLFlow
type: int
default: 31380
mlflow_prometheus_exporter_port:
description: |
The port MLFlow Prometheus exporter will be listening on
type: int
default: 8000
mlflow_prometheus_exporter_nodeport:
description: |
The nodeport for MLFlow Prometheus exporter
type: int
default: 31381
enable_mlflow_nodeport:
description: Deploy the NodePort service for MLFlow
type: boolean
Expand Down
6 changes: 6 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@ docs: https://discourse.charmhub.io/t/mlflow-server-docs-index/10816
containers:
mlflow-server:
resource: oci-image
mlflow-prometheus-exporter:
resource: exporter-oci-image
resources:
oci-image:
type: oci-image
description: Backing OCI image
upstream-source: docker.io/charmedkubeflow/mlflow:v2.1.1_22.04_1
exporter-oci-image:
type: oci-image
description: Image for Prometheus exporter
upstream-source: docker.io/charmedkubeflow/mlflow-prometheus-exporter:1.0-22.04
requires:
relational-db:
interface: mysql_client
Expand Down
77 changes: 67 additions & 10 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ def __init__(self, *args):

self.logger = logging.getLogger(__name__)
self._port = self.model.config["mlflow_port"]
self._exporter_port = self.model.config["mlflow_prometheus_exporter_port"]
self._container_name = "mlflow-server"
self._exporter_container_name = "mlflow-prometheus-exporter"
self._database_name = "mlflow"
self._container = self.unit.get_container(self._container_name)
self._exporter_container = self.unit.get_container(self._exporter_container_name)
self.database = DatabaseRequires(
self, relation_name="relational-db", database_name=self._database_name
)
Expand Down Expand Up @@ -87,23 +90,39 @@ def container(self):
"""Return container."""
return self._container

@property
def exporter_container(self):
    """Return the mlflow-prometheus-exporter workload container."""
    return self._exporter_container

def _create_service(self):
"""Create k8s service based on charm'sconfig."""
if self.config["enable_mlflow_nodeport"]:
service_type = "NodePort"
self._node_port = self.model.config["mlflow_nodeport"]
self._exporter_node_port = self.model.config["mlflow_prometheus_exporter_nodeport"]
port = ServicePort(
int(self._port),
name=f"{self.app.name}",
targetPort=int(self._port),
nodePort=int(self._node_port),
)

exporter_port = ServicePort(
int(self._exporter_port),
name=f"{self.app.name}-prometheus-exporter",
targetPort=int(self._exporter_port),
nodePort=int(self._exporter_node_port),
)
else:
service_type = "ClusterIP"
port = ServicePort(int(self._port), name=f"{self.app.name}")
exporter_port = ServicePort(
int(self._exporter_port), name=f"{self.app.name}-prometheus-exporter"
)
self.service_patcher = KubernetesServicePatch(
self,
[port],
[port, exporter_port],
service_type=service_type,
service_name=f"{self.model.app.name}",
refresh_event=self.on.config_changed,
Expand Down Expand Up @@ -155,6 +174,29 @@ def _charmed_mlflow_layer(self, env_vars, default_artifact_root) -> Layer:

return Layer(layer_config)

def _mlflow_exporter_layer(self) -> Layer:
    """Build the Pebble layer for the mlflow-prometheus-exporter container.

    The exporter service runs mlflow_exporter.py, listening on the configured
    exporter port and scraping the MLflow server on localhost.
    """
    exporter_command = (
        f"python3 mlflow_exporter.py "
        f"--port {self._exporter_port} "
        f"--mlflowurl http://localhost:{self._port}/"
    )
    exporter_service = {
        "override": "replace",
        "summary": "Entrypoint of mlflow-prometheus-exporter image",
        "command": exporter_command,
        "startup": "enabled",
    }
    return Layer(
        {
            "summary": "mlflow-prometheus-exporter layer",
            "description": "Pebble config layer for mlflow-prometheus-exporter",
            "services": {self._exporter_container_name: exporter_service},
        }
    )

def _get_interfaces(self):
"""Retrieve interface object."""
try:
Expand Down Expand Up @@ -282,18 +324,16 @@ def _check_leader(self):
self.logger.info("Not a leader, skipping setup")
raise ErrorWithStatus("Waiting for leadership", WaitingStatus)

def _update_layer(self, envs, default_artifact_root) -> None:
"""Update the Pebble configuration layer (if changed)."""
if not self.container.can_connect():
raise ErrorWithStatus("Container is not ready", WaitingStatus)
def _update_layer(self, container, container_name, new_layer) -> None:
current_layer = self.container.get_plan()
new_layer = self._charmed_mlflow_layer(envs, default_artifact_root)
if current_layer.services != new_layer.services:
self.unit.status = MaintenanceStatus("Applying new pebble layer")
self.container.add_layer(self._container_name, new_layer, combine=True)
container.add_layer(container_name, new_layer, combine=True)
try:
self.logger.info("Pebble plan updated with new configuration, replaning")
self.container.replan()
self.logger.info(
f"Pebble plan updated with new configuration, replaning for {container_name}"
)
container.replan()
except ChangeError as err:
raise ErrorWithStatus(f"Failed to replan with error: {str(err)}", BlockedStatus)

Expand Down Expand Up @@ -346,7 +386,24 @@ def _on_event(self, event) -> None:
bucket_name=bucket_name, s3_wrapper=s3_wrapper
):
self._create_default_s3_bucket(s3_wrapper, bucket_name)
self._update_layer(envs, bucket_name)

if not self.container.can_connect():
raise ErrorWithStatus(
f"Container {self._container_name} is not ready", WaitingStatus
)
self._update_layer(
self.container, self._container_name, self._charmed_mlflow_layer(envs, bucket_name)
)
if not self.exporter_container.can_connect():
raise ErrorWithStatus(
f"Container {self._exporter_container_name} is not ready", WaitingStatus
)
self._update_layer(
self.exporter_container,
self._exporter_container_name,
self._mlflow_exporter_layer(),
)

secrets_context = {
"app_name": self.app.name,
"s3_endpoint": f"http://{object_storage_data['service']}.{object_storage_data['namespace']}:{object_storage_data['port']}", # noqa: E501
Expand Down
11 changes: 11 additions & 0 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,14 @@ async def test_get_minio_credentials_action(self, ops_test: OpsTest):

assert access_key == OBJECT_STORAGE_CONFIG["access-key"]
assert secret_access_key == OBJECT_STORAGE_CONFIG["secret-key"]

@pytest.mark.abort_on_fail
async def test_can_connect_exporter_and_get_metrics(self, ops_test: OpsTest):
    """Scrape the exporter's /metrics endpoint through its nodeport.

    Asserts the endpoint responds 200 and reports the expected MLflow
    gauge values for experiments, registered models, and runs.
    """
    app_config = await ops_test.model.applications[CHARM_NAME].get_config()
    nodeport = app_config["mlflow_prometheus_exporter_nodeport"]["value"]
    response = requests.get(f"http://localhost:{nodeport}/metrics")
    assert response.status_code == 200
    body = response.text
    # Gauge values reflect the state built up by the earlier integration tests.
    expected_samples = (
        'mlflow_metric{metric_name="num_experiments"} 2.0',
        'mlflow_metric{metric_name="num_registered_models"} 0.0',
        'mlflow_metric{metric_name="num_runs"} 0',
    )
    for sample in expected_samples:
        assert sample in body
3 changes: 2 additions & 1 deletion tests/integration/test_deploy_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ async def test_build_and_deploy(self, ops_test: OpsTest):
"""
charm_under_test = await ops_test.build_charm(".")
image_path = METADATA["resources"]["oci-image"]["upstream-source"]
resources = {"oci-image": image_path}
exporter_image_path = METADATA["resources"]["exporter-oci-image"]["upstream-source"]
resources = {"oci-image": image_path, "exporter-oci-image": exporter_image_path}

await ops_test.model.deploy(
charm_under_test, resources=resources, application_name=CHARM_NAME, trust=True
Expand Down
51 changes: 43 additions & 8 deletions tests/unit/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def harness() -> Harness:
return harness


def enable_exporter_container(harness: Harness) -> Harness:
    """Mark the mlflow-prometheus-exporter container as connectable.

    Fix: the parameter annotation was `harness: harness` (lowercase), which
    annotates with the pytest fixture *function* rather than the ops.testing
    `Harness` class.

    Args:
        harness: the test Harness whose exporter container should accept
            Pebble connections.

    Returns:
        The same Harness instance, for call chaining.
    """
    harness.set_can_connect("mlflow-prometheus-exporter", True)
    return harness


def add_object_storage_to_harness(harness: Harness):
"""Helper function to handle object storage relation"""
object_storage_data = {"_supported_versions": "- v1", "data": yaml.dump(OBJECT_STORAGE_DATA)}
Expand Down Expand Up @@ -379,7 +385,7 @@ def test_update_layer_failure_container_problem(
container.replan.side_effect = _FakeChangeError("Fake problem during layer update", change)
harness.begin()
with pytest.raises(ErrorWithStatus) as exc_info:
harness.charm._update_layer({}, "")
harness.charm._update_layer(container, harness.charm._container_name, MagicMock())

assert exc_info.value.status_type(BlockedStatus)
assert "Failed to replan with error: " in str(exc_info)
Expand All @@ -393,7 +399,11 @@ def test_update_layer_success(
harness: Harness,
):
harness.begin()
harness.charm._update_layer({}, "")
harness.charm._update_layer(
harness.charm.container,
harness.charm._container_name,
harness.charm._charmed_mlflow_layer({}, ""),
)
assert harness.charm.container.get_plan().services == EXPECTED_SERVICE

@patch(
Expand Down Expand Up @@ -449,16 +459,41 @@ def test_send_manifests(self, create_manifests: MagicMock, harness: Harness):
"charm.S3BucketWrapper.__init__",
lambda *args, **kw: None,
)
@patch("charm.MlflowCharm._get_object_storage_data")
@patch("charm.MlflowCharm._get_relational_db_data")
@patch("charm.MlflowCharm._get_object_storage_data", return_value=OBJECT_STORAGE_DATA)
@patch("charm.MlflowCharm._get_relational_db_data", return_value=RELATIONAL_DB_DATA)
def test_on_event_wainting_for_exporter(
    self,
    _: MagicMock,
    __: MagicMock,
    harness: Harness,
):
    # NOTE(review): "wainting" is a typo for "waiting" in the test name; left
    # unchanged because renaming would alter pytest's collected test id.
    """Charm goes into WaitingStatus when the exporter container is not ready.

    The exporter container is never marked connectable here (contrast with
    tests that call enable_exporter_container), so _on_event should stop and
    set WaitingStatus for mlflow-prometheus-exporter.
    """
    harness.set_leader(True)
    harness.begin()
    harness.charm._on_event(None)
    assert harness.charm.model.unit.status == WaitingStatus(
        "Container mlflow-prometheus-exporter is not ready"
    )

@patch(
"charm.KubernetesServicePatch",
lambda x, y, service_name, service_type, refresh_event: None,
)
@patch(
"charm.MlflowCharm._validate_default_s3_bucket_name_and_access", lambda *args, **kw: True
)
@patch(
"charm.S3BucketWrapper.__init__",
lambda *args, **kw: None,
)
@patch("charm.MlflowCharm._get_object_storage_data", return_value=OBJECT_STORAGE_DATA)
@patch("charm.MlflowCharm._get_relational_db_data", return_value=RELATIONAL_DB_DATA)
def test_on_event(
self,
get_relational_db_data: MagicMock,
get_object_storage_data: MagicMock,
_: MagicMock,
__: MagicMock,
harness: Harness,
):
get_object_storage_data.return_value = OBJECT_STORAGE_DATA
get_relational_db_data.return_value = RELATIONAL_DB_DATA
harness = enable_exporter_container(harness)
harness.set_leader(True)
harness.begin()
harness.charm._on_event(None)
Expand Down

0 comments on commit 3a8ce66

Please sign in to comment.