-
Notifications
You must be signed in to change notification settings - Fork 1
/
load-data.js
104 lines (83 loc) · 2.57 KB
/
load-data.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env node
import 'dotenv/config.js'
import process from 'node:process'
import {createGunzip} from 'node:zlib'
import {Transform} from 'node:stream'
import {finished} from 'node:stream/promises'
import {createReadStream} from 'node:fs'
import got from 'got'
import {createParser} from '@etalab/fantoir-parser'
import mongo from './lib/mongo.js'
const FANTOIR_PATH = process.env.FANTOIR_PATH || 'https://adresse.data.gouv.fr/data/fantoir/latest'
const TERRITOIRES = process.env.TERRITOIRES ? process.env.TERRITOIRES.split(',') : undefined
function createFantoirStream() {
if (FANTOIR_PATH.startsWith('http')) {
return got.stream(FANTOIR_PATH)
}
return createReadStream(FANTOIR_PATH)
}
function conformToTerritoiresConfig(codeCommune) {
if (!TERRITOIRES) {
return true
}
return TERRITOIRES.some(codeTerritoire => codeCommune.startsWith(codeTerritoire))
}
function createLoader(mongo) {
const communes = []
let currentCommune
let currentVoies
return new Transform({
async transform(item, enc, cb) {
if (item.type === 'voie') {
currentVoies.push(item)
cb()
}
if (item.type === 'commune') {
try {
if (currentCommune && conformToTerritoiresConfig(currentCommune.codeCommune) && currentVoies.length > 0) {
console.log(`Inserting ${currentCommune.codeCommune}`)
await mongo.db.collection('voies').insertMany(currentVoies)
}
currentVoies = []
currentCommune = item
communes.push(item)
cb()
} catch (error) {
cb(error)
}
}
},
async flush(cb) {
try {
if (currentCommune && conformToTerritoiresConfig(currentCommune.codeCommune) && currentVoies.length > 0) {
console.log(`Inserting ${currentCommune.codeCommune}`)
await mongo.db.collection('voies').insertMany(currentVoies)
}
await mongo.db.collection('communes').insertMany(communes)
cb()
} catch (error) {
cb(error)
}
},
objectMode: true
})
}
async function main() {
// Connect to database
await mongo.connect()
// Cleaning existing collections
await mongo.db.collection('voies').deleteMany({})
await mongo.db.collection('communes').deleteMany({})
const loader = createLoader(mongo)
createFantoirStream()
.pipe(createGunzip())
.pipe(createParser({accept: ['commune', 'voie']}))
.pipe(loader)
await finished(loader, {readable: false})
await mongo.disconnect()
process.exit(0)
}
main().catch(error => {
console.error(error)
process.exit(1)
})