avformat/async: cache some data for fast seek backward

Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
This commit is contained in:
Zhang Rui 2015-10-13 18:30:47 +08:00 committed by Michael Niedermayer
parent 87ff61b9ab
commit 6c7f289fab

View File

@ -24,7 +24,6 @@
/**
* @TODO
* support timeout
* support backward short seek
* support work with concatdec, hls
*/
@ -43,8 +42,17 @@
#endif
#define BUFFER_CAPACITY (4 * 1024 * 1024)
#define READ_BACK_CAPACITY (4 * 1024 * 1024)
#define SHORT_SEEK_THRESHOLD (256 * 1024)
typedef struct RingBuffer
{
AVFifoBuffer *fifo;
int read_back_capacity;
int read_pos;
} RingBuffer;
typedef struct Context {
AVClass *class;
URLContext *inner;
@ -61,7 +69,7 @@ typedef struct Context {
int64_t logical_pos;
int64_t logical_size;
AVFifoBuffer *fifo;
RingBuffer ring;
pthread_cond_t cond_wakeup_main;
pthread_cond_t cond_wakeup_background;
@ -72,6 +80,73 @@ typedef struct Context {
AVIOInterruptCB interrupt_callback;
} Context;
static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
{
memset(ring, 0, sizeof(RingBuffer));
ring->fifo = av_fifo_alloc(capacity + read_back_capacity);
if (!ring->fifo)
return AVERROR(ENOMEM);
ring->read_back_capacity = read_back_capacity;
return 0;
}
static void ring_destroy(RingBuffer *ring)
{
av_fifo_freep(&ring->fifo);
}
static void ring_reset(RingBuffer *ring)
{
av_fifo_reset(ring->fifo);
ring->read_pos = 0;
}
static int ring_size(RingBuffer *ring)
{
return av_fifo_size(ring->fifo) - ring->read_pos;
}
static int ring_space(RingBuffer *ring)
{
return av_fifo_space(ring->fifo);
}
static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void (*func)(void*, void*, int))
{
int ret;
av_assert2(buf_size <= ring_size(ring));
ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
ring->read_pos += buf_size;
if (ring->read_pos > ring->read_back_capacity) {
av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity);
ring->read_pos = ring->read_back_capacity;
}
return ret;
}
static int ring_generic_write(RingBuffer *ring, void *src, int size, int (*func)(void*, void*, int))
{
av_assert2(size <= ring_space(ring));
return av_fifo_generic_write(ring->fifo, src, size, func);
}
static int ring_size_of_read_back(RingBuffer *ring)
{
return ring->read_pos;
}
static int ring_drain(RingBuffer *ring, int offset)
{
av_assert2(offset >= -ring_size_of_read_back(ring));
av_assert2(offset <= -ring_size(ring));
ring->read_pos += offset;
return 0;
}
static int async_check_interrupt(void *arg)
{
URLContext *h = arg;
@ -102,7 +177,7 @@ static void *async_buffer_task(void *arg)
{
URLContext *h = arg;
Context *c = h->priv_data;
AVFifoBuffer *fifo = c->fifo;
RingBuffer *ring = &c->ring;
int ret = 0;
int64_t seek_ret;
@ -132,14 +207,14 @@ static void *async_buffer_task(void *arg)
c->seek_ret = seek_ret;
c->seek_request = 0;
av_fifo_reset(fifo);
ring_reset(ring);
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
continue;
}
fifo_space = av_fifo_space(fifo);
fifo_space = ring_space(ring);
if (c->io_eof_reached || fifo_space <= 0) {
pthread_cond_signal(&c->cond_wakeup_main);
pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
@ -149,7 +224,7 @@ static void *async_buffer_task(void *arg)
pthread_mutex_unlock(&c->mutex);
to_copy = FFMIN(4096, fifo_space);
ret = av_fifo_generic_write(fifo, (void *)h, to_copy, (void *)wrapped_url_read);
ret = ring_generic_write(ring, (void *)h, to_copy, (void *)wrapped_url_read);
pthread_mutex_lock(&c->mutex);
if (ret <= 0) {
@ -173,11 +248,9 @@ static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **
av_strstart(arg, "async:", &arg);
c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
if (!c->fifo) {
ret = AVERROR(ENOMEM);
ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
if (ret < 0)
goto fifo_fail;
}
/* wrap interrupt callback */
c->interrupt_callback = h->interrupt_callback;
@ -225,7 +298,7 @@ cond_wakeup_main_fail:
mutex_fail:
ffurl_close(c->inner);
url_fail:
av_fifo_freep(&c->fifo);
ring_destroy(&c->ring);
fifo_fail:
return ret;
}
@ -248,7 +321,7 @@ static int async_close(URLContext *h)
pthread_cond_destroy(&c->cond_wakeup_main);
pthread_mutex_destroy(&c->mutex);
ffurl_close(c->inner);
av_fifo_freep(&c->fifo);
ring_destroy(&c->ring);
return 0;
}
@ -257,7 +330,7 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com
void (*func)(void*, void*, int))
{
Context *c = h->priv_data;
AVFifoBuffer *fifo = c->fifo;
RingBuffer *ring = &c->ring;
int to_read = size;
int ret = 0;
@ -269,10 +342,10 @@ static int async_read_internal(URLContext *h, void *dest, int size, int read_com
ret = AVERROR_EXIT;
break;
}
fifo_size = av_fifo_size(fifo);
fifo_size = ring_size(ring);
to_copy = FFMIN(to_read, fifo_size);
if (to_copy > 0) {
av_fifo_generic_read(fifo, dest, to_copy, func);
ring_generic_read(ring, dest, to_copy, func);
if (!func)
dest = (uint8_t *)dest + to_copy;
c->logical_pos += to_copy;
@ -312,10 +385,11 @@ static void fifo_do_not_copy_func(void* dest, void* src, int size) {
static int64_t async_seek(URLContext *h, int64_t pos, int whence)
{
Context *c = h->priv_data;
AVFifoBuffer *fifo = c->fifo;
RingBuffer *ring = &c->ring;
int64_t ret;
int64_t new_logical_pos;
int fifo_size;
int fifo_size_of_read_back;
if (whence == AVSEEK_SIZE) {
av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
@ -332,17 +406,28 @@ static int64_t async_seek(URLContext *h, int64_t pos, int whence)
if (new_logical_pos < 0)
return AVERROR(EINVAL);
fifo_size = av_fifo_size(fifo);
fifo_size = ring_size(ring);
fifo_size_of_read_back = ring_size_of_read_back(ring);
if (new_logical_pos == c->logical_pos) {
/* current position */
return c->logical_pos;
} else if ((new_logical_pos > c->logical_pos) &&
} else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) &&
(new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
int pos_delta = (int)(new_logical_pos - c->logical_pos);
/* fast seek */
av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
new_logical_pos, (int)c->logical_pos,
(int)(new_logical_pos - c->logical_pos), fifo_size);
async_read_internal(h, NULL, (int)(new_logical_pos - c->logical_pos), 1, fifo_do_not_copy_func);
if (pos_delta > 0) {
// fast seek forwards
async_read_internal(h, NULL, pos_delta, 1, fifo_do_not_copy_func);
} else {
// fast seek backwards
ring_drain(ring, pos_delta);
c->logical_pos = new_logical_pos;
}
return c->logical_pos;
} else if (c->logical_size <= 0) {
/* can not seek */