Skip to content

FetchParser

email_profile.clients.imap.parser.FetchParser

Decodes one IMAP fetch response entry.

Source code in email_profile/clients/imap/parser.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class FetchParser:
    """Decodes one IMAP fetch response entry.

    An entry is a tuple whose first item is kept as the header part and
    whose second item is kept as the body part. ``uid``, ``flags`` and
    ``message_id`` start out empty; :meth:`iter_entries` fills them in.
    """

    __slots__ = ("_header", "_body", "uid", "flags", "message_id")

    def __init__(self, entry: tuple) -> None:
        """Store the header and body parts of ``entry``.

        Items missing from a short tuple default to ``b""`` instead of
        raising ``IndexError``.
        """
        self._header = entry[0] if len(entry) > 0 else b""
        self._body = entry[1] if len(entry) > 1 else b""
        self.uid: str = ""
        self.flags: str = ""
        self.message_id: str = ""

    @staticmethod
    def is_valid(entry: object) -> bool:
        """Return True when ``entry`` is a tuple with at least two items."""
        return isinstance(entry, tuple) and len(entry) >= 2

    def _parse_uid(self) -> Optional[str]:
        """Return group 1 of ``_UID_RE`` found in the header, else None."""
        text = self._decode(self._header)
        match = _UID_RE.search(text)

        return match.group(1) if match else None

    def _parse_header_flags(self) -> Optional[str]:
        """Return group 2 of ``_FLAGS_RE`` found in the header, else None."""
        text = self._decode(self._header)
        match = _FLAGS_RE.search(text)
        return match.group(2) if match else None

    def _parse_header_flags_with_uid(self) -> Optional[tuple[str, str]]:
        """Return groups 1 and 2 of ``_FLAGS_RE`` found in the header.

        Returns None when the pattern does not match.
        NOTE(review): group 1 is presumably the UID; the pattern is
        defined outside this class, so confirm there.
        """
        text = self._decode(self._header)
        match = _FLAGS_RE.search(text)
        if match:
            return match.group(1), match.group(2)
        return None

    def _parse_message_id(self) -> Optional[str]:
        """Return the value of the first ``message-id:`` line in the body.

        The decoded body is scanned line by line (case-insensitive prefix
        match), so the scan is not limited to the header section. Returns
        None when no such line exists or its value is empty.
        """
        text = self._decode(self._body)

        for line in text.splitlines():
            if line.lower().startswith("message-id:"):
                value = line.split(":", 1)[1].strip()
                return value or None

        return None

    def _resolve_message_id(self) -> str:
        """Return the Message-ID header, or a SHA-256 hex digest of the body.

        The header value is coerced to ``str`` and stripped before use.
        Assuming ``message_from_bytes`` is the standard-library parser with
        its default policy, a folded or padded header keeps its
        surrounding whitespace and a header containing non-ASCII bytes may
        be returned as a ``Header`` object rather than ``str``. A header
        that is blank after stripping is treated as missing.
        """
        msg = message_from_bytes(self._body)
        mid = msg.get("Message-ID")

        if mid is not None:
            normalized = str(mid).strip()
            if normalized:
                return normalized

        # No usable header: derive a stable identifier from the content.
        return sha256(self._body).hexdigest()

    def raw(self) -> bytes:
        """Return the body part exactly as stored."""
        return self._body

    def text(self) -> str:
        """Return the body part decoded to text."""
        return self._decode(self._body)

    def date(self) -> Optional[datetime]:
        """Return the body's ``Date`` header as a datetime.

        Returns None when the header is absent, empty, or cannot be
        parsed.
        """
        msg = message_from_bytes(self._body)
        raw_date = msg.get("Date")

        if not raw_date:
            return None

        try:
            return parsedate_to_datetime(raw_date)
        except (TypeError, ValueError):
            return None

    def _resolve_flags(self, trailing: object = None) -> str:
        """Return flags from the header, falling back to a trailing element.

        ``trailing`` is only consulted when the header yields no flags and
        the element is ``bytes``. Returns ``""`` when no flags are found.
        """
        flags = self._parse_header_flags() or ""

        if not flags and isinstance(trailing, bytes):
            parsed = self.parse_flags(self._decode(trailing))
            if parsed:
                flags = parsed[1]

        return flags

    @staticmethod
    def _decode(data: object, errors: str = "replace") -> str:
        """Return ``data`` as text.

        ``bytes`` are decoded as UTF-8 using ``errors``; anything else is
        passed through ``str()``.
        """
        if isinstance(data, bytes):
            return data.decode("utf-8", errors=errors)
        return str(data)

    @staticmethod
    def parse_flags(text: str) -> Optional[tuple[str, str]]:
        """Parse FLAGS from already-decoded text.

        Tries ``_FLAGS_RE`` first and returns its groups 1 and 2. Otherwise
        tries ``_FLAGS_ONLY_RE`` and returns ``("", group 1)``. Returns
        None when neither pattern matches. Bytes must be decoded by the
        caller before being passed in.
        """
        match = _FLAGS_RE.search(text)
        if match:
            return match.group(1), match.group(2)

        match = _FLAGS_ONLY_RE.search(text)
        if match:
            return "", match.group(1)

        return None

    @staticmethod
    def iter_entries(fetched: list) -> Iterator[FetchParser]:
        """Yield parsed entries with resolved flags.

        Elements that fail :meth:`is_valid`, or whose header has no UID,
        are skipped. For each yielded entry the element that follows it is
        offered to :meth:`_resolve_flags`; when that element is ``bytes``
        it is consumed along with the entry.
        """
        offset = 0

        while offset < len(fetched):
            entry = fetched[offset]
            offset += 1

            if not FetchParser.is_valid(entry):
                continue

            parsed_entry = FetchParser(entry)
            uid = parsed_entry._parse_uid()
            if uid is None:
                continue

            trailing = fetched[offset] if offset < len(fetched) else None
            parsed_entry.uid = uid
            parsed_entry.flags = parsed_entry._resolve_flags(trailing)
            parsed_entry.message_id = parsed_entry._resolve_message_id()

            # A bytes element right after the tuple belongs to this entry,
            # so step over it (None is never an instance of bytes).
            if isinstance(trailing, bytes):
                offset += 1

            yield parsed_entry

__slots__ = ('_header', '_body', 'uid', 'flags', 'message_id') class-attribute instance-attribute

flags = '' instance-attribute

message_id = '' instance-attribute

uid = '' instance-attribute

__init__(entry)

Source code in email_profile/clients/imap/parser.py
23
24
25
26
27
28
def __init__(self, entry: tuple) -> None:
    """Keep the header and body parts of ``entry``; reset derived fields.

    An item missing from a short tuple is replaced by ``b""``.
    """
    size = len(entry)
    # Derived fields stay empty until a caller fills them in.
    self.uid: str = ""
    self.flags: str = ""
    self.message_id: str = ""
    self._header = b"" if size < 1 else entry[0]
    self._body = b"" if size < 2 else entry[1]

date()

Source code in email_profile/clients/imap/parser.py
77
78
79
80
81
82
83
84
85
86
87
def date(self) -> Optional[datetime]:
    """Return the body's ``Date`` header as a datetime.

    Returns None when the header is absent, empty, or cannot be parsed.
    """
    header_value = message_from_bytes(self._body).get("Date")
    if not header_value:
        return None

    try:
        parsed = parsedate_to_datetime(header_value)
    except (TypeError, ValueError):
        # An unparseable date is reported the same way as a missing one.
        parsed = None
    return parsed

is_valid(entry) staticmethod

Source code in email_profile/clients/imap/parser.py
30
31
32
@staticmethod
def is_valid(entry: object) -> bool:
    """Return True when ``entry`` is a tuple holding at least two items."""
    if not isinstance(entry, tuple):
        return False
    return len(entry) >= 2

iter_entries(fetched) staticmethod

Yield parsed entries with resolved flags.

Source code in email_profile/clients/imap/parser.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@staticmethod
def iter_entries(fetched: list) -> Iterator[FetchParser]:
    """Yield a parser for every usable element of ``fetched``.

    Elements that fail ``FetchParser.is_valid`` or whose header has no
    UID are skipped. Each yielded parser has ``uid``, ``flags`` and
    ``message_id`` populated; the element after the entry is offered as a
    flags fallback and is consumed when it is ``bytes``.
    """
    cursor = 0

    while cursor < len(fetched):
        candidate = fetched[cursor]
        cursor += 1

        if not FetchParser.is_valid(candidate):
            continue

        parser = FetchParser(candidate)
        found_uid = parser._parse_uid()
        if found_uid is None:
            continue

        # Peek at the next element without consuming it yet.
        follower = None
        if cursor < len(fetched):
            follower = fetched[cursor]

        parser.uid = found_uid
        parser.flags = parser._resolve_flags(follower)
        parser.message_id = parser._resolve_message_id()

        # A bytes follower belongs to this entry, so step over it.
        if isinstance(follower, bytes):
            cursor += 1

        yield parser

parse_flags(text) staticmethod

Parse FLAGS from the decoded text of a fetch response entry.

Source code in email_profile/clients/imap/parser.py
106
107
108
109
110
111
112
113
114
115
116
117
@staticmethod
def parse_flags(text: str) -> Optional[tuple[str, str]]:
    """Extract FLAGS from the text of a fetch response entry.

    Returns groups 1 and 2 of ``_FLAGS_RE`` when it matches. Otherwise
    returns ``("", group 1)`` of ``_FLAGS_ONLY_RE`` when that matches.
    Returns None when neither pattern is found.
    """
    paired = _FLAGS_RE.search(text)
    if paired is not None:
        return paired.group(1), paired.group(2)

    flags_only = _FLAGS_ONLY_RE.search(text)
    if flags_only is None:
        return None

    return "", flags_only.group(1)

raw()

Source code in email_profile/clients/imap/parser.py
71
72
def raw(self) -> bytes:
    """Return the body part of the entry exactly as stored."""
    body = self._body
    return body

text()

Source code in email_profile/clients/imap/parser.py
74
75
def text(self) -> str:
    """Return the body part of the entry decoded to text."""
    payload = self._body
    return self._decode(payload)