import errno
import socket
import ssl
from binascii import b2a_base64
from datetime import datetime, timedelta, timezone


class MicromailClient:
    """Small SMTP client that sends a message over a stream socket.

    The socket object is used through ``read``/``readline``/``write``, so it
    must expose those stream methods (plain sockets on some platforms do not).

    Typical call order: ``connect`` -> ``login`` -> ``new_message`` ->
    ``headers`` -> ``write_line`` (repeated) -> ``send`` -> ``quit``.
    """

    def __init__(self, host, port=0, ssl=False, starttls=False):
        """Store connection settings; no network activity happens here.

        Args:
            host: Server host name or address.
            port: TCP port. ``0`` selects 465 (ssl), 587 (starttls) or 25.
            ssl: Wrap the socket in TLS immediately after connecting.
            starttls: Upgrade the connection with the STARTTLS command.

        Raises:
            Error: If both ``ssl`` and ``starttls`` are requested.
        """
        self.host = host
        if ssl and starttls:
            raise Error("Cannot use both SSL and STARTTLS at the same time.")
        self.ssl = ssl
        self.starttls = starttls
        if port == 0:
            if ssl:
                port = 465
            elif starttls:
                port = 587
            else:
                port = 25
        self.port = port
        self.socket = None
        self.username = None
        # Initialised here so login()/headers() fail with a clear Error
        # instead of AttributeError when called out of order.
        self.features = []
        self.sender = None
        self.to = []

    def connect(self):
        """Open the TCP connection, read the greeting and negotiate features.

        Every address returned by ``getaddrinfo`` is tried in order until one
        connects.

        Raises:
            Error: If no address could be connected, or STARTTLS was requested
                but the server does not advertise it.
            SMTPError: If the server answers with an unexpected status code.
        """
        con_errors = []
        try:
            ai = socket.getaddrinfo(
                self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
            )
        except OSError as e:
            ai = []
            con_errors.append(e)

        sock = None
        for af, socktype, proto, _, sa in ai:
            try:
                candidate = socket.socket(af, socktype, proto)
            except OSError as e:
                con_errors.append(e)
                continue
            try:
                candidate.connect(sa)
            except OSError as e:
                con_errors.append(e)
                candidate.close()
                continue
            sock = candidate
            break

        if sock is None:
            self.socket = None
            err = ", ".join([errno.errorcode.get(e.errno, str(e)) for e in con_errors])
            raise Error(
                f"Could not connect to server: {err}. Are you connected to the network?"
            )
        self.socket = sock
        print("Connected to server.")

        if self.ssl:
            self.socket = ssl.wrap_socket(self.socket)
        self.read_res(code=220)
        self.features = self.ehlo()

        if self.starttls:
            # Refuse to continue in plaintext: the caller explicitly asked for
            # TLS, and login() would otherwise send credentials unencrypted.
            if not any(f.upper() == b"STARTTLS" for f in self.features):
                raise Error(
                    "STARTTLS was requested but the server does not support it."
                )
            self.start_starttls()
            # Features announced before TLS must be discarded and EHLO sent
            # again; servers commonly advertise AUTH only after the upgrade.
            self.features = self.ehlo()

    def login(self, username, password):
        """Authenticate with AUTH PLAIN or AUTH LOGIN, whichever is offered.

        Args:
            username: Account name (str or bytes); also the default sender.
            password: Account password (str or bytes).

        Raises:
            Error: If the server offers no AUTH method, or none is supported.
            SMTPError: If the server rejects the credentials.
        """
        self.username = username
        auth_methods = None
        for feature in self.features:
            if feature.upper().startswith(b"AUTH"):
                # Accept both "AUTH PLAIN LOGIN" and the legacy "AUTH=PLAIN".
                auth_methods = feature[4:].upper().replace(b"=", b" ").split()
                break
        if auth_methods is None:
            raise Error(
                f"No authentication methods available. Server features: {self.features}"
            )

        # b2a_base64 needs bytes-like input; encode str credentials once.
        user = username.encode() if isinstance(username, str) else username
        secret = password.encode() if isinstance(password, str) else password

        if b"PLAIN" in auth_methods:
            encoded_auth = b2a_base64(b"\0" + user + b"\0" + secret, newline=False)
            # send_command joins its parts with a single space itself.
            self.send_command(b"AUTH PLAIN", encoded_auth, code=235)
        elif b"LOGIN" in auth_methods:
            encoded_username = b2a_base64(user, newline=False)
            self.send_command(b"AUTH LOGIN", encoded_username, code=334)
            encoded_password = b2a_base64(secret, newline=False)
            self.send_command(encoded_password, code=235)
        else:
            offered = ", ".join(m.decode() for m in auth_methods)
            raise Error(
                f"Unsupported authentication methods: {offered}. Micromail only supports PLAIN and LOGIN."
            )
        print(f"Authenticated as {username}.")

    def write(self, data):
        """Write ``data`` (bytes) to the socket, retrying after short writes.

        Raises:
            Error: If there is no socket, or the socket stops accepting data.
        """
        if self.socket is None:
            raise Error("No socket to write to.")
        sent = 0
        while sent < len(data):
            n = self.socket.write(data[sent:])
            if not n:
                # No progress (None or 0): continuing would desynchronise the
                # SMTP dialogue, so fail loudly instead.
                raise Error(f"Failure writing data <{data}>: not all bytes written")
            sent += n

    def send_command(self, *cmd, code=None):
        """Send a command line and return the parsed server response.

        Args:
            *cmd: Bytes fragments, joined with single spaces.
            code: Expected status code(s); see ``read_res``.

        Returns:
            The ``(status_code, lines)`` tuple produced by ``read_res``.
        """
        cmd = b" ".join(cmd)
        self.write(cmd + b"\r\n")
        return self.read_res(cmd, code=code)

    def read_res(self, cmd="", code=None):
        """Read one (possibly multi-line) SMTP response.

        Args:
            cmd: The command being answered; only attached to ``SMTPError``.
            code: An int or a collection of ints that are acceptable status
                codes. ``None`` or an empty collection disables the check.

        Returns:
            ``(status_code, lines)`` where ``lines`` is a list of bytes with
            the status prefix and surrounding whitespace removed.

        Raises:
            Error: If there is no socket or the status code cannot be parsed
                (for example because the connection was closed).
            SMTPError: If the status code is not one of ``code``.
        """
        if self.socket is None:
            raise Error("No socket to read from.")
        response = []
        r_code = None
        more = True
        while more:
            status = self.socket.read(3)
            try:
                r_code = int(status)
            except (TypeError, ValueError):
                r_code = None
            if r_code is None:
                raise Error(f"Invalid or missing response from server: {status!r}")
            # A "-" after the code marks a continuation line; anything else
            # (normally a space) marks the final line.
            more = self.socket.read(1) == b"-"
            response.append(self.socket.readline().strip())
        if isinstance(code, int):
            code = [code]
        if code and r_code not in code:
            raise SMTPError(cmd, r_code, b" ".join(response).decode())
        return r_code, response

    def ehlo(self):
        """Send EHLO and return the list of response lines (bytes)."""
        _, res = self.send_command(b"EHLO _", code=250)
        return res

    def start_starttls(self):
        """Issue STARTTLS and wrap the socket in TLS once the server agrees."""
        self.send_command(b"STARTTLS", code=220)
        self.socket = ssl.wrap_socket(self.socket)

    def new_message(self, to, sender=None):
        """Start a new message: RSET, MAIL FROM, RCPT TO (each), then DATA.

        Args:
            to: One recipient address (str) or a list of addresses.
            sender: Envelope sender; defaults to the username given to login.

        Raises:
            SMTPError: If the server rejects any of the commands.
        """
        self.send_command(b"RSET", code=250)
        if sender is None:
            sender = self.username
        if isinstance(to, str):
            to = [to]
        self.sender = sender
        self.to = to
        self.send_command(f"MAIL FROM:<{sender}>".encode(), code=250)
        for recipient in to:
            self.send_command(f"RCPT TO:<{recipient}>".encode(), code=[250, 251])
        self.send_command(b"DATA", code=354)

    def headers(self, headers=None):
        """Write the Date, From, optional Subject and To headers.

        Args:
            headers: Optional dict with any of the keys ``date`` (datetime or
                pre-formatted str), ``from``, ``subject`` and ``to`` (str or
                list). Missing values fall back to the current UTC time and
                the sender/recipients given to ``new_message``.
        """
        if headers is None:
            headers = {}
        date = headers.get("date", datetime.now(tz=timezone(timedelta())))
        if isinstance(date, datetime):
            date = format_date(date)
        self.write(f"Date: {date}\r\n".encode())
        sender = headers.get("from", self.sender)
        self.write(f"From: {sender}\r\n".encode())
        if "subject" in headers:
            self.write(f"Subject: {headers['subject']}\r\n".encode())
        to = headers.get("to", self.to)
        if isinstance(to, str):
            to = [to]
        self.write(f"To: {', '.join(to)}\r\n".encode())
        # Blank line separates the headers from the body.
        self.write("\r\n".encode())

    def write_line(self, content):
        """Write one body line (str or bytes) followed by CRLF.

        A leading "." is doubled so the line is not mistaken for the
        end-of-data marker.
        """
        if isinstance(content, str):
            content = content.encode()
        if content.startswith(b"."):
            content = b"." + content
        self.write(content + b"\r\n")

    def send(self):
        """Terminate the DATA section and wait for the server to accept it."""
        self.send_command(b"\r\n.", code=250)
        print("Message sent successfully")

    def quit(self):
        """Send QUIT and close the socket, even when QUIT itself fails."""
        try:
            self.send_command(b"QUIT", code=221)
            print("Disconnected from server")
        finally:
            if self.socket is not None:
                self.socket.close()
                self.socket = None


class Error(Exception):
    """Base class for all errors raised by this module."""


class SMTPError(Error):
    """The server answered a command with an unexpected status code.

    Attributes:
        cmd: The command that was sent.
        code: The numeric status code returned by the server.
    """

    def __init__(self, cmd, code, message):
        super().__init__(f"{code} {message}")
        self.cmd = cmd
        self.code = code


MONTHS = [
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
]


def format_date(date):
    """Format ``date`` as ``"D Mon YY HH:MM:SS UT"`` for the Date header.

    The value is labelled "UT" without any conversion, so callers should pass
    a datetime that is already in UTC.
    """
    # datetime.month is 1-based while MONTHS is a 0-based list.
    date_str = f"{date.day} {MONTHS[date.month - 1]} {str(date.year)[-2:]}"
    time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
    return f"{date_str} {time_str}"