2023-02-17 12:51:38 +02:00
""" This is only a temporary solution. Complete Photos API client is yet to be developed. """
2023-02-14 17:25:56 +02:00
import asyncio
from base64 import b64decode , b64encode
2023-01-17 15:11:23 +02:00
from os import makedirs , path , sep
from random import choice
2023-02-14 17:25:56 +02:00
from typing import Tuple , Union
from requests import get , patch , post
2023-02-17 17:45:51 +02:00
from classes . exceptions import SubmissionUploadError
2023-02-14 17:25:56 +02:00
from modules . logger import logWrite
2023-01-17 15:11:23 +02:00
from modules . utils import configGet
2023-02-14 17:25:56 +02:00
async def authorize() -> str:
    """Return a bearer token for the Photos API, logging in when needed.

    A token cached (base64-encoded) in the `api_access` file inside the
    configured cache location is reused as long as the API answers HTTP 200
    on `/users/me/` with it. Otherwise a password-grant login is made against
    `/token` and the new token is written back to the cache file.

    ### Returns:
        * `str`: Access token to be sent as `Authorization: Bearer <token>`.

    ### Raises:
        * `ValueError`: The login request was not answered with HTTP 200.
    """
    cache_dir = configGet("cache", "locations")
    token_file = cache_dir + sep + "api_access"
    api_address = configGet("address", "posting", "api")

    makedirs(cache_dir, exist_ok=True)

    if path.exists(token_file):
        try:
            with open(token_file, "rb") as file:
                token = b64decode(file.read()).decode("utf-8")
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError both derive
            # from ValueError; a damaged cache must not block a fresh login.
            token = None
        if token is not None and get(api_address + "/users/me/", headers={"Authorization": f"Bearer {token}"}).status_code == 200:
            return token

    username = configGet("username", "posting", "api")
    payload = {
        "grant_type": "password",
        "scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
        "username": username,
        "password": configGet("password", "posting", "api")
    }
    response = post(api_address + "/token", data=payload)
    if response.status_code != 200:
        logWrite(f'Incorrect API credentials! Could not login into "{api_address}" using login "{username}": HTTP {response.status_code}')
        raise ValueError(f'Could not login into "{api_address}": HTTP {response.status_code}')

    # Extract the token before touching the cache so that a malformed
    # response cannot leave a truncated cache file behind.
    access_token = response.json()["access_token"]
    with open(token_file, "wb") as file:
        file.write(b64encode(access_token.encode("utf-8")))
    return access_token
2023-01-17 15:11:23 +02:00
2023-02-14 17:25:56 +02:00
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
    """Returns random image id and filename from the queue.

    ### Args:
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, a token is obtained through `authorize()`.

    ### Returns:
        * `Tuple[str, str]`: The `id` and `filename` fields of one randomly chosen entry of the response's `results`.

    ### Raises:
        * `ValueError`: The API did not answer the listing request with HTTP 200.
        * `KeyError`: The listing returned no results.
    """
    token = await authorize() if token is None else token

    album = configGet("album", "posting", "api")
    # Build the listing URL once; it is used for the request and for both log lines.
    url = f'{configGet("address", "posting", "api")}/albums/{album}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue'

    logWrite(url)
    resp = get(url, headers={"Authorization": f"Bearer {token}"})

    if resp.status_code != 200:
        logWrite(f'Could not get photos from album {album}: HTTP {resp.status_code}')
        # NOTE(review): this debug line writes the bearer token into the log - confirm that is acceptable.
        logWrite(f'Could not get photos from "{url}" using token "{token}": HTTP {resp.status_code}', debug=True)
        raise ValueError(f"Could not get photos from album {album}: HTTP {resp.status_code}")

    results = resp.json()["results"]
    if not results:
        raise KeyError(f"No photos with caption 'queue' found in album {album}")

    pic = choice(results)
    return pic["id"], pic["filename"]
2023-02-19 21:44:00 +02:00
async def upload_pic(filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None) -> Tuple[bool, list]:
    """Upload an image file to the configured album with caption "queue".

    ### Args:
        * filepath (`str`): Path of the file to upload; it is sent as `image/jpeg` under its base name.
        * allow_duplicates (`bool`, *optional*): Passed to the API as the `ignore_duplicates` query parameter. Defaults to `False`.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, a token is obtained through `authorize()`.

    ### Returns:
        * `Tuple[bool, list]`: `(True, duplicates)` when the API answered HTTP 200 or 409, where `duplicates` holds one formatted string (id and link) per entry of the response's `duplicates` field, if present. `(False, [])` on any failure.
    """
    token = await authorize() if token is None else token

    try:
        pic_name = path.basename(filepath)
        # Keep the handle inside a context manager so it is closed even when the request fails.
        with open(filepath, "rb") as pic_file:
            response = post(
                f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
                params={"caption": "queue", "compress": False, "ignore_duplicates": allow_duplicates},
                headers={"Authorization": f"Bearer {token}"},
                files={"file": (pic_name, pic_file, "image/jpeg")}
            )

        if response.status_code not in (200, 409):
            logWrite(f"Could not upload '{filepath}' to API: HTTP {response.status_code} with message '{response.content}'")
            raise SubmissionUploadError(str(filepath), response.status_code, response.content)

        body = response.json()
        duplicates = []

        if "duplicates" in body:
            external_address = configGet("address_external", "posting", "api")
            # A missing "access_token" is treated like an explicit null instead of failing the whole upload.
            access_token = body.get("access_token")
            for index, duplicate in enumerate(body["duplicates"]):
                if access_token is None:
                    duplicates.append(f'`{duplicate["id"]}`:\n{external_address}/photos/{duplicate["id"]}')
                else:
                    duplicates.append(f'`{duplicate["id"]}`:\n{external_address}/token/photo/{access_token}?id={index}')

        return True, duplicates

    except SubmissionUploadError:
        # Already logged above. NOTE(review): the error has always been swallowed here, so callers
        # only ever see (False, []) - confirm whether it was meant to propagate.
        return False, []
    except Exception as exp:
        # Best-effort contract is kept, but KeyboardInterrupt, SystemExit and task cancellation now propagate.
        logWrite(f"Could not upload '{filepath}' to API due to: {exp}")
        return False, []
2023-02-17 12:51:38 +02:00
async def find_pic(name: str, caption: Union[str, None] = None, token: Union[str, None] = None) -> Union[dict, None]:
    """Search the configured album for photos matching a name and caption.

    ### Args:
        * name (`str`): Sent to the API as the `q` query parameter.
        * caption (`Union[str, None]`, *optional*): Sent to the API as the `caption` query parameter.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, a token is obtained through `authorize()`.

    ### Returns:
        * `Union[dict, None]`: The `results` field of the API response, or `None` when the response is not HTTP 200, `results` is empty, or an exception occurred (the exception is logged).
    """
    if token is None:
        token = await authorize()

    try:
        endpoint = f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos'
        query = {"q": name, "caption": caption}
        auth_header = {"Authorization": f"Bearer {token}"}

        response = get(endpoint, params=query, headers=auth_header)

        if response.status_code == 200:
            found = response.json()["results"]
            if len(found) != 0:
                return found

        # Either the request was rejected or nothing matched.
        return None
    except Exception as exp:
        logWrite(f"Could not find image with name '{name}' and caption '{caption}' due to: {exp}")
        return None
2023-01-17 15:11:23 +02:00
2023-02-17 12:51:38 +02:00
async def move_pic(id: str, token: Union[str, None] = None) -> bool:
    """Change the caption of a photo to "sent".

    ### Args:
        * id (`str`): ID of the photo to update.
        * token (`Union[str, None]`, *optional*): Bearer token to use. When `None`, a token is obtained through `authorize()`.

    ### Returns:
        * `bool`: `True` when the API accepted the PATCH request (status below 400), `False` when the request failed or was rejected.
    """
    token = await authorize() if token is None else token

    try:
        response = patch(f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"})
    except Exception as exp:
        # Best-effort contract is kept, but KeyboardInterrupt, SystemExit and task cancellation now propagate.
        logWrite(f"Could not move photo '{id}' due to: {exp}")
        return False

    # A response alone does not mean success: 401/404/5xx must not be reported as moved.
    if not response.ok:
        logWrite(f"Could not move photo '{id}': HTTP {response.status_code}")
        return False

    return True
if __name__ == "__main__":
    # Running the module directly performs one authorization round-trip
    # and prints the resulting access token.
    access_token = asyncio.run(authorize())
    print(access_token)