Refactor error handling in MicromailClient to raise exceptions instead of exiting; improve connection logic and command responses

This commit is contained in:
Edgar P. Burkhart 2025-05-19 12:33:37 +02:00
parent f592335c8f
commit 36dc32fef3
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227

View file

@ -1,6 +1,6 @@
import errno
import socket import socket
import ssl import ssl
import sys
from binascii import b2a_base64 from binascii import b2a_base64
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -11,8 +11,7 @@ class MicromailClient:
self.port = port self.port = port
if ssl and starttls: if ssl and starttls:
print("Cannot use both SSL and STARTTLS at the same time.") raise Error("Cannot use both SSL and STARTTLS at the same time.")
sys.exit(1)
self.ssl = ssl self.ssl = ssl
self.starttls = starttls self.starttls = starttls
@ -21,22 +20,28 @@ class MicromailClient:
self.username = None self.username = None
def connect(self): def connect(self):
for addr_info in socket.getaddrinfo( con_errors = []
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP try:
): ai = socket.getaddrinfo(
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
)
except OSError as e:
ai = []
con_errors.append(e)
for addr_info in ai:
af, socktype, proto, _, sa = addr_info af, socktype, proto, _, sa = addr_info
print(f"Trying {sa}")
try: try:
self.socket = socket.socket(af, socktype, proto) self.socket = socket.socket(af, socktype, proto)
except OSError: except OSError as e:
print(f"Could not create socket: {sa}") con_errors.append(e)
self.socket = None self.socket = None
continue continue
try: try:
self.socket.connect(sa) self.socket.connect(sa)
except OSError: except OSError as e:
print(f"Could not connect to {sa}") con_errors.append(e)
self.socket.close() self.socket.close()
self.socket = None self.socket = None
continue continue
@ -44,19 +49,16 @@ class MicromailClient:
break break
if self.socket is None: if self.socket is None:
print("Could not open socket.") err = ", ".join([errno.errorcode.get(e.errno, str(e)) for e in con_errors])
sys.exit(1) raise Error(
f"Could not connect to server: {err}. Are you connected to the network?"
)
print("Connected to server.")
if self.ssl: if self.ssl:
self.socket = ssl.wrap_socket(self.socket) self.socket = ssl.wrap_socket(self.socket)
code = self.socket.read(3) self.read_res(code=220)
self.socket.readline()
if code != b"220":
print(f"Error: got code {code} on initial connection")
sys.exit(1)
self.features = self.ehlo() self.features = self.ehlo()
if self.starttls and b"STARTTLS" in self.features: if self.starttls and b"STARTTLS" in self.features:
@ -70,72 +72,65 @@ class MicromailClient:
if feature.startswith(b"AUTH"): if feature.startswith(b"AUTH"):
auth_methods = feature[4:].upper().split() auth_methods = feature[4:].upper().split()
break break
if auth_methods is None: if auth_methods is None:
print("No authentication methods available") raise Error(
sys.exit(1) f"No authentication methods available. Server features: {self.features}"
)
if b"PLAIN" in auth_methods: if b"PLAIN" in auth_methods:
encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False) encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False)
code, res = self.send_command(b"AUTH PLAIN ", encoded_auth) self.send_command(b"AUTH PLAIN ", encoded_auth, code=235)
elif b"LOGIN" in auth_methods: elif b"LOGIN" in auth_methods:
encoded_username = b2a_base64(username, newline=False) encoded_username = b2a_base64(username, newline=False)
code, res = self.send_command(b"AUTH LOGIN ", encoded_username) self.send_command(b"AUTH LOGIN ", encoded_username, code=334)
if code != b"334":
print(f"Error: got code {code} on AUTH LOGIN")
sys.exit(1)
encoded_password = b2a_base64(password, newline=False)
code, res = self.send_command(encoded_password)
if code != b"235":
print(f"Error: got code {code} on AUTH LOGIN")
sys.exit(1)
else:
print(f"Unsupported authentication method: {', '.join(auth_methods)}")
sys.exit(1)
print(f"Authenticated as {username}") encoded_password = b2a_base64(password, newline=False)
self.send_command(encoded_password, code=235)
else:
raise Error(
f"Unsupported authentication methods: {', '.join(auth_methods)}. Micromail only supports PLAIN and LOGIN."
)
print(f"Authenticated as {username}.")
def write(self, data): def write(self, data):
if self.socket is None: if self.socket is None:
print("No socket to write to") raise Error("No socket to write to.")
sys.exit(1)
n = self.socket.write(data) n = self.socket.write(data)
if n != len(data): if n != len(data):
print(f"Failure writing data <{data}>: not all bytes written") print(f"Failure writing data <{data}>: not all bytes written")
def send_command(self, *cmd): def send_command(self, *cmd, code=[]):
self.write(b" ".join(cmd) + b"\r\n") cmd = b" ".join(cmd)
self.write(cmd + b"\r\n")
return self.read_res(cmd, code=code)
def read_res(self, cmd="", code=[]):
response = [] response = []
next = True next = True
while next: while next:
code = self.socket.read(3) r_code = int(self.socket.read(3))
next = self.socket.read(1) == b"-" next = self.socket.read(1) == b"-"
response.append(self.socket.readline().strip()) response.append(self.socket.readline().strip())
return code, response if isinstance(code, int):
code = [code]
if code != [] and r_code not in code:
raise SMTPError(cmd, r_code, b" ".join(response).decode())
return r_code, response
def ehlo(self): def ehlo(self):
code, res = self.send_command(b"EHLO _") _, res = self.send_command(b"EHLO _", code=250)
if code != b"250":
print(f"Error: got code {code} on EHLO")
sys.exit(1)
return res return res
def start_starttls(self): def start_starttls(self):
code, res = self.send_command(b"STARTTLS") self.send_command(b"STARTTLS", code=220)
if code != b"220":
print(f"Error: got code {code} on STARTTLS")
sys.exit(1)
print(res)
self.socket = ssl.wrap_socket(self.socket) self.socket = ssl.wrap_socket(self.socket)
def new_message(self, to, sender=None): def new_message(self, to, sender=None):
code, _ = self.send_command(b"RSET") self.send_command(b"RSET", code=250)
if code != b"250":
print(f"Error: got code {code} on RSET")
sys.exit(1)
if sender is None: if sender is None:
sender = self.username sender = self.username
@ -145,20 +140,12 @@ class MicromailClient:
self.sender = sender self.sender = sender
self.to = to self.to = to
code, _ = self.send_command(f"MAIL FROM:<{sender}>".encode()) self.send_command(f"MAIL FROM:<{sender}>".encode(), code=250)
if code != b"250":
print(f"Error: got code {code} on MAIL FROM:<{sender}>")
sys.exit(1)
for recipient in to: for recipient in to:
code, _ = self.send_command(f"RCPT TO:<{recipient}>".encode()) self.send_command(f"RCPT TO:<{recipient}>".encode(), code=[250, 251])
if code not in [b"250", b"251"]:
print(f"Error: got code {code} on RCPT TO:<{recipient}>")
code, _ = self.send_command(b"DATA") self.send_command(b"DATA", code=354)
if code != b"354":
print(f"Error: got code {code} on DATA")
sys.exit(1)
def headers(self, headers={}): def headers(self, headers={}):
date = headers.get("date", datetime.now(tz=timezone(timedelta()))) date = headers.get("date", datetime.now(tz=timezone(timedelta())))
@ -188,21 +175,27 @@ class MicromailClient:
self.write(content + b"\r\n") self.write(content + b"\r\n")
def send(self): def send(self):
code, res = self.send_command(b"\r\n.") self.send_command(b"\r\n.", code=250)
if code != b"250":
print(f"Error: got code {code} on send")
print(res)
sys.exit(1)
print("Message sent successfully") print("Message sent successfully")
def quit(self): def quit(self):
code, _ = self.send_command(b"QUIT") self.send_command(b"QUIT", code=221)
if code != b"221":
print(f"Error: got code {code} on QUIT")
print("Disconnected from server") print("Disconnected from server")
self.socket.close() self.socket.close()
class Error(Exception):
pass
class SMTPError(Error):
def __init__(self, cmd, code, message):
super().__init__(f"{code} {message}")
self.cmd = cmd
self.code = code
MONTHS = [ MONTHS = [
"Jan", "Jan",
"Feb", "Feb",