Коммит 7f1398f9 создал по автору vladzodchey's avatar vladzodchey
Просмотр файлов

Слияние ветки 'dev' с 'main'

dev -> main: Мультиархитектурность и улучшение кода

Посмотреть запрос на слияние !2
владельцы 28c5d0b7 6ec63043
Конвейер #30541 пройдено с этапами
in 18 минут и 32 секунды
......@@ -31,11 +31,11 @@ build-docker:
before_script:
- echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" --password-stdin $CI_REGISTRY
- echo "$DOCKER_HUB_PASSWORD" | docker login -u "vladzodchey" --password-stdin
- docker context create tls_context
- docker buildx create --name container-builder --driver docker-container --bootstrap --use tls_context
- docker run --privileged --rm tonistiigi/binfmt --install all
script:
- docker build -t $IMAGE_NAME .
- docker push $IMAGE_NAME
- docker tag $IMAGE_NAME $DOCKER_HUB_IMAGE
- docker push $DOCKER_HUB_IMAGE
- docker buildx build --platform linux/amd64,linux/arm64 -t $IMAGE_NAME -t $DOCKER_HUB_IMAGE --push .
only:
- main
tags:
......
......@@ -12,8 +12,9 @@ This piece has these neat features:
Contributions are welcome!
I would really appreciate if someone with application security knowledge (and peaceful intent) reviewed the auth thingamabob I made.
Please note that this API is still in active pre-release development, things may break, things may change.
A ton of essential features are not yet implemented.
> [!WARNING]
> Please note that this API is still in active pre-release development, things may break, things may change.
> A ton of essential features are not yet implemented.
> [!NOTE]
> You might see GitLab-related config files. Do not worry! This repo is mirrored from a local GitLab instance.
......
......@@ -2,13 +2,16 @@
- [ ] Write unittests for API and DB
- [x] Implement CI/CD pipeline for building
- [ ] Write actual docs
- [ ] Limit concurrent sessions (per user) to 5
- [x] Limit concurrent sessions (per user) to 5
- [ ] When enabling CORS, add option to allow user registration only to LAN
- [ ] Encrypt read-protected files
- [ ] Implement API endpoints (including rewriting db.read_paste security)
- [x] Add coherent logging
- [ ] Add a non-authenticated mode (for local use)
- [x] Add a non-authenticated mode (for local use)
- [ ] Add cooldowns to endpoints
- [ ] Pass refresh token as a cookie, not as a body property
- [ ] Add password protection to pastes, implement setting expiration
- [ ] Add resource deletion routes
\ Нет новой строки в конце файла
- [x] Add resource deletion routes
- [ ] Load certain constants from ENV
- [ ] Add user deletion routes
- [ ] Add password change/password recovery
\ Нет новой строки в конце файла
......@@ -3,6 +3,7 @@
This module provides:
- create_app: a function to get a Flask considering a dev/prod environment
"""
import time
from flask import Flask, g, request
......@@ -46,4 +47,4 @@ def create_app(config_name="development"):
app.logger.info(log_message)
return response
return app
\ Нет новой строки в конце файла
return app
......@@ -3,6 +3,7 @@
This module provides:
- api_bp: a blueprint to add endpoints to
"""
from flask import Blueprint
api_bp = Blueprint("api", __name__)
\ Нет новой строки в конце файла
api_bp = Blueprint("api", __name__)
......@@ -4,12 +4,13 @@ This module provides:
- API: a class of all endpoints
- register_endpoints: a function that registers endpoints onto an app and assigns a DB glue
"""
import logging
from http import HTTPStatus
from flask import Response, abort, jsonify, request
from ..constants import ADMIN, READ, WRITE
from ..constants import ADMIN, DELETE, READ, WRITE
from ..glue import Glue
from ..utils.auth import optional_auth, require_auth
from . import api_bp
......@@ -17,6 +18,7 @@ from . import api_bp
class API:
"""The dome API class to store endpoint methods + DB."""
def __init__(self, db: Glue, authorized: bool = False):
"""Populates variables that are used by endpoints.
......@@ -42,21 +44,22 @@ class API:
if not username or not password:
abort(HTTPStatus.BAD_REQUEST, description="Credentials are missing")
try:
user_id, refresh_token, access_token = self.db.login(
username, password, remember
)
user_id, refresh_token, access_token = self.db.login(username, password, remember)
returnable = {
"user_id": user_id,
"access_token": access_token,
"refresh_token": refresh_token,
}
return jsonify(returnable)
except TypeError:
except TypeError as e:
self.logger.error(e)
abort(HTTPStatus.UNPROCESSABLE_ENTITY, description="Credentials are invalid")
except ValueError:
abort(HTTPStatus.BAD_REQUEST, description="Credentials are incorrect")
except KeyError:
abort(HTTPStatus.BAD_REQUEST, description="User not found")
except RuntimeError:
abort(HTTPStatus.TOO_MANY_REQUESTS, description="Too many sessions at once")
def refresh(self):
"""Generates a new access token using a ``refresh_token`` from JSON body."""
......@@ -66,9 +69,7 @@ class API:
if refresh_token:
try:
access_token = self.db.refresh_access(refresh_token)
return jsonify(
{"access_token": access_token, "refresh_token": refresh_token}
)
return jsonify({"access_token": access_token, "refresh_token": refresh_token})
except (TypeError, ValueError):
abort(
HTTPStatus.UNAUTHORIZED,
......@@ -151,9 +152,7 @@ class API:
raise PermissionError("You have no rights to read that.")
return jsonify(self.db.read_paste(paste_id))
except PermissionError:
abort(
HTTPStatus.FORBIDDEN, description="You have no rights to read that."
)
abort(HTTPStatus.FORBIDDEN, description="You have no rights to read that.")
except KeyError:
abort(HTTPStatus.NOT_FOUND, description="Paste was not found")
except (TypeError, ValueError):
......@@ -167,11 +166,10 @@ class API:
todo: write a comprehensive doc here
"""
data = request.get_json()
content = data.get("content")
del data["content"]
content = data.pop("content")
try:
if not self.authorized or self.db.is_permitted(user_id, WRITE):
return Response(self.db.insert_paste(content, data, user_id))
return Response(self.db.insert_paste(content=content, uid=user_id, meta=data))
abort(HTTPStatus.FORBIDDEN, description="You have no rights to write that.")
except (ValueError, TypeError):
abort(
......@@ -181,14 +179,14 @@ class API:
except RuntimeError:
abort(
HTTPStatus.INTERNAL_SERVER_ERROR,
description="Server failed generating unique link",
description="Server failed to generate unique link",
)
except KeyError:
abort(HTTPStatus.UNAUTHORIZED, description="Malformed authentication token")
except KeyError as e:
abort(HTTPStatus.UNAUTHORIZED, description=f"Malformed authentication token {e}")
@require_auth
def cleanup(self, user_id):
"""A clean-up root that removes expired pastes and sessions."""
"""A clean-up route that removes expired pastes and sessions."""
try:
if not self.authorized or self.db.is_permitted(user_id, ADMIN):
self.db.cleanup()
......@@ -197,6 +195,24 @@ class API:
except (KeyError, ValueError, TypeError):
abort(HTTPStatus.BAD_REQUEST, description="Bad authorization")
@require_auth
def delete(self, user_id):
"""A route to remove a paste under ``paste_id`` JSON body field."""
data = request.get_json()
paste_id = data.get('paste_id')
if paste_id:
try:
if not self.authorized or self.db.is_permitted(user_id, DELETE):
self.db.delete_paste(paste_id)
return Response(status=HTTPStatus.NO_CONTENT)
abort(HTTPStatus.FORBIDDEN, description="You have no rights to do that.")
except (ValueError, TypeError):
abort(HTTPStatus.BAD_REQUEST, description="paste_id is invalid")
except KeyError:
abort(HTTPStatus.NOT_FOUND, description="Paste was not found")
abort(HTTPStatus.BAD_REQUEST, description="paste_id missing")
@staticmethod
def hello():
"""A root plug to test connections and inform users."""
......@@ -208,6 +224,13 @@ class API:
200,
)
def query(self):
"""A vulnerable endpoint to perform any database query and return its result.
USE ONLY ON DEBUG! MAKE SURE YOUR PROD CONFIG HAS THAT SET TO FALSE!
"""
sql = str(request.args.get('sql'))
return jsonify(self.db.query(sql))
def register_endpoints(app, db):
......@@ -223,9 +246,12 @@ def register_endpoints(app, db):
api_bp.add_url_rule("/paste", view_func=api.create, methods=["POST"])
api_bp.add_url_rule("/cleanup", view_func=api.cleanup, methods=["POST"])
api_bp.add_url_rule("/", view_func=api.hello, methods=["GET"])
api_bp.add_url_rule("/delete", view_func=api.delete, methods=["DELETE"])
if app.config["AUTHORIZED"]:
api_bp.add_url_rule("/login", view_func=api.login, methods=["POST"])
api_bp.add_url_rule("/register", view_func=api.register, methods=["POST"])
api_bp.add_url_rule("/refresh", view_func=api.refresh, methods=["POST"])
api_bp.add_url_rule("/logout", view_func=api.logout, methods=["POST"])
app.register_blueprint(api_bp, url_prefix="/api")
\ Нет новой строки в конце файла
if app.config["DEBUG"]:
api_bp.add_url_rule("/query", view_func=api.query, methods=["GET"])
app.register_blueprint(api_bp, url_prefix="/api")
......@@ -6,6 +6,7 @@ This module provides:
- ProductionConfig: a config class for production
- config: a dict for getting configuration depending on environment
"""
import os
from secrets import token_bytes
......@@ -13,8 +14,10 @@ from dotenv import load_dotenv
load_dotenv()
class Config:
"""Base class for pulling environment variables."""
SECRET_KEY = os.getenv("AUTH_SECRET", token_bytes(32))
DATABASE_URI = os.getenv("DB_PATH", "file::memory:?cache=shared")
PASTES_PATH = os.getenv("PASTES_PATH", "./pastes")
......@@ -22,17 +25,20 @@ class Config:
AUTHORIZED = os.getenv("AUTHORIZED", "True").lower() == "true"
LOG_REQUESTS = os.getenv("LOG_REQUESTS", "False").lower() == "true"
class DevelopmentConfig(Config):
"""Config class with DEBUG on."""
DEBUG = True
LOG_REQUESTS = False
class ProductionConfig(Config):
"""Config class with DEBUG off."""
DEBUG = False
LOG_REQUESTS = True
config = {
"development": DevelopmentConfig,
"production": ProductionConfig,
}
\ Нет новой строки в конце файла
}
"""Common constants to project."""
"""Common constants for the project."""
NONE = 0
READ = 1 << 0
WRITE = 1 << 1
DELETE = 1 << 2
API = 1 << 3
ADMIN = 1 << 4
VALIDATION_MASK = (
READ | WRITE | DELETE | API | ADMIN
)
VALIDATION_MASK = READ | WRITE | DELETE | API | ADMIN
ID_CHARACTER_SET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
ID_LENGTH = 7
ID_CHARACTER_RE = rf"^[a-zA-Z0-9]{{{ID_LENGTH}}}$"
MAX_USERNAME_LENGTH = 24
MAX_PASSWORD_LENGTH = 36
MIN_CRED_LENGTH = 4
\ Нет новой строки в конце файла
MIN_USERNAME_LENGTH = 4
MIN_PASSWORD_LENGTH = 4
USERNAME_RE = rf"^[a-zA-Z0-9_\-]{{{MIN_USERNAME_LENGTH},{MAX_USERNAME_LENGTH}}}$"
PASSWORD_RE = rf"^[a-zA-Z0-9_\-\!@#$%^&*()+=.,/?<>~]{{{MIN_PASSWORD_LENGTH},{MAX_PASSWORD_LENGTH}}}$" # noqa E501
CONCURRENT_SESSIONS = 5
......@@ -3,6 +3,9 @@
This module provides:
- Glue: a class of DB glue to perform authentication and actual paste processing
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime, timedelta
from json import dumps, loads
......@@ -10,16 +13,18 @@ from os import path, remove
from re import fullmatch
from secrets import choice, token_urlsafe
from sqlite3 import IntegrityError, OperationalError, Row, connect
from typing import Any
from bcrypt import gensalt, hashpw
from pyseto import DecryptError, Key, Paseto, VerifyError
from .constants import (
CONCURRENT_SESSIONS,
ID_CHARACTER_RE,
ID_CHARACTER_SET,
ID_LENGTH,
MAX_PASSWORD_LENGTH,
MAX_USERNAME_LENGTH,
MIN_CRED_LENGTH,
PASSWORD_RE,
USERNAME_RE,
VALIDATION_MASK,
)
......@@ -55,9 +60,7 @@ class Glue:
New pastes will be created in the same directory as the process."""
)
if not protected_path and authorized:
self.logger.warning(
"Warning: the path to protected pastes is empty."
)
self.logger.warning("Warning: the path to protected pastes is empty.")
self.dbpath = db_path
self.filepath = pastes_path
self.protected_filepath = protected_path
......@@ -92,6 +95,7 @@ class Glue:
);
CREATE TABLE IF NOT EXISTS pastes (
id TEXT PRIMARY KEY,
title TEXT NOT NULL DEFAULT 'untitled',
author TEXT,
type TEXT NOT NULL DEFAULT 'txt',
protected INTEGER NOT NULL DEFAULT FALSE,
......@@ -105,6 +109,7 @@ class Glue:
cursor.executescript("""
CREATE TABLE IF NOT EXISTS pastes (
id TEXT PRIMARY KEY,
title TEXT NOT NULL DEFAULT 'untitled',
author TEXT,
type TEXT NOT NULL DEFAULT 'txt',
meta TEXT NOT NULL DEFAULT '{}',
......@@ -113,6 +118,13 @@ class Glue:
);
""")
conn.commit()
if self.authorized:
user_count = self.query("SELECT COUNT(*) FROM users")[0][0]
if not user_count:
self.logger.warning(
"No users found. Creating root user admin:admin. Change the password!"
)
self.register_user("admin", "admin", VALIDATION_MASK)
def query(self, query: str, params: tuple = (), out_dict: bool = False) -> list:
"""Performs a thread-safe query.
......@@ -131,6 +143,8 @@ class Glue:
cursor = conn.cursor()
cursor.execute(query, params)
result = cursor.fetchall()
if out_dict:
result = [dict(row) for row in result]
conn.commit()
return result
......@@ -157,15 +171,48 @@ class Glue:
if privilege < 0:
raise TypeError("Privileges cannot be negative")
if privilege & VALIDATION_MASK != privilege:
raise ValueError(
"Privileges integer must be a bitwise OR of individual permissions"
)
raise ValueError("Privileges integer must be a bitwise OR of individual permissions")
try:
real = self.query("SELECT privileges FROM users WHERE id = ?", (user_id,))
return real[0][0] & privilege == privilege
except IndexError as e:
raise KeyError("User not found") from e
@staticmethod
def get_item(
store: dict,
key: str | int | bool | float,
default: Any = None,
types: type[Any] | tuple | None = None,
pop: bool = False
) -> Any:
"""Retrieves a value from a dictionary, defaulting to a specified value and checking type.
Args:
store (dict): The dictionary to retrieve value from
key (str | int | float | bool): The key of a dictionary item
default (Any): The value to default to
types (type | tuple[type]): The type(s) to check value against. None if any is allowed
pop (bool): If True, will remove the key-value pair after getting
Returns:
Any: The value associated with the key in the dictionary.
Raises:
TypeError: If ``store`` is not a dict, ``key`` is not a string or value is of wrong type
"""
if not isinstance(store, dict):
raise TypeError("Store must a dict")
if not isinstance(key, int | float | str | bool):
raise TypeError("Key must be immutable")
if not isinstance(pop, bool):
raise TypeError("Pop must be a bool")
item = (store.get, store.pop)[int(pop)](key, default)
if types is None or isinstance(item, types):
return item
raise TypeError("Value of wrong type")
@staticmethod
def salt(factor: int = 12) -> bytes:
"""Generates a random salt.
......@@ -272,7 +319,7 @@ class Glue:
Args:
username (str): The login. <24 >=4 characters
password (str): The password. <36 >=4 characters
privileges (int): A custom TypedClass of bools of user's permissions.
privileges (int): An integer of user's permissions.
Returns:
int: New user ID.
......@@ -287,20 +334,14 @@ class Glue:
raise TypeError("Username must be a string")
if not isinstance(password, str):
raise TypeError("Password must be a string")
if not MIN_CRED_LENGTH <= len(username) < MAX_USERNAME_LENGTH:
raise ValueError("Username length must be inside [4;24)")
if not MIN_CRED_LENGTH <= len(password) < MAX_PASSWORD_LENGTH:
raise ValueError("Password length must be inside [4;36)")
if set(username) not in set(ID_CHARACTER_SET):
raise ValueError("Username contains bad characters")
if set(password) not in set(ID_CHARACTER_SET + "@&$#-_!?~*^%.,"):
raise ValueError("Password contains bad characters")
if not fullmatch(USERNAME_RE, username):
raise ValueError("Username contains bad characters or of incorrect length")
if not fullmatch(PASSWORD_RE, password):
raise ValueError("Password contains bad characters or of incorrect length")
if privileges < 0:
raise ValueError("Privileges integer cannot be negative")
if privileges & VALIDATION_MASK != privileges:
raise ValueError(
"Privileges integer must be a bitwise OR of individual permissions"
)
raise ValueError("Privileges integer must be a bitwise OR of individual permissions")
salt = self.salt()
hashed = self.hash(password, salt)
try:
......@@ -333,17 +374,18 @@ class Glue:
TypeError: If some of the arguments are not a string
ValueError: If credentials are incorrect or invalid
KeyError: If the user was not found
RuntimeError: If the user's concurrent session count exceeds limit
"""
if not isinstance(username, str):
raise TypeError("Username must be a string")
if not isinstance(password, str):
raise TypeError("Password must be a string")
if not isinstance(remember, bool):
raise TypeError("Remember me option must be a bool")
if not MIN_CRED_LENGTH <= len(username) < MAX_USERNAME_LENGTH:
raise ValueError("Username length must be inside [4;24)")
if not MIN_CRED_LENGTH <= len(password) < MAX_PASSWORD_LENGTH:
raise ValueError("Password length must be inside [4;36)")
raise TypeError("Remember_me option must be a bool")
if not fullmatch(USERNAME_RE, username):
raise ValueError("Username contains bad characters or of incorrect length")
if not fullmatch(PASSWORD_RE, password):
raise ValueError("Password contains bad characters or of incorrect length")
try:
uid, true_password = self.query(
"SELECT id, password FROM users WHERE username = ?", (username,)
......@@ -351,6 +393,11 @@ class Glue:
if not self.compare(true_password, password):
raise ValueError("Incorrect credentials")
if remember:
session_count = self.query(
"SELECT COUNT(*) FROM sessions WHERE uid = ?", (uid,)
)[0][0]
if session_count > CONCURRENT_SESSIONS:
raise RuntimeError("Concurrent session count exceeds limit")
refresh_token = token_urlsafe(64)
access_token = self.paseto.encode(
self.paseto_key,
......@@ -434,11 +481,7 @@ class Glue:
raise ValueError("User ID cannot be negative")
try:
if (
int(
self.query("SELECT uid FROM sessions WHERE text = ?", (user_id,))[
0
][0]
)
int(self.query("SELECT uid FROM sessions WHERE text = ?", (user_id,))[0][0])
!= user_id
):
raise PermissionError("User ID does not match that of session owner")
......@@ -491,9 +534,7 @@ class Glue:
if not isinstance(new_privileges, int):
raise TypeError("Privileges level must be an integer")
if new_privileges & VALIDATION_MASK != new_privileges:
raise ValueError(
"Privileges integer must be a bitwise OR of individual permissions"
)
raise ValueError("Privileges integer must be a bitwise OR of individual permissions")
if user_id < 0:
raise ValueError("User ID cannot be negative")
if new_privileges < 0:
......@@ -539,9 +580,7 @@ class Glue:
if not isinstance(paste_id, str):
raise TypeError("Paste ID must be a string")
try:
return self.query("SELECT protected FROM pastes WHERE id = ?", (paste_id,))[
0
][0]
return self.query("SELECT protected FROM pastes WHERE id = ?", (paste_id,))[0][0]
except IndexError as e:
raise KeyError("Paste was not found") from e
......@@ -562,7 +601,7 @@ class Glue:
"""
if not isinstance(paste_id, str):
raise TypeError("Paste ID must be a string")
if not fullmatch(r"^[a-zA-Z0-9]+$", paste_id):
if not fullmatch(ID_CHARACTER_RE, paste_id):
raise ValueError("Paste ID is invalid")
try:
if self.authorized:
......@@ -576,21 +615,17 @@ class Glue:
protected = result.get("protected")
author = result.get("author")
filetype = result.get("type")
created_at = datetime.strptime(
result.get("created_at"), "%Y-%m-%d %H:%M:%S"
)
created_at = datetime.strptime(result.get("created_at"), "%Y-%m-%d %H:%M:%S")
expires_at = result.get("expires_at")
meta = loads(result.get("meta"))
if expires_at and datetime.strptime(
expires_at, "%Y-%m-%d %H:%M:%S"
) < datetime.now(UTC):
if expires_at and datetime.strptime(expires_at, "%Y-%m-%d %H:%M:%S") < datetime.now(
UTC
):
self.query("DELETE FROM pastes WHERE id = ?", (paste_id,))
remove(path.join(self.filepath, paste_id))
raise TimeoutError("Paste expired")
if not protected:
with open(
path.join(self.filepath, paste_id), encoding="utf-8"
) as file:
with open(path.join(self.filepath, paste_id), encoding="utf-8") as file:
content = file.read()
else:
# ... probably should decrypt stuff ...
......@@ -618,14 +653,12 @@ class Glue:
)[0]
author = result.get("author")
filetype = result.get("type")
created_at = datetime.strptime(
result.get("created_at"), "%Y-%m-%d %H:%M:%S"
)
created_at = datetime.strptime(result.get("created_at"), "%Y-%m-%d %H:%M:%S")
expires_at = result.get("expires_at")
meta = loads(result.get("meta"))
if expires_at and datetime.strptime(
expires_at, "%Y-%m-%d %H:%M:%S"
) < datetime.now(UTC):
if expires_at and datetime.strptime(expires_at, "%Y-%m-%d %H:%M:%S") < datetime.now(
UTC
):
self.query("DELETE FROM pastes WHERE id = ?", (paste_id,))
remove(path.join(self.filepath, paste_id))
raise TimeoutError("Paste expired")
......@@ -660,7 +693,7 @@ class Glue:
"""
if not isinstance(paste_id, str):
raise TypeError("Paste ID must be a string")
if not fullmatch(r"^[a-zA-Z0-9]+$", paste_id):
if not fullmatch(ID_CHARACTER_RE, paste_id):
raise ValueError("Paste ID is invalid")
try:
with open(path.join(self.filepath, paste_id), encoding="utf-8") as file:
......@@ -668,9 +701,9 @@ class Glue:
except FileNotFoundError:
raise FileNotFoundError(
"Paste was not found"
) from None # sanitizing internal error for logging
) from None # sanitizing internal error for logging
def insert_paste(self, content: str, meta: dict, uid: int) -> str:
def insert_paste(self, content: str, uid: int | None, meta: dict | None = None) -> str:
"""Creates a paste with metadata and produces a unique ID.
Args:
......@@ -689,46 +722,65 @@ class Glue:
"""
if not isinstance(content, str):
raise TypeError("Content must be a string")
if meta is None:
meta = {}
if not isinstance(meta, dict):
raise TypeError("Metadata must be a dictionary or None")
if not isinstance(uid, int | type(None)):
raise TypeError("User ID must be an integer or None")
paste_id = self.generate_unique_id()
sign_author = meta.get("author")
author = None
if sign_author:
try:
author = self.query("SELECT username FROM users WHERE id = ?", (uid,))[
0
][0]
except IndexError as e:
raise KeyError("User was not found") from e
filetype = meta.get("type")
protected = bool(meta.get("protected"))
expires_at = meta.get("expires_at")
info = meta.copy() # Copying to prevent accidentally overwriting anything
title = self.get_item(info,"title", "Untitled", str, True)
expires_at = self.get_item(info, "expires_at", None, (datetime, type(None)), True)
filetype = self.get_item(info, "filetype", "txt", str, pop=True)
if expires_at:
expires_at = datetime.strftime(expires_at, "%Y-%m-%d %H:%M:%S")
del meta["author"]
del meta["filetype"]
del meta["protected"]
del meta["expires_at"]
if self.authorized:
protected = self.get_item(info, "protect", False, bool, pop=True)
author = None
if self.get_item(info, "sign", True, bool, True):
try:
author = self.query("SELECT username FROM users WHERE id = ?", (uid,))[0][0]
except IndexError:
raise KeyError("User was not found") from None
self.query(
"""INSERT INTO pastes (
id,
title,
author,
type,
protected,
meta,
expires_at
) VALUES (?, ?, ?, ?, ?, ?, ?)""",
(paste_id, title, author, filetype, protected, dumps(info), expires_at),
)
if not protected:
with open(path.join(self.filepath, paste_id), mode="w", encoding="utf-8") as file:
file.write(content)
else:
# ... probably should encrypt stuff ...
with open(
path.join(self.protected_filepath, paste_id), mode="w", encoding="utf-8"
) as file:
file.write(content)
return paste_id
author = self.get_item(info, "sign", None, (str, type(None)), True)
self.query(
"""INSERT INTO pastes (
author,
type,
protected,
meta,
id,
title,
author,
type,
meta,
expires_at
) VALUES (?, ?, ?, ?, ?)""",
(author, filetype, protected, dumps(meta), expires_at),
) VALUES (?, ?, ?, ?, ?, ?)""",
(paste_id, title, author, filetype, dumps(info), expires_at)
)
if not protected:
with open(
path.join(self.filepath, paste_id), mode="w", encoding="utf-8"
) as file:
file.write(content)
else:
# ... probably should encrypt stuff ...
with open(
path.join(self.protected_filepath, paste_id), mode="w", encoding="utf-8"
) as file:
file.write(content)
with open(path.join(self.filepath, paste_id), mode="w", encoding="utf-8") as file:
file.write(content)
return paste_id
def cleanup(self) -> None:
......@@ -753,4 +805,34 @@ class Glue:
remove(path.join(self.filepath, file_id))
self.query("DELETE FROM pastes WHERE expires_at < datetime('now')")
except (OperationalError, FileNotFoundError, PermissionError) as e:
raise RuntimeError("SQL querying failed") from e
\ Нет новой строки в конце файла
raise RuntimeError("SQL querying failed") from e
def delete_paste(self, paste_id: str) -> None:
"""Removes a paste by its ID.
Args:
paste_id (str): The paste ID.
Returns:
Nothing.
Raises:
KeyError: If paste associated with ID doesn't exist
TypeError: If ``paste_id`` is not a string
ValueError: If ``paste_id`` is an invalid ID
"""
if not isinstance(paste_id, str):
raise TypeError("Paste ID must be a string")
if not fullmatch(ID_CHARACTER_RE, paste_id):
raise ValueError("Paste ID is invalid")
try:
if self.authorized:
protected = self.query(
"DELETE FROM pastes WHERE id = ? RETURNING protected", (paste_id,)
)[0][0]
remove(path.join(self.protected_filepath if protected else self.filepath, paste_id))
else:
self.query("DELETE FROM pastes WHERE id = ?", (paste_id,))
remove(path.join(self.filepath, paste_id))
except (FileNotFoundError, IndexError):
raise KeyError("Paste does not exists") from None
"""Placeholds a package for auth and logging."""
\ Нет новой строки в конце файла
"""Placeholds a package for auth and logging."""
......@@ -4,6 +4,7 @@ This module provides:
- require_auth: a decorator that requires and checks a Bearer token, unless authorized is False
- optional_auth: a decorator that checks a Bearer token, unless authorized is False
"""
from collections.abc import Callable
from functools import wraps
from http import HTTPStatus
......@@ -16,15 +17,14 @@ def require_auth(f: Callable) -> Callable:
If `authorized` is `False`, skips checks and passes ``None`` as ``user_id``.
"""
@wraps(f)
def decorated(self, *_args, **kwargs):
if not self.authorized:
return f(self, user_id=None, **kwargs)
auth_header = request.headers.get("Authorization")
if not auth_header:
abort(
HTTPStatus.UNAUTHORIZED, description="Authentication header is missing"
)
abort(HTTPStatus.UNAUTHORIZED, description="Authentication header is missing")
parts = auth_header.split()
if len(parts) != 2 or parts[0].lower() != "bearer": # noqa PLR2004
abort(
......@@ -48,6 +48,7 @@ def optional_auth(f: Callable) -> Callable:
Passes ``None`` if no token was given or ``authorized`` is ``False``.
"""
@wraps(f)
def decorated(self, *_args, **kwargs):
if not self.db.authorized:
......@@ -70,4 +71,4 @@ def optional_auth(f: Callable) -> Callable:
except TypeError as e:
abort(HTTPStatus.BAD_REQUEST, description=f"Invalid token format {e}")
return decorated
\ Нет новой строки в конце файла
return decorated
......@@ -3,6 +3,7 @@
This module provides:
- setup_logging: a function to assign app logging to a rotating file handler
"""
import logging
import os
from logging.handlers import RotatingFileHandler
......@@ -21,16 +22,12 @@ def setup_logging(app):
file_handler = RotatingFileHandler("logs/app.log", maxBytes=10_000_000, backupCount=5)
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]"
)
logging.Formatter("%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]")
)
file_handler.setLevel(log_level)
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
)
console_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
console_handler.setLevel(log_level)
app.logger.handlers = []
......
......@@ -6,7 +6,7 @@ requires-python = ">=3.12"
authors = [{ name = "vladzodchey", email = "vlad@videvsys.ru" }]
maintainers = [{ name = "vladzodchey", email = "vlad@videvsys.ru" }]
readme = "README.md"
license = "MIT"
license = "MIT AND (Apache-2.0 OR BSD-2-Clause)"
dependencies = [
"bcrypt>=4.3.0",
"dotenv>=0.9.9",
......@@ -18,6 +18,7 @@ dependencies = [
[dependency-groups]
dev = [
"pytest>=8.4.1",
"ruff>=0.12.3",
]
......
"""A dev entrypoint for running Chancery."""
import os
from app import create_app
......@@ -6,4 +7,4 @@ from app import create_app
app = create_app(os.getenv("ENV", "development"))
if __name__ == "__main__":
app.run(port=8888, debug=True)
\ Нет новой строки в конце файла
app.run(port=8888, debug=True)
......@@ -142,6 +142,7 @@ dependencies = [
[package.dev-dependencies]
dev = [
{ name = "pytest" },
{ name = "ruff" },
]
......@@ -156,7 +157,10 @@ requires-dist = [
]
[package.metadata.requires-dev]
dev = [{ name = "ruff", specifier = ">=0.12.3" }]
dev = [
{ name = "pytest", specifier = ">=8.4.1" },
{ name = "ruff", specifier = ">=0.12.3" },
]
[[package]]
name = "click"
......@@ -267,6 +271,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/7d/6dac2a6e1eba33ee43f318edbed4ff29151a49b5d37f080aad1e6469bca4/gunicorn-23.0.0-py3-none-any.whl", hash = "sha256:ec400d38950de4dfd418cff8328b2c8faed0edb0d517d3394e457c317908ca4d", size = 85029, upload-time = "2024-08-10T20:25:24.996Z" },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793, upload-time = "2025-03-19T20:09:59.721Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
[[package]]
name = "iso8601"
version = "2.1.0"
......@@ -344,6 +357,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pycparser"
version = "2.22"
......@@ -383,6 +405,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/93/45c1cdcbeb182ccd2e144c693eaa097763b08b38cded279f0053ed53c553/pycryptodomex-3.23.0-cp37-abi3-win_arm64.whl", hash = "sha256:02d87b80778c171445d67e23d1caef279bf4b25c3597050ccd2e13970b57fd51", size = 1707161, upload-time = "2025-05-17T17:23:11.414Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pyseto"
version = "1.8.4"
......@@ -398,6 +429,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7c/5c/03c97eb0f534ac42f576a924ef17073e3a06f44079d7e3abdce535ebab5e/pyseto-1.8.4-py3-none-any.whl", hash = "sha256:b734a30b31c1fef8148a0268dbb5c9c844d77d5f0a6df1f2103b12ebd065cbee", size = 29604, upload-time = "2025-05-20T12:13:46.924Z" },
]
[[package]]
name = "pytest"
version = "8.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/ba/45911d754e8eba3d5a841a5ce61a65a685ff1798421ac054f85aa8747dfb/pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c", size = 1517714, upload-time = "2025-06-18T05:48:06.109Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
]
[[package]]
name = "python-dotenv"
version = "1.1.1"
......
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать