-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathdb.js
More file actions
102 lines (90 loc) · 2.75 KB
/
db.js
File metadata and controls
102 lines (90 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
const bluebird = require('bluebird');
const redis = require('redis');
const MongoClient = require('mongodb').MongoClient;
// Add promise-returning `*Async` variants (e.g. `lpopAsync`, `execAsync`) to
// both the plain Redis client and its MULTI transaction object.
[redis.RedisClient, redis.Multi].forEach(ctor => {
  bluebird.promisifyAll(ctor.prototype);
});

// One namespaced logger per subsystem touched by this module.
const createDebug = require('debug');
const debug = {
  db: createDebug('crawler:db'),
  redis: createDebug('crawler:redis'),
  mongo: createDebug('crawler:mongo'),
};
module.exports = {
connect: async () => {
this.db = await MongoClient.connect('mongodb://localhost:27017/crawler');
this.client = redis.createClient(
process.env.REDIS_PORT || 6379,
process.env.REDIS_HOST || 'localhost'
);
},
store: async page => {
debug.db(`Store page ${page.url}`);
// We don't need to wait for this
(async () => {
debug.mongo('Add page to mongo');
await this.db.collection('pages').insertOne(page);
debug.mongo('Mongo save complete');
})();
debug.redis('Add scraped urls to redis');
const multi = this.client.multi();
page.outboundUrls.forEach(url => {
multi.sadd('discoveredPages', url);
});
const result = await multi.execAsync();
debug.redis('Added urls to discovered set');
let count = 0;
result.forEach((notDiscovered, i) => {
if (notDiscovered) {
count++;
const url = page.outboundUrls[i];
multi.rpush('pageQueue', `${url} ${page.radius}`);
}
});
await multi.execAsync();
debug.redis(`Added ${count} new urls to queue`);
debug.redis(`${page.outboundUrls.length - count} duplicates found`);
debug.db('Page stored');
},
popUrl: async () => {
debug.redis('Pop url from queue');
const reply = await this.client.lpopAsync('pageQueue');
if (reply) {
debug.redis('Url popped');
if (debug.redis.enabled) {
const length = await this.client.llenAsync('pageQueue');
debug.redis(`${length} urls in queue`);
}
const parts = reply.match(/(.+) ([0-9]+)$/);
return {
url: parts[1],
radius: parseInt(parts[2]),
};
}
debug.redis('Queue empty');
return null;
},
getNodes: async () => {
const pages = await this.db.collection('pages').find().toArray();
const nodes = [];
pages.forEach(page => {
page.outboundUrls.forEach(url => {
nodes.push({ source: page.url, target: url });
});
});
console.log(JSON.stringify(nodes));
},
flush: async () => {
debug.mongo('Drop page collection');
const pages = await this.db.collection('pages');
if (pages) {
await pages.drop();
}
debug.mongo('Page collection dropped');
debug.redis('Flush db');
await this.client.del('discoveredPages', 'pageQueue');
debug.redis('Redis flushed');
},
close: () => {
this.client.end(true);
this.db.close();
},
};