-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
expire-tweets.js
143 lines (125 loc) · 3.93 KB
/
expire-tweets.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
const Twitter = require('twitter');
const fetch = require('isomorphic-fetch');
const Dropbox = require('dropbox').Dropbox;
const ignoredTweets = require('./ignored-tweets.json');
const bigInt = require('big-integer');
const DAYS_THRESHOLD = 14;
const BATCH_SIZE = 50;
const dbx = new Dropbox({
accessToken: process.env.DROPBOX_ACCESS_TOKEN,
fetch: fetch
});
const client = new Twitter({
consumer_key: process.env.TWITTER_CONSUMER_KEY,
consumer_secret: process.env.TWITTER_CONSUMER_SECRET,
access_token_key: process.env.TWITTER_ACCESS_TOKEN_KEY,
access_token_secret: process.env.TWITTER_ACCESS_TOKEN_SECRET
});
function forAllTweets(fx, commit) {
const params = max_id => ({
screen_name: process.env.TWITTER_USERNAME,
trim_user: true,
count: BATCH_SIZE,
exclude_replies: false,
include_rts: true,
tweet_mode: 'extended',
...(max_id ? { max_id: max_id.toString() } : {})
});
const pageTweets = maxID =>
client.get('statuses/user_timeline', params(maxID)).then(tweets => {
console.log(tweets.length);
var minID = undefined;
const promises = [];
tweets.forEach(tweet => {
promises.push(fx(tweet));
const bigTweetID = bigInt(tweet.id_str);
if (minID === undefined || bigTweetID.compare(minID) === -1) {
minID = bigTweetID;
}
});
if (tweets.length > 1) {
console.log('Min ID for page: ', minID.toString());
return Promise.all(promises)
.then(commit)
.then(() => pageTweets(minID));
} else {
return Promise.all(promises).then(commit);
}
});
return pageTweets();
}
const processTweet = tweet => {
const tweetDate = new Date(tweet.created_at);
const now = new Date();
const threshold = DAYS_THRESHOLD * 24 * 60 * 60 * 1000;
if (ignoredTweets.find(v => v === tweet.id_str)) {
console.log('Ignored, in json: ', tweet.full_text);
return Promise.resolve();
} else if (now.getTime() - tweetDate.getTime() < threshold) {
console.log('Ignored, too young: ', tweet.full_text, tweetDate);
return Promise.resolve();
} else {
// Delete and save tweet
const path = `/${tweet.id_str}.json`;
console.log(`Saving ${path}...`);
const contents = JSON.stringify(tweet);
return client
.post(`statuses/destroy/${tweet.id_str}.json`, { trim_user: true })
.catch(err => {
console.log(err);
console.log(tweet);
fatalError('error');
})
.then(() =>
dbx.filesUploadSessionStart({
contents,
close: true
})
)
.then(response => {
const entry = {
cursor: {
session_id: response.session_id,
offset: Buffer.from(contents).length
},
commit: {
path
}
};
return entry;
});
}
};
const waitForPoll = fx => {
const wait = ms => new Promise(resolve => setTimeout(resolve, ms));
return fx().then(result => {
if (result['.tag'] === 'in_progress') {
console.log('In progress, waiting 1s...');
return wait(1000).then(() => waitForPoll(fx));
} else {
console.log('Batch committed!');
if (result['entries']) {
result['entries'].forEach(entry => {
if (entry.failure) {
console.log(entry.failure);
}
});
}
return result;
}
});
};
const commitTweets = results => {
const entries = results.filter(result => result !== undefined);
console.log(`Committing batch of ${entries.length} files.`);
return dbx.filesUploadSessionFinishBatch({ entries }).then(response => {
const { async_job_id } = response;
console.log('Now polling commit job ID: ', async_job_id);
const pollJob = () =>
dbx.filesUploadSessionFinishBatchCheck({ async_job_id });
return waitForPoll(pollJob);
});
};
forAllTweets(processTweet, commitTweets)
.then(() => process.exit(0))
.catch(err => console.log('error', err));