-
Notifications
You must be signed in to change notification settings - Fork 35
/
utils.py
607 lines (563 loc) · 24.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
"""
Helper functions for the XLIFF serializer
"""
from __future__ import annotations
import datetime
import difflib
import glob
import logging
import os
import uuid
from typing import TYPE_CHECKING
from django.conf import settings
from django.contrib import messages
from django.core import serializers
from django.core.cache import cache
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
from django.db import IntegrityError, transaction
from django.forms.models import model_to_dict
from django.utils.html import format_html, format_html_join
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext_lazy
from linkcheck import update_lock
from ..cms.constants import text_directions
from ..cms.forms import PageTranslationForm
from ..cms.models import Page, PageTranslation
from ..cms.utils.file_utils import create_zip_archive
from ..cms.utils.stringify_list import iter_to_string
from ..cms.utils.translation_utils import gettext_many_lazy as __
if TYPE_CHECKING:
from typing import Any
from uuid import UUID
from django.core.serializers.base import DeserializedObject
from django.http import HttpRequest
from ..cms.models.languages.language import Language
from ..cms.models.pages.page import PageQuerySet
# Storage backend for XLIFF files uploaded by users (no ``base_url``, so the
# files are not directly URL-addressable through this storage)
upload_storage = FileSystemStorage(location=settings.XLIFF_UPLOAD_DIR)
# Storage backend for generated XLIFF exports, served under ``XLIFF_URL``
download_storage = FileSystemStorage(
    location=settings.XLIFF_DOWNLOAD_DIR, base_url=settings.XLIFF_URL
)
# Module-level logger following the project-wide ``getLogger(__name__)`` convention
logger = logging.getLogger(__name__)
def pages_to_xliff_file(
    request: HttpRequest,
    pages: PageQuerySet,
    target_languages: list[Language],
    only_public: bool = False,
) -> str | None:
    """
    Convert the given pages to XLIFF files for the given target languages and
    return the download URL of the result — a single XLIFF file when exactly one
    page/language combination was converted, otherwise a ZIP archive of all files.

    :param request: The current request (used for error messages)
    :param pages: list of pages which should be translated
    :param target_languages: list of target languages (should exclude the region's default language)
    :param only_public: Whether only public versions should be exported
    :return: The URL of the generated file, or ``None`` if nothing could be exported
    """
    logger.debug(
        "XLIFF export started by %r for pages %r and languages %r",
        request.user,
        pages,
        target_languages,
    )
    # Unique directory per export so concurrent exports cannot collide
    export_dir = uuid.uuid4()
    # Maps "<language>/<page>" keys to the paths of the created XLIFF files
    exported_files = {}
    for language in target_languages:
        for page in pages:
            try:
                exported_files[f"{language}/{page}"] = page_to_xliff(
                    page, language, export_dir, only_public=only_public
                )
            except RuntimeError as e:
                messages.error(request, e)
            except RuntimeWarning as e:
                messages.warning(request, e)
    # Bail out early when no XLIFF file could be created at all
    if not exported_files:
        return None
    if len(exported_files) == 1:
        # Offer a single file directly instead of wrapping it in a ZIP archive
        exported_page, xliff_file_path = exported_files.popitem()
        single_file_url = download_storage.url(
            os.path.relpath(xliff_file_path, settings.XLIFF_DOWNLOAD_DIR)
        )
        logger.info(
            "XLIFF export: %r converted to XLIFF file %r by %r",
            exported_page,
            single_file_url,
            request.user,
        )
        return single_file_url
    # Multiple files: bundle everything into one ZIP archive
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
    region = pages[0].region
    if len(target_languages) == 1:
        target_language = target_languages[0]
        zip_name = f"{region.slug}_{timestamp}_{region.get_source_language(target_language.slug).slug}_{target_language.slug}.zip"
    else:
        zip_name = f"{region.slug}_{timestamp}_multiple_languages.zip"
    # Let the storage backend reserve a collision-free name before writing the archive
    actual_filename = download_storage.save(f"{export_dir}/{zip_name}", ContentFile(""))
    create_zip_archive(
        list(exported_files.values()), download_storage.path(actual_filename)
    )
    zip_file_url = download_storage.url(actual_filename)
    logger.info(
        "XLIFF export: %r converted to XLIFF ZIP archive %r by %r",
        exported_files.keys(),
        zip_file_url,
        request.user,
    )
    return zip_file_url
def page_to_xliff(
    page: Page, target_language: Language, dir_name: UUID, only_public: bool = False
) -> str:
    """
    Export a page to an XLIFF file for a specified target language

    :param page: Page which should be translated
    :param target_language: The target language (should not be the region's default language)
    :param dir_name: The directory in which the xliff files should be created
    :param only_public: Whether only public versions should be exported
    :raises RuntimeWarning: If the selected page translation does not have a source translation
    :raises RuntimeError: When an unexpected error occurs during serialization
    :return: The path of the generated XLIFF file
    """
    target_page_translation = (
        page.get_public_translation(target_language.slug)
        if only_public
        else page.get_public_or_draft_translation(target_language.slug)
    )
    if not target_page_translation:
        # Create temporary target translation (not saved to the database)
        target_page_translation = PageTranslation(
            page=page,
            language=target_language,
        )
    source_translation = (
        target_page_translation.public_source_translation
        if only_public
        else target_page_translation.public_or_draft_source_translation
    )
    if not source_translation:
        source_language = page.region.get_source_language(target_language.slug)
        logger.warning(
            "Page translation %r does not have a %ssource translation in %r and therefore cannot be exported to XLIFF.",
            target_page_translation,
            "published " if only_public else "",
            source_language,
        )
        raise RuntimeWarning(
            _(
                "Page {} does not have a {}source translation in {} and "
                "therefore cannot be exported as XLIFF for translation to {}."
            ).format(
                target_page_translation.readable_title,
                _("published") + " " if only_public else "",
                source_language,
                target_language,
            )
        )
    try:
        xliff_content = serializers.serialize(
            settings.XLIFF_EXPORT_VERSION,
            [target_page_translation],
            only_public=only_public,
        )
    except Exception as e:
        # All these errors should already have been prevented
        logger.exception(
            "An unexpected error has occurred while exporting %r to XLIFF: %s",
            target_page_translation,
            e,
        )
        raise RuntimeError(
            __(
                _("An unexpected error has occurred while exporting page {}.").format(
                    target_page_translation.readable_title
                ),
                _("Please try again later or contact an administrator."),
            )
        ) from e
    # Descriptive file name encoding region, source/target languages, page id and source version
    filename = (
        f"{page.region.slug}_{source_translation.language.slug}_{target_language.slug}_"
        f"{page.id}_{source_translation.version}_{source_translation.slug}.xliff"
    )
    # BUG FIX: the save() call previously used a literal placeholder instead of the
    # generated ``filename``, so the descriptive name above was computed but never used.
    # FileSystemStorage.save() still de-duplicates the name if it already exists.
    actual_filename = download_storage.save(
        f"{dir_name}/{filename}", ContentFile(xliff_content)
    )
    logger.debug("Created XLIFF file %r", actual_filename)
    # Set "currently in translation" status for existing target translations
    if settings.REDIS_CACHE:
        target_page_translation.all_versions.invalidated_update(
            currently_in_translation=True
        )
    else:
        target_page_translation.all_versions.update(currently_in_translation=True)
    return download_storage.path(actual_filename)
def xliffs_to_pages(
    request: HttpRequest, xliff_dir: str
) -> dict[str, list[DeserializedObject]]:
    """
    Deserialize all XLIFF files in the given directory into page translation objects.
    The result is cached per directory, so repeated calls for the same upload do not
    re-parse the files.

    :param request: The current request (used for error messages)
    :param xliff_dir: The directory containing the xliff files
    :return: A dict mapping relative XLIFF file paths to their page translations as ``DeserializedObject``
    """
    # Check if result is cached
    if cached_result := cache.get(f"xliff-{xliff_dir}"):
        logger.debug(
            "Returning cached result for deserializing all XLIFF files of %r",
            xliff_dir,
        )
        return cached_result
    # Get all xliff files in the given directory (sort for deterministic order)
    xliff_file_paths = sorted(glob.glob(f"{xliff_dir}/**/*.xliff", recursive=True))
    page_translations: dict[str, list[DeserializedObject]] = {}
    for xliff_file_path in xliff_file_paths:
        with open(xliff_file_path, "r", encoding="utf-8") as xliff_file:
            logger.debug(
                "Deserializing XLIFF file %r",
                xliff_file_path,
            )
            # Key results by the path relative to the upload directory
            xliff_file_path_rel = os.path.relpath(xliff_file_path, xliff_dir)
            try:
                # Try to deserialize the file
                page_translations[xliff_file_path_rel] = list(
                    serializers.deserialize("xliff", xliff_file.read())
                )
            except Page.DoesNotExist:
                # The file references a page id that is not in the database
                logger.error(
                    "The page of XLIFF file %r does not exist.",
                    xliff_file_path,
                )
                messages.error(
                    request,
                    __(
                        _(
                            'The page referenced in XLIFF file "{}" could not be found.'
                        ).format(xliff_file_path_rel),
                        _("Please contact the administrator."),
                    ),
                )
            # In this case, we want to catch all exceptions because the import of the other files should work even if
            # some xliff files are broken or other unexpected errors occur
            except Exception as e:  # pylint: disable=broad-except
                # All these errors should already have been prevented, so probably the XLIFF file is broken.
                logger.exception(
                    "An unexpected error has occurred while importing XLIFF file %r: %s",
                    xliff_file_path,
                    e,
                )
                messages.error(
                    request,
                    __(
                        _(
                            'An unexpected error has occurred while importing XLIFF file "{}".'
                        ).format(xliff_file_path_rel),
                        _("Please try again later or contact an administrator."),
                    ),
                )
    # Store deserialized objects in cache (also caches partial results when some files failed)
    cache.set(f"xliff-{xliff_dir}", page_translations)
    return page_translations
def get_xliff_import_diff(request: HttpRequest, xliff_dir: str) -> list[dict[str, Any]]:
    """
    Build the XLIFF import diff: for each imported translation, the existing
    translation, the imported one, a unified diff of title and content, and any
    validation errors. Duplicate translations across files are reported and skipped.

    :param request: The current request (used for error messages)
    :param xliff_dir: The directory containing the xliff files
    :return: A list of dicts, one per imported xliff translation
    """
    diff: dict[str, dict] = {}
    for xliff_file, deserialized_objects in xliffs_to_pages(request, xliff_dir).items():
        for deserialized in deserialized_objects:
            page_translation = deserialized.object
            # The prefetched translations now also contain the new deserialized object with id None, so we have to delete
            # the cached property and query it from the database again.
            try:
                del page_translation.page.prefetched_translations_by_language_slug
            except AttributeError:
                pass
            # Fall back to an empty, unsaved translation when this page has not been translated yet
            existing_translation = page_translation.latest_version or PageTranslation(
                page=page_translation.page,
                language=page_translation.language,
            )
            if (translation_key := get_translation_key(existing_translation)) in diff:
                # Show global error to indicate duplicate translation
                messages.error(
                    request,
                    format_html(
                        __(
                            _(
                                "Page <b>{}</b> from file <b>{}</b> was also translated in file <b>{}</b>."
                            ),
                            _(
                                "Please check which of the files contains the most recent version and upload only this file."
                            ),
                            _(
                                "If you confirm this import, only the first file will be imported and the latter will be ignored."
                            ),
                        ),
                        existing_translation.readable_title,
                        diff[translation_key]["file"],
                        xliff_file,
                    ),
                )
                # Show warning in the section of the first occurrence of this duplicate
                diff[translation_key]["errors"].append(
                    {
                        "level_tag": "warning",
                        "message": format_html(
                            _(
                                "This page was also translated in file <b>{}</b>, which will be ignored."
                            ),
                            xliff_file,
                        ),
                    }
                )
                # Skip this duplicated translation
                continue
            diff[translation_key] = {
                "file": xliff_file,
                "existing": existing_translation,
                "import": page_translation,
                # Unified diffs of title and content; ``[2:]`` drops the "---"/"+++" header lines
                "source_diff": {
                    "title": "\n".join(
                        list(
                            difflib.unified_diff(
                                [existing_translation.title],
                                [page_translation.title],
                                lineterm="",
                            )
                        )[2:]
                    ),
                    "content": "\n".join(
                        list(
                            difflib.unified_diff(
                                existing_translation.content.splitlines(),
                                page_translation.content.splitlines(),
                                lineterm="",
                            )
                        )[2:]
                    ),
                },
                "right_to_left": page_translation.language.text_direction
                == text_directions.RIGHT_TO_LEFT,
                # Validation errors (and a "no changes" info, if applicable) for this translation
                "errors": get_xliff_import_errors_and_clean_translation(
                    request, page_translation, add_message_if_unchanged=True
                )[0],
            }
    # Throw away the translation keys, only take the value dictionaries
    return list(diff.values())
def xliff_import_confirm(
    request: HttpRequest, xliff_dir: str, machine_translated: bool
) -> bool:
    """
    Confirm the XLIFF import and write changes to database

    :param request: The current request (used for error messages)
    :param xliff_dir: The directory containing the xliff files
    :param machine_translated: A flag indicating the import was marked as machine translated
    :return: Whether the import was successful (i.e. no translation failed validation)
    """
    success = True
    successful_imports = []
    imports_without_changes = []
    # Acquire linkcheck lock to avoid race conditions between post_save signal and links.delete()
    with update_lock:
        # Iterate over all xliff files
        for xliff_file, deserialized_objects in xliffs_to_pages(
            request, xliff_dir
        ).items():
            # Iterate over all objects of one xliff file
            # (typically, one xliff file contains exactly one page translation)
            for deserialized in deserialized_objects:
                page_translation = deserialized.object
                # Validate the translation and clean its slug/content before saving
                errors, has_changed = get_xliff_import_errors_and_clean_translation(
                    request, page_translation
                )
                if errors:
                    logger.warning(
                        "XLIFF import of %r not possible because validation of %r failed with the errors: %r",
                        xliff_file,
                        page_translation,
                        errors,
                    )
                    messages.error(
                        request,
                        format_html(
                            "{} <ul>{}</ul>",
                            _(
                                "Page {} could not be imported successfully because of the errors:"
                            ).format(page_translation.readable_title),
                            format_html_join(
                                "",
                                "<li><i icon-name='alert-triangle' class='pb-1'></i>{}</li>",
                                [[error["message"]] for error in errors],
                            ),
                        ),
                    )
                    success = False
                else:
                    # Check if previous version already exists
                    if existing_translation := page_translation.latest_version:
                        # Delete link objects of existing translation
                        existing_translation.links.all().delete()
                    page_translation.machine_translated = machine_translated
                    # Confirm import and write changes to the database
                    try:
                        # Use encapsulated transaction to allow rollback in case of IntegrityError
                        with transaction.atomic():
                            page_translation.save()
                    except IntegrityError as e:
                        # NOTE(review): ``success`` is not set to False here even though this
                        # translation failed to save — confirm this is intentional.
                        logger.error(
                            "%s when importing new version for %r from %r by %r: %s",
                            type(e).__name__,
                            existing_translation,
                            xliff_file,
                            request.user,
                            e,
                        )
                        messages.error(
                            request,
                            format_html(
                                __(
                                    _(
                                        "Page {} from the file <b>{}</b> could not be imported."
                                    ),
                                    _(
                                        "Check if you have uploaded any other conflicting files for this page."
                                    ),
                                    _(
                                        "If the problem persists, contact an administrator."
                                    ),
                                ),
                                page_translation.readable_title,
                                xliff_file,
                            ),
                        )
                    else:
                        # Save succeeded — record the import for the summary messages below
                        if has_changed:
                            logger.info(
                                "%r of XLIFF file %r was imported successfully by %r",
                                page_translation,
                                xliff_file,
                                request.user,
                            )
                            successful_imports.append(page_translation.readable_title)
                        else:
                            logger.info(
                                "%r of XLIFF file %r was imported without changes by %r",
                                existing_translation,
                                xliff_file,
                                request.user,
                            )
                            imports_without_changes.append(
                                page_translation.readable_title
                            )
    # Summarize the outcome for the user
    if successful_imports:
        messages.success(
            request,
            ngettext_lazy(
                "Page {} was imported successfully.",
                "Pages {} were imported successfully.",
                len(successful_imports),
            ).format(iter_to_string(successful_imports, quotation_char="")),
        )
    if imports_without_changes:
        messages.info(
            request,
            ngettext_lazy(
                "Page {} was imported without changes.",
                "Pages {} were imported without changes.",
                len(imports_without_changes),
            ).format(iter_to_string(imports_without_changes, quotation_char="")),
        )
    return success
def get_xliff_import_errors_and_clean_translation(
    request: HttpRequest,
    page_translation: PageTranslation,
    add_message_if_unchanged: bool = False,
) -> tuple[list, bool]:
    """
    Validate an imported page translation and collect all validation errors.

    As a side effect, a unique slug is generated for the translation if it does
    not yet have one, and its content is cleaned by a ``PageTranslationForm``.

    :param request: The current request (used for error messages)
    :param page_translation: The page translation which is being imported
    :param add_message_if_unchanged: Whether a message should be added if no changes are detected
    :return: A tuple of the error dicts of this XLIFF import and whether the translation has changed
    """
    validation_errors = []
    # The region the current request belongs to
    region = request.region
    # The importing user must be allowed to change this specific page
    if not request.user.has_perm("cms.change_page_object", page_translation.page):
        validation_errors.append(
            {
                "level_tag": "error",
                "message": _(
                    "You don't have the permission to import this page."
                ).format(page_translation.readable_title),
            }
        )
    # Reject translations whose page belongs to a different region
    if page_translation.page.region != region:
        validation_errors.append(
            {
                "level_tag": "error",
                "message": _("This page belongs to another region ({}).").format(
                    page_translation.page.region.full_name,
                ),
            }
        )
    # The prefetched translations now also contain the new deserialized object with
    # id None, so drop the cached properties and query the database again.
    for instance, cached_property_name in (
        (page_translation, "latest_version"),
        (page_translation.page, "prefetched_translations_by_language_slug"),
    ):
        try:
            delattr(instance, cached_property_name)
        except AttributeError:
            pass
    # Validate the imported data against the existing translation (if any)
    form = PageTranslationForm(
        data=model_to_dict(page_translation),
        instance=page_translation.latest_version,
        additional_instance_attributes={
            "creator": request.user,
            "language": page_translation.language,
            "page": page_translation.page,
        },
    )
    if not form.is_valid():
        validation_errors.extend(
            {"level_tag": message["type"], "message": message["text"]}
            for message in form.get_error_messages()
        )
    else:
        # Take over the unique slug generated by the form's clean-method
        page_translation.slug = form.instance.slug
        # Take over the cleaned content from the form
        page_translation.content = form.instance.content
        if add_message_if_unchanged and not form.has_changed():
            validation_errors.append(
                {
                    "level_tag": "info",
                    "message": _("No changes detected."),
                }
            )
    return validation_errors, form.has_changed()
def get_translation_key(translation: PageTranslation) -> str:
    """
    Build a unique identifier for a translation so duplicates can be detected.
    The translation id cannot be used because the deserialized objects have not
    been written to the database yet.

    :param translation: The translation
    :return: A unique key for this ``Page``/``Language``/``version`` combination
    """
    key_parts = (translation.page.id, translation.language.id, translation.version)
    return "-".join(str(part) for part in key_parts)