version control db

This commit is contained in:
mcinj 2022-05-19 18:03:35 -04:00
parent 948281dbfb
commit abc2e66c29
11 changed files with 278 additions and 76 deletions

135
src/database.py Normal file
View file

@ -0,0 +1,135 @@
from datetime import datetime, timedelta
import sqlalchemy
from alembic import command
from alembic.config import Config
from dateutil import tz
from sqlalchemy import func
from sqlalchemy.orm import Session
import log
from models import TableNotification, TableGiveaway, TableSteamItem
logger = log.get_logger(__name__)
engine = None
def create_engine(db_url: str):
global engine
if not engine:
engine = sqlalchemy.create_engine(db_url, echo=False)
engine.connect()
def run_migrations(script_location: str, dsn: str) -> None:
logger.info('Running DB migrations in %r on %r', script_location, dsn)
alembic_cfg = Config()
alembic_cfg.set_main_option('script_location', script_location)
alembic_cfg.set_main_option('sqlalchemy.url', dsn)
command.upgrade(alembic_cfg, 'head')
class NotificationHelper:
@classmethod
def insert(cls, type_of_error, message, medium, success):
with Session(engine) as session:
n = TableNotification(type=type_of_error, message=message, medium=medium, success=success)
session.add(n)
session.commit()
@classmethod
def get_won_notifications_today(cls):
with Session(engine) as session:
# with how filtering of datetimes works with a sqlite backend I couldn't figure out a better way
# to filter out the dates to local time when they are stored in utc in the db
within_3_days = session.query(TableNotification) \
.filter(func.DATE(TableNotification.created_at) >= (datetime.utcnow().date() - timedelta(days=1))) \
.filter(func.DATE(TableNotification.created_at) <= (datetime.utcnow().date() + timedelta(days=1))) \
.filter_by(type='won').all()
actual = []
for r in within_3_days:
if r.created_at.replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal()).date() == datetime.now(
tz=tz.tzlocal()).date():
actual.append(r)
return actual
@classmethod
def get_won_notifications(cls):
with Session(engine) as session:
return session.query(TableNotification) \
.filter_by(type='won') \
.all()
@classmethod
def get_error_notifications(cls):
with Session(engine) as session:
return session.query(TableNotification) \
.filter_by(type='error') \
.all()
class GiveawayHelper:
@classmethod
def unix_timestamp_to_utc_datetime(cls, timestamp):
return datetime.utcfromtimestamp(timestamp)
@classmethod
def get_by_ids(cls, giveaway):
with Session(engine) as session:
return session.query(TableGiveaway).filter_by(giveaway_id=giveaway.giveaway_game_id,
steam_id=giveaway.steam_app_id).all()
@classmethod
def insert(cls, giveaway, entered):
with Session(engine) as session:
result = session.query(TableSteamItem).filter_by(steam_id=giveaway.steam_app_id).all()
if result:
steam_id = result[0].steam_id
else:
item = TableSteamItem(
steam_id=giveaway.steam_app_id,
steam_url=giveaway.steam_url,
game_name=giveaway.game_name)
session.add(item)
session.flush()
steam_id = item.steam_id
g = TableGiveaway(
giveaway_id=giveaway.giveaway_game_id,
steam_id=steam_id,
giveaway_uri=giveaway.giveaway_uri,
user=giveaway.user,
giveaway_created_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_created_timestamp),
giveaway_ended_at=GiveawayHelper.unix_timestamp_to_utc_datetime(giveaway.time_remaining_timestamp),
cost=giveaway.cost,
copies=giveaway.copies,
contributor_level=giveaway.contributor_level,
entered=entered,
won=False,
game_entries=giveaway.game_entries)
session.add(g)
session.commit()
@classmethod
def upsert_giveaway(cls, giveaway, entered):
result = GiveawayHelper.get_by_ids(giveaway)
if not result:
GiveawayHelper.insert(giveaway, entered)
else:
with Session(engine) as session:
g = TableGiveaway(
giveaway_id=giveaway.giveaway_game_id,
steam_id=result[0].steam_id,
giveaway_uri=giveaway.giveaway_uri,
user=giveaway.user,
giveaway_created_at=TableGiveaway.unix_timestamp_to_utc_datetime(giveaway.time_created_timestamp),
giveaway_ended_at=TableGiveaway.unix_timestamp_to_utc_datetime(giveaway.time_remaining_timestamp),
cost=giveaway.cost,
copies=giveaway.copies,
contributor_level=giveaway.contributor_level,
entered=entered,
won=False,
game_entries=giveaway.game_entries)
session.merge(g)
session.commit()