This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramPoster/modules/cli.py

119 lines
4.4 KiB
Python
Raw Permalink Normal View History

2023-07-03 12:27:15 +03:00
import asyncio
from argparse import ArgumentParser
from sys import exit
from traceback import print_exc
from libbot import config_get, config_set, sync
from photosapi_client.api.default.album_create_albums_post import (
asyncio as album_create,
)
from photosapi_client.api.default.login_for_access_token_token_post import (
asyncio as login,
)
from photosapi_client.api.default.user_create_users_post import asyncio as user_create
from photosapi_client.client import AuthenticatedClient, Client
from photosapi_client.models.body_login_for_access_token_token_post import (
BodyLoginForAccessTokenTokenPost,
)
from photosapi_client.models.body_user_create_users_post import BodyUserCreateUsersPost
parser = ArgumentParser(
prog="Telegram Poster",
description="Bot for posting some of your stuff and also receiving submissions.",
)
parser.add_argument("--create-user", action="store_true")
parser.add_argument("--create-album", action="store_true")
args = parser.parse_args()
if args.create_user or args.create_album:
unauthorized_client = Client(
base_url=sync.config_get("address", "posting", "api"),
timeout=5.0,
verify_ssl=True,
raise_on_unexpected_status=True,
follow_redirects=False,
)
2023-07-03 12:27:15 +03:00
async def cli_create_user() -> None:
print(
"To set up Photos API connection you need to create a new user.\nIf you have email confirmation enabled in your Photos API config - you need to use a real email that will get a confirmation code afterwards.",
flush=True,
)
username = input("Choose username for new Photos API user: ").strip()
email = input(f"Choose email for user '{username}': ").strip()
password = input(f"Choose password for user '{username}': ").strip()
try:
result_1 = await user_create(
client=unauthorized_client,
form_data=BodyUserCreateUsersPost(
user=username, email=email, password=password
),
)
# asyncio.run(create_user(username, email, password))
await config_set("username", username, "posting", "api")
await config_set("password", password, "posting", "api")
none = input(
"Alright. If you have email confirmation enabled - please confirm registration by using the link in your email. After that press Enter. Otherwise just press Enter."
)
except Exception as exp:
print(f"Could not create a user due to {exp}", flush=True)
print_exc()
exit()
if not args.create_album:
print("You're done!", flush=True)
exit()
return None
async def cli_create_album() -> None:
print(
"To use Photos API your user needs to have an album to store its data.\nThis wizard will help you to create a new album with its name and title.",
flush=True,
)
name = input("Choose a name for your album: ").strip()
title = input(f"Choose a title for album '{name}': ").strip()
try:
login_token = await login(
client=unauthorized_client,
form_data=BodyLoginForAccessTokenTokenPost(
grant_type="password",
scope="me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
username=await config_get("username", "posting", "api"),
password=await config_get("password", "posting", "api"),
),
)
client = AuthenticatedClient(
base_url=await config_get("address", "posting", "api"),
timeout=5.0,
verify_ssl=True,
raise_on_unexpected_status=True,
token=login_token.access_token,
follow_redirects=False,
)
result_2 = await album_create(client=client, name=name, title=title)
# asyncio.run(create_album(name, title))
await config_set("album", name, "posting", "api")
except Exception as exp:
print(f"Could not create an album due to {exp}", flush=True)
print_exc()
exit()
print("You're done!", flush=True)
exit()
if args.create_user or args.create_album:
loop = asyncio.get_event_loop()
tasks = []
if args.create_user:
loop.run_until_complete(asyncio.wait([loop.create_task(cli_create_user())]))
if args.create_album:
loop.run_until_complete(asyncio.wait([loop.create_task(cli_create_album())]))
loop.close()