"""This is only a temporary solution. Complete Photos API client is yet to be developed.""" import asyncio from base64 import b64decode, b64encode from os import makedirs, path, sep from random import choice from traceback import print_exc from typing import Tuple, Union import aiofiles from aiohttp import ClientSession, FormData from ujson import dumps from classes.exceptions import ( AlbumCreationDuplicateError, AlbumCreationError, AlbumCreationNameError, SubmissionUploadError, UserCreationDuplicateError, UserCreationError, ) from modules.logger import logWrite from modules.utils import configGet http_session = ClientSession( json_serialize=dumps, ) async def authorize() -> str: makedirs(configGet("cache", "locations"), exist_ok=True) if path.exists(configGet("cache", "locations") + sep + "api_access") is True: async with aiofiles.open( configGet("cache", "locations") + sep + "api_access", "rb" ) as file: token = b64decode(await file.read()).decode("utf-8") if ( await http_session.get( configGet("address", "posting", "api") + "/users/me/", headers={"Authorization": f"Bearer {token}"}, ) ).status == 200: return token payload = { "grant_type": "password", "scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write", "username": configGet("username", "posting", "api"), "password": configGet("password", "posting", "api"), } response = await http_session.post( configGet("address", "posting", "api") + "/token", data=payload ) if not response.ok: logWrite( f'Incorrect API credentials! Could not login into "{configGet("address", "posting", "api")}" using login "{configGet("username", "posting", "api")}": HTTP {response.status}' ) raise ValueError async with aiofiles.open( configGet("cache", "locations") + sep + "api_access", "wb" ) as file: await file.write( b64encode((await response.json())["access_token"].encode("utf-8")) ) return (await response.json())["access_token"] async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]: """Returns random image id and filename from the queue. ### Returns: * `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed. """ token = await authorize() if token is None else token logWrite( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue' ) resp = await http_session.get( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue', headers={"Authorization": f"Bearer {token}"}, ) logWrite("Random pic response: " + str(await resp.json()), debug=True) if resp.status != 200: logWrite( f'Could not get photos from album {configGet("album", "posting", "api")}: HTTP {resp.status}' ) logWrite( f'Could not get photos from "{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue" using token "{token}": HTTP {resp.status}', debug=True, ) raise ValueError if len((await resp.json())["results"]) == 0: raise KeyError pic = choice((await resp.json())["results"]) return pic["id"], pic["filename"] async def upload_pic( filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None ) -> Tuple[bool, list, Union[str, None]]: token = await authorize() if token is None else token try: pic_name = path.basename(filepath) logWrite(f"Uploading {pic_name} to the API...", debug=True) async with aiofiles.open(filepath, "rb") as f: file_bytes = await f.read() formdata = FormData() formdata.add_field( "file", file_bytes, filename=pic_name, content_type="image/jpeg" ) response = await http_session.post( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos', params={ "caption": "queue", "compress": "false", "ignore_duplicates": str(allow_duplicates).lower(), }, headers={"Authorization": f"Bearer {token}"}, data=formdata, ) response_json = await response.json() if response.status != 200 and response.status != 409: logWrite( f"Could not upload '{filepath}' to API: HTTP {response.status} with message '{response.content}'" ) raise SubmissionUploadError( str(filepath), response.status, response.content ) id = response_json["id"] if "id" in await response.json() else None duplicates = [] if "duplicates" in response_json: for index, duplicate in enumerate(response_json)["duplicates"]: # type: ignore if response_json["access_token"] is None: duplicates.append( f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/photos/{duplicate["id"]}' ) else: duplicates.append( f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/token/photo/{response_json["access_token"]}?id={index}' ) return True, duplicates, id except Exception as exp: print_exc() return False, [], None async def find_pic( name: str, caption: Union[str, None] = None, token: Union[str, None] = None ) -> Union[dict, None]: token = await authorize() if token is None else token try: response = await http_session.get( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos', params={"q": name, "caption": caption}, headers={"Authorization": f"Bearer {token}"}, ) # logWrite(response.json()) if response.status != 200: return None if len((await response.json())["results"]) == 0: return None return (await response.json())["results"] except Exception as exp: logWrite( f"Could not find image with name '{name}' and caption '{caption}' due to: {exp}" ) return None async def move_pic(id: str, token: Union[str, None] = None) -> bool: token = await authorize() if token is None else token try: await http_session.patch( f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"}, ) return True except: return False async def create_user(username: str, email: str, password: str) -> None: response = await http_session.post( f'{configGet("address", "posting", "api")}/users', data={"user": username, "email": email, "password": password}, ) if response.status == 409: raise UserCreationDuplicateError(username) elif response.status != 204: raise UserCreationError(response.status, await response.text(encoding="utf-8")) return None async def create_album(name: str, title: str) -> None: token = await authorize() response = await http_session.post( f'{configGet("address", "posting", "api")}/albums', params={"name": name, "title": title}, headers={"Authorization": f"Bearer {token}"}, ) if response.status == 409: raise AlbumCreationDuplicateError(name) elif response.status == 406: raise AlbumCreationNameError(await response.json()) elif response.status != 200: raise AlbumCreationError(response.status, await response.text(encoding="utf-8")) return None if __name__ == "__main__": print(asyncio.run(authorize()))