Skip to content

StorageSQLite

email_profile.storage.sqlite.StorageSQLite

Bases: StorageABC

SQLite persistence backed by a single raw table.

Source code in email_profile/storage/sqlite.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class StorageSQLite(StorageABC):
    """Keep raw messages in SQLite, using one ``raw`` table.

    ``url`` is the database URL derived from the constructor argument by
    ``_coerce_url``.
    """

    def __init__(self, url: Union[str, Path] = "./email.db") -> None:
        """Build the engine/session factory for *url* and run ``create_all``."""
        self.url = self._coerce_url(url)
        engine, session_factory = make_session(self.url)
        self._engine = engine
        self._session_factory = session_factory
        Base.metadata.create_all(bind=engine)

    @staticmethod
    def _coerce_url(value: Union[str, Path]) -> str:
        """Turn a path or URL into a database URL string.

        A string that already contains ``://`` is returned as is; a ``Path``
        or any other string gets the ``sqlite:///`` prefix.
        """
        already_url = not isinstance(value, Path) and "://" in value
        return value if already_url else f"sqlite:///{value}"

    def ids(self, mailbox: Optional[str] = None) -> set[str]:
        """Collect the stored message IDs, limited to *mailbox* when it is truthy."""
        with self._session_factory() as session:
            message_ids = session.query(RawModel.message_id)
            if mailbox:
                message_ids = message_ids.filter(RawModel.mailbox == mailbox)
            return {message_id for (message_id,) in message_ids.all()}

    def uids(self, mailbox: str) -> set[str]:
        """Collect the IMAP UIDs stored for *mailbox*."""
        with self._session_factory() as session:
            rows = (
                session.query(RawModel.uid)
                .filter(RawModel.mailbox == mailbox)
                .all()
            )
            return {uid for (uid,) in rows}

    def save(self, raw: RawSerializer) -> bool:
        """Upsert *raw*, keyed on ``(uid, mailbox)``.

        Returns True when no row with that key existed before the write,
        False when an existing row was updated.
        """
        with self._session_factory() as session:
            # Look the key up before writing so the caller can be told
            # whether this was an insert or an update.
            previous = (
                session.query(RawModel.uid)
                .filter(
                    RawModel.uid == raw.uid,
                    RawModel.mailbox == raw.mailbox,
                )
                .first()
            )
            inserted = previous is None

            insert_stmt = sqlite_insert(RawModel).values(
                message_id=raw.message_id,
                uid=raw.uid,
                mailbox=raw.mailbox,
                flags=raw.flags,
                file=raw.file,
            )
            # On a key clash, overwrite the non-key columns with the new values.
            replacements = {
                "message_id": insert_stmt.excluded.message_id,
                "flags": insert_stmt.excluded.flags,
                "file": insert_stmt.excluded.file,
            }
            upsert_stmt = insert_stmt.on_conflict_do_update(
                index_elements=["uid", "mailbox"],
                set_=replacements,
            )

            session.execute(upsert_stmt)
            session.commit()
            return inserted

    def get(self, message_id: str) -> Optional[RawSerializer]:
        """Fetch the first stored row matching *message_id*, or None if absent."""
        with self._session_factory() as session:
            matches = session.query(RawModel).filter(
                RawModel.message_id == message_id
            )
            record = matches.first()

            if record is not None:
                return RawSerializer(
                    message_id=record.message_id,
                    uid=record.uid,
                    mailbox=record.mailbox,
                    file=record.file,
                    flags=record.flags,
                )

            return None

    def __repr__(self) -> str:
        """Show the class name together with the database URL."""
        location = self.url
        return f"StorageSQLite(url={location!r})"

url = self._coerce_url(url) instance-attribute

__init__(url='./email.db')

Source code in email_profile/storage/sqlite.py
22
23
24
25
def __init__(self, url: Union[str, Path] = "./email.db") -> None:
    """Build the engine/session factory for *url* and run ``create_all``."""
    self.url = self._coerce_url(url)
    engine, session_factory = make_session(self.url)
    self._engine = engine
    self._session_factory = session_factory
    Base.metadata.create_all(bind=engine)

__repr__()

Source code in email_profile/storage/sqlite.py
107
108
def __repr__(self) -> str:
    """Show the class name together with the database URL."""
    location = self.url
    return f"StorageSQLite(url={location!r})"

get(message_id)

Retrieve the RFC822 source by email id.

Source code in email_profile/storage/sqlite.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get(self, message_id: str) -> Optional[RawSerializer]:
    """Fetch the first stored row matching *message_id*, or None if absent."""
    with self._session_factory() as session:
        matches = session.query(RawModel).filter(
            RawModel.message_id == message_id
        )
        record = matches.first()

        if record is not None:
            return RawSerializer(
                message_id=record.message_id,
                uid=record.uid,
                mailbox=record.mailbox,
                file=record.file,
                flags=record.flags,
            )

        return None

ids(mailbox=None)

Return stored email IDs.

Source code in email_profile/storage/sqlite.py
37
38
39
40
41
42
43
def ids(self, mailbox: Optional[str] = None) -> set[str]:
    """Collect the stored message IDs, limited to *mailbox* when it is truthy."""
    with self._session_factory() as session:
        message_ids = session.query(RawModel.message_id)
        if mailbox:
            message_ids = message_ids.filter(RawModel.mailbox == mailbox)
        return {message_id for (message_id,) in message_ids.all()}

save(raw)

Persist the RFC822 source. Returns True if inserted, False if updated.

Source code in email_profile/storage/sqlite.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def save(self, raw: RawSerializer) -> bool:
    """Upsert *raw*, keyed on ``(uid, mailbox)``.

    Returns True when no row with that key existed before the write,
    False when an existing row was updated.
    """
    with self._session_factory() as session:
        # Look the key up before writing so the caller can be told
        # whether this was an insert or an update.
        previous = (
            session.query(RawModel.uid)
            .filter(
                RawModel.uid == raw.uid,
                RawModel.mailbox == raw.mailbox,
            )
            .first()
        )
        inserted = previous is None

        insert_stmt = sqlite_insert(RawModel).values(
            message_id=raw.message_id,
            uid=raw.uid,
            mailbox=raw.mailbox,
            flags=raw.flags,
            file=raw.file,
        )
        # On a key clash, overwrite the non-key columns with the new values.
        replacements = {
            "message_id": insert_stmt.excluded.message_id,
            "flags": insert_stmt.excluded.flags,
            "file": insert_stmt.excluded.file,
        }
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["uid", "mailbox"],
            set_=replacements,
        )

        session.execute(upsert_stmt)
        session.commit()
        return inserted

uids(mailbox)

Return stored IMAP UIDs for a mailbox.

Source code in email_profile/storage/sqlite.py
45
46
47
48
49
50
51
52
53
def uids(self, mailbox: str) -> set[str]:
    """Collect the IMAP UIDs stored for *mailbox*."""
    with self._session_factory() as session:
        rows = (
            session.query(RawModel.uid)
            .filter(RawModel.mailbox == mailbox)
            .all()
        )
        return {uid for (uid,) in rows}