Add bearer validation

This commit is contained in:
Matteo Bertucci
2021-01-14 16:08:24 +01:00
parent 8f8503d4f7
commit b8c4a114f6
4 changed files with 53 additions and 19 deletions

View File

@ -1,39 +1,43 @@
import logging import logging
import click import click
from email_validator import EmailNotValidError, validate_email
from .constants import DEFAULT_DELAY
from .utils import parse_duration, validate_bearer from .utils import parse_duration, validate_bearer
DEFAULT_DELAY = "5 minutes" log = logging.getLogger("ddns")
@click.command() @click.command()
@click.option('--delay', '-d', default=DEFAULT_DELAY, show_default=True) @click.option('--delay', '-d', default=DEFAULT_DELAY, show_default=True)
@click.option('--email', '-u', prompt="Enter your Cloudflare Email address") @click.option('--token', '-k', prompt="Enter your Cloudflare Token", hide_input=True, show_envvar=True)
@click.option('--key', '-k', prompt="Enter your Cloudflare Auth key", hide_input=True) @click.option('-v', '--verbose', is_flag=True, default=False)
def start(delay: str, email: str, key: str) -> None: def start(delay: str, token: str, verbose: int) -> None:
"""Main application entrypoint.""" """Main application entrypoint."""
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.DEBUG if verbose else logging.INFO
)
try: try:
duration = parse_duration(delay) duration = parse_duration(delay)
except ValueError as e: except ValueError as e:
logging.error(f"Failed to parse delay: {e}") log.error(f"Failed to parse delay: {e}")
logging.error("Exiting with code 64.") log.error("Exiting with code 64.")
exit(64) exit(64)
try: try:
validate_email(email) log.debug("Validating bearer token.")
except EmailNotValidError:
logging.warning(f"The email address {email} don't seem valid. Do you have a typo?")
try: validate_bearer(token)
validate_bearer(key) except ValueError as e:
except ...: log.error(f"Failed to valid bearer token: {e}")
... log.error("Exiting with code 64.")
exit(64)
else:
log.info("Successfully validated the bearer token.")
# Main entrypoint # Main entrypoint
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
start(auto_envvar_prefix="CF_DDNS") start(auto_envvar_prefix="CF_DDNS")

View File

@ -0,0 +1,7 @@
# App defaults
DEFAULT_DELAY = "5 minutes"
# Endpoints
BASE_ENDPOINT = "https://api.cloudflare.com/client/v4/"
VERIFY_TOKEN = BASE_ENDPOINT + "user/tokens/verify"

View File

@ -1,5 +1,11 @@
import re import re
import requests
from requests import Request
from requests.auth import AuthBase
from .constants import VERIFY_TOKEN
DURATION_REGEX = re.compile( DURATION_REGEX = re.compile(
r"((?P<days>\d+?) ?(days|day|D|d) ?)?" r"((?P<days>\d+?) ?(days|day|D|d) ?)?"
r"((?P<hours>\d+?) ?(hours|hour|H|h) ?)?" r"((?P<hours>\d+?) ?(hours|hour|H|h) ?)?"
@ -37,5 +43,23 @@ def parse_duration(duration: str) -> int:
return duration return duration
def validate_bearer(bearer: str) -> None: class BearerAuth(AuthBase):
... """Bearer based authentication."""
def __init__(self, token: str) -> None:
self.token = token
def __call__(self, r: Request) -> Request:
"""Attach the Authorization header to the request."""
r.headers["Authorization"] = f"Bearer {self.token}"
return r
def validate_bearer(token: str) -> None:
"""Utility method to validate a CF bearer token."""
bearer = BearerAuth(token)
r = requests.get(VERIFY_TOKEN, auth=bearer)
if not r.json()["success"]:
error_message = ' / '.join(error["message"] for error in r.json()["errors"])
raise ValueError(error_message)

View File

@ -1,3 +1,2 @@
requests~=2.25.1 requests~=2.25.1
click8~=8.0.1 click8~=8.0.1
email-validator~=1.1.2