combine/stream/buffered.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
use std::collections::VecDeque;
use error::StreamError;
use stream::{Positioned, Resetable, StreamErrorFor, StreamOnce};
/// A `Stream` adaptor which remembers the most recently read items of a `StreamOnce` in a ring
/// buffer.
///
/// A `StreamOnce` that is unable to implement `Resetable` on its own (`ReadStream`, for example)
/// can be wrapped in a `BufferedStream` to gain `Resetable` and thereby become a full `Stream`.
///
/// The price is a bounded replay window: the buffer only retains a limited number of items, so
/// only that many tokens can be reset over and replayed. Resetting a `BufferedStream` further
/// back than that is not reported by the reset itself; the next call to `uncons` returns an
/// error instead.
///
/// NOTE: When this stream is combined with an error enhancing stream such as `easy::Stream`
/// (which is also what `easy_parse` sets up) the `BufferedStream` should wrap the `easy::Stream`
/// instance, not the other way around.
///
/// ```ignore
/// // DO
/// BufferedStream::new(easy::Stream(..), ..)
/// // DON'T
/// easy::Stream(BufferedStream::new(.., ..))
/// parser.easy_parse(BufferedStream::new(..));
/// ```
#[derive(Debug, PartialEq)]
pub struct BufferedStream<I: StreamOnce + Positioned> {
    /// Logical index of the next item `uncons` hands out. Checkpoints are snapshots of this
    /// value and `reset` simply writes one back.
    offset: usize,
    /// The wrapped stream that new items are read from.
    iter: I,
    /// Number of items read from `iter` through this wrapper so far. While `offset` is below
    /// this value, items are replayed from `buffer` instead of being read from `iter`.
    buffer_offset: usize,
    /// The most recently read items, oldest first, each paired with the position `iter` reported
    /// right before the item was read.
    buffer: VecDeque<(I::Item, I::Position)>,
}
impl<I> Resetable for BufferedStream<I>
where
    I: Positioned,
{
    /// A checkpoint is nothing more than a saved logical offset into the stream.
    type Checkpoint = usize;

    /// Returns the current logical offset so that it can later be handed back to `reset`.
    fn checkpoint(&self) -> Self::Checkpoint {
        self.offset
    }

    /// Moves the logical offset to `checkpoint`.
    ///
    /// Nothing is validated here and neither the buffer nor the wrapped stream is touched; only
    /// the stored offset changes.
    fn reset(&mut self, checkpoint: Self::Checkpoint) {
        self.offset = checkpoint;
    }
}
impl<I> BufferedStream<I>
where
    I: StreamOnce + Positioned,
    I::Position: Clone,
    I::Item: Clone,
{
    /// Wraps the `StreamOnce` instance `iter` in a new `BufferedStream`.
    ///
    /// `lookahead` is the number of elements the internal buffer is allocated to hold. The
    /// stream starts at offset zero with nothing buffered.
    pub fn new(iter: I, lookahead: usize) -> BufferedStream<I> {
        let buffer = VecDeque::with_capacity(lookahead);
        BufferedStream {
            iter,
            buffer,
            offset: 0,
            buffer_offset: 0,
        }
    }
}
impl<I> Positioned for BufferedStream<I>
where
    I: StreamOnce + Positioned,
{
    /// Returns the position that belongs to the current logical offset.
    ///
    /// * When nothing is left to replay, this is whatever the wrapped stream reports.
    /// * When the offset points into the buffer, this is the position stored with the item that
    ///   `uncons` would replay next.
    /// * When the offset lies before the oldest buffered item, the position stored with that
    ///   oldest item is returned.
    #[inline(always)]
    fn position(&self) -> Self::Position {
        // Caught up with the wrapped stream: it alone knows where we are.
        if self.offset >= self.buffer_offset {
            return self.iter.position();
        }

        let buffered = self.buffer.len();
        // Logical offset of the oldest item that is still buffered.
        let oldest = self.buffer_offset - buffered;
        let entry = if self.offset < oldest {
            self.buffer
                .front()
                .expect("At least 1 element in the buffer")
        } else {
            // Count back from the end of the buffer by the distance to the newest item.
            &self.buffer[buffered - (self.buffer_offset - self.offset)]
        };
        entry.1.clone()
    }
}
impl<I> StreamOnce for BufferedStream<I>
where
    I: StreamOnce + Positioned,
    I::Item: Clone + PartialEq,
{
    type Item = I::Item;
    type Range = I::Range;
    type Position = I::Position;
    type Error = I::Error;

    /// Returns the next item, either freshly read from the wrapped stream or replayed from the
    /// buffer after a reset.
    ///
    /// # Errors
    ///
    /// * Any error returned by the wrapped stream's `uncons` is passed on; in that case neither
    ///   the offsets nor the buffer are modified.
    /// * If the stream has been reset to an offset older than the oldest buffered item, a
    ///   static-message error is returned and the offset is left where it is.
    #[inline]
    fn uncons(&mut self) -> Result<I::Item, StreamErrorFor<Self>> {
        if self.offset >= self.buffer_offset {
            // Nothing left to replay. Capture the position *before* reading so that it
            // describes the item which is about to be stored.
            let position = self.iter.position();
            let item = self.iter.uncons()?;
            self.buffer_offset += 1;
            // We want the VecDeque to only keep the last `.capacity()` elements, so the oldest
            // element is dropped before the buffer would have to grow.
            if self.buffer.len() == self.buffer.capacity() {
                self.buffer.pop_front();
            }
            // `position` is owned and not needed afterwards, so it is moved into the buffer.
            self.buffer.push_back((item.clone(), position));
            self.offset += 1;
            Ok(item)
        } else if self.offset < self.buffer_offset - self.buffer.len() {
            // We have backtracked too far: the requested item has already been evicted.
            Err(StreamError::message_static_message(
                "Backtracked to far".into(),
            ))
        } else {
            // Replay: count back from the end of the buffer by the distance between the newest
            // item and the current offset.
            let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
                .0
                .clone();
            self.offset += 1;
            Ok(value)
        }
    }

    /// Forwards to the wrapped stream's `is_partial`.
    fn is_partial(&self) -> bool {
        self.iter.is_partial()
    }
}