"""Interactive command-line wizards for setting up the Photos API connection.

Command-line arguments are parsed as soon as this module is imported. When
``--create-user`` and/or ``--create-album`` is passed, the matching wizard(s)
run immediately and the process exits afterwards. When neither flag is
passed, importing this module only parses the arguments.
"""

import asyncio
from argparse import ArgumentParser
from sys import exit
from traceback import print_exc

from libbot import config_get, config_set, sync
from photosapi_client.api.default.album_create_albums_post import (
    asyncio as album_create,
)
from photosapi_client.api.default.login_for_access_token_token_post import (
    asyncio as login,
)
from photosapi_client.api.default.user_create_users_post import asyncio as user_create
from photosapi_client.client import AuthenticatedClient, Client
from photosapi_client.models.body_login_for_access_token_token_post import (
    BodyLoginForAccessTokenTokenPost,
)
from photosapi_client.models.body_user_create_users_post import BodyUserCreateUsersPost

parser = ArgumentParser(
    prog="Telegram Poster",
    description="Bot for posting some of your stuff and also receiving submissions.",
)

parser.add_argument("--create-user", action="store_true")
parser.add_argument("--create-album", action="store_true")

args = parser.parse_args()

if args.create_user or args.create_album:
    # Only built when a wizard will actually run. No event loop exists yet at
    # import time, so the synchronous config reader is used here.
    unauthorized_client = Client(
        base_url=sync.config_get("address", "posting", "api"),
        timeout=5.0,
        verify_ssl=True,
        raise_on_unexpected_status=True,
        follow_redirects=False,
    )


async def cli_create_user() -> None:
    """Interactively create a Photos API user and save its credentials.

    Prompts for a username, email and password, sends them to the user
    creation endpoint through ``unauthorized_client`` and then stores the
    username and password with ``config_set`` (path ``"posting", "api"``).
    Afterwards it waits for the operator to press Enter.

    On any exception the error and traceback are printed and the process
    exits with status 1. If ``--create-album`` was not requested, the process
    exits successfully once the user has been created; otherwise the function
    returns so the album wizard can run next.
    """
    print(
        "To set up Photos API connection you need to create a new user.\nIf you have email confirmation enabled in your Photos API config - you need to use a real email that will get a confirmation code afterwards.",
        flush=True,
    )

    username = input("Choose username for new Photos API user: ").strip()
    email = input(f"Choose email for user '{username}': ").strip()
    password = input(f"Choose password for user '{username}': ").strip()

    try:
        # NOTE(review): the response body is not inspected here; failures are
        # expected to surface as exceptions - confirm this also holds for the
        # endpoint's documented error responses.
        await user_create(
            client=unauthorized_client,
            form_data=BodyUserCreateUsersPost(
                user=username, email=email, password=password
            ),
        )
        await config_set("username", username, "posting", "api")
        await config_set("password", password, "posting", "api")
        # Pause so the operator can confirm the registration before the next
        # step; the entered text itself is irrelevant.
        input(
            "Alright. If you have email confirmation enabled - please confirm registration by using the link in your email. After that press Enter. Otherwise just press Enter."
        )
    except Exception as exp:
        print(f"Could not create a user due to {exp}", flush=True)
        print_exc()
        # Non-zero status so calling scripts can detect the failure.
        exit(1)

    if not args.create_album:
        print("You're done!", flush=True)
        exit()


async def cli_create_album() -> None:
    """Interactively create a Photos API album and save its name.

    Prompts for an album name and title, logs in with the username and
    password read through ``config_get`` (path ``"posting", "api"``), creates
    the album with an authenticated client and stores the album name with
    ``config_set``.

    This function never returns normally: it exits the process with status 1
    when any exception occurs (after printing the error and traceback) and
    with a success status otherwise.
    """
    print(
        "To use Photos API your user needs to have an album to store its data.\nThis wizard will help you to create a new album with its name and title.",
        flush=True,
    )

    name = input("Choose a name for your album: ").strip()
    title = input(f"Choose a title for album '{name}': ").strip()

    try:
        login_token = await login(
            client=unauthorized_client,
            form_data=BodyLoginForAccessTokenTokenPost(
                grant_type="password",
                scope="me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
                username=await config_get("username", "posting", "api"),
                password=await config_get("password", "posting", "api"),
            ),
        )
        # If the login call did not yield a token object, the attribute access
        # below raises and is reported by the handler like any other failure.
        client = AuthenticatedClient(
            base_url=await config_get("address", "posting", "api"),
            timeout=5.0,
            verify_ssl=True,
            raise_on_unexpected_status=True,
            token=login_token.access_token,
            follow_redirects=False,
        )
        await album_create(client=client, name=name, title=title)
        await config_set("album", name, "posting", "api")
    except Exception as exp:
        print(f"Could not create an album due to {exp}", flush=True)
        print_exc()
        # Non-zero status so calling scripts can detect the failure.
        exit(1)

    print("You're done!", flush=True)
    exit()


async def _run_requested_wizards() -> None:
    """Run the wizards selected on the command line, user creation first."""
    if args.create_user:
        await cli_create_user()
    if args.create_album:
        await cli_create_album()


if args.create_user or args.create_album:
    # asyncio.run() creates and closes its own event loop, and lets the
    # SystemExit raised by the wizards propagate out to end the process.
    asyncio.run(_run_requested_wizards())