This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramPoster/modules/api_client.py

209 lines
8.0 KiB
Python

"""This is only a temporary solution. Complete Photos API client is yet to be developed."""
import asyncio
from base64 import b64decode, b64encode
from os import makedirs, path, sep
from random import choice
from traceback import print_exc
from typing import Tuple, Union
import aiofiles
from aiohttp import ClientSession, FormData
from ujson import dumps
from classes.exceptions import (
AlbumCreationDuplicateError,
AlbumCreationError,
AlbumCreationNameError,
SubmissionUploadError,
UserCreationDuplicateError,
UserCreationError,
)
from modules.logger import logWrite
from modules.utils import configGet
# Shared aiohttp session used by every API call in this module; ujson's
# ``dumps`` is installed as the session's JSON serializer.
http_session = ClientSession(json_serialize=dumps)
async def authorize() -> str:
    """Returns a bearer token for the API, reusing the cached one while it is still accepted.

    The token is cached base64-encoded in the file `api_access` inside the
    directory configured as `locations.cache`. A cached token is validated with
    a request to `/users/me/`; if it is missing, unreadable or rejected, a new
    one is requested from `/token` using the configured username and password
    and written back to the cache.

    ### Raises:
        * `ValueError`: The API did not accept the configured credentials.

    ### Returns:
        * `str`: Access token to be sent as `Authorization: Bearer <token>`.
    """
    cache_dir = configGet("cache", "locations")
    token_path = path.join(cache_dir, "api_access")
    api_address = configGet("address", "posting", "api")

    makedirs(cache_dir, exist_ok=True)

    if path.exists(token_path):
        async with aiofiles.open(token_path, "rb") as cache_file:
            cached = await cache_file.read()

        try:
            token = b64decode(cached).decode("utf-8")
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError
            # subclasses: a damaged cache simply triggers a fresh login.
            token = ""

        if token:
            # ``async with`` releases the connection once the status is known.
            async with http_session.get(
                api_address + "/users/me/",
                headers={"Authorization": f"Bearer {token}"},
            ) as check:
                if check.status == 200:
                    return token

    username = configGet("username", "posting", "api")
    payload = {
        "grant_type": "password",
        "scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
        "username": username,
        "password": configGet("password", "posting", "api"),
    }

    async with http_session.post(api_address + "/token", data=payload) as response:
        if not response.ok:
            logWrite(
                f'Incorrect API credentials! Could not login into "{api_address}" using login "{username}": HTTP {response.status}'
            )
            raise ValueError(
                f"Could not authorize at the API: HTTP {response.status}"
            )
        token = (await response.json())["access_token"]

    async with aiofiles.open(token_path, "wb") as cache_file:
        await cache_file.write(b64encode(token.encode("utf-8")))

    return token
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
    """Returns random image id and filename from the queue.

    Requests the photos of the configured album that carry the caption
    `queue` and picks one of the returned entries at random.

    ### Args:
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called. Defaults to `None`.

    ### Raises:
        * `ValueError`: The API answered with a status other than 200.
        * `KeyError`: The API returned no photos for the queue.

    ### Returns:
        * `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed.
    """
    token = await authorize() if token is None else token

    album = configGet("album", "posting", "api")
    url = f'{configGet("address", "posting", "api")}/albums/{album}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue'

    logWrite(url)

    async with http_session.get(
        url, headers={"Authorization": f"Bearer {token}"}
    ) as resp:
        # The status is checked BEFORE the body is parsed: an error response is
        # not guaranteed to be JSON, and parsing it first would hide the real
        # failure behind a decoding error.
        if resp.status != 200:
            logWrite(f"Could not get photos from album {album}: HTTP {resp.status}")
            # NOTE(review): this debug line writes the bearer token to the log.
            logWrite(
                f'Could not get photos from "{url}" using token "{token}": HTTP {resp.status}',
                debug=True,
            )
            raise ValueError(
                f"Could not get photos from album {album}: HTTP {resp.status}"
            )
        results = (await resp.json())["results"]

    if not results:
        raise KeyError(f"No queued photos found in album {album}")

    pic = choice(results)
    return pic["id"], pic["filename"]
async def upload_pic(
    filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None
) -> Tuple[bool, list]:
    """Uploads a local file to the configured album with the caption `queue`.

    Any error raised while reading the file or talking to the API (including
    `SubmissionUploadError` for an unexpected HTTP status) is printed with its
    traceback and reported through the return value instead of being raised.

    ### Args:
        * filepath (`str`): Path of the file to upload.
        * allow_duplicates (`bool`, *optional*): Sent to the API as `ignore_duplicates`. Defaults to `False`.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called. Defaults to `None`.

    ### Returns:
        * `Tuple[bool, list]`: Whether the request ended with HTTP 200 or 409, and a list of formatted links to the duplicates the API reported (empty when there are none).
    """
    token = await authorize() if token is None else token

    try:
        pic_name = path.basename(filepath)
        logWrite(f"Uploading {pic_name} to the API...", debug=True)

        async with aiofiles.open(filepath, "rb") as f:
            file_bytes = await f.read()

        formdata = FormData()
        formdata.add_field(
            "file", file_bytes, filename=pic_name, content_type="image/jpeg"
        )

        async with http_session.post(
            f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
            params={
                "caption": "queue",
                "compress": "false",
                "ignore_duplicates": str(allow_duplicates).lower(),
            },
            headers={"Authorization": f"Bearer {token}"},
            data=formdata,
        ) as response:
            if response.status not in (200, 409):
                # ``response.content`` is a stream object, not the body; read
                # the text so the log and the exception carry the real message.
                error_text = await response.text()
                logWrite(
                    f"Could not upload '{filepath}' to API: HTTP {response.status} with message '{error_text}'"
                )
                raise SubmissionUploadError(
                    str(filepath), response.status, error_text
                )
            response_json = await response.json()

        duplicates = []

        if "duplicates" in response_json:
            address_external = configGet("address_external", "posting", "api")
            # A missing "access_token" key is treated like an explicit null
            # instead of failing an upload that has already succeeded.
            access_token = response_json.get("access_token")

            for index, duplicate in enumerate(response_json["duplicates"]):
                if access_token is None:
                    link = f'{address_external}/photos/{duplicate["id"]}'
                else:
                    link = f"{address_external}/token/photo/{access_token}?id={index}"
                duplicates.append(f'`{duplicate["id"]}`:\n{link}')

        return True, duplicates

    except Exception:
        # Deliberate best-effort: callers only get a success flag.
        print_exc()
        return False, []
async def find_pic(
    name: str, caption: Union[str, None] = None, token: Union[str, None] = None
) -> Union[dict, None]:
    """Searches the configured album for photos matching a name.

    ### Args:
        * name (`str`): Value sent as the `q` search parameter.
        * caption (`Union[str, None]`, *optional*): Caption to filter by. Omitted from the request when `None`. Defaults to `None`.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called. Defaults to `None`.

    ### Returns:
        * `Union[dict, None]`: The `results` entry of the API response, or `None` when the status is not 200, nothing was found or the request failed.
    """
    token = await authorize() if token is None else token

    try:
        # A ``None`` value is not a valid query parameter and makes the request
        # fail before it is sent, so the caption is only added when provided.
        params = {"q": name}
        if caption is not None:
            params["caption"] = caption

        async with http_session.get(
            f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
            params=params,
            headers={"Authorization": f"Bearer {token}"},
        ) as response:
            if response.status != 200:
                return None
            results = (await response.json())["results"]

        return results if results else None

    except Exception as exp:
        logWrite(
            f"Could not find image with name '{name}' and caption '{caption}' due to: {exp}"
        )
        return None
async def move_pic(id: str, token: Union[str, None] = None) -> bool:
    """Changes the caption of a photo to `sent`.

    ### Args:
        * id (`str`): ID of the photo to update.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, `authorize()` is called. Defaults to `None`.

    ### Returns:
        * `bool`: `True` if the API accepted the change, `False` if the request failed or was answered with an error status.
    """
    token = await authorize() if token is None else token

    try:
        async with http_session.patch(
            f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent',
            headers={"Authorization": f"Bearer {token}"},
        ) as response:
            # A completed request is not necessarily a successful one.
            if not response.ok:
                logWrite(f"Could not move photo '{id}': HTTP {response.status}")
                return False
        return True
    except Exception as exp:
        # ``Exception`` rather than a bare ``except`` so task cancellation and
        # KeyboardInterrupt are not swallowed.
        logWrite(f"Could not move photo '{id}' due to: {exp}")
        return False
async def create_user(username: str, email: str, password: str) -> None:
    """Registers a new user at the API.

    ### Args:
        * username (`str`): Sent as the `user` form field.
        * email (`str`): Sent as the `email` form field.
        * password (`str`): Sent as the `password` form field.

    ### Raises:
        * `UserCreationDuplicateError`: The API answered with HTTP 409.
        * `UserCreationError`: The API answered with any status other than 204.
    """
    # ``async with`` releases the connection on every path, including the
    # ones that raise without reading the response body.
    async with http_session.post(
        f'{configGet("address", "posting", "api")}/users',
        data={"user": username, "email": email, "password": password},
    ) as response:
        if response.status == 409:
            raise UserCreationDuplicateError(username)
        if response.status != 204:
            raise UserCreationError(
                response.status, await response.text(encoding="utf-8")
            )
    return None
async def create_album(name: str, title: str) -> None:
    """Creates a new album at the API using a token obtained from `authorize()`.

    ### Args:
        * name (`str`): Sent as the `name` query parameter.
        * title (`str`): Sent as the `title` query parameter.

    ### Raises:
        * `AlbumCreationDuplicateError`: The API answered with HTTP 409.
        * `AlbumCreationNameError`: The API answered with HTTP 406; carries the JSON body of the response.
        * `AlbumCreationError`: The API answered with any other status than 200.
    """
    token = await authorize()

    # ``async with`` releases the connection on every path, including success,
    # where the response body is never read.
    async with http_session.post(
        f'{configGet("address", "posting", "api")}/albums',
        params={"name": name, "title": title},
        headers={"Authorization": f"Bearer {token}"},
    ) as response:
        if response.status == 409:
            raise AlbumCreationDuplicateError(name)
        if response.status == 406:
            raise AlbumCreationNameError(await response.json())
        if response.status != 200:
            raise AlbumCreationError(
                response.status, await response.text(encoding="utf-8")
            )
    return None
# When executed as a script: authorize once and print the resulting token.
if __name__ == "__main__":
    access_token = asyncio.run(authorize())
    print(access_token)