refactor: replace event-stream with native Node streams

pull/353/merge
Romain 4 months ago
parent 8639c9d635
commit 9238a180fd

93
package-lock.json generated

@ -19,7 +19,6 @@
"chalk": "^4.1.0",
"command-exists": "^1.2.9",
"debug": "^4.3.4",
"event-stream": "^4.0.1",
"fs-extra": "^11.1.1",
"fs-readdir-recursive": "^1.1.0",
"handlebars": "^4.7.8",
@ -2042,11 +2041,6 @@
"node": ">=6.0.0"
}
},
"node_modules/duplexer": {
"version": "0.1.2",
"resolved": "https://registry.npmjs.org/duplexer/-/duplexer-0.1.2.tgz",
"integrity": "sha512-jtD6YG370ZCIi/9GTaJKQxWTZD045+4R4hTk/x1UyoqadyJ9x9CgSi1RlVDQF8U2sxLLSnFkCaMihqljHIWgMg=="
},
"node_modules/eastasianwidth": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz",
@ -2797,20 +2791,6 @@
"node": ">=0.10.0"
}
},
"node_modules/event-stream": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/event-stream/-/event-stream-4.0.1.tgz",
"integrity": "sha512-qACXdu/9VHPBzcyhdOWR5/IahhGMf0roTeZJfzz077GwylcDd90yOHLouhmv7GJ5XzPi6ekaQWd8AvPP2nOvpA==",
"dependencies": {
"duplexer": "^0.1.1",
"from": "^0.1.7",
"map-stream": "0.0.7",
"pause-stream": "^0.0.11",
"split": "^1.0.1",
"stream-combiner": "^0.2.2",
"through": "^2.3.8"
}
},
"node_modules/expand-template": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/expand-template/-/expand-template-2.0.3.tgz",
@ -2987,11 +2967,6 @@
"node": ">=8.0.0"
}
},
"node_modules/from": {
"version": "0.1.7",
"resolved": "https://registry.npmjs.org/from/-/from-0.1.7.tgz",
"integrity": "sha1-g8YK/Fi5xWmXAH7Rp2izqzA6RP4="
},
"node_modules/fs-constants": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz",
@ -4588,11 +4563,6 @@
"node": ">=10"
}
},
"node_modules/map-stream": {
"version": "0.0.7",
"resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.7.tgz",
"integrity": "sha1-ih8HiW2CsQkmvTdEokIACfiJdKg="
},
"node_modules/micromatch": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz",
@ -5451,14 +5421,6 @@
"node": ">=8"
}
},
"node_modules/pause-stream": {
"version": "0.0.11",
"resolved": "https://registry.npmjs.org/pause-stream/-/pause-stream-0.0.11.tgz",
"integrity": "sha1-/lo0sMvOErWqaitAPuLnO2AvFEU=",
"dependencies": {
"through": "~2.3"
}
},
"node_modules/picocolors": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.0.0.tgz",
@ -6325,15 +6287,6 @@
"node": "^12.22.0 || ^14.17.0 || >=16.0.0"
}
},
"node_modules/stream-combiner": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/stream-combiner/-/stream-combiner-0.2.2.tgz",
"integrity": "sha1-rsjLrBd7Vrb0+kec7YwZEs7lKFg=",
"dependencies": {
"duplexer": "~0.1.1",
"through": "~2.3.4"
}
},
"node_modules/stream-mock": {
"version": "2.0.5",
"resolved": "https://registry.npmjs.org/stream-mock/-/stream-mock-2.0.5.tgz",
@ -8815,11 +8768,6 @@
"esutils": "^2.0.2"
}
},
"duplexer": {
"version": "0.1.2",
"resolved": "https://registry.npmjs.org/duplexer/-/duplexer-0.1.2.tgz",
"integrity": "sha512-jtD6YG370ZCIi/9GTaJKQxWTZD045+4R4hTk/x1UyoqadyJ9x9CgSi1RlVDQF8U2sxLLSnFkCaMihqljHIWgMg=="
},
"eastasianwidth": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz",
@ -9368,20 +9316,6 @@
"integrity": "sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==",
"dev": true
},
"event-stream": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/event-stream/-/event-stream-4.0.1.tgz",
"integrity": "sha512-qACXdu/9VHPBzcyhdOWR5/IahhGMf0roTeZJfzz077GwylcDd90yOHLouhmv7GJ5XzPi6ekaQWd8AvPP2nOvpA==",
"requires": {
"duplexer": "^0.1.1",
"from": "^0.1.7",
"map-stream": "0.0.7",
"pause-stream": "^0.0.11",
"split": "^1.0.1",
"stream-combiner": "^0.2.2",
"through": "^2.3.8"
}
},
"expand-template": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/expand-template/-/expand-template-2.0.3.tgz",
@ -9524,11 +9458,6 @@
"signal-exit": "^3.0.2"
}
},
"from": {
"version": "0.1.7",
"resolved": "https://registry.npmjs.org/from/-/from-0.1.7.tgz",
"integrity": "sha1-g8YK/Fi5xWmXAH7Rp2izqzA6RP4="
},
"fs-constants": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz",
@ -10694,11 +10623,6 @@
}
}
},
"map-stream": {
"version": "0.0.7",
"resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.7.tgz",
"integrity": "sha1-ih8HiW2CsQkmvTdEokIACfiJdKg="
},
"micromatch": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz",
@ -11346,14 +11270,6 @@
"integrity": "sha512-gDKb8aZMDeD/tZWs9P6+q0J9Mwkdl6xMV8TjnGP3qJVJ06bdMgkbBlLU8IdfOsIsFz2BW1rNVT3XuNEl8zPAvw==",
"dev": true
},
"pause-stream": {
"version": "0.0.11",
"resolved": "https://registry.npmjs.org/pause-stream/-/pause-stream-0.0.11.tgz",
"integrity": "sha1-/lo0sMvOErWqaitAPuLnO2AvFEU=",
"requires": {
"through": "~2.3"
}
},
"picocolors": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/picocolors/-/picocolors-1.0.0.tgz",
@ -11961,15 +11877,6 @@
"xdg-basedir": "^4.0.0"
}
},
"stream-combiner": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/stream-combiner/-/stream-combiner-0.2.2.tgz",
"integrity": "sha1-rsjLrBd7Vrb0+kec7YwZEs7lKFg=",
"requires": {
"duplexer": "~0.1.1",
"through": "~2.3.4"
}
},
"stream-mock": {
"version": "2.0.5",
"resolved": "https://registry.npmjs.org/stream-mock/-/stream-mock-2.0.5.tgz",

@ -41,7 +41,6 @@
"chalk": "^4.1.0",
"command-exists": "^1.2.9",
"debug": "^4.3.4",
"event-stream": "^4.0.1",
"fs-extra": "^11.1.1",
"fs-readdir-recursive": "^1.1.0",
"handlebars": "^4.7.8",

@ -1,7 +1,7 @@
const os = require('node:os')
const stream = require('node:stream')
const _ = require('lodash')
const debug = require('debug')('thumbsup:debug')
const es = require('event-stream')
const exiftool = require('./stream.js')
/*
@ -19,5 +19,23 @@ exports.parse = (rootFolder, filePaths, concurrency) => {
return exiftool.parse(rootFolder, buckets[i])
})
// merge the object streams
return es.merge(streams)
return merge(streams)
}
function merge (streams) {
let ended = 0
const merged = new stream.PassThrough({ objectMode: true })
streams.forEach(s => {
s.pipe(merged, { end: false })
s.once('end', () => {
++ended
if (ended === streams.length) {
merged.emit('end')
}
})
s.once('error', (err) => {
merged.emit('error', err)
})
})
return merged
}

@ -2,7 +2,6 @@ const childProcess = require('node:child_process')
const trace = require('debug')('thumbsup:trace')
const debug = require('debug')('thumbsup:debug')
const error = require('debug')('thumbsup:error')
const es = require('event-stream')
const JSONStream = require('JSONStream')
/*
@ -29,6 +28,13 @@ exports.parse = (rootFolder, filePaths) => {
cwd: rootFolder,
stdio: ['pipe', 'pipe', 'pipe']
})
// stream <stdout> into a JSON parser
// parse every top-level object and emit it on the stream
const parser = JSONStream.parse([true])
child.stdout.pipe(parser)
// Error handling
child.on('error', (err) => {
error('Error: please verify that <exiftool> is installed on your system')
error(err.toString())
@ -36,7 +42,12 @@ exports.parse = (rootFolder, filePaths) => {
child.on('close', (code, signal) => {
debug(`Exiftool exited with code ${code}`)
})
parser.on('error', (err) => {
error('Error: failed to parse JSON from Exiftool output')
error(err.message)
})
// Print exiftool error messages if any
child.stderr.on('data', chunk => {
trace('Exiftool output:', chunk.toString())
})
@ -47,10 +58,5 @@ exports.parse = (rootFolder, filePaths) => {
child.stdin.write(allFiles + '\n')
child.stdin.end()
// stream <stdout> into a JSON parser
// parse every top-level object and emit it on the stream
return es.pipeline(
child.stdout,
JSONStream.parse([true])
)
return parser
}

Loading…
Cancel
Save