"""Web registration form that creates Nextcloud accounts through NextcloudApi.

The form is protected by a small arithmetic captcha whose expected answers
are stored, keyed by a random token, in a SQLite database.
"""
from configparser import ConfigParser
import operator
from os.path import exists
import random
from secrets import token_urlsafe
from time import time

from flask import (
    Flask,
    request,
    render_template,
    Response,
)
import sqlalchemy
import sqlalchemy.orm

from nextcloudregister.lib import (
    CONFIG_PATH,
    NextcloudApi,
    NextcloudApiCannotSendEmail,
    NextcloudApiException,
    NextcloudApiInvalidInputData,
    NextcloudApiNoEmailPassword,
    NextcloudApiPermissionDenied,
    NextcloudApiUsernamealreadyExists,
    NextcloudApiUnknownError,
)

config = ConfigParser()
config.read(CONFIG_PATH)

app = Flask(__name__)

# Every route is mounted below base_uri, which is normalised to end with "/".
base_uri = config.get("web", "base_uri", fallback="")
base_uri = base_uri + ("" if base_uri.endswith("/") else "/")

db_path = config.get("rules", "db_path", fallback="db.sqlite")
Base = sqlalchemy.orm.declarative_base()

# (symbol shown to the user, function computing the expected answer)
OPERATORS = (
    ('+', operator.add),
    ('×', operator.mul),
    ('-', operator.sub),
)


class Captcha(Base):
    """A pending captcha challenge: its token, expected answer and expiry."""

    __tablename__ = "captcha"

    # token_urlsafe(67) encodes 67 random bytes as 90 URL-safe characters.
    token = sqlalchemy.Column(sqlalchemy.String(90), primary_key=True)
    # Expected answer, stored as text so it compares directly to form input.
    answer = sqlalchemy.Column(sqlalchemy.String(3))
    # Unix timestamp (seconds) after which the challenge is discarded.
    expiration = sqlalchemy.Column(sqlalchemy.Integer())


def get_session():
    """Return a new SQLAlchemy session bound to the captcha database.

    The schema is created when the database file does not exist yet.
    The caller owns the returned session and is responsible for closing it
    (for instance by using it as a context manager).
    """
    engine = sqlalchemy.create_engine(f"sqlite:///{db_path}")
    if not exists(db_path):
        Base.metadata.create_all(engine)
    session_factory = sqlalchemy.orm.sessionmaker(engine)
    return session_factory()


def clean_db(session):
    """Delete expired captchas through ``session`` (the caller commits)."""
    session.query(Captcha).filter(Captcha.expiration < int(time())).delete()


def _mandatory_email():
    """Return whether an e-mail address is required (default: no)."""
    return config.getboolean("rules", "mandatory_email", fallback=False)


def _mandatory_password():
    """Return whether a password is required (default: yes).

    Shared by the GET and POST views so the rule announced by the form is
    the one enforced when the form is submitted.
    """
    return config.getboolean("rules", "mandatory_password", fallback=True)


def generate_captacha():
    """Create and store a new arithmetic captcha.

    Returns:
        A dict with the keys ``first_number``, ``second_number``,
        ``op_text`` and ``token``, ready to be merged into the template
        context.
    """
    first_number = random.randrange(10)
    second_number = random.randrange(10)
    op_text, op_func = random.choice(OPERATORS)
    # Order the operands of a subtraction so the answer is never negative.
    if op_text == "-" and first_number < second_number:
        first_number, second_number = second_number, first_number

    # Keep the token in a local: attributes of the ORM object are expired by
    # commit() and the session is closed before the value is returned.
    token = token_urlsafe(67)
    # "captcha_timout" (sic) is the option name read from the configuration.
    expiration = int(time()) + config.getint(
        "rules", "captcha_timout", fallback=3600
    )

    with get_session() as session:
        clean_db(session)
        session.add(
            Captcha(
                token=token,
                answer=str(op_func(first_number, second_number)),
                expiration=expiration,
            )
        )
        session.commit()

    return {
        "first_number": first_number,
        "second_number": second_number,
        "op_text": op_text,
        "token": token,
    }


def validate_captcha(token, value):
    """Check a captcha answer and consume the captcha.

    Args:
        token: Token identifying the captcha, as submitted by the form.
        value: Answer submitted by the user.

    Returns:
        True when a non-expired captcha exists for ``token`` and ``value``
        equals its stored answer, False otherwise. A captcha that was found
        is deleted whatever the outcome, so a token can be tried only once.
    """
    result = False
    with get_session() as session:
        clean_db(session)
        captcha = (
            session.query(Captcha)
            .filter(Captcha.token == token)
            .one_or_none()
        )
        if captcha is not None:
            result = value == captcha.answer
            session.delete(captcha)
        # Always commit so the removal of expired rows is persisted too.
        session.commit()
    return result


@app.route(base_uri, methods=["GET"])
def form_get(data=None, error=None, info=None, success=False):
    """Render the registration form.

    Also called directly by ``form_post`` to re-display the form.

    Args:
        data: Previously submitted form values used to pre-fill the form.
        error: Error message to display, if any.
        info: Informational message to display, if any.
        success: True once an account has been created; passed to the
            template as both ``success`` and the initial ``disable`` value.

    Returns:
        The rendered ``form.html`` template.
    """
    context = {
        "base_uri": base_uri,
        "data": data or {},
        "disable": success,
        "error": error,
        "eula": config.get("web", "eula", fallback=""),
        "favicon": config.get("web", "favicon", fallback=""),
        "info": info,
        "instance_link": config.get("web", "instance_link", fallback=""),
        "max_accounts": config.getint("rules", "max_accounts", fallback=0),
        "org_link": config.get("web", "org_link", fallback=""),
        "success": success,
        "mandatory_password": _mandatory_password(),
        "mandatory_email": _mandatory_email(),
    }

    count_accounts = 0
    api = NextcloudApi()
    try:
        count_accounts = api.count_accounts()
    except NextcloudApiException:
        # The count stays at 0, so every account is reported as available.
        # NOTE(review): form_post treats the same failure as "no account
        # left" -- confirm which behaviour is intended here.
        pass

    if count_accounts >= context["max_accounts"] and not success:
        context["disable"] = True
        context["error"] = (
            "Tous les comptes disponibles sur cette instance ont deja été "
            "distribués."
        )
        context["count_accounts"] = 0
    else:
        context["count_accounts"] = context["max_accounts"] - count_accounts

    # Every rendering gets a fresh captcha (adds the token and operands).
    context |= generate_captacha()
    return render_template("form.html", **context)


@app.route(base_uri, methods=["POST"])
def form_post():
    """Validate the submitted form and create the Nextcloud account.

    Returns:
        The form re-rendered through ``form_get``, carrying an error message
        when validation or account creation failed and an informational
        message when the account was created.
    """

    def reject(error):
        # Re-display the form, pre-filled with what the user submitted.
        return form_get(data=request.form, error=error)

    # validate captcha
    captcha_ok = validate_captcha(
        token=request.form.get("token"),
        value=request.form.get("answer"),
    )
    if not captcha_ok:
        return reject("Le captcha est invalide.")

    # Strip before validating so a whitespace-only value counts as missing
    # instead of reaching the API as an empty string.
    username = request.form.get("username", "").strip()
    email = request.form.get("email", "").strip()
    password1 = request.form.get("password1")

    # validate mandatory fields
    if not username:
        return reject(
            "Un nom d'utilisateur est obligatoire pour vous inscrire."
        )
    if _mandatory_email() and not email:
        return reject("Une adresse mail est obligatoire pour vous inscrire.")
    if _mandatory_password() and not password1:
        return reject("Un mot de passe est obligatoire pour vous inscrire.")

    # validate password
    if password1 and password1 != request.form.get("password2"):
        return reject("Les mots de passe ne correspondent pas.")

    no_account_left = (
        "Tous les comptes disponibles sur cette instance ont deja été "
        "distribués."
    )
    api = NextcloudApi()
    try:
        max_accounts = config.getint("rules", "max_accounts", fallback=0)
        if api.count_accounts() >= max_accounts:
            return reject(no_account_left)
    except NextcloudApiException:
        # Without a reliable count, refuse rather than risk exceeding the
        # configured maximum.
        return reject(no_account_left)

    # The specific exception handlers come before the more general ones.
    try:
        api.create_account(
            username=username,
            password=password1,
            email=email,
        )
    except NextcloudApiCannotSendEmail:
        return reject(
            "Impossible d'envoyer un email pour la création du compte."
        )
    except NextcloudApiInvalidInputData:
        return reject(
            "Erreur interne. Merci de contacter l'administrateur de "
            "l'instance Nextcloud."
        )
    except NextcloudApiNoEmailPassword:
        return reject(
            "Une adresse email ou un mot de passe sont obligatoires pour "
            "créer un compte Nextcloud."
        )
    except NextcloudApiPermissionDenied:
        return reject(
            "Erreur interne. Merci de contacter l'administrateur de "
            "l'instance Nextcloud. (Permission denied)"
        )
    except NextcloudApiUsernamealreadyExists:
        return reject(
            "Le nom d'utilisateur que vous avez choisi est déjà utilisé "
            "sur cette instance Nextcloud."
        )
    except NextcloudApiUnknownError as err:
        return reject(err.hint)
    except NextcloudApiException as err:
        return reject(
            "Erreur interne. Merci de contacter l'administrateur de "
            "l'instance Nextcloud. (%s)" % err
        )
    except Exception:
        # Last-resort boundary: keep showing a friendly message, but record
        # the traceback instead of discarding it.
        app.logger.exception("Unexpected error while creating an account")
        return reject(
            "Erreur interne. Merci de contacter l'administrateur de "
            "l'instance Nextcloud."
        )

    if config.get("web", "instance_link", fallback=""):
        return form_get(
            info=(
                "L'inscription est un succès, vous allez maintenant être "
                "redirigé vers votre instance Nextcloud."
            ),
            success=True,
        )
    return form_get(
        info="Vous êtes maintenant inscrit sur cette instance Nextcloud",
        success=True,
    )


@app.route(base_uri + "style.css")
def style():
    """Serve the stylesheet rendered with the configured ``web.color``."""
    context = {
        "color": config.get("web", "color"),
    }
    css_content = render_template("style.css", **context)
    return Response(css_content, mimetype="text/css")


# Alias for the Flask app object.
application = app