From 0af5be57ac52b6eaa63bb0f3875d58fb090a00d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Abdulkadir=20Furkan=20=C5=9Eanl=C4=B1?= Date: Tue, 23 Jan 2024 17:13:01 +0100 Subject: [PATCH] Add backfill (DANGER), ensure message timestamp's Monday is used, only respond to recent !pow command. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Abdulkadir Furkan Şanlı --- main.py | 92 ++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 18 deletions(-) diff --git a/main.py b/main.py index d196ae2..68f51c4 100755 --- a/main.py +++ b/main.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 """parkerbot: Matrix bot to generate YouTube (music) playlists from links sent to a channel.""" +import argparse import asyncio import os import pickle import re import sqlite3 -import sys +import time from datetime import datetime, timedelta from google.auth.transport.requests import Request from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build +from googleapiclient import errors from nio import AsyncClient, RoomMessageText, SyncResponse DATA_DIR = os.getenv("DATA_DIR", "./") @@ -33,6 +35,19 @@ conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Matrix bot to generate YouTube (music) playlists from links sent to a channel." + ) + parser.add_argument( + "--backwards-sync", + action="store_true", + help="Run backwards sync on start (this may cause you to exceed your daily API quota).", + ) + return parser.parse_args() + + def define_tables(): """Define tables for use with program.""" with conn: @@ -87,10 +102,10 @@ def get_authenticated_service(): return build("youtube", "v3", credentials=credentials) -def get_monday_date(): - """Get Monday of current week. Weeks start on Monday.""" - today = datetime.now() - return today - timedelta(days=today.weekday()) +def get_monday_date(timestamp): + """Get Monday of the week for the given timestamp. Weeks start on Monday.""" + date = datetime.utcfromtimestamp(timestamp / 1000) + return date - timedelta(days=date.weekday()) def make_playlist(youtube, title): @@ -136,17 +151,25 @@ def get_or_make_playlist(youtube, monday_date): return playlist_id -def add_video_to_playlist(youtube, playlist_id, video_id): +def add_video_to_playlist(youtube, playlist_id, video_id, retry_count=3): """Add video to playlist.""" - youtube.playlistItems().insert( - part="snippet", - body={ - "snippet": { - "playlistId": playlist_id, - "resourceId": {"kind": "youtube#video", "videoId": video_id}, - } - }, - ).execute() + for attempt in range(retry_count): + try: + youtube.playlistItems().insert( + part="snippet", + body={ + "snippet": { + "playlistId": playlist_id, + "resourceId": {"kind": "youtube#video", "videoId": video_id}, + } + }, + ).execute() + break + except errors.HttpError as error: + if attempt < retry_count - 1: + time.sleep(2**attempt) + continue + raise error def is_music(youtube, video_id): @@ -165,12 +188,17 @@ async def message_callback(client, room, event): body = event.body timestamp = event.server_timestamp room_id = room.room_id - monday_date = get_monday_date() + monday_date = get_monday_date(timestamp) youtube = get_authenticated_service() playlist_id = get_or_make_playlist(youtube, monday_date) youtube_links = re.findall(youtube_link_pattern, body) - if body == "!pow": + timestamp_sec = datetime.utcfromtimestamp( + event.server_timestamp / 1000 + ) # milisec to sec + current_time = datetime.utcnow() + + if body == "!pow" and current_time - timestamp_sec < timedelta(seconds=30): playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}" reply_msg = f"{sender}, here's the playlist of the week: {playlist_link}" await client.room_send( @@ -243,15 +271,43 @@ async def get_client(): ) client.add_response_callback(sync_callback, SyncResponse) print(await client.login(MATRIX_PASSWORD)) - await client.join(MATRIX_ROOM) return client +async def backwards_sync(client, room, start_token): + """Fetch and process historical messages from a given room.""" + print("Starting to process channel log...") + from_token = start_token + room_id = room.room_id + while True: + # Fetch room messages + response = await client.room_messages(room_id, from_token, direction="b") + + # Process each message + for event in response.chunk: + if isinstance(event, RoomMessageText): + await message_callback(client, room, event) + + # Break if there are no more messages to fetch + if not response.end or response.end == from_token: + break + + # Update the from_token for the next iteration + from_token = response.end + + async def main(): """Get DB and Matrix client ready, and start syncing.""" + args = parse_arguments() define_tables() client = await get_client() sync_token = load_sync_token() + + if args.backwards_sync: + init_sync = await client.sync(30000) + room = await client.room_resolve_alias(MATRIX_ROOM) + await backwards_sync(client, room, init_sync.next_batch) + await client.sync_forever(30000, full_state=True, since=sync_token)