# Source code for modio.client

"""The Client object is the base class from which all the requests are made,
this is where you can get your games, authenticate and get the models for
your authenticated user.
"""
import asyncio
import datetime
import logging
import math
import time
from typing import Optional
import warnings
import aiohttp
import requests

from modio.utils import async_ratelimit_retry, ratelimit_retry

from .errors import modioException
from .entities import Event, Message, ModFile, Rating, User
from .enums import TargetPlatform, TargetPortal
from .objects import Pagination, Returned, Filter
from .game import Game
from .mod import Mod

# Argument handed to the ratelimit_retry / async_ratelimit_retry decorators on
# every Connection request method. Presumably the maximum number of attempts per
# request before the ratelimit error is propagated - confirm in modio.utils.
MAX_TRIES = 2

class Connection:
    """Class handling under the hood requests and ratelimits.

    Stores the credentials, language and targeting settings shared by every
    request, owns a ``requests.Session`` for synchronous calls and, once
    :meth:`start` has been awaited, an ``aiohttp.ClientSession`` for
    asynchronous calls.

    Parameters
    ----------
    api_key : Optional[str]
        Added as the ``api_key`` query parameter of GET requests when no
        access token is set.
    access_token : Optional[str]
        Sent as a ``Bearer`` token in the ``Authorization`` header.
    lang : str
        Value of the ``Accept-Language`` header.
    version : str
        API version appended to the base URL.
    test : bool
        When truthy ``api.test.mod.io`` is targeted instead of ``api.mod.io``.
    platform : Optional[TargetPlatform]
        When not None its ``value`` is sent as the ``X-Modio-Platform`` header.
    portal : Optional[TargetPortal]
        When not None its ``value`` is sent as the ``X-Modio-Portal`` header.
    ratelimit_max_sleep : int
        Longest ``retry-after`` delay (in seconds) the connection is willing to
        sleep through after a 429 error before re-raising it.
    """

    def __init__(self, api_key, access_token, lang, version, test, platform, portal, ratelimit_max_sleep):
        self.test = test
        self.version = version
        self.access_token = access_token
        self.api_key = api_key
        self.lang = lang
        self.platform = platform
        self.portal = portal

        # rate_limit / rate_remain are never updated by this class; they are
        # only read back by the deprecated Client.rate_limit / rate_remain.
        self.rate_limit = None
        self.rate_remain = None
        # Refreshed from the "retry-after" header of every response.
        self.retry_after = 0
        self.ratelimit_max_sleep = ratelimit_max_sleep

        self.session = requests.Session()
        self._async_session = None

    @property
    def async_session(self):
        """The aiohttp session used for async requests.

        Raises
        -------
        AttributeError
            No session has been created yet, i.e. :meth:`start` was not awaited.
        """
        if self._async_session is None:
            raise AttributeError("No async session found, did you forget to use Client.start?")

        return self._async_session

    @async_session.setter
    def async_session(self, session):
        self._async_session = session

    @property
    def _base_path(self):
        """Root URL of the API for the configured environment and version."""
        if self.test:
            return f"https://api.test.mod.io/{self.version}"

        return f"https://api.mod.io/{self.version}"

    def __repr__(self):
        return f"<Connection retry_after={self.retry_after}>"

    async def close(self):
        """Close session"""
        await self.async_session.close()

    async def start(self):
        """Start session"""
        self.async_session = aiohttp.ClientSession()

    def _can_sleep_off_ratelimit(self):
        """Return True when the last ``retry-after`` fits within ``ratelimit_max_sleep``."""
        return self.retry_after <= self.ratelimit_max_sleep

    async def is_ratelimited(self):
        """Awaitable form of the ratelimit check, kept for backward compatibility.

        Returns
        --------
        bool
            True when ``retry_after`` does not exceed ``ratelimit_max_sleep``.
        """
        return self._can_sleep_off_ratelimit()

    def enforce_ratelimit(self):
        """Block for ``retry_after`` seconds if that delay is within the allowed maximum."""
        # The synchronous helper is used on purpose: calling the coroutine
        # function is_ratelimited() without awaiting it yields a coroutine
        # object, which is always truthy and made this check a no-op.
        if self._can_sleep_off_ratelimit():
            logging.info("Ratelimited, sleeping for %s seconds", self.retry_after)
            time.sleep(self.retry_after)

    async def async_enforce_ratelimit(self):
        """Asynchronously sleep for ``retry_after`` seconds if within the allowed maximum."""
        if self._can_sleep_off_ratelimit():
            logging.info("Ratelimited, sleeping for %s seconds", self.retry_after)
            await asyncio.sleep(self.retry_after)

    def _error_check(self, resp, request_json):
        """Updates the rate-limit attributes and check validity of the request.

        Parameters
        -----------
        resp
            Response object; ``requests`` responses expose ``status_code`` and
            ``aiohttp`` responses expose ``status``, both are supported.
        request_json
            Decoded JSON body of the response.

        Raises
        -------
        modioException
            The body contains an ``error`` object.

        Returns
        --------
        The response object itself for a 204 status, otherwise ``request_json``.
        """
        self.retry_after = int(resp.headers.get("retry-after", "0"))
        code = getattr(resp, "status_code", getattr(resp, "status", None))

        if code == 204:
            return resp

        if "error" in request_json:
            error = request_json["error"]
            raise modioException(error["message"], error["code"], error["error_ref"], error.get("errors", {}))

        return request_json

    def _define_headers(self, h_type):
        """Build the headers for a request.

        Parameters
        -----------
        h_type : int
            0 for an OAuth 2 request submitting url-encoded data, 1 for an
            OAuth 2 multipart request, 2 for a request authenticated by API key.

        Raises
        -------
        ValueError
            ``h_type`` is not one of 0, 1 or 2.

        Returns
        --------
        dict
            The headers, including the platform/portal headers when configured.
        """
        if h_type == 0:
            # regular O auth 2 header when submitting data
            headers = {
                "Authorization": "Bearer " + self.access_token,
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
                "Accept-Language": self.lang,
            }
        elif h_type == 1:
            # o auth 2 header for submitting multipart/data, had to remove Content-Type
            # because it is already added by the requests lib when defining a files parameter
            headers = {
                "Authorization": "Bearer " + self.access_token,
                "Accept": "application/json",
                "Accept-Language": self.lang,
            }
        elif h_type == 2:
            # header to use when making calls using the api key, the key itself is added in
            # the parameters in the methods below
            headers = {
                "Accept": "application/json",
                "Accept-Language": self.lang,
                "Content-Type": "application/x-www-form-urlencoded",
            }
        else:
            # Previously an unknown value surfaced as an UnboundLocalError.
            raise ValueError(f"Unknown header type: {h_type!r}")

        if self.platform is not None:
            headers["X-Modio-Platform"] = self.platform.value

        if self.portal is not None:
            headers["X-Modio-Portal"] = self.portal.value

        return headers

    def _post_process(self, resp):
        """Decode a ``requests`` response and run the error check on it.

        On a 429 error the ratelimit sleep is attempted before the exception
        is re-raised to the caller.
        """
        try:
            resp_json = resp.json()
        except requests.JSONDecodeError:
            # Body is not JSON (e.g. empty); treat it as an empty payload.
            resp_json = {}

        try:
            return self._error_check(resp, resp_json)
        except modioException as e:
            if e.code == 429:
                self.enforce_ratelimit()

            raise

    @ratelimit_retry(MAX_TRIES)
    def get_request(self, url, *, h_type=0, **fields):
        """Send a GET request; ``filters`` and extra keywords become query parameters."""
        filters = fields.pop("filters", None)
        filters = (filters or Filter()).get_dict()

        extra = {**fields, **filters}

        if not self.access_token:
            # Without a token fall back to API-key authentication.
            extra["api_key"] = self.api_key
            h_type = 2

        resp = self.session.get(self._base_path + url, headers=self._define_headers(h_type), params=extra)
        return self._post_process(resp)

    @ratelimit_retry(MAX_TRIES)
    def post_request(self, url, *, h_type=0, **fields):
        """Send a POST request; extra keywords are forwarded to ``requests``."""
        resp = self.session.post(self._base_path + url, headers=self._define_headers(h_type), **fields)
        return self._post_process(resp)

    @ratelimit_retry(MAX_TRIES)
    def put_request(self, url, *, h_type=0, **fields):
        """Send a PUT request; extra keywords are forwarded to ``requests``."""
        resp = self.session.put(self._base_path + url, headers=self._define_headers(h_type), **fields)
        return self._post_process(resp)

    @ratelimit_retry(MAX_TRIES)
    def delete_request(self, url, *, h_type=0, **fields):
        """Send a DELETE request; extra keywords are forwarded to ``requests``."""
        resp = self.session.delete(self._base_path + url, headers=self._define_headers(h_type), **fields)
        return self._post_process(resp)

    async def _async_post_process(self, resp):
        """Decode an ``aiohttp`` response and run the error check on it.

        On a 429 error the ratelimit sleep is attempted before the exception
        is re-raised to the caller.
        """
        try:
            resp_json = await resp.json()
        except aiohttp.ContentTypeError:
            # Body is not JSON (e.g. empty); treat it as an empty payload.
            resp_json = {}

        try:
            return self._error_check(resp, resp_json)
        except modioException as e:
            if e.code == 429:
                await self.async_enforce_ratelimit()

            raise

    @async_ratelimit_retry(MAX_TRIES)
    async def async_get_request(self, url, *, h_type=0, **fields):
        """Async counterpart of :meth:`get_request`."""
        filters = fields.pop("filters", None)
        filters = (filters or Filter()).get_dict()

        extra = {**fields, **filters}

        if not self.access_token:
            # Without a token fall back to API-key authentication.
            extra["api_key"] = self.api_key
            h_type = 2

        async with self.async_session.get(
            self._base_path + url, headers=self._define_headers(h_type), params=extra
        ) as resp:
            return await self._async_post_process(resp)

    @async_ratelimit_retry(MAX_TRIES)
    async def async_post_request(self, url, *, h_type=0, **fields):
        """Async counterpart of :meth:`post_request`.

        ``data`` and ``files`` are merged into a single form (None values are
        skipped) and ``params`` is sent as the query string.
        """
        files = fields.pop("files", {})
        data = fields.pop("data", {})
        # params used to be dropped silently, which broke the email endpoints.
        params = fields.pop("params", None)
        if isinstance(params, dict):
            # Mirror requests, which omits parameters whose value is None.
            params = {key: value for key, value in params.items() if value is not None}

        form = aiohttp.FormData()
        for key, value in data.items():
            if value is None:
                continue

            form.add_field(key, str(value))

        for key, value in files.items():
            if value is None:
                continue

            if isinstance(value, tuple):
                # (filename, content) pair
                form.add_field(key, value[1], filename=value[0], content_type="multipart/form-data")
            else:
                form.add_field(key, value, content_type="multipart/form-data")

        async with self.async_session.post(
            self._base_path + url, headers=self._define_headers(h_type), data=form, params=params
        ) as resp:
            return await self._async_post_process(resp)

    @async_ratelimit_retry(MAX_TRIES)
    async def async_put_request(self, url, *, h_type=0, **fields):
        """Async counterpart of :meth:`put_request`; extra keywords go to ``aiohttp``."""
        async with self.async_session.put(
            self._base_path + url, headers=self._define_headers(h_type), **fields
        ) as resp:
            return await self._async_post_process(resp)

    @async_ratelimit_retry(MAX_TRIES)
    async def async_delete_request(self, url, *, h_type=0, **fields):
        """Async counterpart of :meth:`delete_request`; extra keywords go to ``aiohttp``."""
        async with self.async_session.delete(
            self._base_path + url, headers=self._define_headers(h_type), **fields
        ) as resp:
            return await self._async_post_process(resp)


class Client:
    """Represents an authenticated client to make requests to the mod.io API with. If you desire
    to make async requests you must call :ref:`Client.start` before making any async request.

    Parameters
    -----------
    api_key : Optional[str]
        The api key that will be used to authenticate the bot while it makes most of
        its GET requests. This can be generated on the mod.io website. Optional if an access
        token is supplied.
    access_token : Optional[str]
        The OAuth 2 token that will be used to make more complex GET requests and to make
        POST requests. This can either be generated using the library's oauth2 functions
        or through the mod.io website. This is referred as an access token in the rest of
        the documentation. If an access token is supplied it will be used for all requests.
    lang : Optional[str]
        The mod.io API provides localization for a collection of languages. To specify
        responses from the API to be in a particular language, simply provide the lang
        parameter with an ISO 639 compliant language code. Default is US English.
    test : Optional[bool]
        Whether or not to use the mod.io test environment. If not included will default to False.
    version : Optional[str]
        An optional keyword argument to allow you to pick a specific version of the API to query,
        usually you shouldn't need to change this. Default is the latest supported version.
    platform : Optional[TargetPlatform]
        The platform to target with requests.
    portal : Optional[TargetPortal]
        The portal to target with requests.
    ratelimit_max_sleep : Optional[int]
        The maximum amount of time the library will sleep in the case of a ratelimit. If the
        ratelimit header returned dictates a longer sleep than that value then the library
        will instead raise the ratelimit. If it is less then the library will sleep for the
        duration required before retrying the request once.

    Attributes
    -----------
    retry_after : int
        Number of seconds until the rate limits are reset for this API Key/access token.
        Is 0 until the API returns a 429.
    """

    def __init__(
        self,
        *,
        api_key=None,
        access_token=None,
        lang="en",
        version="v1",
        test=False,
        platform=None,
        portal=None,
        ratelimit_max_sleep=math.inf
    ):
        self.lang = lang
        self.version = version
        self.test = test
        self.connection = Connection(
            test=test,
            api_key=api_key,
            access_token=access_token,
            version=version,
            lang=lang,
            platform=platform,
            portal=portal,
            ratelimit_max_sleep=ratelimit_max_sleep,
        )

    def __repr__(self):
        return f"< Client version={self.version} test={self.test} >"

    @property
    def rate_limit(self):
        """Deprecated, returns the connection's ``rate_limit`` attribute."""
        # stacklevel=2 attributes the warning to the code reading the property.
        warnings.warn(
            "rate_limit is deprecated and will be removed in a future version", DeprecationWarning, stacklevel=2
        )
        return self.connection.rate_limit

    @property
    def rate_remain(self):
        """Deprecated, returns the connection's ``rate_remain`` attribute."""
        warnings.warn(
            "rate_remain is deprecated and will be removed in a future version", DeprecationWarning, stacklevel=2
        )
        return self.connection.rate_remain

    @property
    def retry_after(self):
        """Value of the ``retry-after`` header of the most recent response."""
        return self.connection.retry_after

    def set_platform(self, platform: Optional[TargetPlatform] = None) -> None:
        """Change the platform targeted by the client. Call without an argument to not
        target any specific platform.

        Parameters
        -----------
        platform : Optional[TargetPlatform]
            The platform to set
        """
        self.connection.platform = platform

    def set_portal(self, portal: Optional[TargetPortal] = None) -> None:
        """Change the portal targeted by the client. Call without an argument to not
        target any specific portal.

        Parameters
        -----------
        portal : Optional[TargetPortal]
            The portal to set
        """
        self.connection.portal = portal

    async def close(self):
        """|async|

        This function is used to clean up the client in order to close the application that it uses
        gracefully. At the moment it is only used to close the client's Session.

        |coro|
        """
        await self.connection.close()

    async def start(self):
        """|async|

        This function is used to start up the async part of the client. This is required to avoid
        sync users from having to clean up stuff.

        |coro|
        """
        await self.connection.start()

    def get_game(self, game_id: int) -> Game:
        """Queries the mod.io API for the given game ID and if found returns it as a
        Game instance. If not found raises NotFound.

        |coro|

        Parameters
        -----------
        game_id : int
            The ID of the game to query the API for

        Raises
        -------
        NotFound
            A game with the supplied id was not found.

        Returns
        --------
        Game
            The game with the given ID
        """
        game_json = self.connection.get_request(f"/games/{game_id}")
        return Game(connection=self.connection, **game_json)

    async def async_get_game(self, game_id: int) -> Game:
        """Async equivalent of :meth:`get_game`, same parameters and return value."""
        game_json = await self.connection.async_get_request(f"/games/{game_id}")
        return Game(connection=self.connection, **game_json)

    def get_games(self, *, filters: Optional[Filter] = None) -> Returned[Game]:
        """Gets all the games available on mod.io. Returns a
        named tuple with parameters results and pagination. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Returns
        --------
        Returned[List[Game], Pagination]
            The results and pagination tuple from this request
        """
        game_json = self.connection.get_request("/games", filters=filters)
        return Returned(
            [Game(connection=self.connection, **game) for game in game_json["data"]], Pagination(**game_json)
        )

    async def async_get_games(self, *, filters: Optional[Filter] = None) -> Returned[Game]:
        """Async equivalent of :meth:`get_games`, same parameters and return value."""
        game_json = await self.connection.async_get_request("/games", filters=filters)
        return Returned(
            [Game(connection=self.connection, **game) for game in game_json["data"]], Pagination(**game_json)
        )

    def get_my_user(self) -> User:
        """Gets the authenticated user's details (aka the user who created the API key/access token)

        |coro|

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        -------
        User
            The authenticated user
        """
        me_json = self.connection.get_request("/me")
        return User(connection=self.connection, **me_json)

    async def async_get_my_user(self) -> User:
        """Async equivalent of :meth:`get_my_user`, same return value."""
        me_json = await self.connection.async_get_request("/me")
        return User(connection=self.connection, **me_json)

    def get_my_subs(self, *, filters: Optional[Filter] = None) -> Returned[Mod]:
        """Gets all the mods the authenticated user is subscribed to. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[Mod], Pagination]
            The results and pagination tuple from this request
        """
        mod_json = self.connection.get_request("/me/subscribed", filters=filters)
        return Returned(
            [Mod(connection=self.connection, **mod) for mod in mod_json["data"]], Pagination(**mod_json)
        )

    async def async_get_my_subs(self, *, filters: Optional[Filter] = None) -> Returned[Mod]:
        """Async equivalent of :meth:`get_my_subs`, same parameters and return value."""
        mod_json = await self.connection.async_get_request("/me/subscribed", filters=filters)
        return Returned(
            [Mod(connection=self.connection, **mod) for mod in mod_json["data"]], Pagination(**mod_json)
        )

    def get_my_events(self, *, filters: Optional[Filter] = None) -> Returned[Event]:
        """Get events that have been fired specifically for the authenticated user. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Returns
        --------
        Returned[List[Event], Pagination]
            The results and pagination tuple from this request
        """
        events_json = self.connection.get_request("/me/events", filters=filters)
        return Returned([Event(**event) for event in events_json["data"]], Pagination(**events_json))

    async def async_get_my_events(self, *, filters: Optional[Filter] = None) -> Returned[Event]:
        """Async equivalent of :meth:`get_my_events`, same parameters and return value."""
        events_json = await self.connection.async_get_request("/me/events", filters=filters)
        return Returned([Event(**event) for event in events_json["data"]], Pagination(**events_json))

    def get_my_games(self, filters: Optional[Filter] = None) -> Returned[Game]:
        """Get all the games the authenticated user added or is a team member of. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[Game], Pagination]
            The results and pagination tuple from this request
        """
        game_json = self.connection.get_request("/me/games", filters=filters)
        return Returned(
            [Game(connection=self.connection, **game) for game in game_json["data"]], Pagination(**game_json)
        )

    async def async_get_my_games(self, filters: Optional[Filter] = None) -> Returned[Game]:
        """Async equivalent of :meth:`get_my_games`, same parameters and return value."""
        game_json = await self.connection.async_get_request("/me/games", filters=filters)
        return Returned(
            [Game(connection=self.connection, **game) for game in game_json["data"]], Pagination(**game_json)
        )

    def get_my_mods(self, *, filters: Optional[Filter] = None) -> Returned[Mod]:
        """Get all the mods the authenticated user added or is a team member of. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[Mod], Pagination]
            The results and pagination tuple from this request
        """
        mod_json = self.connection.get_request("/me/mods", filters=filters)
        return Returned(
            [Mod(connection=self.connection, **mod) for mod in mod_json["data"]], Pagination(**mod_json)
        )

    async def async_get_my_mods(self, *, filters: Optional[Filter] = None) -> Returned[Mod]:
        """Async equivalent of :meth:`get_my_mods`, same parameters and return value."""
        mod_json = await self.connection.async_get_request("/me/mods", filters=filters)
        return Returned(
            [Mod(connection=self.connection, **mod) for mod in mod_json["data"]], Pagination(**mod_json)
        )

    def get_my_modfiles(self, *, filters: Optional[Filter] = None) -> Returned[ModFile]:
        """Get all the mods the authenticated user uploaded. The returned modfile objects cannot be
        edited or deleted and do not have a `game_id` attribute. Returns a named tuple with parameters
        results and pagination. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[ModFile], Pagination]
            The results and pagination tuple from this request
        """
        files_json = self.connection.get_request("/me/files", filters=filters)
        return Returned(
            [ModFile(**file, connection=self.connection) for file in files_json["data"]],
            Pagination(**files_json),
        )

    async def async_get_my_modfiles(self, *, filters: Optional[Filter] = None) -> Returned[ModFile]:
        """Async equivalent of :meth:`get_my_modfiles`, same parameters and return value."""
        files_json = await self.connection.async_get_request("/me/files", filters=filters)
        return Returned(
            [ModFile(**file, connection=self.connection) for file in files_json["data"]],
            Pagination(**files_json),
        )

    def get_my_ratings(self, *, filters: Optional[Filter] = None) -> Returned[Rating]:
        """Get all the ratings the authenticated user has submitted. Returns a named tuple with
        parameters results and pagination. |filterable|

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[Rating], Pagination]
            The results and pagination tuple from this request
        """
        ratings = self.connection.get_request("/me/ratings", filters=filters)
        return Returned(
            [Rating(**rating, connection=self.connection) for rating in ratings["data"]],
            Pagination(**ratings),
        )

    async def async_get_my_ratings(self, *, filters: Optional[Filter] = None) -> Returned[Rating]:
        """Async equivalent of :meth:`get_my_ratings`, same parameters and return value."""
        ratings = await self.connection.async_get_request("/me/ratings", filters=filters)
        return Returned(
            [Rating(**rating, connection=self.connection) for rating in ratings["data"]],
            Pagination(**ratings),
        )

    def get_my_mutes(self, *, filters: Optional[Filter] = None) -> Returned[User]:
        """Get all users muted by this user

        |coro|

        Parameters
        -----------
        filters : Optional[Filter]
            A instance of Filter to be used for filtering, paginating and sorting
            results

        Raises
        -------
        Forbidden
            The access token is invalid/missing

        Returns
        --------
        Returned[List[User], Pagination]
            The results and pagination tuple from this request
        """
        users = self.connection.get_request("/me/users/muted", filters=filters)
        return Returned(
            [User(**user, connection=self.connection) for user in users["data"]], Pagination(**users)
        )

    async def async_get_my_mutes(self, *, filters: Optional[Filter] = None) -> Returned[User]:
        """Async equivalent of :meth:`get_my_mutes`, same parameters and return value."""
        users = await self.connection.async_get_request("/me/users/muted", filters=filters)
        return Returned(
            [User(**user, connection=self.connection) for user in users["data"]], Pagination(**users)
        )

    def email_request(self, email: str):  # pragma: no cover
        """Posts an email request for an OAuth2 token. A code will be sent to the given email address
        which can then be entered into :func:`email_exchange`.

        |coro|

        Parameters
        ----------
        email : str
            A valid email to which the 5-digit code will be sent
        """
        resp = self.connection.post_request(
            "/oauth/emailrequest", params={"email": email, "api_key": self.connection.api_key}, h_type=2
        )
        return Message(**resp)

    async def async_email_request(self, email: str):  # pragma: no cover
        """Async equivalent of :meth:`email_request`, same parameters and return value."""
        resp = await self.connection.async_post_request(
            "/oauth/emailrequest", params={"email": email, "api_key": self.connection.api_key}, h_type=2
        )
        return Message(**resp)

    def _email_exchange_params(self, code, date_expires):
        """Validate the security code and build the query parameters of the exchange request."""
        # The code is documented as an int, which has no len(); normalise to
        # text so both ints and strings can be validated and sent.
        security_code = str(code)
        if len(security_code) != 5:
            raise ValueError("Security code must be 5 digits")

        params = {"security_code": security_code, "api_key": self.connection.api_key}
        if date_expires is not None:
            # Sent as a whole-second Unix timestamp.
            params["date_expires"] = int(date_expires.timestamp())

        return params

    def email_exchange(
        self, code: int, *, date_expires: Optional[datetime.datetime] = None
    ) -> str:  # pragma: no cover
        """Exchanges the given 5-digit code for an OAuth2 token.

        |coro|

        Parameters
        ----------
        code : int
            A 5-digit code received by email less than 15 minutes ago
        date_expires : Optional[datetime.datetime]
            Datetime of when the token will expire. By default this
            is a year, value cannot be greater than a year.

        Raises
        -------
        Unauthorized
            Invalid security code
        ValueError
            Security code was not 5 digits long

        Returns
        --------
        str
            The access code.
        """
        resp = self.connection.post_request(
            "/oauth/emailexchange",
            params=self._email_exchange_params(code, date_expires),
            h_type=2,
        )
        return resp["access_token"]

    async def async_email_exchange(
        self, code: int, *, date_expires: Optional[datetime.datetime] = None
    ) -> str:  # pragma: no cover
        """Async equivalent of :meth:`email_exchange`, same parameters, errors and return value."""
        resp = await self.connection.async_post_request(
            "/oauth/emailexchange",
            params=self._email_exchange_params(code, date_expires),
            h_type=2,
        )
        return resp["access_token"]