micromail/micromail.py
2025-05-19 13:57:43 +02:00

252 lines
6.9 KiB
Python

import errno
import socket
import ssl
from binascii import b2a_base64
from datetime import datetime, timedelta, timezone
class MicromailClient:
def __init__(
self, host: str, port: int = 0, ssl: bool = False, starttls: bool = False
):
self.host = host
if ssl and starttls:
raise Error("Cannot use both SSL and STARTTLS at the same time.")
self.ssl = ssl
self.starttls = starttls
if port == 0:
if ssl:
port = 465
elif starttls:
port = 587
else:
port = 25
self.port = port
self.socket: None | socket.socket = None
self.username: str | None = None
def connect(self) -> None:
con_errors = []
try:
ai = socket.getaddrinfo(
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
)
except OSError as e:
ai = []
con_errors.append(e)
for addr_info in ai:
af, socktype, proto, _, sa = addr_info
try:
self.socket = socket.socket(af, socktype, proto)
except OSError as e:
con_errors.append(e)
self.socket = None
continue
try:
self.socket.connect(sa)
except OSError as e:
con_errors.append(e)
self.socket.close()
self.socket = None
continue
break
if self.socket is None:
err = ", ".join(
[errno.errorcode.get(e.errno or 0, str(e)) for e in con_errors]
)
raise Error(
f"Could not connect to server: {err}. Are you connected to the network?"
)
print("Connected to server.")
if self.ssl:
self.socket = ssl.wrap_socket(self.socket)
self.read_res(code=220)
self.features = self.ehlo()
if self.starttls and "STARTTLS" in self.features:
self.start_starttls()
def login(self, username: str, password: str) -> None:
self.username = username
auth_methods = None
for feature in self.features:
if feature.startswith("AUTH"):
auth_methods = feature[4:].upper().split()
break
if auth_methods is None:
raise Error(
f"No authentication methods available. Server features: {self.features}"
)
if "PLAIN" in auth_methods:
encoded_auth = b2a_base64(
f"\0{username}\0{password}".encode(), newline=False
)
self.send_command("AUTH PLAIN ", encoded_auth, code=235)
elif "LOGIN" in auth_methods:
encoded_username = b2a_base64(username.encode(), newline=False)
self.send_command("AUTH LOGIN ", encoded_username, code=334)
encoded_password = b2a_base64(password.encode(), newline=False)
self.send_command(encoded_password, code=235)
else:
raise Error(
f"Unsupported authentication methods: {', '.join(auth_methods)}. Micromail only supports PLAIN and LOGIN."
)
print(f"Authenticated as {username}.")
def write(self, data: str | bytes) -> None:
if self.socket is None:
raise NoSocketError
if isinstance(data, str):
data = data.encode()
n = self.socket.write(data)
if n != len(data):
print(f"Failure writing data <{data.decode()}>: not all bytes written")
def send_command(
self, *cmd: str | bytes, code: int | list[int] = []
) -> tuple[int, list[str]]:
cmd_ba = (
b" ".join([c if isinstance(c, bytes) else c.encode() for c in cmd])
+ b"\r\n"
)
self.write(cmd_ba)
return self.read_res(cmd_ba.decode(), code=code)
def read_res(
self, cmd: str = "", code: int | list[int] = []
) -> tuple[int, list[str]]:
if self.socket is None:
raise NoSocketError
response = []
next = True
while next:
r_code = int(self.socket.read(3))
next = self.socket.read(1) == b"-"
response.append(self.socket.readline().strip().decode())
if isinstance(code, int):
code = [code]
if code != [] and r_code not in code:
raise SMTPError(cmd, r_code, " ".join(response))
return r_code, response
def ehlo(self) -> list[str]:
_, res = self.send_command("EHLO _", code=250)
return res
def start_starttls(self) -> None:
if self.socket is None:
raise NoSocketError
self.send_command("STARTTLS", code=220)
self.socket = ssl.wrap_socket(self.socket)
def new_message(self, to: str | list[str], sender: str | None = None) -> None:
self.send_command("RSET", code=250)
if sender is None:
sender = self.username
if isinstance(to, str):
to = [to]
self.sender = sender
self.to = to
self.send_command(f"MAIL FROM:<{sender}>", code=250)
for recipient in to:
self.send_command(f"RCPT TO:<{recipient}>", code=[250, 251])
self.send_command("DATA", code=354)
def headers(self, headers: dict[str, str] = {}) -> None:
date = headers.get("date", format_date(datetime.now(tz=timezone(timedelta()))))
self.write(f"Date: {date}\r\n")
sender = headers.get("from", self.sender)
self.write(f"From: {sender}\r\n")
if "subject" in headers:
self.write(f"Subject: {headers['subject']}\r\n")
to = headers.get("to", self.to)
if to is None:
to = self.to
if isinstance(to, str):
to = [to]
self.write(f"To: {', '.join(to)}\r\n")
self.write("\r\n")
def write_line(self, content: str) -> None:
if content.startswith("."):
content = "." + content
self.write(content + "\r\n")
def send(self) -> None:
self.send_command("\r\n.", code=250)
print("Message sent successfully")
def quit(self) -> None:
if self.socket is None:
raise NoSocketError
self.send_command(b"QUIT", code=221)
print("Disconnected from server")
self.socket.close()
class Error(Exception):
pass
class NoSocketError(Error):
pass
class SMTPError(Error):
def __init__(self, cmd: str, code: int, message: str):
super().__init__(f"{cmd} -> {code} {message}")
self.cmd = cmd
self.code = code
MONTHS = [
"Jan",
"Feb",
"Mar",
"Apr",
"May",
"Jun",
"Jul",
"Aug",
"Sep",
"Oct",
"Nov",
"Dec",
]
def format_date(date: datetime) -> str:
date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}"
time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
return f"{date_str} {time_str}"