Enhance type annotations and improve method signatures in MicromailClient for better clarity and type safety
This commit is contained in:
parent
4c9ff09ea9
commit
77d4da5900
1 changed files with 81 additions and 58 deletions
139
micromail.py
139
micromail.py
|
@ -6,7 +6,9 @@ from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
|
|
||||||
class MicromailClient:
|
class MicromailClient:
|
||||||
def __init__(self, host, port=0, ssl=False, starttls=False):
|
def __init__(
|
||||||
|
self, host: str, port: int = 0, ssl: bool = False, starttls: bool = False
|
||||||
|
):
|
||||||
self.host = host
|
self.host = host
|
||||||
|
|
||||||
if ssl and starttls:
|
if ssl and starttls:
|
||||||
|
@ -24,10 +26,10 @@ class MicromailClient:
|
||||||
port = 25
|
port = 25
|
||||||
self.port = port
|
self.port = port
|
||||||
|
|
||||||
self.socket = None
|
self.socket: None | socket.socket = None
|
||||||
self.username = None
|
self.username: str | None = None
|
||||||
|
|
||||||
def connect(self):
|
def connect(self) -> None:
|
||||||
con_errors = []
|
con_errors = []
|
||||||
try:
|
try:
|
||||||
ai = socket.getaddrinfo(
|
ai = socket.getaddrinfo(
|
||||||
|
@ -57,27 +59,29 @@ class MicromailClient:
|
||||||
break
|
break
|
||||||
|
|
||||||
if self.socket is None:
|
if self.socket is None:
|
||||||
err = ", ".join([errno.errorcode.get(e.errno, str(e)) for e in con_errors])
|
err = ", ".join(
|
||||||
|
[errno.errorcode.get(e.errno or 0, str(e)) for e in con_errors]
|
||||||
|
)
|
||||||
raise Error(
|
raise Error(
|
||||||
f"Could not connect to server: {err}. Are you connected to the network?"
|
f"Could not connect to server: {err}. Are you connected to the network?"
|
||||||
)
|
)
|
||||||
print("Connected to server.")
|
print("Connected to server.")
|
||||||
|
|
||||||
if self.ssl:
|
if self.ssl:
|
||||||
self.socket = ssl.wrap_socket(self.socket)
|
self.socket = ssl.wrap_socket(self.socket) # type: ignore
|
||||||
|
|
||||||
self.read_res(code=220)
|
self.read_res(code=220)
|
||||||
self.features = self.ehlo()
|
self.features = self.ehlo()
|
||||||
|
|
||||||
if self.starttls and b"STARTTLS" in self.features:
|
if self.starttls and "STARTTLS" in self.features:
|
||||||
self.start_starttls()
|
self.start_starttls()
|
||||||
|
|
||||||
def login(self, username, password):
|
def login(self, username: str, password: str) -> None:
|
||||||
self.username = username
|
self.username = username
|
||||||
|
|
||||||
auth_methods = None
|
auth_methods = None
|
||||||
for feature in self.features:
|
for feature in self.features:
|
||||||
if feature.startswith(b"AUTH"):
|
if feature.startswith("AUTH"):
|
||||||
auth_methods = feature[4:].upper().split()
|
auth_methods = feature[4:].upper().split()
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -86,14 +90,16 @@ class MicromailClient:
|
||||||
f"No authentication methods available. Server features: {self.features}"
|
f"No authentication methods available. Server features: {self.features}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if b"PLAIN" in auth_methods:
|
if "PLAIN" in auth_methods:
|
||||||
encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False)
|
encoded_auth = b2a_base64(
|
||||||
self.send_command(b"AUTH PLAIN ", encoded_auth, code=235)
|
f"\0{username}\0{password}".encode(), newline=False
|
||||||
elif b"LOGIN" in auth_methods:
|
)
|
||||||
encoded_username = b2a_base64(username, newline=False)
|
self.send_command("AUTH PLAIN ", encoded_auth, code=235)
|
||||||
self.send_command(b"AUTH LOGIN ", encoded_username, code=334)
|
elif "LOGIN" in auth_methods:
|
||||||
|
encoded_username = b2a_base64(username.encode(), newline=False)
|
||||||
|
self.send_command("AUTH LOGIN ", encoded_username, code=334)
|
||||||
|
|
||||||
encoded_password = b2a_base64(password, newline=False)
|
encoded_password = b2a_base64(password.encode(), newline=False)
|
||||||
self.send_command(encoded_password, code=235)
|
self.send_command(encoded_password, code=235)
|
||||||
else:
|
else:
|
||||||
raise Error(
|
raise Error(
|
||||||
|
@ -102,26 +108,38 @@ class MicromailClient:
|
||||||
|
|
||||||
print(f"Authenticated as {username}.")
|
print(f"Authenticated as {username}.")
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data: str | bytes) -> None:
|
||||||
if self.socket is None:
|
if self.socket is None:
|
||||||
raise Error("No socket to write to.")
|
raise Error("No socket to write to.")
|
||||||
|
|
||||||
n = self.socket.write(data)
|
if isinstance(data, str):
|
||||||
|
data = data.encode()
|
||||||
|
n = self.socket.write(data) # type: ignore
|
||||||
if n != len(data):
|
if n != len(data):
|
||||||
print(f"Failure writing data <{data}>: not all bytes written")
|
print(f"Failure writing data <{data.decode()}>: not all bytes written")
|
||||||
|
|
||||||
def send_command(self, *cmd, code=[]):
|
def send_command(
|
||||||
cmd = b" ".join(cmd)
|
self, *cmd: str | bytes, code: int | list[int] = []
|
||||||
self.write(cmd + b"\r\n")
|
) -> tuple[int, list[str]]:
|
||||||
return self.read_res(cmd, code=code)
|
cmd_ba = (
|
||||||
|
b" ".join([c if isinstance(c, bytes) else c.encode() for c in cmd])
|
||||||
|
+ b"\r\n"
|
||||||
|
)
|
||||||
|
self.write(cmd_ba)
|
||||||
|
return self.read_res(cmd_ba.decode(), code=code)
|
||||||
|
|
||||||
|
def read_res(
|
||||||
|
self, cmd: str = "", code: int | list[int] = []
|
||||||
|
) -> tuple[int, list[str]]:
|
||||||
|
if self.socket is None:
|
||||||
|
raise NoSocketError
|
||||||
|
|
||||||
def read_res(self, cmd="", code=[]):
|
|
||||||
response = []
|
response = []
|
||||||
next = True
|
next = True
|
||||||
while next:
|
while next:
|
||||||
r_code = int(self.socket.read(3))
|
r_code = int(self.socket.read(3)) # type: ignore
|
||||||
next = self.socket.read(1) == b"-"
|
next = self.socket.read(1) == b"-" # type: ignore
|
||||||
response.append(self.socket.readline().strip())
|
response.append(self.socket.readline().strip().decode()) # type: ignore
|
||||||
|
|
||||||
if isinstance(code, int):
|
if isinstance(code, int):
|
||||||
code = [code]
|
code = [code]
|
||||||
|
@ -129,16 +147,16 @@ class MicromailClient:
|
||||||
raise SMTPError(cmd, r_code, b" ".join(response).decode())
|
raise SMTPError(cmd, r_code, b" ".join(response).decode())
|
||||||
return r_code, response
|
return r_code, response
|
||||||
|
|
||||||
def ehlo(self):
|
def ehlo(self) -> list[str]:
|
||||||
_, res = self.send_command(b"EHLO _", code=250)
|
_, res = self.send_command("EHLO _", code=250)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def start_starttls(self):
|
def start_starttls(self) -> None:
|
||||||
self.send_command(b"STARTTLS", code=220)
|
self.send_command("STARTTLS", code=220)
|
||||||
self.socket = ssl.wrap_socket(self.socket)
|
self.socket = ssl.wrap_socket(self.socket) # type: ignore
|
||||||
|
|
||||||
def new_message(self, to, sender=None):
|
def new_message(self, to: str | list[str], sender: str | None = None) -> None:
|
||||||
self.send_command(b"RSET", code=250)
|
self.send_command("RSET", code=250)
|
||||||
|
|
||||||
if sender is None:
|
if sender is None:
|
||||||
sender = self.username
|
sender = self.username
|
||||||
|
@ -148,45 +166,46 @@ class MicromailClient:
|
||||||
self.sender = sender
|
self.sender = sender
|
||||||
self.to = to
|
self.to = to
|
||||||
|
|
||||||
self.send_command(f"MAIL FROM:<{sender}>".encode(), code=250)
|
self.send_command(f"MAIL FROM:<{sender}>", code=250)
|
||||||
|
|
||||||
for recipient in to:
|
for recipient in to:
|
||||||
self.send_command(f"RCPT TO:<{recipient}>".encode(), code=[250, 251])
|
self.send_command(f"RCPT TO:<{recipient}>", code=[250, 251])
|
||||||
|
|
||||||
self.send_command(b"DATA", code=354)
|
self.send_command("DATA", code=354)
|
||||||
|
|
||||||
def headers(self, headers={}):
|
def headers(self, headers: dict[str, str]) -> None:
|
||||||
date = headers.get("date", datetime.now(tz=timezone(timedelta())))
|
date = headers.get("date", format_date(datetime.now(tz=timezone(timedelta()))))
|
||||||
if isinstance(date, datetime):
|
self.write(f"Date: {date}\r\n")
|
||||||
date = format_date(date)
|
|
||||||
self.write(f"Date: {date}\r\n".encode())
|
|
||||||
|
|
||||||
sender = headers.get("from", self.sender)
|
sender = headers.get("from", self.sender)
|
||||||
self.write(f"From: {sender}\r\n".encode())
|
self.write(f"From: {sender}\r\n")
|
||||||
|
|
||||||
if "subject" in headers:
|
if "subject" in headers:
|
||||||
self.write(f"Subject: {headers['subject']}\r\n".encode())
|
self.write(f"Subject: {headers['subject']}\r\n")
|
||||||
|
|
||||||
to = headers.get("to", self.to)
|
to = headers.get("to", self.to)
|
||||||
|
if to is None:
|
||||||
|
to = self.to
|
||||||
if isinstance(to, str):
|
if isinstance(to, str):
|
||||||
to = [to]
|
to = [to]
|
||||||
self.write(f"To: {', '.join(to)}\r\n".encode())
|
self.write(f"To: {', '.join(to)}\r\n")
|
||||||
|
|
||||||
self.write("\r\n".encode())
|
self.write("\r\n")
|
||||||
|
|
||||||
def write_line(self, content):
|
def write_line(self, content: str) -> None:
|
||||||
if isinstance(content, str):
|
if content.startswith("."):
|
||||||
content = content.encode()
|
content = "." + content
|
||||||
if content.startswith(b"."):
|
|
||||||
content = b"." + content
|
|
||||||
|
|
||||||
self.write(content + b"\r\n")
|
self.write(content + "\r\n")
|
||||||
|
|
||||||
def send(self):
|
def send(self) -> None:
|
||||||
self.send_command(b"\r\n.", code=250)
|
self.send_command("\r\n.", code=250)
|
||||||
print("Message sent successfully")
|
print("Message sent successfully")
|
||||||
|
|
||||||
def quit(self):
|
def quit(self) -> None:
|
||||||
|
if self.socket is None:
|
||||||
|
raise NoSocketError
|
||||||
|
|
||||||
self.send_command(b"QUIT", code=221)
|
self.send_command(b"QUIT", code=221)
|
||||||
print("Disconnected from server")
|
print("Disconnected from server")
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
@ -196,9 +215,13 @@ class Error(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NoSocketError(Error):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class SMTPError(Error):
|
class SMTPError(Error):
|
||||||
def __init__(self, cmd, code, message):
|
def __init__(self, cmd: str, code: int, message: str):
|
||||||
super().__init__(f"{code} {message}")
|
super().__init__(f"{cmd} -> {code} {message}")
|
||||||
|
|
||||||
self.cmd = cmd
|
self.cmd = cmd
|
||||||
self.code = code
|
self.code = code
|
||||||
|
@ -220,7 +243,7 @@ MONTHS = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def format_date(date):
|
def format_date(date: datetime) -> str:
|
||||||
date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}"
|
date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}"
|
||||||
time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
|
time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
|
||||||
return f"{date_str} {time_str}"
|
return f"{date_str} {time_str}"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue