parkerbot/main.py
2024-01-24 22:16:31 +01:00

390 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""ParkerBot"""
import argparse
import asyncio
import datetime
import os
import pickle
import re
import sqlite3
import time
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient import errors
from nio import AsyncClient, RoomMessageText, SyncResponse, UploadResponse
DATA_DIR = os.getenv("DATA_DIR", "./")
DB_PATH = os.path.join(DATA_DIR, "parkerbot.sqlite3")
PICKLE_PATH = os.path.join(DATA_DIR, "token.pickle")
TOKEN_PATH = os.path.join(DATA_DIR, "sync_token")
MATRIX_SERVER = os.getenv("MATRIX_SERVER")
MATRIX_ROOM = os.getenv("MATRIX_ROOM")
MATRIX_USER = os.getenv("MATRIX_USER")
MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD")
YOUTUBE_CLIENT_SECRETS_FILE = os.getenv("YOUTUBE_CLIENT_SECRETS_FILE")
YOUTUBE_PLAYLIST_TITLE = os.getenv("YOUTUBE_PLAYLIST_TITLE")
def connect_db():
"""Connect to DB and return connection and cursor."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
return conn, cursor
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description=(
"Matrix bot to generate YouTube (music) playlists from links sent "
"to a channel."
)
)
parser.add_argument(
"--backwards-sync",
action="store_true",
help=(
"Run backwards sync on start. This most probably will cause you to "
"exceed your YouTube daily API quota, and other hidden YouTube rate"
" limits."
),
)
return parser.parse_args()
def define_tables(conn, cursor):
"""Define tables for use with program."""
with conn:
cursor.execute(
"""CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT,
message TEXT,
timestamp DATETIME,
UNIQUE (sender, message, timestamp))"""
)
cursor.execute(
"""CREATE TABLE IF NOT EXISTS playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
playlist_id TEXT UNIQUE,
creation_date DATE)"""
)
cursor.execute( # TODO: Write migration script to add video_id.
"""CREATE TABLE IF NOT EXISTS playlist_tracks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
playlist_id INTEGER,
message_id INTEGER,
video_id TEXT,
FOREIGN KEY (playlist_id) REFERENCES playlists(id),
FOREIGN KEY (message_id) REFERENCES messages(id),
UNIQUE (playlist_id, message_id))"""
)
def get_authenticated_service():
"""Get an authentivated YouTube service."""
credentials = None
# Stores the user's access and refresh tokens.
if os.path.exists(PICKLE_PATH):
with open(PICKLE_PATH, "rb") as token:
credentials = pickle.load(token)
# If there are no valid credentials available, let the user log in.
if not credentials or not credentials.valid:
if credentials and credentials.expired and credentials.refresh_token:
credentials.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
YOUTUBE_CLIENT_SECRETS_FILE,
scopes=["https://www.googleapis.com/auth/youtube.force-ssl"],
)
credentials = flow.run_local_server(port=8080)
# Save the credentials for the next run
with open(PICKLE_PATH, "wb") as token:
pickle.dump(credentials, token)
return build("youtube", "v3", credentials=credentials)
def monday_date(timestamp):
"""Return Monday of week for given timestamp. Weeks start on Monday."""
date = datetime.datetime.fromtimestamp(timestamp / 1000, datetime.UTC)
return date - datetime.timedelta(days=date.weekday())
def make_playlist(youtube, title):
"""Make a playlist with given title."""
response = (
youtube.playlists()
.insert(
part="snippet,status",
body={
"snippet": {
"title": title,
"description": "Weekly playlist generated by ParkerBot",
},
"status": {"privacyStatus": "public"},
},
)
.execute()
)
return response["id"]
def get_or_make_playlist(conn, cursor, youtube, playlist_date):
"""Get ID of playlist for given Monday's week, make if doesn't exist."""
title = f"{YOUTUBE_PLAYLIST_TITLE} {playlist_date.strftime('%Y-%m-%d')}"
cursor.execute("SELECT playlist_id FROM playlists WHERE title = ?", (title,))
row = cursor.fetchone()
if row:
return row[0]
playlist_id = make_playlist(youtube, title)
with conn:
cursor.execute( # TODO: https://docs.python.org/3/library/sqlite3.html#default-adapters-and-converters-deprecated
"INSERT INTO playlists (title, playlist_id, creation_date) VALUES (?, ?, ?)",
(title, playlist_id, playlist_date),
)
return playlist_id
def add_video_to_playlist(youtube, playlist_id, video_id, retry_count=6):
"""Add video to playlist."""
for attempt in range(retry_count):
try:
youtube.playlistItems().insert(
part="snippet",
body={
"snippet": {
"playlistId": playlist_id,
"resourceId": {"kind": "youtube#video", "videoId": video_id},
}
},
).execute()
break
except errors.HttpError as error:
if attempt < retry_count - 1:
time.sleep(2**attempt)
continue
raise error
def is_music(youtube, video_id):
"""Check whether a YouTube video is music."""
video_details = youtube.videos().list(id=video_id, part="snippet").execute()
# Check if the video category is Music (typically category ID 10)
return video_details["items"][0]["snippet"]["categoryId"] in (
"10", # music
"24", # entertainment
)
async def send_intro_message(client, sender, room_id):
"""Sends introduction message in reply to sender, in room with room_id."""
intro_message = (
f"Hi {sender}, I'm ParkerBot! I generate YouTube playlists from links "
"sent to this channel. You can find my source code here: "
"https://git.abdulocra.cy/abdulocracy/parkerbot"
)
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": intro_message},
)
# TODO: Figure out how to properly send GIF, this is broken as shit.
with open("./parker.gif", "rb") as gif_file:
response = await client.upload(gif_file, content_type="image/gif")
if isinstance(response, UploadResponse):
print("Image was uploaded successfully to server. ")
gif_uri = response.content_uri
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.image",
"url": gif_uri,
"body": "parker.gif",
"info": {"mimetype": "image/gif"},
},
)
else:
print(f"Failed to upload image. Failure response: {response}")
async def send_playlist_of_week(client, sender, room_id, playlist_id):
"""Sends playlist of the week in reply to sender, in room with room_id."""
playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
reply_msg = f"{sender}, here's the playlist of the week: {playlist_link}"
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": reply_msg},
)
async def message_callback(conn, cursor, youtube, client, room, event):
"""Event handler for received messages."""
sender = event.sender
if sender != MATRIX_USER:
body = event.body.strip()
timestamp = event.server_timestamp
playlist_id = get_or_make_playlist(
conn, cursor, youtube, monday_date(timestamp)
)
timestamp_sec = datetime.datetime.fromtimestamp(
event.server_timestamp / 1000, datetime.UTC # millisec to sec
)
current_time = datetime.datetime.now(datetime.UTC)
recent = current_time - timestamp_sec < datetime.timedelta(seconds=30)
if body == "!parkerbot" and recent:
await send_intro_message(client, sender, room.room_id)
return
if body == "!pow" and recent:
await send_playlist_of_week(client, sender, room.room_id, playlist_id)
return
youtube_link_pattern = (
r"(https?://(?:www\.|music\.)?youtube\.com/(?!playlist\?list=)watch"
r"\?v=[\w-]+|https?://youtu\.be/[\w-]+)"
)
youtube_links = re.findall(youtube_link_pattern, body)
for link in youtube_links:
video_id = link.split("v=")[-1].split("&")[0].split("/")[-1]
if is_music(youtube, video_id):
message_id = record_message(conn, cursor, sender, link, timestamp)
if in_playlist(cursor, video_id, playlist_id):
print(f"Track is already in this week's playlist: {link}")
else:
# Add video to playlist and record it in the database
add_video_to_playlist(youtube, playlist_id, video_id)
with conn:
cursor.execute(
(
"INSERT INTO playlist_tracks (playlist_id, message_id, video_id) "
"VALUES (?, ?, ?)"
),
(playlist_id, message_id, video_id),
)
print(f"Added track to this week's playlist: {link}")
def in_playlist(cursor, video_id, playlist_id):
"""Checks if video is in playlist."""
cursor.execute(
"SELECT id FROM playlist_tracks WHERE video_id = ? AND playlist_id = ?",
(video_id, playlist_id),
)
if cursor.fetchone():
return True
return False
def record_message(conn, cursor, sender, link, timestamp):
"""Records message to messages table in DB, returns ID."""
try:
with conn:
cursor.execute(
"INSERT INTO messages (sender, message, timestamp) VALUES (?, ?, ?)",
(sender, link, timestamp),
)
print(f"Saved message: {sender} {link} {timestamp}")
except sqlite3.IntegrityError as e:
if "UNIQUE constraint failed" in str(e):
print(f"Entry already exists: {sender} {link} {timestamp}")
else:
raise e
cursor.execute(
"SELECT id FROM messages WHERE sender = ? AND message = ? AND timestamp = ?",
(sender, link, timestamp),
)
return cursor.fetchone()[0]
async def sync_callback(response):
"""Saves Matrix sync token."""
with open(TOKEN_PATH, "w", encoding="utf-8") as f:
f.write(response.next_batch)
def load_sync_token():
"""Gets saved Matrix sync token if it exists."""
try:
with open(TOKEN_PATH, "r", encoding="utf-8") as file:
return file.read().strip()
except FileNotFoundError:
return None
async def get_client(conn, cursor, youtube):
"""Returns configured and logged in Matrix client."""
client = AsyncClient(MATRIX_SERVER, MATRIX_USER)
client.add_event_callback(
lambda room, event: message_callback(
conn, cursor, youtube, client, room, event
),
RoomMessageText,
)
client.add_response_callback(sync_callback, SyncResponse)
print(await client.login(MATRIX_PASSWORD))
return client
async def backwards_sync(conn, cursor, youtube, client, room, start_token):
"""Fetch and process historical messages from a given room."""
print("Starting to process channel log...")
from_token = start_token
room_id = room.room_id
while True:
# Fetch room messages
response = await client.room_messages(room_id, from_token, direction="b")
# Process each message
for event in response.chunk:
if isinstance(event, RoomMessageText):
await message_callback(conn, cursor, youtube, client, room, event)
# Break if there are no more messages to fetch
if not response.end or response.end == from_token:
break
# Update the from_token for the next iteration
from_token = response.end
async def main():
"""Get DB and Matrix client ready, and start syncing."""
args = parse_arguments()
conn, cursor = connect_db()
define_tables(conn, cursor)
youtube = get_authenticated_service()
client = await get_client(conn, cursor, youtube)
sync_token = load_sync_token()
# This is incredibly dumb and most probably will exceed your YouTube API quota.
if args.backwards_sync:
init_sync = await client.sync(30000)
room = await client.room_resolve_alias(MATRIX_ROOM)
await backwards_sync(conn, cursor, youtube, client, room, init_sync.next_batch)
await client.sync_forever(30000, full_state=True, since=sync_token)
if __name__ == "__main__":
asyncio.run(main())