TelegramPoster/modules/api_client.py

268 lines
9.6 KiB
Python

"""This is only a temporary solution. Complete Photos API client is yet to be developed."""
import asyncio
from base64 import b64decode, b64encode
from os import makedirs, path, sep
from random import choice
from traceback import print_exc
from typing import Tuple, Union
import aiofiles
from aiohttp import ClientSession, FormData
from ujson import dumps
from classes.exceptions import (
AlbumCreationDuplicateError,
AlbumCreationError,
AlbumCreationNameError,
SubmissionUploadError,
UserCreationDuplicateError,
UserCreationError,
)
from modules.logger import logWrite
from modules.utils import configGet, locale
# Single aiohttp session shared by every request helper in this module.
# JSON request bodies are serialized with ujson's `dumps`.
http_session = ClientSession(json_serialize=dumps)
async def authorize() -> str:
    """Return a bearer token for the API, reusing the cached one when it still works.

    The token is stored base64-encoded in the `api_access` file inside the
    configured cache directory. A cached token is verified with a request to
    `/users/me/`; when the cache is missing, unreadable or rejected, a new
    token is requested from `/token` with the configured credentials and
    written back to the cache file.

    ### Returns:
        * `str`: Access token to be sent as `Authorization: Bearer <token>`.

    ### Raises:
        * `ValueError`: The token endpoint answered with a non-OK status.
    """
    api_address = configGet("address", "posting", "api")
    cache_dir = configGet("cache", "locations")
    token_file = cache_dir + sep + "api_access"

    makedirs(cache_dir, exist_ok=True)

    if path.exists(token_file):
        async with aiofiles.open(token_file, "rb") as file:
            cached = await file.read()
        try:
            token = b64decode(cached).decode("utf-8")
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError
            # subclasses: a corrupted cache simply means "log in again".
            token = None
        if token is not None:
            # `async with` releases the connection even though the body of
            # the verification response is never read.
            async with http_session.get(
                api_address + "/users/me/",
                headers={"Authorization": f"Bearer {token}"},
            ) as check:
                if check.status == 200:
                    return token

    payload = {
        "grant_type": "password",
        "scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
        "username": configGet("username", "posting", "api"),
        "password": configGet("password", "posting", "api"),
    }
    async with http_session.post(api_address + "/token", data=payload) as response:
        if not response.ok:
            # `.format()` must be applied to the localized message, not to
            # the locale name passed as the `locale=` keyword.
            logWrite(
                locale(
                    "api_creds_invalid",
                    "console",
                    locale=configGet("locale_log"),
                ).format(
                    api_address,
                    configGet("username", "posting", "api"),
                    response.status,
                )
            )
            raise ValueError(
                f"API authorization failed with HTTP {response.status}"
            )
        # Parse the body once and reuse the value for both cache and return.
        access_token = (await response.json())["access_token"]

    async with aiofiles.open(token_file, "wb") as file:
        await file.write(b64encode(access_token.encode("utf-8")))
    return access_token
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
    """Returns random image id and filename from the queue.

    Requests the photos of the configured album whose caption is `queue`
    and picks one entry of the returned `results` at random.

    ### Args:
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called to obtain one.

    ### Returns:
        * `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed.

    ### Raises:
        * `ValueError`: The API answered with a status other than 200.
        * `KeyError`: The API returned an empty `results` list.
    """
    token = await authorize() if token is None else token

    api_address = configGet("address", "posting", "api")
    album = configGet("album", "posting", "api")
    page_size = configGet("page_size", "posting")
    url = f"{api_address}/albums/{album}/photos?q=&page_size={page_size}&caption=queue"

    logWrite(url)

    async with http_session.get(
        url, headers={"Authorization": f"Bearer {token}"}
    ) as resp:
        if resp.status != 200:
            # Check the status BEFORE decoding JSON: an error page is not
            # guaranteed to be JSON and would otherwise raise a decoding
            # error instead of the ValueError callers expect.
            logWrite(
                locale(
                    "random_pic_response", "console", locale=configGet("locale_log")
                ).format(await resp.text()),
                debug=True,
            )
            logWrite(
                locale(
                    "random_pic_error_code",
                    "console",
                    locale=configGet("locale_log"),
                ).format(album, resp.status),
            )
            # NOTE(review): this debug message includes the bearer token.
            # Kept because the locale template receives five arguments;
            # consider dropping the token from the template.
            logWrite(
                locale(
                    "random_pic_error_debug",
                    "console",
                    locale=configGet("locale_log"),
                ).format(api_address, album, page_size, token, resp.status),
                debug=True,
            )
            raise ValueError(f"Could not get a random photo: HTTP {resp.status}")
        # Decode the body once and reuse it below.
        payload = await resp.json()

    logWrite(
        locale("random_pic_response", "console", locale=configGet("locale_log")).format(
            payload
        ),
        debug=True,
    )

    results = payload["results"]
    if not results:
        raise KeyError("No photos with caption 'queue' were returned")

    pic = choice(results)
    return pic["id"], pic["filename"]
async def upload_pic(
    filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None
) -> Tuple[bool, list, Union[str, None]]:
    """Upload a local file to the configured album with the `queue` caption.

    Any exception raised during the upload (including `SubmissionUploadError`
    for unexpected HTTP statuses) is printed with its traceback and reported
    through the return value instead of being propagated.

    ### Args:
        * filepath (`str`): Path of the file to upload.
        * allow_duplicates (`bool`, *optional*): Sent to the API as the `ignore_duplicates` query parameter. Defaults to `False`.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called to obtain one.

    ### Returns:
        * `Tuple[bool, list, Union[str, None]]`: Success flag, list of formatted duplicate links (empty when the response has none) and the `id` from the response, or `None` when the response has no `id`.
    """
    token = await authorize() if token is None else token

    try:
        pic_name = path.basename(filepath)
        logWrite(f"Uploading {pic_name} to the API...", debug=True)

        async with aiofiles.open(filepath, "rb") as f:
            file_bytes = await f.read()

        formdata = FormData()
        formdata.add_field(
            "file", file_bytes, filename=pic_name, content_type="image/jpeg"
        )

        async with http_session.post(
            f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
            params={
                "caption": "queue",
                "compress": "false",
                "ignore_duplicates": str(allow_duplicates).lower(),
            },
            headers={"Authorization": f"Bearer {token}"},
            data=formdata,
        ) as response:
            status = response.status
            # 409 is not treated as an error: it still falls through to the
            # `duplicates` handling below.
            if status not in (200, 409):
                # Read the body as text: `response.content` is a stream
                # object, so logging it would only print its repr.
                body = await response.text()
                logWrite(
                    locale(
                        "pic_upload_error",
                        "console",
                        locale=configGet("locale_log"),
                    ).format(filepath, status, body),
                )
                raise SubmissionUploadError(str(filepath), status, body)
            response_json = await response.json()

        photo_id = response_json.get("id")
        duplicates = []
        found = response_json.get("duplicates")
        if found:
            external = configGet("address_external", "posting", "api")
            # `.get` so a response without "access_token" is treated the same
            # as one where it is null, instead of raising KeyError.
            access_token = response_json.get("access_token")
            for index, duplicate in enumerate(found):
                if access_token is None:
                    duplicates.append(
                        f'`{duplicate["id"]}`:\n{external}/photos/{duplicate["id"]}'
                    )
                else:
                    duplicates.append(
                        f'`{duplicate["id"]}`:\n{external}/token/photo/{access_token}?id={index}'
                    )
        return True, duplicates, photo_id
    except Exception:
        # Deliberate best-effort: report the failure via the return value.
        print_exc()
        return False, [], None
async def find_pic(
    name: str, caption: Union[str, None] = None, token: Union[str, None] = None
) -> Union[list, None]:
    """Search the configured album for photos matching a query.

    ### Args:
        * name (`str`): Value sent as the `q` query parameter.
        * caption (`Union[str, None]`, *optional*): Value sent as the `caption` query parameter. Omitted from the request when `None`.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called to obtain one.

    ### Returns:
        * `Union[list, None]`: The `results` entry of the response, or `None` when the status is not 200, nothing was found, or the request failed.
    """
    token = await authorize() if token is None else token

    # aiohttp rejects `None` as a query value with a TypeError, which made
    # every call without a caption fail; only send the parameter when set.
    params = {"q": name}
    if caption is not None:
        params["caption"] = caption

    try:
        async with http_session.get(
            f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
            params=params,
            headers={"Authorization": f"Bearer {token}"},
        ) as response:
            if response.status != 200:
                return None
            results = (await response.json())["results"]
        return results if results else None
    except Exception as exp:
        # `.format()` belongs on the localized message, not the locale name.
        logWrite(
            locale(
                "find_pic_error",
                "console",
                locale=configGet("locale_log"),
            ).format(name, caption, exp),
        )
        return None
async def move_pic(id: str, token: Union[str, None] = None) -> bool:
    """Change the caption of a photo to `sent`.

    ### Args:
        * id (`str`): ID of the photo to update.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called to obtain one.

    ### Returns:
        * `bool`: `True` when the API answered with HTTP 200, `False` otherwise (including when the request raised an exception).
    """
    token = await authorize() if token is None else token
    try:
        async with http_session.patch(
            f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent',
            headers={"Authorization": f"Bearer {token}"},
        ) as response:
            if response.status != 200:
                logWrite(f"Media moving failed with HTTP {response.status}", debug=True)
                return False
            return True
    except Exception as exp:
        # `Exception` instead of a bare `except:` so task cancellation and
        # KeyboardInterrupt are not swallowed; the reason is logged.
        logWrite(f"Media moving failed with an exception: {exp}", debug=True)
        return False
async def remove_pic(id: str, token: Union[str, None] = None) -> bool:
    """Delete a photo from the API.

    ### Args:
        * id (`str`): ID of the photo to delete.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called to obtain one.

    ### Returns:
        * `bool`: `True` when the API answered with HTTP 204, `False` otherwise (including when the request raised an exception).
    """
    token = await authorize() if token is None else token
    try:
        async with http_session.delete(
            f'{configGet("address", "posting", "api")}/photos/{id}',
            headers={"Authorization": f"Bearer {token}"},
        ) as response:
            if response.status != 204:
                logWrite(
                    f"Media removal failed with HTTP {response.status}", debug=True
                )
                return False
            return True
    except Exception as exp:
        # `Exception` instead of a bare `except:` so task cancellation and
        # KeyboardInterrupt are not swallowed; the reason is logged.
        logWrite(f"Media removal failed with an exception: {exp}", debug=True)
        return False
async def create_user(username: str, email: str, password: str) -> None:
    """Create a new user through the API's `/users` endpoint.

    ### Args:
        * username (`str`): Sent as the `user` form field.
        * email (`str`): Sent as the `email` form field.
        * password (`str`): Sent as the `password` form field.

    ### Raises:
        * `UserCreationDuplicateError`: The API answered with HTTP 409.
        * `UserCreationError`: The API answered with any other status than 204.
    """
    endpoint = f'{configGet("address", "posting", "api")}/users'
    form = {"user": username, "email": email, "password": password}

    response = await http_session.post(endpoint, data=form)
    status = response.status

    if status == 409:
        raise UserCreationDuplicateError(username)
    if status != 204:
        raise UserCreationError(status, await response.text(encoding="utf-8"))
async def create_album(name: str, title: str) -> None:
    """Create a new album through the API's `/albums` endpoint.

    A token is always obtained with `authorize()` before the request.

    ### Args:
        * name (`str`): Sent as the `name` query parameter.
        * title (`str`): Sent as the `title` query parameter.

    ### Raises:
        * `AlbumCreationDuplicateError`: The API answered with HTTP 409.
        * `AlbumCreationNameError`: The API answered with HTTP 406.
        * `AlbumCreationError`: The API answered with any other status than 200.
    """
    token = await authorize()

    endpoint = f'{configGet("address", "posting", "api")}/albums'
    query = {"name": name, "title": title}
    auth_header = {"Authorization": f"Bearer {token}"}

    response = await http_session.post(endpoint, params=query, headers=auth_header)
    status = response.status

    if status == 409:
        raise AlbumCreationDuplicateError(name)
    if status == 406:
        raise AlbumCreationNameError(await response.json())
    if status != 200:
        raise AlbumCreationError(status, await response.text(encoding="utf-8"))
if __name__ == "__main__":
print(asyncio.run(authorize()))