A small rewrite of the sender as a Click-driven CLI, turning hard-coded values into CLI arguments.
#!/usr/bin/env python3
import click
import random
import re
import socket
import time
from typing import Tuple
class NMEASender:
    """
    Builds NMEA 0183 sentences and sends them to a remote host.

    Each sentence is printed before sending. When ``network`` is false the
    sentence is only printed and nothing is transmitted.
    """

    def __init__(
        self,
        connection_type: socket.SocketKind,
        remote_host: str,
        remote_port: int,
        network: bool,
    ) -> None:
        """
        Store the transport settings. No socket is opened here; a fresh one
        is created for every sentence that is sent.

        connection_type: socket.SOCK_DGRAM (UDP) or socket.SOCK_STREAM (TCP).
        remote_host / remote_port: destination address.
        network: whether sentences are actually transmitted.
        """
        self.host = remote_host
        self.port = remote_port
        self.conn_type = connection_type
        self.network = network

    def checksum_for(self, sentence: str) -> str:
        """
        Sample sentence is $XXXXX,data,goes,here*
        No checksum present, this calculates it.

        The checksum is the XOR of every character between the $ and the *
        (line endings excluded), returned as at least two uppercase hex
        digits, e.g. "03" or "7F". If the sentence has no *, everything
        after the $ is used.
        """
        start = sentence.find("$") + 1
        end = sentence.find("*", start)
        if end == -1:
            # No terminator: checksum the whole remainder instead of letting
            # a -1 index silently chop off the last character.
            end = len(sentence)
        chksumdata = sentence[start:end].replace("\r\n", "").replace("\n", "")
        csum = 0
        for c in chksumdata:
            csum ^= ord(c)
        # Zero-padded so that small values keep their leading zero
        # (str.lstrip("0x") would turn 0x03 into "3" and 0x00 into "").
        return f"{csum:02X}"

    def send_errpm(self, source: str, engine_number: int, rpm: int, pitch: int) -> None:
        """
        Sends engine RPM as a NMEA 0183 sentence.

        source: "E" (engine) or "S" (shaft).
        engine_number: 0 (centre), 1 (port) or 2 (starboard).
        rpm: non-negative revolutions per minute.
        pitch: value between -100 and 100 inclusive.

        Raises ValueError if any argument is out of range.
        """
        if source not in ("E", "S"):
            raise ValueError("Source must be E(ngine) or S(haft)")
        if engine_number not in (0, 1, 2):
            raise ValueError(
                "Engine number must be 0 (centre), 1 (port) or 2 (starboard)"
            )
        if rpm < 0:
            raise ValueError("RPM cannot be negative")
        if not -100 <= pitch <= 100:
            raise ValueError("Pitch is a percentile number between -100 and 100")
        self.send_0183_sentence(
            self.compose_sentence(
                talker="ERRPM", values=(source, engine_number, rpm, pitch, "A")
            )
        )

    def compose_sentence(self, talker: str, values: Tuple) -> str:
        """
        Given a talker name and a tuple of values, build a sentence of the form
        $TALKR,val1,val2,...,valN*CK
        where CK is the checksum computed by checksum_for().
        """
        s = f"${talker},{','.join(str(x) for x in values)}*"
        return f"{s}{self.checksum_for(s)}"

    def send_0183_sentence(self, sentence: str) -> None:
        """
        Print the sentence and, if networking is enabled, transmit it followed
        by \\r\\n over a freshly opened UDP or TCP socket.

        Raises ValueError if the sentence lacks the leading $ or the * marker,
        or if the configured socket type is neither DGRAM nor STREAM.
        A TCP timeout (0.5 s) is reported and otherwise ignored.
        """
        # NMEA 0183 sentences must
        # * start with $
        # * contain comma-delimited values
        # * carry a * marker after the values
        # * have a hex checksum after the *
        # * end with \r\n
        if not sentence.startswith("$"):
            raise ValueError("Sentence does not start with a $")
        if "*" not in sentence:
            raise ValueError("Sentence does not contain a *")
        print(f"[{self.host}:{self.port}] Sending {sentence}")
        if not self.network:
            return
        # For python to be happy with sockets, encode as bytestring. Built once
        # so UDP and TCP are guaranteed to put the same bytes on the wire.
        payload = f"{sentence}\r\n".encode()
        with socket.socket(socket.AF_INET, self.conn_type) as s:
            if s.type == socket.SocketKind.SOCK_DGRAM:
                s.sendto(payload, (self.host, self.port))
            elif s.type == socket.SocketKind.SOCK_STREAM:
                s.settimeout(0.5)
                try:
                    s.connect((self.host, self.port))
                    s.sendall(payload)
                except socket.timeout:
                    click.echo(f"[{self.host}:{self.port}] Socket timeout")
            else:
                raise ValueError("Expecting DGRAM or STREAM socket")
@click.group()
@click.option(
    "--conn-type",
    type=click.Choice(("UDP", "TCP")),
    default="UDP",
    help="Choose UDP or TCP transport",
)
@click.option(
    "--remote-host", type=click.STRING, help="The remote host to send packets to"
)
@click.option(
    "--remote-port",
    type=click.INT,
    default=10110,
    help="The listening port on the remote host",
)
@click.option("--fake-it/--read-sensors", default=False, help="Fake values for sending")
@click.option(
    "--network/--no-network",
    default=True,
    help="Whether to actually send packets on the network",
)
@click.pass_context
def cli(
    ctx: click.Context,
    conn_type: str,
    remote_host: str,
    remote_port: int,
    fake_it: bool,
    network: bool,
) -> None:
    """Send NMEA 0183 sentences to a remote host over UDP or TCP."""
    # remote_host is None when --remote-host is omitted; that is only
    # acceptable when nothing will be put on the network.
    if network and not remote_host:
        raise click.UsageError("If sending to the network, provide a remote host")
    connection_type = socket.SOCK_STREAM if conn_type == "TCP" else socket.SOCK_DGRAM
    # ctx.obj is None unless the caller passed obj=...; ensure_object creates
    # the dict on demand so the group also works when invoked without it
    # (for example from a console-script entry point).
    shared = ctx.ensure_object(dict)
    # Subcommands read the sender and the fake/real flag from the context object.
    shared["sender"] = NMEASender(
        connection_type=connection_type,
        remote_host=remote_host,
        remote_port=remote_port,
        network=network,
    )
    shared["fake_it"] = fake_it
@cli.command()
@click.pass_context
def errpm(ctx: click.Context):
    # Emit an ERRPM sentence for the centre engine every two seconds, forever.
    # Documented with comments rather than a docstring so the command's
    # generated --help text stays exactly as it was.
    shared = ctx.obj
    while True:
        # Faked readings are random within 800..1200; otherwise a fixed
        # placeholder is used. Read the analogue pin here to get the pulses.
        rpm = random.randint(800, 1200) if shared["fake_it"] else 1000
        shared["sender"].send_errpm(source="E", engine_number=0, rpm=rpm, pitch=100)
        time.sleep(2)
if __name__ == "__main__":
    # Start the Click group with an empty dict as the context object; the
    # group callback stores the shared sender and flags on it for subcommands.
    cli(obj={})