imaginaryfriend/src/service/media_uniqueness_checker.py

46 lines
1.4 KiB
Python

from datetime import datetime, timedelta
from src.config import config
from urllib.parse import urlparse
class MediaUniquenessChecker:
lifetime = timedelta(seconds=config.getfloat('media_checker', 'lifetime'))
key = "media_checker:{}"
def __init__(self, redis):
self.redis = redis
def check(self, message):
"""Returns True if at least one media entity was already in this chat
"""
redis = self.redis.instance()
key = self.key.format(message.chat_id)
now = datetime.now()
delete_at = (now + self.lifetime).timestamp()
redis.zremrangebyscore(key, 0, now.timestamp())
pipe = redis.pipeline()
for element in self.__extract_media(message):
pipe.zadd(key, element, delete_at)
return any(x == 0 for x in pipe.execute())
def __extract_media(self, message):
links = []
def prettify(url):
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://' + url
link = urlparse(url)
host = '.'.join(link.hostname.split('.')[-2:])
return '{}{}#{}?{}'.format(host, link.path, link.fragment, link.query)
for entity in filter(lambda e: e.type == 'url', message.message.entities):
link = prettify(message.text[entity.offset:entity.length + entity.offset])
links.append(link)
return links