forked from edgexfoundry-holding/app-functions-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conversion.py
74 lines (64 loc) · 3.08 KB
/
conversion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright (C) 2024 IOTech Ltd
# SPDX-License-Identifier: Apache-2.0
"""
This module provides the Conversion class, whose pipeline functions transform an
EdgeX Event into XML or JSON.
"""
import json
from typing import Any, Tuple
from ..contracts import errors
from ..contracts.clients.utils.common import convert_any_to_dict
from ..contracts.common.constants import CONTENT_TYPE_XML, CONTENT_TYPE_JSON
from ..contracts.dtos.event import Event
from ..interfaces import AppFunctionContext
# String constants for selecting a transform: a "type" key and its "xml" / "json" values.
# NOTE(review): none of these are referenced in the visible part of this file —
# presumably they are consumed by code that configures the pipeline; confirm against callers.
TRANSFORM_TYPE = "type"
TRANSFORM_XML = "xml"
TRANSFORM_JSON = "json"
class Conversion:
    """ Conversion holds the pipeline functions that serialize an EdgeX Event. """

    def transform_to_xml(
            self, ctx: AppFunctionContext, data: Any) -> Tuple[bool, Any]:
        """ Serialize an EdgeX Event to XML.

        On success the response content type is set to XML and (True, xml) is returned.
        (False, error) is returned, stopping the pipeline, when no data is received,
        when the data is not an Event, or when the Event's to_xml() reports an error. """
        if data is None:
            # No payload reached this function.
            missing_msg = (
                f"function TransformToXML in pipeline '{ctx.pipeline_id()}': No Data Received")
            return False, errors.new_common_edgex(
                errors.ErrKind.CONTRACT_INVALID, missing_msg)

        ctx.logger().debug("Transforming to XML in pipeline '%s'", ctx.pipeline_id())

        if not isinstance(data, Event):
            # Only Event instances can be converted.
            wrong_type_msg = (
                f"function TransformToXML in pipeline '{ctx.pipeline_id()}': unexpected type received")
            return False, errors.new_common_edgex(
                errors.ErrKind.SERVER_ERROR, wrong_type_msg)

        document, marshal_err = data.to_xml()
        if marshal_err is None:
            ctx.set_response_content_type(CONTENT_TYPE_XML)
            return True, document

        # to_xml() reported a failure; wrap it so the cause is preserved.
        return False, errors.new_common_edgex(
            errors.ErrKind.SERVER_ERROR,
            f"unable to marshal Event to XML in pipeline '{ctx.pipeline_id()}'",
            marshal_err
        )

    def transform_to_json(
            self, ctx: AppFunctionContext, data: Any) -> Tuple[bool, Any]:
        """ Serialize an EdgeX Event to UTF-8 encoded JSON bytes.

        On success the response content type is set to JSON and (True, bytes) is returned.
        (False, error) is returned, stopping the pipeline, when no data is received,
        when the data is not an Event, or when serialization raises TypeError. """
        if data is None:
            # No payload reached this function.
            missing_msg = (
                f"function TransformToJSON in pipeline '{ctx.pipeline_id()}': No Data Received")
            return False, errors.new_common_edgex(
                errors.ErrKind.CONTRACT_INVALID, missing_msg)

        ctx.logger().debug("Transforming to JSON in pipeline '%s'", ctx.pipeline_id())

        if not isinstance(data, Event):
            # Only Event instances can be converted.
            wrong_type_msg = (
                f"function TransformToJSON in pipeline '{ctx.pipeline_id()}': unexpected type received")
            return False, errors.new_common_edgex(
                errors.ErrKind.SERVER_ERROR, wrong_type_msg)

        try:
            event_dict = convert_any_to_dict(data)
            payload = json.dumps(event_dict).encode('utf-8')
        except TypeError as marshal_err:
            # Wrap the TypeError so the cause is preserved.
            return False, errors.new_common_edgex(
                errors.ErrKind.SERVER_ERROR,
                f"unable to marshal Event to JSON in pipeline '{ctx.pipeline_id()}'",
                marshal_err
            )

        ctx.set_response_content_type(CONTENT_TYPE_JSON)
        return True, payload