diff --git a/certbot_dns_ispconfig/dns_ispconfig.py b/certbot_dns_ispconfig/dns_ispconfig.py index 528139b..4d64c52 100644 --- a/certbot_dns_ispconfig/dns_ispconfig.py +++ b/certbot_dns_ispconfig/dns_ispconfig.py @@ -20,7 +20,7 @@ class Authenticator(dns_common.DNSAuthenticator): This Authenticator uses the ISPConfig Remote REST API to fulfill a dns-01 challenge. """ - description = ('Obtain certificates using a DNS TXT record (if you are using ISPConfig for DNS).') + description = "Obtain certificates using a DNS TXT record (if you are using ISPConfig for DNS)." ttl = 60 def __init__(self, *args, **kwargs): @@ -29,32 +29,44 @@ class Authenticator(dns_common.DNSAuthenticator): @classmethod def add_parser_arguments(cls, add): # pylint: disable=arguments-differ - super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=120) - add('credentials', help='ISPConfig credentials INI file.') + super(Authenticator, cls).add_parser_arguments( + add, default_propagation_seconds=120 + ) + add("credentials", help="ISPConfig credentials INI file.") - def more_info(self): # pylint: disable=missing-docstring,no-self-use - return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \ - 'the ISPConfig Remote REST API.' + def more_info(self): # pylint: disable=missing-docstring,no-self-use + return ( + "This plugin configures a DNS TXT record to respond to a dns-01 challenge using " + + "the ISPConfig Remote REST API." + ) def _setup_credentials(self): self.credentials = self._configure_credentials( - 'credentials', - 'ISPConfig credentials INI file', + "credentials", + "ISPConfig credentials INI file", { - 'endpoint': 'URL of the ISPConfig Remote API.', - 'username': 'Username for ISPConfig Remote API.', - 'password': 'Password for ISPConfig Remote API.' - } + "endpoint": "URL of the ISPConfig Remote API.", + "username": "Username for ISPConfig Remote API.", + "password": "Password for ISPConfig Remote API.", + }, ) def _perform(self, domain, validation_name, validation): - self._get_ispconfig_client().add_txt_record(domain, validation_name, validation, self.ttl) + self._get_ispconfig_client().add_txt_record( + domain, validation_name, validation, self.ttl + ) def _cleanup(self, domain, validation_name, validation): - self._get_ispconfig_client().del_txt_record(domain, validation_name, validation, self.ttl) + self._get_ispconfig_client().del_txt_record( + domain, validation_name, validation, self.ttl + ) def _get_ispconfig_client(self): - return _ISPConfigClient(self.credentials.conf('endpoint'), self.credentials.conf('username'), self.credentials.conf('password')) + return _ISPConfigClient( + self.credentials.conf("endpoint"), + self.credentials.conf("username"), + self.credentials.conf("password"), + ) class _ISPConfigClient(object): @@ -63,7 +75,7 @@ class _ISPConfigClient(object): """ def __init__(self, endpoint, username, password): - logger.debug('creating ispconfigclient') + logger.debug("creating ispconfigclient") self.endpoint = endpoint self.username = username self.password = password @@ -73,39 +85,42 @@ class _ISPConfigClient(object): def _login(self): if self.session_id is not None: return - logger.debug('logging in') - logindata = {'username': self.username, 'password':self.password} - self.session_id = self._api_request('login', logindata) - logger.debug('session id is %s', self.session_id) + logger.debug("logging in") + logindata = {"username": self.username, "password": self.password} + self.session_id = self._api_request("login", logindata) + logger.debug("session id is %s", self.session_id) def _api_request(self, action, data): if self.session_id is not None: - data['session_id'] = self.session_id + data["session_id"] = self.session_id url = self._get_url(action) - resp = self.session.get( - url, - json=data - ) - logger.debug('API REquest to URL: %s', url) + resp = self.session.get(url, json=data) + logger.debug("API REquest to URL: %s", url) if resp.status_code != 200: - raise errors.PluginError('HTTP Error during login {0}'.format(resp.status_code)) + raise errors.PluginError( + "HTTP Error during login {0}".format(resp.status_code) + ) try: result = resp.json() except: - raise errors.PluginError('API response with non JSON: {0}'.format(resp.text)) - if (result['code'] == 'ok'): - return result['response'] - elif (result['code'] == 'remote_fault'): - raise errors.PluginError('API response with an error: {0}'.format(result['message'])) + raise errors.PluginError( + "API response with non JSON: {0}".format(resp.text) + ) + if result["code"] == "ok": + return result["response"] + elif result["code"] == "remote_fault": + raise errors.PluginError( + "API response with an error: {0}".format(result["message"]) + ) else: - raise errors.PluginError('API response unknown {0}'.format(resp.text)) + raise errors.PluginError("API response unknown {0}".format(resp.text)) def _get_url(self, action): - return '{0}?{1}'.format(self.endpoint, action) + return "{0}?{1}".format(self.endpoint, action) def _get_server_id(self, zone_id): - zone = self._api_request('dns_zone_get', {'primary_id': zone_id}) - return zone['server_id'] + zone = self._api_request("dns_zone_get", {"primary_id": zone_id}) + return zone["server_id"] def add_txt_record(self, domain, record_name, record_content, record_ttl): """ @@ -121,20 +136,24 @@ class _ISPConfigClient(object): zone_id, zone_name = self._find_managed_zone_id(domain) if zone_id is None: raise errors.PluginError("Domain not known") - logger.debug('domain found: %s with id: %s', zone_name, zone_id) + logger.debug("domain found: %s with id: %s", zone_name, zone_id) o_record_name = record_name - record_name = record_name.replace(zone_name, '')[:-1] - logger.debug('using record_name: %s from original: %s', record_name, o_record_name) + record_name = record_name.replace(zone_name, "")[:-1] + logger.debug( + "using record_name: %s from original: %s", record_name, o_record_name + ) record = self.get_existing_txt(zone_id, record_name, record_content) if record is not None: - if record['data'] == record_content: - logger.info('already there, id {0}'.format(record['id'])) + if record["data"] == record_content: + logger.info("already there, id {0}".format(record["id"])) return else: - logger.info('update {0}'.format(record['id'])) - self._update_txt_record(zone_id, record['id'], record_name, record_content, record_ttl) + logger.info("update {0}".format(record["id"])) + self._update_txt_record( + zone_id, record["id"], record_name, record_content, record_ttl + ) else: - logger.info('insert new txt record') + logger.info("insert new txt record") self._insert_txt_record(zone_id, record_name, record_content, record_ttl) def del_txt_record(self, domain, record_name, record_content, record_ttl): @@ -151,49 +170,53 @@ class _ISPConfigClient(object): zone_id, zone_name = self._find_managed_zone_id(domain) if zone_id is None: raise errors.PluginError("Domain not known") - logger.debug('domain found: %s with id: %s', zone_name, zone_id) + logger.debug("domain found: %s with id: %s", zone_name, zone_id) o_record_name = record_name - record_name = record_name.replace(zone_name, '')[:-1] - logger.debug('using record_name: %s from original: %s', record_name, o_record_name) + record_name = record_name.replace(zone_name, "")[:-1] + logger.debug( + "using record_name: %s from original: %s", record_name, o_record_name + ) record = self.get_existing_txt(zone_id, record_name, record_content) if record is not None: - if record['data'] == record_content: - logger.debug('delete TXT record: %s', record['id']) - self._delete_txt_record(record['id']) + if record["data"] == record_content: + logger.debug("delete TXT record: %s", record["id"]) + self._delete_txt_record(record["id"]) def _prepare_rr_data(self, zone_id, record_name, record_content, record_ttl): server_id = self._get_server_id(zone_id) data = { - 'client_id': None, - 'rr_type': 'TXT', - 'params':{ - 'server_id': server_id, - 'name': record_name, - 'active': 'Y', - 'type': 'TXT', - 'data': record_content, - 'zone': zone_id, - 'ttl': record_ttl, - 'update_serial':False, + "client_id": None, + "rr_type": "TXT", + "params": { + "server_id": server_id, + "name": record_name, + "active": "Y", + "type": "TXT", + "data": record_content, + "zone": zone_id, + "ttl": record_ttl, + "update_serial": False, }, } return data def _insert_txt_record(self, zone_id, record_name, record_content, record_ttl): data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl) - logger.debug('insert with data: %s', data) - result = self._api_request('dns_txt_add', data) + logger.debug("insert with data: %s", data) + result = self._api_request("dns_txt_add", data) - def _update_txt_record(self, zone_id, primary_id, record_name, record_content, record_ttl): + def _update_txt_record( + self, zone_id, primary_id, record_name, record_content, record_ttl + ): data = self._prepare_rr_data(zone_id, record_name, record_content, record_ttl) - data['primary_id'] = primary_id - logger.debug('update with data: %s', data) - result = self._api_request('dns_txt_update', data) + data["primary_id"] = primary_id + logger.debug("update with data: %s", data) + result = self._api_request("dns_txt_update", data) def _delete_txt_record(self, primary_id): - data = { 'primary_id': primary_id } - logger.debug('delete with data: %s', data) - result = self._api_request('dns_txt_delete', data) + data = {"primary_id": primary_id} + logger.debug("delete with data: %s", data) + result = self._api_request("dns_txt_delete", data) def _find_managed_zone_id(self, domain): """ @@ -208,10 +231,10 @@ class _ISPConfigClient(object): zone_dns_name_guesses = dns_common.base_domain_name_guesses(domain) for zone_name in zone_dns_name_guesses: - #get the zone id + # get the zone id try: - logger.debug('looking for zone: %s', zone_name) - zone_id = self._api_request('dns_zone_get_id', {'origin': zone_name}) + logger.debug("looking for zone: %s", zone_name) + zone_id = self._api_request("dns_zone_get_id", {"origin": zone_name}) return zone_id, zone_name except errors.PluginError as e: pass @@ -232,9 +255,13 @@ class _ISPConfigClient(object): """ self._login() - read_zone_data = {'zone_id': zone_id} - zone_data = self._api_request('dns_rr_get_all_by_zone', read_zone_data) + read_zone_data = {"zone_id": zone_id} + zone_data = self._api_request("dns_rr_get_all_by_zone", read_zone_data) for entry in zone_data: - if entry['name'] == record_name and entry['type'] == 'TXT' and entry['data'] == record_content: + if ( + entry["name"] == record_name + and entry["type"] == "TXT" + and entry["data"] == record_content + ): return entry return None diff --git a/certbot_dns_ispconfig/dns_ispconfig_test.py b/certbot_dns_ispconfig/dns_ispconfig_test.py index 6f27767..7c5839a 100644 --- a/certbot_dns_ispconfig/dns_ispconfig_test.py +++ b/certbot_dns_ispconfig/dns_ispconfig_test.py @@ -15,26 +15,31 @@ from certbot.tests import util as test_util FAKE_USER = "remoteuser" FAKE_PW = "password" -FAKE_ENDPOINT = 'mock://endpoint' +FAKE_ENDPOINT = "mock://endpoint" -class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest): - +class AuthenticatorTest( + test_util.TempDirTestCase, dns_test_common.BaseAuthenticatorTest +): def setUp(self): super(AuthenticatorTest, self).setUp() from certbot_dns_ispconfig.dns_ispconfig import Authenticator - path = os.path.join(self.tempdir, 'file.ini') - dns_test_common.write({ - "ispconfig_username": FAKE_USER, - "ispconfig_password": FAKE_PW, - "ispconfig_endpoint": FAKE_ENDPOINT, - }, path) + path = os.path.join(self.tempdir, "file.ini") + dns_test_common.write( + { + "ispconfig_username": FAKE_USER, + "ispconfig_password": FAKE_PW, + "ispconfig_endpoint": FAKE_ENDPOINT, + }, + path, + ) super(AuthenticatorTest, self).setUp() - self.config = mock.MagicMock(ispconfig_credentials=path, - ispconfig_propagation_seconds=0) # don't wait during tests + self.config = mock.MagicMock( + ispconfig_credentials=path, ispconfig_propagation_seconds=0 + ) # don't wait during tests self.auth = Authenticator(self.config, "ispconfig") @@ -45,7 +50,11 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic def test_perform(self): self.auth.perform([self.achall]) - expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] + expected = [ + mock.call.add_txt_record( + DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY + ) + ] self.assertEqual(expected, self.mock_client.mock_calls) def test_cleanup(self): @@ -53,7 +62,11 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic self.auth._attempt_cleanup = True self.auth.cleanup([self.achall]) - expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)] + expected = [ + mock.call.del_txt_record( + DOMAIN, "_acme-challenge." + DOMAIN, mock.ANY, mock.ANY + ) + ] self.assertEqual(expected, self.mock_client.mock_calls) @@ -68,14 +81,14 @@ class ISPConfigClientTest(unittest.TestCase): self.adapter = requests_mock.Adapter() self.client = _ISPConfigClient(FAKE_ENDPOINT, FAKE_USER, FAKE_PW) - self.client.session.mount('mock', self.adapter) + self.client.session.mount("mock", self.adapter) - def _register_response(self, ep_id, response=None, message=None, additional_matcher=None, **kwargs): - resp = {"code":"ok", - "message":message, - "response":response} + def _register_response( + self, ep_id, response=None, message=None, additional_matcher=None, **kwargs + ): + resp = {"code": "ok", "message": message, "response": response} if message is not None: - resp['code'] = "remote_failure" + resp["code"] = "remote_failure" def add_matcher(request): data = json.loads(request.text) @@ -83,55 +96,73 @@ class ISPConfigClientTest(unittest.TestCase): if additional_matcher is not None: add_result = additionsal_matcher(request) - return ((('username' in data and data['username'] == FAKE_USER) and - ('username' in data and data['password'] == FAKE_PW)) or - data['session_id'] == 'FAKE_SESSION') and add_result + return ( + ( + ("username" in data and data["username"] == FAKE_USER) + and ("username" in data and data["password"] == FAKE_PW) + ) + or data["session_id"] == "FAKE_SESSION" + ) and add_result self.adapter.register_uri( requests_mock.ANY, - '{0}?{1}'.format(FAKE_ENDPOINT, ep_id), + "{0}?{1}".format(FAKE_ENDPOINT, ep_id), text=json.dumps(resp), additional_matcher=add_matcher, **kwargs ) def test_add_txt_record(self): - if request_mock.__version__ < '1.6.0' - self._register_response('login', response='FAKE_SESSION') - self._register_response('dns_zone_get_id', response=23) - self._register_response('dns_txt_add', response=99) - self._register_response('dns_zone_get', response={'zone_id': 102, 'server_id': 1}) - self._register_response('dns_rr_get_all_by_zone', response=[]) - self.client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self._register_response("login", response="FAKE_SESSION") + self._register_response("dns_zone_get_id", response=23) + self._register_response("dns_txt_add", response=99) + self._register_response( + "dns_zone_get", response={"zone_id": 102, "server_id": 1} + ) + self._register_response("dns_rr_get_all_by_zone", response=[]) + self.client.add_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) def test_add_txt_record_fail_to_find_domain(self): - self._register_response('login', response='FAKE_SESSION') - self._register_response('dns_zone_get_id', message='Not Found') + self._register_response("login", response="FAKE_SESSION") + self._register_response("dns_zone_get_id", message="Not Found") with self.assertRaises(errors.PluginError) as context: - self.client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self.client.add_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) def test_add_txt_record_fail_to_authenticate(self): - self._register_response('login', message='FAILED') + self._register_response("login", message="FAILED") with self.assertRaises(errors.PluginError) as context: - self.client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self.client.add_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) def test_del_txt_record(self): - self._register_response('login', response='FAKE_SESSION') - self._register_response('dns_zone_get_id', response=23) - self._register_response('dns_rr_get_all_by_zone', response=[]) - self._register_response('dns_txt_delete', response='') - self.client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self._register_response("login", response="FAKE_SESSION") + self._register_response("dns_zone_get_id", response=23) + self._register_response("dns_rr_get_all_by_zone", response=[]) + self._register_response("dns_txt_delete", response="") + self.client.del_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) def test_del_txt_record_fail_to_find_domain(self): - self._register_response('login', response='FAKE_SESSION') - self._register_response('dns_zone_get_id', message='Not Found') + self._register_response("login", response="FAKE_SESSION") + self._register_response("dns_zone_get_id", message="Not Found") with self.assertRaises(errors.PluginError) as context: - self.client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self.client.del_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) def test_del_txt_record_fail_to_authenticate(self): - self._register_response('login', message='FAILED') + self._register_response("login", message="FAILED") with self.assertRaises(errors.PluginError) as context: - self.client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl) + self.client.del_txt_record( + DOMAIN, self.record_name, self.record_content, self.record_ttl + ) + if __name__ == "__main__": unittest.main() # pragma: no cover diff --git a/setup.py b/setup.py index 8b229ea..bffd7c6 100644 --- a/setup.py +++ b/setup.py @@ -1,57 +1,63 @@ from setuptools import setup from setuptools import find_packages -version = '0.1.8' +version = "0.2.0" -# Remember to update local-oldest-requirements.txt when changing the minimum -# acme/certbot version. install_requires = [ - 'acme>=0.29.0', - 'certbot>=0.34.0', - 'setuptools', - 'requests', - 'mock', - 'requests-mock', + "acme>=0.29.0", + "certbot>=0.34.0", + "setuptools", + "requests", + "mock", + "requests-mock", ] +# read the contents of your README file +from os import path + +this_directory = path.abspath(path.dirname(__file__)) +with open(path.join(this_directory, "README.rst"), encoding="utf-8") as f: + long_description = f.read() + setup( - name='certbot-dns-ispconfig', + name="certbot-dns-ispconfig", version=version, description="ispconfig DNS Authenticator plugin for Certbot", - url='https://github.com/certbot/certbot', - author="Certbot Project", - author_email='client-dev@letsencrypt.org', - license='Apache License 2.0', - python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*', + long_description=long_description, + long_description_content_type="text/x-rst", + url="https://github.com/m42e/certbot-dns-ispconfig", + author="Matthias Bilger", + author_email="matthias@bilger.info", + license="Apache License 2.0", + python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*", classifiers=[ - 'Development Status :: 3 - Alpha', - 'Environment :: Plugins', - 'Intended Audience :: System Administrators', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Internet :: WWW/HTTP', - 'Topic :: Security', - 'Topic :: System :: Installation/Setup', - 'Topic :: System :: Networking', - 'Topic :: System :: Systems Administration', - 'Topic :: Utilities', + "Development Status :: 3 - Alpha", + "Environment :: Plugins", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: Apache Software License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Security", + "Topic :: System :: Installation/Setup", + "Topic :: System :: Networking", + "Topic :: System :: Systems Administration", + "Topic :: Utilities", ], - packages=find_packages(), include_package_data=True, install_requires=install_requires, entry_points={ - 'certbot.plugins': [ - 'dns-ispconfig = certbot_dns_ispconfig.dns_ispconfig:Authenticator', - ], + "certbot.plugins": [ + "dns-ispconfig = certbot_dns_ispconfig.dns_ispconfig:Authenticator" + ] }, - test_suite='certbot_dns_ispconfig', + test_suite="certbot_dns_ispconfig", )