Modbus Serial Master Jamodo

Posted on by admin

For most unix based systems, there is a simple virtual serial forwarding application in the tools/nullmodem/ directory: make run # connect the master to the master output # connect the client to the client output Or for a tried and true method, simply connect a null modem cable between two of your serial ports and then simply reference those. Modbus tester for serial communication diagnostics. Home About MODBUS About Enron MODBUS RTU Master RTU Slave TCP Client manual1 manual7 manual8 Download Purchase. Modbus Device Directory. The Modbus Organization maintains a database of Modbus devices as a service to users looking for such devices for their applications. Feb 23, 2016. Jamod RS-485 Serial master. Forum: Open Discussion. Creator: Karthik. Created: 2011-12-31. Updated: 2013-05-28. Karthik - 2011-12-31. Hi all, i'm newbie to Java modbus Serial Communication. I was successful with RS232 but_ i'm struggling with RS485._. Can anyone tell me what are the steps to do for RS485.

Permalink

Join GitHub today

GitHub is home to over 40 million developers working together to host and review code, manage projects, and build software together.

Sign up
Find file Copy path
1 contributor
#Source: https://github.com/pycom/pycom-modbus/tree/master/uModbus (2018-07-16)
#This file has been modified and differ from its source version.
import uModBusFunctions as functions
import uModBusConst as Const
from machine importUART
from machine import Pin
import struct
import time
import machine
classuModBusSerial:
def__init__(self, uart_id, baudrate=9600, data_bits=8, stop_bits=1, parity=None, pins=None, ctrl_pin=None):
pinsLen=len(pins)
if pinsNoneor pinsLen<2or pinsLen>4or pinsLen3:
raiseValueError('pins should contain pin names/numbers for: tx, rx, [rts, cts]')
tx=pins[0]
rx=pins[1]
if pinsLen4:
rts=pins[2]
cts=pins[3]
self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity,
stop=stop_bits, timeout_char=10, tx=tx, rx=rx, rts=rts, cts=cts)
else:
self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity,
stop=stop_bits, timeout_char=10, tx=tx, rx=rx)
#self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity,
# stop=stop_bits, timeout_chars=10, pins=pins)
if ctrl_pin isnotNone:
self._ctrlPin = Pin(ctrl_pin, mode=Pin.OUT)
else:
self._ctrlPin =None
self.char_time_ms = (1000* (data_bits + stop_bits +2)) // baudrate
def_calculate_crc16(self, data):
crc =0xFFFF
for char in data:
crc = (crc >>8) ^ Const.CRC16_TABLE[((crc) ^ char) &0xFF]
return struct.pack('<H',crc)
def_bytes_to_bool(self, byte_list):
bool_list = []
for index, byte inenumerate(byte_list):
bool_list.extend([bool(byte & (1<< n)) for n inrange(8)])
return bool_list
def_to_short(self, byte_array, signed=True):
response_quantity =int(len(byte_array) /2)
fmt ='>'+ (('h'if signed else'H') * response_quantity)
return struct.unpack(fmt, byte_array)
def_exit_read(self, response):
if response[1] >= Const.ERROR_BIAS:
iflen(response) < Const.ERROR_RESP_LEN:
returnFalse
elif (Const.READ_COILS<= response[1] <= Const.READ_INPUT_REGISTER):
expected_len = Const.RESPONSE_HDR_LENGTH+1+ response[2] + Const.CRC_LENGTH
iflen(response) < expected_len:
returnFalse
eliflen(response) < Const.FIXED_RESP_LEN:
returnFalse
returnTrue
def_uart_read(self):
response =bytearray()
for x inrange(1, 40):
ifself._uart.any():
response.extend(self._uart.read())
#response.extend(self._uart.readall())
# variable length function codes may require multiple reads
ifself._exit_read(response):
break
time.sleep(0.05)
return response
def_send_receive(self, modbus_pdu, slave_addr, count):
serial_pdu =bytearray()
serial_pdu.append(slave_addr)
serial_pdu.extend(modbus_pdu)
crc =self._calculate_crc16(serial_pdu)
serial_pdu.extend(crc)
# flush the Rx FIFO
self._uart.read()
ifself._ctrlPin:
self._ctrlPin(1)
self._uart.write(serial_pdu)
ifself._ctrlPin:
whilenotself._uart.wait_tx_done(2):
machine.idle()
time.sleep_ms(1+self.char_time_ms)
self._ctrlPin(0)
returnself._validate_resp_hdr(self._uart_read(), slave_addr, modbus_pdu[0], count)
def_validate_resp_hdr(self, response, slave_addr, function_code, count):
iflen(response) 0:
raiseOSError('no data received from slave')
resp_crc = response[-Const.CRC_LENGTH:]
expected_crc =self._calculate_crc16(response[0:len(response) - Const.CRC_LENGTH])
if (resp_crc[0] != expected_crc[0]) or (resp_crc[1] != expected_crc[1]):
raiseOSError('invalid response CRC')
if (response[0] != slave_addr):
raiseValueError('wrong slave address')
if (response[1] (function_code + Const.ERROR_BIAS)):
raiseValueError('slave returned exception code: {:d}'.format(response[2]))
hdr_length = (Const.RESPONSE_HDR_LENGTH+1) if count else Const.RESPONSE_HDR_LENGTH
return response[hdr_length : len(response) - Const.CRC_LENGTH]
defread_coils(self, slave_addr, starting_addr, coil_qty):
modbus_pdu = functions.read_coils(starting_addr, coil_qty)
resp_data =self._send_receive(modbus_pdu, slave_addr, True)
status_pdu =self._bytes_to_bool(resp_data)
return status_pdu
defread_discrete_inputs(self, slave_addr, starting_addr, input_qty):
modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty)
resp_data =self._send_receive(modbus_pdu, slave_addr, True)
status_pdu =self._bytes_to_bool(resp_data)
return status_pdu
defread_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True):
modbus_pdu = functions.read_holding_registers(starting_addr, register_qty)
resp_data =self._send_receive(modbus_pdu, slave_addr, True)
register_value =self._to_short(resp_data, signed)
return register_value
defread_input_registers(self, slave_addr, starting_address, register_quantity, signed=True):
modbus_pdu = functions.read_input_registers(starting_address, register_quantity)
resp_data =self._send_receive(modbus_pdu, slave_addr, True)
register_value =self._to_short(resp_data, signed)
return register_value
defwrite_single_coil(self, slave_addr, output_address, output_value):
modbus_pdu = functions.write_single_coil(output_address, output_value)
resp_data =self._send_receive(modbus_pdu, slave_addr, False)
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_COIL,
output_address, value=output_value, signed=False)
return operation_status
defwrite_single_register(self, slave_addr, register_address, register_value, signed=True):
modbus_pdu = functions.write_single_register(register_address, register_value, signed)
resp_data =self._send_receive(modbus_pdu, slave_addr, False)
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_REGISTER,
register_address, value=register_value, signed=signed)
return operation_status
defwrite_multiple_coils(self, slave_addr, starting_address, output_values):
modbus_pdu = functions.write_multiple_coils(starting_address, output_values)
resp_data =self._send_receive(modbus_pdu, slave_addr, False)
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_COILS,
starting_address, quantity=len(output_values))
return operation_status
defwrite_multiple_registers(self, slave_addr, starting_address, register_values, signed=True):
modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed)
resp_data =self._send_receive(modbus_pdu, slave_addr, False)
operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_REGISTERS,
starting_address, quantity=len(register_values))
return operation_status
defclose(self):
ifself._uart None:
return
try:
self._uart.deinit()
exceptException:
pass

Modbus Serial Master Jamodo Online

Modbus
  • Copy lines
  • Copy permalink