Streaming read works

stream
Dave Vasilevsky 12 years ago
parent 70a3c58520
commit 4474af5419

@ -53,6 +53,8 @@ static void tar_write_last(void);
#pragma mark DECLARE READ BUFFER
#define STREAMSIZE (1024 * 1024)
static pipeline_item_t *gRbufPI = NULL;
static io_block_t *gRbuf = NULL;
@ -66,6 +68,8 @@ static rbuf_read_status rbuf_read(size_t bytes);
static void rbuf_consume(size_t bytes);
static void rbuf_dispatch(void);
static void read_streaming(lzma_block *block);
#pragma mark DECLARE UTILS
@ -287,11 +291,57 @@ static void rbuf_dispatch(void) {
gRbuf = NULL;
}
static void read_streaming(lzma_block *block) {
lzma_stream stream = LZMA_STREAM_INIT;
if (lzma_block_decoder(&stream, block) != LZMA_OK)
die("Error initializing streaming block decode");
stream.next_in = gRbuf->input + block->header_size;
stream.avail_in = gRbuf->insize - block->header_size;
stream.avail_out = 0;
pipeline_item_t *pi = NULL;
io_block_t *ib = NULL;
lzma_ret err = LZMA_OK;
while (err != LZMA_STREAM_END) {
if (err != LZMA_OK)
die("Error decoding streaming block");
if (stream.avail_out == 0) {
if (ib) {
ib->outsize = ib->outcap;
pipeline_dispatch(pi, gPipelineMergeQ);
}
queue_pop(gPipelineStartQ, (void**)&pi);
ib = (io_block_t*)pi->data;
block_capacity(ib, 0, STREAMSIZE);
stream.next_out = ib->output;
stream.avail_out = ib->outcap;
}
if (stream.avail_in == 0) {
rbuf_consume(gRbuf->insize);
if (rbuf_read(CHUNKSIZE) < RBUF_PART)
die("Error reading streaming block contents");
stream.next_in = gRbuf->input;
stream.avail_in = gRbuf->insize;
}
err = lzma_code(&stream, LZMA_RUN);
}
if (ib && stream.avail_out != ib->outcap) {
ib->outsize = ib->outcap - stream.avail_out;
pipeline_dispatch(pi, gPipelineMergeQ);
}
rbuf_consume(gRbuf->insize - stream.avail_in);
lzma_end(&stream);
}
static void read_thread_noindex(void) {
size_t bytes;
lzma_ret err;
// Read the header
// Stream header
uint8_t stream_header[LZMA_STREAM_HEADER_SIZE];
bytes = fread(stream_header, 1, LZMA_STREAM_HEADER_SIZE, gInFile);
if (bytes != LZMA_STREAM_HEADER_SIZE)
@ -307,13 +357,11 @@ static void read_thread_noindex(void) {
lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_block block = { .filters = filters, .check = gCheck, .version = 0 };
while (true) {
// Check for index
if (rbuf_read(1) != RBUF_FULL)
die("Error reading block header size");
if (gRbuf->input[0] == 0)
break; // Found the index. FIXME: multi-stream?
// Decode header
block.header_size = lzma_block_header_size_decode(gRbuf->input[0]);
if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX)
die("Block header size too large");
@ -323,14 +371,16 @@ static void read_thread_noindex(void) {
die("Error decoding block header");
size_t comp = block.compressed_size, outsize = block.uncompressed_size;
if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN)
die("No sizes in header!!!"); // FIXME: streaming; file index
block_capacity(gRbuf, 0, outsize);
gRbuf->outsize = outsize;
if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN) {
read_streaming(&block);
} else {
block_capacity(gRbuf, 0, outsize);
gRbuf->outsize = outsize;
if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL)
die("Error reading block contents");
rbuf_dispatch();
if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL)
die("Error reading block contents");
rbuf_dispatch();
}
}
pipeline_stop();

Loading…
Cancel
Save