Use a requests session instead of normal requests

This commit is contained in:
Matteo Bertucci
2021-01-30 18:18:40 +01:00
parent d344306a82
commit dc284f7a70
2 changed files with 9 additions and 7 deletions

View File

@ -46,6 +46,8 @@ class ApplicationJob(threading.Thread):
self.raw_domains = raw_domains self.raw_domains = raw_domains
self.raw_delay = raw_delay self.raw_delay = raw_delay
self.session = requests.session()
def launch(self) -> None: def launch(self) -> None:
"""Launch the application by validating arguments and starting the thread.""" """Launch the application by validating arguments and starting the thread."""
self.validate_arguments() self.validate_arguments()
@ -87,9 +89,9 @@ class ApplicationJob(threading.Thread):
for record in self.domains: for record in self.domains:
log.debug(f"Updating record for {record.domain}.") log.debug(f"Updating record for {record.domain}.")
check_status(requests.patch( check_status(self.session.patch(
PATCH_DNS.format(zone_identifier=record.zone, identifier=record.id), PATCH_DNS.format(zone_identifier=record.zone, identifier=record.id),
json={"content": get_ip(record.record_type == 'AAAA')}, json={"content": get_ip(record.record_type == 'AAAA', self.session)},
auth=self.auth auth=self.auth
)) ))
@ -99,8 +101,8 @@ class ApplicationJob(threading.Thread):
"""Parse the domain in `raw_domains` and populate the `domains` array with `Domain` objects.""" """Parse the domain in `raw_domains` and populate the `domains` array with `Domain` objects."""
found_domains = {} found_domains = {}
for zone_json in check_status(requests.get(LIST_ZONES, auth=self.auth)).json()["result"]: for zone_json in check_status(self.session.get(LIST_ZONES, auth=self.auth)).json()["result"]:
for record_json in check_status(requests.get( for record_json in check_status(self.session.get(
LIST_DNS.format(zone_identifier=zone_json["id"]), LIST_DNS.format(zone_identifier=zone_json["id"]),
auth=self.auth auth=self.auth
)).json()["result"]: )).json()["result"]:
@ -186,7 +188,7 @@ class ApplicationJob(threading.Thread):
def validate_bearer(self) -> None: def validate_bearer(self) -> None:
"""Utility method to validate a CF bearer token.""" """Utility method to validate a CF bearer token."""
r = requests.get(VERIFY_TOKEN, auth=self.auth) r = self.session.get(VERIFY_TOKEN, auth=self.auth)
if not r.json()["success"]: if not r.json()["success"]:
error_message = ' / '.join(error["message"] for error in r.json()["errors"]) error_message = ' / '.join(error["message"] for error in r.json()["errors"])

View File

@ -54,9 +54,9 @@ class BearerAuth(AuthBase):
return r return r
def get_ip(ipv6: bool) -> str: def get_ip(ipv6: bool, session: requests.Session) -> str:
"""Return the host public IP as detected by ipify.org.""" """Return the host public IP as detected by ipify.org."""
r = check_status(requests.get(IP_API_URL_IPV4 if not ipv6 else IP_API_URL_IPV6)) r = check_status(session.get(IP_API_URL_IPV4 if not ipv6 else IP_API_URL_IPV6))
return r.text return r.text