This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramPoster/plugins/commands/photos.py
Renovate 1ac5abd7bf Update dependency photosapi_client to v0.6.0 (#75)
This PR contains the following updates:

| Package | Update | Change |
|---|---|---|
| photosapi_client | minor | `==0.5.0` -> `==0.6.0` |

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Enabled.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box

---

This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate).
<!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzNi4zNS4wIiwidXBkYXRlZEluVmVyIjoiMzYuMzUuMCIsInRhcmdldEJyYW5jaCI6ImRldiJ9-->

Co-authored-by: profitroll <vozhd.kk@gmail.com>
Reviewed-on: #75
Co-authored-by: Renovate <renovate@git.end-play.xyz>
Co-committed-by: Renovate <renovate@git.end-play.xyz>
2024-03-19 23:25:03 +02:00

361 lines
11 KiB
Python

import asyncio
import logging
from glob import iglob
from io import BytesIO
from os import getcwd, makedirs, path, remove
from pathlib import Path
from shutil import disk_usage, rmtree
from traceback import format_exc
from uuid import uuid4
from zipfile import ZipFile
from convopyro import listen_message
from photosapi_client.errors import UnexpectedStatus
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import (
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from ujson import loads
from classes.pyroclient import PyroClient
from modules.api_client import (
BodyPhotoUpload,
File,
client,
photo_delete,
photo_upload,
video_delete,
video_upload,
)
from modules.utils import USERS_WITH_CONTEXT, extract_and_save
logger = logging.getLogger(__name__)
@Client.on_message(~filters.scheduled & filters.command(["import"], prefixes=["", "/"]))
async def cmd_import(app: PyroClient, message: Message):
if message.from_user.id not in app.admins:
return
global USERS_WITH_CONTEXT
if message.from_user.id not in USERS_WITH_CONTEXT:
USERS_WITH_CONTEXT.append(message.from_user.id)
else:
return
user = await app.find_user(message.from_user)
await message.reply_text(app._("import_request", "message", locale=user.locale))
answer = await listen_message(app, message.chat.id, timeout=600)
USERS_WITH_CONTEXT.remove(message.from_user.id)
if answer is None:
await message.reply_text(
app._("import_ignored", "message", locale=user.locale),
quote=True,
)
return
if answer.text == "/cancel":
await answer.reply_text(app._("import_abort", "message", locale=user.locale))
return
if answer.document is None:
await answer.reply_text(
app._(
"import_invalid_media",
"message",
locale=user.locale,
),
quote=True,
)
return
if answer.document.mime_type != "application/zip":
await answer.reply_text(
app._("import_invalid_mime", "message", locale=user.locale),
quote=True,
)
return
if disk_usage(getcwd())[2] < (answer.document.file_size) * 3:
await message.reply_text(
app._("import_too_big", "message", locale=user.locale).format(
answer.document.file_size // (2**30),
disk_usage(getcwd())[2] // (2**30),
)
)
return
tmp_dir = str(uuid4())
logging.info(
"Importing '%s' file %s bytes big (TMP ID %s)",
answer.document.file_name,
answer.document.file_size,
tmp_dir,
)
makedirs(Path(f"{app.config['locations']['tmp']}/{tmp_dir}"), exist_ok=True)
tmp_path = Path(f"{app.config['locations']['tmp']}/{answer.document.file_id}")
downloading = await answer.reply_text(
app._("import_downloading", "message", locale=user.locale),
quote=True,
)
await app.download_media(answer, file_name=str(tmp_path))
await downloading.edit(app._("import_unpacking", "message", locale=user.locale))
try:
with ZipFile(tmp_path, "r") as handle:
tasks = [
extract_and_save(
handle, name, Path(f"{app.config['locations']['tmp']}/{tmp_dir}")
)
for name in handle.namelist()
]
_ = await asyncio.gather(*tasks)
except Exception as exc:
logger.error(
"Could not import '%s' due to %s: %s",
answer.document.file_name,
exc,
format_exc(),
)
await answer.reply_text(
app._("import_unpack_error", "message", locale=user.locale).format(
exc, format_exc()
)
)
return
logger.info("Downloaded '%s' - awaiting upload", answer.document.file_name)
await downloading.edit(app._("import_uploading", "message", locale=user.locale))
remove(tmp_path)
for filename in iglob(
str(Path(f"{app.config['locations']['tmp']}/{tmp_dir}")) + "**/**",
recursive=True,
):
if not path.isfile(filename):
continue
with open(str(filename), "rb") as fh:
photo_bytes = BytesIO(fh.read())
try:
# VIDEO SUPPORT IS PLANNED HERE TOO
uploaded = await photo_upload(
app.config["posting"]["api"]["album"],
client=client,
body=BodyPhotoUpload(
File(photo_bytes, Path(filename).name, "image/jpeg")
),
ignore_duplicates=app.config["submission"]["allow_duplicates"],
compress=False,
caption="queue",
)
except UnexpectedStatus as exc:
logger.error(
"Could not upload '%s' from '%s': %s",
filename,
Path(f"{app.config['locations']['tmp']}/{tmp_dir}"),
exc,
)
await message.reply_text(
app._(
"import_upload_error_other",
"message",
locale=user.locale,
).format(path.basename(filename)),
disable_notification=True,
)
continue
uploaded_dict = loads(uploaded.content.decode("utf-8"))
if "duplicates" in uploaded_dict:
logger.warning(
"Could not upload '%s' from '%s'. Duplicates: %s",
filename,
Path(f"{app.config['locations']['tmp']}/{tmp_dir}"),
str(uploaded_dict["duplicates"]),
)
if len(uploaded_dict["duplicates"]) > 0:
await message.reply_text(
app._(
"import_upload_error_duplicate",
"message",
locale=user.locale,
).format(path.basename(filename)),
disable_notification=True,
)
else:
await message.reply_text(
app._(
"import_upload_error_other",
"message",
locale=user.locale,
).format(path.basename(filename)),
disable_notification=True,
)
else:
logger.info(
"Uploaded '%s' from '%s' and got ID %s",
filename,
Path(f"{app.config['locations']['tmp']}/{tmp_dir}"),
uploaded.parsed.id,
)
await downloading.delete()
logger.info(
"Removing '%s' after uploading",
Path(f"{app.config['locations']['tmp']}/{tmp_dir}"),
)
rmtree(Path(f"{app.config['locations']['tmp']}/{tmp_dir}"), ignore_errors=True)
await answer.reply_text(
app._("import_finished", "message", locale=user.locale),
quote=True,
)
return
@Client.on_message(~filters.scheduled & filters.command(["export"], prefixes=["", "/"]))
async def cmd_export(app: PyroClient, message: Message):
if message.from_user.id not in app.admins:
return
@Client.on_message(~filters.scheduled & filters.command(["remove"], prefixes=["", "/"]))
async def cmd_remove(app: PyroClient, message: Message):
if message.from_user.id not in app.admins:
return
global USERS_WITH_CONTEXT
if message.from_user.id not in USERS_WITH_CONTEXT:
USERS_WITH_CONTEXT.append(message.from_user.id)
else:
return
user = await app.find_user(message.from_user)
await message.reply_text(app._("remove_request", "message", locale=user.locale))
answer_id = await app.listen.Message(
filters.text & ~filters.me, id=filters.user(message.from_user.id), timeout=600
)
USERS_WITH_CONTEXT.remove(message.from_user.id)
if answer_id is None:
await message.reply_text(
app._("remove_ignored", "message", locale=user.locale),
quote=True,
)
return
if answer_id.text == "/cancel":
await answer_id.reply_text(app._("remove_abort", "message", locale=user.locale))
return
await message.reply_text(
app._("remove_kind", "message", locale=user.locale),
reply_markup=ReplyKeyboardMarkup(
[
[
KeyboardButton(app._("photo", "button", locale=user.locale)),
KeyboardButton(app._("video", "button", locale=user.locale)),
]
],
resize_keyboard=True,
one_time_keyboard=True,
),
)
USERS_WITH_CONTEXT.append(message.from_user.id)
answer_kind = await app.listen.Message(
filters.text & ~filters.me, id=filters.user(message.from_user.id), timeout=600
)
USERS_WITH_CONTEXT.remove(message.from_user.id)
if answer_kind is None:
await message.reply_text(
app._("remove_ignored", "message", locale=user.locale),
quote=True,
reply_markup=ReplyKeyboardRemove(),
)
return
if answer_kind.text == "/cancel":
await answer_kind.reply_text(
app._("remove_abort", "message", locale=user.locale),
reply_markup=ReplyKeyboardRemove(),
)
return
if answer_kind.text in app.in_all_locales("photo", "button"):
func = photo_delete
elif answer_kind.text in app.in_all_locales("video", "button"):
func = video_delete
else:
await answer_kind.reply_text(
app._("remove_unknown", "message", locale=user.locale).format(
app._("photo", "button", locale=user.locale),
app._("video", "button", locale=user.locale),
),
reply_markup=ReplyKeyboardRemove(),
)
return
response = await func(id=answer_id.text, client=client)
if response is None:
logger.info(
"Removed %s '%s' by request of user %s",
answer_kind.text,
answer_id.text,
answer_id.from_user.id,
)
await answer_kind.reply_text(
app._("remove_success", "message", locale=user.locale).format(
answer_id.text
),
reply_markup=ReplyKeyboardRemove(),
)
else:
logger.warning(
"Could not remove %s '%s' by request of user %s",
answer_kind.text,
answer_id.text,
answer_id.from_user.id,
)
await answer_kind.reply_text(
app._("remove_failure", "message", locale=user.locale).format(
answer_id.text
),
reply_markup=ReplyKeyboardRemove(),
)
@Client.on_message(~filters.scheduled & filters.command(["purge"], prefixes=["", "/"]))
async def cmd_purge(app: PyroClient, message: Message):
if message.from_user.id not in app.admins:
return