class Fetch:
"""Execute UID FETCH in chunks and yield parsed entries."""
CHUNK_SIZE = 100
def __init__(
self,
*,
client: imaplib.IMAP4_SSL,
mailbox: MailBox,
spec: F,
chunk_size: int = CHUNK_SIZE,
) -> None:
self._client = client
self._mailbox = mailbox
self._spec = spec.mount()
self._chunk_size = chunk_size
def _select(self) -> None:
from email_profile.clients.imap.mailbox import _quote
Status.state(self._client.select(_quote(self._mailbox.name)))
def _fetch_chunk(self, group: list[str]) -> list:
return Status.state(
self._client.uid("fetch", ",".join(group), self._spec)
)
def chunks(self, uids: list[str]) -> Iterator[list]:
"""Yield raw IMAP response lists per chunk."""
self._select()
fetch = with_retry()(self._fetch_chunk)
for start in range(0, len(uids), self._chunk_size):
group = uids[start : start + self._chunk_size]
yield fetch(group)
def parsed(self, uids: list[str]) -> Iterator[FetchParser]:
"""Yield parsed entries with resolved flags."""
for fetched in self.chunks(uids):
yield from FetchParser.iter_entries(fetched)
@staticmethod
def fetch_message_ids(
client: object, uids: list[str], chunk_size: int = 500
) -> dict[str, str]:
"""Fetch Message-IDs from server. Returns {uid: message_id}."""
result: dict[str, str] = {}
for start in range(0, len(uids), chunk_size):
group = uids[start : start + chunk_size]
status, fetched = client.uid(
"fetch",
",".join(group),
"(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])",
)
if status != "OK":
continue
for entry in fetched:
if not FetchParser.is_valid(entry):
continue
d = FetchParser(entry)
uid = d._parse_uid()
msg_id = d._parse_message_id()
if uid and msg_id:
result[uid] = msg_id
return result