diff --git a/README.md b/README.md index ea062ee..83e1acb 100644 --- a/README.md +++ b/README.md @@ -35,19 +35,23 @@ It is tested with Ubuntu / Windows Subsystem for Linux, Windows. ## Usage ``` -usage: epson_print_conf.py [-h] -m MODEL -a HOSTNAME [-i] [-q QUERY] [--reset_waste_ink] [--detect-key] [-d] - [-e DUMP_EEPROM DUMP_EEPROM] [--dry-run] [--write-first-ti-received-time FTRT FTRT FTRT] - [-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING] [-t TIMEOUT] [-r RETRIES] +usage: epson_print_conf.py [-h] -m MODEL -a HOSTNAME [-i] [-q QUERY] [--reset_waste_ink] + [--detect-key] [-d] [-e DUMP_EEPROM DUMP_EEPROM] [--dry-run] + [--write-first-ti-received-time FTRT FTRT FTRT] + [-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING] + [-t TIMEOUT] [-r RETRIES] [-c CONFIG_FILE] optional arguments: -h, --help show this help message and exit -m MODEL, --model MODEL - Printer model. Example: -m XP-205 (use ? to print all supported models) + Printer model. Example: -m XP-205 (use ? to print all supported + models) -a HOSTNAME, --address HOSTNAME Printer host name or IP address. (Example: -m 192.168.1.87) -i, --info Print all available information and statistics (default option) -q QUERY, --query QUERY - Print specific information. (Use ? to list all available queries) + Print specific information. (Use ? to list all available + queries) --reset_waste_ink Reset all waste ink levels to 0 --detect-key Detect the read_key via brute force -d, --debug Print debug information @@ -57,15 +61,20 @@ optional arguments: --write-first-ti-received-time FTRT FTRT FTRT Change the first TI received time (arguments: year, month, day) -R READ_EEPROM, --read-eeprom READ_EEPROM - Read the values of a list of printer EEPROM addreses. Format is: address [, ...] + Read the values of a list of printer EEPROM addreses. Format is: + address [, ...] -W WRITE_EEPROM, --write-eeprom WRITE_EEPROM - Write related values to a list of printer EEPROM addresses. Format is: address: value [, ...] + Write related values to a list of printer EEPROM addresses. + Format is: address: value [, ...] -S WS_TO_STRING, --write-sequence-to-string WS_TO_STRING Convert write sequence of numbers to string. -t TIMEOUT, --timeout TIMEOUT SNMP GET timeout (floating point argument) -r RETRIES, --retries RETRIES SNMP GET retries (floating point argument) + -c CONFIG_FILE, --config CONFIG_FILE + read a configuration file including the full log dump of a + previous operation instead of accessing the printer via SNMP Epson Printer Configuration via SNMP (TCP/IP) ``` diff --git a/epson_print_conf.py b/epson_print_conf.py index 881616c..3d022fd 100644 --- a/epson_print_conf.py +++ b/epson_print_conf.py @@ -320,6 +320,7 @@ class EpsonPrinter: printer_model: str hostname: str parm: dict + mib_dict: dict = {} ink_color_ids = { # Ink color 0x00: 'Black', @@ -459,8 +460,18 @@ class EpsonPrinter: else: return write_op - def snmp_mib(self, mib): + def snmp_mib(self, mib, label="unknown"): """Generic SNMP query, returning value of a MIB.""" + if self.mib_dict: + if mib not in self.mib_dict: + logging.error( + "MIB '%s' not valued in the configuration file. " + "Operation: %s", + mib, + label + ) + return False + return self.mib_dict[mib] if not self.hostname: return False utt = UdpTransportTarget( @@ -479,15 +490,20 @@ class EpsonPrinter: for response in iterator: errorIndication, errorStatus, errorIndex, varBinds = response if errorIndication: - logging.info("snmp_mib error: %s", errorIndication) + logging.info( + "snmp_mib error: %s. MIB: %s. Operation: %s", + errorIndication, mib, label + ) if " timed out" in errorIndication: raise TimeoutError(errorIndication) return False elif errorStatus: logging.info( - 'snmp_mib PDU error: %s at %s', + 'snmp_mib PDU error: %s at %s. MIB: %s. Operation: %s', errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex) - 1][0] or '?' + errorIndex and varBinds[int(errorIndex) - 1][0] or '?', + mib, + label ) return False else: @@ -496,9 +512,17 @@ class EpsonPrinter: return varBind[1].asOctets() else: return varBind[1].prettyPrint() - logging.info("snmp_mib value error: invalid multiple data") + logging.info( + "snmp_mib value error: invalid multiple data. " + "MIB: %s. Operation: %s", + mib, + label + ) return False - logging.info("snmp_mib value error: invalid data") + logging.info( + "snmp_mib value error: invalid data. MIB: %s. Operation: %s", + label + ) return False def read_eeprom( @@ -513,7 +537,7 @@ class EpsonPrinter: f" OID: {oid}={hex(oid)}" ) response = self.snmp_mib( - self.eeprom_oid_read_address(oid, label=label)) + self.eeprom_oid_read_address(oid, label=label), label=label) if not response: return None logging.debug(f" RESPONSE: {repr(response)}") @@ -557,12 +581,13 @@ class EpsonPrinter: response = self.read_eeprom(oid, label=label) logging.debug(f"Previous value for {label}: {response}") oid_string = self.eeprom_oid_write_address(oid, value, label=label) - response = self.snmp_mib(oid_string) logging.debug( f"EEPROM_WRITE {label}:\n" f" ADDRESS: {oid_string}\n" - f" OID: {oid}={hex(oid)}" + f" OID: {oid}={hex(oid)}\n" + f" VALUE: {value} = {hex(int(value))}" ) + response = self.snmp_mib(oid_string, label=label) if response: logging.debug(f" RESPONSE: {repr(response)}") if not self.dry_run and response and not ":OK;" in repr(response): @@ -888,7 +913,12 @@ class EpsonPrinter: else: snmp_info = self.snmp_info for name, oid in snmp_info.items(): - result = self.snmp_mib(oid) + logging.debug( + f"SNMP_DUMP {name}:\n" + f" ADDRESS: {oid}" + ) + result = self.snmp_mib(oid, label="get_snmp_info " + name) + logging.debug(f" RESPONSE: {repr(result)}") if name == "hex_data" and result is not False: sys_info[name] = result.hex(" ").upper() elif name == "UpTime" and result is not False: @@ -969,10 +999,16 @@ class EpsonPrinter: def get_firmware_version(self) -> str: """Return firmware version.""" - firmware_string = self.snmp_mib( - f"{self.eeprom_link}.118.105.1.0.0") + oid = f"{self.eeprom_link}.118.105.1.0.0" + label = "get_firmware_version" + logging.debug( + f"SNMP_DUMP {label}:\n" + f" ADDRESS: {oid}" + ) + firmware_string = self.snmp_mib(oid, label=label) if not firmware_string: return None + logging.debug(f" RESPONSE: {repr(firmware_string)}") firmware = re.sub( r".*vi:00:(.{6}).*", r'\g<1>', firmware_string.decode()) year = ord(firmware[4:5]) + 1945 @@ -983,10 +1019,16 @@ class EpsonPrinter: def get_cartridges(self) -> str: """Return list of cartridge types.""" - cartridges_string = self.snmp_mib( - f"{self.eeprom_link}.105.97.1.0.0") + oid = f"{self.eeprom_link}.105.97.1.0.0" + label = "get_cartridges" + logging.debug( + f"SNMP_DUMP {label}:\n" + f" ADDRESS: {oid}" + ) + cartridges_string = self.snmp_mib(oid, label=label) if not cartridges_string: return None + logging.debug(f" RESPONSE: {repr(cartridges_string)}") cartridges = re.sub( r".*IA:00;(.*);.*", r'\g<1>', cartridges_string.decode(), @@ -1020,7 +1062,7 @@ class EpsonPrinter: """Return printer status and ink levels.""" address = f"{self.eeprom_link}.115.116.1.0.1" logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}") - result = self.snmp_mib(address) + result = self.snmp_mib(address, label="get_printer_status") if not result: return None logging.debug(f" RESPONSE: {repr(result[:20])}...\n%s", @@ -1080,7 +1122,7 @@ class EpsonPrinter: f"Cartridge {i}:\n" f" ADDRESS: {mib}" ) - cartridge = self.snmp_mib(mib) + cartridge = self.snmp_mib(mib, label="get_cartridge_information") logging.debug(f" RESPONSE: {repr(cartridge)}") if not cartridge: continue @@ -1208,6 +1250,150 @@ class EpsonPrinter: except Exception: return None + def read_config_file(self, file): + mib_dict = {} + next_line = None + try: + while True: + line = next_line if next_line != None else next(file) + next_line = None + oid = None + value = None + process = None + address_val = None + response_val = None + response_val_bytes = None + if line.startswith("PRINTER_STATUS:"): + oid = False + value = False + process = True + response_next = True + if line.startswith("Cartridge "): + oid = False + value = False + process = True + response_next = False + if line.startswith("SNMP_DUMP "): + oid = False + value = False + process = True + response_next = False + if line.startswith("EEPROM_DUMP "): + oid = True + value = False + process = True + response_next = False + if line.startswith("EEPROM_WRITE "): + oid = True + value = True + process = True + response_next = False + if process: + # address + address_line = ( + next_line if next_line != None else next(file) + ) + next_line = None + if not address_line.startswith(" ADDRESS: "): + logging.error( + "Missing ADDRESS: '%s'", address_line.rstrip()) + next_line = address_line + continue + address_val = address_line[11:].rstrip() + if not address_val: + logging.error( + "Invalid ADDRESS: '%s'", address_line.rstrip()) + next_line = address_line + continue + # oid + if oid: + oid_line = ( + next_line if next_line != None else next(file) + ) + next_line = None + if not oid_line.startswith(" OID: "): + logging.error( + "Missing OID: '%s'", oid_line.rstrip()) + next_line = oid_line + continue + # value + if value: + value_line = ( + next_line if next_line != None else next(file) + ) + next_line = None + if not value_line.startswith(" VALUE: "): + logging.error( + "Missing VALUE: '%s'", value_line.rstrip()) + next_line = value_line + continue + # response + response_line = ( + next_line if next_line != None else next(file) + ) + next_line = None + if response_line.startswith(" RESPONSE: "): + response_val = response_line[12:].rstrip() + if not response_val: + logging.error( + "Invalid RESPONSE '%s'", response_line.rstrip()) + next_line = response_line + continue + if response_next: + dump_hex_str = "" + while True: + dump_hex = ( + next_line if next_line != None + else next(file) + ) + next_line = None + if not dump_hex.startswith(" "): + next_line = dump_hex + break + try: + val = bytes.fromhex(dump_hex) + except ValueError: + next_line = dump_hex + continue + dump_hex_str += dump_hex + if not dump_hex_str: + logging.error( + "Invalid DUMP: '%s'", dump_hex.rstrip()) + next_line = dump_hex + continue + try: + val = bytes.fromhex(dump_hex_str) + except ValueError: + logging.error( + "Invalid DUMP %s", dump_hex_str.rstrip()) + next_line = dump_hex + continue + if val: + mib_dict[address_val] = val + else: + try: + response_val_bytes = ast.literal_eval( + response_val) + except Exception as e: + logging.error( + "Invalid response %s: %s", + response_line.rstrip(), + e + ) + next_line = response_line + continue + if response_val_bytes: + mib_dict[address_val] = response_val_bytes + else: + logging.error( + "Null value for response %s", + response_line.rstrip() + ) + next_line = response_line + except StopIteration: + pass + return mib_dict + if __name__ == "__main__": import argparse @@ -1325,6 +1511,16 @@ if __name__ == "__main__": default=None, help='SNMP GET retries (floating point argument)', ) + parser.add_argument( + '-c', + "--config", + dest='config_file', + type=argparse.FileType('r'), + help="read a configuration file including the full log dump of a " + "previous operation instead of accessing the printer via SNMP", + default=0, + nargs=1, + metavar='CONFIG_FILE') args = parser.parse_args() logging_level = logging.WARNING @@ -1355,6 +1551,11 @@ if __name__ == "__main__": timeout=args.timeout, retries=args.retries, dry_run=args.dry_run) + if args.config_file: + printer.mib_dict = printer.read_config_file(args.config_file[0]) + if not printer.mib_dict: + print("Error while reading configuration file") + quit(1) if not printer.parm: print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join( printer.valid_printers), @@ -1397,7 +1598,11 @@ if __name__ == "__main__": args.dump_eeprom[0] % 256, int(args.dump_eeprom[1] % 256) ).items(): - print(f"{str(addr).rjust(3)}: {val:#04x} = {str(val).rjust(3)}") + print( + f"EEPROM_ADDR {hex(addr).rjust(4)} = " + f"{str(addr).rjust(3)}: " + f"{val:#04x} = {str(val).rjust(3)}" + ) if args.query: print_opt = True if ("stats" in printer.parm and @@ -1455,12 +1660,16 @@ if __name__ == "__main__": read_list = re.split(',\s*', args.read_eeprom[0]) for value in read_list: try: - val = printer.read_eeprom( - ast.literal_eval(value), label='read_eeprom') + addr = int(ast.literal_eval(value)) + val = printer.read_eeprom(addr, label='read_eeprom') if val is None: print("EEPROM read error.") else: - print(f"0x{val}={int(val, 16)}") + print( + f"EEPROM_ADDR {hex(addr).rjust(4)} = " + f"{str(addr).rjust(3)}: " + f"{int(val):#04x} = {val.rjust(3)}" + ) except (ValueError, SyntaxError): print("invalid argument for read_eeprom") quit(1)