diff --git a/micromail.py b/micromail.py index 84ac399..32d2274 100644 --- a/micromail.py +++ b/micromail.py @@ -6,7 +6,9 @@ from datetime import datetime, timedelta, timezone class MicromailClient: - def __init__(self, host, port=0, ssl=False, starttls=False): + def __init__( + self, host: str, port: int = 0, ssl: bool = False, starttls: bool = False + ): self.host = host if ssl and starttls: @@ -24,10 +26,10 @@ class MicromailClient: port = 25 self.port = port - self.socket = None - self.username = None + self.socket: None | socket.socket = None + self.username: str | None = None - def connect(self): + def connect(self) -> None: con_errors = [] try: ai = socket.getaddrinfo( @@ -57,27 +59,29 @@ class MicromailClient: break if self.socket is None: - err = ", ".join([errno.errorcode.get(e.errno, str(e)) for e in con_errors]) + err = ", ".join( + [errno.errorcode.get(e.errno or 0, str(e)) for e in con_errors] + ) raise Error( f"Could not connect to server: {err}. Are you connected to the network?" ) print("Connected to server.") if self.ssl: - self.socket = ssl.wrap_socket(self.socket) + self.socket = ssl.wrap_socket(self.socket) # type: ignore self.read_res(code=220) self.features = self.ehlo() - if self.starttls and b"STARTTLS" in self.features: + if self.starttls and "STARTTLS" in self.features: self.start_starttls() - def login(self, username, password): + def login(self, username: str, password: str) -> None: self.username = username auth_methods = None for feature in self.features: - if feature.startswith(b"AUTH"): + if feature.startswith("AUTH"): auth_methods = feature[4:].upper().split() break @@ -86,14 +90,16 @@ class MicromailClient: f"No authentication methods available. Server features: {self.features}" ) - if b"PLAIN" in auth_methods: - encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False) - self.send_command(b"AUTH PLAIN ", encoded_auth, code=235) - elif b"LOGIN" in auth_methods: - encoded_username = b2a_base64(username, newline=False) - self.send_command(b"AUTH LOGIN ", encoded_username, code=334) + if "PLAIN" in auth_methods: + encoded_auth = b2a_base64( + f"\0{username}\0{password}".encode(), newline=False + ) + self.send_command("AUTH PLAIN ", encoded_auth, code=235) + elif "LOGIN" in auth_methods: + encoded_username = b2a_base64(username.encode(), newline=False) + self.send_command("AUTH LOGIN ", encoded_username, code=334) - encoded_password = b2a_base64(password, newline=False) + encoded_password = b2a_base64(password.encode(), newline=False) self.send_command(encoded_password, code=235) else: raise Error( @@ -102,26 +108,38 @@ class MicromailClient: print(f"Authenticated as {username}.") - def write(self, data): + def write(self, data: str | bytes) -> None: if self.socket is None: raise Error("No socket to write to.") - n = self.socket.write(data) + if isinstance(data, str): + data = data.encode() + n = self.socket.write(data) # type: ignore if n != len(data): - print(f"Failure writing data <{data}>: not all bytes written") + print(f"Failure writing data <{data.decode()}>: not all bytes written") - def send_command(self, *cmd, code=[]): - cmd = b" ".join(cmd) - self.write(cmd + b"\r\n") - return self.read_res(cmd, code=code) + def send_command( + self, *cmd: str | bytes, code: int | list[int] = [] + ) -> tuple[int, list[str]]: + cmd_ba = ( + b" ".join([c if isinstance(c, bytes) else c.encode() for c in cmd]) + + b"\r\n" + ) + self.write(cmd_ba) + return self.read_res(cmd_ba.decode(), code=code) + + def read_res( + self, cmd: str = "", code: int | list[int] = [] + ) -> tuple[int, list[str]]: + if self.socket is None: + raise NoSocketError - def read_res(self, cmd="", code=[]): response = [] next = True while next: - r_code = int(self.socket.read(3)) - next = self.socket.read(1) == b"-" - response.append(self.socket.readline().strip()) + r_code = int(self.socket.read(3)) # type: ignore + next = self.socket.read(1) == b"-" # type: ignore + response.append(self.socket.readline().strip().decode()) # type: ignore if isinstance(code, int): code = [code] @@ -129,16 +147,16 @@ class MicromailClient: raise SMTPError(cmd, r_code, b" ".join(response).decode()) return r_code, response - def ehlo(self): - _, res = self.send_command(b"EHLO _", code=250) + def ehlo(self) -> list[str]: + _, res = self.send_command("EHLO _", code=250) return res - def start_starttls(self): - self.send_command(b"STARTTLS", code=220) - self.socket = ssl.wrap_socket(self.socket) + def start_starttls(self) -> None: + self.send_command("STARTTLS", code=220) + self.socket = ssl.wrap_socket(self.socket) # type: ignore - def new_message(self, to, sender=None): - self.send_command(b"RSET", code=250) + def new_message(self, to: str | list[str], sender: str | None = None) -> None: + self.send_command("RSET", code=250) if sender is None: sender = self.username @@ -148,45 +166,46 @@ class MicromailClient: self.sender = sender self.to = to - self.send_command(f"MAIL FROM:<{sender}>".encode(), code=250) + self.send_command(f"MAIL FROM:<{sender}>", code=250) for recipient in to: - self.send_command(f"RCPT TO:<{recipient}>".encode(), code=[250, 251]) + self.send_command(f"RCPT TO:<{recipient}>", code=[250, 251]) - self.send_command(b"DATA", code=354) + self.send_command("DATA", code=354) - def headers(self, headers={}): - date = headers.get("date", datetime.now(tz=timezone(timedelta()))) - if isinstance(date, datetime): - date = format_date(date) - self.write(f"Date: {date}\r\n".encode()) + def headers(self, headers: dict[str, str]) -> None: + date = headers.get("date", format_date(datetime.now(tz=timezone(timedelta())))) + self.write(f"Date: {date}\r\n") sender = headers.get("from", self.sender) - self.write(f"From: {sender}\r\n".encode()) + self.write(f"From: {sender}\r\n") if "subject" in headers: - self.write(f"Subject: {headers['subject']}\r\n".encode()) + self.write(f"Subject: {headers['subject']}\r\n") to = headers.get("to", self.to) + if to is None: + to = self.to if isinstance(to, str): to = [to] - self.write(f"To: {', '.join(to)}\r\n".encode()) + self.write(f"To: {', '.join(to)}\r\n") - self.write("\r\n".encode()) + self.write("\r\n") - def write_line(self, content): - if isinstance(content, str): - content = content.encode() - if content.startswith(b"."): - content = b"." + content + def write_line(self, content: str) -> None: + if content.startswith("."): + content = "." + content - self.write(content + b"\r\n") + self.write(content + "\r\n") - def send(self): - self.send_command(b"\r\n.", code=250) + def send(self) -> None: + self.send_command("\r\n.", code=250) print("Message sent successfully") - def quit(self): + def quit(self) -> None: + if self.socket is None: + raise NoSocketError + self.send_command(b"QUIT", code=221) print("Disconnected from server") self.socket.close() @@ -196,9 +215,13 @@ class Error(Exception): pass +class NoSocketError(Error): + pass + + class SMTPError(Error): - def __init__(self, cmd, code, message): - super().__init__(f"{code} {message}") + def __init__(self, cmd: str, code: int, message: str): + super().__init__(f"{cmd} -> {code} {message}") self.cmd = cmd self.code = code @@ -220,7 +243,7 @@ MONTHS = [ ] -def format_date(date): +def format_date(date: datetime) -> str: date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}" time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT" return f"{date_str} {time_str}"