From f19030862e6cab7089667aca89be6c3992152615 Mon Sep 17 00:00:00 2001 From: Faakhir30 Date: Sun, 15 Sep 2024 04:06:32 +0500 Subject: [PATCH] Added create and fetch aliases using csv. --- news/1812.feature | 1 + src/plone/restapi/services/aliases/add.py | 43 +++++++++++++++++- src/plone/restapi/services/aliases/get.py | 53 +++++++++++++++++++---- 3 files changed, 87 insertions(+), 10 deletions(-) create mode 100644 news/1812.feature diff --git a/news/1812.feature b/news/1812.feature new file mode 100644 index 0000000000..6d5f362f29 --- /dev/null +++ b/news/1812.feature @@ -0,0 +1 @@ +Aliases Service: Added upload and download aliases using csv. @Faakhir30 diff --git a/src/plone/restapi/services/aliases/add.py b/src/plone/restapi/services/aliases/add.py index 555fbdbe0f..1ccdcb16b6 100644 --- a/src/plone/restapi/services/aliases/add.py +++ b/src/plone/restapi/services/aliases/add.py @@ -84,9 +84,48 @@ class AliasesRootPost(Service): """Creates new aliases via controlpanel""" def reply(self): - data = json_body(self.request) storage = getUtility(IRedirectionStorage) - aliases = data.get("items", []) + + if self.request.getHeader("Content-Type") == "text/csv": + content = self.request.get("BODY", "") + aliases = [] + if not isinstance(content, bytes): + raise BadRequest("Invalid CSV content") + + content = content.decode("utf-8") + lines = content.strip().split("\n") + headers = lines[0].split(",") + headers = [header.strip() for header in headers] + expected_headers = ["old path", "new path", "datetime", "manual"] + if headers != expected_headers: + raise ValueError( + f"Invalid CSV headers. Expected {expected_headers}, but got {headers}" + ) + for line in lines[1:]: + if not line.strip(): + continue + fields = line.split(",") + if len(fields) != 4: + continue + path, target, date, is_manual = fields + if ( + not path + or not target + or not date + or is_manual not in ["True", "False"] + ): + continue + aliases.append( + { + "path": path, + "redirect-to": target, + "datetime": date, + "manual": is_manual == "True", + } + ) + else: + data = json_body(self.request) + aliases = data.get("items", []) # Disable CSRF protection if "IDisableCSRFProtection" in dir(plone.protect.interfaces): diff --git a/src/plone/restapi/services/aliases/get.py b/src/plone/restapi/services/aliases/get.py index c1fcf0b6ec..c71ef2913c 100644 --- a/src/plone/restapi/services/aliases/get.py +++ b/src/plone/restapi/services/aliases/get.py @@ -9,6 +9,7 @@ from zope.component.hooks import getSite from zope.interface import implementer from zope.interface import Interface +import json @implementer(IExpandableElement) @@ -26,7 +27,7 @@ def reply_item(self): redirects = storage.redirects(context_path) aliases = [deroot_path(alias) for alias in redirects] self.request.response.setStatus(201) - return [{"path": alias} for alias in aliases] + return [{"path": alias} for alias in aliases], len(aliases) def reply_root(self): """ @@ -50,22 +51,48 @@ def reply_root(self): items_total = len([item for item in newbatch]) return redirects, items_total + def reply_root_csv(self): + batch = RedirectsControlPanel(self.context, self.request).redirects() + redirects = [entry for entry in batch] + + for redirect in redirects: + del redirect["redirect"] + redirect["datetime"] = datetimelike_to_iso(redirect["datetime"]) + self.request.response.setStatus(201) + + self.request.form["b_start"] = "0" + self.request.form["b_size"] = "1000000" + self.request.__annotations__.pop("plone.memoize") + + filestream = RedirectsControlPanel(self.context, self.request).download() + content = filestream.read() + self.request.response.setHeader("Content-Type", "text/csv") + self.request.response.setHeader( + "Content-Disposition", "attachment; filename=redirects.csv" + ) + self.request.response.setHeader("Content-Length", str(len(content))) + return content + def __call__(self, expand=False): result = {"aliases": {"@id": f"{self.context.absolute_url()}/@aliases"}} if not expand: return result - if IPloneSiteRoot.providedBy(self.context): - items, items_total = self.reply_root() - result["aliases"]["items"] = items - result["aliases"]["items_total"] = items_total + if self.request.getHeader("Accept") == "text/csv": + result["aliases"]["items"] = self.reply_root_csv() + return result + else: + items, items_total = self.reply_root() else: - result["aliases"]["items"] = self.reply_item() - result["aliases"]["items_total"] = len(result["aliases"]["items"]) - + items, items_total = self.reply_item() + result["aliases"]["items"] = items + result["aliases"]["items_total"] = items_total return result +_no_content_marker = object() + + class AliasesGet(Service): """Get aliases""" @@ -73,6 +100,16 @@ def reply(self): aliases = Aliases(self.context, self.request) return aliases(expand=True)["aliases"] + def render(self): + self.check_permission() + content = self.reply() + if self.request.getHeader("Accept") == "text/csv": + return content + if content is not _no_content_marker: + return json.dumps( + content, indent=2, sort_keys=True, separators=(", ", ": ") + ) + def deroot_path(path): """Remove the portal root from alias"""