diff --git a/README.md b/README.md index 41fb6dd..0632d48 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,9 @@ Notes (at the time of writing): - [before pysnmp, install pyasn1 with version 0.4.8 and not 0.5](https://github.com/etingof/pysnmp/issues/440#issuecomment-1544341598) - [pull pysnmp from the GitHub master branch, not from PyPI](https://stackoverflow.com/questions/54868134/snmp-reading-from-an-oid-with-three-libraries-gives-different-execution-times#comment96532761_54869361) -Tested with Ubuntu / Windows Subsystem for Linux, Windows. +This program exploits [pysnmp](https://github.com/etingof/pysnmp), with related [documentation](https://pysnmp.readthedocs.io/). + +It is tested with Ubuntu / Windows Subsystem for Linux, Windows. ## Usage @@ -65,7 +67,7 @@ optional arguments: -r RETRIES, --retries RETRIES SNMP GET retries (floating point argument) -Epson Printer Configuration accessed via SNMP (TCP/IP) +Epson Printer Configuration via SNMP (TCP/IP) ``` Examples: @@ -138,6 +140,13 @@ printer.brute_force_read_key() printer.write_first_ti_received_time(2000, 1, 2) ``` +### Exception + +``` +TimeoutError +(pysnmp exceptions) +``` + ## Output example Example of advanced printer status with an XP-205 printer: diff --git a/epson_print_conf.py b/epson_print_conf.py index 57519c2..d705611 100644 --- a/epson_print_conf.py +++ b/epson_print_conf.py @@ -13,6 +13,7 @@ import time import textwrap import ast from pysnmp.hlapi.v1arch import * +from pyasn1.type.univ import OctetString as OctetStringType class EpsonPrinter: @@ -429,28 +430,29 @@ class EpsonPrinter: errorIndication, errorStatus, errorIndex, varBinds = response if errorIndication: if self.debug: - print("snmp_mib error", errorIndication) + print("snmp_mib error:", errorIndication) + if " timed out" in errorIndication: + raise TimeoutError(errorIndication) return False elif errorStatus: if self.debug: print( - 'snmp_mib error: %s at %s' % ( + 'snmp_mib PDU error: %s at %s' % ( errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or '?') ) return False else: for varBind in varBinds: - try: - return "".join( - [chr(x) for x in varBind[1].asNumbers()]) - except Exception: + if isinstance(varBind[1], OctetStringType): + return varBind[1].asOctets() + else: return varBind[1].prettyPrint() if self.debug: - print("snmp_mib error: invalid multiple data") + print("snmp_mib value error: invalid multiple data") return False if self.debug: - print("snmp_mib error: invalid multiple data") + print("snmp_mib value error: invalid data") return False def read_eeprom( @@ -467,10 +469,12 @@ class EpsonPrinter: ) response = self.snmp_mib( self.eeprom_oid_read_address(oid, label=label)) + if not response: + return None if self.debug: print(f" RESPONSE: {repr(response)}") try: - response = re.findall(r"EE:[0-9A-F]{6}", response)[0][3:] + response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:] except (TypeError, IndexError): if self.debug: print(f"Invalid read key.") @@ -507,14 +511,7 @@ class EpsonPrinter: response = self.read_eeprom(oid, label=label) print(f"Previous value for {label}: {response}") oid_string = self.eeprom_oid_write_address(oid, value, label=label) - response = None - try: - response = self.get(oid_string) - except easysnmp.exceptions.EasySNMPTimeoutError as e: - if not self.dry_run: - raise TimeoutError(str(e)) - except Exception as e: - raise ValueError(str(e)) + response = self.snmp_mib(oid_string) if self.debug: print( f"EEPROM_WRITE {label}:\n" @@ -522,9 +519,11 @@ class EpsonPrinter: f" OID: {oid}={hex(oid)}" ) if self.debug and response: - print(f" RESPONSE: {repr(response.value)}") - if response and not ":OK;" in repr(response.value): # ":NA;" is an error - return False + print(f" RESPONSE: {repr(response)}") + if not self.dry_run and response and not ":OK;" in repr(response): + if self.debug: + print("Write error") + return False # ":NA;" is an error return True def status_parser(self, data): @@ -690,23 +689,21 @@ class EpsonPrinter: else: snmp_info = self.snmp_info for name, oid in snmp_info.items(): - try: - sys_info[name] = self.snmp_mib(oid) - except Exception as e: + result = self.snmp_mib(oid) + if name == "hex_data" and result is not False: + sys_info[name] = result.hex(" ").upper() + elif name == "UpTime" and result is not False: + sys_info[name] = time.strftime( + '%H:%M:%S', time.gmtime(int(result)/100)) + elif name == "MAC Address" and result is not False: + sys_info[name] = result.hex("-").upper() + elif isinstance(result, bytes): + sys_info[name] = result.decode() + elif isinstance(result, str): + sys_info[name] = result + else: if self.debug: - print( - f"No value for SNMP OID '{name}'. " - f"MIB: {oid}. Error: {e}" - ) - if "hex_data" in sys_info and sys_info["hex_data"] is not False: - sys_info["hex_data"] = bytes( - [ord(i) for i in sys_info["hex_data"]]).hex(" ").upper() - if "UpTime" in sys_info and sys_info["UpTime"] is not False: - sys_info["UpTime"] = time.strftime( - '%H:%M:%S', time.gmtime(int(sys_info["UpTime"])/100)) - if "MAC Address" in sys_info and sys_info["MAC Address"] is not False: - sys_info["MAC Address"] = bytes( - [ord(i) for i in sys_info["MAC Address"]]).hex("-").upper() + print(f"No value for SNMP OID '{name}'. MIB: {oid}.") return sys_info def get_serial_number(self) -> str: @@ -766,7 +763,8 @@ class EpsonPrinter: f"{self.eeprom_link}.118.105.1.0.0") if not firmware_string: return None - firmware = re.sub(r".*vi:00:(.{6}).*", r'\g<1>', firmware_string) + firmware = re.sub( + r".*vi:00:(.{6}).*", r'\g<1>', firmware_string.decode()) year = ord(firmware[4:5]) + 1945 month = int(firmware[5:], 16) day = int(firmware[2:4]) @@ -780,7 +778,10 @@ class EpsonPrinter: if not cartridges_string: return None cartridges = re.sub( - r".*IA:00;(.*);.*", r'\g<1>', cartridges_string, flags=re.S) + r".*IA:00;(.*);.*", r'\g<1>', + cartridges_string.decode(), + flags=re.S + ) return [i.strip() for i in cartridges.split(',')] def get_ink_replacement_counters(self) -> str: @@ -810,13 +811,12 @@ class EpsonPrinter: if self.debug: print( textwrap.fill( - "PRINTER_STATUS: " + bytes( - [ord(i) for i in result]).hex(" "), + "PRINTER_STATUS: " + repr(result), initial_indent="", subsequent_indent=" ", ) ) - return self.status_parser(bytes([ord(i) for i in result])) + return self.status_parser(result) def get_waste_ink_levels(self): """Return waste ink levels as a percentage.""" @@ -892,12 +892,20 @@ class EpsonPrinter: return False return True - def list_known_keys(self, debug=False): + def list_known_keys(self): for model, chars in self.PRINTER_CONFIG.items(): if 'write_key' in chars: - print(f"{repr(model).rjust(25)}: {repr(chars['read_key']).rjust(10)} - {repr(chars['write_key'])[1:]}") + print( + f"{repr(model).rjust(25)}: " + f"{repr(chars['read_key']).rjust(10)} - " + f"{repr(chars['write_key'])[1:]}" + ) else: - print(f"{repr(model).rjust(25)}: {repr(chars['read_key']).rjust(10)} (unknown write key)") + print( + f"{repr(model).rjust(25)}: " + f"{repr(chars['read_key']).rjust(10)} " + f"(unknown write key)" + ) def brute_force_read_key( self, minimum: int = 0x00, maximum: int = 0xFF, debug=False @@ -926,7 +934,7 @@ if __name__ == "__main__": from pprint import pprint parser = argparse.ArgumentParser( - epilog='Epson Printer Configuration accessed via SNMP (TCP/IP)') + epilog='Epson Printer Configuration via SNMP (TCP/IP)') parser.add_argument( '-m', @@ -1056,25 +1064,25 @@ if __name__ == "__main__": try: if args.ws_to_string: print_opt = True - print(self.write_sequence_to_string(args.ws_to_string)) + print(printer.write_sequence_to_string(args.ws_to_string)) if args.reset_waste_ink: print_opt = True - if self.reset_waste_ink_levels(): + if printer.reset_waste_ink_levels(): print("Reset waste ink levels done.") else: print("Failed to reset waste ink levels. Check configuration.") if args.detect_key: print_opt = True - read_key = self.brute_force_read_key(debug=True) + read_key = printer.brute_force_read_key(debug=True) if read_key: print(f"read_key found: {read_key}") print("List of known keys:") - self.list_known_keys(debug=True) + printer.list_known_keys() else: print(f"Cannot found read_key") if args.ftrt: print_opt = True - if self.write_first_ti_received_time( + if printer.write_first_ti_received_time( int(args.ftrt[0]), int(args.ftrt[1]), int(args.ftrt[2])): print("Write first TI received time done.") else: @@ -1084,7 +1092,7 @@ if __name__ == "__main__": ) if args.dump_eeprom: print_opt = True - for addr, val in self.dump_eeprom( + for addr, val in printer.dump_eeprom( args.dump_eeprom[0] % 256, int(args.dump_eeprom[1] % 256) ).items(): @@ -1093,13 +1101,13 @@ if __name__ == "__main__": print_opt = True if ("stats" in printer.parm and args.query[0] in printer.parm["stats"]): - ret = self.get_stats(args.query[0]) + ret = printer.get_stats(args.query[0]) if ret: pprint(ret) else: print("No information returned. Check printer definition.") elif args.query[0] in printer.snmp_info.keys(): - ret = self.get_snmp_info(args.query[0]) + ret = printer.get_snmp_info(args.query[0]) if ret: pprint(ret) else: @@ -1110,7 +1118,7 @@ if __name__ == "__main__": else: method = "get_" + args.query[0] if method in printer.list_methods: - ret = self.__getattribute__(method)() + ret = printer.__getattribute__(method)() if ret: pprint(ret) else: @@ -1146,7 +1154,7 @@ if __name__ == "__main__": read_list = re.split(',\s*', args.read_eeprom[0]) for value in read_list: try: - val = self.read_eeprom( + val = printer.read_eeprom( ast.literal_eval(value), label='read_eeprom') if val is None: print("EEPROM read error.") @@ -1162,7 +1170,7 @@ if __name__ == "__main__": key, val = re.split(':|=', key_val) try: val_int = ast.literal_eval(val) - if not self.write_eeprom( + if not printer.write_eeprom( ast.literal_eval(key), str(val_int), label='write_eeprom' ):