#!/usr/bin/env python3
"""ParkerBot

Matrix bot that watches a room for YouTube links and adds the videos whose
category is music or entertainment to a weekly playlist and to an all-time
playlist. State is kept in a SQLite database under ``DATA_DIR``.
"""

import argparse
import asyncio
import datetime
import os
import pickle
import re
import sqlite3
import time

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient import errors
from nio import AsyncClient, RoomMessageText, SyncResponse, UploadResponse

DATA_DIR = os.getenv("DATA_DIR", "./")
DB_PATH = os.path.join(DATA_DIR, "parkerbot.sqlite3")
PICKLE_PATH = os.path.join(DATA_DIR, "token.pickle")
TOKEN_PATH = os.path.join(DATA_DIR, "sync_token")
MATRIX_SERVER = os.getenv("MATRIX_SERVER")
MATRIX_ROOM = os.getenv("MATRIX_ROOM")
MATRIX_USER = os.getenv("MATRIX_USER")
MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD")
YOUTUBE_CLIENT_SECRETS_FILE = os.getenv("YOUTUBE_CLIENT_SECRETS_FILE")
YOUTUBE_PLAYLIST_TITLE = os.getenv("YOUTUBE_PLAYLIST_TITLE")

# Matches youtube.com / music.youtube.com watch links and youtu.be short links.
# Compiled once here instead of on every received message.
YOUTUBE_LINK_PATTERN = re.compile(
    r"(https?://(?:www\.|music\.)?youtube\.com/(?!playlist\?list=)watch"
    r"\?v=[\w-]+|https?://youtu\.be/[\w-]+)"
)

# YouTube category IDs that is_music() accepts.
MUSIC_CATEGORY_IDS = (
    "10",  # music
    "24",  # entertainment
)


def connect_db():
    """Connect to DB and return connection and cursor."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    return conn, cursor


def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description=(
            "Matrix bot to generate YouTube (music) playlists from links sent "
            "to a channel."
        )
    )
    parser.add_argument(
        "--backwards-sync",
        action="store_true",
        help=(
            "Run backwards sync on start. This most probably will cause you to "
            "exceed your YouTube daily API quota, and other hidden YouTube rate"
            " limits."
        ),
    )
    return parser.parse_args()


def define_tables(conn, cursor):
    """Create the messages, playlists and playlist_tracks tables if missing."""
    with conn:
        cursor.execute(
            """CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender TEXT,
                message TEXT,
                timestamp DATETIME,
                UNIQUE (sender, message, timestamp))"""
        )
        cursor.execute(
            """CREATE TABLE IF NOT EXISTS playlists (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                playlist_id TEXT UNIQUE,
                creation_date DATE)"""
        )
        cursor.execute(
            # TODO: Write migration script to add video_id.
            """CREATE TABLE IF NOT EXISTS playlist_tracks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                playlist_id INTEGER,
                message_id INTEGER,
                video_id TEXT,
                FOREIGN KEY (playlist_id) REFERENCES playlists(id),
                FOREIGN KEY (message_id) REFERENCES messages(id),
                UNIQUE (playlist_id, message_id))"""
        )


def get_authenticated_service():
    """Get an authenticated YouTube service.

    Cached credentials are read from PICKLE_PATH. If they are missing or
    invalid they are refreshed, or a local-server OAuth flow is run, and the
    result is written back to PICKLE_PATH.
    """
    credentials = None
    # Stores the user's access and refresh tokens.
    # NOTE: pickle.load executes arbitrary code from the file; PICKLE_PATH must
    # only ever be written by this program.
    if os.path.exists(PICKLE_PATH):
        with open(PICKLE_PATH, "rb") as token:
            credentials = pickle.load(token)
    # If there are no valid credentials available, let the user log in.
    if not credentials or not credentials.valid:
        if credentials and credentials.expired and credentials.refresh_token:
            credentials.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                YOUTUBE_CLIENT_SECRETS_FILE,
                scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
            )
            credentials = flow.run_local_server(port=8080)
        # Save the credentials for the next run
        with open(PICKLE_PATH, "wb") as token:
            pickle.dump(credentials, token)

    return build("youtube", "v3", credentials=credentials)


def monday_date(timestamp):
    """Return Monday of week for given timestamp.

    Weeks start on Monday. `timestamp` is in milliseconds since the epoch; the
    result is a UTC-aware datetime that keeps the original time of day.
    """
    date = datetime.datetime.fromtimestamp(timestamp / 1000, datetime.UTC)
    return date - datetime.timedelta(days=date.weekday())


def make_playlist(youtube, title):
    """Make a public playlist with given title and return its YouTube ID."""
    response = (
        youtube.playlists()
        .insert(
            part="snippet,status",
            body={
                "snippet": {
                    "title": title,
                    "description": "Weekly playlist generated by ParkerBot",
                },
                "status": {"privacyStatus": "public"},
            },
        )
        .execute()
    )
    return response["id"]


def _adapt_date(value):
    """Return the text sqlite3's deprecated default adapters would store.

    datetime -> ``isoformat(" ")``, date -> ``isoformat()``; any other value is
    passed through untouched.
    """
    if isinstance(value, datetime.datetime):
        return value.isoformat(" ")
    if isinstance(value, datetime.date):
        return value.isoformat()
    return value


def get_or_make_playlist(conn, cursor, youtube, playlist_date):
    """Get ID of playlist with given named suffix, make if doesn't exist.

    The playlist title is YOUTUBE_PLAYLIST_TITLE followed by `playlist_date`
    formatted as YYYY-MM-DD. Returns the YouTube playlist ID.
    """
    title = f"{YOUTUBE_PLAYLIST_TITLE} {playlist_date.strftime('%Y-%m-%d')}"
    cursor.execute("SELECT playlist_id FROM playlists WHERE title = ?", (title,))
    row = cursor.fetchone()
    if row:
        return row[0]

    playlist_id = make_playlist(youtube, title)
    with conn:
        cursor.execute(
            "INSERT INTO playlists (title, playlist_id, creation_date) VALUES (?, ?, ?)",
            # Convert explicitly: sqlite3's implicit datetime adapter is
            # deprecated as of Python 3.12. The stored text is unchanged.
            (title, playlist_id, _adapt_date(playlist_date)),
        )
    return playlist_id


def add_video_to_playlist(youtube, playlist_id, video_id, retry_count=6):
    """Add video to playlist, retrying on HTTP errors.

    Up to `retry_count` attempts are made, sleeping 2**attempt seconds between
    them; the HttpError of the final attempt is re-raised.
    """
    for attempt in range(retry_count):
        try:
            youtube.playlistItems().insert(
                part="snippet",
                body={
                    "snippet": {
                        "playlistId": playlist_id,
                        "resourceId": {"kind": "youtube#video", "videoId": video_id},
                    }
                },
            ).execute()
            break
        except errors.HttpError:
            if attempt < retry_count - 1:
                time.sleep(2**attempt)
                continue
            raise


def is_music(youtube, video_id):
    """Check whether a YouTube video is music.

    Returns True when the video's category ID is in MUSIC_CATEGORY_IDS, and
    False otherwise, including when the API returns no item for `video_id`.
    """
    video_details = youtube.videos().list(id=video_id, part="snippet").execute()
    items = video_details.get("items") or []
    if not items:
        # No item came back for this ID; there is nothing to categorise.
        return False
    category_id = items[0].get("snippet", {}).get("categoryId")
    return category_id in MUSIC_CATEGORY_IDS


async def _send_text(client, room_id, body):
    """Send a plain m.text message with `body` to the room."""
    await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content={"msgtype": "m.text", "body": body},
    )


async def send_intro_message(client, sender, room_id):
    """Sends introduction message in reply to sender, in room with room_id.

    A text introduction is sent first, then ./parker.gif is uploaded and sent
    as an image message.
    """
    intro_message = (
        f"Hi {sender}, I'm ParkerBot! I generate YouTube playlists from links "
        "sent to this channel. You can find my source code here: "
        "https://git.abdulocra.cy/abdulocracy/parkerbot"
    )
    await _send_text(client, room_id, intro_message)

    gif_path = "./parker.gif"
    gif_size = os.path.getsize(gif_path)
    with open(gif_path, "rb") as gif_file:
        # AsyncClient.upload returns (response, decryption_keys); the keys are
        # only set for encrypted uploads, which this is not.
        response, _keys = await client.upload(
            gif_file,
            content_type="image/gif",
            filename="parker.gif",
            filesize=gif_size,
        )

    if isinstance(response, UploadResponse):
        print("Image was uploaded successfully to server. ")
        await client.room_send(
            room_id=room_id,
            message_type="m.room.message",
            content={
                "msgtype": "m.image",
                "url": response.content_uri,
                "body": "parker.gif",
                "info": {"mimetype": "image/gif", "size": gif_size},
            },
        )
    else:
        print(f"Failed to upload image. Failure response: {response}")


async def send_playlist_of_week(client, sender, room_id, playlist_id):
    """Sends playlist of the week in reply to sender, in room with room_id."""
    playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
    reply_msg = f"{sender}, here's the playlist of the week: {playlist_link}"
    await _send_text(client, room_id, reply_msg)


async def send_playlist_of_all(client, sender, room_id, playlist_id):
    """Sends playlist of all time in reply to sender, in room with room_id."""
    playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
    reply_msg = f"{sender}, here's the playlist of all time: {playlist_link}"
    await _send_text(client, room_id, reply_msg)


async def message_callback(conn, cursor, youtube, client, room, event):
    """Event handler for received messages.

    Messages from MATRIX_USER are ignored. Commands (!parkerbot, !week, !all)
    are answered only when the message is under 30 seconds old. Otherwise each
    YouTube link in the body that passes is_music() is recorded and, unless it
    is already in this week's playlist, added to the weekly and all-time
    playlists.
    """
    sender = event.sender
    if sender == MATRIX_USER:
        return

    body = event.body.strip()
    timestamp = event.server_timestamp  # milliseconds since the epoch
    playlist_id = get_or_make_playlist(
        conn, cursor, youtube, monday_date(timestamp)
    )
    # NOTE(review): fromtimestamp(0) is naive local time, so the date in the
    # all-time playlist title depends on the host timezone. Left unchanged so
    # that playlists created by earlier runs keep matching.
    all_playlist_id = get_or_make_playlist(
        conn, cursor, youtube, datetime.datetime.fromtimestamp(0)
    )

    sent_at = datetime.datetime.fromtimestamp(
        timestamp / 1000, datetime.UTC  # millisec to sec
    )
    current_time = datetime.datetime.now(datetime.UTC)
    # Old messages must not trigger command replies.
    recent = current_time - sent_at < datetime.timedelta(seconds=30)

    if body == "!parkerbot" and recent:
        await send_intro_message(client, sender, room.room_id)
        return

    if body == "!week" and recent:
        await send_playlist_of_week(client, sender, room.room_id, playlist_id)
        return

    if body == "!all" and recent:
        await send_playlist_of_all(client, sender, room.room_id, all_playlist_id)
        return

    for link in YOUTUBE_LINK_PATTERN.findall(body):
        # Handles both "...watch?v=<id>" and "youtu.be/<id>".
        video_id = link.split("v=")[-1].split("&")[0].split("/")[-1]
        if not is_music(youtube, video_id):
            continue

        message_id = record_message(conn, cursor, sender, link, timestamp)
        if in_playlist(cursor, video_id, playlist_id):
            print(f"Track is already in this week's playlist: {link}")
            continue

        # Add video to playlists and record it in the database
        add_video_to_playlist(youtube, playlist_id, video_id)
        add_video_to_playlist(youtube, all_playlist_id, video_id)
        with conn:
            cursor.execute(
                (
                    "INSERT INTO playlist_tracks (playlist_id, message_id, video_id) "
                    "VALUES (?, ?, ?)"
                ),
                (playlist_id, message_id, video_id),
            )
        print(f"Added track to this week's playlist: {link}")


def in_playlist(cursor, video_id, playlist_id):
    """Checks if video is recorded in playlist_tracks for the given playlist."""
    cursor.execute(
        "SELECT id FROM playlist_tracks WHERE video_id = ? AND playlist_id = ?",
        (video_id, playlist_id),
    )
    return cursor.fetchone() is not None


def record_message(conn, cursor, sender, link, timestamp):
    """Records message to messages table in DB, returns ID.

    If an identical (sender, link, timestamp) row already exists, the ID of
    that row is returned instead.
    """
    try:
        with conn:
            cursor.execute(
                "INSERT INTO messages (sender, message, timestamp) VALUES (?, ?, ?)",
                (sender, link, timestamp),
            )
        print(f"Saved message: {sender} {link} {timestamp}")
    except sqlite3.IntegrityError as e:
        if "UNIQUE constraint failed" not in str(e):
            raise
        print(f"Entry already exists: {sender} {link} {timestamp}")

    cursor.execute(
        "SELECT id FROM messages WHERE sender = ? AND message = ? AND timestamp = ?",
        (sender, link, timestamp),
    )
    return cursor.fetchone()[0]


async def sync_callback(response):
    """Saves Matrix sync token."""
    with open(TOKEN_PATH, "w", encoding="utf-8") as f:
        f.write(response.next_batch)


def load_sync_token():
    """Gets saved Matrix sync token, or None if it is missing or empty."""
    try:
        with open(TOKEN_PATH, "r", encoding="utf-8") as file:
            # An empty file is treated the same as no file.
            return file.read().strip() or None
    except FileNotFoundError:
        return None


async def get_client(conn, cursor, youtube):
    """Returns configured and logged in Matrix client."""
    client = AsyncClient(MATRIX_SERVER, MATRIX_USER)

    # A real coroutine function (not a lambda returning a coroutine) so that a
    # dispatcher which checks iscoroutinefunction() will await it.
    async def on_message(room, event):
        await message_callback(conn, cursor, youtube, client, room, event)

    client.add_event_callback(on_message, RoomMessageText)
    client.add_response_callback(sync_callback, SyncResponse)
    print(await client.login(MATRIX_PASSWORD))
    return client


async def backwards_sync(conn, cursor, youtube, client, room, start_token):
    """Fetch and process historical messages from a given room.

    Pages backwards from `start_token`, passing every text message to
    message_callback, until the server returns no further pagination token or
    a response without messages.
    """
    print("Starting to process channel log...")
    from_token = start_token
    room_id = room.room_id
    while True:
        # Fetch room messages
        response = await client.room_messages(room_id, from_token, direction="b")

        chunk = getattr(response, "chunk", None)
        if chunk is None:
            # No chunk attribute means this is not a messages response.
            print(f"Failed to fetch room messages: {response}")
            break

        # Process each message
        for event in chunk:
            if isinstance(event, RoomMessageText):
                await message_callback(conn, cursor, youtube, client, room, event)

        # Break if there are no more messages to fetch
        if not response.end or response.end == from_token:
            break

        # Update the from_token for the next iteration
        from_token = response.end


async def main():
    """Get DB and Matrix client ready, and start syncing."""
    args = parse_arguments()
    conn, cursor = connect_db()
    try:
        define_tables(conn, cursor)
        youtube = get_authenticated_service()
        client = await get_client(conn, cursor, youtube)
        try:
            sync_token = load_sync_token()

            # This is incredibly dumb and most probably will exceed your
            # YouTube API quota.
            if args.backwards_sync:
                init_sync = await client.sync(30000)
                room = await client.room_resolve_alias(MATRIX_ROOM)
                await backwards_sync(
                    conn, cursor, youtube, client, room, init_sync.next_batch
                )

            await client.sync_forever(30000, full_state=True, since=sync_token)
        finally:
            # Release the client's HTTP session however the sync loop ends.
            await client.close()
    finally:
        conn.close()


if __name__ == "__main__":
    asyncio.run(main())