Commit 8cd43f94 authored by Ivan Tyagov's avatar Ivan Tyagov

Remove not longer maintained modbus implementation.

parent 921c4183
#!/usr/bin/env python3
"""
Modbus server implementation for Lime2 -> MOD-IO which controls relays.
root@a20:~/osie/examples/modbus# pymodbus.console tcp --host localhost --port 502
To switch on relay 1:
> client.write_registers address=0 values=1 unit=1
To switch OFF:
> client.write_registers address=0 values=0 unit=1
"""
import os
import sys
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.server.asynchronous import StartUdpServer
from pymodbus.server.asynchronous import StartSerialServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import (ModbusRtuFramer,
ModbusAsciiFramer,
ModbusBinaryFramer)
from pymodbus.pdu import ModbusRequest
from pyA20Lime2 import i2c
import bitarray
from bitarray.util import ba2int
from argparse import ArgumentParser
# XXX: is it needed actually, examine
ModbusRequest.function_code = 55
# name fo device within Lime2
DEFAULT_MOD_IO_DEVICE_NAME = "/dev/i2c-1"
class Lime2MODIOI2c:
"""
Class to communication from Lime2 -> MOD-IO (over I2c)
Keep relay state internally as take into account to preserver relay state!
"""
# state of 4 relays of mod-io, by default OFF
# added 8 as their format is 8 bit and maybe they extend mod-io
# only last 4 bits are used
relay_state_list = [0, 0, 0, 0, 0, 0, 0, 0]
def __init__(self, mod_io_device_name = DEFAULT_MOD_IO_DEVICE_NAME):
self.mod_io_device_name = mod_io_device_name
def open(self):
"""
Open connection to MOD-IO.
"""
i2c.init(self.mod_io_device_name)
i2c.open(0x58)
def close(self):
"""
Close connection to MOD-IO.
"""
i2c.close()
def write(self, code):
"""
Write / send to device.
"""
self.open()
while 1:
try:
i2c.write([0x10, code])
break
except:
print("Failed to sent command: %s" %code)
# be a good citizen and close
self.close()
def setRelayState(self, relay_number, relay_state):
"""
Set relay state.
relay_number: 0 - 3
relay_state: 0 (off), 1 (on)
"""
if relay_state not in (0, 1):
raise ValueError("Incorect relay state!")
if relay_number not in (0, 1, 2, 3):
raise ValueError("Incorrect relay number!")
# relay1 is the most right element in list / byte command representation
self.relay_state_list[-(1 + relay_number)] = relay_state
# generate proper relay state command of all 4 relays
# which doesn't override any other relays' state
# which itself is kept in this app's RAM
ba = bitarray.bitarray(self.relay_state_list)
self.write(ba2int(ba))
def setRelayStateAllOff(self):
"""
Switch off all relays.
"""
self.relay_state_list = [0, 0, 0, 0, 0, 0, 0, 0]
ba = bitarray.bitarray(self.relay_state_list)
self.write(ba2int(ba))
def setRelayStateAllOn(self):
"""
Switch on all relays.
"""
self.relay_state_list = [0, 0, 0, 0, 1, 1, 1, 1]
ba = bitarray.bitarray(self.relay_state_list)
self.write(ba2int(ba))
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
FORMAT = ('%(asctime)-15s %(threadName)-15s'
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# argument parsing
parser = ArgumentParser()
parser.add_argument("-p", "--port", dest="port",
default = 502,
type = int,
help="The port modbus server will listen to.", metavar="PORT")
parser.add_argument("-i", "--interface", dest="interface",
default = "0.0.0.0",
type = str,
help="The interface modbus server will bind to.", metavar="INTERFACE")
args = parser.parse_args()
PORT = args.port
INTERFACE = args.interface
# main class to communicate with MOD-IO
mod_io = Lime2MODIOI2c()
class LimeModbusSlaveContext(ModbusSlaveContext):
"""
Control Lime2 -> Mod-io ->relay {1..4}
"""
def setValues(self, fx, address, values):
if not self.zero_mode:
address = address + 1
log.debug("setValues[%d] %d:%s" % (fx, address, values))
self.store[self.decode(fx)].setValues(address, values)
# control relays
value = int(values[0])
log.debug(address)
log.debug(value)
# switch relays at MOD-IO
mod_io.setRelayState(address, value)
def run_async_server(port = 502, interface = "0.0.0.0"):
store = LimeModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0]*10),
co=ModbusSequentialDataBlock(0, [0]*10),
hr=ModbusSequentialDataBlock(0, [0]*10),
ir=ModbusSequentialDataBlock(0, [0]*10), zero_mode = True)
store.register(ModbusRequest.function_code, 'cm',
ModbusSequentialDataBlock(0, [17] * 100))
context = ModbusServerContext(slaves=store, single=True)
# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# If you don't set this or any fields, they are defaulted to empty strings.
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
identity.VendorName = 'Nexedi'
identity.ProductCode = 'OSIE-coupler'
identity.VendorUrl = 'https://lab.nexedi.com/nexedi/osie/'
identity.ProductName = 'Pymodbus Server'
identity.ModelName = 'Pymodbus Server'
identity.MajorMinorRevision = '0.0.1'
# TCP Server
StartTcpServer(context, identity=identity, address=(interface, port),
custom_functions=[ModbusRequest])
def main():
# check if we actually are running on proper platform
if os.uname().machine != "armv7l":
log.error("Not supported platform and / or CPU type.")
sys.exit(1)
# switch OFF all relays
mod_io.setRelayStateAllOff()
# run modbus "server"
run_async_server(interface = INTERFACE, port = PORT)
# switch off all
mod_io.setRelayStateAllOff()
if __name__ == "__main__":
main()
"""
Implement a basic OCR app that can find countours in image and
based on number of countours control a modbus server remotely.
This is a POC.
"""
import cv2
import numpy as np
import pymodbus
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from time import sleep
from datetime import datetime
import os
import time
UNIT = 0x1
OSIE_PLC_ADDRESS = "localhost" # "192.168.0.48" for real machine
OSIE_PLC_PORT = 502
ROOT_FOLDER_PATH = "/mnt/flash"
def nothing(x):
# any operation
pass
def setConveyorState(state):
"""
Send to modbus server command to start (state 1)
or stop (state 0) conveyor.
"""
client = ModbusClient(OSIE_PLC_ADDRESS, port=OSIE_PLC_PORT)
client.connect()
client.write_coils(0, [state], unit=UNIT)
client.close()
def openAndCloseAirValve(seconds=0.05):
"""
Send to modbus server command to open and close the air valve for
a specified amount of time in seconds.
"""
client = ModbusClient(OSIE_PLC_ADDRESS, port=OSIE_PLC_PORT)
client.connect()
#rr = client.read_coils(1, 1, unit=UNIT)
client.write_coils(1, [True], unit=UNIT)
sleep(seconds)
client.write_coils(1, [False], unit=UNIT)
client.close()
def storeFrame(frame):
"""
Store a video frame in locally.
"""
folder_path = "%s/%s" %(ROOT_FOLDER_PATH, datetime.today().strftime('%Y-%m-%d'))
if not os.path.isdir(folder_path):
os.mkdir(folder_path)
millis = int(round(time.time() * 1000))
file_path = "%s/%s.jpg" %(folder_path, millis)
cv2.imwrite(file_path, frame)
#f = open(file_path, "w")
#f.write(file_content)
#f.close()
# start conveyor
setConveyorState(1)
cap = cv2.VideoCapture(1)
cv2.namedWindow("Trackbars")
cv2.createTrackbar("L-H", "Trackbars", 0, 180, nothing)
cv2.createTrackbar("L-S", "Trackbars", 66, 255, nothing)
cv2.createTrackbar("L-V", "Trackbars", 134, 255, nothing)
cv2.createTrackbar("U-H", "Trackbars", 180, 180, nothing)
cv2.createTrackbar("U-S", "Trackbars", 255, 255, nothing)
cv2.createTrackbar("U-V", "Trackbars", 243, 255, nothing)
font = cv2.FONT_HERSHEY_COMPLEX
while True:
_, frame = cap.read()
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
l_h = cv2.getTrackbarPos("L-H", "Trackbars")
l_s = cv2.getTrackbarPos("L-S", "Trackbars")
l_v = cv2.getTrackbarPos("L-V", "Trackbars")
u_h = cv2.getTrackbarPos("U-H", "Trackbars")
u_s = cv2.getTrackbarPos("U-S", "Trackbars")
u_v = cv2.getTrackbarPos("U-V", "Trackbars")
lower_red = np.array([l_h, l_s, l_v])
upper_red = np.array([u_h, u_s, u_v])
mask = cv2.inRange(hsv, lower_red, upper_red)
kernel = np.ones((5, 5), np.uint8)
mask = cv2.erode(mask, kernel)
# Contours detection
if int(cv2.__version__[0]) > 3:
# Opencv 4.x.x
contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
else:
# Opencv 3.x.x
_, contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
approx = cv2.approxPolyDP(cnt, 0.02*cv2.arcLength(cnt, True), True)
x = approx.ravel()[0]
y = approx.ravel()[1]
if area > 400:
cv2.drawContours(frame, [approx], 0, (0, 0, 0), 5)
if len(approx) == 3:
cv2.putText(frame, "Triangle", (x, y), font, 1, (0, 0, 0))
storeFrame(frame)
openAndCloseAirValve()
elif len(approx) == 4:
cv2.putText(frame, "Rectangle", (x, y), font, 1, (0, 0, 0))
storeFrame(frame)
openAndCloseAirValve()
elif 10 < len(approx) < 20:
cv2.putText(frame, "Circle-fire!", (x, y), font, 1, (0, 0, 0))
storeFrame(frame)
openAndCloseAirValve()
cv2.imshow("Frame", frame)
cv2.imshow("Mask", mask)
key = cv2.waitKey(1)
if key == 27:
break
cap.release()
cv2.destroyAllWindows()
# stop conveyor
setConveyorState(0)
"""
Install OSIE's coupler.
"""
from setuptools import setup
setup(
name='osie_coupler',
version='0.1',
entry_points={'console_scripts': ['osie_coupler_modbus = osie_coupler.osie_modbus:main']},
install_requires=['pyA20Lime2', 'pymodbus[twisted]', 'click', 'prompt_toolkit', 'pygments', 'bitarray']
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment