From beb652e86191b5d1d856870ec3c404b4a9972c09 Mon Sep 17 00:00:00 2001 From: Twarit Verma Date: Tue, 3 Sep 2024 15:28:36 +0530 Subject: [PATCH] adding the updated changes around code and documentation optimization --- README.md | 172 +++++++++++++++- sample.py | 136 ++++++------ test.py | 90 ++++---- ultra_rest_client/about.py | 8 +- ultra_rest_client/connection.py | 126 +++++------ ultra_rest_client/ultra_rest_client.py | 275 +++++++++++++++---------- 6 files changed, 517 insertions(+), 290 deletions(-) diff --git a/README.md b/README.md index 572d827..256adda 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,20 @@ -python_rest_api_client -====================== +# UltraDNS REST API Client for Python -A sample Python client for communicating with the UltraDNS REST API +This is a Python client for communicating with the UltraDNS REST API. It provides a simple and intuitive interface for interacting with the UltraDNS services. -Dependencies and Installation -======================== +Jump To: + +* [Getting Started](#Getting-Started) +* [Usage](#Usage) +* [Functionality](#Functionality) +* [API Reference](#API-Reference) +* [Contributing](#Contributing) +* [License](#License) +* [Questions](#Questions) + +## Getting Started + +### Dependencies and Installation This sample code depends on the requests library, which can be found at: http://docs.python-requests.org/en/latest/ @@ -14,13 +24,155 @@ If you have pip installed, you can add the client and requests to your environme pip install ultra_rest_client ``` -Functionality -============= +Once installed, you can use the `ultra_rest_client` module in your Python scripts: + +```python +from ultra_rest_client import RestApiClient +client = RestApiClient(args) +``` + +## Usage + +### Authentication + +#### Authenticating using Username and Password + +```python +from ultra_rest_client import RestApiClient + +client = RestApiClient(your_username, your_password) + +domain = "udns-python-rest-client-test.com." + +# Get Zone Metadata +print(f"Get metadata for zone {domain}: {client.get_zone_metadata(domain)}") +``` + +#### Authenticating using Bearer Token and Refresh Token + +```python +from ultra_rest_client import RestApiClient + +client = RestApiClient(your_bearer_token, your_refresh_token, use_token=True) + +domain = "udns-python-rest-client-test.com." + +# Get Zone Metadata +print(f"Get metadata for zone {domain}: {client.get_zone_metadata(domain)}") +``` + +#### Authenticating using Bearer Token + +```python +from ultra_rest_client import RestApiClient + +client = RestApiClient(your_bearer_token, use_token=True) + +domain = "udns-python-rest-client-test.com." + +# Get Zone Metadata +print(f"Get metadata for zone {domain}: {client.get_zone_metadata(domain)}") +``` + +### Quick Examples +This example shows a complete working python file which will create a primary zone in UltraDNS. This example highlights how to get services using client and make requests. + +```python +#!/usr/bin/env python3 + +from ultra_rest_client import RestApiClient +import sys + +def create_zone(client, domain): + """Create a zone in UltraDNS. This function will create a zone with the name specified in the domain argument. + It uses the accounts API to get the account name. This is required to create a zone. + + Args: + - client (RestApiClient): An instance of the RestApiClient class. + - domain (str): The domain name to be created. + + Returns: + - dict: The response body. + """ + account_details = client.get_account_details() + account_name = account_details['accounts'][0]['accountName'] + return client.create_primary_zone(account_name, domain) + +def create_a_record(client, domain): + """Create an A record in UltraDNS. This function will create an A record with the name specified in the domain + + Args: + - client (RestApiClient): An instance of the RestApiClient class. + - domain (str): The domain name. + """ + return client.create_rrset(domain, "A", domain, 300, "192.0.2.1") + + +def create_cname_record(client, domain): + """Create a CNAME record in UltraDNS. This function will create a CNAME record with the name specified in the domain + + Args: + - client (RestApiClient): An instance of the RestApiClient class. + - domain (str): The domain name. + + Returns: + - dict: The response body. + """ + return client.create_rrset(domain, "CNAME", f"www.{domain}", 300, [domain]) + +def delete_zone(client, domain): + """Delete the zone from UltraDNS. + + Args: + - client (RestApiClient): An instance of the RestApiClient class. + - domain (str): The domain name. + """ + client.delete_zone(domain) + return "Zone deleted Successfully" + +def main(): + """The main function. This is the entry point for the script. It parses the command line arguments and calls the + create_zone, create_a_record, and create_cname_record functions.""" + + username = sys.argv[1] + password = sys.argv[2] + domain = "ultra-rest-client-test.com." + + # Create an instance of your client + client = RestApiClient(username, password) + + # Create the domain + print(f"Creating zone {domain}: {create_zone(client, domain)}") + + # Create an A record for the domain + print(f"Creating an A record pointing to 192.0.2.1: {create_a_record(client, domain)}") + + # Create a CNAME record for the domain + print(f"Creating a 'www' CNAME pointing to {domain}: {create_cname_record(client, domain)}") + + # Delete the domain + print(f"Deleting zone {domain}: {delete_zone(client, domain)}") + +if __name__ == "__main__": + main() +``` + +## Functionality + +The sample code does not attempt to implement a client for all available UltraDNS REST API functionality. It provides access to basic functionality. Adding additional functionality should be relatively straightforward, and any contributions from the UltraDNS community would be greatly appreciated. See [sample.py](sample.py) for an example of how to use this library in your own code. + +## API Reference + +For detailed API reference, please refer to the UltraDNS API documentation. + +## Contributing + +Contributions are always welcome! Please open a pull request with your changes, or open an issue if you encounter any problems or have suggestions. -The sample code does not attempt to implement a client for all available UltraDNS REST API functionality. It provides access to basic functionality. Adding additional functionality should be relatively straightforward, and any contributions from the UltraDNS community would be greatly appreciated. See sample.py for an example of how to use this library in your own code. +## License +This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for more details. -Questions -========= +## Questions Please contact UltraDNS support if you have any questions or encounter any issues with this code. diff --git a/sample.py b/sample.py index ae34a3c..1cff18f 100644 --- a/sample.py +++ b/sample.py @@ -1,103 +1,105 @@ -# Copyright 2023 - Vercara. All rights reserved. +# Copyright 2024 - Vercara. All rights reserved. # Vercara, the Vercara logo and related names and logos are registered # trademarks, service marks or tradenames of Vercara. All other # product names, company names, marks, logos and symbols may be trademarks # of their respective owners. __author__ = 'UltraDNS' -import ultra_rest_client +from ultra_rest_client import RestApiClient import sys import time -if len(sys.argv) != 5 and len(sys.argv) != 3: - raise Exception("Expected use: python sample.py username password [use_http host:port]") +if len(sys.argv) != 6 and len(sys.argv) != 3: + raise Exception("Expected use: python sample.py username password [use_token use_http host:port]") username = sys.argv[1] password = sys.argv[2] use_http = 'False' +use_token = 'False' domain = 'restapi.ultradns.com' -if len(sys.argv) == 5: - use_http = sys.argv[3] - domain = sys.argv[4] +if len(sys.argv) == 6: + use_token = sys.argv[3] + use_http = sys.argv[4] + domain = sys.argv[5] -c = ultra_rest_client.RestApiClient(username, password, 'True' == use_http, domain) -print 'version %s' % c.version() -print 'status %s' % c.status() -account_details = c.get_account_details() -account_name = account_details[u'accounts'][0][u'accountName'] -print 'account name %s' % account_name -print 'get zone metadata %s' % c.get_zone_metadata_v3("mjoy-example.com.") -print 'get first 5 primary zones with mjoy: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"name":"mjoy", "zone_type":"PRIMARY"}) -print '\n' -print 'get first 5 secondary zones: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"zone_type":"SECONDARY"}) -print '\n' -print 'get all zones with 20 zones per page. First page is returned by default: %s' % c.get_zones_v3(limit=20, q={"zone_status":"ALL"}) -print '\n' -print 'get next page of zones with 20 zones per page. Cursor returned by above request is used: %s' % c.get_zones_v3(limit=20, cursor='MDAwMC10YW52aS1zaWduZWQuY29tLjpORVhU', q={"zone_status":"ALL"}) -print 'create primary zone result %s' % c.create_primary_zone(account_name, "foo.invalid.") -print 'get zone metadata %s' % c.get_zone_metadata("foo.invalid.") -print 'delete zone %s ' % c.delete_zone("foo.invalid.") -all_zones = c.get_zones_of_account(account_name, offset=0, limit=5, reverse=True) -print all_zones -first_zone_name = all_zones[u'zones'][0][u'properties'][u'name'] -print 'zone name %s ' % first_zone_name -print 'get_rrsets %s ' % c.get_rrsets(first_zone_name) -print 'create_rrset %s ' % c.create_rrset(first_zone_name, "A", "foo", 300, "1.2.3.4") -print 'get_rrsets %s ' % c.get_rrsets(first_zone_name) -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'edit_rrset %s ' % c.edit_rrset(first_zone_name, "A", "foo", 100, ["10.20.30.40"]) -print 'get_rrsets %s ' % c.get_rrsets(first_zone_name) -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(first_zone_name, "A", "foo") -print 'delete_rrset %s ' % c.delete_rrset(first_zone_name, "A", "foo") -print 'get_rrsets %s ' % c.get_rrsets(first_zone_name) -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(first_zone_name, "A", "foo") -print 'batch delete %s ' % c.batch([ - {'method': 'DELETE', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo2'}, - {'method': 'DELETE', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo3'}, -]) +test_zone_name='udns-python-rest-client-test.com.' -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(first_zone_name, "A", "foo2") -print 'batch create %s ' % c.batch([ - {'method': 'POST', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo2', 'body': {'ttl': 100, 'rdata': ['2.4.6.8']}}, - {'method': 'POST', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo3', 'body': {'ttl': 100, 'rdata': ['20.40.60.80']}}, -]) -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(first_zone_name, "A", "foo2") -print 'batch delete %s ' % c.batch([ - {'method': 'DELETE', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo2'}, - {'method': 'DELETE', 'uri': '/v1/zones/' + first_zone_name + '/rrsets/A/foo3'}, -]) -print 'get_rrsets_by_type %s ' % c.get_rrsets_by_type(first_zone_name, "A") -print 'get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(first_zone_name, "A", "foo2") +client = RestApiClient(username, password, 'True' == use_token, 'True' == use_http, domain) +print('version %s' % client.version()) +print('status %s' % client.status()) +account_details = client.get_account_details() +account_name = account_details['accounts'][0]['accountName'] +print('account name %s' % account_name) +print('get zone metadata %s' % client.get_zone_metadata_v3("mjoy-example.com.")) +print('sleeping for 20 mins') +print('get first 5 primary zones with mjoy: %s' % client.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"name":"mjoy", "zone_type":"PRIMARY"})) +print('\n') +print('get first 5 secondary zones: %s' % client.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"zone_type":"SECONDARY"})) +print('\n') +print('get all zones with 20 zones per page. First page is returned by default: %s' % client.get_zones_v3(limit=20, q={"zone_status":"ALL"})) +print('\n') +print('get next page of zones with 20 zones per page. Cursor returned by above request is used: %s' % client.get_zones_v3(limit=20, cursor='MDAwMC10YW52aS1zaWduZWQuY29tLjpORVhU', q={"zone_status":"ALL"})) +print('create primary zone result %s' % client.create_primary_zone(account_name, "foo.invalid.")) +print('get zone metadata %s' % client.get_zone_metadata("foo.invalid.")) +print('delete zone %s ' % client.delete_zone("foo.invalid.")) +print('zone name %s ' % test_zone_name) +print('get_rrsets %s ' % client.get_rrsets(test_zone_name)) +print('create_rrset %s ' % client.create_rrset(test_zone_name, "A", "foo", 300, "1.2.3.4")) +print('get_rrsets %s ' % client.get_rrsets(test_zone_name)) +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('edit_rrset %s ' % client.edit_rrset(test_zone_name, "A", "foo", 100, ["10.20.30.40"])) +print('get_rrsets %s ' % client.get_rrsets(test_zone_name)) +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('get_rrsets_by_type_owner %s ' % client.get_rrsets_by_type_owner(test_zone_name, "A", "foo")) +print('delete_rrset %s ' % client.delete_rrset(test_zone_name, "A", "foo")) +print('get_rrsets %s ' % client.get_rrsets(test_zone_name)) +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('get_rrsets_by_type_owner %s ' % client.get_rrsets_by_type_owner(test_zone_name, "A", "foo")) +print('batch delete %s ' % client.batch([ + {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2'}, + {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3'}, +])) + +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('get_rrsets_by_type_owner %s ' % client.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) +print('batch create %s ' % client.batch([ + {'method': 'POST', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2', 'body': {'ttl': 100, 'rdata': ['2.4.6.8']}}, + {'method': 'POST', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3', 'body': {'ttl': 100, 'rdata': ['20.40.60.80']}}, +])) +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('get_rrsets_by_type_owner %s ' % client.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) +print('batch delete %s ' % client.batch([ + {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2'}, + {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3'}, +])) +print('get_rrsets_by_type %s ' % client.get_rrsets_by_type(test_zone_name, "A")) +print('get_rrsets_by_type_owner %s ' % client.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) #getting zones with q, sort, offset, limit -print 'get first 5 primary zones with j: %s' % c.get_zones(offset=0, limit=5, sort="NAME", reverse=False, q={"name":"j", "zone_type":"PRIMARY"}) +print('get first 5 primary zones with j: %s' % client.get_zones(offset=0, limit=5, sort="NAME", reverse=False, q={"name":"j", "zone_type":"PRIMARY"})) #creating a zone with upload -result = c.create_primary_zone_by_upload(account_name, 'sample.client.me.', '../zone.txt') -print 'create zone via upload: %s' % result +result = client.create_primary_zone_by_upload(account_name, 'sample.client.me.', './zone.txt') +print('create zone via upload: %s' % result) # check the task status while True: - task_status = c.get_task(result['task_id']) - print 'task status: %s ' % c.get_task(result['task_id']) + task_status = client.get_task(result['task_id']) + print('task status: %s ' % client.get_task(result['task_id'])) if task_status['code'] != 'IN_PROCESS': break time.sleep(1) #check all task status -print 'all task status: %s ' % c.get_all_tasks() +print('all task status: %s ' % client.get_all_tasks()) #delete task status -print 'delete task status: %s ' % c.clear_task(result['task_id']) +print('delete task status: %s ' % client.clear_task(result['task_id'])) #export zonefile in bind format -print('export zone: %s ' % c.export_zone('sample.client.me.')) +print(('export zone: %s ' % client.export_zone('sample.client.me.'))) #delete the zone -print 'delete zone: %s ' % c.delete_zone('sample.client.me.') +print('delete zone: %s ' % client.delete_zone('sample.client.me.')) diff --git a/test.py b/test.py index d49e69b..4b058e3 100644 --- a/test.py +++ b/test.py @@ -8,80 +8,80 @@ test_zone_name='udns-python-rest-client-test.com.' -c = ultra_rest_client.RestApiClient(username, password, False, domain) -print('version %s' % c.version()) -print('status %s' % c.status()) +c = ultra_rest_client.RestApiClient(username, password, False, False, domain) +print(('version %s' % c.version())) +print(('status %s' % c.status())) account_details = c.get_account_details() -account_name = account_details[u'accounts'][0][u'accountName'] -print('account name %s' % account_name ) -print('get zone metadata %s' % c.get_zone_metadata_v3(test_zone_name)) -print('get first 5 primary zones with udns: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"name":"udns", "zone_type":"PRIMARY"})) +account_name = account_details['accounts'][0]['accountName'] +print(('account name %s' % account_name )) +print(('get zone metadata %s' % c.get_zone_metadata_v3(test_zone_name))) +print(('get first 5 primary zones with udns: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"name":"udns", "zone_type":"PRIMARY"}))) print('\n') -print('get first 5 secondary zones: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"zone_type":"SECONDARY"})) +print(('get first 5 secondary zones: %s' % c.get_zones_v3(limit=5, sort="NAME", reverse=False, q={"zone_type":"SECONDARY"}))) print('\n') -print('get all zones with 25 zones per page. First page is returned by default: %s' % c.get_zones_v3(limit=25, q={"zone_status":"ALL"})) +print(('get all zones with 25 zones per page. First page is returned by default: %s' % c.get_zones_v3(limit=25, q={"zone_status":"ALL"}))) print('\n') -print('get next page of zones with 25 zones per page. Cursor returned by above request is used: %s' % c.get_zones_v3(limit=25, cursor='MTY1NDUxNTM4MTczMXNiLXN1YnBvb2wtY3J1ZC0xLmNvbS46TkVYVA==', q={"zone_status":"ALL"})) -print('create primary zone result %s' % c.create_primary_zone(account_name, "foo.invalid.")) -print('get zone metadata %s' % c.get_zone_metadata("foo.invalid.")) -print('delete zone %s ' % c.delete_zone("foo.invalid.")) -print('zone name %s ' % test_zone_name) -print('get_rrsets %s ' % c.get_rrsets(test_zone_name)) -print('create_rrset %s ' % c.create_rrset(test_zone_name, "A", "foo", 300, "1.2.3.4")) -print('get_rrsets %s ' % c.get_rrsets(test_zone_name)) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('edit_rrset %s ' % c.edit_rrset(test_zone_name, "A", "foo", 100, ["10.20.30.40"])) -print('get_rrsets %s ' % c.get_rrsets(test_zone_name)) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo")) -print('delete_rrset %s ' % c.delete_rrset(test_zone_name, "A", "foo")) -print('get_rrsets %s ' % c.get_rrsets(test_zone_name)) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo")) -print('batch delete %s ' % c.batch([ +print(('get next page of zones with 25 zones per page. Cursor returned by above request is used: %s' % c.get_zones_v3(limit=25, cursor='MTY1NDUxNTM4MTczMXNiLXN1YnBvb2wtY3J1ZC0xLmNvbS46TkVYVA==', q={"zone_status":"ALL"}))) +print(('create primary zone result %s' % c.create_primary_zone(account_name, "foo.invalid."))) +print(('get zone metadata %s' % c.get_zone_metadata("foo.invalid."))) +print(('delete zone %s ' % c.delete_zone("foo.invalid."))) +print(('zone name %s ' % test_zone_name)) +print(('get_rrsets %s ' % c.get_rrsets(test_zone_name))) +print(('create_rrset %s ' % c.create_rrset(test_zone_name, "A", "foo", 300, "1.2.3.4"))) +print(('get_rrsets %s ' % c.get_rrsets(test_zone_name))) +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('edit_rrset %s ' % c.edit_rrset(test_zone_name, "A", "foo", 100, ["10.20.30.40"]))) +print(('get_rrsets %s ' % c.get_rrsets(test_zone_name))) +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo"))) +print(('delete_rrset %s ' % c.delete_rrset(test_zone_name, "A", "foo"))) +print(('get_rrsets %s ' % c.get_rrsets(test_zone_name))) +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo"))) +print(('batch delete %s ' % c.batch([ {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2'}, {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3'}, -])) +]))) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) -print('batch create %s ' % c.batch([ +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2"))) +print(('batch create %s ' % c.batch([ {'method': 'POST', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2', 'body': {'ttl': 100, 'rdata': ['2.4.6.8']}}, {'method': 'POST', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3', 'body': {'ttl': 100, 'rdata': ['20.40.60.80']}}, -])) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) -print('batch delete %s ' % c.batch([ +]))) +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2"))) +print(('batch delete %s ' % c.batch([ {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo2'}, {'method': 'DELETE', 'uri': '/v1/zones/' + test_zone_name + '/rrsets/A/foo3'}, -])) -print('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A")) -print('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2")) +]))) +print(('get_rrsets_by_type %s ' % c.get_rrsets_by_type(test_zone_name, "A"))) +print(('get_rrsets_by_type_owner %s ' % c.get_rrsets_by_type_owner(test_zone_name, "A", "foo2"))) #getting zones with q, sort, offset, limit -print('get first 5 primary zones with j: %s' % c.get_zones(offset=0, limit=5, sort="NAME", reverse=False, q={"name":"j", "zone_type":"PRIMARY"})) +print(('get first 5 primary zones with j: %s' % c.get_zones(offset=0, limit=5, sort="NAME", reverse=False, q={"name":"j", "zone_type":"PRIMARY"}))) #creating a zone with upload result = c.create_primary_zone_by_upload(account_name, 'sample.client.me.', './zone.txt') -print('create zone via upload: %s' % result) +print(('create zone via upload: %s' % result)) # check the task status while True: task_status = c.get_task(result['task_id']) - print('task status: %s ' % c.get_task(result['task_id'])) + print(('task status: %s ' % c.get_task(result['task_id']))) if task_status['code'] != 'IN_PROCESS': break time.sleep(1) #check all task status -print('all task status: %s ' % c.get_all_tasks()) +print(('all task status: %s ' % c.get_all_tasks())) #delete task status -print('delete task status: %s ' % c.clear_task(result['task_id'])) +print(('delete task status: %s ' % c.clear_task(result['task_id']))) #export zonefile in bind format -print('export zone: %s ' % c.export_zone('sample.client.me.')) +print(('export zone: %s ' % c.export_zone('sample.client.me.'))) #delete the zone -print('delete zone: %s ' % c.delete_zone('sample.client.me.')) +print(('delete zone: %s ' % c.delete_zone('sample.client.me.'))) diff --git a/ultra_rest_client/about.py b/ultra_rest_client/about.py index 5ebf176..ef953a5 100644 --- a/ultra_rest_client/about.py +++ b/ultra_rest_client/about.py @@ -1,5 +1,5 @@ -__version__="0.0.0" -__prefix__="udns-python-rest-client-" +VERSION = "0.0.0" +PREFIX = "udns-python-rest-client-" -def get_client_user_agent() : - return __prefix__+__version__ \ No newline at end of file +def get_client_user_agent(): + return f"{PREFIX}{VERSION}" \ No newline at end of file diff --git a/ultra_rest_client/connection.py b/ultra_rest_client/connection.py index 33c2de0..cb34f8a 100644 --- a/ultra_rest_client/connection.py +++ b/ultra_rest_client/connection.py @@ -11,32 +11,30 @@ class AuthError(Exception): def __init__(self, message): - self.message = message + super().__init__(message) def __str__(self): - return repr(self.message) + return str(self.message) class RestError(Exception): def __init__(self, message): - self.message = message + super().__init__(message) def __str__(self): - return repr(self.message) + return str(self.message) class RestApiConnection: - def __init__(self, use_http=False, host="restapi.ultradns.com"): + def __init__(self, use_http=False, host="restapi.ultradns.com", access_token: str = "", refresh_token: str = ""): self.use_http = use_http self.host = host - self.access_token = "" - self.refresh_token = "" + self.access_token = access_token + self.refresh_token = refresh_token def _get_connection(self): - if self.use_http: - return "http://"+ self.host - else: - return "https://"+ self.host + protocol = "http://" if self.use_http else "https://" + return protocol + self.host # Authentication # We need the ability to take in a username and password and get @@ -44,50 +42,56 @@ def _get_connection(self): # to an invalid auth token, refresh must be automatically invoked, the # new auth token and refresh token stored, and the request tried again # with the new auth token. + def auth(self, username, password): - h1 = self._get_connection() - payload = {"grant_type":"password", "username":username, "password":password} - r1 = requests.post(h1+"/v1/authorization/token",data=payload) - if r1.status_code == requests.codes.OK: - json_body = r1.json() - self.access_token = json_body[u'accessToken'] - self.refresh_token = json_body[u'refreshToken'] + host = self._get_connection() + payload = { + "grant_type":"password", + "username":username, + "password":password + } + response = requests.post(f"{host}/v1/authorization/token", data=payload) + if response.status_code == requests.codes.OK: + json_body = response.json() + self.access_token = json_body.get('accessToken') + self.refresh_token = json_body.get('refreshToken') else: - raise AuthError(r1.json()) + raise AuthError(response.json()) def _refresh(self): - h1 = self._get_connection() - payload = {"grant_type":"refresh_token","refresh_token":self.refresh_token} - r1 = requests.post(h1+"/v1/authorization/token", data=payload) - if r1.status_code == requests.codes.OK: - json_body = r1.json() - self.access_token = json_body[u'accessToken'] - self.refresh_token = json_body[u'refreshToken'] + host = self._get_connection() + payload = { + "grant_type":"refresh_token", + "refresh_token":self.refresh_token + } + response = requests.post(f"{host}/v1/authorization/token", data=payload) + if response.status_code == requests.codes.OK: + json_body = response.json() + self.access_token = json_body.get('accessToken') + self.refresh_token = json_body.get('refreshToken') else: - raise AuthError(r1.json()) + raise AuthError(response.json()) def _build_headers(self, content_type): - result = {"Accept": "application/json", - "Authorization": "Bearer " + self.access_token, - "User-Agent": get_client_user_agent()} - if content_type != "": - result["Content-type"] = content_type - return result + headers = { + "Accept": "application/json", + "Authorization": f"Bearer {self.access_token}", + "User-Agent": get_client_user_agent() + } + if content_type: + headers["Content-Type"] = content_type + return headers def get(self, uri, params=None): - if params is None: - params = {} + params = params or {} return self._do_call(uri, "GET", params=params) def post_multi_part(self, uri, files): #use empty string for content type so we don't set it - return self._do_call(uri, "POST", files=files, content_type="") + return self._do_call(uri, "POST", files=files, content_type=None) def post(self, uri, json=None): - if json is not None: - return self._do_call(uri, "POST", body=json) - else: - return self._do_call(uri, "POST") + return self._do_call(uri, "POST", body=json) if json is not None else self._do_call(uri, "POST") def put(self, uri, json): return self._do_call(uri, "PUT", body=json) @@ -98,27 +102,29 @@ def patch(self, uri, json): def delete(self, uri): return self._do_call(uri, "DELETE") - def _do_call(self, uri, method, params=None, body=None, retry=True, files=None, content_type = "application/json"): - h1 = self._get_connection() - r1 = requests.request(method, h1+uri, params=params, data=body, headers=self._build_headers(content_type), files=files) - # bad access token = status 400, - # body = {"errorCode":60001,"errorMessage":"invalid_grant:token not found, expired or invalid"} - if r1.status_code == requests.codes.NO_CONTENT: + def _do_call(self, uri, method, params=None, body=None, retry=True, files=None, content_type="application/json"): + host = self._get_connection() + response = requests.request( + method, + host + uri, + params=params, + data=body, + headers=self._build_headers(content_type), + files=files + ) + if response.status_code == requests.codes.NO_CONTENT: return {} # if the content-type is text/plain just return the text - if r1.headers['Content-Type'] == 'text/plain': - return r1.text - json_body = r1.json() + if response.headers.get('Content-Type') == 'text/plain': + return response.text + + json_body = response.json() # if this is a background task, add the task id to the body - if r1.status_code == requests.codes.ACCEPTED: - json_body['task_id'] = r1.headers['x-task-id'] - if type(json_body) is dict: - if retry and u'errorCode' in json_body and json_body[u'errorCode'] == 60001: - self._refresh() - json_body = self._do_call(uri, method, params, body, False) - #disabling error raising for now, because it only happens for batch - #because all other errors are returned in a list, not in a dict - #should have been raising errors for those, too, but haven't been - #elif r1.status_code >= requests.codes.BAD_REQUEST: - # raise RestError(json_body) + if response.status_code == requests.codes.ACCEPTED: + json_body['task_id'] = response.headers.get('x-task-id') + + if isinstance(json_body, dict) and retry and json_body.get('errorCode') == 60001: + self._refresh() + return self._do_call(uri, method, params, body, False) + return json_body diff --git a/ultra_rest_client/ultra_rest_client.py b/ultra_rest_client/ultra_rest_client.py index 21d1cb9..6164f1a 100644 --- a/ultra_rest_client/ultra_rest_client.py +++ b/ultra_rest_client/ultra_rest_client.py @@ -9,20 +9,34 @@ import time class RestApiClient: - def __init__(self, username, password, use_http=False, host="restapi.ultradns.com"): + def __init__(self, bu: str, pr: str = None, use_token: bool = False, use_http: bool =False, host: str = "api.test.ultradns.net"): """Initialize a Rest API Client. Arguments: - username -- The username of the user - password -- The password of the user + bu (str) -- Either username or bearer token based on `use_token` flag. + pr (str, optional) -- Either password or refresh token based on `use_token` flag. Defaults to None. + use_token (bool, optional) -- If True, treats `bu` as bearer token and `pr` as refresh token. Defaults to False. Keyword Arguments: - use_http -- For internal testing purposes only, lets developers use http instead of https. - host -- Allows you to point to a server other than the production server. + use_http (bool, optional) -- For internal testing purposes only, lets developers use http instead of https. + host (str) -- Allows you to point to a server other than the production server. + Raises: + ValueError -- If `pr` is not provided when `use_token` is True. """ - self.rest_api_connection = RestApiConnection(use_http, host) - self.rest_api_connection.auth(username, password) + + if use_token: + self.access_token = bu + self.refresh_token = pr + self.rest_api_connection = RestApiConnection(use_http, host, bu, pr) + if not self.refresh_token: + print( + "Warning: Passing a Bearer token with no refresh token means the client state will expire after an hour.") + else: + if not pr: + raise ValueError("Password is required when providing a username.") + self.rest_api_connection = RestApiConnection(use_http, host) + self.rest_api_connection.auth(bu, pr) # Zones # create a primary zone @@ -34,9 +48,17 @@ def create_primary_zone(self, account_name, zone_name): zone_name -- The name of the zone. It must be unique. """ - zone_properties = {"name": zone_name, "accountName": account_name, "type": "PRIMARY"} - primary_zone_info = {"forceImport": True, "createType": "NEW"} - zone_data = {"properties": zone_properties, "primaryCreateInfo": primary_zone_info} + zone_data = { + "properties": { + "name": zone_name, + "accountName": account_name, + "type": "PRIMARY" + }, + "primaryCreateInfo": { + "forceImport": True, + "createType": "NEW" + } + } return self.rest_api_connection.post("/v1/zones", json.dumps(zone_data)) # create primary zone by file upload @@ -49,11 +71,21 @@ def create_primary_zone_by_upload(self, account_name, zone_name, bind_file): bind_file -- The file to upload. """ - zone_properties = {"name": zone_name, "accountName": account_name, "type": "PRIMARY"} - primary_zone_info = {"forceImport": True, "createType": "UPLOAD"} - zone_data = {"properties": zone_properties, "primaryCreateInfo": primary_zone_info} - files = {'zone': ('', json.dumps(zone_data), 'application/json'), - 'file': ('file', open(bind_file, 'rb'), 'application/octet-stream')} + zone_data = { + "properties": { + "name": zone_name, + "accountName": account_name, + "type": "PRIMARY" + }, + "primaryCreateInfo": { + "forceImport": True, + "createType": "UPLOAD" + } + } + files = { + 'zone': ('', json.dumps(zone_data), 'application/json'), + 'file': ('file', open(bind_file, 'rb'), 'application/octet-stream') + } return self.rest_api_connection.post_multi_part("/v1/zones", files) # create a primary zone using axfr @@ -71,13 +103,24 @@ def create_primary_zone_by_axfr(self, account_name, zone_name, master, tsig_key= key_value -- TSIG key secret. """ - zone_properties = {"name": zone_name, "accountName": account_name, "type": "PRIMARY"} - if tsig_key is not None and key_value is not None: - name_server_info = {"ip": master, "tsigKey": tsig_key, "tsigKeyValue": key_value} - else: - name_server_info = {"ip": master} - primary_zone_info = {"forceImport": True, "createType": "TRANSFER", "nameServer": name_server_info} - zone_data = {"properties": zone_properties, "primaryCreateInfo": primary_zone_info} + zone_properties = { + "name": zone_name, + "accountName": account_name, + "type": "PRIMARY" + } + name_server_info = {"ip": master} + if tsig_key and key_value: + name_server_info.update({"tsigKey": tsig_key, "tsigKeyValue": key_value}) + + primary_zone_info = { + "forceImport": True, + "createType": "TRANSFER", + "nameServer": name_server_info + } + zone_data = { + "properties": zone_properties, + "primaryCreateInfo": primary_zone_info + } return self.rest_api_connection.post("/v1/zones", json.dumps(zone_data)) # create a secondary zone @@ -95,15 +138,27 @@ def create_secondary_zone(self, account_name, zone_name, master, tsig_key=None, key_value -- TSIG key secret. """ - zone_properties = {"name": zone_name, "accountName": account_name, "type": "SECONDARY"} - if tsig_key is not None and key_value is not None: - name_server_info = {"ip": master, "tsigKey": tsig_key, "tsigKeyValue": key_value} - else: - name_server_info = {"ip": master} - name_server_ip_1 = {"nameServerIp1": name_server_info} - name_server_ip_list = {"nameServerIpList": name_server_ip_1} - secondary_zone_info = {"primaryNameServers": name_server_ip_list} - zone_data = {"properties": zone_properties, "secondaryCreateInfo": secondary_zone_info} + zone_properties = { + "name": zone_name, + "accountName": account_name, + "type": "SECONDARY" + } + name_server_info = {"ip": master} + if tsig_key and key_value: + name_server_info.update({"tsigKey": tsig_key, "tsigKeyValue": key_value}) + + name_server_ip_list = { + "nameServerIpList": { + "nameServerIp1": name_server_info + } + } + secondary_zone_info = { + "primaryNameServers": name_server_ip_list + } + zone_data = { + "properties": zone_properties, + "secondaryCreateInfo": secondary_zone_info + } return self.rest_api_connection.post("/v1/zones", json.dumps(zone_data)) # force zone axfr @@ -114,7 +169,7 @@ def force_axfr(self, zone_name): zone_name -- The zone name. The trailing dot is optional. """ - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/transfer") + return self.rest_api_connection.post(f"/v1/zones/{zone_name}/transfer") # convert secondary def convert_zone(self, zone_name): @@ -124,7 +179,7 @@ def convert_zone(self, zone_name): zone_name -- The zone name. The trailing dot is optional. """ - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/convert") + return self.rest_api_connection.post(f"/v1/zones/{zone_name}/convert") # list zones for account def get_zones_of_account(self, account_name, q=None, **kwargs): @@ -150,7 +205,7 @@ def get_zones_of_account(self, account_name, q=None, **kwargs): limit -- The maximum number of rows to be returned. """ - uri = "/v1/accounts/" + account_name + "/zones" + uri = f"/v1/accounts/{account_name}/zones" params = build_params(q, kwargs) return self.rest_api_connection.get(uri, params) @@ -211,7 +266,7 @@ def get_zone_metadata(self, zone_name): zone_name -- The name of the zone being returned. """ - return self.rest_api_connection.get("/v1/zones/" + zone_name) + return self.rest_api_connection.get(f"/v1/zones/{zone_name}") # get zone metadata v3 def get_zone_metadata_v3(self, zone_name): @@ -221,7 +276,7 @@ def get_zone_metadata_v3(self, zone_name): zone_name -- The name of the zone being returned. """ - return self.rest_api_connection.get("/v3/zones/" + zone_name) + return self.rest_api_connection.get(f"/v3/zones/{zone_name}") # delete a zone def delete_zone(self, zone_name): @@ -231,7 +286,8 @@ def delete_zone(self, zone_name): zone_name -- The name of the zone being deleted. """ - return self.rest_api_connection.delete("/v1/zones/" + zone_name) + return self.rest_api_connection.delete(f"/v1/zones/{zone_name}") + # update secondary zone name servers (PATCH) def edit_secondary_name_server(self, zone_name, primary=None, backup=None, second_backup=None): @@ -246,17 +302,17 @@ def edit_secondary_name_server(self, zone_name, primary=None, backup=None, secon second_backup -- The second backup name server. """ - name_server_info = {} - if primary is not None: - name_server_info['nameServerIp1'] = {'ip':primary} - if backup is not None: - name_server_info['nameServerIp2'] = {'ip':backup} - if second_backup is not None: - name_server_info['nameServerIp3'] = {'ip':second_backup} - name_server_ip_list = {"nameServerIpList": name_server_info} - secondary_zone_info = {"primaryNameServers": name_server_ip_list} - zone_data = {"secondaryCreateInfo": secondary_zone_info} - return self.rest_api_connection.patch("/v1/zones/" + zone_name, json.dumps(zone_data)) + name_server_info = { + f'nameServerIp{i+1}': {'ip': ip} for i, ip in enumerate([primary, backup, second_backup]) if ip is not None + } + zone_data = { + "secondaryCreateInfo": { + "primaryNameServers": { + "nameServerIpList": name_server_info + } + } + } + return self.rest_api_connection.patch(f"/v1/zones/{zone_name}", json.dumps(zone_data)) # RRSets # list rrsets for a zone @@ -280,7 +336,7 @@ def get_rrsets(self, zone_name, q=None, **kwargs): limit -- The maximum number of rows to be returned. """ - uri = "/v1/zones/" + zone_name + "/rrsets" + uri = f"/v1/zones/{zone_name}/rrsets" params = build_params(q, kwargs) return self.rest_api_connection.get(uri, params) @@ -308,7 +364,7 @@ def get_rrsets_by_type(self, zone_name, rtype, q=None, **kwargs): limit -- The maximum number of rows to be returned. """ - uri = "/v1/zones/" + zone_name + "/rrsets/" + rtype + uri = f"/v1/zones/{zone_name}/rrsets/{rtype}" params = build_params(q, kwargs) return self.rest_api_connection.get(uri, params) @@ -337,7 +393,7 @@ def get_rrsets_by_type_owner(self, zone_name, rtype, owner_name, q=None, **kwarg limit -- The maximum number of rows to be returned. """ - uri = "/v1/zones/" + zone_name + "/rrsets/" + rtype + "/" + owner_name + uri = f"/v1/zones/{zone_name}/rrsets/{rtype}/{owner_name}" params = build_params(q, kwargs) return self.rest_api_connection.get(uri, params) @@ -358,10 +414,10 @@ def create_rrset(self, zone_name, rtype, owner_name, ttl, rdata): If there are multiple resource records in this RRSet, pass in a list of strings. """ - if type(rdata) is not list: + if not isinstance(rdata, list): rdata = [rdata] rrset = {"ttl": ttl, "rdata": rdata} - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/rrsets/" + rtype + "/" + owner_name, json.dumps(rrset)) + return self.rest_api_connection.post(f"/v1/zones/{zone_name}/rrsets/{rtype}/{owner_name}", json.dumps(rrset)) # edit an rrset (PUT) def edit_rrset(self, zone_name, rtype, owner_name, ttl, rdata, profile=None): @@ -381,12 +437,12 @@ def edit_rrset(self, zone_name, rtype, owner_name, ttl, rdata, profile=None): profile -- The profile info if this is updating a resource pool """ - if type(rdata) is not list: + if not isinstance(rdata, list): rdata = [rdata] rrset = {"ttl": ttl, "rdata": rdata} if profile: rrset["profile"] = profile - uri = "/v1/zones/" + zone_name + "/rrsets/" + rtype + "/" + owner_name + uri = f"/v1/zones/{zone_name}/rrsets/{rtype}/{owner_name}" return self.rest_api_connection.put(uri, json.dumps(rrset)) # edit an rrset's rdata (PATCH) @@ -406,14 +462,14 @@ def edit_rrset_rdata(self, zone_name, rtype, owner_name, rdata, profile=None): profile -- The profile info if this is updating a resource pool """ - if type(rdata) is not list: + if not isinstance(rdata, list): rdata = [rdata] rrset = {"rdata": rdata} method = "patch" if profile: rrset["profile"] = profile method = "put" - uri = "/v1/zones/" + zone_name + "/rrsets/" + rtype + "/" + owner_name + uri = f"/v1/zones/{zone_name}/rrsets/{rtype}/{owner_name}" return getattr(self.rest_api_connection, method)(uri,json.dumps(rrset)) # delete an rrset @@ -429,7 +485,7 @@ def delete_rrset(self, zone_name, rtype, owner_name): If a trailing dot is supplied, the owner name is assumed to be absolute (foo.zonename.com.) """ - return self.rest_api_connection.delete("/v1/zones/" + zone_name + "/rrsets/" + rtype + "/" + owner_name) + return self.rest_api_connection.delete(f"/v1/zones/{zone_name}/rrsets/{rtype}/{owner_name}") # Web Forwards # get web forwards @@ -441,7 +497,7 @@ def get_web_forwards(self, zone_name): the system-generated guid for each object. """ - return self.rest_api_connection.get("/v1/zones/" + zone_name + "/webforwards") + return self.rest_api_connection.get(f"/v1/zones/{zone_name}/webforwards") # create web forward def create_web_forward(self, zone_name, request_to, redirect_to, forward_type): @@ -458,8 +514,13 @@ def create_web_forward(self, zone_name, request_to, redirect_to, forward_type): HTTP_307_REDIRECT """ - web_forward = {"requestTo": request_to, "defaultRedirectTo": redirect_to, "defaultForwardType": forward_type} - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/webforwards", json.dumps(web_forward)) + web_forward = { + "requestTo": request_to, + "defaultRedirectTo": redirect_to, + "defaultForwardType": forward_type + } + uri = f"/v1/zones/{zone_name}/webforwards" + return self.rest_api_connection.post(uri, json.dumps(web_forward)) # delete web forward def delete_web_forward(self, zone_name, guid): @@ -470,7 +531,7 @@ def delete_web_forward(self, zone_name, guid): guid -- The system-generated unique id for the web forward. """ - return self.rest_api_connection.delete("/v1/zones/" + zone_name + "/webforwards/" + guid) + return self.rest_api_connection.delete(f"/v1/zones/{zone_name}/webforwards/{guid}") # Accounts # get account details for user @@ -495,10 +556,10 @@ def get_all_tasks(self): return self.rest_api_connection.get("/v1/tasks") def get_task(self, task_id): - return self.rest_api_connection.get("/v1/tasks/"+task_id) + return self.rest_api_connection.get(f"/v1/tasks/{task_id}") def clear_task(self, task_id): - return self.rest_api_connection.delete("/v1/tasks/"+task_id) + return self.rest_api_connection.delete(f"/v1/tasks/{task_id}") # Batch def batch(self, batch_list): @@ -569,17 +630,21 @@ def batch(self, batch_list): # } def _build_sb_rrset(self, backup_record_list, pool_info, rdata_info, ttl): - rdata = [] - rdata_info_list = [] - for rr in rdata_info: - rdata.append(rr) - rdata_info_list.append(rdata_info[rr]) - profile = {"@context": "http://schemas.ultradns.com/SBPool.jsonschema"} + rdata = list(rdata_info.keys()) + rdata_info_list = list(rdata_info.values()) + + profile = { + "@context": "http://schemas.ultradns.com/SBPool.jsonschema", + "backupRecords": backup_record_list, + "rdataInfo": rdata_info_list + } for p in pool_info: profile[p] = pool_info[p] - profile["backupRecords"] = backup_record_list - profile["rdataInfo"] = rdata_info_list - rrset = {"ttl": ttl, "rdata": rdata, "profile": profile} + rrset = { + "ttl": ttl, + "rdata": rdata, + "profile": profile + } return rrset def create_sb_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, backup_record_list): @@ -600,10 +665,9 @@ def create_sb_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, back rdata - the A or CNAME for the backup record failoverDelay - the time to wait to fail over (optional, defaults to 0) """ - rrset = self._build_sb_rrset(backup_record_list, pool_info, rdata_info, ttl) - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/rrsets/A/" + owner_name, json.dumps(rrset)) - + endpoint = f"/v1/zones/{zone_name}/rrsets/A/{owner_name}" + return self.rest_api_connection.post(endpoint, json=rrset) # Update an SB Pool def edit_sb_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, backup_record_list): @@ -623,20 +687,25 @@ def edit_sb_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, backup failoverDelay - the time to wait to fail over (optional, defaults to 0) """ rrset = self._build_sb_rrset(backup_record_list, pool_info, rdata_info, ttl) - return self.rest_api_connection.put("/v1/zones/" + zone_name + "/rrsets/A/" + owner_name, json.dumps(rrset)) + endpoint = f"/v1/zones/{zone_name}/rrsets/A/{owner_name}" + return self.rest_api_connection.put(endpoint, json=rrset) def _build_tc_rrset(self, backup_record, pool_info, rdata_info, ttl): - rdata = [] - rdata_info_list = [] - for rr in rdata_info: - rdata.append(rr) - rdata_info_list.append(rdata_info[rr]) - profile = {"@context": "http://schemas.ultradns.com/TCPool.jsonschema"} + rdata = list(rdata_info.keys()) + rdata_info_list = list(rdata_info.values()) + + profile = { + "@context": "http://schemas.ultradns.com/TCPool.jsonschema", + "backupRecord": backup_record, + "rdataInfo": rdata_info_list + } for p in pool_info: profile[p] = pool_info[p] - profile["backupRecord"] = backup_record - profile["rdataInfo"] = rdata_info_list - rrset = {"ttl": ttl, "rdata": rdata, "profile": profile} + rrset = { + "ttl": ttl, + "rdata": rdata, + "profile": profile + } return rrset # Create a TC Pool @@ -712,8 +781,8 @@ def create_tc_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, back """ rrset = self._build_tc_rrset(backup_record, pool_info, rdata_info, ttl) - return self.rest_api_connection.post("/v1/zones/" + zone_name + "/rrsets/A/" + owner_name, json.dumps(rrset)) - + endpoint = f"/v1/zones/{zone_name}/rrsets/A/{owner_name}" + return self.rest_api_connection.post(endpoint, json=rrset) # Update an SB Pool def edit_tc_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, backup_record): @@ -733,7 +802,8 @@ def edit_tc_pool(self, zone_name, owner_name, ttl, pool_info, rdata_info, backup failoverDelay - the time to wait to fail over (optional, defaults to 0) """ rrset = self._build_tc_rrset(backup_record, pool_info, rdata_info, ttl) - return self.rest_api_connection.put("/v1/zones/" + zone_name + "/rrsets/A/" + owner_name, json.dumps(rrset)) + endpoint = f"/v1/zones/{zone_name}/rrsets/A/{owner_name}" + return self.rest_api_connection.put(endpoint, json=rrset) # export zone in bind format def export_zone(self, zone_name): @@ -743,25 +813,22 @@ def export_zone(self, zone_name): zone_name -- The name of the zone being returned. A single zone as a string. """ - zonelist = [zone_name] - zonejson=json.dumps({'zoneNames': zonelist}) - status = self.rest_api_connection.post("/v3/zones/export", zonejson) - taskId = status.get('task_id') + zonejson = json.dumps({'zoneNames': [zone_name]}) + status = self.rest_api_connection.post("/v3/zones/export", json=zonejson) + task_id = status.get('task_id') + while True: - task_status = self.rest_api_connection.get("/v1/tasks/"+taskId) + task_status = self.rest_api_connection.get(f"/v1/tasks/{task_id}") if task_status['code'] != 'IN_PROCESS': - break + break time.sleep(1) - result=self.rest_api_connection.get("/v1/tasks/"+taskId+"/result") - self.clear_task(taskId) + + result = self.rest_api_connection.get(f"/v1/tasks/{task_id}/result") + self.clear_task(task_id) return result def build_params(q, args): - params = {} - params.update(args) - if q is not None: - all = [] - for k in q: - all.append("%s:%s" % (k, q[k])) - params['q']= ' '.join(all) + params = args.copy() + if q: + params['q'] = ' '.join(f"{k}:{v}" for k, v in q.items()) return params