60 lines
2.7 KiB
Python
60 lines
2.7 KiB
Python
from os import makedirs, path, sep
|
|
from base64 import b64encode, b64decode
|
|
from random import choice
|
|
from typing import Tuple
|
|
from requests import get, post, put, patch
|
|
from modules.utils import configGet
|
|
|
|
def authorize() -> str:
|
|
makedirs(configGet("cache", "locations"), exist_ok=True)
|
|
if path.exists(configGet("cache", "locations")+sep+"api_access") is True:
|
|
with open(configGet("cache", "locations")+sep+"api_access", "rb") as file:
|
|
token = b64decode(file.read()).decode("utf-8")
|
|
if "user" in get(configGet("address", "posting", "api")+"/users/me/", headers={"Authorization": f"Bearer {token}"}).json():
|
|
return token
|
|
payload = {
|
|
"grant_type": "password",
|
|
"scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
|
|
"username": configGet("username", "posting", "api"),
|
|
"password": configGet("password", "posting", "api")
|
|
}
|
|
response = post(configGet("address", "posting", "api")+"/token", data=payload).json()
|
|
with open(configGet("cache", "locations")+sep+"api_access", "wb") as file:
|
|
file.write(b64encode(response["access_token"].encode("utf-8")))
|
|
return response["access_token"]
|
|
|
|
def random_pic() -> Tuple[str, str]:
|
|
"""Returns random image id and filename from the queue.
|
|
|
|
### Returns:
|
|
* `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed.
|
|
"""
|
|
token = authorize()
|
|
pic = choice(get(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size=100&caption=queue', headers={"Authorization": f"Bearer {token}"}).json()["results"])
|
|
return pic["id"], pic["filename"]
|
|
|
|
def upload_pic(filepath: str) -> bool:
|
|
token = authorize()
|
|
try:
|
|
pic_name = path.basename(filepath)
|
|
files = {'file': (pic_name, open(filepath, 'rb'), 'image/jpeg')}
|
|
response = post(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos&caption=queue', headers={"Authorization": f"Bearer {token}"}, files=files).json()
|
|
print(response, flush=True)
|
|
duplicates = []
|
|
if "duplicates" in response:
|
|
for duplicate in response["duplicates"]:
|
|
duplicates.append(f'{configGet("address", "posting", "api")}/photos/{duplicate["id"]}')
|
|
return True, duplicates
|
|
except:
|
|
return False, []
|
|
|
|
def move_pic(id: str) -> bool:
|
|
token = authorize()
|
|
try:
|
|
patch(f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"})
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
print(authorize()) |