steamgifts-bot/src/bot/database.py

153 lines
6.2 KiB
Python

import os
from datetime import datetime, timedelta
import sqlalchemy
from alembic import command
from alembic.config import Config
from dateutil import tz
from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy_utils import database_exists
from .log import get_logger
from .models import TableNotification, TableGiveaway, TableSteamItem
logger = get_logger(__name__)
engine = sqlalchemy.create_engine(f"{os.getenv('BOT_DB_URL', 'sqlite:///./config/sqlite.db')}", echo=False)
engine.connect()
def create_engine(db_url: str):
return engine
def run_migrations(script_location: str, db_url: str) -> None:
logger.debug('Running DB migrations in %r on %r', script_location, db_url)
alembic_cfg = Config()
alembic_cfg.set_main_option('script_location', script_location)
alembic_cfg.set_main_option('sqlalchemy.url', db_url)
if not database_exists(db_url):
logger.debug(f"'{db_url}' does not exist. Running normal migration to create db and tables." )
command.upgrade(alembic_cfg, 'head')
if database_exists(db_url):
logger.debug(f"'{db_url} exists.")
e = create_engine(db_url)
insp = sqlalchemy.inspect(e)
alembic_version_table_name = 'alembic_version'
has_alembic_table = insp.has_table(alembic_version_table_name)
if has_alembic_table:
logger.debug(f"Table '{alembic_version_table_name}' exists.")
else:
logger.debug(f"Table '{alembic_version_table_name}' doesn't exist so assuming it was created pre-alembic "
f"setup. Setting the version to the first version created prior to alembic setup.")
alembic_first_ref = '1da33402b659'
command.stamp(alembic_cfg, alembic_first_ref)
logger.debug("Running migration.")
command.upgrade(alembic_cfg, 'head')
class NotificationHelper:
@classmethod
def insert(cls, type_of_error, message, medium, success):
with Session(engine) as session:
n = TableNotification(type=type_of_error, message=message, medium=medium, success=success)
session.add(n)
session.commit()
@classmethod
def get_won_notifications_today(cls):
with Session(engine) as session:
# with how filtering of datetimes works with a sqlite backend I couldn't figure out a better way
# to filter out the dates to local time when they are stored in utc in the db
within_3_days = session.query(TableNotification) \
.filter(func.DATE(TableNotification.created_at) >= (datetime.utcnow().date() - timedelta(days=1))) \
.filter(func.DATE(TableNotification.created_at) <= (datetime.utcnow().date() + timedelta(days=1))) \
.filter_by(type='won').all()
actual = []
for r in within_3_days:
if r.created_at.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).date() == datetime.now(
tz=tz.tzlocal()).date():
actual.append(r)
return actual
@classmethod
def get_won_notifications(cls):
with Session(engine) as session:
return session.query(TableNotification) \
.filter_by(type='won') \
.all()
@classmethod
def get_error_notifications(cls):
with Session(engine) as session:
return session.query(TableNotification) \
.filter_by(type='error') \
.all()
class GiveawayHelper:
@classmethod
def unix_timestamp_to_utc_datetime(cls, timestamp):
return datetime.utcfromtimestamp(timestamp)
@classmethod
def get_by_ids(cls, giveaway):
with Session(engine) as session:
return session.query(TableGiveaway).filter_by(giveaway_id=giveaway.giveaway_game_id,
steam_id=giveaway.steam_app_id).all()
@classmethod
def insert(cls, giveaway, entered):
with Session(engine) as session:
result = session.query(TableSteamItem).filter_by(steam_id=giveaway.steam_app_id).all()
if result:
steam_id = result[0].steam_id
else:
item = TableSteamItem(
steam_id=giveaway.steam_app_id,
steam_url=giveaway.steam_url,
game_name=giveaway.game_name)
session.add(item)
session.flush()
steam_id = item.steam_id
g = TableGiveaway(
giveaway_id=giveaway.giveaway_game_id,
steam_id=steam_id,
giveaway_uri=giveaway.giveaway_uri,
user=giveaway.user,
giveaway_created_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_created_timestamp),
giveaway_ended_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_remaining_timestamp),
cost=giveaway.cost,
copies=giveaway.copies,
contributor_level=giveaway.contributor_level,
entered=entered,
won=False,
game_entries=giveaway.game_entries)
session.add(g)
session.commit()
@classmethod
def upsert_giveaway(cls, giveaway, entered):
result = GiveawayHelper.get_by_ids(giveaway)
if not result:
GiveawayHelper.insert(giveaway, entered)
else:
with Session(engine) as session:
g = TableGiveaway(
giveaway_id=giveaway.giveaway_game_id,
steam_id=result[0].steam_id,
giveaway_uri=giveaway.giveaway_uri,
user=giveaway.user,
giveaway_created_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_created_timestamp),
giveaway_ended_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_remaining_timestamp),
cost=giveaway.cost,
copies=giveaway.copies,
contributor_level=giveaway.contributor_level,
entered=entered,
won=False,
game_entries=giveaway.game_entries)
session.merge(g)
session.commit()