本文档描述了如何开发算法模块,使得该模块可以在 FATE 架构下被调用。
要开发模块,需要执行以下 6 个步骤。
- 定义将在此模块中使用的 python 参数对象。
- 定义模块的 meta 文件。
- 如果模块需要联邦,则需定义传输变量配置文件。
- 您的算法模块需要继承
model_base
类,并完成几个指定的函数。 - 定义模型保存所需的protobuf文件。
- 若希望通过python脚本直接启动组件,需要在
fate_client
中定义Pipeline组件。 - 重启fate flow服务。
在以下各节中,我们将通过 hetero_lr
详细描述这 7 个步骤。
参数对象是将用户定义的运行时参数传递给开发模块的唯一方法,因此每个模块都有其自己的参数对象。
为定义可用的参数对象,需要三个步骤。
- 打开一个新的 python 文件,将其重命名为
xxx_param.py
,其中xxx代表您模块的名称,并将其放置在python/federatedml/param/
文件夹中。 在xxx_param.py
中定义它的类对象,应该继承python/federatedml/param/base_param.py
中定义的 BaseParam 类。 - 参数类的
__init__
方法应该指定模块使用的所有参数。 - 重载 BaseParam 的参数检查接口,否则将会抛出未实现的错误。检查方法被用于验证参数变量是否可用。
以 hetero lr 的参数对象为例,python文件为
federatedml/param/logistic_regression_param.py
首先,它继承自 BaseParam:
class LogisticParam(BaseParam):
然后,在 __init__
方法中定义所有参数变量:
def __init__(self, penalty='L2',
tol=1e-4, alpha=1.0, optimizer='rmsprop',
batch_size=-1, learning_rate=0.01, init_param=InitParam(),
max_iter=100, early_stop='diff', encrypt_param=EncryptParam(),
predict_param=PredictParam(), cv_param=CrossValidationParam(),
decay=1, decay_sqrt=True,
multi_class='ovr', validation_freqs=None, early_stopping_rounds=None,
stepwise_param=StepwiseParam(), floating_point_precision=23,
metrics=None,
use_first_metric_only=False,
callback_param=CallbackParam()
):
super(LogisticParam, self).__init__()
self.penalty = penalty
self.tol = tol
self.alpha = alpha
self.optimizer = optimizer
self.batch_size = batch_size
self.learning_rate = learning_rate
self.init_param = copy.deepcopy(init_param)
self.max_iter = max_iter
self.early_stop = early_stop
self.encrypt_param = encrypt_param
self.predict_param = copy.deepcopy(predict_param)
self.cv_param = copy.deepcopy(cv_param)
self.decay = decay
self.decay_sqrt = decay_sqrt
self.multi_class = multi_class
self.validation_freqs = validation_freqs
self.stepwise_param = copy.deepcopy(stepwise_param)
self.early_stopping_rounds = early_stopping_rounds
self.metrics = metrics or []
self.use_first_metric_only = use_first_metric_only
self.floating_point_precision = floating_point_precision
self.callback_param = copy.deepcopy(callback_param)
如上面的示例所示,该参数也可以是 Param
类。此类参数的默认设置是此类的一个实例。然后将该实例的深度复制(deepcopy
)版本分配给类归属。深度复制功能用于避免任务同时运行时指向相同内存的风险。
一旦正确定义了类,已有的参数解析器就可以递归地解析每个属性的值。
之后,重载参数检查的接口:
def check(self):
descr = "logistic_param's"
if type(self.penalty).__name__ != "str":
raise ValueError(
"logistic_param's penalty {} not supported, should be str type".format(self.penalty))
else:
self.penalty = self.penalty.upper()
if self.penalty not in ['L1', 'L2', 'NONE']:
raise ValueError(
"logistic_param's penalty not supported, penalty should be 'L1', 'L2' or 'none'")
if type(self.eps).__name__ != "float":
raise ValueError(
"logistic_param's eps {} not supported, should be float type".format(self.eps))
定义meta文件是为了使 FATE-Flow 模块通过该文件以获取有关如何启动模块程序的信息。
-
在 components 中定义名为 xxx.py 的meta文件,其中 xxx 是您要开发的模块。
-
配置 meta 文件。
-
继承 ComponentMeta, 用模块名为其命名, 例如 xxx_cpn_meta = ComponentMeta("XXX"). XXX 即在 dsl 中调用的模块名。
from .components import ComponentMeta hetero_lr_cpn_meta = ComponentMeta("HeteroLR")
-
使用装饰器
xxx_cpn_meta.bind_runner.on_$role
将模块object绑定至每个角色。 $role 包括 guest\host\arbiter. 如果多个角色使用同一模块object,可以使用xxx_cpn_meta.bind_runner.on_$role1.on_$role2.on_$role3
格式注明。 装饰器方程将引入并返回对应角色的模块object。以hetero-lr 为例: python/federatedml/components/hetero_lr.py
@hetero_lr_cpn_meta.bind_runner.on_guest def hetero_lr_runner_guest(): from federatedml.linear_model.coordinated_linear_model.logistic_regression import HeteroLRGuest return HeteroLRGuest @hetero_lr_cpn_meta.bind_runner.on_host def hetero_lr_runner_host(): from federatedml.linear_model.coordinated_linear_model.logistic_regression import HeteroLRHost return HeteroLRHost
-
使用装饰器
xxx_cpn_meta.bind_param
将参数object绑定至step1中定义的开发组件, 装饰器将返回对应参数object。@hetero_lr_cpn_meta.bind_param def hetero_lr_param(): from federatedml.param.logistic_regression_param import HeteroLogisticParam return HeteroLogisticParam
-
仅在此模块需要联邦时(即不同参与方之间存在信息交互)才需要执行此步骤。
!!!Note
应将其放在 [`transfer_class`](../../python/federatedml/transfer_variable/transfer_class) 文件夹中。
在该定义文件中,您需要创建需要的 transfer_variable
类,并继承BaseTransferVariables
类,然后定义相应的变量,并为其赋予需要的传输权限。以HeteroLRTransferVariable
为例,可以参考以下代码:
from federatedml.transfer_variable.base_transfer_variable import BaseTransferVariables
# noinspection PyAttributeOutsideInit
class HeteroLRTransferVariable(BaseTransferVariables):
def __init__(self, flowid=0):
super().__init__(flowid)
self.batch_data_index = self._create_variable(name='batch_data_index', src=['guest'], dst=['host'])
self.batch_info = self._create_variable(name='batch_info', src=['guest'], dst=['host', 'arbiter'])
self.converge_flag = self._create_variable(name='converge_flag', src=['arbiter'], dst=['host', 'guest'])
self.fore_gradient = self._create_variable(name='fore_gradient', src=['guest'], dst=['host'])
self.forward_hess = self._create_variable(name='forward_hess', src=['guest'], dst=['host'])
self.guest_gradient = self._create_variable(name='guest_gradient', src=['guest'], dst=['arbiter'])
self.guest_hess_vector = self._create_variable(name='guest_hess_vector', src=['guest'], dst=['arbiter'])
self.guest_optim_gradient = self._create_variable(name='guest_optim_gradient', src=['arbiter'], dst=['guest'])
self.host_forward_dict = self._create_variable(name='host_forward_dict', src=['host'], dst=['guest'])
self.host_gradient = self._create_variable(name='host_gradient', src=['host'], dst=['arbiter'])
self.host_hess_vector = self._create_variable(name='host_hess_vector', src=['host'], dst=['arbiter'])
self.host_loss_regular = self._create_variable(name='host_loss_regular', src=['host'], dst=['guest'])
self.host_optim_gradient = self._create_variable(name='host_optim_gradient', src=['arbiter'], dst=['host'])
self.host_prob = self._create_variable(name='host_prob', src=['host'], dst=['guest'])
self.host_sqn_forwards = self._create_variable(name='host_sqn_forwards', src=['host'], dst=['guest'])
self.loss = self._create_variable(name='loss', src=['guest'], dst=['arbiter'])
self.loss_intermediate = self._create_variable(name='loss_intermediate', src=['host'], dst=['guest'])
self.paillier_pubkey = self._create_variable(name='paillier_pubkey', src=['arbiter'], dst=['host', 'guest'])
self.sqn_sample_index = self._create_variable(name='sqn_sample_index', src=['guest'], dst=['host'])
self.use_async = self._create_variable(name='use_async', src=['guest'], dst=['host'])
其中,需要设定的属性为:
-
name
变量名 -
src
应为 "guest","host","arbiter" 的某些组合,它表示发送交互信息从何处发出。 -
dst
应为 "guest","host","arbiter" 的某些组合列表,用于定义将交互信息发送到何处。
fate_flow_client
模块的运行规则是:
- 从数据库中检索fate的组件注册信息,获取component的每个
role
对应的运行对象。 - 初始化各方的运行对象。
- 调用运行对象的 run 方法。
- 如果需要,调用
save_data
方法。 - 如果需要,调用
export_model
方法。
在本节中,我们讲解如何执行规则 2 至 5。需要被继承的model_base
类位于:python/federatedml/model_base.py。
-
重载
__init__
接口
指定模块参数类型为第一步中定义的类.
以hetero_lr_base.py
为例, 最后一行代码指定了新定义的模块的参数类型。def __init__(self): super().__init__() self.model_name = 'HeteroLogisticRegression' self.model_param_name = 'HeteroLogisticRegressionParam' self.model_meta_name = 'HeteroLogisticRegressionMeta' self.mode = consts.HETERO self.aggregator = None self.cipher = None self.batch_generator = None self.gradient_loss_operator = None self.converge_procedure = None self.model_param = HeteroLogisticParam()
注: 这一步是强制的. 如果你不指定
self.model_param
的值, 在_init_model(self, params)
方法中将不能获取params的值. -
在需要时重载 fit 接口
fit 函数具有以下形式。
def fit(self, train_data, validate_data=None):
fit函数是启动建模组件的训练,或者特征工程组件的fit功能的入口。接受训练数据和验证集数据,validate数据可不提供。该函数在用户启动训练任务时,被
model_base
自动调起,您只需在该函数完成自身需要的fit任务即可。 -
在需要的时候重载 predict 接口
predict 函数具有如下形式.
def predict(self, data_inst):
data_inst
是一个 Table, 用于建模组件的预测功能。在用户启动预测任务时,将被model_base
自动调起。 另外,在训练任务中,建模组件也会调用predict
函数对训练数据和验证集数据(如果有)进行预测,并输出预测结果。该函数的返回结果,如果后续希望接入evaluation
,需要输出符合下列格式的Table:-
二分类,多分类,回归任务返回一张表 表的格式为: ["label", "predict_result", "predict_score", "predict_detail", "type"]
- `label`: 提供的标签 - predict_result: 模型预测的结果 - `predict_score`: 对于2分类为1的预测分数,对于多分类为概率最高的那一类的分数,对于回归任务,与predict\_result相同 - `predict_detail`: 对于分类任务,列出各分类的得分,对于回归任务,列出回归预测值 - `type`: 表明该结果来源(是训练数据或者是验证及数据),该结果`model_base`会自动拼接。
-
聚类任务返回两张表
第一张的格式为: ["cluster_sample_count", "cluster_inner_dist", "inter_cluster_dist"] - `cluster_sample_count`: 每个类别下的样本个数 - `cluster_inner_dist`: 类内距离 - `inter_cluster_dist`: 类间距离 第二张表的格式为: `["predicted_cluster_index", "distance"]` - `predicted_cluster_index`: 预测的所属类别 - `distance`: 该样本到中心点的距离
-
-
在需要的时候重载
transform
接口transform 函数具有如下形式.
def transform(self, data_inst):
data_inst
是一个 Table, 用于特征工程组件对数据进行转化功能。在用户启动预测任务时,将被model_base
自动调起。 -
定义您的
save_data
接口
以便 fate-flow 可以在需要时通过它获取输出数据。def save_data(self): return self.data_output
为了方便模型跨平台保存和读取模型,FATE使用protobuf文件定义每个模型所需的参数和模型内容。当您开发自己的模块时,需要定义本模块中需要保存的内容并创建相应的protobuf文件。protobuf文件所在的位置为 这个目录 。
更多使用protobuf的细节,请参考 这个教程
每个模型一般需要两个proto文件,其中后缀为meta的文件中保存某一次任务的配置,后缀为param的文件中保存某次任务的模型结果。
在完成proto文件的定义后,可执行protobuf目录下的 generate_py.sh文件 生成对应的python文件。之后,您可在自己的项目中引用自己设计的proto类型,并进行保存:
bash proto_generate.sh
以便 fate-flow 可以在需要时通过它获取输出的模型。应为同时包含 “Meta” 和 “Param” 包含了产生的proto buffer类的 dict 格式。这里展示了如何导出模型。
def export_model(self):
meta_obj = self._get_meta()
param_obj = self._get_param()
result = {
self.model_meta_name: meta_obj,
self.model_param_name: param_obj
}
return result
若希望后续用户可以通过python脚本形式启动建模任务,需要在
python/fate_client/pipeline/component
中添加自己的组件。详情请参考Pipeline的
文档
当上面的开发步骤都完成后,需要重启fate flow服务,否则后续提交任务可能会报一些错误如"新组件的provider找不到"。 fate flow服务也可通过debug模式启动,启动方式: "python fate_flow_server.py --debug", debug模式可以让修改的代码不重启也生效。
这里给出开发完成后,启动建模任务的一个简单示例。
在开始任务之前,您需要加载来自所有提供者的数据。为此,需要准备load_file
配置,然后运行以下命令:
flow data upload -c upload_data.json
!!!Note
每个数据提供节点(即`guest`和`host`)都需要执行此步骤。
在此步骤中,应准备两个与 dsl 配置文件和组件配置文件相对应的配置文件。请确保配置文件中的 table_name
和namespace
与upload_data conf
匹配。然后运行以下命令:
flow job submit -d ${your_dsl_file.json} -c ${your_component_conf_json}
若您已在fate_client
中添加了自己的组件,也可以准备好自己的pipeline脚本,然后使用python命令直接启动:
python ${your_pipeline.py}
现在,您可以在以下路径中检查日志:$PROJECT_BASE/fateflow/logs/{your jobid}
.
有关 dsl 配置文件和参数配置文件的更多详细信息,请参考此处的examples/dsl/v2
中查看。