from configparser import ConfigParser import requests CONFIG_PATH = "/etc/nextcloudregister/config.ini" class NextcloudApiException(Exception): # base exception for unknown or undocumented errors pass class NextcloudApiInvalidInputData(NextcloudApiException): # data geven to the api were not valid pass class NextcloudApiUsernamealreadyExists(NextcloudApiException): # try to create a user with an already existing username pass class NextcloudApiPermissionDenied(NextcloudApiException): # Not enough right to perform that action pass class NextcloudApiUnknownError(NextcloudApiException): # generic error with an hint def __init__(self, hint): self.hint = hint class NextcloudApiNoEmailPassword(NextcloudApiException): # At least one of email or password is mandatory pass class NextcloudApiCannotSendEmail(NextcloudApiException): # invitation email not sent pass class NextcloudApi: """Class that abstract the Nexcloud api to handle the user creation. This class only abstract a small part of the api. :param str config_path: path to the configfile for the application """ def __init__(self, config_path=CONFIG_PATH): self.config_path = config_path @property def config(self): """lazy loading for the configuration to make the class free to instanciate """ if not hasattr(self, "_config"): self._config = ConfigParser() self._config.read(self.config_path) return self._config def count_accounts(self): """Returns the number of accounts already existing on this nextcloud instance :rtype: int :returns the number of accounts :raise: NextcloudApiException if the api was not available """ raw_dict = self._request(endpoint="users") users = raw_dict['ocs']['data'].get('users', []) return len(users) def create_account(self, username, password=None, email=None): """Try to create a new user account using the provided data. :param str username: the username to use for this account :param str password: the password for the new user. optional is an email is given :param str email: the email for the new user. optional if a password is given :raise: ValueError: if the email and the password are empty NextcloudApiException: if the api was not available or an unknow error ocured NextcloudApiInvalidInputData: the given data are somwhat invalid NextcloudApiUsernamealreadyExists: the given username is already used NextcloudApiPermissionDenied: the given user in the options has no right to create a user NextcloudApiUnknownError: un anknow error acured but with an hint NextcloudApiCannotSendEmail: could not send the invitation email """ data = { "userid": username, } if password: data["password"] = password if email: data["email"] = email result = self._request( endpoint="users", data=data, verb="POST", ) status = result['ocs']['meta']['statuscode'] if status == 101: raise NextcloudApiInvalidInputData() elif status == 102: raise NextcloudApiUsernamealreadyExists() elif status == 103: raise NextcloudApiException("103") elif status == 105: raise NextcloudApiPermissionDenied() elif status == 107: raise NextcloudApiUnknownError( hint=result['ocs']['meta']['message'] ) elif status == 108: raise NextcloudApiNoEmailPassword() elif status == 109: NextcloudApiCannotSendEmail() elif status != 100: raise NextcloudApiException(str(status)) def _request(self, endpoint, *, data=None, verb="GET"): """Abstract most of the boilerplate work to make a restapi call to nextcloud. :rtype: dict :returns: the api answer to the request :raise: NextcloudApiException if the api was not available """ if verb.upper() not in ("GET", "POST", "PUT", "DELETE"): raise ValueError("Unknown http verb") url = self._get_url(endpoint) headers = { "OCS-APIRequest": "true", "Accept": "application/json", } if verb.upper() == "POST": headers["Content-Type"] = "application/x-www-form-urlencoded" url = f"{url}?format=json" method = getattr(requests, verb.lower()) kwargs = { "headers": headers, "auth": ( self.config.get("api", "username"), self.config.get("api", "password") ), "data": data, } if self.config.getboolean("api", "use_https"): if not self.config.getboolean("api", "check_certificate"): kwargs["verify"] = False result = method(url, **kwargs) if result.status_code not in (200, 204): raise NextcloudApiException( "Nextcloud api unavailable code: %s" % result.status_code ) return result.json() def _get_url(self, endpoint): """Builds the url to call for the iven endpoint of api :rtype: str :returns: the full url with scheme for the given endpoint """ url = "" # build protocol scheme url += "http" if self.config.getboolean("api", "use_https"): url += "s" url += "://" # add domain url += self.config.get("api", "domain") # add fullpath to endpoint url = "/".join( ( url, self.config.get("api", "base_uri"), endpoint, ) ) return url