Add backfill (DANGER), ensure message timestamp's Monday is used, only respond to recent !pow command.
Signed-off-by: Abdulkadir Furkan Şanlı <me@abdulocra.cy>
This commit is contained in:
		
							
								
								
									
										92
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										92
									
								
								main.py
									
									
									
									
									
								
							@@ -1,17 +1,19 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
"""parkerbot: Matrix bot to generate YouTube (music) playlists from links sent to a channel."""
 | 
					"""parkerbot: Matrix bot to generate YouTube (music) playlists from links sent to a channel."""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import argparse
 | 
				
			||||||
import asyncio
 | 
					import asyncio
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import pickle
 | 
					import pickle
 | 
				
			||||||
import re
 | 
					import re
 | 
				
			||||||
import sqlite3
 | 
					import sqlite3
 | 
				
			||||||
import sys
 | 
					import time
 | 
				
			||||||
from datetime import datetime, timedelta
 | 
					from datetime import datetime, timedelta
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from google.auth.transport.requests import Request
 | 
					from google.auth.transport.requests import Request
 | 
				
			||||||
from google_auth_oauthlib.flow import InstalledAppFlow
 | 
					from google_auth_oauthlib.flow import InstalledAppFlow
 | 
				
			||||||
from googleapiclient.discovery import build
 | 
					from googleapiclient.discovery import build
 | 
				
			||||||
 | 
					from googleapiclient import errors
 | 
				
			||||||
from nio import AsyncClient, RoomMessageText, SyncResponse
 | 
					from nio import AsyncClient, RoomMessageText, SyncResponse
 | 
				
			||||||
 | 
					
 | 
				
			||||||
DATA_DIR = os.getenv("DATA_DIR", "./")
 | 
					DATA_DIR = os.getenv("DATA_DIR", "./")
 | 
				
			||||||
@@ -33,6 +35,19 @@ conn = sqlite3.connect(DB_PATH)
 | 
				
			|||||||
cursor = conn.cursor()
 | 
					cursor = conn.cursor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def parse_arguments():
 | 
				
			||||||
 | 
					    """Parse command line arguments."""
 | 
				
			||||||
 | 
					    parser = argparse.ArgumentParser(
 | 
				
			||||||
 | 
					        description="Matrix bot to generate YouTube (music) playlists from links sent to a channel."
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    parser.add_argument(
 | 
				
			||||||
 | 
					        "--backwards-sync",
 | 
				
			||||||
 | 
					        action="store_true",
 | 
				
			||||||
 | 
					        help="Run backwards sync on start (this may cause you to exceed your daily API quota).",
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    return parser.parse_args()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def define_tables():
 | 
					def define_tables():
 | 
				
			||||||
    """Define tables for use with program."""
 | 
					    """Define tables for use with program."""
 | 
				
			||||||
    with conn:
 | 
					    with conn:
 | 
				
			||||||
@@ -87,10 +102,10 @@ def get_authenticated_service():
 | 
				
			|||||||
    return build("youtube", "v3", credentials=credentials)
 | 
					    return build("youtube", "v3", credentials=credentials)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_monday_date():
 | 
					def get_monday_date(timestamp):
 | 
				
			||||||
    """Get Monday of current week. Weeks start on Monday."""
 | 
					    """Get Monday of the week for the given timestamp. Weeks start on Monday."""
 | 
				
			||||||
    today = datetime.now()
 | 
					    date = datetime.utcfromtimestamp(timestamp / 1000)
 | 
				
			||||||
    return today - timedelta(days=today.weekday())
 | 
					    return date - timedelta(days=date.weekday())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def make_playlist(youtube, title):
 | 
					def make_playlist(youtube, title):
 | 
				
			||||||
@@ -136,17 +151,25 @@ def get_or_make_playlist(youtube, monday_date):
 | 
				
			|||||||
    return playlist_id
 | 
					    return playlist_id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def add_video_to_playlist(youtube, playlist_id, video_id):
 | 
					def add_video_to_playlist(youtube, playlist_id, video_id, retry_count=3):
 | 
				
			||||||
    """Add video to playlist."""
 | 
					    """Add video to playlist."""
 | 
				
			||||||
    youtube.playlistItems().insert(
 | 
					    for attempt in range(retry_count):
 | 
				
			||||||
        part="snippet",
 | 
					        try:
 | 
				
			||||||
        body={
 | 
					            youtube.playlistItems().insert(
 | 
				
			||||||
            "snippet": {
 | 
					                part="snippet",
 | 
				
			||||||
                "playlistId": playlist_id,
 | 
					                body={
 | 
				
			||||||
                "resourceId": {"kind": "youtube#video", "videoId": video_id},
 | 
					                    "snippet": {
 | 
				
			||||||
            }
 | 
					                        "playlistId": playlist_id,
 | 
				
			||||||
        },
 | 
					                        "resourceId": {"kind": "youtube#video", "videoId": video_id},
 | 
				
			||||||
    ).execute()
 | 
					                    }
 | 
				
			||||||
 | 
					                },
 | 
				
			||||||
 | 
					            ).execute()
 | 
				
			||||||
 | 
					            break
 | 
				
			||||||
 | 
					        except errors.HttpError as error:
 | 
				
			||||||
 | 
					            if attempt < retry_count - 1:
 | 
				
			||||||
 | 
					                time.sleep(2**attempt)
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					            raise error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def is_music(youtube, video_id):
 | 
					def is_music(youtube, video_id):
 | 
				
			||||||
@@ -165,12 +188,17 @@ async def message_callback(client, room, event):
 | 
				
			|||||||
        body = event.body
 | 
					        body = event.body
 | 
				
			||||||
        timestamp = event.server_timestamp
 | 
					        timestamp = event.server_timestamp
 | 
				
			||||||
        room_id = room.room_id
 | 
					        room_id = room.room_id
 | 
				
			||||||
        monday_date = get_monday_date()
 | 
					        monday_date = get_monday_date(timestamp)
 | 
				
			||||||
        youtube = get_authenticated_service()
 | 
					        youtube = get_authenticated_service()
 | 
				
			||||||
        playlist_id = get_or_make_playlist(youtube, monday_date)
 | 
					        playlist_id = get_or_make_playlist(youtube, monday_date)
 | 
				
			||||||
        youtube_links = re.findall(youtube_link_pattern, body)
 | 
					        youtube_links = re.findall(youtube_link_pattern, body)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if body == "!pow":
 | 
					        timestamp_sec = datetime.utcfromtimestamp(
 | 
				
			||||||
 | 
					            event.server_timestamp / 1000
 | 
				
			||||||
 | 
					        )  # milisec to sec
 | 
				
			||||||
 | 
					        current_time = datetime.utcnow()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if body == "!pow" and current_time - timestamp_sec < timedelta(seconds=30):
 | 
				
			||||||
            playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
 | 
					            playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
 | 
				
			||||||
            reply_msg = f"{sender}, here's the playlist of the week: {playlist_link}"
 | 
					            reply_msg = f"{sender}, here's the playlist of the week: {playlist_link}"
 | 
				
			||||||
            await client.room_send(
 | 
					            await client.room_send(
 | 
				
			||||||
@@ -243,15 +271,43 @@ async def get_client():
 | 
				
			|||||||
    )
 | 
					    )
 | 
				
			||||||
    client.add_response_callback(sync_callback, SyncResponse)
 | 
					    client.add_response_callback(sync_callback, SyncResponse)
 | 
				
			||||||
    print(await client.login(MATRIX_PASSWORD))
 | 
					    print(await client.login(MATRIX_PASSWORD))
 | 
				
			||||||
    await client.join(MATRIX_ROOM)
 | 
					 | 
				
			||||||
    return client
 | 
					    return client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def backwards_sync(client, room, start_token):
 | 
				
			||||||
 | 
					    """Fetch and process historical messages from a given room."""
 | 
				
			||||||
 | 
					    print("Starting to process channel log...")
 | 
				
			||||||
 | 
					    from_token = start_token
 | 
				
			||||||
 | 
					    room_id = room.room_id
 | 
				
			||||||
 | 
					    while True:
 | 
				
			||||||
 | 
					        # Fetch room messages
 | 
				
			||||||
 | 
					        response = await client.room_messages(room_id, from_token, direction="b")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Process each message
 | 
				
			||||||
 | 
					        for event in response.chunk:
 | 
				
			||||||
 | 
					            if isinstance(event, RoomMessageText):
 | 
				
			||||||
 | 
					                await message_callback(client, room, event)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Break if there are no more messages to fetch
 | 
				
			||||||
 | 
					        if not response.end or response.end == from_token:
 | 
				
			||||||
 | 
					            break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Update the from_token for the next iteration
 | 
				
			||||||
 | 
					        from_token = response.end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def main():
 | 
					async def main():
 | 
				
			||||||
    """Get DB and Matrix client ready, and start syncing."""
 | 
					    """Get DB and Matrix client ready, and start syncing."""
 | 
				
			||||||
 | 
					    args = parse_arguments()
 | 
				
			||||||
    define_tables()
 | 
					    define_tables()
 | 
				
			||||||
    client = await get_client()
 | 
					    client = await get_client()
 | 
				
			||||||
    sync_token = load_sync_token()
 | 
					    sync_token = load_sync_token()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if args.backwards_sync:
 | 
				
			||||||
 | 
					        init_sync = await client.sync(30000)
 | 
				
			||||||
 | 
					        room = await client.room_resolve_alias(MATRIX_ROOM)
 | 
				
			||||||
 | 
					        await backwards_sync(client, room, init_sync.next_batch)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    await client.sync_forever(30000, full_state=True, since=sync_token)
 | 
					    await client.sync_forever(30000, full_state=True, since=sync_token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user