add: add new proposal of Federated Incremental Learning for Label Scarcity: Base on KubeEdge-Ianvs #124
Conversation
Good to see the proposal. The architecture format looks great to me. This version uses a single node instead of a multi-node simulation.
Remaining work includes:
- The procedures include federated learning but do not yet cooperate with the Sedna federated learning lib. Need to run a single-node version of federated learning using the Sedna lib, and mark the new modules in the architecture.
- The forget rate looks sample-wise instead of task-wise, and is thus different from FWT and BWT. Need to describe the forget rate correctly and add a formula.
@hsj576 might also need to take a look at this proposal.
Currently, Ianvs does not support federated learning, so it may be necessary to consider how to implement federated learning in Ianvs before implementing Federated Incremental Learning in this proposal.
Thanks for the advice! The proposal has been updated: the new modules are marked in the architecture, and the formula for the forget rate has also been added.
I wrote a simple demo of my design at https://github.com/Yoda-wu/ianvs/blob/dev_script/core/testcasecontroller/algorithm/paradigm/federated_learning/federeated_learning.py. Since the formula does not display properly on GitHub, here I post a picture:
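The picture itself is not reproduced here. For reference, a commonly used task-wise definition of average forgetting, which the posted formula presumably resembles (this reconstruction is an assumption, not the author's verbatim formula), is:

```latex
% a_{l,j}: accuracy on task j after training on task l (assumed notation).
% Forgetting of task j after learning task k:
f_j^k = \max_{l \in \{1,\dots,k-1\}} \left( a_{l,j} - a_{k,j} \right)

% Average forgetting rate after task k:
F_k = \frac{1}{k-1} \sum_{j=1}^{k-1} f_j^k
```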
Since the 8.22 weekly meeting is suspended, I would like to discuss the implementation details on GitHub.
**Sedna Lib**

**Sedna Federated Learning Lib**

The core class of federated learning in Sedna is `class FederatedLearning(JobBase)`. `FederatedLearning` has three main components.

`__init__(self, estimator, aggregation="FedAvg")` (source code):

```python
def __init__(self, estimator, aggregation="FedAvg"):
    protocol = Context.get_parameters("AGG_PROTOCOL", "ws")
    agg_ip = Context.get_parameters("AGG_IP", "127.0.0.1")
    agg_port = int(Context.get_parameters("AGG_PORT", "7363"))
    agg_uri = f"{protocol}://{agg_ip}:{agg_port}/{aggregation}"
    config = dict(
        protocol=protocol,
        agg_ip=agg_ip,
        agg_port=agg_port,
        agg_uri=agg_uri
    )
    super(FederatedLearning, self).__init__(
        estimator=estimator, config=config)
    self.aggregation = ClassFactory.get_cls(ClassType.FL_AGG, aggregation)
    connect_timeout = int(Context.get_parameters("CONNECT_TIMEOUT", "300"))
    self.node = None
    self.register(timeout=connect_timeout)
```

`register(self, timeout=300)`:

```python
def register(self, timeout=300):
    """
    Deprecated, Client proactively subscribes to the aggregation service.

    Parameters
    ----------
    timeout: int, connect timeout. Default: 300
    """
    self.log.info(
        f"Node {self.worker_name} connect to : {self.config.agg_uri}")
    self.node = AggregationClient(
        url=self.config.agg_uri,
        client_id=self.worker_name,
        ping_timeout=timeout
    )

    FileOps.clean_folder([self.config.model_url], clean=False)
    self.aggregation = self.aggregation()
    self.log.info(f"{self.worker_name} model prepared")
    if callable(self.estimator):
        self.estimator = self.estimator()
```

`train(self, train_data, valid_data=None, post_process=None, **kwargs)` (source): first, it needs to initialize some local variables:

```python
def train(self, train_data,
          valid_data=None,
          post_process=None,
          **kwargs):
    callback_func = None
    if post_process:
        callback_func = ClassFactory.get_cls(
            ClassType.CALLBACK, post_process)
    round_number = 0
    num_samples = len(train_data)
    _flag = True
    start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    res = None
```

Then comes the training loop. Sedna implements it as a `while`-true loop, and the exit logic is left to the server:
```python
    while 1:
        if _flag:
            round_number += 1
            start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            self.log.info(
                f"Federated learning start, round_number={round_number}")
            res = self.estimator.train(
                train_data=train_data, valid_data=valid_data, **kwargs)
            current_weights = self.estimator.get_weights()
            send_data = {"num_samples": num_samples,
                         "weights": current_weights}
            self.node.send(
                send_data, msg_type="update_weight", job_name=self.job_name
            )
        received = self.node.recv(wait_data_type="recv_weight")
        if not received:
            _flag = False
            continue
        _flag = True
```

Sedna then collates the message received from the server, logs it, and applies the aggregated parameters through the estimator:
```python
        rec_data = received.get("data", {})
        exit_flag = rec_data.get("exit_flag", "")
        server_round = int(rec_data.get("round_number"))
        total_size = int(rec_data.get("total_sample"))
        self.log.info(
            f"Federated learning recv weight, "
            f"round: {server_round}, total_sample: {total_size}"
        )
        n_weight = rec_data.get("weights")
        self.estimator.set_weights(n_weight)
        task_info = {
            'currentRound': round_number,
            'sampleCount': total_size,
            'startTime': start,
            'updateTime': time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime())
        }
        model_paths = self.estimator.save()
        task_info_res = self.estimator.model_info(
            model_paths, result=res, relpath=self.config.data_path_prefix)
        if exit_flag == "ok":
            self.report_task_info(
                task_info,
                K8sResourceKindStatus.COMPLETED.value,
                task_info_res)
            self.log.info(f"exit training from [{self.worker_name}]")
            return callback_func(
                self.estimator) if callback_func else self.estimator
        else:
            self.report_task_info(
                task_info,
                K8sResourceKindStatus.RUNNING.value,
                task_info_res)
```

That is all the functionality of `FederatedLearning`; as you can see, this section is Sedna's definition of the client side of federated learning. Next, let's introduce the server-side functionality provided by Sedna.
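For orientation, here is a minimal usage sketch of the client class above. It is an illustration under assumptions, not code from the proposal: `MyEstimator` and `my_train_data` are hypothetical placeholders, and an aggregation service must already be listening at `AGG_IP:AGG_PORT` (the environment variables read by `Context.get_parameters` above).

```python
import os

# Point the client at a running aggregation service (the defaults shown
# in __init__ above); these values are read via Context.get_parameters.
os.environ["AGG_IP"] = "127.0.0.1"
os.environ["AGG_PORT"] = "7363"

from sedna.core.federated_learning import FederatedLearning

# MyEstimator is a hypothetical user class wrapping the local model with
# train / get_weights / set_weights / save, as used by train() above.
fl_job = FederatedLearning(estimator=MyEstimator, aggregation="FedAvg")
fl_job.train(train_data=my_train_data)
```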
**Sedna Aggregate Server Lib**

The server side is `class Aggregator(WSServerBase)` (source code). It consists of the following.

`__init__(self, **kwargs)`: the initialization instantiates the aggregation algorithm implemented by the user and initializes some training parameters, such as the number of exit rounds (the total number of training rounds), the number of participating clients, and the current training round.

```python
def __init__(self, **kwargs):
    super(Aggregator, self).__init__()
    self.exit_round = int(kwargs.get("exit_round", 3))
    aggregation = kwargs.get("aggregation", "FedAvg")
    self.aggregation = ClassFactory.get_cls(ClassType.FL_AGG, aggregation)
    if callable(self.aggregation):
        self.aggregation = self.aggregation()
    self.participants_count = int(kwargs.get("participants_count", "1"))
    self.current_round = 0
```

`async def send_message(self, client_id: str, msg: Dict)`: the main function for sending and receiving messages. The server and clients in Sedna communicate mainly over the WebSocket protocol, so this is implemented as an asynchronous method. As you can see, it mainly collates the information from the clients; once updates from the required number of clients have been received, it calls the `aggregate` function of the aggregation algorithm and broadcasts the aggregated parameters back to the clients.

```python
async def send_message(self, client_id: str, msg: Dict):
    data = msg.get("data")
    if data and msg.get("type", "") == "update_weight":
        info = AggClient()
        info.num_samples = int(data["num_samples"])
        info.weights = data["weights"]
        self._client_meta[client_id].info = info
        current_clinets = [
            x.info for x in self._client_meta.values() if x.info
        ]
        # exit while aggregation job is NOT start
        if len(current_clinets) < self.participants_count:
            return
        self.current_round += 1
        weights = self.aggregation.aggregate(current_clinets)
        exit_flag = "ok" if self.exit_check() else "continue"
        msg["type"] = "recv_weight"
        msg["round_number"] = self.current_round
        msg["data"] = {
            "total_sample": self.aggregation.total_size,
            "round_number": self.current_round,
            "weights": weights,
            "exit_flag": exit_flag
        }
    for to_client, websocket in self._clients.items():
        try:
            await websocket.send_json(msg)
        except Exception as err:
            LOGGER.error(err)
        else:
            if msg["type"] == "recv_weight":
                self._client_meta[to_client].info = None
```

`exit_check`: this function checks whether training should exit.
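The quoted excerpt is truncated at `exit_check`. Judging from its use in `send_message` (`exit_flag = "ok" if self.exit_check() else "continue"`) and from `exit_round` in `__init__`, it presumably boils down to something like the following sketch (not Sedna's verbatim code):

```python
def exit_check(self):
    # Stop once the configured number of aggregation rounds is reached.
    return self.current_round >= self.exit_round
```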
**Problem**

**Two plans to integrate the Federated Learning paradigm and Federated Class-Incremental Learning into Ianvs**

The current proposal requires the implementation of two paradigms: one is Federated Learning, and the other is Federated Class-Incremental Learning.

**Plan A**

Demo code:

```python
from core.testcasecontroller.algorithm.paradigm import FederatedLearning

if self.paradigm_type == ParadigmType.FEDERATED_LEARNING.value:
    return FederatedLearning(workspace, **config)
```

Then, in `FederatedLearning`, start the server and clients in a multi-threaded way (a wiring sketch follows the `client_train` snippet below).

`run_server`:

```python
def run_server(self):
    aggregation_algorithm = self.aggregation
    exit_round = self.rounds
    participants_count = self.clients_number
    LOGGER.info("running server!!!!")
    server = AggregationServer(
        aggregation=aggregation_algorithm,
        exit_round=exit_round,
        ws_size=1000 * 1024 * 1024,
        participants_count=participants_count,
        host=self.LOCAL_HOST
    )
    server.start()
```

`client_train`:
```python
def client_train(self, train_datasets, validation_datasets, post_process, **kwargs):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    client = self.build_paradigm_job(ParadigmType.FEDERATED_LEARNING.value)
    client.train(train_datasets, validation_datasets, post_process, **kwargs)
    loop.close()
    self.clients.append(client)
```
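As mentioned above, the server and clients would run in separate threads. The following is a hypothetical wiring sketch (names follow the demo code; `self.clients_number`, `self.train_datasets`, and `self.validation_datasets` are assumed attributes):

```python
import threading

def run(self):
    # Launch the aggregation server in a background thread, then start
    # one thread per simulated client and wait for all of them to finish.
    server_thread = threading.Thread(target=self.run_server, daemon=True)
    server_thread.start()

    client_threads = [
        threading.Thread(
            target=self.client_train,
            args=(self.train_datasets, self.validation_datasets, None),
        )
        for _ in range(self.clients_number)
    ]
    for t in client_threads:
        t.start()
    for t in client_threads:
        t.join()
```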
Also, like the other paradigms, when we call `build_paradigm_job`, we instantiate a `FederatedLearning` object from Sedna:

```python
from sedna.core.federated_learning import FederatedLearning

if paradigm_type == ParadigmType.FEDERATED_LEARNING.value:
    agg_name, agg = self.module_instances.get(ModuleType.AGGREGATION.value)
    return FederatedLearning(
        estimator=self.module_instances.get(ModuleType.BASEMODEL.value),
        aggregation=agg_name
    )
```

**Pros**
**Plan B**

Demo code: the two components, `estimator` and `aggregation`, are provided by Sedna to the user through the `ClassFactory` interface, and the user implements them. Sedna hides the details of communication from the user; that is, you only need to focus on implementing the local training function in `estimator` and the aggregation function in `aggregation`. So we can do the same in Ianvs: we can omit the communication and let the program collect and aggregate parameters directly, still essentially using `estimator` and `aggregation`, but exchanging weights directly in memory instead of over WebSockets. For example:

```python
from core.testcasecontroller.algorithm.paradigm import FederatedClassIncrementalLearning

if self.paradigm_type == ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value:
    return FederatedClassIncrementalLearning(workspace, **config)
```

And the `FederatedClassIncrementalLearning` class:

```python
class FederatedClassIncrementalLearning(FederatedLearning):
    def __init__(self, workspace, **kwargs):
        super(FederatedClassIncrementalLearning, self).__init__(workspace, **kwargs)
        self.rounds = kwargs.get("incremental_rounds", 1)
        self.task_size = kwargs.get("task_size", 10)
        self.system_metric_info = {}
        self.lock = RLock()
        self.aggregate_clients = []
        self.train_infos = []
        self.aggregation, self.aggregator = self.module_instances.get(ModuleType.AGGREGATION.value)

    def init_client(self):
        import copy
        template = self.build_paradigm_job(ParadigmType.FEDERATED_CLASS_INCREMENTAL_LEARNING.value)
        self.clients = [copy.deepcopy(template) for _ in range(self.task_size)]
```

Here we can directly access the aggregator in `run`:
```python
def run(self):
    self.init_client()
    dataset_files = self._split_dataset(self.task_size)
    for r in range(self.rounds):
        task_id = r // self.task_size
        LOGGER.info(f"Round {r} task id: {task_id}")
        train_datasets = self.task_definition(dataset_files, task_id)
        self._train(train_datasets, task_id=task_id, round=r, task_size=self.task_size)
        global_weights = self.aggregator.aggregate(self.aggregate_clients)
        if hasattr(self.aggregator, "helper_function"):
            self.helper_function(self.train_infos)
        self.send_weights_to_clients(global_weights)
        self.aggregate_clients.clear()
        self.train_infos.clear()
    test_res = self.predict(self.dataset.test_url)
    return test_res, self.system_metric_info
```
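Under Plan B, the user-registered aggregation module that `self.aggregator.aggregate(self.aggregate_clients)` calls could look like the following minimal FedAvg sketch. It assumes, as in the Sedna snippets above, that each client update exposes `num_samples` and `weights` (a list of per-layer numpy arrays); the class name and alias are illustrative:

```python
import numpy as np
from sedna.common.class_factory import ClassFactory, ClassType


@ClassFactory.register(ClassType.FL_AGG, alias="FedAvgDemo")
class FedAvgDemo:
    """Sample-count-weighted averaging of client weights (FedAvg)."""

    def __init__(self):
        self.total_size = 0

    def aggregate(self, clients):
        # clients: updates with .num_samples (int) and .weights
        # (list of per-layer numpy arrays).
        self.total_size = sum(c.num_samples for c in clients)
        num_layers = len(clients[0].weights)
        return [
            np.sum(
                [np.asarray(c.weights[i]) * (c.num_samples / self.total_size)
                 for c in clients],
                axis=0,
            )
            for i in range(num_layers)
        ]
```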
**Pros**

Which plan should we adopt? If there are any other options, please feel free to suggest them.
The formula is clarified. Now the acc is a per-task / per-class metric instead of a per-sample one. This is now revised in the proposal.
Two suggestions:
- As for the new federated learning scheme in Ianvs, Plan B is fine with me, since it adds merely the algorithm part of Sedna into Ianvs, while Plan A introduces the server functions, which might not be necessary for all Ianvs users. A general federated learning scheme needs to be added into the Ianvs core. The examples also need the advanced versions, i.e., the WebSocket version of federated learning and the incremental-learning version of federated learning. Then the architecture can be revised accordingly.
- Another great contribution at the routine meeting is the WebSocket issue mentioned by Yoda-wu: Sedna lacks a heartbeat message to ensure a consistent network connection for federated learning over WebSocket (see the sketch below). An issue could be raised in Sedna.
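For illustration, the kind of heartbeat being discussed could look like the sketch below, using the `websockets` package's built-in ping/pong keepalive. This is not Sedna's code, and the URI and message are illustrative:

```python
import asyncio
import websockets


async def client():
    # ping_interval: send a WebSocket ping every 20 s to keep the
    # connection alive; ping_timeout: drop it if no pong arrives in 20 s.
    async with websockets.connect(
        "ws://127.0.0.1:7363/FedAvg",
        ping_interval=20,
        ping_timeout=20,
    ) as ws:
        await ws.send("hello")
        print(await ws.recv())


asyncio.run(client())
```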
Thanks for the suggestions! I updated the proposal and raised an issue in Sedna: kubeedge/sedna#442
/lgtm
Great to see the updated version. As mentioned, the architecture can be revised accordingly. It would be appreciated if the developed part could be highlighted in the architecture.
- A general federated learning scheme needs to be added into the ianvs core (within controller).
- (The directory of ) Examples also need the advanced versions, i.e., the web socket version of federated learning and the incremental learning version of federated learning.
Thanks for the advice! I have updated the proposal.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: hsj576, MooreZheng
Federated Incremental Learning for Label Scarcity: Base on KubeEdge-Ianvs proposal
What type of PR is this?
/kind: design
What this PR does / why we need it:
This PR is a proposal to add a new benchmarking paradigm: the federated class-incremental learning paradigm.
Which issue(s) this PR fixes:
Fixes #97