259 lines
6.1 KiB
Go
259 lines
6.1 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
ts "github.com/imperatrona/twitter-scraper"
|
|
"log"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// Default file locations used by Run. Each one is replaced at startup when an
// override is supplied.
var (
	// configPath is the config file handed to ConfigFromFile. A single
	// command-line argument overrides it.
	configPath = "./config.toml"
	// dbPath is the database location handed to NewDatabase. A non-empty
	// DbPath in the config overrides it.
	dbPath = "./db/tweets.db"
	// cookiePath is the JSON file the scraper's cookies are loaded from and
	// saved to. A non-empty CookiePath in the config overrides it.
	cookiePath = "./db/cookies.json"
)
|
|
|
|
// Tuning knobs for the scrape workers.
const (
	// ScrapeInterval is how often each worker checks for new tweets, in
	// minutes. Workers add a random jitter of roughly half a minute either way.
	ScrapeInterval int = 3 // How often to check for new tweets (in minutes)
	// ScrapeDelay is passed to the scraper's WithDelay: how long to wait
	// between api requests, in seconds.
	ScrapeDelay int64 = 0 // How long to wait between api requests (in seconds)
	// ScrapeStep is how many tweets a worker requests on its first fetch, and
	// the amount the request size grows by on each follow-up fetch.
	ScrapeStep int = 10 // How many tweets to get at a time
)
|
|
|
|
// App bundles the dependencies shared by all channel workers.
//
// NOTE: Run constructs this with a positional literal (App{config, db,
// scraper}), so the field order below must not change.
type App struct {
	// config is the parsed configuration (channels, per-channel filters, webhook).
	config *Config
	// db stores the tweets already seen for each channel.
	db *Database
	// scraper is the single scraper instance used by every worker.
	scraper *ts.Scraper
}
|
|
|
|
func Run() {
|
|
args := os.Args[1:]
|
|
|
|
if len(args) > 1 {
|
|
log.Fatalln("Too many arguments")
|
|
}
|
|
|
|
if len(args) == 1 {
|
|
if args[0] == "" {
|
|
log.Fatalln("No config path given")
|
|
}
|
|
configPath = args[0]
|
|
}
|
|
|
|
config, err := ConfigFromFile(configPath)
|
|
if err != nil {
|
|
log.Fatalf("There has been an error parsing config file '%s': %s\n", configPath, err.Error())
|
|
}
|
|
|
|
if len(config.Channels) == 0 {
|
|
log.Fatalln("List of channels cannot be empty")
|
|
}
|
|
|
|
if len(config.Channels) != len(config.Filter) {
|
|
log.Fatalln("List of filters has to be same length as channel list")
|
|
}
|
|
|
|
if config.Webhook == "" {
|
|
log.Fatalln("Webhook address cannot be empty")
|
|
}
|
|
|
|
if config.DbPath != "" {
|
|
dbPath = config.DbPath
|
|
}
|
|
|
|
if config.CookiePath != "" {
|
|
cookiePath = config.CookiePath
|
|
}
|
|
|
|
db, dberr := NewDatabase("sqlite3", dbPath)
|
|
if dberr != nil {
|
|
log.Fatalf("An error occurred while creating database connection: %s\n", dberr.Error())
|
|
}
|
|
defer db.Close()
|
|
|
|
scraper := ts.New()
|
|
|
|
if config.ProxyAddr != "" {
|
|
err := scraper.SetProxy(config.ProxyAddr)
|
|
if err != nil {
|
|
log.Fatalf("An error occurred with proxy connection: %s\n", err.Error())
|
|
}
|
|
}
|
|
|
|
{
|
|
f, err := os.Open(cookiePath)
|
|
if err != nil {
|
|
log.Println("Cookie file does not yet exist")
|
|
} else {
|
|
var cookies []*http.Cookie
|
|
json.NewDecoder(f).Decode(&cookies)
|
|
scraper.SetCookies(cookies)
|
|
}
|
|
}
|
|
|
|
if scraper.IsLoggedIn() {
|
|
log.Println("We're already logged in, skipping login...")
|
|
} else {
|
|
scraper.ClearCookies()
|
|
if len(config.Username) > 0 {
|
|
err := scraper.Login(config.Username, config.Password)
|
|
if err != nil {
|
|
log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
|
|
} else {
|
|
log.Printf("New Login - Saving cookies to %s\n", cookiePath)
|
|
js, jsonErr := json.Marshal(scraper.GetCookies())
|
|
if jsonErr != nil {
|
|
log.Fatalf("An error occurred during cookie serialization: %s\n", jsonErr.Error())
|
|
}
|
|
f, fErr := os.Create(cookiePath)
|
|
if fErr != nil {
|
|
log.Fatalf("Failed to create cookie file at %s with the following error: %s\n", cookiePath, fErr.Error())
|
|
}
|
|
f.Write(js)
|
|
writeErr := f.Close()
|
|
if writeErr != nil {
|
|
log.Fatalf("An error occurred on closing cookie file: %s\n", writeErr.Error())
|
|
}
|
|
}
|
|
} else {
|
|
log.Println("Trying open account login... ")
|
|
_, err := scraper.LoginOpenAccount() // TODO: save openaccount token/secret
|
|
if err != nil {
|
|
log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
|
|
}
|
|
defer scraper.Logout()
|
|
}
|
|
}
|
|
|
|
scraper.WithDelay(ScrapeDelay)
|
|
|
|
app := App{config, db, scraper}
|
|
|
|
for i, c := range config.Channels {
|
|
go app.queryX(i, c)
|
|
}
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
done := make(chan bool, 1)
|
|
|
|
go func() {
|
|
sig := <-sigs
|
|
log.Println(sig)
|
|
done <- true
|
|
}()
|
|
|
|
<-done
|
|
log.Println("Exiting...")
|
|
}
|
|
|
|
func (app *App) queryX(id int, channel string) {
|
|
log.Printf("Starting worker %d for channel %s", id, channel)
|
|
// Sleep to stagger api queries of workers
|
|
time.Sleep(time.Duration(id) * time.Minute)
|
|
|
|
db := app.db
|
|
filter := app.config.Filter[id]
|
|
init := true
|
|
|
|
ScrapeLoop:
|
|
for {
|
|
if !init {
|
|
// Sleep for set interval +-30 seconds
|
|
time.Sleep(time.Duration(ScrapeInterval)*time.Minute + time.Duration(rand.Intn(60)-30)*time.Second)
|
|
}
|
|
init = false
|
|
|
|
step := ScrapeStep
|
|
tweets := []*ts.Tweet{}
|
|
tweetsToParse := []*ts.Tweet{}
|
|
tweetsToPost := []*ts.Tweet{}
|
|
|
|
dbTweets, dbErr := db.GetTweets(channel)
|
|
if dbErr != nil {
|
|
log.Printf("Error while retrieving tweets from database for channel %s: %s", channel, dbErr.Error())
|
|
continue ScrapeLoop
|
|
}
|
|
|
|
GetTweets:
|
|
for {
|
|
for tweet := range app.scraper.GetTweets(context.Background(), channel, step) {
|
|
if tweet.Error != nil {
|
|
log.Printf("Error while retrieving tweet for channel %s: %s", channel, tweet.Error.Error())
|
|
continue ScrapeLoop
|
|
}
|
|
tweets = append(tweets, &tweet.Tweet)
|
|
}
|
|
if len(tweets) == 0 {
|
|
break GetTweets
|
|
}
|
|
|
|
if len(dbTweets) > 0 {
|
|
for _, dbTweet := range dbTweets {
|
|
for i, tweet := range tweets {
|
|
if dbTweet.EqualsTweet(tweet) {
|
|
tweetsToParse = tweets[:i]
|
|
break GetTweets
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
tweetsToParse = append(tweetsToParse, tweets[0])
|
|
break GetTweets
|
|
}
|
|
|
|
if step >= 50 {
|
|
tweetsToParse = append(tweetsToParse, tweets[0])
|
|
break GetTweets
|
|
} else if step+ScrapeStep > 50 {
|
|
step = 50
|
|
} else {
|
|
step += ScrapeStep
|
|
}
|
|
|
|
log.Printf("Fetching more tweets for %s...", channel)
|
|
time.Sleep(time.Duration(3) * time.Second) // Wait a few seconds for next api request
|
|
}
|
|
|
|
ParseTweets:
|
|
// We want to parse old to new
|
|
for i := len(tweetsToParse) - 1; i >= 0; i-- {
|
|
tweet := tweetsToParse[i]
|
|
if filterTweet(filter, tweet) {
|
|
// One of the filters applies as same bits are 1, so we skip this tweet
|
|
continue ParseTweets
|
|
}
|
|
|
|
err := db.InsertTweet(channel, tweet)
|
|
if err != nil {
|
|
log.Printf("Error while inserting tweet for channel %s into the database: %s", channel, err.Error())
|
|
continue ParseTweets
|
|
}
|
|
tweetsToPost = append(tweetsToPost, tweet)
|
|
}
|
|
|
|
sendToWebhook(app.config.Webhook, tweetsToPost)
|
|
err := db.PruneOldestTweets(channel)
|
|
if err != nil {
|
|
log.Printf("Error while pruning old tweets for channel %s: %s", channel, err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
func filterTweet(filter uint8, tweet *ts.Tweet) bool {
|
|
var tweetFilter uint8 = 0
|
|
filterMap := []bool{tweet.IsSelfThread, tweet.IsRetweet, tweet.IsReply, tweet.IsPin, tweet.IsQuoted}
|
|
for _, f := range filterMap {
|
|
tweetFilter <<= 1
|
|
if f {
|
|
tweetFilter |= 1
|
|
}
|
|
}
|
|
|
|
return filter&tweetFilter > 0
|
|
}
|