2023-07-03 12:27:15 +03:00
import asyncio
from argparse import ArgumentParser
from sys import exit
from traceback import print_exc
from libbot import config_get , config_set , sync
from photosapi_client . api . default . album_create_albums_post import (
asyncio as album_create ,
)
from photosapi_client . api . default . login_for_access_token_token_post import (
asyncio as login ,
)
from photosapi_client . api . default . user_create_users_post import asyncio as user_create
from photosapi_client . client import AuthenticatedClient , Client
from photosapi_client . models . body_login_for_access_token_token_post import (
BodyLoginForAccessTokenTokenPost ,
)
from photosapi_client . models . body_user_create_users_post import BodyUserCreateUsersPost
# Command-line interface of the bot. Besides normal operation it exposes two
# boolean switches that start the interactive Photos API setup wizards.
parser = ArgumentParser(
    description=" Bot for posting some of your stuff and also receiving submissions. ",
    prog=" Telegram Poster ",
)

# Both wizard switches are registered the same way, so declare them in one loop.
for _wizard_switch in (" --create-user ", " --create-album "):
    parser.add_argument(_wizard_switch, action=" store_true ")
del _wizard_switch  # keep the module namespace identical to the two-call form

# Parsed once at import time; the rest of the module reads `args`.
args = parser.parse_args()
2023-07-03 15:42:55 +03:00
# The setup wizards send their user-creation and login requests through a
# client without a token, so it is only built when a wizard was requested.
if any((args.create_user, args.create_album)):
    unauthorized_client = Client(
        # The API address is read with the synchronous config helper because
        # no event loop is running at module level.
        base_url=sync.config_get(" address ", " posting ", " api "),
        follow_redirects=False,
        raise_on_unexpected_status=True,
        timeout=5.0,
        verify_ssl=True,
    )
2023-07-03 12:27:15 +03:00
async def cli_create_user() -> None:
    """Interactively create a Photos API user and remember its credentials.

    Prompts on stdin for a username, an email and a password, submits them
    with ``user_create`` through the module-level ``unauthorized_client`` and,
    when that succeeds, stores the username and password with ``config_set``.
    The user is then asked to press Enter once any email confirmation is done.

    If anything inside the request/store step raises, the error and its
    traceback are printed and the process exits with status 1.  When the
    album wizard was not requested as well (``args.create_album`` is false),
    the process exits normally after a success message; otherwise the
    coroutine returns so the album wizard can run next.
    """
    print(
        " To set up Photos API connection you need to create a new user. \n If you have email confirmation enabled in your Photos API config - you need to use a real email that will get a confirmation code afterwards. ",
        flush=True,
    )

    username = input(" Choose username for new Photos API user: ").strip()
    email = input(f" Choose email for user ' { username } ' : ").strip()
    password = input(f" Choose password for user ' { username } ' : ").strip()

    try:
        # The response body is not needed here; a failed request is expected
        # to surface as an exception handled below.
        await user_create(
            client=unauthorized_client,
            form_data=BodyUserCreateUsersPost(
                user=username, email=email, password=password
            ),
        )

        await config_set(" username ", username, " posting ", " api ")
        await config_set(" password ", password, " posting ", " api ")

        # Pure pause: the typed text is irrelevant, only the Enter key matters.
        input(
            " Alright. If you have email confirmation enabled - please confirm registration by using the link in your email. After that press Enter. Otherwise just press Enter. "
        )
    except Exception as exc:
        print(f" Could not create a user due to { exc } ", flush=True)
        print_exc()
        # Non-zero status so shells and scripts can tell the wizard failed.
        exit(1)

    if not args.create_album:
        print(" You ' re done! ", flush=True)
        exit()
async def cli_create_album() -> None:
    """Interactively create a Photos API album and remember its name.

    Prompts on stdin for an album name and title, logs in with the username
    and password read via ``config_get``, builds an ``AuthenticatedClient``
    from the returned access token, creates the album with ``album_create``
    and stores the album name with ``config_set``.

    This coroutine always terminates the process: with status 1 (after
    printing the error and its traceback) when any step inside the
    login/create/store sequence raises, and with the default success status
    after printing a confirmation otherwise.
    """
    print(
        " To use Photos API your user needs to have an album to store its data. \n This wizard will help you to create a new album with its name and title. ",
        flush=True,
    )

    name = input(" Choose a name for your album: ").strip()
    title = input(f" Choose a title for album ' { name } ' : ").strip()

    try:
        # Log in through the token-less client using the stored credentials.
        login_token = await login(
            client=unauthorized_client,
            form_data=BodyLoginForAccessTokenTokenPost(
                grant_type=" password ",
                scope=" me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write ",
                username=await config_get(" username ", " posting ", " api "),
                password=await config_get(" password ", " posting ", " api "),
            ),
        )

        # NOTE(review): assumes `login` returned an object exposing
        # `access_token`; any other result raises here and is reported by the
        # handler below -- confirm against the generated client.
        client = AuthenticatedClient(
            base_url=await config_get(" address ", " posting ", " api "),
            timeout=5.0,
            verify_ssl=True,
            raise_on_unexpected_status=True,
            token=login_token.access_token,
            follow_redirects=False,
        )

        # The response body is not needed; only success/failure matters.
        await album_create(client=client, name=name, title=title)

        await config_set(" album ", name, " posting ", " api ")
    except Exception as exc:
        print(f" Could not create an album due to { exc } ", flush=True)
        print_exc()
        # Non-zero status so shells and scripts can tell the wizard failed.
        exit(1)

    print(" You ' re done! ", flush=True)
    exit()
async def _run_cli_wizards() -> None:
    """Run the requested setup wizards in order on one event loop.

    The user wizard runs first so that the album wizard can log in with the
    credentials it has just stored.  Each wizard ends the process itself when
    it is the last one requested or when it fails.
    """
    if args.create_user:
        await cli_create_user()
    if args.create_album:
        await cli_create_album()


if args.create_user or args.create_album:
    # asyncio.run() creates the loop, runs the coroutine and always closes the
    # loop afterwards, even when a wizard leaves via exit().  It replaces the
    # deprecated get_event_loop()/run_until_complete() pattern.  Awaiting the
    # wizards directly also lets an unexpected exception propagate instead of
    # being left unretrieved on a task wrapped in asyncio.wait().
    asyncio.run(_run_cli_wizards())