This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramPoster/modules/api_client.py

60 lines
2.7 KiB
Python
Raw Normal View History

2023-01-17 15:11:23 +02:00
from os import makedirs, path, sep
from base64 import b64encode, b64decode
from random import choice
from typing import Tuple
from requests import get, post, put, patch
from modules.utils import configGet
def authorize() -> str:
makedirs(configGet("cache", "locations"), exist_ok=True)
if path.exists(configGet("cache", "locations")+sep+"api_access") is True:
with open(configGet("cache", "locations")+sep+"api_access", "rb") as file:
token = b64decode(file.read()).decode("utf-8")
if "user" in get(configGet("address", "posting", "api")+"/users/me/", headers={"Authorization": f"Bearer {token}"}).json():
return token
payload = {
"grant_type": "password",
"scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
"username": configGet("username", "posting", "api"),
"password": configGet("password", "posting", "api")
}
response = post(configGet("address", "posting", "api")+"/token", data=payload).json()
with open(configGet("cache", "locations")+sep+"api_access", "wb") as file:
file.write(b64encode(response["access_token"].encode("utf-8")))
return response["access_token"]
def random_pic() -> Tuple[str, str]:
"""Returns random image id and filename from the queue.
### Returns:
* `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed.
"""
token = authorize()
pic = choice(get(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size=100&caption=queue', headers={"Authorization": f"Bearer {token}"}).json()["results"])
return pic["id"], pic["filename"]
def upload_pic(filepath: str) -> bool:
token = authorize()
try:
pic_name = path.basename(filepath)
files = {'file': (pic_name, open(filepath, 'rb'), 'image/jpeg')}
response = post(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos&caption=queue', headers={"Authorization": f"Bearer {token}"}, files=files).json()
print(response, flush=True)
duplicates = []
if "duplicates" in response:
for duplicate in response["duplicates"]:
duplicates.append(f'{configGet("address", "posting", "api")}/photos/{duplicate["id"]}')
return True, duplicates
except:
return False, []
def move_pic(id: str) -> bool:
token = authorize()
try:
patch(f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"})
return True
except:
return False
if __name__ == "__main__":
print(authorize())