"""Minimal SMTP client: connect, authenticate, and send one message at a time.

NOTE(review): the socket is driven through ``read``/``readline``/``write`` and
TLS is set up with ``ssl.wrap_socket``; this looks like it targets MicroPython
sockets -- confirm the intended runtime before porting to plain CPython sockets.
"""

import socket
import ssl
import sys
from binascii import b2a_base64
from datetime import datetime, timedelta, timezone


def _to_bytes(value):
    """Return *value* as bytes, encoding it when it is a ``str``."""
    return value.encode() if isinstance(value, str) else value


class MicromailClient:
    """Tiny SMTP client that reports errors on stdout and exits on failure.

    Typical call order: ``connect()``, ``login()``, ``new_message()``,
    ``headers()``, one or more ``write_line()``, ``send()``, ``quit()``.
    """

    def __init__(self, host, port, ssl=False, starttls=False):
        """Store connection settings; no network I/O happens here.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            ssl: Wrap the socket in TLS right after connecting.
            starttls: Upgrade to TLS with STARTTLS when the server offers it.

        Exits the process (``sys.exit(1)``) if both ``ssl`` and ``starttls``
        are requested.
        """
        if ssl and starttls:
            print("Cannot use both SSL and STARTTLS at the same time.")
            sys.exit(1)
        self.host = host
        self.port = port
        self.ssl = ssl
        self.starttls = starttls
        self.socket = None
        self.username = None
        # Filled in by connect(); initialised so login() before connect()
        # reports "no authentication methods" instead of raising.
        self.features = []

    def connect(self):
        """Open the TCP connection, read the greeting and send EHLO.

        Tries every address returned by ``getaddrinfo`` until one connects.
        Exits the process if no address works or the greeting is not 220.
        """
        candidates = socket.getaddrinfo(
            self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
        for af, socktype, proto, _, sa in candidates:
            print(f"Trying {sa}")
            try:
                self.socket = socket.socket(af, socktype, proto)
            except OSError:
                print(f"Could not create socket: {sa}")
                self.socket = None
                continue
            try:
                self.socket.connect(sa)
            except OSError:
                print(f"Could not connect to {sa}")
                self.socket.close()
                self.socket = None
                continue
            break

        if self.socket is None:
            print("Could not open socket.")
            sys.exit(1)

        if self.ssl:
            self.socket = ssl.wrap_socket(self.socket)

        # The greeting may span several "220-" lines; consume all of them so
        # leftovers are not mistaken for the EHLO reply.
        code, _ = self._read_response()
        if code != b"220":
            print(f"Error: got code {code} on initial connection")
            sys.exit(1)

        self.features = self.ehlo()

        # NOTE(review): if STARTTLS was requested but is not advertised, the
        # session silently continues unencrypted -- confirm this is intended.
        if self.starttls and b"STARTTLS" in self.features:
            self.start_starttls()
            # Capabilities learned before TLS must be discarded; servers often
            # advertise AUTH only on the encrypted channel.
            self.features = self.ehlo()

    def login(self, username, password):
        """Authenticate with AUTH PLAIN, or AUTH LOGIN when PLAIN is missing.

        Args:
            username: Account name; also remembered as the default sender.
            password: Account password.

        Exits the process when no supported mechanism is advertised or the
        server rejects the credentials.
        """
        self.username = username

        auth_methods = None
        for feature in self.features:
            if feature.startswith(b"AUTH"):
                # Accept both "AUTH PLAIN LOGIN" and legacy "AUTH=PLAIN LOGIN".
                auth_methods = feature[4:].replace(b"=", b" ").upper().split()
                break
        if auth_methods is None:
            print("No authentication methods available")
            sys.exit(1)

        user_bytes = _to_bytes(username)
        pass_bytes = _to_bytes(password)

        if b"PLAIN" in auth_methods:
            token = b2a_base64(
                b"\0" + user_bytes + b"\0" + pass_bytes, newline=False
            )
            code, _ = self.send_command(b"AUTH PLAIN", token)
            if code != b"235":
                print(f"Error: got code {code} on AUTH PLAIN")
                sys.exit(1)
        elif b"LOGIN" in auth_methods:
            encoded_username = b2a_base64(user_bytes, newline=False)
            code, _ = self.send_command(b"AUTH LOGIN", encoded_username)
            if code != b"334":
                print(f"Error: got code {code} on AUTH LOGIN")
                sys.exit(1)
            encoded_password = b2a_base64(pass_bytes, newline=False)
            code, _ = self.send_command(encoded_password)
            if code != b"235":
                print(f"Error: got code {code} on AUTH LOGIN")
                sys.exit(1)
        else:
            # Mechanism names are bytes; decode them before joining as text.
            offered = ", ".join([m.decode() for m in auth_methods])
            print(f"Unsupported authentication method: {offered}")
            sys.exit(1)

        print(f"Authenticated as {username}")

    def write(self, data):
        """Write raw bytes to the socket; exits if there is no socket.

        A short write is reported on stdout but not retried.
        """
        if self.socket is None:
            print("No socket to write to")
            sys.exit(1)
        n = self.socket.write(data)
        if n != len(data):
            print(f"Failure writing data <{data}>: not all bytes written")

    def _read_response(self):
        """Read one (possibly multi-line) reply.

        Returns:
            ``(code, lines)`` where ``code`` is the 3-byte status of the last
            line and ``lines`` holds the stripped text of every line.
        """
        lines = []
        more = True
        while more:
            code = self.socket.read(3)
            # A "-" after the code marks a continuation line; anything else
            # (normally a space) marks the final line.
            more = self.socket.read(1) == b"-"
            lines.append(self.socket.readline().strip())
        return code, lines

    def send_command(self, *cmd):
        """Send the space-joined byte parts as one command and read the reply.

        Returns:
            ``(code, lines)`` as described in ``_read_response``.
        """
        self.write(b" ".join(cmd) + b"\r\n")
        return self._read_response()

    def ehlo(self):
        """Send EHLO and return the list of advertised feature lines."""
        code, res = self.send_command(b"EHLO _")
        if code != b"250":
            print(f"Error: got code {code} on EHLO")
            sys.exit(1)
        return res

    def start_starttls(self):
        """Issue STARTTLS and wrap the socket in TLS; exits on refusal."""
        code, res = self.send_command(b"STARTTLS")
        if code != b"220":
            print(f"Error: got code {code} on STARTTLS")
            sys.exit(1)
        print(res)
        self.socket = ssl.wrap_socket(self.socket)

    def new_message(self, to, sender=None):
        """Begin a new message: RSET, MAIL FROM, RCPT TO (each), DATA.

        Args:
            to: One recipient address or a list of addresses.
            sender: Envelope sender; defaults to the logged-in username.

        Exits on RSET / MAIL FROM / DATA failure. A rejected recipient is
        only reported, so the remaining recipients are still attempted.
        """
        code, _ = self.send_command(b"RSET")
        if code != b"250":
            print(f"Error: got code {code} on RSET")
            sys.exit(1)

        if sender is None:
            sender = self.username
        if isinstance(to, str):
            to = [to]
        self.sender = sender
        self.to = to

        code, _ = self.send_command(f"MAIL FROM:<{sender}>".encode())
        if code != b"250":
            print(f"Error: got code {code} on MAIL FROM:<{sender}>")
            sys.exit(1)

        for recipient in to:
            code, _ = self.send_command(f"RCPT TO:<{recipient}>".encode())
            if code not in [b"250", b"251"]:
                print(f"Error: got code {code} on RCPT TO:<{recipient}>")

        code, _ = self.send_command(b"DATA")
        if code != b"354":
            print(f"Error: got code {code} on DATA")
            sys.exit(1)

    def headers(self, headers=None):
        """Write the Date, From, optional Subject and To headers.

        Args:
            headers: Optional mapping with any of the keys ``"date"``
                (``datetime`` or preformatted string), ``"from"``,
                ``"subject"`` and ``"to"`` (string or list of strings).
                Missing values fall back to the current UTC time and the
                sender / recipients given to ``new_message``.
        """
        if headers is None:
            headers = {}

        if "date" in headers:
            date = headers["date"]
        else:
            date = datetime.now(tz=timezone(timedelta()))
        if isinstance(date, datetime):
            date = format_date(date)
        self.write(f"Date: {date}\r\n".encode())

        # Only fall back to the envelope values when no override is given.
        sender = headers["from"] if "from" in headers else self.sender
        self.write(f"From: {sender}\r\n".encode())

        if "subject" in headers:
            self.write(f"Subject: {headers['subject']}\r\n".encode())

        to = headers["to"] if "to" in headers else self.to
        if isinstance(to, str):
            to = [to]
        self.write(f"To: {', '.join(to)}\r\n".encode())

        # Blank line separates the headers from the body.
        self.write("\r\n".encode())

    def write_line(self, content):
        """Write one body line (str or bytes) followed by CRLF.

        Lines beginning with "." are dot-stuffed, including lines that
        follow a CRLF embedded in *content*, so body text can never be read
        as the end-of-data marker.
        """
        if isinstance(content, str):
            content = content.encode()
        if content.startswith(b"."):
            content = b"." + content
        content = content.replace(b"\r\n.", b"\r\n..")
        self.write(content + b"\r\n")

    def send(self):
        """Terminate the DATA section and exit unless the server accepts it."""
        code, res = self.send_command(b"\r\n.")
        if code != b"250":
            print(f"Error: got code {code} on send")
            print(res)
            sys.exit(1)
        print("Message sent successfully")

    def quit(self):
        """Send QUIT, close the socket and forget it."""
        code, _ = self.send_command(b"QUIT")
        if code != b"221":
            print(f"Error: got code {code} on QUIT")
        print("Disconnected from server")
        self.socket.close()
        # Later writes now hit the "No socket to write to" guard instead of
        # operating on a closed socket.
        self.socket = None


MONTHS = [
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
]


def format_date(date):
    """Format *date* as ``"D Mon YY HH:MM:SS UT"`` for a Date header.

    Aware datetimes are converted to UTC first so the "UT" label is true;
    naive datetimes are taken to be UTC already.

    NOTE(review): the two-digit year and "UT" zone are kept from the original
    format; both are obsolete forms in RFC 5322 -- consider "YYYY" / "+0000".
    """
    if date.tzinfo is not None and date.utcoffset() is not None:
        date = date.astimezone(timezone(timedelta()))
    # datetime months are 1-based; MONTHS is 0-based.
    date_str = f"{date.day} {MONTHS[date.month - 1]} {date.year % 100:02}"
    time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
    return f"{date_str} {time_str}"