Initialize micromail project with basic structure and functionality
This commit is contained in:
parent
e249b4061c
commit
65c23ca231
5 changed files with 245 additions and 2 deletions
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
|
@ -0,0 +1,5 @@
|
|||
__pycache__
|
||||
|
||||
/pyproject.toml
|
||||
/uv.lock
|
||||
/.python-version
|
17
.pre-commit-config.yaml
Normal file
17
.pre-commit-config.yaml
Normal file
|
@ -0,0 +1,17 @@
|
|||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v5.0.0
|
||||
hooks:
|
||||
- id: check-yaml
|
||||
- id: end-of-file-fixer
|
||||
- id: trailing-whitespace
|
||||
- repo: https://github.com/pycqa/isort
|
||||
rev: 6.0.1
|
||||
hooks:
|
||||
- id: isort
|
||||
name: isort (python)
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
rev: v0.9.10
|
||||
hooks:
|
||||
- id: ruff
|
||||
- id: ruff-format
|
|
@ -2,4 +2,6 @@
|
|||
|
||||

|
||||
|
||||
Tiny SMTP client for Micropython
|
||||
Tiny SMTP client for Micropython
|
||||
|
||||
Inspired from [shawwwn/uMail](https://github.com/shawwwn/uMail).
|
||||
|
|
2
logo.svg
2
logo.svg
|
@ -22,4 +22,4 @@
|
|||
y="0" />
|
||||
<use xlink:href="#child-svg" filter="url(#alpha-to-white)"
|
||||
transform="matrix(8,0,0,8,64,64)" />
|
||||
</svg>
|
||||
</svg>
|
||||
|
|
Before Width: | Height: | Size: 1.2 KiB After Width: | Height: | Size: 1.2 KiB |
219
micromail/__init__.py
Normal file
219
micromail/__init__.py
Normal file
|
@ -0,0 +1,219 @@
|
|||
import socket
|
||||
import ssl
|
||||
import sys
|
||||
from binascii import b2a_base64
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
|
||||
class MicromailClient:
|
||||
def __init__(self, host, port, ssl=False, starttls=False):
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
if ssl and starttls:
|
||||
print("Cannot use both SSL and STARTTLS at the same time.")
|
||||
sys.exit(1)
|
||||
|
||||
self.ssl = ssl
|
||||
self.starttls = starttls
|
||||
|
||||
self.socket = None
|
||||
self.username = None
|
||||
|
||||
def connect(self):
|
||||
for addr_info in socket.getaddrinfo(
|
||||
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
|
||||
):
|
||||
af, socktype, proto, _, sa = addr_info
|
||||
print(f"Trying {sa}")
|
||||
try:
|
||||
self.socket = socket.socket(af, socktype, proto)
|
||||
except OSError:
|
||||
print(f"Could not create socket: {sa}")
|
||||
self.socket = None
|
||||
continue
|
||||
|
||||
try:
|
||||
self.socket.connect(sa)
|
||||
except OSError:
|
||||
print(f"Could not connect to {sa}")
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
if self.socket is None:
|
||||
print("Could not open socket.")
|
||||
sys.exit(1)
|
||||
|
||||
if self.ssl:
|
||||
self.socket = ssl.wrap_socket(self.socket)
|
||||
|
||||
code = self.socket.read(3)
|
||||
self.socket.readline()
|
||||
|
||||
if code != b"220":
|
||||
print(f"Error: got code {code} on initial connection")
|
||||
sys.exit(1)
|
||||
|
||||
self.features = self.ehlo()
|
||||
|
||||
if self.starttls and "STARTTLS" in self.features:
|
||||
self.starttls()
|
||||
|
||||
def login(self, username, password):
|
||||
self.username = username
|
||||
|
||||
auth_methods = None
|
||||
for feature in self.features:
|
||||
if feature.startswith(b"AUTH"):
|
||||
auth_methods = feature[4:].upper().split()
|
||||
break
|
||||
if auth_methods is None:
|
||||
print("No authentication methods available")
|
||||
sys.exit(1)
|
||||
|
||||
if b"PLAIN" in auth_methods:
|
||||
encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False)
|
||||
code, res = self.send_command(b"AUTH PLAIN ", encoded_auth)
|
||||
elif b"LOGIN" in auth_methods:
|
||||
encoded_username = b2a_base64(username, newline=False)
|
||||
code, res = self.send_command(b"AUTH LOGIN ", encoded_username)
|
||||
if code != b"334":
|
||||
print(f"Error: got code {code} on AUTH LOGIN")
|
||||
sys.exit(1)
|
||||
encoded_password = b2a_base64(password, newline=False)
|
||||
code, res = self.send_command(encoded_password)
|
||||
if code != b"235":
|
||||
print(f"Error: got code {code} on AUTH LOGIN")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print(f"Unsupported authentication method: {', '.join(auth_methods)}")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Authenticated as {username}")
|
||||
|
||||
def write(self, data):
|
||||
if self.socket is None:
|
||||
print("No socket to write to")
|
||||
sys.exit(1)
|
||||
|
||||
n = self.socket.write(data)
|
||||
if n != len(data):
|
||||
print(f"Failure writing data <{data}>: not all bytes written")
|
||||
|
||||
def send_command(self, *cmd):
|
||||
self.write(b" ".join(cmd) + b"\r\n")
|
||||
|
||||
response = []
|
||||
next = True
|
||||
while next:
|
||||
code = self.socket.read(3)
|
||||
next = self.socket.read(1) == b"-"
|
||||
response.append(self.socket.readline().strip())
|
||||
|
||||
return code, response
|
||||
|
||||
def ehlo(self):
|
||||
code, res = self.send_command(b"EHLO _")
|
||||
if code != b"250":
|
||||
print(f"Error: got code {code} on EHLO")
|
||||
sys.exit(1)
|
||||
return res
|
||||
|
||||
def starttls(self):
|
||||
code, res = self.send_command(b"STARTTLS")
|
||||
if code != b"220":
|
||||
print(f"Error: got code {code} on STARTTLS")
|
||||
sys.exit(1)
|
||||
|
||||
self.socket = ssl.wrap_socket(self.socket)
|
||||
|
||||
def new_message(self, to, sender=None):
|
||||
code, _ = self.send_command(b"RSET")
|
||||
if code != b"250":
|
||||
print(f"Error: got code {code} on RSET")
|
||||
sys.exit(1)
|
||||
|
||||
if sender is None:
|
||||
sender = self.username
|
||||
if isinstance(to, str):
|
||||
to = [to]
|
||||
|
||||
self.sender = sender
|
||||
self.to = to
|
||||
|
||||
code, _ = self.send_command(f"MAIL FROM:<{sender}>".encode())
|
||||
if code != b"250":
|
||||
print(f"Error: got code {code} on MAIL FROM:<{sender}>")
|
||||
sys.exit(1)
|
||||
|
||||
for recipient in to:
|
||||
code, _ = self.send_command(f"RCPT TO:<{recipient}>".encode())
|
||||
if code not in [b"250", b"251"]:
|
||||
print(f"Error: got code {code} on RCPT TO:<{recipient}>")
|
||||
|
||||
code, _ = self.send_command(b"DATA")
|
||||
if code != b"354":
|
||||
print(f"Error: got code {code} on DATA")
|
||||
sys.exit(1)
|
||||
|
||||
def headers(self, headers={}):
|
||||
date = headers.get("date", datetime.now(tz=timezone(timedelta())))
|
||||
if isinstance(date, datetime):
|
||||
date = format_date(date)
|
||||
self.write(f"Date: {date}\r\n".encode())
|
||||
|
||||
sender = headers.get("from", self.sender)
|
||||
self.write(f"From: {sender}\r\n".encode())
|
||||
|
||||
if "subject" in headers:
|
||||
self.write(f"Subject: {headers['subject']}\r\n".encode())
|
||||
|
||||
to = headers.get("to", self.to)
|
||||
self.write(f"To: {', '.join(to)}\r\n".encode())
|
||||
|
||||
def write_message(self, content):
|
||||
if isinstance(content, str):
|
||||
content = content.encode()
|
||||
|
||||
if not content.endswith(b"\r\n"):
|
||||
content += b"\r\n"
|
||||
|
||||
self.write(content)
|
||||
|
||||
def send(self):
|
||||
code, _ = self.send_command(b"\r\n.")
|
||||
if code != b"250":
|
||||
print(f"Error: got code {code} on send")
|
||||
sys.exit(1)
|
||||
print("Message sent successfully")
|
||||
|
||||
def quit(self):
|
||||
code, _ = self.send_command(b"QUIT")
|
||||
if code != b"221":
|
||||
print(f"Error: got code {code} on QUIT")
|
||||
print("Disconnected from server")
|
||||
|
||||
|
||||
MONTHS = [
|
||||
"Jan",
|
||||
"Feb",
|
||||
"Mar",
|
||||
"Apr",
|
||||
"May",
|
||||
"Jun",
|
||||
"Jul",
|
||||
"Aug",
|
||||
"Sep",
|
||||
"Oct",
|
||||
"Nov",
|
||||
"Dec",
|
||||
]
|
||||
|
||||
|
||||
def format_date(date):
|
||||
date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}"
|
||||
time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
|
||||
return f"{date_str} {time_str}"
|
Loading…
Add table
Add a link
Reference in a new issue