Modbus Serial Master Jamodo
For most unix based systems, there is a simple virtual serial forwarding application in the tools/nullmodem/ directory: make run # connect the master to the master output # connect the client to the client output Or for a tried and true method, simply connect a null modem cable between two of your serial ports and then simply reference those. Modbus tester for serial communication diagnostics. Home About MODBUS About Enron MODBUS RTU Master RTU Slave TCP Client manual1 manual7 manual8 Download Purchase. Modbus Device Directory. The Modbus Organization maintains a database of Modbus devices as a service to users looking for such devices for their applications. Feb 23, 2016. Jamod RS-485 Serial master. Forum: Open Discussion. Creator: Karthik. Created: 2011-12-31. Updated: 2013-05-28. Karthik - 2011-12-31. Hi all, i'm newbie to Java modbus Serial Communication. I was successful with RS232 but_ i'm struggling with RS485._. Can anyone tell me what are the steps to do for RS485.
PermalinkJoin GitHub today
GitHub is home to over 40 million developers working together to host and review code, manage projects, and build software together.
Sign up1 contributor
#Source: https://github.com/pycom/pycom-modbus/tree/master/uModbus (2018-07-16) |
#This file has been modified and differ from its source version. |
import uModBusFunctions as functions |
import uModBusConst as Const |
from machine importUART |
from machine import Pin |
import struct |
import time |
import machine |
classuModBusSerial: |
def__init__(self, uart_id, baudrate=9600, data_bits=8, stop_bits=1, parity=None, pins=None, ctrl_pin=None): |
pinsLen=len(pins) |
if pinsNoneor pinsLen<2or pinsLen>4or pinsLen3: |
raiseValueError('pins should contain pin names/numbers for: tx, rx, [rts, cts]') |
tx=pins[0] |
rx=pins[1] |
if pinsLen4: |
rts=pins[2] |
cts=pins[3] |
self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity, |
stop=stop_bits, timeout_char=10, tx=tx, rx=rx, rts=rts, cts=cts) |
else: |
self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity, |
stop=stop_bits, timeout_char=10, tx=tx, rx=rx) |
#self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity, |
# stop=stop_bits, timeout_chars=10, pins=pins) |
if ctrl_pin isnotNone: |
self._ctrlPin = Pin(ctrl_pin, mode=Pin.OUT) |
else: |
self._ctrlPin =None |
self.char_time_ms = (1000* (data_bits + stop_bits +2)) // baudrate |
def_calculate_crc16(self, data): |
crc =0xFFFF |
for char in data: |
crc = (crc >>8) ^ Const.CRC16_TABLE[((crc) ^ char) &0xFF] |
return struct.pack('<H',crc) |
def_bytes_to_bool(self, byte_list): |
bool_list = [] |
for index, byte inenumerate(byte_list): |
bool_list.extend([bool(byte & (1<< n)) for n inrange(8)]) |
return bool_list |
def_to_short(self, byte_array, signed=True): |
response_quantity =int(len(byte_array) /2) |
fmt ='>'+ (('h'if signed else'H') * response_quantity) |
return struct.unpack(fmt, byte_array) |
def_exit_read(self, response): |
if response[1] >= Const.ERROR_BIAS: |
iflen(response) < Const.ERROR_RESP_LEN: |
returnFalse |
elif (Const.READ_COILS<= response[1] <= Const.READ_INPUT_REGISTER): |
expected_len = Const.RESPONSE_HDR_LENGTH+1+ response[2] + Const.CRC_LENGTH |
iflen(response) < expected_len: |
returnFalse |
eliflen(response) < Const.FIXED_RESP_LEN: |
returnFalse |
returnTrue |
def_uart_read(self): |
response =bytearray() |
for x inrange(1, 40): |
ifself._uart.any(): |
response.extend(self._uart.read()) |
#response.extend(self._uart.readall()) |
# variable length function codes may require multiple reads |
ifself._exit_read(response): |
break |
time.sleep(0.05) |
return response |
def_send_receive(self, modbus_pdu, slave_addr, count): |
serial_pdu =bytearray() |
serial_pdu.append(slave_addr) |
serial_pdu.extend(modbus_pdu) |
crc =self._calculate_crc16(serial_pdu) |
serial_pdu.extend(crc) |
# flush the Rx FIFO |
self._uart.read() |
ifself._ctrlPin: |
self._ctrlPin(1) |
self._uart.write(serial_pdu) |
ifself._ctrlPin: |
whilenotself._uart.wait_tx_done(2): |
machine.idle() |
time.sleep_ms(1+self.char_time_ms) |
self._ctrlPin(0) |
returnself._validate_resp_hdr(self._uart_read(), slave_addr, modbus_pdu[0], count) |
def_validate_resp_hdr(self, response, slave_addr, function_code, count): |
iflen(response) 0: |
raiseOSError('no data received from slave') |
resp_crc = response[-Const.CRC_LENGTH:] |
expected_crc =self._calculate_crc16(response[0:len(response) - Const.CRC_LENGTH]) |
if (resp_crc[0] != expected_crc[0]) or (resp_crc[1] != expected_crc[1]): |
raiseOSError('invalid response CRC') |
if (response[0] != slave_addr): |
raiseValueError('wrong slave address') |
if (response[1] (function_code + Const.ERROR_BIAS)): |
raiseValueError('slave returned exception code: {:d}'.format(response[2])) |
hdr_length = (Const.RESPONSE_HDR_LENGTH+1) if count else Const.RESPONSE_HDR_LENGTH |
return response[hdr_length : len(response) - Const.CRC_LENGTH] |
defread_coils(self, slave_addr, starting_addr, coil_qty): |
modbus_pdu = functions.read_coils(starting_addr, coil_qty) |
resp_data =self._send_receive(modbus_pdu, slave_addr, True) |
status_pdu =self._bytes_to_bool(resp_data) |
return status_pdu |
defread_discrete_inputs(self, slave_addr, starting_addr, input_qty): |
modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) |
resp_data =self._send_receive(modbus_pdu, slave_addr, True) |
status_pdu =self._bytes_to_bool(resp_data) |
return status_pdu |
defread_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True): |
modbus_pdu = functions.read_holding_registers(starting_addr, register_qty) |
resp_data =self._send_receive(modbus_pdu, slave_addr, True) |
register_value =self._to_short(resp_data, signed) |
return register_value |
defread_input_registers(self, slave_addr, starting_address, register_quantity, signed=True): |
modbus_pdu = functions.read_input_registers(starting_address, register_quantity) |
resp_data =self._send_receive(modbus_pdu, slave_addr, True) |
register_value =self._to_short(resp_data, signed) |
return register_value |
defwrite_single_coil(self, slave_addr, output_address, output_value): |
modbus_pdu = functions.write_single_coil(output_address, output_value) |
resp_data =self._send_receive(modbus_pdu, slave_addr, False) |
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_COIL, |
output_address, value=output_value, signed=False) |
return operation_status |
defwrite_single_register(self, slave_addr, register_address, register_value, signed=True): |
modbus_pdu = functions.write_single_register(register_address, register_value, signed) |
resp_data =self._send_receive(modbus_pdu, slave_addr, False) |
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_REGISTER, |
register_address, value=register_value, signed=signed) |
return operation_status |
defwrite_multiple_coils(self, slave_addr, starting_address, output_values): |
modbus_pdu = functions.write_multiple_coils(starting_address, output_values) |
resp_data =self._send_receive(modbus_pdu, slave_addr, False) |
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_COILS, |
starting_address, quantity=len(output_values)) |
return operation_status |
defwrite_multiple_registers(self, slave_addr, starting_address, register_values, signed=True): |
modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed) |
resp_data =self._send_receive(modbus_pdu, slave_addr, False) |
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_REGISTERS, |
starting_address, quantity=len(register_values)) |
return operation_status |
defclose(self): |
ifself._uart None: |
return |
try: |
self._uart.deinit() |
exceptException: |
pass |
Modbus Serial Master Jamodo Online
Copy lines Copy permalink