import errno import socket import ssl from binascii import b2a_base64 from datetime import datetime, timedelta, timezone class MicromailClient: def __init__( self, host: str, port: int = 0, ssl: bool = False, starttls: bool = False ): self.host = host if ssl and starttls: raise Error("Cannot use both SSL and STARTTLS at the same time.") self.ssl = ssl self.starttls = starttls if port == 0: if ssl: port = 465 elif starttls: port = 587 else: port = 25 self.port = port self.socket: None | socket.socket = None self.username: str | None = None def connect(self) -> None: con_errors = [] try: ai = socket.getaddrinfo( self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP ) except OSError as e: ai = [] con_errors.append(e) for addr_info in ai: af, socktype, proto, _, sa = addr_info try: self.socket = socket.socket(af, socktype, proto) except OSError as e: con_errors.append(e) self.socket = None continue try: self.socket.connect(sa) except OSError as e: con_errors.append(e) self.socket.close() self.socket = None continue break if self.socket is None: err = ", ".join( [errno.errorcode.get(e.errno or 0, str(e)) for e in con_errors] ) raise Error( f"Could not connect to server: {err}. Are you connected to the network?" ) print("Connected to server.") if self.ssl: self.socket = ssl.wrap_socket(self.socket) self.read_res(code=220) self.features = self.ehlo() if self.starttls and "STARTTLS" in self.features: self.start_starttls() def login(self, username: str, password: str) -> None: self.username = username auth_methods = None for feature in self.features: if feature.startswith("AUTH"): auth_methods = feature[4:].upper().split() break if auth_methods is None: raise Error( f"No authentication methods available. Server features: {self.features}" ) if "PLAIN" in auth_methods: encoded_auth = b2a_base64( f"\0{username}\0{password}".encode(), newline=False ) self.send_command("AUTH PLAIN ", encoded_auth, code=235) elif "LOGIN" in auth_methods: encoded_username = b2a_base64(username.encode(), newline=False) self.send_command("AUTH LOGIN ", encoded_username, code=334) encoded_password = b2a_base64(password.encode(), newline=False) self.send_command(encoded_password, code=235) else: raise Error( f"Unsupported authentication methods: {', '.join(auth_methods)}. Micromail only supports PLAIN and LOGIN." ) print(f"Authenticated as {username}.") def write(self, data: str | bytes) -> None: if self.socket is None: raise NoSocketError if isinstance(data, str): data = data.encode() n = self.socket.write(data) if n != len(data): print(f"Failure writing data <{data.decode()}>: not all bytes written") def send_command( self, *cmd: str | bytes, code: int | list[int] = [] ) -> tuple[int, list[str]]: cmd_ba = ( b" ".join([c if isinstance(c, bytes) else c.encode() for c in cmd]) + b"\r\n" ) self.write(cmd_ba) return self.read_res(cmd_ba.decode(), code=code) def read_res( self, cmd: str = "", code: int | list[int] = [] ) -> tuple[int, list[str]]: if self.socket is None: raise NoSocketError response = [] next = True while next: r_code = int(self.socket.read(3)) next = self.socket.read(1) == b"-" response.append(self.socket.readline().strip().decode()) if isinstance(code, int): code = [code] if code != [] and r_code not in code: raise SMTPError(cmd, r_code, " ".join(response)) return r_code, response def ehlo(self) -> list[str]: _, res = self.send_command("EHLO _", code=250) return res def start_starttls(self) -> None: if self.socket is None: raise NoSocketError self.send_command("STARTTLS", code=220) self.socket = ssl.wrap_socket(self.socket) def new_message(self, to: str | list[str], sender: str | None = None) -> None: self.send_command("RSET", code=250) if sender is None: sender = self.username if isinstance(to, str): to = [to] self.sender = sender self.to = to self.send_command(f"MAIL FROM:<{sender}>", code=250) for recipient in to: self.send_command(f"RCPT TO:<{recipient}>", code=[250, 251]) self.send_command("DATA", code=354) def headers(self, headers: dict[str, str] = {}) -> None: date = headers.get("date", format_date(datetime.now(tz=timezone(timedelta())))) self.write(f"Date: {date}\r\n") sender = headers.get("from", self.sender) self.write(f"From: {sender}\r\n") if "subject" in headers: self.write(f"Subject: {headers['subject']}\r\n") to = headers.get("to", self.to) if to is None: to = self.to if isinstance(to, str): to = [to] self.write(f"To: {', '.join(to)}\r\n") self.write("\r\n") def write_line(self, content: str) -> None: if content.startswith("."): content = "." + content self.write(content + "\r\n") def send(self) -> None: self.send_command("\r\n.", code=250) print("Message sent successfully") def quit(self) -> None: if self.socket is None: raise NoSocketError self.send_command(b"QUIT", code=221) print("Disconnected from server") self.socket.close() class Error(Exception): pass class NoSocketError(Error): pass class SMTPError(Error): def __init__(self, cmd: str, code: int, message: str): super().__init__(f"{cmd} -> {code} {message}") self.cmd = cmd self.code = code MONTHS = [ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", ] def format_date(date: datetime) -> str: date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}" time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT" return f"{date_str} {time_str}"