A small rewrite of the sender as a Click-driven CLI, turning the hard-coded values into CLI arguments.

#!/usr/bin/env python3

import click
import random
import re
import socket
import time
from typing import Tuple

class NMEASender:
    """Compose NMEA 0183 sentences and send them to a remote host.

    The sender keeps the remote host/port, the socket kind (datagram or
    stream) and a ``network`` flag. Every sentence is printed; it is only
    put on the wire when ``network`` is true.
    """

    def __init__(
        self,
        connection_type: socket.SocketKind,
        remote_host: str,
        remote_port: int,
        network: bool,
    ) -> None:
        """Store the connection settings; no socket is opened here.

        Args:
            connection_type: ``socket.SOCK_DGRAM`` or ``socket.SOCK_STREAM``.
            remote_host: Host name or address the sentences are sent to.
            remote_port: Port on the remote host.
            network: When false, sentences are printed but never sent.
        """
        self.host = remote_host
        self.port = remote_port
        self.conn_type = connection_type
        self.network = network

    def checksum_for(self, sentence: str) -> str:
        """Return the two-digit, upper-case hex checksum of *sentence*.

        Sample sentence is ``$XXXXX,data,goes,here*``. No checksum is
        present yet; this calculates it as the XOR of every character
        between the ``$`` and the ``*`` (both exclusive), ignoring line
        breaks. If there is no ``*``, everything after the ``$`` is used.
        """
        start = sentence.find("$") + 1
        end = sentence.find("*", start)
        payload = sentence[start:] if end == -1 else sentence[start:end]
        payload = re.sub(r"\r?\n", "", payload)
        csum = 0
        for char in payload:
            csum ^= ord(char)
        # Always two hex digits: lstrip("0x") would drop leading zeros and
        # return an empty string for a checksum of zero.
        return f"{csum:02X}"

    def send_errpm(self, source: str, engine_number: int, rpm: int, pitch: int) -> None:
        """Send engine RPM as a NMEA 0183 ``ERRPM`` sentence.

        Args:
            source: ``"E"`` (engine) or ``"S"`` (shaft).
            engine_number: 0 (centre), 1 (port) or 2 (starboard).
            rpm: Revolutions per minute; must not be negative.
            pitch: Value between -100 and 100 inclusive.

        Raises:
            ValueError: If any argument is outside its allowed values.
        """
        if source not in ("E", "S"):
            raise ValueError("Source must be E(ngine) or S(haft)")
        if engine_number not in (0, 1, 2):
            raise ValueError(
                "Engine number must be 0 (centre), 1 (port) or 2 (starboard)"
            )
        if rpm < 0:
            raise ValueError("RPM cannot be negative")
        if pitch < -100 or pitch > 100:
            raise ValueError("Pitch is a percentile number between -100 and 100")
        # The trailing "A" is sent as a constant last field
        # (presumably the "data valid" status flag -- TODO confirm).
        self.send_0183_sentence(
            self.compose_sentence(
                talker="ERRPM", values=(source, engine_number, rpm, pitch, "A")
            )
        )

    def compose_sentence(self, talker: str, values: Tuple) -> str:
        """Build a complete sentence from a talker name and a tuple of values.

        The result has the form ``$TALKER,v1,v2,...*CS`` where ``CS`` is the
        checksum from :meth:`checksum_for`. No line terminator is added.
        """
        s = f"${talker},{','.join(str(x) for x in values)}*"
        return f"{s}{self.checksum_for(s)}"

    def send_0183_sentence(self, sentence: str) -> None:
        """Print *sentence* and, if networking is enabled, send it.

        NMEA 0183 sentences must
        * start with $
        * contain comma-delimited values
        * carry a * marker after the values
        * have a hex checksum after the *
        * end with \\r\\n

        Only the leading ``$`` and the presence of ``*`` are checked here;
        the ``\\r\\n`` terminator is appended before sending.

        Raises:
            ValueError: If the sentence is malformed or the configured
                socket kind is neither datagram nor stream.
        """
        if not sentence.startswith("$"):
            raise ValueError("Sentence does not start with a $")
        if "*" not in sentence:
            raise ValueError("Sentence does not contain a *")

        print(f"[{self.host}:{self.port}] Sending {sentence}")
        if not self.network:
            return

        # For python to be happy with sockets, encode as bytestring
        payload = f"{sentence}\r\n".encode()
        with socket.socket(socket.AF_INET, self.conn_type) as s:
            if s.type == socket.SocketKind.SOCK_DGRAM:
                s.sendto(payload, (self.host, self.port))
            elif s.type == socket.SocketKind.SOCK_STREAM:
                try:
                    s.connect((self.host, self.port))
                    s.sendall(payload)
                except socket.timeout:
                    # Only raised when a timeout has been configured on the
                    # socket (e.g. via socket.setdefaulttimeout).
                    click.echo(f"[{self.host}:{self.port}] Socket timeout")
            else:
                raise ValueError("Expecting DGRAM or STREAM socket")

    type=click.Choice(("UDP", "TCP")),
    help="Choose UDP or TCP transport",
    "--remote-host", type=click.STRING, help="The remote host to send packets to"
    help="The listening port on the remote host",
@click.option("--fake-it/--read-sensors", default=False, help="Fake values for sending")
    help="Whether to actually send packets on the network",
def cli(
    ctx: click.Context,
    conn_type: str,
    remote_host: str,
    remote_port: int,
    fake_it: bool,
    network: bool,
) -> None:
    if network and not remote_host:
        raise click.UsageError("If sending to the network, provide a remote host")
    if conn_type == "TCP":
        connection_type = socket.SOCK_STREAM
        connection_type = socket.SOCK_DGRAM
    sender = NMEASender(
    ctx.obj["sender"] = sender
    ctx.obj["fake_it"] = fake_it

@cli.command()
@click.pass_context
def errpm(ctx: click.Context) -> None:
    """Send an engine RPM sentence in an endless loop.

    With ``ctx.obj["fake_it"]`` set, a random RPM between 800 and 1200 is
    sent; otherwise a fixed placeholder of 1000 is used until real sensor
    reading is implemented. Runs until interrupted.
    """
    while True:
        if ctx.obj["fake_it"]:
            rpm = random.randint(800, 1200)
        else:
            rpm = 1000  # Read the analogue pin here to get the pulses.
        ctx.obj["sender"].send_errpm(source="E", engine_number=0, rpm=rpm, pitch=100)
        # Pace the loop so it does not flood the receiver.
        # NOTE(review): one-second interval is an assumption -- confirm.
        time.sleep(1)

if __name__ == "__main__":
    # Hand Click an empty dict so sub-commands can share state via ctx.obj.
    cli(obj={})
Sending engine RPMs to the network, 3/x
Tagged on: