Compare commits

...

4 commits

4 changed files with 390 additions and 109 deletions

2
.mypy.ini Normal file
View file

@ -0,0 +1,2 @@
[mypy]
mypy_path = $MYPY_CONFIG_FILE_DIR/.typestubs/

234
.typestubs/socket.pyi Normal file
View file

@ -0,0 +1,234 @@
import sys
from enum import IntEnum
import _socket
from _socket import CAPI as CAPI
from _socket import EAI_AGAIN as EAI_AGAIN
from _socket import EAI_BADFLAGS as EAI_BADFLAGS
from _socket import EAI_FAIL as EAI_FAIL
from _socket import EAI_FAMILY as EAI_FAMILY
from _socket import EAI_MEMORY as EAI_MEMORY
from _socket import EAI_NODATA as EAI_NODATA
from _socket import EAI_NONAME as EAI_NONAME
from _socket import EAI_SERVICE as EAI_SERVICE
from _socket import EAI_SOCKTYPE as EAI_SOCKTYPE
from _socket import INADDR_ALLHOSTS_GROUP as INADDR_ALLHOSTS_GROUP
from _socket import INADDR_ANY as INADDR_ANY
from _socket import INADDR_BROADCAST as INADDR_BROADCAST
from _socket import INADDR_LOOPBACK as INADDR_LOOPBACK
from _socket import INADDR_MAX_LOCAL_GROUP as INADDR_MAX_LOCAL_GROUP
from _socket import INADDR_NONE as INADDR_NONE
from _socket import INADDR_UNSPEC_GROUP as INADDR_UNSPEC_GROUP
from _socket import IP_ADD_MEMBERSHIP as IP_ADD_MEMBERSHIP
from _socket import IP_DROP_MEMBERSHIP as IP_DROP_MEMBERSHIP
from _socket import IP_HDRINCL as IP_HDRINCL
from _socket import IP_MULTICAST_IF as IP_MULTICAST_IF
from _socket import IP_MULTICAST_LOOP as IP_MULTICAST_LOOP
from _socket import IP_MULTICAST_TTL as IP_MULTICAST_TTL
from _socket import IP_OPTIONS as IP_OPTIONS
from _socket import IP_TOS as IP_TOS
from _socket import IP_TTL as IP_TTL
from _socket import IPPORT_RESERVED as IPPORT_RESERVED
from _socket import IPPORT_USERRESERVED as IPPORT_USERRESERVED
from _socket import IPPROTO_AH as IPPROTO_AH
from _socket import IPPROTO_DSTOPTS as IPPROTO_DSTOPTS
from _socket import IPPROTO_EGP as IPPROTO_EGP
from _socket import IPPROTO_ESP as IPPROTO_ESP
from _socket import IPPROTO_FRAGMENT as IPPROTO_FRAGMENT
from _socket import IPPROTO_HOPOPTS as IPPROTO_HOPOPTS
from _socket import IPPROTO_ICMP as IPPROTO_ICMP
from _socket import IPPROTO_ICMPV6 as IPPROTO_ICMPV6
from _socket import IPPROTO_IDP as IPPROTO_IDP
from _socket import IPPROTO_IGMP as IPPROTO_IGMP
from _socket import IPPROTO_IP as IPPROTO_IP
from _socket import IPPROTO_IPV6 as IPPROTO_IPV6
from _socket import IPPROTO_NONE as IPPROTO_NONE
from _socket import IPPROTO_PIM as IPPROTO_PIM
from _socket import IPPROTO_PUP as IPPROTO_PUP
from _socket import IPPROTO_RAW as IPPROTO_RAW
from _socket import IPPROTO_ROUTING as IPPROTO_ROUTING
from _socket import IPPROTO_SCTP as IPPROTO_SCTP
from _socket import IPPROTO_TCP as IPPROTO_TCP
from _socket import IPPROTO_UDP as IPPROTO_UDP
from _socket import IPV6_CHECKSUM as IPV6_CHECKSUM
from _socket import IPV6_DONTFRAG as IPV6_DONTFRAG
from _socket import IPV6_HOPLIMIT as IPV6_HOPLIMIT
from _socket import IPV6_HOPOPTS as IPV6_HOPOPTS
from _socket import IPV6_JOIN_GROUP as IPV6_JOIN_GROUP
from _socket import IPV6_LEAVE_GROUP as IPV6_LEAVE_GROUP
from _socket import IPV6_MULTICAST_HOPS as IPV6_MULTICAST_HOPS
from _socket import IPV6_MULTICAST_IF as IPV6_MULTICAST_IF
from _socket import IPV6_MULTICAST_LOOP as IPV6_MULTICAST_LOOP
from _socket import IPV6_PKTINFO as IPV6_PKTINFO
from _socket import IPV6_RECVRTHDR as IPV6_RECVRTHDR
from _socket import IPV6_RECVTCLASS as IPV6_RECVTCLASS
from _socket import IPV6_RTHDR as IPV6_RTHDR
from _socket import IPV6_TCLASS as IPV6_TCLASS
from _socket import IPV6_UNICAST_HOPS as IPV6_UNICAST_HOPS
from _socket import IPV6_V6ONLY as IPV6_V6ONLY
from _socket import NI_DGRAM as NI_DGRAM
from _socket import NI_MAXHOST as NI_MAXHOST
from _socket import NI_MAXSERV as NI_MAXSERV
from _socket import NI_NAMEREQD as NI_NAMEREQD
from _socket import NI_NOFQDN as NI_NOFQDN
from _socket import NI_NUMERICHOST as NI_NUMERICHOST
from _socket import NI_NUMERICSERV as NI_NUMERICSERV
from _socket import SHUT_RD as SHUT_RD
from _socket import SHUT_RDWR as SHUT_RDWR
from _socket import SHUT_WR as SHUT_WR
from _socket import SO_ACCEPTCONN as SO_ACCEPTCONN
from _socket import SO_BROADCAST as SO_BROADCAST
from _socket import SO_DEBUG as SO_DEBUG
from _socket import SO_DONTROUTE as SO_DONTROUTE
from _socket import SO_ERROR as SO_ERROR
from _socket import SO_KEEPALIVE as SO_KEEPALIVE
from _socket import SO_LINGER as SO_LINGER
from _socket import SO_OOBINLINE as SO_OOBINLINE
from _socket import SO_RCVBUF as SO_RCVBUF
from _socket import SO_RCVLOWAT as SO_RCVLOWAT
from _socket import SO_RCVTIMEO as SO_RCVTIMEO
from _socket import SO_REUSEADDR as SO_REUSEADDR
from _socket import SO_SNDBUF as SO_SNDBUF
from _socket import SO_SNDLOWAT as SO_SNDLOWAT
from _socket import SO_SNDTIMEO as SO_SNDTIMEO
from _socket import SO_TYPE as SO_TYPE
from _socket import SOL_IP as SOL_IP
from _socket import SOL_SOCKET as SOL_SOCKET
from _socket import SOL_TCP as SOL_TCP
from _socket import SOL_UDP as SOL_UDP
from _socket import SOMAXCONN as SOMAXCONN
from _socket import TCP_FASTOPEN as TCP_FASTOPEN
from _socket import TCP_KEEPCNT as TCP_KEEPCNT
from _socket import TCP_KEEPINTVL as TCP_KEEPINTVL
from _socket import TCP_MAXSEG as TCP_MAXSEG
from _socket import TCP_NODELAY as TCP_NODELAY
from _socket import SocketType as SocketType
from _socket import _Address as _Address
from _socket import _RetAddress as _RetAddress
from _socket import close as close
from _socket import dup as dup
from _socket import getdefaulttimeout as getdefaulttimeout
from _socket import gethostbyaddr as gethostbyaddr
from _socket import gethostbyname as gethostbyname
from _socket import gethostbyname_ex as gethostbyname_ex
from _socket import gethostname as gethostname
from _socket import getnameinfo as getnameinfo
from _socket import getprotobyname as getprotobyname
from _socket import getservbyname as getservbyname
from _socket import getservbyport as getservbyport
from _socket import has_ipv6 as has_ipv6
from _socket import htonl as htonl
from _socket import htons as htons
from _socket import if_indextoname as if_indextoname
from _socket import if_nameindex as if_nameindex
from _socket import if_nametoindex as if_nametoindex
from _socket import inet_aton as inet_aton
from _socket import inet_ntoa as inet_ntoa
from _socket import inet_ntop as inet_ntop
from _socket import inet_pton as inet_pton
from _socket import ntohl as ntohl
from _socket import ntohs as ntohs
from _socket import setdefaulttimeout as setdefaulttimeout
from _typeshed import ReadableBuffer
class AddressFamily(IntEnum):
AF_INET = 2
AF_INET6 = 10
AF_APPLETALK = 5
AF_IPX = 4
AF_SNA = 22
AF_UNSPEC = 0
if sys.platform != "darwin":
AF_IRDA = 23
if sys.platform != "win32":
AF_ROUTE = 16
AF_UNIX = 1
if sys.platform == "darwin":
AF_SYSTEM = 32
if sys.platform != "win32" and sys.platform != "darwin":
AF_ASH = 18
AF_ATMPVC = 8
AF_ATMSVC = 20
AF_AX25 = 3
AF_BRIDGE = 7
AF_ECONET = 19
AF_KEY = 15
AF_LLC = 26
AF_NETBEUI = 13
AF_NETROM = 6
AF_PPPOX = 24
AF_ROSE = 11
AF_SECURITY = 14
AF_WANPIPE = 25
AF_X25 = 9
if sys.platform == "linux":
AF_CAN = 29
AF_PACKET = 17
AF_RDS = 21
AF_TIPC = 30
AF_ALG = 38
AF_NETLINK = 16
AF_VSOCK = 40
AF_QIPCRTR = 42
if sys.platform != "linux":
AF_LINK = 33
if sys.platform != "darwin" and sys.platform != "linux":
AF_BLUETOOTH = 32
if sys.platform == "win32" and sys.version_info >= (3, 12):
AF_HYPERV = 34
if (
sys.platform != "linux"
and sys.platform != "win32"
and sys.platform != "darwin"
and sys.version_info >= (3, 12)
):
# FreeBSD >= 14.0
AF_DIVERT = 44
class SocketKind(IntEnum):
SOCK_STREAM = 1
SOCK_DGRAM = 2
SOCK_RAW = 3
SOCK_RDM = 4
SOCK_SEQPACKET = 5
if sys.platform == "linux":
SOCK_CLOEXEC = 524288
SOCK_NONBLOCK = 2048
SOCK_STREAM = SocketKind.SOCK_STREAM
SOCK_DGRAM = SocketKind.SOCK_DGRAM
SOCK_RAW = SocketKind.SOCK_RAW
SOCK_RDM = SocketKind.SOCK_RDM
SOCK_SEQPACKET = SocketKind.SOCK_SEQPACKET
if sys.platform == "linux":
SOCK_CLOEXEC = SocketKind.SOCK_CLOEXEC
SOCK_NONBLOCK = SocketKind.SOCK_NONBLOCK
class socket(_socket.socket):
def __init__(
self,
family: AddressFamily | int = -1,
type: SocketKind | int = -1,
proto: int = -1,
/,
) -> None: ...
def write(self, b: ReadableBuffer) -> int | None: ...
def read(self, size: int, /) -> bytes: ...
def readline(self, /) -> bytes: ...
def getaddrinfo(
host: bytes | str | None,
port: bytes | str | int | None,
family: int = 0,
type: int = 0,
proto: int = 0,
flags: int = 0,
) -> list[
tuple[
AddressFamily,
SocketKind,
int,
str,
tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
]
]: ...

18
.typestubs/ssl.pyi Normal file
View file

@ -0,0 +1,18 @@
import socket
from _typeshed import StrOrBytesPath
def wrap_socket(
sock: socket.socket,
keyfile: StrOrBytesPath | None = None,
certfile: StrOrBytesPath | None = None,
server_side: bool = False,
cert_reqs: int = ...,
ssl_version: int = ...,
ca_certs: str | None = None,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
ciphers: str | None = None,
) -> SSLSocket: ...
class SSLSocket(socket.socket): ...

View file

@ -1,42 +1,57 @@
import errno
import socket
import ssl
import sys
from binascii import b2a_base64
from datetime import datetime, timedelta, timezone
class MicromailClient:
def __init__(self, host, port, ssl=False, starttls=False):
def __init__(
self, host: str, port: int = 0, ssl: bool = False, starttls: bool = False
):
self.host = host
self.port = port
if ssl and starttls:
print("Cannot use both SSL and STARTTLS at the same time.")
sys.exit(1)
raise Error("Cannot use both SSL and STARTTLS at the same time.")
self.ssl = ssl
self.starttls = starttls
self.socket = None
self.username = None
if port == 0:
if ssl:
port = 465
elif starttls:
port = 587
else:
port = 25
self.port = port
def connect(self):
for addr_info in socket.getaddrinfo(
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
):
self.socket: None | socket.socket = None
self.username: str | None = None
def connect(self) -> None:
con_errors = []
try:
ai = socket.getaddrinfo(
self.host, self.port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP
)
except OSError as e:
ai = []
con_errors.append(e)
for addr_info in ai:
af, socktype, proto, _, sa = addr_info
print(f"Trying {sa}")
try:
self.socket = socket.socket(af, socktype, proto)
except OSError:
print(f"Could not create socket: {sa}")
except OSError as e:
con_errors.append(e)
self.socket = None
continue
try:
self.socket.connect(sa)
except OSError:
print(f"Could not connect to {sa}")
except OSError as e:
con_errors.append(e)
self.socket.close()
self.socket = None
continue
@ -44,98 +59,107 @@ class MicromailClient:
break
if self.socket is None:
print("Could not open socket.")
sys.exit(1)
err = ", ".join(
[errno.errorcode.get(e.errno or 0, str(e)) for e in con_errors]
)
raise Error(
f"Could not connect to server: {err}. Are you connected to the network?"
)
print("Connected to server.")
if self.ssl:
self.socket = ssl.wrap_socket(self.socket)
code = self.socket.read(3)
self.socket.readline()
if code != b"220":
print(f"Error: got code {code} on initial connection")
sys.exit(1)
self.read_res(code=220)
self.features = self.ehlo()
if self.starttls and b"STARTTLS" in self.features:
if self.starttls and "STARTTLS" in self.features:
self.start_starttls()
def login(self, username, password):
def login(self, username: str, password: str) -> None:
self.username = username
auth_methods = None
for feature in self.features:
if feature.startswith(b"AUTH"):
if feature.startswith("AUTH"):
auth_methods = feature[4:].upper().split()
break
if auth_methods is None:
print("No authentication methods available")
sys.exit(1)
raise Error(
f"No authentication methods available. Server features: {self.features}"
)
if b"PLAIN" in auth_methods:
encoded_auth = b2a_base64(f"\0{username}\0{password}", newline=False)
code, res = self.send_command(b"AUTH PLAIN ", encoded_auth)
elif b"LOGIN" in auth_methods:
encoded_username = b2a_base64(username, newline=False)
code, res = self.send_command(b"AUTH LOGIN ", encoded_username)
if code != b"334":
print(f"Error: got code {code} on AUTH LOGIN")
sys.exit(1)
encoded_password = b2a_base64(password, newline=False)
code, res = self.send_command(encoded_password)
if code != b"235":
print(f"Error: got code {code} on AUTH LOGIN")
sys.exit(1)
if "PLAIN" in auth_methods:
encoded_auth = b2a_base64(
f"\0{username}\0{password}".encode(), newline=False
)
self.send_command("AUTH PLAIN ", encoded_auth, code=235)
elif "LOGIN" in auth_methods:
encoded_username = b2a_base64(username.encode(), newline=False)
self.send_command("AUTH LOGIN ", encoded_username, code=334)
encoded_password = b2a_base64(password.encode(), newline=False)
self.send_command(encoded_password, code=235)
else:
print(f"Unsupported authentication method: {', '.join(auth_methods)}")
sys.exit(1)
raise Error(
f"Unsupported authentication methods: {', '.join(auth_methods)}. Micromail only supports PLAIN and LOGIN."
)
print(f"Authenticated as {username}")
print(f"Authenticated as {username}.")
def write(self, data):
def write(self, data: str | bytes) -> None:
if self.socket is None:
print("No socket to write to")
sys.exit(1)
raise Error("No socket to write to.")
if isinstance(data, str):
data = data.encode()
n = self.socket.write(data)
if n != len(data):
print(f"Failure writing data <{data}>: not all bytes written")
print(f"Failure writing data <{data.decode()}>: not all bytes written")
def send_command(self, *cmd):
self.write(b" ".join(cmd) + b"\r\n")
def send_command(
self, *cmd: str | bytes, code: int | list[int] = []
) -> tuple[int, list[str]]:
cmd_ba = (
b" ".join([c if isinstance(c, bytes) else c.encode() for c in cmd])
+ b"\r\n"
)
self.write(cmd_ba)
return self.read_res(cmd_ba.decode(), code=code)
def read_res(
self, cmd: str = "", code: int | list[int] = []
) -> tuple[int, list[str]]:
if self.socket is None:
raise NoSocketError
response = []
next = True
while next:
code = self.socket.read(3)
r_code = int(self.socket.read(3))
next = self.socket.read(1) == b"-"
response.append(self.socket.readline().strip())
response.append(self.socket.readline().strip().decode())
return code, response
if isinstance(code, int):
code = [code]
if code != [] and r_code not in code:
raise SMTPError(cmd, r_code, " ".join(response))
return r_code, response
def ehlo(self):
code, res = self.send_command(b"EHLO _")
if code != b"250":
print(f"Error: got code {code} on EHLO")
sys.exit(1)
def ehlo(self) -> list[str]:
_, res = self.send_command("EHLO _", code=250)
return res
def start_starttls(self):
code, res = self.send_command(b"STARTTLS")
if code != b"220":
print(f"Error: got code {code} on STARTTLS")
sys.exit(1)
def start_starttls(self) -> None:
if self.socket is None:
raise NoSocketError
print(res)
self.send_command("STARTTLS", code=220)
self.socket = ssl.wrap_socket(self.socket)
def new_message(self, to, sender=None):
code, _ = self.send_command(b"RSET")
if code != b"250":
print(f"Error: got code {code} on RSET")
sys.exit(1)
def new_message(self, to: str | list[str], sender: str | None = None) -> None:
self.send_command("RSET", code=250)
if sender is None:
sender = self.username
@ -145,64 +169,67 @@ class MicromailClient:
self.sender = sender
self.to = to
code, _ = self.send_command(f"MAIL FROM:<{sender}>".encode())
if code != b"250":
print(f"Error: got code {code} on MAIL FROM:<{sender}>")
sys.exit(1)
self.send_command(f"MAIL FROM:<{sender}>", code=250)
for recipient in to:
code, _ = self.send_command(f"RCPT TO:<{recipient}>".encode())
if code not in [b"250", b"251"]:
print(f"Error: got code {code} on RCPT TO:<{recipient}>")
self.send_command(f"RCPT TO:<{recipient}>", code=[250, 251])
code, _ = self.send_command(b"DATA")
if code != b"354":
print(f"Error: got code {code} on DATA")
sys.exit(1)
self.send_command("DATA", code=354)
def headers(self, headers={}):
date = headers.get("date", datetime.now(tz=timezone(timedelta())))
if isinstance(date, datetime):
date = format_date(date)
self.write(f"Date: {date}\r\n".encode())
def headers(self, headers: dict[str, str]) -> None:
date = headers.get("date", format_date(datetime.now(tz=timezone(timedelta()))))
self.write(f"Date: {date}\r\n")
sender = headers.get("from", self.sender)
self.write(f"From: {sender}\r\n".encode())
self.write(f"From: {sender}\r\n")
if "subject" in headers:
self.write(f"Subject: {headers['subject']}\r\n".encode())
self.write(f"Subject: {headers['subject']}\r\n")
to = headers.get("to", self.to)
if to is None:
to = self.to
if isinstance(to, str):
to = [to]
self.write(f"To: {', '.join(to)}\r\n".encode())
self.write(f"To: {', '.join(to)}\r\n")
self.write("\r\n".encode())
self.write("\r\n")
def write_line(self, content):
if isinstance(content, str):
content = content.encode()
if content.startswith(b"."):
content = b"." + content
def write_line(self, content: str) -> None:
if content.startswith("."):
content = "." + content
self.write(content + b"\r\n")
self.write(content + "\r\n")
def send(self):
code, res = self.send_command(b"\r\n.")
if code != b"250":
print(f"Error: got code {code} on send")
print(res)
sys.exit(1)
def send(self) -> None:
self.send_command("\r\n.", code=250)
print("Message sent successfully")
def quit(self):
code, _ = self.send_command(b"QUIT")
if code != b"221":
print(f"Error: got code {code} on QUIT")
def quit(self) -> None:
if self.socket is None:
raise NoSocketError
self.send_command(b"QUIT", code=221)
print("Disconnected from server")
self.socket.close()
class Error(Exception):
pass
class NoSocketError(Error):
pass
class SMTPError(Error):
def __init__(self, cmd: str, code: int, message: str):
super().__init__(f"{cmd} -> {code} {message}")
self.cmd = cmd
self.code = code
MONTHS = [
"Jan",
"Feb",
@ -219,7 +246,7 @@ MONTHS = [
]
def format_date(date):
def format_date(date: datetime) -> str:
date_str = f"{date.day} {MONTHS[date.month]} {str(date.year)[-2:]}"
time_str = f"{date.hour:02}:{date.minute:02}:{date.second:02} UT"
return f"{date_str} {time_str}"