# micromail/micromail/__init__.py
#
# Listing metadata from the source viewer: 222 lines, 6.4 KiB, Python.

import socket
import ssl
import sys
from binascii import b2a_base64
from datetime import datetime, timedelta, timezone
class MicromailClient:
    """Small SMTP client for submitting a message to a mail server.

    The connection is driven through ``read``/``readline``/``write`` calls on
    the socket object, so the socket implementation must provide those
    methods (MicroPython sockets do; CPython's ``socket.socket`` does not).

    Error handling follows one convention throughout: a message is printed
    and the process is terminated with ``sys.exit(1)``.

    Intended call order, each step relying on state set by the previous one:
    ``connect()``, ``login()``, ``new_message()``, ``headers()``,
    ``write_message()`` (any number of times), ``send()``, ``quit()``.
    """

    def __init__(self, host, port, ssl=False, starttls=False):
        """Store connection settings; no network activity happens here.

        Args:
            host: Server host name or address passed to ``getaddrinfo``.
            port: Server port.
            ssl: Wrap the socket in TLS immediately after connecting.
            starttls: Upgrade to TLS with the STARTTLS command after EHLO.

        Exits the process if both ``ssl`` and ``starttls`` are requested.
        """
        if ssl and starttls:
            print("Cannot use both SSL and STARTTLS at the same time.")
            sys.exit(1)
        self.host = host
        self.port = port
        self.ssl = ssl
        # Stored under a private name: an instance attribute called
        # "starttls" would shadow the starttls() method and make it
        # impossible to call.
        self._use_starttls = starttls
        self.socket = None
        self.username = None
        # Populated by connect()/new_message(); initialised here so that
        # calling methods out of order fails with a clear message rather
        # than an AttributeError.
        self.features = []
        self.sender = None
        self.to = []

    def _expect(self, code, expected, what):
        """Exit with an error unless ``code`` is one of ``expected``."""
        if code not in expected:
            print(f"Error: got code {code} on {what}")
            sys.exit(1)

    def _read_response(self):
        """Read one complete (possibly multi-line) SMTP reply.

        Returns:
            ``(code, lines)`` where ``code`` is the 3-byte status code of the
            last line read and ``lines`` is the list of stripped text parts.
        """
        lines = []
        code = b""
        more = True
        while more:
            code = self.socket.read(3)
            # A "-" after the code marks a continuation line; anything else
            # (normally a space, or b"" at end of stream) ends the reply.
            more = self.socket.read(1) == b"-"
            lines.append(self.socket.readline().strip())
        return code, lines

    def connect(self):
        """Open the connection, read the greeting and negotiate features.

        Tries every address returned by ``getaddrinfo`` in turn. When
        STARTTLS was requested, the upgrade is mandatory: if the server does
        not advertise it the process exits instead of continuing in
        plaintext.
        """
        sock = None
        for af, socktype, proto, _, sa in socket.getaddrinfo(
            self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
        ):
            print(f"Trying {sa}")
            try:
                candidate = socket.socket(af, socktype, proto)
            except OSError:
                print(f"Could not create socket: {sa}")
                continue
            try:
                candidate.connect(sa)
            except OSError:
                print(f"Could not connect to {sa}")
                candidate.close()
                continue
            sock = candidate
            break
        self.socket = sock
        if self.socket is None:
            print("Could not open socket.")
            sys.exit(1)

        if self.ssl:
            # NOTE(review): wrap_socket is called without a hostname or
            # certificate settings, exactly as before; confirm the platform
            # default is acceptable.
            self.socket = ssl.wrap_socket(self.socket)

        # The greeting may span several "220-" lines, so read it as a full
        # reply rather than as a single line.
        code, _ = self._read_response()
        self._expect(code, (b"220",), "initial connection")

        self.features = self.ehlo()
        if self._use_starttls:
            offered = any(
                feature.strip().upper() == b"STARTTLS"
                for feature in self.features
            )
            if not offered:
                print("Error: server does not advertise STARTTLS")
                sys.exit(1)
            self.starttls()

    def login(self, username, password):
        """Authenticate with AUTH PLAIN, falling back to AUTH LOGIN.

        The mechanism list is taken from the AUTH line of the EHLO response
        stored by ``connect()``. Exits the process when no usable mechanism
        exists or the server rejects the credentials.

        Args:
            username: Account name (str); also remembered as the default
                sender for ``new_message()``.
            password: Account password (str).
        """
        self.username = username
        auth_methods = None
        for feature in self.features:
            if feature.upper().startswith(b"AUTH"):
                # Also accept the legacy "AUTH=PLAIN LOGIN" spelling.
                auth_methods = feature[4:].upper().replace(b"=", b" ").split()
                break
        if auth_methods is None:
            print("No authentication methods available")
            sys.exit(1)

        if b"PLAIN" in auth_methods:
            token = b2a_base64(
                f"\0{username}\0{password}".encode(), newline=False
            )
            code, _ = self.send_command(b"AUTH PLAIN", token)
            self._expect(code, (b"235",), "AUTH PLAIN")
        elif b"LOGIN" in auth_methods:
            encoded_username = b2a_base64(username.encode(), newline=False)
            code, _ = self.send_command(b"AUTH LOGIN", encoded_username)
            self._expect(code, (b"334",), "AUTH LOGIN")
            encoded_password = b2a_base64(password.encode(), newline=False)
            code, _ = self.send_command(encoded_password)
            self._expect(code, (b"235",), "AUTH LOGIN")
        else:
            offered = ", ".join(method.decode() for method in auth_methods)
            print(f"Unsupported authentication method: {offered}")
            sys.exit(1)
        print(f"Authenticated as {username}")

    def write(self, data):
        """Write ``data`` (bytes) to the socket, retrying after short writes.

        Exits the process when there is no socket. If the socket stops
        accepting data, the failure is printed and the method returns; this
        case is non-fatal, as it was before.
        """
        if self.socket is None:
            print("No socket to write to")
            sys.exit(1)
        total = len(data)
        sent = 0
        while sent < total:
            n = self.socket.write(data[sent:])
            if not n:
                print(f"Failure writing data <{data}>: not all bytes written")
                return
            sent += n

    def send_command(self, *cmd):
        """Send one command line and return the server's reply.

        Args:
            *cmd: Byte strings joined with single spaces; CRLF is appended.

        Returns:
            ``(code, lines)`` as produced by ``_read_response()``.
        """
        self.write(b" ".join(cmd) + b"\r\n")
        return self._read_response()

    def ehlo(self):
        """Send EHLO and return the reply lines (the server's feature list)."""
        code, res = self.send_command(b"EHLO _")
        self._expect(code, (b"250",), "EHLO")
        return res

    def starttls(self):
        """Upgrade the connection to TLS via the STARTTLS command.

        After the upgrade EHLO is sent again and ``self.features`` replaced,
        because capabilities learned over the unencrypted channel must not
        be trusted.
        """
        code, _ = self.send_command(b"STARTTLS")
        self._expect(code, (b"220",), "STARTTLS")
        self.socket = ssl.wrap_socket(self.socket)
        self.features = self.ehlo()

    def new_message(self, to, sender=None):
        """Start a mail transaction and enter DATA mode.

        Args:
            to: One recipient address (str) or a list of addresses.
            sender: Envelope sender; defaults to the username from
                ``login()``.

        A rejected recipient is reported but does not abort the transaction,
        so the remaining recipients are still attempted.
        """
        code, _ = self.send_command(b"RSET")
        self._expect(code, (b"250",), "RSET")

        if sender is None:
            sender = self.username
        if isinstance(to, str):
            to = [to]
        self.sender = sender
        self.to = to

        code, _ = self.send_command(f"MAIL FROM:<{sender}>".encode())
        self._expect(code, (b"250",), f"MAIL FROM:<{sender}>")

        for recipient in to:
            code, _ = self.send_command(f"RCPT TO:<{recipient}>".encode())
            if code not in (b"250", b"251"):
                print(f"Error: got code {code} on RCPT TO:<{recipient}>")

        code, _ = self.send_command(b"DATA")
        self._expect(code, (b"354",), "DATA")

    def headers(self, headers=None):
        """Write the Date, From, Subject and To header lines.

        Args:
            headers: Optional dict with any of the keys ``"date"`` (datetime
                or preformatted str), ``"from"``, ``"subject"`` and ``"to"``
                (str or list). Missing values fall back to the current UTC
                time and the sender/recipients given to ``new_message()``.

        Only the header lines are written. The empty line that separates
        headers from the body is NOT emitted here; the caller has to supply
        it, for example as the start of the first ``write_message()``.
        """
        if headers is None:
            headers = {}

        date = headers.get("date")
        if date is None:
            date = datetime.now(tz=timezone(timedelta()))
        if isinstance(date, datetime):
            date = format_date(date)
        self.write(f"Date: {date}\r\n".encode())

        sender = headers.get("from", self.sender)
        self.write(f"From: {sender}\r\n".encode())

        if "subject" in headers:
            self.write(f"Subject: {headers['subject']}\r\n".encode())

        to = headers.get("to", self.to)
        if isinstance(to, str):
            to = [to]
        self.write(f"To: {', '.join(to)}\r\n".encode())

    def write_message(self, content):
        """Write a chunk of message data, terminated with CRLF.

        Args:
            content: str or bytes. A trailing CRLF is added when missing.

        Lines that begin with "." are dot-stuffed (the dot is doubled) so
        that message content cannot be mistaken for the end-of-data marker.
        The chunk is assumed to start at the beginning of a line, which
        holds when all data is written through ``headers()`` and this method.
        """
        if isinstance(content, str):
            content = content.encode()
        if not content.endswith(b"\r\n"):
            content += b"\r\n"
        if content.startswith(b"."):
            content = b"." + content
        # Matching on "\n." covers both CRLF and bare-LF line starts.
        content = content.replace(b"\n.", b"\n..")
        self.write(content)

    def send(self):
        """Terminate the DATA section and wait for the server to accept it."""
        code, _ = self.send_command(b"\r\n.")
        self._expect(code, (b"250",), "send")
        print("Message sent successfully")

    def quit(self):
        """Send QUIT and close the socket.

        An unexpected reply code is reported but the socket is closed
        regardless.
        """
        code, _ = self.send_command(b"QUIT")
        if code != b"221":
            print(f"Error: got code {code} on QUIT")
        self.socket.close()
        self.socket = None
        print("Disconnected from server")
# English month abbreviations for mail date headers; index 0 is January, so
# look up with (month number - 1).
MONTHS = [
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
]


def format_date(date):
    """Format a datetime as ``D Mon YY HH:MM:SS UT`` for a Date header.

    Args:
        date: A ``datetime``. Timezone-aware values are converted to UTC
            first, because the result is always labelled "UT"; naive values
            are taken to be UTC already.

    Returns:
        The formatted string, e.g. ``"5 Jan 24 03:04:05 UT"``.
    """
    if date.tzinfo is not None:
        date = date.astimezone(timezone(timedelta()))
    # datetime months run 1..12 while MONTHS is 0-based.
    month = MONTHS[date.month - 1]
    # NOTE(review): the two-digit year and the "UT" zone are kept as in the
    # existing output; confirm whether a four-digit year and "+0000" are
    # preferred for the receiving servers.
    date_str = f"{date.day} {month} {str(date.year)[-2:]}"
    time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
    return f"{date_str} {time_str}"