"""FINS protocol helpers: constant tables, a transport-agnostic connection
base class, and a UDP transport.

All protocol fields in this module are raw ``bytes``.  Command frames are
assembled by concatenating those fields, so every constant below is the
literal byte sequence that goes on the wire.
"""
from abc import ABCMeta, abstractmethod
import binascii  # kept from the original import list; no longer used below
import socket

# Disabled fallback shims that were present in the original file (kept for
# reference only; the real ``abc`` imports above are used):
#
#     ABCMeta = type
#     def abstractmethod(func):
#         return func


class FinsPLCMemoryAreas:
    """Hex code for memory areas.

    Each memory area has a corresponding hex code for word access, bit
    access, forced word access and forced bit access.  This class provides
    name-based access to them.

    The codes are class attributes, so both ``FinsPLCMemoryAreas().CIO_WORD``
    and ``FinsPLCMemoryAreas.CIO_WORD`` work.  Every value is a single raw
    byte.  Several names intentionally share the same code (for example
    ``TIMER_FLAG`` / ``COUNTER_FLAG``); the values are reproduced exactly as
    the original table listed them.
    """

    CIO_BIT = b'\x30'
    WORK_BIT = b'\x31'
    HOLDING_BIT = b'\x32'
    AUXILIARY_BIT = b'\x33'
    CIO_BIT_FORCED = b'\x70'
    WORK_BIT_FORCED = b'\x71'
    HOLDING_BIT_FORCED = b'\x72'
    CIO_WORD = b'\xB0'
    WORK_WORD = b'\xB1'
    HOLDING_WORD = b'\xB2'
    AUXILIARY_WORD = b'\xB3'
    CIO_WORD_FORCED = b'\xF0'
    WORK_WORD_FORCED = b'\xF1'
    HOLDING_WORD_FORCED = b'\xF2'
    TIMER_FLAG = b'\x09'
    COUNTER_FLAG = b'\x09'
    TIMER_FLAG_FORCED = b'\x49'
    COUNTER_FLAG_FORCED = b'\x49'
    TIMER_WORD = b'\x89'
    COUNTER_WORD = b'\x89'
    DATA_MEMORY_BIT = b'\x02'
    DATA_MEMORY_WORD = b'\x82'
    EM0_BIT = b'\x20'
    EM1_BIT = b'\x21'
    EM2_BIT = b'\x22'
    EM3_BIT = b'\x23'
    EM4_BIT = b'\x24'
    EM5_BIT = b'\x25'
    EM6_BIT = b'\x26'
    EM7_BIT = b'\x27'
    EM8_BIT = b'\x28'
    EM9_BIT = b'\x29'
    EMA_BIT = b'\x2A'
    EMB_BIT = b'\x2B'
    EMC_BIT = b'\x2C'
    EMD_BIT = b'\x2D'
    EME_BIT = b'\x2E'
    EMF_BIT = b'\x2F'
    EM10_BIT = b'\xE0'
    EM11_BIT = b'\xE1'
    EM12_BIT = b'\xE2'
    EM13_BIT = b'\xE3'
    EM14_BIT = b'\xE4'
    EM15_BIT = b'\xE5'
    EM16_BIT = b'\xE6'
    EM17_BIT = b'\xE7'
    EM18_BIT = b'\xE8'
    EM0_WORD = b'\xA0'
    EM1_WORD = b'\xA1'
    EM2_WORD = b'\xA2'
    EM3_WORD = b'\xA3'
    EM4_WORD = b'\xA4'
    EM5_WORD = b'\xA5'
    EM6_WORD = b'\xA6'
    EM7_WORD = b'\xA7'
    EM8_WORD = b'\xA8'
    EM9_WORD = b'\xA9'
    EMA_WORD = b'\xAA'
    EMB_WORD = b'\xAB'
    EMC_WORD = b'\xAC'
    EMD_WORD = b'\xAD'
    EME_WORD = b'\xAE'
    EMF_WORD = b'\xAF'
    EM10_WORD = b'\x60'
    EM11_WORD = b'\x61'
    EM12_WORD = b'\x62'
    EM13_WORD = b'\x63'
    EM14_WORD = b'\x64'
    EM15_WORD = b'\x65'
    EM16_WORD = b'\x66'
    EM17_WORD = b'\x67'
    EM18_WORD = b'\x68'
    EM_CURR_BANK_BIT = b'\x0A'
    EM_CURR_BANK_WORD = b'\x98'
    EM_CURR_BANK_NUMBER = b'\xBC'
    TASK_FLAG_BIT = b'\x06'
    TASK_FLAG_STATUS = b'\x46'
    INDEX_REGISTER = b'\xDC'
    DATA_REGISTER = b'\xBC'
    CLOCK_PULSES = b'\x07'
    CONDITION_FLAGS = b'\x07'


class FinsCommandCode:
    """Hex code for fins command code.

    Each fins command has a corresponding hex code.  This class provides
    name-based access to them.  Every value is a two-byte ``bytes`` object
    and is available on both the class and its instances.
    """

    MEMORY_AREA_READ = b'\x01\x01'
    MEMORY_AREA_WRITE = b'\x01\x02'
    MEMORY_AREA_FILL = b'\x01\x03'
    MULTIPLE_MEMORY_AREA_READ = b'\x01\x04'
    MEMORY_AREA_TRANSFER = b'\x01\x05'
    PARAMETER_AREA_READ = b'\x02\x01'
    PARAMETER_AREA_WRITE = b'\x02\x02'
    PARAMETER_AREA_FILL = b'\x02\x03'
    PROGRAM_AREA_READ = b'\x03\x06'
    PROGRAM_AREA_WRITE = b'\x03\x07'
    PROGRAM_AREA_CLEAR = b'\x03\x08'
    RUN = b'\x04\x01'
    STOP = b'\x04\x02'
    CPU_UNIT_DATA_READ = b'\x05\x01'
    CONNECTION_DATA_READ = b'\x05\x02'
    CPU_UNIT_STATUS_READ = b'\x06\x01'
    CYCLE_TIME_READ = b'\x06\x20'
    CLOCK_READ = b'\x07\x01'
    CLOCK_WRITE = b'\x07\x02'
    MESSAGE_READ = b'\x09\x20'
    ACCESS_RIGHT_ACQUIRE = b'\x0C\x01'
    ACCESS_RIGHT_FORCED_ACQUIRE = b'\x0C\x02'
    ACCESS_RIGHT_RELEASE = b'\x0C\x03'
    ERROR_CLEAR = b'\x21\x01'
    ERROR_LOG_READ = b'\x21\x02'
    ERROR_LOG_CLEAR = b'\x21\x03'
    FINS_WRITE_ACCESS_LOG_READ = b'\x21\x40'
    FINS_WRITE_ACCESS_LOG_CLEAR = b'\x21\x41'
    FILE_NAME_READ = b'\x22\x01'
    SINGLE_FILE_READ = b'\x22\x02'
    SINGLE_FILE_WRITE = b'\x22\x03'
    FILE_MEMORY_FORMAT = b'\x22\x04'
    FILE_DELETE = b'\x22\x05'
    FILE_COPY = b'\x22\x07'
    FILE_NAME_CHANGE = b'\x22\x08'
    MEMORY_AREA_FILE_TRANSFER = b'\x22\x0A'
    PARAMETER_AREA_FILE_TRANSFER = b'\x22\x0B'
    PROGRAM_AREA_FILE_TRANSFER = b'\x22\x0C'
    DIRECTORY_CREATE_DELETE = b'\x22\x15'
    MEMORY_CASSETTE_TRANSFER = b'\x22\x20'
    FORCED_SET_RESET = b'\x23\x01'
    FORCED_SET_RESET_CANCEL = b'\x23\x02'
    CONVERT_TO_COMPOWAY_F_COMMAND = b'\x28\x03'
    CONVERT_TO_MODBUS_RTU_COMMAND = b'\x28\x04'
    CONVERT_TO_MODBUS_ASCII_COMMAND = b'\x28\x05'


class FinsResponseEndCode:
    """Name-based access to the two-byte end codes found in FINS responses."""

    NORMAL_COMPLETION = b'\x00\x00'
    SERVICE_CANCELLED = b'\x00\x01'


class FinsConnection(metaclass=ABCMeta):
    """Transport-independent FINS command builder.

    Subclasses supply :meth:`execute_fins_command_frame`, which sends an
    assembled frame and returns the raw response bytes.  The six address
    attributes set in ``__init__`` are each encoded as one byte of the frame
    header, so they must stay within 0-255.
    """

    def __init__(self):
        self.dest_node_add = 0
        self.srce_node_add = 0
        self.dest_net_add = 0
        self.srce_net_add = 0
        self.dest_unit_add = 0
        self.srce_unit_add = 0

    @abstractmethod
    def execute_fins_command_frame(self, fins_command_frame):
        """Send ``fins_command_frame`` (bytes) and return the response bytes."""
        pass

    def fins_command_frame(self, command_code, text=b'', service_id=b'\x60',
                           icf=b'\x80', gct=b'\x07', rsv=b'\x00'):
        """Assemble a complete FINS command frame.

        The frame layout is, in order: ``icf``, ``rsv``, ``gct``, destination
        network/node/unit, source network/node/unit, ``service_id``,
        ``command_code``, ``text``.

        :param command_code: Two-byte command code (see FinsCommandCode)
        :param text: Command-specific payload appended after the command code
        :param service_id: One-byte service id placed before the command code
        :param icf: First header byte (default 0x80)
        :param gct: Third header byte (default 0x07)
        :param rsv: Second header byte (default 0x00)
        :return: The frame as bytes
        :raise OverflowError: If an address attribute does not fit in one byte
        """
        addresses = (
            self.dest_net_add, self.dest_node_add, self.dest_unit_add,
            self.srce_net_add, self.srce_node_add, self.srce_unit_add,
        )
        address_bytes = b''.join(a.to_bytes(1, 'big') for a in addresses)
        return icf + rsv + gct + address_bytes + service_id + command_code + text

    def plc_program_to_file(self, filename, number_of_read_bytes=992):
        """Read the program from the connected FINS device

        :param filename: Filename to write the program from the FINS device
        :param number_of_read_bytes: Bytes to read from the device per cycle(default 992)
        """
        chunks = []
        current_word = 0
        # The file is opened first (as before) so an unwritable path fails
        # before any command is sent; ``with`` guarantees it is closed.
        with open(filename, 'wb') as output_file:
            done = False
            while not done:
                response = self.program_area_read(current_word, number_of_read_bytes)
                # Strip FINS frame headers from response
                response = response[10:]
                # The MSB of byte 10 of the stripped response is the last
                # word of data flag.  Test the bit itself: a value of exactly
                # 0x80 (flag set, remaining bits zero) must also end the loop.
                done = bool(response[10] & 0x80)
                # Strip command information from response leaving only
                # program data
                chunks.append(response[12:])
                current_word += number_of_read_bytes
            output_file.write(b''.join(chunks))

    def file_to_plc_program(self, filename, number_of_write_bytes=992):
        """Write a stored hex program to the connected FINS device

        The device is switched to program mode, the file is sent in chunks of
        at most ``number_of_write_bytes`` bytes, and the device is switched
        back to run mode.  The final chunk has 0x8000 added to its byte count
        as the completion flag.

        :param filename: Filename to write the program from the FINS device
        :param number_of_write_bytes: Bytes to write per cycle(default 992)
        """
        with open(filename, 'rb') as input_file:
            program_buffer = input_file.read()
        total_bytes = len(program_buffer)
        # PLC must be in program mode to do a program area write
        self.change_to_program_mode()
        for current_word in range(0, total_bytes, number_of_write_bytes):
            current_data = program_buffer[current_word:current_word + number_of_write_bytes]
            byte_count = len(current_data)
            if current_word + byte_count >= total_bytes:
                # Last chunk: flag completion.  Using the real chunk length
                # keeps an exact-multiple file from ending on an empty write.
                byte_count += 0x8000
            self.program_area_write(current_word, byte_count, current_data)
        # Change back to run mode after PLC program is written
        self.change_to_run_mode()

    def memory_area_read(self, memory_area_code, beginning_address=b'\x00\x00\x00',
                         number_of_items=1):
        """Function to read PLC memory areas

        :param memory_area_code: Memory area to read
        :param beginning_address: Beginning address (exactly three bytes)
        :param number_of_items: Number of items to read
        :return: response
        """
        assert len(beginning_address) == 3
        data = memory_area_code + beginning_address + number_of_items.to_bytes(2, 'big')
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.MEMORY_AREA_READ, data))
        return response

    def memory_area_write(self, memory_area_code, beginning_address=b'\x00\x00\x00',
                          write_bytes=b'', number_of_items=0):
        """Function to write PLC memory areas

        :param memory_area_code: Memory area to write
        :param beginning_address: Beginning address (exactly three bytes)
        :param write_bytes: The bytes to write
        :param number_of_items: Item count encoded as two bytes before the data
        :return: response
        """
        assert len(beginning_address) == 3
        data = (memory_area_code + beginning_address
                + number_of_items.to_bytes(2, 'big') + write_bytes)
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.MEMORY_AREA_WRITE, data))
        return response

    def program_area_read(self, beginning_word, number_of_bytes=992):
        """Function to read PLC program area

        :param beginning_word: Word to start read
        :param number_of_bytes: Number of bytes to read
        :return: response
        """
        program_number = b'\xff\xff'
        data = (program_number + beginning_word.to_bytes(4, 'big')
                + number_of_bytes.to_bytes(2, 'big'))
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.PROGRAM_AREA_READ, data))
        return response

    def program_area_write(self, beginning_word, number_of_bytes, program_data):
        """Function to write data to PLC program area

        :param beginning_word: Word to start write
        :param number_of_bytes: Number of bytes to write (encoded as two bytes)
        :param program_data: The program bytes appended to the command
        :return: response
        """
        program_number = b'\xff\xff'
        data = (program_number + beginning_word.to_bytes(4, 'big')
                + number_of_bytes.to_bytes(2, 'big') + program_data)
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.PROGRAM_AREA_WRITE, data))
        return response

    def cpu_unit_data_read(self, data=b''):
        """Function to read CPU unit data

        :param data: Optional payload appended after the command code
        :return: response
        """
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.CPU_UNIT_DATA_READ, data))
        return response

    def cpu_unit_status_read(self):
        """Function to read CPU unit status

        :return: response
        """
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.CPU_UNIT_STATUS_READ))
        return response

    def change_to_run_mode(self):
        """Function to change PLC to run mode

        :return: response
        """
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.RUN))
        return response

    def change_to_program_mode(self):
        """Function to change PLC to program mode

        :return: response
        """
        response = self.execute_fins_command_frame(
            self.fins_command_frame(FinsCommandCode.STOP))
        return response


# udp
class UDPFinsConnection(FinsConnection):
    """FINS connection that exchanges frames over a UDP datagram socket."""

    def __init__(self):
        super().__init__()
        self.BUFFER_SIZE = 4096
        self.fins_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.ip_address = '10.162.92.30'
        self.fins_port = None

    def execute_fins_command_frame(self, fins_command_frame):
        """Sends FINS command using this connection

        Implements the abstract method required of FinsConnection.  A receive
        failure (including a timeout) is printed and reported to the caller
        as an empty response rather than raised.

        :param fins_command_frame: The frame bytes to send
        :return: The response bytes, or ``b''`` if nothing was received
        :raise OSError: If sending the frame fails
        """
        response = b''
        # Honour the port given to connect(); 9600 is used before connect().
        port = 9600 if self.fins_port is None else self.fins_port
        self.fins_socket.sendto(fins_command_frame, (self.ip_address, port))
        try:
            response = self.fins_socket.recv(self.BUFFER_SIZE)
        except OSError as err:  # socket.timeout is a subclass of OSError
            print(err)
        return response

    def connect(self, IP_Address, Port=9600):
        """Establish a connection for FINS communications

        :param IP_Address: The IP address of the device you are connecting to
        :param Port: The port that the device and host should listen on (default 9600)
        """
        self.fins_port = Port
        self.ip_address = IP_Address
        self.fins_socket.bind(('', Port))
        self.fins_socket.settimeout(1.0)

    def __del__(self):
        # __init__ may have failed before the socket was created.
        sock = getattr(self, 'fins_socket', None)
        if sock is not None:
            sock.close()


def main():
    """Example program: read CIO word 100 and write the response to tag 'test'.

    NOTE(review): ``system`` is not defined or imported anywhere in this
    file; presumably it is supplied by the host scripting environment --
    confirm before relying on this function outside that environment.
    """
    fins_instance = UDPFinsConnection()
    fins_instance.connect('10.162.92.30')
    fins_instance.dest_node_add = 30
    fins_instance.srce_node_add = 238
    mem_area = fins_instance.memory_area_read(
        FinsPLCMemoryAreas.CIO_WORD, b'\x00\x64\x00')
    system.tag.write('test', mem_area)


if __name__ == "__main__":
    main()