|
|
@ -7,11 +7,13 @@ except ModuleNotFoundError:
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
import asyncio
|
|
|
|
from base64 import b64decode, b64encode
|
|
|
|
from base64 import b64decode, b64encode
|
|
|
|
|
|
|
|
from traceback import print_exc
|
|
|
|
|
|
|
|
import aiofiles
|
|
|
|
from ujson import dumps
|
|
|
|
from ujson import dumps
|
|
|
|
from os import makedirs, path, sep
|
|
|
|
from os import makedirs, path, sep
|
|
|
|
from random import choice
|
|
|
|
from random import choice
|
|
|
|
from typing import Tuple, Union
|
|
|
|
from typing import Tuple, Union
|
|
|
|
from aiohttp import ClientSession
|
|
|
|
from aiohttp import ClientSession, FormData
|
|
|
|
|
|
|
|
|
|
|
|
from classes.exceptions import SubmissionUploadError
|
|
|
|
from classes.exceptions import SubmissionUploadError
|
|
|
|
|
|
|
|
|
|
|
@ -25,8 +27,8 @@ http_session = ClientSession(json_serialize=dumps, )
|
|
|
|
async def authorize() -> str:
|
|
|
|
async def authorize() -> str:
|
|
|
|
makedirs(configGet("cache", "locations"), exist_ok=True)
|
|
|
|
makedirs(configGet("cache", "locations"), exist_ok=True)
|
|
|
|
if path.exists(configGet("cache", "locations")+sep+"api_access") is True:
|
|
|
|
if path.exists(configGet("cache", "locations")+sep+"api_access") is True:
|
|
|
|
with open(configGet("cache", "locations")+sep+"api_access", "rb") as file:
|
|
|
|
async with aiofiles.open(configGet("cache", "locations")+sep+"api_access", "rb") as file:
|
|
|
|
token = b64decode(file.read()).decode("utf-8")
|
|
|
|
token = b64decode(await file.read()).decode("utf-8")
|
|
|
|
if (await http_session.get(configGet("address", "posting", "api")+"/users/me/", headers={"Authorization": f"Bearer {token}"})).status == 200:
|
|
|
|
if (await http_session.get(configGet("address", "posting", "api")+"/users/me/", headers={"Authorization": f"Bearer {token}"})).status == 200:
|
|
|
|
return token
|
|
|
|
return token
|
|
|
|
payload = {
|
|
|
|
payload = {
|
|
|
@ -36,11 +38,11 @@ async def authorize() -> str:
|
|
|
|
"password": configGet("password", "posting", "api")
|
|
|
|
"password": configGet("password", "posting", "api")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
response = await http_session.post(configGet("address", "posting", "api")+"/token", data=payload)
|
|
|
|
response = await http_session.post(configGet("address", "posting", "api")+"/token", data=payload)
|
|
|
|
if response.status != 200:
|
|
|
|
if not response.ok:
|
|
|
|
logWrite(f'Incorrect API credentials! Could not login into "{configGet("address", "posting", "api")}" using login "{configGet("username", "posting", "api")}": HTTP {response.status}')
|
|
|
|
logWrite(f'Incorrect API credentials! Could not login into "{configGet("address", "posting", "api")}" using login "{configGet("username", "posting", "api")}": HTTP {response.status}')
|
|
|
|
raise ValueError
|
|
|
|
raise ValueError
|
|
|
|
with open(configGet("cache", "locations")+sep+"api_access", "wb") as file:
|
|
|
|
async with aiofiles.open(configGet("cache", "locations")+sep+"api_access", "wb") as file:
|
|
|
|
file.write(b64encode((await response.json())["access_token"].encode("utf-8")))
|
|
|
|
await file.write(b64encode((await response.json())["access_token"].encode("utf-8")))
|
|
|
|
return (await response.json())["access_token"]
|
|
|
|
return (await response.json())["access_token"]
|
|
|
|
|
|
|
|
|
|
|
|
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
|
|
|
|
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
|
|
|
@ -66,8 +68,17 @@ async def upload_pic(filepath: str, allow_duplicates: bool = False, token: Union
|
|
|
|
token = await authorize() if token is None else token
|
|
|
|
token = await authorize() if token is None else token
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
pic_name = path.basename(filepath)
|
|
|
|
pic_name = path.basename(filepath)
|
|
|
|
files = {'file': (pic_name, open(filepath, 'rb'), 'image/jpeg')}
|
|
|
|
logWrite(f"Uploading {pic_name} to the API...", debug=True)
|
|
|
|
response = await http_session.post(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos', params={"caption": "queue", "compress": False, "ignore_duplicates": allow_duplicates}, headers={"Authorization": f"Bearer {token}"}, files=files)
|
|
|
|
async with aiofiles.open(filepath, "rb") as f:
|
|
|
|
|
|
|
|
file_bytes = await f.read()
|
|
|
|
|
|
|
|
formdata = FormData()
|
|
|
|
|
|
|
|
formdata.add_field('file', file_bytes, filename=pic_name, content_type='image/jpeg')
|
|
|
|
|
|
|
|
response = await http_session.post(
|
|
|
|
|
|
|
|
f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
|
|
|
|
|
|
|
|
params={"caption": "queue", "compress": "false", "ignore_duplicates": str(allow_duplicates).lower()},
|
|
|
|
|
|
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
|
|
|
|
|
|
data=formdata
|
|
|
|
|
|
|
|
)
|
|
|
|
if response.status != 200 and response.status != 409:
|
|
|
|
if response.status != 200 and response.status != 409:
|
|
|
|
logWrite(f"Could not upload '{filepath}' to API: HTTP {response.status} with message '{response.content}'")
|
|
|
|
logWrite(f"Could not upload '{filepath}' to API: HTTP {response.status} with message '{response.content}'")
|
|
|
|
raise SubmissionUploadError(str(filepath), response.status, response.content)
|
|
|
|
raise SubmissionUploadError(str(filepath), response.status, response.content)
|
|
|
@ -79,7 +90,8 @@ async def upload_pic(filepath: str, allow_duplicates: bool = False, token: Union
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
duplicates.append(f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/token/photo/{(await response.json())["access_token"]}?id={index}')
|
|
|
|
duplicates.append(f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/token/photo/{(await response.json())["access_token"]}?id={index}')
|
|
|
|
return True, duplicates
|
|
|
|
return True, duplicates
|
|
|
|
except:
|
|
|
|
except Exception as exp:
|
|
|
|
|
|
|
|
print_exc()
|
|
|
|
return False, []
|
|
|
|
return False, []
|
|
|
|
|
|
|
|
|
|
|
|
async def find_pic(name: str, caption: Union[str, None] = None, token: Union[str, None] = None) -> Union[dict, None]:
|
|
|
|
async def find_pic(name: str, caption: Union[str, None] = None, token: Union[str, None] = None) -> Union[dict, None]:
|
|
|
|