package cmd

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	ts "github.com/imperatrona/twitter-scraper"
)

var (
	configPath = "./config.toml"
	dbPath     = "./db/tweets.db"
	cookiePath = "./db/cookies.json"
)

const (
	ScrapeInterval int   = 3  // How often to check for new tweets (in minutes)
	ScrapeDelay    int64 = 0  // How long to wait between api requests (in seconds)
	ScrapeStep     int   = 10 // How many tweets to get at a time
)

type App struct {
	config  *Config
	db      *Database
	scraper *ts.Scraper
}

// Run parses the config, sets up the database and scraper session, and starts
// one worker goroutine per configured channel.
func Run() {
	args := os.Args[1:]
	if len(args) > 1 {
		log.Fatalln("Too many arguments")
	}
	if len(args) == 1 {
		if args[0] == "" {
			log.Fatalln("No config path given")
		}
		configPath = args[0]
	}

	config, err := ConfigFromFile(configPath)
	if err != nil {
		log.Fatalf("There has been an error parsing config file '%s': %s\n", configPath, err.Error())
	}
	if len(config.Channels) == 0 {
		log.Fatalln("List of channels cannot be empty")
	}
	if len(config.Channels) != len(config.Filter) {
		log.Fatalln("List of filters has to be the same length as the channel list")
	}
	if config.Webhook == "" {
		log.Fatalln("Webhook address cannot be empty")
	}
	if config.DbPath != "" {
		dbPath = config.DbPath
	}
	if config.CookiePath != "" {
		cookiePath = config.CookiePath
	}

	db, dberr := NewDatabase("sqlite3", dbPath)
	if dberr != nil {
		log.Fatalf("An error occurred while creating database connection: %s\n", dberr.Error())
	}
	defer db.Close()

	scraper := ts.New()
	if config.ProxyAddr != "" {
		err := scraper.SetProxy(config.ProxyAddr)
		if err != nil {
			log.Fatalf("An error occurred with proxy connection: %s\n", err.Error())
		}
	}

	// Restore a previous session from the cookie file, if one exists.
	{
		f, err := os.Open(cookiePath)
		if err != nil {
			log.Println("Cookie file does not yet exist")
		} else {
			var cookies []*http.Cookie
			if decodeErr := json.NewDecoder(f).Decode(&cookies); decodeErr != nil {
				log.Printf("Failed to decode cookie file %s: %s\n", cookiePath, decodeErr.Error())
			} else {
				scraper.SetCookies(cookies)
			}
			f.Close()
		}
	}

	if scraper.IsLoggedIn() {
		log.Println("We're already logged in, skipping login...")
	} else {
		scraper.ClearCookies()
		if len(config.Username) > 0 {
			err := scraper.Login(config.Username, config.Password)
			if err != nil {
				log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
			}
			log.Printf("New Login - Saving cookies to %s\n", cookiePath)
			js, jsonErr := json.Marshal(scraper.GetCookies())
			if jsonErr != nil {
				log.Fatalf("An error occurred during cookie serialization: %s\n", jsonErr.Error())
			}
			f, fErr := os.Create(cookiePath)
			if fErr != nil {
				log.Fatalf("Failed to create cookie file at %s with the following error: %s\n", cookiePath, fErr.Error())
			}
			if _, writeErr := f.Write(js); writeErr != nil {
				log.Fatalf("An error occurred while writing cookie file: %s\n", writeErr.Error())
			}
			if closeErr := f.Close(); closeErr != nil {
				log.Fatalf("An error occurred on closing cookie file: %s\n", closeErr.Error())
			}
		} else {
			log.Println("Trying open account login...")
			_, err := scraper.LoginOpenAccount() // TODO: save openaccount token/secret
			if err != nil {
				log.Fatalf("An error occurred during scraper login: %s\n", err.Error())
			}
			defer scraper.Logout()
		}
	}

	scraper.WithDelay(ScrapeDelay)

	app := App{config, db, scraper}
	for i, c := range config.Channels {
		go app.queryX(i, c)
	}

	// Block until SIGINT or SIGTERM is received.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	done := make(chan bool, 1)
	go func() {
		sig := <-sigs
		log.Println(sig)
		done <- true
	}()
	<-done
	log.Println("Exiting...")
}

// queryX periodically fetches new tweets for channel, filters and stores them,
// and forwards the remainder to the configured webhook.
func (app *App) queryX(id int, channel string) {
	log.Printf("Starting worker %d for channel %s", id, channel)

	// Sleep to stagger api queries of workers
	time.Sleep(time.Duration(id) * time.Minute)

	db := app.db
	filter := app.config.Filter[id]
	init := true

ScrapeLoop:
	for {
		if !init {
			// Sleep for the set interval +- 30 seconds
			time.Sleep(time.Duration(ScrapeInterval)*time.Minute + time.Duration(rand.Intn(60)-30)*time.Second)
		}
		init = false

		step := ScrapeStep
		tweets := []*ts.Tweet{}
		tweetsToParse := []*ts.Tweet{}
		tweetsToPost := []*ts.Tweet{}

		dbTweets, dbErr := db.GetTweets(channel)
		if dbErr != nil {
			log.Printf("Error while retrieving tweets from database for channel %s: %s", channel, dbErr.Error())
			continue ScrapeLoop
		}

	GetTweets:
		for {
			// Each attempt re-fetches from the newest tweet with a larger step,
			// so start from an empty slice to avoid duplicates.
			tweets = tweets[:0]
			for tweet := range app.scraper.GetTweets(context.Background(), channel, step) {
				if tweet.Error != nil {
					log.Printf("Error while retrieving tweet for channel %s: %s", channel, tweet.Error.Error())
					continue ScrapeLoop
				}
				tweets = append(tweets, &tweet.Tweet)
			}
			if len(tweets) == 0 {
				break GetTweets
			}
			if len(dbTweets) > 0 {
				// Stop at the first scraped tweet that is already in the database;
				// everything before it is new.
				for _, dbTweet := range dbTweets {
					for i, tweet := range tweets {
						if dbTweet.EqualsTweet(tweet) {
							tweetsToParse = tweets[:i]
							break GetTweets
						}
					}
				}
			} else {
				// Empty database: only take the newest tweet as a starting point.
				tweetsToParse = append(tweetsToParse, tweets[0])
				break GetTweets
			}
			if step >= 50 {
				// Give up searching for a known tweet and just take the newest one.
				tweetsToParse = append(tweetsToParse, tweets[0])
				break GetTweets
			} else if step+ScrapeStep > 50 {
				step = 50
			} else {
				step += ScrapeStep
			}
			log.Printf("Fetching more tweets for %s...", channel)
			time.Sleep(3 * time.Second) // Wait a few seconds before the next api request
		}

	ParseTweets:
		// We want to parse old to new
		for i := len(tweetsToParse) - 1; i >= 0; i-- {
			tweet := tweetsToParse[i]
			if filterTweet(filter, tweet) {
				// One of the filters applies as the same bits are 1, so we skip this tweet
				continue ParseTweets
			}
			err := db.InsertTweet(channel, tweet)
			if err != nil {
				log.Printf("Error while inserting tweet for channel %s into the database: %s", channel, err.Error())
				continue ParseTweets
			}
			tweetsToPost = append(tweetsToPost, tweet)
		}

		sendToWebhook(app.config.Webhook, tweetsToPost)

		err := db.PruneOldestTweets(channel)
		if err != nil {
			log.Printf("Error while pruning old tweets for channel %s: %s", channel, err.Error())
		}
	}
}

// filterTweet reports whether any tweet type bit set in filter also applies to
// the given tweet, in which case the tweet should be skipped.
func filterTweet(filter uint8, tweet *ts.Tweet) bool {
	var tweetFilter uint8 = 0
	filterMap := []bool{tweet.IsSelfThread, tweet.IsRetweet, tweet.IsReply, tweet.IsPin, tweet.IsQuoted}
	for _, f := range filterMap {
		tweetFilter <<= 1
		if f {
			tweetFilter |= 1
		}
	}
	return filter&tweetFilter > 0
}