"""This is only a temporary solution. Complete Photos API client is yet to be developed.""" import asyncio from base64 import b64decode, b64encode from os import makedirs, path, sep from random import choice from traceback import print_exc from typing import Tuple, Union import aiofiles from aiohttp import ClientSession, FormData from ujson import dumps from classes.exceptions import ( AlbumCreationDuplicateError, AlbumCreationError, AlbumCreationNameError, SubmissionUploadError, UserCreationDuplicateError, UserCreationError, ) from modules.logger import logWrite from modules.utils import configGet http_session = ClientSession( json_serialize=dumps, ) async def authorize() -> str: makedirs(configGet("cache", "locations"), exist_ok=True) if path.exists(configGet("cache", "locations") + sep + "api_access") is True: async with aiofiles.open( configGet("cache", "locations") + sep + "api_access", "rb" ) as file: token = b64decode(await file.read()).decode("utf-8") if ( await http_session.get( configGet("address", "posting", "api") + "/users/me/", headers={"Authorization": f"Bearer {token}"}, ) ).status == 200: return token payload = { "grant_type": "password", "scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write", "username": configGet("username", "posting", "api"), "password": configGet("password", "posting", "api"), } response = await http_session.post( configGet("address", "posting", "api") + "/token", data=payload ) if not response.ok: logWrite( f'Incorrect API credentials! Could not login into "{configGet("address", "posting", "api")}" using login "{configGet("username", "posting", "api")}": HTTP {response.status}' ) raise ValueError async with aiofiles.open( configGet("cache", "locations") + sep + "api_access", "wb" ) as file: await file.write( b64encode((await response.json())["access_token"].encode("utf-8")) ) return (await response.json())["access_token"] async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]: """Returns random image id and filename from the queue. ### Returns: * `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed. """ token = await authorize() if token is None else token logWrite( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue' ) resp = await http_session.get( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue', headers={"Authorization": f"Bearer {token}"}, ) print(await resp.json(), flush=True) if resp.status != 200: logWrite( f'Could not get photos from album {configGet("album", "posting", "api")}: HTTP {resp.status}' ) logWrite( f'Could not get photos from "{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue" using token "{token}": HTTP {resp.status}', debug=True, ) raise ValueError if len((await resp.json())["results"]) == 0: raise KeyError pic = choice((await resp.json())["results"]) return pic["id"], pic["filename"] async def upload_pic( filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None ) -> Tuple[bool, list, str]: token = await authorize() if token is None else token try: pic_name = path.basename(filepath) logWrite(f"Uploading {pic_name} to the API...", debug=True) async with aiofiles.open(filepath, "rb") as f: file_bytes = await f.read() formdata = FormData() formdata.add_field( "file", file_bytes, filename=pic_name, content_type="image/jpeg" ) response = await http_session.post( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos', params={ "caption": "queue", "compress": "false", "ignore_duplicates": str(allow_duplicates).lower(), }, headers={"Authorization": f"Bearer {token}"}, data=formdata, ) if response.status != 200 and response.status != 409: logWrite( f"Could not upload '{filepath}' to API: HTTP {response.status} with message '{response.content}'" ) raise SubmissionUploadError( str(filepath), response.status, response.content ) duplicates = [] if "duplicates" in (await response.json()): for index, duplicate in enumerate((await response.json())["duplicates"]): if (await response.json())["access_token"] is None: duplicates.append( f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/photos/{duplicate["id"]}' ) else: duplicates.append( f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/token/photo/{(await response.json())["access_token"]}?id={index}' ) return True, duplicates, (await response.json())["id"] except Exception as exp: print_exc() return False, [], "" async def find_pic( name: str, caption: Union[str, None] = None, token: Union[str, None] = None ) -> Union[dict, None]: token = await authorize() if token is None else token try: response = await http_session.get( f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos', params={"q": name, "caption": caption}, headers={"Authorization": f"Bearer {token}"}, ) # logWrite(response.json()) if response.status != 200: return None if len((await response.json())["results"]) == 0: return None return (await response.json())["results"] except Exception as exp: logWrite( f"Could not find image with name '{name}' and caption '{caption}' due to: {exp}" ) return None async def move_pic(id: str, token: Union[str, None] = None) -> bool: token = await authorize() if token is None else token try: await http_session.patch( f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"}, ) return True except: return False async def create_user(username: str, email: str, password: str) -> None: response = await http_session.post( f'{configGet("address", "posting", "api")}/users', data={"user": username, "email": email, "password": password}, ) if response.status == 409: raise UserCreationDuplicateError(username) elif response.status != 204: raise UserCreationError(response.status, await response.text(encoding="utf-8")) return None async def create_album(name: str, title: str) -> None: token = await authorize() response = await http_session.post( f'{configGet("address", "posting", "api")}/albums', params={"name": name, "title": title}, headers={"Authorization": f"Bearer {token}"}, ) if response.status == 409: raise AlbumCreationDuplicateError(name) elif response.status == 406: raise AlbumCreationNameError(await response.json()) elif response.status != 200: raise AlbumCreationError(response.status, await response.text(encoding="utf-8")) return None if __name__ == "__main__": print(asyncio.run(authorize()))