Parallel extraction!

pull/2/head
Dave Vasilevsky 14 years ago
parent a284a18616
commit 0357c6413c

@ -13,7 +13,7 @@ typedef struct {
FILE *gInFile = NULL;
lzma_stream gStream = LZMA_STREAM_INIT;
static lzma_check gCheck = LZMA_CHECK_NONE;
lzma_check gCheck = LZMA_CHECK_NONE;
void die(const char *fmt, ...) {

@ -50,6 +50,9 @@ struct file_index_t {
extern file_index_t *gFileIndex, *gLastFile;
// As discovered from footer
extern lzma_check gCheck;
void decode_index(void);
lzma_vli find_file_index(void **bdatap);

@ -1,40 +1,152 @@
#include "pixz.h"
/* TODO
* - parallel extraction
* - restrict to certain files
* - verify file-index matches archive contents
*/
// One unit of work handed through the pipeline: the raw bytes of a single
// compressed block and the buffer that receives its decoded contents.
typedef struct {
	uint8_t *input;   // compressed block bytes (filled by the reader)
	uint8_t *output;  // decoded bytes (filled by a decoder thread)
	size_t insize;    // number of valid bytes in input
	size_t outsize;   // number of valid bytes in output
} io_block_t;
// Callbacks handed to pipeline_create() by main().
static void *block_create(void);
static void block_free(void *data);
static void read_thread(void);
static void decode_thread(size_t thnum);
// Destination for decoded data; assigned in main().
static FILE *gOutFile = NULL;
// Value returned by find_file_index(); blocks starting at this compressed
// offset are skipped when sizing buffers and when reading.
static lzma_vli gFileIndexOffset = 0;
// Largest compressed (total_size) and uncompressed block sizes found in the
// index by set_block_sizes(); used as the buffer sizes in block_create().
static size_t gBlockInSize = 0, gBlockOutSize = 0;
static void set_block_sizes();
// NOTE(review): looks like a leftover from the pre-change version of this
// file; no complete definition is visible here — confirm it can be dropped.
static size_t largest_block_size();
// Entry point: selects the input/output streams, sizes the block buffers
// from the index, then starts the pipeline and writes the output of every
// item returned by pipeline_merged() to gOutFile.
//
// NOTE(review): this text was taken from a rendered diff, so lines removed
// and lines added by the commit appear interleaved (for example the two
// stdin/stdout assignments, and the largest_block_size()/printf pair next to
// set_block_sizes()). Confirm against the real file before relying on it.
int main(int argc, char **argv) {
// TODO: Arguments?
gInFile = stdin;
gOutFile = stdout;
char *progname = argv[0];
// No arguments: filter stdin to stdout. Two arguments: INPUT and OUTPUT paths.
if (argc == 1) {
gInFile = stdin;
gOutFile = stdout;
} else if (argc == 3) {
// NOTE(review): the streams carry binary data but are opened in text mode
// ("r"/"w"); "rb"/"wb" would be needed on platforms that distinguish them.
if (!(gInFile = fopen(argv[1], "r")))
die("Can't open input file");
if (!(gOutFile = fopen(argv[2], "w")))
die("Can't open output file");
} else {
die("Usage: %s [-t] [INPUT OUTPUT]", progname);
}
// Find block sizes
gFileIndexOffset = find_file_index(NULL);
set_block_sizes();
// Find largest block size
// NOTE(review): when gOutFile is stdout this printf would land in the decoded
// output; presumably a removed line from the old version — confirm.
size_t blocksize = largest_block_size();
printf("block size: %zu\n", blocksize);
pipeline_create(block_create, block_free, read_thread, decode_thread);
pipeline_item_t *pi;
// Drain merged items until pipeline_merged() returns NULL, handing each
// item back to the start queue once its output has been written.
while ((pi = pipeline_merged())) {
io_block_t *ib = (io_block_t*)(pi->data);
// NOTE(review): the fwrite result is not checked, so a short write would go
// unnoticed.
fwrite(ib->output, ib->outsize, 1, gOutFile);
queue_push(gPipelineStartQ, PIPELINE_ITEM, pi);
}
pipeline_destroy();
return 0;
}
static size_t largest_block_size() {
// exclude the index block
lzma_vli index_offset = find_file_index(NULL);
static void *block_create(void) {
io_block_t *ib = malloc(sizeof(io_block_t));
ib->input = malloc(gBlockInSize);
ib->output = malloc(gBlockOutSize);
return ib;
}
// Release a work item created by block_create(): both of its buffers and
// then the io_block_t itself.
static void block_free(void* data) {
	io_block_t *blk = data;

	free(blk->output);
	free(blk->input);
	free(blk);
}
// Walk every block recorded in gIndex and raise gBlockInSize/gBlockOutSize
// to the largest compressed (total_size) and uncompressed sizes seen.
// The block located at gFileIndexOffset (the file index) is not counted.
static void set_block_sizes() {
	lzma_index_iter it;
	lzma_index_iter_init(&it, gIndex);

	for (;;) {
		// A non-zero return means there are no more blocks.
		if (lzma_index_iter_next(&it, LZMA_INDEX_ITER_BLOCK))
			break;

		// exclude the file index block
		lzma_vli start = it.block.compressed_file_offset;
		if (gFileIndexOffset != 0 && start == gFileIndexOffset)
			continue;

		size_t compressed = it.block.total_size;
		size_t uncompressed = it.block.uncompressed_size;

		if (gBlockInSize < compressed)
			gBlockInSize = compressed;
		if (gBlockOutSize < uncompressed)
			gBlockOutSize = uncompressed;
	}
}
// Reader stage of the pipeline: for each block listed in gIndex (except the
// one at gFileIndexOffset) take a free item from gPipelineStartQ, read the
// block's total_size bytes from gInFile into its input buffer, and pass it
// on with pipeline_split(). Calls pipeline_stop() once all blocks are read.
//
// NOTE(review): this text was taken from a rendered diff; lines belonging to
// the removed largest_block_size() (the `largest` variable, the
// `index_offset` test, `return largest;`) appear interleaved with the new
// body. Confirm against the real file before relying on it.
static void read_thread(void) {
// Current position in gInFile, used to avoid seeking when already in place.
off_t offset = ftello(gInFile);
size_t largest = 0;
lzma_index_iter iter;
lzma_index_iter_init(&iter, gIndex);
while (!lzma_index_iter_next(&iter, LZMA_INDEX_ITER_BLOCK)) {
if (index_offset && iter.block.compressed_file_offset == index_offset)
size_t boffset = iter.block.compressed_file_offset;
// Skip the block that holds the file index.
if (boffset == gFileIndexOffset)
continue;
if (iter.block.uncompressed_size > largest)
largest = iter.block.uncompressed_size;
pipeline_item_t *pi;
queue_pop(gPipelineStartQ, (void**)&pi);
io_block_t *ib = (io_block_t*)(pi->data);
// NOTE(review): offset is not advanced after the fread below, so for
// back-to-back blocks this comparison presumably always differs and a seek
// is issued every time — harmless but possibly unintended; the fseeko
// result is also unchecked.
if (offset != boffset) {
fseeko(gInFile, boffset, SEEK_SET);
offset = boffset;
}
size_t bsize = iter.block.total_size;
ib->insize = fread(ib->input, 1, bsize, gInFile);
// A short read means the file ended or failed before the whole block arrived.
if (ib->insize < bsize)
die("Error reading block contents");
pipeline_split(pi);
}
return largest;
pipeline_stop();
}
static void decode_thread(size_t thnum) {
lzma_stream stream = LZMA_STREAM_INIT;
lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_block block = { .filters = filters, .check = gCheck, .version = 0 };
pipeline_item_t *pi;
io_block_t *ib;
while (PIPELINE_STOP != queue_pop(gPipelineSplitQ, (void**)&pi)) {
ib = (io_block_t*)(pi->data);
block.header_size = lzma_block_header_size_decode(*(ib->input));
if (lzma_block_header_decode(&block, NULL, ib->input) != LZMA_OK)
die("Error decoding block header");
if (lzma_block_decoder(&stream, &block) != LZMA_OK)
die("Error initializing block decode");
stream.avail_in = ib->insize - block.header_size;
stream.next_in = ib->input + block.header_size;
stream.avail_out = gBlockOutSize;
stream.next_out = ib->output;
lzma_ret err = LZMA_OK;
while (err != LZMA_STREAM_END) {
if (err != LZMA_OK)
die("Error decoding block");
err = lzma_code(&stream, LZMA_FINISH);
}
ib->outsize = stream.next_out - ib->output;
queue_push(gPipelineMergeQ, PIPELINE_ITEM, pi);
}
lzma_end(&stream);
}

@ -7,11 +7,6 @@
#pragma mark TYPES
typedef enum {
MSG_BLOCK,
MSG_STOP,
} msg_type_t;
typedef struct io_block_t io_block_t;
struct io_block_t {
lzma_block block;
@ -202,7 +197,7 @@ static void read_thread() {
if (gReadBlock->insize)
pipeline_split(gReadItem);
else
queue_push(gPipelineStartQ, MSG_BLOCK, gReadItem);
queue_push(gPipelineStartQ, PIPELINE_ITEM, gReadItem);
gReadItem = NULL;
}
@ -297,7 +292,7 @@ static void encode_thread(size_t thnum) {
while (true) {
pipeline_item_t *pi;
int msg = queue_pop(gPipelineSplitQ, (void**)&pi);
if (msg == MSG_STOP)
if (msg == PIPELINE_STOP)
break;
debug("encoder %zu: received %zu", thnum, pi->seq);

Loading…
Cancel
Save