Commit c4cccaaf authored by Ivan Tyagov's avatar Ivan Tyagov

Add basic example for modbus client/server from...

Add basic example for modbus client/server from
parent af1db049
......@@ -47,7 +47,7 @@ olimex@a20-olinuxino:~$ sudo bash
root@a20-olinuxino:~# apt install htop tig git etckeeper vim curl python3 python3-venv python3-pip
root@a20-olinuxino:~# python3 -m pip install --upgrade pip setuptools wheel
root@a20-olinuxino:~# pip3 install pyA20Lime2
root@a20-olinuxino:~# pip3 install -U pymodbus
root@a20-olinuxino:~# pip3 install -U pymodbus[twisted]
# enable i2c-1 by running and selecting it (temporary step until fixed by Olimex but not needed for debian 10!)
root@a20-olinuxino:~# olinuxino-overlay
#!/usr/bin/env python
Pymodbus Synchronous Client Examples
The following is an example of how to use the synchronous modbus client
implementation from pymodbus.
It should be noted that the client can also be used with
the guard construct that is available in python 2.5 and up::
with ModbusClient('') as client:
result = client.read_coils(1,10)
print result
import struct
# --------------------------------------------------------------------------- #
# import the various server implementations
# --------------------------------------------------------------------------- #
from pymodbus.pdu import ModbusRequest, ModbusResponse, ModbusExceptions
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.bit_read_message import ReadCoilsRequest
from pymodbus.compat import int2byte, byte2int
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
import logging
log = logging.getLogger()
# --------------------------------------------------------------------------- #
# create your custom message
# --------------------------------------------------------------------------- #
# The following is simply a read coil request that always reads 16 coils.
# Since the function code is already registered with the decoder factory,
# this will be decoded as a read coil response. If you implement a new
# method that is not currently implemented, you must register the request
# and response with a ClientDecoder factory.
# --------------------------------------------------------------------------- #
class CustomModbusResponse(ModbusResponse):
function_code = 55
_rtu_byte_count_pos = 2
def __init__(self, values=None, **kwargs):
ModbusResponse.__init__(self, **kwargs)
self.values = values or []
def encode(self):
""" Encodes response pdu
:returns: The encoded packet message
result = int2byte(len(self.values) * 2)
for register in self.values:
result += struct.pack('>H', register)
return result
def decode(self, data):
""" Decodes response pdu
:param data: The packet data to decode
byte_count = byte2int(data[0])
self.values = []
for i in range(1, byte_count + 1, 2):
self.values.append(struct.unpack('>H', data[i:i + 2])[0])
class CustomModbusRequest(ModbusRequest):
function_code = 55
_rtu_frame_size = 8
def __init__(self, address=None, **kwargs):
ModbusRequest.__init__(self, **kwargs)
self.address = address
self.count = 16
def encode(self):
return struct.pack('>HH', self.address, self.count)
def decode(self, data):
self.address, self.count = struct.unpack('>HH', data)
def execute(self, context):
if not (1 <= self.count <= 0x7d0):
return self.doException(ModbusExceptions.IllegalValue)
if not context.validate(self.function_code, self.address, self.count):
return self.doException(ModbusExceptions.IllegalAddress)
values = context.getValues(self.function_code, self.address,
return CustomModbusResponse(values)
# --------------------------------------------------------------------------- #
# This could also have been defined as
# --------------------------------------------------------------------------- #
class Read16CoilsRequest(ReadCoilsRequest):
def __init__(self, address, **kwargs):
""" Initializes a new instance
:param address: The address to start reading from
ReadCoilsRequest.__init__(self, address, 16, **kwargs)
# --------------------------------------------------------------------------- #
# execute the request with your client
# --------------------------------------------------------------------------- #
# using the with context, the client will automatically be connected
# and closed when it leaves the current scope.
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
with ModbusClient(host='localhost', port=5020) as client:
request = CustomModbusRequest(1, unit=1)
result = client.execute(request)
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient('localhost')
client.write_coil(1, True)
result = client.read_coils(1,1)
#!/usr/bin/env python
Pymodbus Asynchronous Server Example
The asynchronous server is a high performance implementation using the
twisted library as its backend. This allows it to scale to many thousands
of nodes which can be helpful for testing monitoring software.
# --------------------------------------------------------------------------- #
# import the various server implementations
# --------------------------------------------------------------------------- #
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.server.asynchronous import StartUdpServer
from pymodbus.server.asynchronous import StartSerialServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import (ModbusRtuFramer,
from custom_message import CustomModbusRequest
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
FORMAT = ('%(asctime)-15s %(threadName)-15s'
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
log = logging.getLogger()
def run_async_server():
# ----------------------------------------------------------------------- #
# initialize your data store
# ----------------------------------------------------------------------- #
# The datastores only respond to the addresses that they are initialized to
# Therefore, if you initialize a DataBlock to addresses from 0x00 to 0xFF,
# a request to 0x100 will respond with an invalid address exception.
# This is because many devices exhibit this kind of behavior (but not all)
# block = ModbusSequentialDataBlock(0x00, [0]*0xff)
# Continuing, you can choose to use a sequential or a sparse DataBlock in
# your data context. The difference is that the sequential has no gaps in
# the data while the sparse can. Once again, there are devices that exhibit
# both forms of behavior::
# block = ModbusSparseDataBlock({0x00: 0, 0x05: 1})
# block = ModbusSequentialDataBlock(0x00, [0]*5)
# Alternately, you can use the factory methods to initialize the DataBlocks
# or simply do not pass them to have them initialized to 0x00 on the full
# address range::
# store = ModbusSlaveContext(di = ModbusSequentialDataBlock.create())
# store = ModbusSlaveContext()
# Finally, you are allowed to use the same DataBlock reference for every
# table or you you may use a seperate DataBlock for each table.
# This depends if you would like functions to be able to access and modify
# the same data or not::
# block = ModbusSequentialDataBlock(0x00, [0]*0xff)
# store = ModbusSlaveContext(di=block, co=block, hr=block, ir=block)
# The server then makes use of a server context that allows the server to
# respond with different slave contexts for different unit ids. By default
# it will return the same context for every unit id supplied (broadcast
# mode).
# However, this can be overloaded by setting the single flag to False
# and then supplying a dictionary of unit id to context mapping::
# slaves = {
# 0x01: ModbusSlaveContext(...),
# 0x02: ModbusSlaveContext(...),
# 0x03: ModbusSlaveContext(...),
# }
# context = ModbusServerContext(slaves=slaves, single=False)
# The slave context can also be initialized in zero_mode which means that a
# request to address(0-7) will map to the address (0-7). The default is
# False which is based on section 4.4 of the specification, so address(0-7)
# will map to (1-8)::
# store = ModbusSlaveContext(..., zero_mode=True)
# ----------------------------------------------------------------------- #
store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [17]*100),
co=ModbusSequentialDataBlock(0, [17]*100),
hr=ModbusSequentialDataBlock(0, [17]*100),
ir=ModbusSequentialDataBlock(0, [17]*100))
store.register(CustomModbusRequest.function_code, 'cm',
ModbusSequentialDataBlock(0, [17] * 100))
context = ModbusServerContext(slaves=store, single=True)
# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# If you don't set this or any fields, they are defaulted to empty strings.
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
identity.VendorName = 'Pymodbus'
identity.ProductCode = 'PM'
identity.VendorUrl = ''
identity.ProductName = 'Pymodbus Server'
identity.ModelName = 'Pymodbus Server'
identity.MajorMinorRevision = '2.3.0'
# ----------------------------------------------------------------------- #
# run the server you want
# ----------------------------------------------------------------------- #
# TCP Server
StartTcpServer(context, identity=identity, address=("localhost", 502),
# TCP Server with deferred reactor run
# from twisted.internet import reactor
# StartTcpServer(context, identity=identity, address=("localhost", 5020),
# defer_reactor_run=True)
# Server with RTU framer
# StartTcpServer(context, identity=identity, address=("localhost", 5020),
# framer=ModbusRtuFramer)
# UDP Server
# StartUdpServer(context, identity=identity, address=("", 5020))
# RTU Server
# StartSerialServer(context, identity=identity,
# port='/dev/ttyp0', framer=ModbusRtuFramer)
# ASCII Server
# StartSerialServer(context, identity=identity,
# port='/dev/ttyp0', framer=ModbusAsciiFramer)
# Binary Server
# StartSerialServer(context, identity=identity,
# port='/dev/ttyp0', framer=ModbusBinaryFramer)
if __name__ == "__main__":
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment