always accumulate as much as possible
This commit is contained in:
parent
b26a4e7646
commit
853d25c51a
1 changed files with 14 additions and 12 deletions
|
@ -45,10 +45,10 @@ def ip_reader_worker(fp, queue):
|
||||||
|
|
||||||
class IpReader:
|
class IpReader:
|
||||||
def __init__(self, *paths_and_handles):
|
def __init__(self, *paths_and_handles):
|
||||||
from queue import Queue
|
from queue import SimpleQueue
|
||||||
|
|
||||||
self.fps = paths_and_handles
|
self.fps = paths_and_handles
|
||||||
self.queue = Queue()
|
self.queue = SimpleQueue()
|
||||||
self.threads = []
|
self.threads = []
|
||||||
self.total = 0
|
self.total = 0
|
||||||
|
|
||||||
|
@ -56,23 +56,25 @@ class IpReader:
|
||||||
return any(thread.is_alive() for thread in self.threads)
|
return any(thread.is_alive() for thread in self.threads)
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
from queue import Empty
|
from queue import SimpleQueue, Empty
|
||||||
from sys import stderr
|
from sys import stderr
|
||||||
|
|
||||||
def _next():
|
def _next():
|
||||||
|
results = SimpleQueue()
|
||||||
while self.is_running() or not self.queue.empty():
|
while self.is_running() or not self.queue.empty():
|
||||||
results = []
|
if self.queue.empty() and results.empty():
|
||||||
if self.queue.empty():
|
|
||||||
try:
|
try:
|
||||||
results.append(self.queue.get(timeout=1.0))
|
results.put(self.queue.get(timeout=1.0))
|
||||||
|
self.total += 1
|
||||||
except Empty:
|
except Empty:
|
||||||
print("blocking on IpReader", file=stderr)
|
print("blocking on IpReader", file=stderr)
|
||||||
else:
|
|
||||||
while not self.queue.empty():
|
while not self.queue.empty():
|
||||||
results.append(self.queue.get())
|
results.put(self.queue.get())
|
||||||
self.total += len(results)
|
self.total += 1
|
||||||
for res in results:
|
if not results.empty():
|
||||||
yield res
|
yield results.get()
|
||||||
|
while not results.empty():
|
||||||
|
yield results.get()
|
||||||
|
|
||||||
return _next()
|
return _next()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue