mirror of
https://github.com/Akarys42/cloudflare-ddns-docker.git
synced 2025-05-10 08:45:16 -06:00
Switching to an ApplicationJob based app
This commit is contained in:
0
cloudflare_ddns/__init__.py
Normal file
0
cloudflare_ddns/__init__.py
Normal file
30
cloudflare_ddns/__main__.py
Normal file
30
cloudflare_ddns/__main__.py
Normal file
@ -0,0 +1,30 @@
|
||||
import logging
|
||||
|
||||
import click
|
||||
from typing import Tuple
|
||||
|
||||
from cloudflare_ddns.app import ApplicationJob
|
||||
from cloudflare_ddns.constants import DEFAULT_DELAY
|
||||
|
||||
log = logging.getLogger("ddns")
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option('--delay', '-d', default=DEFAULT_DELAY, show_default=True)
|
||||
@click.option('--token', '-k', prompt="Enter your Cloudflare Token", hide_input=True, show_envvar=True)
|
||||
@click.option('-v', '--verbose', is_flag=True, default=False)
|
||||
@click.option('--ipv6/--ipv4', '-6/-4')
|
||||
@click.argument("domain", nargs=-1)
|
||||
def start(delay: str, token: str, verbose: int, domain: Tuple[str], ipv6: bool) -> None:
|
||||
"""Main application entrypoint."""
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
level=logging.DEBUG if verbose else logging.INFO
|
||||
)
|
||||
|
||||
ApplicationJob(delay, token, domain, ipv6).launch()
|
||||
|
||||
|
||||
# Main entrypoint
|
||||
if __name__ == "__main__":
|
||||
start(auto_envvar_prefix="CF_DDNS")
|
67
cloudflare_ddns/app.py
Normal file
67
cloudflare_ddns/app.py
Normal file
@ -0,0 +1,67 @@
|
||||
import logging
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from typing import Tuple
|
||||
|
||||
import requests
|
||||
|
||||
from cloudflare_ddns.constants import VERIFY_TOKEN
|
||||
from cloudflare_ddns.utils import parse_duration, BearerAuth
|
||||
|
||||
log = logging.getLogger("ddns")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Domain:
|
||||
domain: str
|
||||
record_type: str
|
||||
|
||||
|
||||
class ApplicationJob(threading.Thread):
|
||||
def __init__(self, delay: str, token: str, raw_domains: Tuple[str], default_ipv6: bool):
|
||||
super().__init__()
|
||||
|
||||
self.stop_signal = threading.Event()
|
||||
|
||||
self.delay = delay
|
||||
self.auth = BearerAuth(token)
|
||||
self.raw_domains = raw_domains
|
||||
self.default_ipv6 = default_ipv6
|
||||
|
||||
def launch(self) -> None:
|
||||
self.validate_arguments()
|
||||
|
||||
def validate_arguments(self):
|
||||
failed = False
|
||||
|
||||
if not self.raw_domains:
|
||||
log.error("Please provide at least one domain.")
|
||||
failed = True
|
||||
|
||||
try:
|
||||
self.delay = parse_duration(self.delay)
|
||||
except ValueError as e:
|
||||
log.error(f"Failed to parse delay: {e}")
|
||||
failed = True
|
||||
|
||||
try:
|
||||
log.debug("Validating bearer token.")
|
||||
|
||||
self.validate_bearer()
|
||||
except ValueError as e:
|
||||
log.error(f"Failed to validate bearer token: {e}")
|
||||
failed = True
|
||||
else:
|
||||
log.info("Successfully validated the bearer token.")
|
||||
|
||||
if failed:
|
||||
log.info("Exiting with code 64.")
|
||||
exit(64)
|
||||
|
||||
def validate_bearer(self) -> None:
|
||||
"""Utility method to validate a CF bearer token."""
|
||||
r = requests.get(VERIFY_TOKEN, auth=self.auth)
|
||||
|
||||
if not r.json()["success"]:
|
||||
error_message = ' / '.join(error["message"] for error in r.json()["errors"])
|
||||
raise ValueError(error_message)
|
7
cloudflare_ddns/constants.py
Normal file
7
cloudflare_ddns/constants.py
Normal file
@ -0,0 +1,7 @@
|
||||
# App defaults
|
||||
DEFAULT_DELAY = "5 minutes"
|
||||
|
||||
# Endpoints
|
||||
BASE_ENDPOINT = "https://api.cloudflare.com/client/v4/"
|
||||
|
||||
VERIFY_TOKEN = BASE_ENDPOINT + "user/tokens/verify"
|
55
cloudflare_ddns/utils.py
Normal file
55
cloudflare_ddns/utils.py
Normal file
@ -0,0 +1,55 @@
|
||||
import re
|
||||
|
||||
import requests
|
||||
from requests import Request
|
||||
from requests.auth import AuthBase
|
||||
|
||||
from .constants import VERIFY_TOKEN
|
||||
|
||||
DURATION_REGEX = re.compile(
|
||||
r"((?P<days>\d+?) ?(days|day|D|d) ?)?"
|
||||
r"((?P<hours>\d+?) ?(hours|hour|H|h) ?)?"
|
||||
r"((?P<minutes>\d+?) ?(minutes|minute|min|M|m) ?)?"
|
||||
r"((?P<seconds>\d+?) ?(seconds|second|sec|S|s))?"
|
||||
)
|
||||
UNIT_TO_SECONDS = {
|
||||
"days": 86400,
|
||||
"hours": 3600,
|
||||
"minutes": 60,
|
||||
"seconds": 1
|
||||
}
|
||||
|
||||
|
||||
def parse_duration(duration: str) -> int:
|
||||
"""
|
||||
Parameter type for durations.
|
||||
|
||||
The converter supports the following symbols for each unit of time:
|
||||
- days: `d`, `D`, `day`, `days`
|
||||
- hours: `H`, `h`, `hour`, `hours`
|
||||
- minutes: `M`, `m`, `minute`, `minutes`, `min`
|
||||
- seconds: `S`, `s`, `second`, `seconds`, `sec`
|
||||
The units need to be provided in descending order of magnitude.
|
||||
"""
|
||||
match = DURATION_REGEX.fullmatch(duration)
|
||||
if not match:
|
||||
raise ValueError(f"{duration} isn't a valid duration.")
|
||||
|
||||
duration = 0
|
||||
for unit, time_value in match.groupdict().items():
|
||||
if time_value:
|
||||
duration += int(time_value) * UNIT_TO_SECONDS[unit]
|
||||
|
||||
return duration
|
||||
|
||||
|
||||
class BearerAuth(AuthBase):
|
||||
"""Bearer based authentication."""
|
||||
|
||||
def __init__(self, token: str) -> None:
|
||||
self.token = token
|
||||
|
||||
def __call__(self, r: Request) -> Request:
|
||||
"""Attach the Authorization header to the request."""
|
||||
r.headers["Authorization"] = f"Bearer {self.token}"
|
||||
return r
|
Reference in New Issue
Block a user