-
Notifications
You must be signed in to change notification settings - Fork 18
/
cleanreg.py
executable file
·739 lines (615 loc) · 30.3 KB
/
cleanreg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
#!/usr/bin/env python
# coding=utf-8
import sys
import os
import requests
import argparse
from urllib.parse import urlparse
import re
import json
import collections
import yaml
from requests.auth import HTTPBasicAuth
from datetime import datetime
from multiprocessing import Manager, Process, Pool, current_process
from itertools import islice
from functools import partial
__author__ = 'Halil-Cem Guersoy (https://github.com/hcguersoy), ' \
'Kevin Krummenauer (https://github.com/kekru)', \
'Marvin becker (https://github.com/derwebcoder)', \
'Julian Sauer (https://github.com/JulianSauer)', \
'Jonas Tschoche (https://github.com/Jonas18175)', \
'Basilio Vera (https://github.com/bvis)', \
'Valentin Fedoskin (https://github.com/slamdev)'
__license__ = '''
------------------------------------------------------------------------------
Copyright 2022
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
------------------------------------------------------------------------------
'''
def parse_arguments():
    """
    Builds the command line parser, parses ``sys.argv`` and validates the
    combination of options.

    Any violation is reported through ``parser.error``, which prints the usage
    and terminates the process.

    :return: the parsed arguments (argparse namespace)
    """
    parser = argparse.ArgumentParser(description='Removes images on a docker registry (v2).',
                                     formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='The verbosity level. Increase verbosity by multiple usage, e.g. -vvv .')
    parser.add_argument('-r', '--registry', help="The registry server to connect to, e.g. http://1.2.3.4:5000",
                        required=True)
    parser.add_argument('-p', '--proxy', help="Use system level proxy settings accessing registry server if set. "
                                              "By default, the registry server will be accessed without a "
                                              " proxy.", default=False, action='store_true')
    parser.add_argument('-y', '--yes', '--assume-yes', help="If set no user action will appear and all questions "
                                                            "will be answered with YES", default=False,
                        action='store_true', dest="assumeyes")
    parser.add_argument('-q', '--quiet', help="[deprecated] If set no user action will appear and all questions will "
                                              "be answered with YES", default=False, action='store_true')
    parser.add_argument('-n', '--reponame', help="The name of the repo which should be cleaned up. Tags are optional.")
    parser.add_argument('-cf', '--clean-full-catalog', help="If set all repos of the registry will be cleaned up, "
                                                            "keeping the amount of images specified in -k option. "
                                                            "The amount for each repo can be overridden in the repofile (-f).",
                        default=False, action='store_true', dest='clean_full_catalog')
    parser.add_argument('-k', '--keepimages', help="Amount of images (not tags!) which should be kept "
                                                   "for the given repo (if -n is set) or for each repo of the "
                                                   "registry (if -cf is set).", default=0, type=int)
    parser.add_argument('-re', '--regex', help="Interpret tagnames as regular expressions", default=False,
                        action='store_true', dest="regex")
    parser.add_argument('-s', '--since', help="Keep images which were created since this date.", default=None)
    parser.add_argument('-f', '--reposfile', help="A yaml file containing the list of Repositories with additional information "
                                                  "regarding tags, dates and how many images to keep.")
    parser.add_argument('-c', '--cacert', help="Path to a valid CA certificate file. This is needed if self signed "
                                               "TLS is used in the registry server.", default=None)
    parser.add_argument('-sv', '--skip-tls-verify', help="If set insecure TLS is allowed, so no need for a valid cert to verify.",
                        default=False, action='store_true', dest="skip_tls_verify")
    parser.add_argument('-i', '--ignore-ref-tags', help="Ignore a digest if it is referenced multiple times "
                                                        "in the whole registry server. In this case, a list of all "
                                                        "repositories and their images will be retrieved which can be "
                                                        "time and memory consuming. "
                                                        "ATTENTION: the default is False so an image will be deleted "
                                                        "even it is referenced multiple times.",
                        default=False, action='store_true', dest='ignoretag')
    parser.add_argument('-u', '--basicauth-user', help="The username, if the registry is protected with basic auth",
                        dest='basicauthuser')
    parser.add_argument('-pw', '--basicauth-pw', help="The password, if the registry is protected with basic auth",
                        dest='basicauthpw')
    parser.add_argument('-w', '--metadata-workers', help="Parallel workers to retrieve image metadata. "
                                                         "Default value is 6.",
                        default=6, type=int, dest='md_workers')

    args = parser.parse_args()

    # check if keepimages is set that it is not negative
    if (args.keepimages is not None) and (args.keepimages < 0):
        parser.error("[-k] has to be a positive integer!")

    # a worker pool needs at least one process; reject bad values up front
    # instead of failing later when the pool is created
    if args.md_workers < 1:
        parser.error("[-w] has to be a positive integer!")

    # check if date is valid
    if args.since is not None:
        if parse_date(args.since) == "":
            parser.error("[-s] format does not match")

    # hackish mutually exclusive group
    if bool(args.reponame) and bool(args.reposfile):
        parser.error("[-n] and [-f] cant be used together")

    # hackish mutually exclusive group
    if bool(args.reponame) and bool(args.clean_full_catalog):
        parser.error("[-n] and [-cf] cant be used together")

    # hackish dependent arguments
    # Either using reponame/clean_full_catalog or keepimages/regex/since is not allowed
    # unless using a reposfile with a regular expression
    selects_repos = bool(args.reponame) or bool(args.clean_full_catalog)
    selects_images = args.keepimages != 0 or bool(args.regex) or args.since is not None
    if selects_repos ^ selects_images:
        if not (bool(args.reposfile) and args.regex):
            parser.error("[-n] or [-cf] have to be used together with [-k], [-re] or [-s].")

    # hackish dependent arguments
    # Either one of these parameters has to be used
    if bool(args.reponame) is False and args.clean_full_catalog is False and bool(args.reposfile) is False:
        parser.error("[-n|-k] or [-cf|-k] or [-f] has to be used!")

    return args
def parse_date(date_string):
    """
    Converts a string to datetime.

    The accepted layouts are tried in order: ``YYYYMMDD``, ``YYYY-MM-DD``,
    ``YYYYMMDDTHHMMSS`` and ``YYYY-MM-DDTHH:MM:SS``.

    :param date_string: Date as string
    :return: datetime or empty string if date_string is in an invalid format
    """
    accepted_formats = ('%Y%m%d', '%Y-%m-%d', '%Y%m%dT%H%M%S', '%Y-%m-%dT%H:%M:%S')
    for candidate_format in accepted_formats:
        try:
            return datetime.strptime(date_string, candidate_format)
        except ValueError:
            # not this layout, try the next one
            continue
    return ""
def update_progress(current, maximum, factor=2):
    """
    Redraws a one-line progress bar on stdout.

    :param current: number of items processed so far
    :param maximum: total number of items, must not be zero
    :param factor: the bar gets one '=' per ``factor`` percent
    """
    if maximum == 0:
        raise Exception('Maximum amount should not be zero.')
    percent = round((100 * current) / maximum)
    bar = '=' * (int(percent / factor))
    # the leading carriage return overwrites the previously drawn bar
    sys.stdout.write('\r{0}>> {1}%'.format(bar, percent))
    sys.stdout.flush()
def query_yes_no(question, default="no"):
    """
    Ask a yes/no question on stdout and read the answer from stdin.

    Based on recipe 577058 - http://code.activestate.com/recipes/577058/

    :param question: the text presented to the user
    :param default: the presumed answer if the user just hits <Enter>;
                    "no" (the default), "yes" or None (an answer is required)
    :return: True for "yes", False for "no"
    :raises ValueError: if default is none of the values above
    """
    if default is not None and default not in ("yes", "no"):
        raise ValueError("invalid default answer: '%s'" % default)

    if default is None:
        prompt = " [y/n] "
    else:
        prompt = " [Y/n] " if default == "yes" else " [y/N] "

    answers = {"yes": True, "y": True, "ye": True,
               "no": False, "n": False}

    # keep asking until the input is understood
    while True:
        sys.stdout.write(question + prompt)
        reply = input().strip().lower()
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
def print_headers(headers):
    """
    Prints every header of a mapping as an indented ``name -> value`` line.

    :param headers: mapping of header names to values
    """
    for name in headers:
        value = headers.get(name)
        print(" > {0} -> {1}".format(name, value))
def is_v2_registry(verbose, regserver, cacert=None):
    """
    Checks if the given server is really a v2 registry.

    The server counts as v2 only if the request succeeds and the response
    carries the header ``Docker-Distribution-Api-Version: registry/2.0``.

    :param verbose: verbosity level
    :param regserver: the URL of the reg server
    :param cacert: the path to a cacert file
    :return: True if it is really a v2 server
    """
    if verbose > 0:
        print('Check if registry server supports v2...')

    response = requests.get(regserver, verify=cacert, auth=get_auth())
    status = response.status_code

    if verbose > 1:
        print("Check result code:", status)
        print("Headers")
        print_headers(response.headers)

    # check if result header contains API version
    version_header = 'Docker-Distribution-Api-Version'
    announces_v2 = (version_header in response.headers and
                    response.headers[version_header] == 'registry/2.0')
    request_ok = status == requests.codes.ok

    if request_ok and announces_v2:
        if verbose > 0:
            print("Registry server supports v2!")
        return True

    # everything below is a failure, only the explanation differs
    if request_ok:
        print("This is really strange... someone fakes you?")
    elif announces_v2:
        print("Found a v2 repo but return code is ", status)
    else:
        print("This is not a v2 registry server: ", regserver)
    return False
def generate_request_headers(api_version=2):
    """
    Builds the request headers selecting the manifest schema version.

    :param api_version: 1 selects the v1 manifest media type, any other value v2
    :return: a dict containing the matching Accept header
    """
    media_type = ('application/vnd.docker.distribution.manifest.v1+json'
                  if api_version == 1
                  else 'application/vnd.docker.distribution.manifest.v2+json')
    return {'Accept': media_type}
def get_auth():
    """
    Builds the basic auth credentials from the command line arguments.

    Reads the module-level ``args`` which is assigned in the main section.

    :return: a HTTPBasicAuth object if user and password are both set, else None
    """
    user = args.basicauthuser
    password = args.basicauthpw
    if user is None or password is None:
        return None
    return HTTPBasicAuth(user, password)
def get_digest_by_tag(verbose, regserver, repository, tag, cacert=None):
    """
    Retrieves the Digest of an image tag.

    Issues a HEAD request on the manifest of the tag and reads the digest
    from the ``Docker-Content-Digest`` response header. Exits the process
    if the request fails (code 2) or the header is missing (code 3).

    :param verbose: verbosity level
    :param regserver: the URL of the reg server
    :param repository: the repository name
    :param tag: the tag of the image
    :param cacert: the path to a cacert file
    :return: The docker image digest
    """
    manifest_url = regserver + repository + "/manifests/" + tag
    if verbose > 1:
        print("Will use following URL to retrieve digest:", manifest_url)

    # the Accept header selects the manifest schema the digest refers to
    response = requests.head(manifest_url, headers=generate_request_headers(),
                             verify=cacert, auth=get_auth())
    status = response.status_code

    if verbose > 2:
        print("Digest head result status code is:", status)
        print("Digest head header is:")
        print_headers(response.headers)

    # check the return code and exit if not OK
    if status != requests.codes.ok:
        print("The digest could not be retrieved due to error:", status)
        if verbose > 0:
            print(response)
        sys.exit(2)

    # if the header doesn't contain the digest information exit, too
    if 'Docker-Content-Digest' not in response.headers:
        print("Could not find any digest information in the header. Exiting")
        sys.exit(3)

    # everything looks fine so we continue
    digest = response.headers['Docker-Content-Digest']
    if verbose > 0:
        print("Digest for image {0}:{1} is [{2}]".format(repository, tag, digest))
    return digest
def delete_manifest(verbose, regserver, repository, cur_digest, cacert=None):
    """
    Deletes a manifest based on a digest.
    Be aware that a digest can be associated with multiple tags!

    Exits the process with code 12 if the registry does not answer with 202.

    :param verbose: verbosity level
    :param regserver: the URL of the reg server
    :param repository: the repository name
    :param cur_digest: the digest of the image which has to be deleted
    :param cacert: the path to a cacert file
    """
    # as specified by the v2 API, DELETE returns a 202
    # Be aware of the real intention of this status code:
    # "The request has been accepted for processing, but the processing has not been completed."
    # s. https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
    accepted = 202

    # Attention: the Accept header is needed if you are running a registry >= 2.3
    accept_headers = generate_request_headers()
    manifest_url = regserver + repository + "/manifests/" + cur_digest

    if verbose > 1:
        print("Will use following URL to delete manifest:", manifest_url)

    response = requests.delete(manifest_url, headers=accept_headers, verify=cacert, auth=get_auth())
    status = response.status_code

    if verbose > 1:
        print("Delete result status code is:", status)
    if verbose > 2:
        print("Delete result header is:")
        print_headers(response.headers)

    if status != accepted:
        print("The manifest could not be deleted due to an error:", status)
        if verbose > 1:
            print(response)
        sys.exit(12)

    if verbose > 0:
        print("Deleted manifest with digest", cur_digest)
def deletion_digests(verbose, del_tags, digests_counts, ignore):
    """
    High level method to retrieve digests to be deleted from a repository, based on tags.

    :param verbose: verbosity level
    :param del_tags: The tags to be deleted of this repository, each mapped to a dict holding its 'digest'
    :param digests_counts: A dict containing all digests and how often they occur
    :param ignore: ignore tags if their digests are referenced multiple times (occurrence > 1)
    :return: The list of digests which have to be deleted
    """
    selected = []
    for tag_data in del_tags.values():
        digest = tag_data['digest']
        # the counts are only consulted when shared digests must be spared
        shared = ignore is True and digests_counts[digest] > 1
        if not shared:
            selected.append(digest)
        elif verbose > 0:
            print("Ignoring digest {0} as it is referenced multiple times!".format(digest))
    return selected
def get_all_repos(verbose, regserver, cacert=None):
    """
    A method to retrieve a list of all repositories on the registry server.

    The catalog may be delivered in pages; a following page is announced by
    the registry through a ``Link`` response header with ``rel="next"``.
    All pages are followed so no repository is missed. Exits the process
    with code 2 if a catalog request fails.

    :param verbose: verbosity level
    :param regserver: The registry server
    :param cacert: the path to a cacert file
    :return: A list with all repositories
    """
    from urllib.parse import urljoin

    repos_all = []
    req_url = regserver + "_catalog"

    while req_url:
        if verbose > 1:
            print ("Will use URL {0} to retrieve a list of all repositories:".format(req_url))

        repos_result = requests.get(req_url, verify=cacert, auth=get_auth())
        repos_status = repos_result.status_code

        if verbose > 2:
            print ("Get catalog result is:", repos_status)

        # check the return code and exit if not OK
        if repos_status != requests.codes.ok:
            print ("The catalog could not be retrieved due to error:", repos_status)
            if verbose > 0:
                print (repos_result)
            sys.exit(2)

        repos_all.extend(repos_result.json().get('repositories') or [])

        # the next link may be relative to the registry host, so resolve it
        # against the URL which was just requested
        next_link = repos_result.links.get('next', {}).get('url')
        req_url = urljoin(req_url, next_link) if next_link else None

    if verbose > 1:
        print ("Found repos: {0} ".format(repos_all))

    return repos_all
def create_repo_list(cmd_args, regserver):
    """
    Builds up a dict of repositories which have to be cleaned up and which
    images have to be kept.

    Repositories come from the single repo option, the full catalog option
    and/or the repos file; entries of the repos file overwrite earlier ones.
    Repositories which are not part of the registry catalog are dropped.
    If the ignoreflag is set, the returned repository list contains all
    repositories of the catalog.

    :param cmd_args: the command line arguments
    :param regserver: The registry server
    :return: A dict in the format repositoryname : (amount of images to be kept, image tag to delete,
             date since when images will be kept) and the repository names whose metadata is needed
    """
    found_repos_counts = {}
    all_registry_repos = get_all_repos(cmd_args.verbose, regserver, cmd_args.cacert)

    if bool(cmd_args.reponame) is True:
        if cmd_args.verbose > 1:
            print ("In single repo mode.")
            print ("Will keep matching images from repo {0}".format(cmd_args.reponame))
        # an optional tag is appended to the repo name as "name:tag"
        splittedNames = cmd_args.reponame.split(':')
        repo = splittedNames[0]
        tagname = ''
        if len(splittedNames) == 2:
            tagname = splittedNames[1]
        found_repos_counts[repo] = (cmd_args.keepimages, tagname, cmd_args.since)
        if cmd_args.verbose > 2:
            print ("repos_counts: ", found_repos_counts)

    if cmd_args.clean_full_catalog is True:
        if cmd_args.verbose > 1:
            print ("Importing all repos of the registries catalog, keeping {0} images per repo.".format(cmd_args.keepimages))
        for repo in all_registry_repos:
            splittedNames = repo.split(':')
            repo = splittedNames[0]
            tagname = ''
            if len(splittedNames) == 2:
                tagname = splittedNames[1]
            found_repos_counts[repo] = (cmd_args.keepimages, tagname, cmd_args.since)

    # use the arguments handed in, not the module-level ones
    if bool(cmd_args.reposfile) is True:
        if cmd_args.verbose > 1:
            print ("Will read repo information from file {0}".format(cmd_args.reposfile))
        with open(cmd_args.reposfile) as repoFile:
            # an empty file is parsed to None
            repos = yaml.safe_load(repoFile) or {}
        for repoName in repos:
            if cmd_args.verbose > 2:
                print ("Reading config for {0}: {1}".format(repoName, repos.get(repoName)))
            # a repo listed without any settings is parsed to None
            repoConfig = repos[repoName] or {}
            tagName = str(repoConfig.get('tag', ""))
            keep = int(repoConfig.get('keepimages', 0))
            since = str(repoConfig.get('keepsince', ""))
            if cmd_args.verbose > 2:
                print (" Parsed to:")
                print (" tagname: {0}, keepimages: {1}, since: {2}".format(tagName, keep, since))
            found_repos_counts[repoName] = (keep, tagName, since)

    if cmd_args.verbose > 1:
        print ("These repos will be processed:")
        print (found_repos_counts)

    # iterate over a snapshot as entries are removed on the way
    for repo in list(found_repos_counts):
        if repo not in all_registry_repos:
            del found_repos_counts[repo]
            if cmd_args.verbose > 1:
                print ("Skipping repo {0} because it is not in the catalog.".format(repo))

    # counting digest references across the registry needs every repository
    if cmd_args.ignoretag is True:
        repos = all_registry_repos
    else:
        repos = found_repos_counts.keys()

    return found_repos_counts, repos
def retrieve_metadata(tag, verbose, regserver, repo, managed_tags_date_digests,
                      managed_digests, cacert):
    """
    Retrieves the creation date and the digest of a single tag.

    The creation date is read from the first history entry of the v1
    manifest of the tag, the digest is looked up with get_digest_by_tag.

    :param tag: the tag of the image
    :param verbose: verbosity level
    :param regserver: the URL of the reg server
    :param repo: the repository name
    :param managed_tags_date_digests: shared dict, receives tag -> {'date': ..., 'digest': ...}
    :param managed_digests: shared list, the digest of the tag is appended
    :param cacert: the path to a cacert file
    :return: both shared collections
    """
    if verbose > 2:
        print ("Processing in", current_process())

    # same URL layout as in get_digest_by_tag (single slash before "manifests")
    metadata_request = regserver + repo + "/manifests/" + tag
    metadata_header = generate_request_headers(api_version=1)
    metadata = requests.get(metadata_request, headers=metadata_header, verify=cacert,
                            auth=get_auth()).json()
    creation_date = json.loads(metadata['history'][0]['v1Compatibility'])['created']
    digest = get_digest_by_tag(verbose, regserver, repo, tag, cacert)

    managed_tags_date_digests[tag] = {'date': creation_date, 'digest': digest}
    managed_digests.append(digest)

    if verbose > 2:
        print ("Added {0} to tag {1} on repo {2}".format(managed_tags_date_digests[tag], tag, repo))

    return managed_tags_date_digests, managed_digests
def get_tags_dates_digests_byrepo(verbose, regserver, repo, results, digests, md_workers, cacert=None):
    """
    Retrieves all Tags, the creation date of the layer the tag point to and digest of the layer.

    Exits the process with code 2 if the tag list can not be retrieved.

    :param verbose: The verbosity level
    :param regserver: The registry server
    :param repo: The repository name
    :param results: A managed dict which is used to return a dict containing the tag, date and digest
    :param digests: A managed list which contains a list of all found digests, used to check for multiple usage
    :param md_workers: Amount of parallel workers to retrieve metadata
    :param cacert: The path to the certificate file
    :return: Returns using the managed collections results and digests
    """
    req_url = regserver + repo + "/tags/list"
    if verbose > 1:
        print ("Will use URL {0} to retrieve tags for repo {1}:".format(req_url, repo))

    tags_result = requests.get(req_url, verify=cacert, auth=get_auth())
    tags_status = tags_result.status_code

    if verbose > 2:
        print ("Get tags result is:", tags_status)

    # check the return code and exit if not OK
    if tags_status != requests.codes.ok:
        print ("The tags could not be retrieved due to error:", tags_status)
        if verbose > 0:
            print (tags_result)
        sys.exit(2)

    # the tag list may be null instead of an empty list
    tags_all = tags_result.json()['tags'] or []
    if verbose > 1:
        print ("Found tags for repo {0}: {1} ".format(repo, tags_all))

    amount_tags = len(tags_all)
    if verbose > 2:
        print ("amount_tags : ", amount_tags)
    if verbose > 0:
        print ("Retrieving metada for repository ", repo)

    # manager and pool are created only after the tag list is known and are
    # released again as soon as the metadata is collected
    with Manager() as manager:
        managed_tags_date_digests = manager.dict()
        if tags_all:
            funcpart = partial(retrieve_metadata, verbose=verbose, regserver=regserver, repo=repo,
                               managed_tags_date_digests=managed_tags_date_digests,
                               managed_digests=digests, cacert=cacert)
            with Pool(processes=md_workers) as pool:
                pool.map(funcpart, tags_all)
        # copy into a plain dict before the manager is shut down
        tags_date_digests = dict(managed_tags_date_digests)

    results[repo] = tags_date_digests
def get_all_tags_dates_digests(verbose, regserver, repositories, md_workers, cacert=None):
    """
    Retrieve all tags and finally digests for all repositories.

    One process is started per repository. If any of them does not finish
    successfully the whole run is aborted with exit code 2, as its result
    would be missing.

    :param verbose: verbosity level
    :param regserver: the URL of the reg server
    :param repositories: the list of repositories to be cleaned up
    :param md_workers: amount of parallel workers per repository to retrieve metadata
    :param cacert: the path to a cacert file
    :return: a nested dict containing all repos and for each repo the list of all tags and their digests,
             and a managed list of all digests found
    """
    result = {}
    manager = Manager()
    repos_tags_digest = manager.dict()
    managed_digests = manager.list()
    procs = []

    print ("Retrieving tags and digests. Be patient, this can take a little bit time.")

    for repo in repositories:
        if verbose > 0:
            print ("Starting procs for {0}".format(repo))
        # start a process to retrieve the needed data
        proc = Process(target=get_tags_dates_digests_byrepo, args=(verbose, regserver, repo, repos_tags_digest,
                                                                   managed_digests, md_workers, cacert))
        procs.append((repo, proc))
        proc.start()

    failed_repos = []
    for repo, proc in procs:
        if verbose > 1:
            print ("Waiting for {0} to be finished.".format(proc))
        proc.join()
        if proc.exitcode != 0:
            failed_repos.append(repo)

    # a failed worker did not store a result for its repository
    if failed_repos:
        print ("Retrieving tags and digests failed for: {0}".format(", ".join(failed_repos)))
        sys.exit(2)

    for repo in repositories:
        if verbose > 0:
            print ("Retrieving results...")
        result[repo] = repos_tags_digest[repo]

    return result, managed_digests
def get_deletiontags(verbose, tags_dates_digests, repo, tagname, keep_count, regex, since):
    """
    Returns a dict containing a list of the tags which could be deleted due
    to name and date.

    The tags are ordered by creation date (oldest first). They are narrowed
    down by the tag name, then by the date and finally the amount is limited
    so that at least keep_count of all tags of the repository remain.

    :param verbose: The verbosity level
    :param tags_dates_digests: A dict containing image tags, their corresponding digest and the layer creation date
    :param repo: the repository name
    :param tagname: tag of the repo, an empty string selects all tags
    :param keep_count: amount of tags to be kept in repository, None is treated as 0
    :param regex: True if tagnames should be interpreted as regular expressions
    :param since: Keeps tags which were created since this date, None or an empty string disables the check
    :return: a dict of tags to be deleted, their digest and the date when they were created
    :raises ValueError: if since is set but is not in a supported date format
    """
    all_tags = collections.OrderedDict(sorted(tags_dates_digests.items(), key=lambda item: item[1]['date']))

    if verbose > 3:
        print (json.dumps(all_tags, indent=2))

    amount_tags = len(all_tags)
    if keep_count is None:
        keep_count = 0

    if verbose > 1:
        print ("Repo {0}: amount_tags : {1}; repo_count: {2}".format(repo, amount_tags, keep_count))

    # 1. narrow down by tag name
    processed_tags = all_tags.copy()
    if tagname != "":
        if regex:
            pattern = re.compile(tagname)
            processed_tags = collections.OrderedDict(
                (tag, data) for tag, data in all_tags.items() if pattern.match(tag))
        else:
            processed_tags = collections.OrderedDict(
                (tag, data) for tag, data in all_tags.items() if tag == tagname)

    # 2. spare everything created since the given date
    if since is not None and since != "":
        parsed_date = parse_date(since)
        if parsed_date == "":
            # parse_date signals an unsupported format with an empty string
            raise ValueError("Date '{0}' given for repo {1} is not in a supported format.".format(since, repo))
        print ("Will delete and keep images created since {0}".format(parsed_date))
        for tag in list(processed_tags):
            deletion_tag_date = processed_tags[tag]['date']
            if deletion_tag_date.endswith('Z'):
                deletion_tag_date = deletion_tag_date[:-1]
            # fractions of a second are cut off before parsing
            tag_date = datetime.strptime(deletion_tag_date.split('.')[0], '%Y-%m-%dT%H:%M:%S')
            print ("Date: {0}".format(tag_date))
            if tag_date >= parsed_date:
                del processed_tags[tag]

    # 3. considers keep_count to check if too many images are marked for deletion
    delete_count = amount_tags - keep_count
    if amount_tags <= keep_count:
        # keep all images
        processed_tags.clear()
        if verbose > 0:
            print ("Skipping deletion in repo {0} because not enough images.".format(repo))
    elif len(processed_tags) > delete_count:
        # only the oldest delete_count candidates remain marked for deletion
        processed_tags = collections.OrderedDict(islice(processed_tags.items(), delete_count))

    if verbose > 1:
        print ()
        print ("Deletion candidates for repo {0}".format(repo))
        print (json.dumps(processed_tags, indent=2))

    return processed_tags
# >>>>>>>>>>>>>>>> MAIN STUFF
if __name__ == '__main__':
    # `args` has to stay a module-level name, get_auth() reads it as a global
    args = parse_arguments()
    reg_server_api = args.registry + "/v2/"

    if args.proxy is False:
        # bypass any system proxy for the registry host
        registry_host = urlparse(args.registry).netloc
        if args.verbose > 1:
            print ("Will exclude registryserver location from proxy:", registry_host)
        os.environ['no_proxy'] = registry_host

    if args.skip_tls_verify:
        # handed to requests as verify=False
        args.cacert = False

    # initially check if we've a v2 registry server
    if is_v2_registry(args.verbose, reg_server_api, args.cacert) is False:
        print ("Exiting, none V2 registry.")
        sys.exit(1)

    repos_counts, repos = create_repo_list(args, reg_server_api)

    repo_tags_dates_digest, all_digests = get_all_tags_dates_digests(args.verbose, reg_server_api, repos,
                                                                     args.md_workers, args.cacert)

    if args.verbose > 2:
        print ("List of all repos, tags, their creation dates and their digests:")
        print (json.dumps(repo_tags_dates_digest, indent=2))
        print (all_digests)

    # how often every digest is referenced
    digests_counts = dict(collections.Counter(all_digests))

    repo_del_tags = {}
    repo_del_digests = {}
    total_repos = len(repos_counts)

    for x, (repo, (count, tagname, since)) in enumerate(repos_counts.items(), start=1):
        update_progress(x, total_repos)
        if args.verbose > 0:
            print ()
            print ("Will delete repo {0} and keep at least {1} images.".format(repo, count))

        del_tags = get_deletiontags(args.verbose, repo_tags_dates_digest[repo], repo, tagname, count,
                                    args.regex, since)
        if len(del_tags) > 0:
            repo_del_tags[repo] = del_tags
            # a set, as several tags may point to the same digest
            repo_del_digests[repo] = set(deletion_digests(args.verbose, del_tags, digests_counts, args.ignoretag))

    something_to_delete = len(repo_del_digests) > 0
    ask_user = args.assumeyes is False and args.quiet is False

    answer = True
    if ask_user and something_to_delete:
        print ()
        print ("Repos and according digests to be deleted:")
        for repo, del_digests in repo_del_digests.items():
            print ("Repository: ", repo)
            for digest in del_digests:
                print (" {0}".format(digest))
        answer = query_yes_no("Do you realy want to delete them?")

    if not (answer is True and something_to_delete):
        print ("Aborted by user or nothing to delete.")
        sys.exit(1)

    print ("Deleting!")
    for repo, del_digests in repo_del_digests.items():
        for digest in del_digests:
            print ("Deleting ", digest)
            delete_manifest(args.verbose, reg_server_api, repo, digest, args.cacert)

    print ()
    print ("Finished")
    sys.exit(0)