|
@@ -84,7 +84,10 @@ impl<'a> Stream for Cursor<'a> {
|
|
|
let db = this.db;
|
|
|
|
|
|
match check_future_call(&mut this.future_event, &this.filter, cx) {
|
|
|
- FutureValue::Found(event) => return Poll::Ready(Some(event)),
|
|
|
+ FutureValue::Found(event) => {
|
|
|
+ this.returned += 1;
|
|
|
+ return Poll::Ready(Some(event));
|
|
|
+ }
|
|
|
FutureValue::Pending => return Poll::Pending,
|
|
|
FutureValue::FoundNotMatch | FutureValue::Ended => {}
|
|
|
}
|
|
@@ -107,7 +110,10 @@ impl<'a> Stream for Cursor<'a> {
|
|
|
return if let Some(prefix) = this.prefixes.pop_front() {
|
|
|
this.future_event = Some(db.get_event(prefix));
|
|
|
match check_future_call(&mut this.future_event, &this.filter, cx) {
|
|
|
- FutureValue::Found(event) => Poll::Ready(Some(event)),
|
|
|
+ FutureValue::Found(event) => {
|
|
|
+ this.returned += 1;
|
|
|
+ Poll::Ready(Some(event))
|
|
|
+ }
|
|
|
FutureValue::Pending => Poll::Pending,
|
|
|
FutureValue::FoundNotMatch | FutureValue::Ended => continue,
|
|
|
}
|
|
@@ -126,12 +132,18 @@ impl<'a> Stream for Cursor<'a> {
|
|
|
|
|
|
this.future_event = Some(db.get_event(value));
|
|
|
match check_future_call(&mut this.future_event, &this.filter, cx) {
|
|
|
- FutureValue::Found(event) => Poll::Ready(Some(event)),
|
|
|
+ FutureValue::Found(event) => {
|
|
|
+ this.returned += 1;
|
|
|
+ Poll::Ready(Some(event))
|
|
|
+ }
|
|
|
FutureValue::Pending => Poll::Pending,
|
|
|
FutureValue::FoundNotMatch | FutureValue::Ended => continue,
|
|
|
}
|
|
|
}
|
|
|
- Some(Err(err)) => Poll::Ready(Some(Err(Error::Internal(err.to_string())))),
|
|
|
+ Some(Err(err)) => {
|
|
|
+ this.returned += 1;
|
|
|
+ Poll::Ready(Some(Err(Error::Internal(err.to_string()))))
|
|
|
+ }
|
|
|
None => {
|
|
|
let _ = this.select_next_prefix_using_secondary_index();
|
|
|
continue;
|