discord-tweeter/cmd/tweeter.go

259 lines
6.1 KiB
Go

package cmd
import (
"context"
"encoding/json"
ts "github.com/imperatrona/twitter-scraper"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
var (
configPath = "./config.toml"
dbPath = "./db/tweets.db"
cookiePath = "./db/cookies.json"
)
const (
ScrapeInterval int = 3 // How often to check for new tweets (in minutes)
ScrapeDelay int64 = 0 // How long to wait between api requests (in seconds)
ScrapeStep int = 10 // How many tweets to get at a time
)
type App struct {
config *Config
db *Database
scraper *ts.Scraper
}
func Run() {
args := os.Args[1:]
if len(args) > 1 {
log.Fatalln("Too many arguments")
}
if len(args) == 1 {
if args[0] == "" {
log.Fatalln("No config path given")
}
configPath = args[0]
}
config, err := ConfigFromFile(configPath)
if err != nil {
log.Fatalf("There has been an error parsing config file '%s': %s\n", configPath, err.Error())
}
if len(config.Channels) == 0 {
log.Fatalln("List of channels cannot be empty")
}
if len(config.Channels) != len(config.Filter) {
log.Fatalln("List of filters has to be same length as channel list")
}
if config.Webhook == "" {
log.Fatalln("Webhook address cannot be empty")
}
if config.DbPath != "" {
dbPath = config.DbPath
}
if config.CookiePath != "" {
cookiePath = config.CookiePath
}
db, dberr := NewDatabase("sqlite3", dbPath)
if dberr != nil {
log.Fatalf("An error occurred while creating database connection: %s\n", dberr.Error())
}
defer db.Close()
scraper := ts.New()
if config.ProxyAddr != "" {
err := scraper.SetProxy(config.ProxyAddr)
if err != nil {
log.Fatalf("An error occurred with proxy connection: %s\n", err.Error())
}
}
{
f, err := os.Open(cookiePath)
if err != nil {
log.Println("Cookie file does not yet exist")
} else {
var cookies []*http.Cookie
json.NewDecoder(f).Decode(&cookies)
scraper.SetCookies(cookies)
}
}
if scraper.IsLoggedIn() {
log.Println("We're already logged in, skipping login...")
} else {
scraper.ClearCookies()
if len(config.Username) > 0 {
err := scraper.Login(config.Username, config.Password)
if err != nil {
log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
} else {
log.Printf("New Login - Saving cookies to %s\n", cookiePath)
js, jsonErr := json.Marshal(scraper.GetCookies())
if jsonErr != nil {
log.Fatalf("An error occurred during cookie serialization: %s\n", jsonErr.Error())
}
f, fErr := os.Create(cookiePath)
if fErr != nil {
log.Fatalf("Failed to create cookie file at %s with the following error: %s\n", cookiePath, fErr.Error())
}
f.Write(js)
writeErr := f.Close()
if writeErr != nil {
log.Fatalf("An error occurred on closing cookie file: %s\n", writeErr.Error())
}
}
} else {
log.Println("Trying open account login... ")
_, err := scraper.LoginOpenAccount() // TODO: save openaccount token/secret
if err != nil {
log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
}
defer scraper.Logout()
}
}
scraper.WithDelay(ScrapeDelay)
app := App{config, db, scraper}
for i, c := range config.Channels {
go app.queryX(i, c)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
done := make(chan bool, 1)
go func() {
sig := <-sigs
log.Println(sig)
done <- true
}()
<-done
log.Println("Exiting...")
}
func (app *App) queryX(id int, channel string) {
log.Printf("Starting worker %d for channel %s", id, channel)
// Sleep to stagger api queries of workers
time.Sleep(time.Duration(id) * time.Minute)
db := app.db
filter := app.config.Filter[id]
init := true
ScrapeLoop:
for {
if !init {
// Sleep for set interval +-30 seconds
time.Sleep(time.Duration(ScrapeInterval)*time.Minute + time.Duration(rand.Intn(60)-30)*time.Second)
}
init = false
step := ScrapeStep
tweets := []*ts.Tweet{}
tweetsToParse := []*ts.Tweet{}
tweetsToPost := []*ts.Tweet{}
dbTweets, dbErr := db.GetTweets(channel)
if dbErr != nil {
log.Printf("Error while retrieving tweets from database for channel %s: %s", channel, dbErr.Error())
continue ScrapeLoop
}
GetTweets:
for {
for tweet := range app.scraper.GetTweets(context.Background(), channel, step) {
if tweet.Error != nil {
log.Printf("Error while retrieving tweet for channel %s: %s", channel, tweet.Error.Error())
continue ScrapeLoop
}
tweets = append(tweets, &tweet.Tweet)
}
if len(tweets) == 0 {
break GetTweets
}
if len(dbTweets) > 0 {
for _, dbTweet := range dbTweets {
for i, tweet := range tweets {
if dbTweet.EqualsTweet(tweet) {
tweetsToParse = tweets[:i]
break GetTweets
}
}
}
} else {
tweetsToParse = append(tweetsToParse, tweets[0])
break GetTweets
}
if step >= 50 {
tweetsToParse = append(tweetsToParse, tweets[0])
break GetTweets
} else if step+ScrapeStep > 50 {
step = 50
} else {
step += ScrapeStep
}
log.Printf("Fetching more tweets for %s...", channel)
time.Sleep(time.Duration(3) * time.Second) // Wait a few seconds for next api request
}
ParseTweets:
// We want to parse old to new
for i := len(tweetsToParse) - 1; i >= 0; i-- {
tweet := tweetsToParse[i]
if filterTweet(filter, tweet) {
// One of the filters applies as same bits are 1, so we skip this tweet
continue ParseTweets
}
err := db.InsertTweet(channel, tweet)
if err != nil {
log.Printf("Error while inserting tweet for channel %s into the database: %s", channel, err.Error())
continue ParseTweets
}
tweetsToPost = append(tweetsToPost, tweet)
}
sendToWebhook(app.config.Webhook, tweetsToPost)
err := db.PruneOldestTweets(channel)
if err != nil {
log.Printf("Error while pruning old tweets for channel %s: %s", channel, err.Error())
}
}
}
func filterTweet(filter uint8, tweet *ts.Tweet) bool {
var tweetFilter uint8 = 0
filterMap := []bool{tweet.IsSelfThread, tweet.IsRetweet, tweet.IsReply, tweet.IsPin, tweet.IsQuoted}
for _, f := range filterMap {
tweetFilter <<= 1
if f {
tweetFilter |= 1
}
}
return filter&tweetFilter > 0
}