Source code for mbsim.core.server

"""
======
Server
======

This module manages loading the modbus Server or Slave.

Examples of severs can be seen in `example/server`_

.. _example/server: https://gitlab.com/nee2c/mbsim-core/-/tree/client/examples/server

When creating a prototype.

#. Create identity (Optional)
#. Create Sever Context. Default is to full space with all values set to 0
#. Create functions or coroutine functions and add tasks to run.
#. Start the server and tasks.  See `pymodbus server
   <https://pymodbus.readthedocs.io/en/latest/source/library/server.html#pymodbus.server.StartAsyncSerialServer>`_
   to see keyword arguments to pass to the start function.
"""
import logging

from pymodbus import __version__ as version
from pymodbus.datastore import (
    ModbusServerContext,
    ModbusSlaveContext,
    ModbusSparseDataBlock,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server import (
    ServerStop,
    StartAsyncSerialServer,
    StartAsyncTcpServer,
    StartAsyncUdpServer,
)

from mbsim.core.tasks import Task, getloop

DEFAULTCONF = {
    "tcp": {"context": None, "identity": None, "address": ("", 502), "ignore_missing_slaves": False},
    "udp": {"context": None, "identity": None, "address": ("", 502), "ignore_missing_slaves": False},
    "rtu": {
        "context": None,
        "identity": None,
        "port": "/dev/ttyS0",
        "buadrate": 19200,
        "bytesize": 8,
        "stopbits": 1,
        "parity": "N",
        "timeout": 0,
        "xonxoff": 0,
        "rtscts": 0,
    },
}

log = logging.getLogger(__name__)
log.debug("pymodbus version: %s", version)


[docs]def genDevice( name="mbsim", code="MB", url="https://gitlab.com/nee2c/mbsim", product="mbsim", model="mbsim", version="1.0.0", ): """ A function to create a identity. All parameters are key word arguments. :param name: Vendor Name. Defaults to `mbsim` :type name: str :param code: Product Code. Defaults to `MB` :type code: str :param url: Vendor URL. Defaults to `https://gitlab.com/nee2c/mbsim` :type url: str :param product: Product Name. Defaults to `mbsim` :type product: str :param model: Model Name. Defaults to `mbsim` :type model: str :param version: Version. Defaults to `1.0.0` :type version: str :return: Returns an identity for server """ identity = ModbusDeviceIdentification() identity.VendorName = name identity.ProductCode = code identity.VendorUrl = url identity.ProductName = product identity.ModelName = model identity.MajorMinorRevision = version log.debug("Generated identity: %s", identity.summary()) return identity
[docs]def genContext(context=None, single=True): """ A Function to return Context for Server. The context can be None. This will generate a context that all slavesid will use. All values set to 0 If context is an instance of ModbusServerContext, function returns the context Else dict of slaves with dict of address space("di", "co", "hr", "ir") with list or dict {offset: [reg0, reg1, ...]} dict Context example .. code:: {0: {"di": [0, 1, 0, 1, 0], "co": {0: [1, 0, 1], 100: [1]}, "hr": [123, 10], "ir": {123: [123, 0, 123]}}, ...} :param context: Server Context or dict of Slaves context or dict :param single: This makes all slave id maps to one Slave context :type single: bool """ if isinstance(context, ModbusServerContext): log.debug("Already a Server Context") return context if context is None: log.debug("Generating Server context: single: %s", single) return ModbusServerContext(slaves=ModbusSlaveContext(zero_mode=True), single=single) slaves = {} log.debug("Generating context from %s", context) for slaveid, slavecontext in context.items(): slaves[slaveid] = ModbusSlaveContext( **{key: ModbusSparseDataBlock(values=vals) for key, vals in slavecontext.items()}, zero_mode=True, ) return ModbusServerContext(slaves=slaves, single=single)
[docs]def start(server, loop=None, **kwargs): """ This is the function to start modbus server. Passes all kwargs though to server, if missing uses default. To see full list of keyword arguments for supported protocols see. - `rtu`_ - `tcp`_ - `udp`_ .. _rtu: https://pymodbus.readthedocs.io/en/latest/source/library/server.html#pymodbus.server.StartAsyncSerialServer .. _tcp: https://pymodbus.readthedocs.io/en/latest/source/library/server.html#pymodbus.server.StartAsyncTcpServer .. _udp: https://pymodbus.readthedocs.io/en/latest/source/library/server.html#pymodbus.server.StartAsyncUdpServer :param server: The protocol to run modbus server. :type server: str :param loop: The event loop and if none will use running loop or create new loop """ if server not in DEFAULTCONF.keys(): raise NotImplementedError("This server type {} is not Implemented".format(server)) loop = loop or getloop() serverargs = {**kwargs, **{key: val for key, val in DEFAULTCONF[server].items() if key not in kwargs.keys()}} if not serverargs.get("context"): serverargs["context"] = genContext() log.info("Starting %s server", server) log.debug("Server args: %s", str(serverargs)) log.debug("Starting Tasks") Task.startTasks(loop=loop) log.debug("Started Tasks") log.debug("Starting %s Server", server) if server == "tcp": loop.create_task(StartAsyncTcpServer(**serverargs)) elif server == "udp": loop.create_task(StartAsyncUdpServer(**serverargs)) elif server == "rtu": # pragma: no cover # there is a precondition check loop.create_task(StartAsyncSerialServer(**serverargs)) log.debug("Started Server") loop.run_forever() loop.close()
[docs]def stop(): """ Function to stop modbus server """ log.debug("Stopping Server") ServerStop() log.debug("Stopped Server") log.debug("Stopping Tasks") Task.stopTasks() log.debug("Stopped Tasks")