Goal
I wanted a very small background worker in Python with this flow:
- Main app starts.
- Main app submits a message (str).
- A background daemon thread consumes from queue.Queue.
- Main app stops the worker cleanly.
Then I wanted to test failure conditions around submit().
Minimal Worker
from __future__ import annotations

import queue
import threading
import time


class BackgroundWorker:
    def __init__(self, *, maxsize: int = 0) -> None:
        self._queue: queue.Queue[str | None] = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True, name="bg-worker")
        self._started = False
        self._lock = threading.Lock()

    def start(self) -> None:
        with self._lock:
            if self._started:
                print("[main] start() called again; worker is already running")
                return
            self._thread.start()
            self._started = True

    def submit(self, message: str) -> None:
        self._queue.put(message)

    def stop(self) -> None:
        self._queue.put(None)  # Sentinel: tells the worker loop to exit
        self._thread.join()

    def _run(self) -> None:
        while True:
            message = self._queue.get()
            if message is None:
                print("[worker] stop signal received")
                break
            print(f"[worker] processing message: {message}")
            time.sleep(0.1)
        print("[worker] done")
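For reference, this is the full flow from the Goal section exercised end to end. The class is repeated verbatim so the snippet runs standalone:

```python
from __future__ import annotations

import queue
import threading
import time


class BackgroundWorker:
    # Same class as above, unchanged
    def __init__(self, *, maxsize: int = 0) -> None:
        self._queue: queue.Queue[str | None] = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True, name="bg-worker")
        self._started = False
        self._lock = threading.Lock()

    def start(self) -> None:
        with self._lock:
            if self._started:
                print("[main] start() called again; worker is already running")
                return
            self._thread.start()
            self._started = True

    def submit(self, message: str) -> None:
        self._queue.put(message)

    def stop(self) -> None:
        self._queue.put(None)  # Sentinel: tells the worker loop to exit
        self._thread.join()

    def _run(self) -> None:
        while True:
            message = self._queue.get()
            if message is None:
                print("[worker] stop signal received")
                break
            print(f"[worker] processing message: {message}")
            time.sleep(0.1)
        print("[worker] done")


worker = BackgroundWorker()
worker.start()
worker.submit("hello")
worker.submit("world")
worker.stop()  # blocks until the sentinel is processed
```

Because stop() joins the thread, the program only exits after both messages have been processed.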
Why start() should be idempotent
Calling threading.Thread.start() twice on the same thread object raises:
RuntimeError: threads can only be started once
If multiple callers may invoke start(), protect it with a lock and a _started flag.
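A quick way to see the error, independent of the worker class:

```python
import threading

t = threading.Thread(target=lambda: None)
t.start()
t.join()

try:
    t.start()  # second start() on the same Thread object
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)  # threads can only be started once
```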
Can submit() hang forever?
Short answer: yes.
- With the default Queue(maxsize=0) (unbounded), put() usually does not block.
- With a bounded queue (maxsize > 0), put(block=True) blocks when the queue is full.
- If the consumer is dead, hung, or never started, the queue may never drain.
- Then submit() can block forever.
Reproduction with the worker class
worker = BackgroundWorker(maxsize=1)
# consumer is NOT started -> queue is never consumed
worker.submit("first") # fills queue
worker.submit("second") # blocks forever
In my local test, I put the second submit in another thread and observed it still blocked after 0.5s.
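That observation can be reproduced with a bare queue.Queue, no worker class needed; the 0.5s timeout mirrors the local test above:

```python
import queue
import threading

q: queue.Queue[str] = queue.Queue(maxsize=1)
q.put("first")  # fills the single slot

# put() on a full queue blocks; run it in a helper thread so we can observe it
blocker = threading.Thread(target=q.put, args=("second",), daemon=True)
blocker.start()
blocker.join(timeout=0.5)

print(blocker.is_alive())  # True: put("second") is still blocked after 0.5s
```

With no consumer, nothing ever calls q.get(), so the second put() never returns.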
Practical ways to avoid infinite blocking
1. put_nowait()
self._queue.put_nowait(message)
- Never blocks.
- Raises queue.Full immediately.
- Good when the caller can retry, drop, or degrade fast.
2. put(..., timeout=...)
self._queue.put(message, timeout=0.5)
- Bounded wait.
- Raises queue.Full after the timeout.
- A good default for backpressure without hanging forever.
3. Buffer / spool when loss is unacceptable
If message loss is not allowed, persist overflow somewhere else and replay later:
- in-memory ring buffer
- local file
- database / external queue
This adds complexity (ordering, dedupe, retry), but avoids silent loss.
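As an illustration of option 3, a minimal in-memory spool might look like this. SpoolingSubmitter is a hypothetical helper, not part of the worker above; note that a deque with maxlen silently drops the oldest entry when the spool itself overflows:

```python
import collections
import queue


class SpoolingSubmitter:
    """Sketch: overflow to a bounded in-memory ring buffer (hypothetical helper)."""

    def __init__(self, q: "queue.Queue[str]", spool_size: int = 1000) -> None:
        self._queue = q
        # deque with maxlen drops the *oldest* entry once spool_size is exceeded
        self._spool: collections.deque[str] = collections.deque(maxlen=spool_size)

    def submit(self, message: str) -> None:
        self._drain_spool()
        try:
            self._queue.put_nowait(message)
        except queue.Full:
            self._spool.append(message)  # spool instead of blocking

    def _drain_spool(self) -> None:
        # Replay spooled messages in order before accepting new ones
        while self._spool:
            try:
                self._queue.put_nowait(self._spool[0])
            except queue.Full:
                return
            self._spool.popleft()
```

Draining before each submit preserves ordering: spooled messages are replayed into the queue before newer ones are accepted.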
Recommended baseline
For most applications, start here:
- Keep start() idempotent under a lock.
- Use a bounded queue (maxsize) to prevent unbounded memory growth.
- Use put(timeout=...) in submit().
- On timeout, make the error, metric, or retry decision explicit.
Example submit():
import queue

def submit(self, message: str, timeout: float = 0.5) -> bool:
    try:
        self._queue.put(message, timeout=timeout)
        return True
    except queue.Full:
        # Log a metric, fall back to a buffer, or apply the drop policy here
        return False
Takeaway
A background daemon worker with Queue is easy to build, but reliability depends on queue policy.
The subtle bug is not thread startup. The subtle bug is producer blocking behavior when consumers stop.
Define your backpressure strategy explicitly from the beginning.