Skip to content

Sender

email_profile.clients.smtp.sender.Sender

Outgoing-mail facade: send, reply, forward.

Source code in email_profile/clients/smtp/sender.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Sender:
    """Outgoing-mail facade: send, reply, forward."""

    def __init__(self, session: ImapClient, folders: FolderAccess) -> None:
        self._session = session
        self._folders = folders
        self._smtp_host: Optional[SMTPHost] = None

    def smtp_host(self) -> SMTPHost:
        """Resolved SMTP host for this account. Cached after first call."""
        if self._smtp_host is None:
            self._smtp_host = resolve_smtp_host(self._session.user)
        return self._smtp_host

    def send(
        self,
        to: Union[str, list[str]],
        subject: str,
        body: str = "",
        *,
        html: Optional[str] = None,
        cc: Optional[Union[str, list[str]]] = None,
        bcc: Optional[Union[str, list[str]]] = None,
        reply_to: Optional[str] = None,
        attachments: Optional[list[AttachmentLike]] = None,
        headers: Optional[dict[str, str]] = None,
        save_to_sent: bool = True,
    ) -> None:
        """Send an email via SMTP, optionally appending to Sent."""
        message = _build_message(
            sender=self._session.user,
            to=to,
            subject=subject,
            body=body,
            html=html,
            cc=cc,
            bcc=bcc,
            reply_to=reply_to,
            attachments=attachments,
            headers=headers,
        )
        self.send_message(message, save_to_sent=save_to_sent)

    def send_message(
        self,
        message: EmailMessage,
        *,
        save_to_sent: bool = True,
    ) -> None:
        """Send a pre-built EmailMessage."""
        if not message.get("From"):
            message["From"] = self._session.user

        to = message.get("To", "")

        try:
            with SmtpClient(
                host=self.smtp_host(),
                user=self._session.user,
                password=self._session.password,
            ) as client:
                client.send(message)
        except Exception as exc:
            console.print(f"  [red]✗[/] Failed to send to {to}{exc}")
            raise

        console.print(f"  [green]✓[/] Email sent to {to}")

        if save_to_sent and self._session.is_connected:
            with contextlib.suppress(KeyError):
                self._folders.sent.append(message.as_bytes())

    def reply(
        self,
        original: Message,
        body: str = "",
        *,
        html: Optional[str] = None,
        attachments: Optional[list[AttachmentLike]] = None,
        reply_all: bool = False,
        save_to_sent: bool = True,
    ) -> None:
        """Reply to a previously-fetched message, preserving threading."""
        subject = original.subject or ""
        if not subject.lower().startswith("re:"):
            subject = f"Re: {subject}"

        to = original.reply_to or original.from_ or ""
        cc = original.cc if reply_all else None

        headers: dict[str, str] = {}
        if original.message_id:
            headers["In-Reply-To"] = original.message_id
            refs = (original.references or "").split()
            refs.append(original.message_id)
            headers["References"] = " ".join(refs)

        self.send(
            to=to,
            subject=subject,
            body=body,
            html=html,
            cc=cc,
            attachments=attachments,
            headers=headers,
            save_to_sent=save_to_sent,
        )

    def forward(
        self,
        original: Message,
        to: Union[str, list[str]],
        body: str = "",
        *,
        attachments: Optional[list[AttachmentLike]] = None,
        save_to_sent: bool = True,
    ) -> None:
        """Forward a previously-fetched message to new recipients."""
        subject = original.subject or ""
        if not subject.lower().startswith("fwd:"):
            subject = f"Fwd: {subject}"

        quoted = (
            f"\n\n---------- Forwarded message ----------\n"
            f"From: {original.from_}\n"
            f"Date: {original.date}\n"
            f"Subject: {original.subject}\n"
            f"To: {original.to_}\n\n"
            f"{original.body_text_plain}"
        )

        self.send(
            to=to,
            subject=subject,
            body=(body + quoted) if body else quoted,
            attachments=attachments,
            save_to_sent=save_to_sent,
        )

__init__(session, folders)

Source code in email_profile/clients/smtp/sender.py
34
35
36
37
def __init__(self, session: ImapClient, folders: FolderAccess) -> None:
    self._session = session
    self._folders = folders
    self._smtp_host: Optional[SMTPHost] = None

forward(original, to, body='', *, attachments=None, save_to_sent=True)

Forward a previously-fetched message to new recipients.

Source code in email_profile/clients/smtp/sender.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def forward(
    self,
    original: Message,
    to: Union[str, list[str]],
    body: str = "",
    *,
    attachments: Optional[list[AttachmentLike]] = None,
    save_to_sent: bool = True,
) -> None:
    """Forward a previously-fetched message to new recipients."""
    subject = original.subject or ""
    if not subject.lower().startswith("fwd:"):
        subject = f"Fwd: {subject}"

    quoted = (
        f"\n\n---------- Forwarded message ----------\n"
        f"From: {original.from_}\n"
        f"Date: {original.date}\n"
        f"Subject: {original.subject}\n"
        f"To: {original.to_}\n\n"
        f"{original.body_text_plain}"
    )

    self.send(
        to=to,
        subject=subject,
        body=(body + quoted) if body else quoted,
        attachments=attachments,
        save_to_sent=save_to_sent,
    )

reply(original, body='', *, html=None, attachments=None, reply_all=False, save_to_sent=True)

Reply to a previously-fetched message, preserving threading.

Source code in email_profile/clients/smtp/sender.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def reply(
    self,
    original: Message,
    body: str = "",
    *,
    html: Optional[str] = None,
    attachments: Optional[list[AttachmentLike]] = None,
    reply_all: bool = False,
    save_to_sent: bool = True,
) -> None:
    """Reply to a previously-fetched message, preserving threading."""
    subject = original.subject or ""
    if not subject.lower().startswith("re:"):
        subject = f"Re: {subject}"

    to = original.reply_to or original.from_ or ""
    cc = original.cc if reply_all else None

    headers: dict[str, str] = {}
    if original.message_id:
        headers["In-Reply-To"] = original.message_id
        refs = (original.references or "").split()
        refs.append(original.message_id)
        headers["References"] = " ".join(refs)

    self.send(
        to=to,
        subject=subject,
        body=body,
        html=html,
        cc=cc,
        attachments=attachments,
        headers=headers,
        save_to_sent=save_to_sent,
    )

send(to, subject, body='', *, html=None, cc=None, bcc=None, reply_to=None, attachments=None, headers=None, save_to_sent=True)

Send an email via SMTP, optionally appending to Sent.

Source code in email_profile/clients/smtp/sender.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def send(
    self,
    to: Union[str, list[str]],
    subject: str,
    body: str = "",
    *,
    html: Optional[str] = None,
    cc: Optional[Union[str, list[str]]] = None,
    bcc: Optional[Union[str, list[str]]] = None,
    reply_to: Optional[str] = None,
    attachments: Optional[list[AttachmentLike]] = None,
    headers: Optional[dict[str, str]] = None,
    save_to_sent: bool = True,
) -> None:
    """Send an email via SMTP, optionally appending to Sent."""
    message = _build_message(
        sender=self._session.user,
        to=to,
        subject=subject,
        body=body,
        html=html,
        cc=cc,
        bcc=bcc,
        reply_to=reply_to,
        attachments=attachments,
        headers=headers,
    )
    self.send_message(message, save_to_sent=save_to_sent)

send_message(message, *, save_to_sent=True)

Send a pre-built EmailMessage.

Source code in email_profile/clients/smtp/sender.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def send_message(
    self,
    message: EmailMessage,
    *,
    save_to_sent: bool = True,
) -> None:
    """Send a pre-built EmailMessage."""
    if not message.get("From"):
        message["From"] = self._session.user

    to = message.get("To", "")

    try:
        with SmtpClient(
            host=self.smtp_host(),
            user=self._session.user,
            password=self._session.password,
        ) as client:
            client.send(message)
    except Exception as exc:
        console.print(f"  [red]✗[/] Failed to send to {to}{exc}")
        raise

    console.print(f"  [green]✓[/] Email sent to {to}")

    if save_to_sent and self._session.is_connected:
        with contextlib.suppress(KeyError):
            self._folders.sent.append(message.as_bytes())

smtp_host()

Resolved SMTP host for this account. Cached after first call.

Source code in email_profile/clients/smtp/sender.py
39
40
41
42
43
def smtp_host(self) -> SMTPHost:
    """Resolved SMTP host for this account. Cached after first call."""
    if self._smtp_host is None:
        self._smtp_host = resolve_smtp_host(self._session.user)
    return self._smtp_host