Refinements

This commit is contained in:
Ircama 2023-07-26 12:55:29 +02:00
parent a8c6f677f0
commit bfd8e4a402

View file

@ -28,13 +28,13 @@ class EpsonPrinter:
"printer_head_id_h": range(122, 127), "printer_head_id_h": range(122, 127),
"printer_head_id_f": [136, 137, 138, 129], "printer_head_id_f": [136, 137, 138, 129],
"stats": { "stats": {
"Manual cleaning counter": [[147], "0"], "Manual cleaning counter": [147],
"Timer cleaning counter": [[149], "0"], "Timer cleaning counter": [149],
"Ink replacement cleaning counter": [[148], "0"], "Ink replacement cleaning counter": [148],
"Total print pass counter": [[171, 170, 169, 168], "0"], "Total print pass counter": [171, 170, 169, 168],
"Total print page counter": [[167, 166, 165, 164], "0"], "Total print page counter": [167, 166, 165, 164],
"Total scan counter": [[215, 214, 213, 212], "1"], "Total scan counter": [0x01d7, 0x01d6, 0x01d5, 0x01d4],
"First TI received time": [[173, 172], "0"] "First TI received time": [173, 172]
}, },
"ink_replacement_counters": { "ink_replacement_counters": {
"Black": { "1B": 242, "1S": 208, "1L": 209}, "Black": { "1B": 242, "1S": 208, "1L": 209},
@ -42,8 +42,7 @@ class EpsonPrinter:
"Magenta": { "1B": 251, "1S": 249, "1L": 250}, "Magenta": { "1B": 251, "1S": 249, "1L": 250},
"Cyan": { "1B": 245, "1S": 243, "1L": 244}, "Cyan": { "1B": 245, "1S": 243, "1L": 244},
}, },
"last_printer_fatal_errors": [60, 203, 204, 205, 206], "last_printer_fatal_errors": [60, 203, 204, 205, 206, 0x01d3],
"last_printer_fatal_err_ext": [211],
}, },
"Stylus Photo PX730WD": { "Stylus Photo PX730WD": {
"alias": ["Epson Artisan 730"], "alias": ["Epson Artisan 730"],
@ -52,14 +51,14 @@ class EpsonPrinter:
"main_waste": {"oids": [0xe, 0xf], "divider": 81.82}, "main_waste": {"oids": [0xe, 0xf], "divider": 81.82},
"borderless_waste": {"oids": [0x10, 0x11], "divider": 122.88}, "borderless_waste": {"oids": [0x10, 0x11], "divider": 122.88},
"stats": { "stats": {
"Manual cleaning counter": [[0x7e], "0"], "Manual cleaning counter": [0x7e],
"Timer cleaning counter": [[0x61], "0"], "Timer cleaning counter": [0x61],
"Total print pass counter": [[0x2C, 0x2D, 0x2E, 0x2F], "0"], "Total print pass counter": [0x2C, 0x2D, 0x2E, 0x2F],
"Total print page counter": [[0x9E, 0x9F], "0"], "Total print page counter": [0x9E, 0x9F],
"Total print page counter (duplex)": [[0xA0, 0xA1], "0"], "Total print page counter (duplex)": [0xA0, 0xA1],
"Total print CD-R counter": [[0x4A, 0x4B], "0"], "Total print CD-R counter": [0x4A, 0x4B],
"Total print CD-R tray open/close counter": [[0xA2, 0xA3], "0"], "Total print CD-R tray open/close counter": [0xA2, 0xA3],
"Total scan counter": [[0xDA, 0xDB, 0xDC, 0xDD], "1"], "Total scan counter": [0x01DA, 0x01DB, 0x01DC, 0x01DD],
}, },
"last_printer_fatal_errors": [0x3B, 0xC0, 0xC1, 0xC2, 0xC3, 0x5C], "last_printer_fatal_errors": [0x3B, 0xC0, 0xC1, 0xC2, 0xC3, 0x5C],
"ink_replacement_counters": { "ink_replacement_counters": {
@ -71,6 +70,7 @@ class EpsonPrinter:
"Light cyan": { "1S": 0x6E, "2S": 0x6F, "3S": 0x9B}, "Light cyan": { "1S": 0x6E, "2S": 0x6F, "3S": 0x9B},
}, },
"serial_number": range(0xE7, 0xF0), "serial_number": range(0xE7, 0xF0),
# untested
}, },
"WF-7525": { "WF-7525": {
"read_key": [101, 0], "read_key": [101, 0],
@ -78,6 +78,7 @@ class EpsonPrinter:
"main_waste": {"oids": [20, 21], "divider": 196.5}, "main_waste": {"oids": [20, 21], "divider": 196.5},
"borderless_waste": {"oids": [22, 23], "divider": 52.05}, "borderless_waste": {"oids": [22, 23], "divider": 52.05},
"serial_number": range(192, 202), "serial_number": range(192, 202),
# untested
}, },
"L355": { "L355": {
"read_key": [65, 9], "read_key": [65, 9],
@ -288,8 +289,15 @@ class EpsonSession(easysnmp.Session):
hostname=self.printer.hostname, community=community, version=version hostname=self.printer.hostname, community=community, version=version
) )
def read_eeprom_oid_address(self, oid: int, ext: str="0") -> str: def eeprom_oid_read_address(
self,
oid: int,
msb: int=0,
label: str="unknown method") -> str:
"""Return address for reading from EEPROM for specified OID.""" """Return address for reading from EEPROM for specified OID."""
if oid > 255:
msb = oid // 256
oid = oid % 256
return ( return (
f"{self.printer.eeprom_link}" f"{self.printer.eeprom_link}"
".124.124" # || (0x7C 0x7C) ".124.124" # || (0x7C 0x7C)
@ -297,12 +305,19 @@ class EpsonSession(easysnmp.Session):
f".{self.printer.parm['read_key'][0]}" f".{self.printer.parm['read_key'][0]}"
f".{self.printer.parm['read_key'][1]}" f".{self.printer.parm['read_key'][1]}"
".65.190.160" ".65.190.160"
f".{oid}.{ext}" f".{oid}.{msb}"
) )
def write_eeprom_oid_address( def eeprom_oid_write_address(
self, oid: int, value: Any, ext: str="0") -> str: self,
oid: int,
value: Any,
msb: int=0,
label: str="unknown method") -> str:
"""Return address for writing to EEPROM for specified OID.""" """Return address for writing to EEPROM for specified OID."""
if oid > 255:
msb = oid // 256
oid = oid % 256
write_op = ( write_op = (
f"{self.printer.eeprom_link}" f"{self.printer.eeprom_link}"
".124.124" # || (0x7C 0x7C) ".124.124" # || (0x7C 0x7C)
@ -310,12 +325,12 @@ class EpsonSession(easysnmp.Session):
f".{self.printer.parm['read_key'][0]}" f".{self.printer.parm['read_key'][0]}"
f".{self.printer.parm['read_key'][1]}" f".{self.printer.parm['read_key'][1]}"
".66.189.33" ".66.189.33"
f".{oid}.{ext}.{value}" f".{oid}.{msb}.{value}"
f".{self.printer.caesar(self.printer.parm['write_key'])}" f".{self.printer.caesar(self.printer.parm['write_key'])}"
) )
if self.dry_run: if self.dry_run:
print("WRITE_DRY_RUN:", write_op) print("WRITE_DRY_RUN:", write_op)
return self.read_eeprom_oid_address(oid, ext) return self.eeprom_oid_read_address(oid, label=label)
else: else:
return write_op return write_op
@ -329,18 +344,23 @@ class EpsonSession(easysnmp.Session):
raise ValueError(str(e)) raise ValueError(str(e))
return value return value
def read_eeprom(self, oid: int, ext: str="0") -> str: def read_eeprom(
self,
oid: int,
label: str="unknown method") -> str:
"""Read EEPROM data.""" """Read EEPROM data."""
response = self.read_value(self.read_eeprom_oid_address(oid, ext)) response = self.read_value(
self.eeprom_oid_read_address(oid, label=label))
if self.debug: if self.debug:
print( print(
"EEPROM_DUMP" f"EEPROM_DUMP {label}:\n"
f": ADDRESS: {self.read_eeprom_oid_address(oid, ext)}" f" ADDRESS: "
f". OID: {oid}" f"{self.eeprom_oid_read_address(oid, label=label)}\n"
f". RESPONSE: {response}" f" OID: {oid}={hex(oid)}\n"
f" RESPONSE: {repr(response)}"
) )
response = re.findall(r"EE:[0-9A-F]{6}", response)[0][3:] response = re.findall(r"EE:[0-9A-F]{6}", response)[0][3:]
chk_addr = response[2:4] chk_addr = response[0:4]
value = response[4:6] value = response[4:6]
if int(chk_addr, 16) != oid: if int(chk_addr, 16) != oid:
raise ValueError( raise ValueError(
@ -349,14 +369,29 @@ class EpsonSession(easysnmp.Session):
) )
return value return value
def read_eeprom_many(self, oids: list, ext: str="0"): def read_eeprom_many(
self,
oids: list,
label: str="unknown method"):
"""Read EEPROM data with multiple values.""" """Read EEPROM data with multiple values."""
return [self.read_eeprom(oid, ext) for oid in oids] return [self.read_eeprom(oid, label=label) for oid in oids]
def write_eeprom(self, oid: int, value: int) -> None: def write_eeprom(
self,
oid: int,
value: int,
label: str="unknown method") -> None:
"""Write value to OID with specified type to EEPROM.""" """Write value to OID with specified type to EEPROM."""
oid_string = self.eeprom_oid_write_address(oid, value, label=label)
try: try:
self.get(self.write_eeprom_oid_address(oid, value)) response = self.get(oid_string)
if self.debug:
print(
f"EEPROM_WRITE {label}:\n"
f" ADDRESS: {oid_string}\n"
f" OID: {oid}={hex(oid)}\n"
f" RESPONSE: {repr(response.value)}"
)
except easysnmp.exceptions.EasySNMPTimeoutError as e: except easysnmp.exceptions.EasySNMPTimeoutError as e:
raise TimeoutError(str(e)) raise TimeoutError(str(e))
except Exception as e: except Exception as e:
@ -409,8 +444,8 @@ class EpsonSession(easysnmp.Session):
buf = buf[length:] buf = buf[length:]
if self.debug: if self.debug:
print("Processing status. ftype", hex(ftype), print("Processing status - ftype", hex(ftype),
"length:", length, "item:", item, "buf:", buf) "length:", length, "item:", item.hex(' '))
if ftype == 0x0f: # ink if ftype == 0x0f: # ink
colourlen = item[0] colourlen = item[0]
@ -543,23 +578,27 @@ class EpsonSession(easysnmp.Session):
return "".join( return "".join(
chr(int(value, 16)) chr(int(value, 16))
for value in self.read_eeprom_many( for value in self.read_eeprom_many(
self.printer.parm["serial_number"]) self.printer.parm["serial_number"], label="serial_number")
) )
def get_stats(self) -> str: def get_stats(self, stat_name: str=None) -> str:
"""Return printer statistics.""" """Return printer statistics."""
if "stats" not in self.printer.parm: if "stats" not in self.printer.parm:
return None return None
if stat_name and stat_name in self.printer.parm["stats"].keys():
stat_info = { stat_name: self.printer.parm["stats"][stat_name] }
else:
stat_info = self.printer.parm["stats"]
stats_result = {} stats_result = {}
for stat_name, oids in self.printer.parm["stats"].items(): for stat_name, oids in stat_info.items():
total = 0 total = 0
for val in self.read_eeprom_many(oids[0], oids[1]): for val in self.read_eeprom_many(oids, label=stat_name):
total = (total << 8) + int(val, 16) total = (total << 8) + int(val, 16)
stats_result[stat_name] = total stats_result[stat_name] = total
ftrt = stats_result["First TI received time"] ftrt = stats_result["First TI received time"]
year = 2000 + ftrt // (16 * 32) year = 2000 + ftrt // (16 * 32)
month = (ftrt - (year - 2000) * (16 * 32)) // 32 month = (ftrt - (year - 2000) * (16 * 32)) // 32
day = (ftrt - (year - 2000) * (16 * 32)) // 16 day = ftrt - (year - 2000) * 16 * 32 - 32 * month
stats_result["First TI received time"] = datetime.datetime( stats_result["First TI received time"] = datetime.datetime(
year, month, day).strftime('%d %b %Y') year, month, day).strftime('%d %b %Y')
return stats_result return stats_result
@ -570,8 +609,10 @@ class EpsonSession(easysnmp.Session):
return None return None
if "printer_head_id_f" not in self.printer.parm: if "printer_head_id_f" not in self.printer.parm:
return None return None
a = self.read_eeprom_many(self.printer.parm["printer_head_id_h"]) a = self.read_eeprom_many(
b = self.read_eeprom_many(self.printer.parm["printer_head_id_f"]) self.printer.parm["printer_head_id_h"], label="printer_head_id_h")
b = self.read_eeprom_many(
self.printer.parm["printer_head_id_f"], label="printer_head_id_f")
return(f'{"".join(a)} - {"".join(b)}') return(f'{"".join(a)} - {"".join(b)}')
def get_firmware_version(self) -> str: def get_firmware_version(self) -> str:
@ -602,7 +643,8 @@ class EpsonSession(easysnmp.Session):
if "ink_replacement_counters" not in self.printer.parm: if "ink_replacement_counters" not in self.printer.parm:
return None return None
irc = { irc = {
(color, counter, int(self.read_eeprom(value), 16)) (color, counter, int(self.read_eeprom(
value, label="ink_replacement_counters"), 16))
for color, data in self.printer.parm[ for color, data in self.printer.parm[
"ink_replacement_counters"].items() "ink_replacement_counters"].items()
for counter, value in data.items() for counter, value in data.items()
@ -614,8 +656,11 @@ class EpsonSession(easysnmp.Session):
result = self.read_value(f"{self.printer.eeprom_link}.115.116.1.0.1") result = self.read_value(f"{self.printer.eeprom_link}.115.116.1.0.1")
if not result: if not result:
return None return None
if self.debug: if self.debug:
print("PRINTER_STATUS", bytes([ord(i) for i in result]).hex(" ")) print(textwrap.fill("PRINTER_STATUS: " +
bytes([ord(i) for i in result]).hex(" "),
initial_indent='', subsequent_indent=' ')
)
return self.status_parser(bytes([ord(i) for i in result])) return self.status_parser(bytes([ord(i) for i in result]))
def get_waste_ink_levels(self): def get_waste_ink_levels(self):
@ -624,7 +669,8 @@ class EpsonSession(easysnmp.Session):
return None return None
results = [] results = []
level = self.read_eeprom_many(self.printer.parm["main_waste"]["oids"]) level = self.read_eeprom_many(
self.printer.parm["main_waste"]["oids"], label="main_waste")
level_b10 = int("".join(reversed(level)), 16) level_b10 = int("".join(reversed(level)), 16)
results.append( results.append(
round(level_b10 / self.printer.parm["main_waste"]["divider"], round(level_b10 / self.printer.parm["main_waste"]["divider"],
@ -633,7 +679,9 @@ class EpsonSession(easysnmp.Session):
if "borderless_waste" in self.printer.parm: if "borderless_waste" in self.printer.parm:
level = self.read_eeprom_many( level = self.read_eeprom_many(
self.printer.parm["borderless_waste"]["oids"]) self.printer.parm["borderless_waste"]["oids"],
label="borderless_waste"
)
level_b10 = int("".join(reversed(level)), 16) level_b10 = int("".join(reversed(level)), 16)
results.append(round(level_b10 / self.printer.parm[ results.append(round(level_b10 / self.printer.parm[
"borderless_waste"]["divider"], 2)) "borderless_waste"]["divider"], 2))
@ -644,41 +692,39 @@ class EpsonSession(easysnmp.Session):
"""Return list of last printer fatal errors in hex format.""" """Return list of last printer fatal errors in hex format."""
if "last_printer_fatal_errors" not in self.printer.parm: if "last_printer_fatal_errors" not in self.printer.parm:
return None return None
err = self.read_eeprom_many( return self.read_eeprom_many(
self.printer.parm["last_printer_fatal_errors"]) self.printer.parm["last_printer_fatal_errors"],
if "last_printer_fatal_err_ext" in self.printer.parm: label="last_printer_fatal_errors"
err.extend(self.read_eeprom_many( )
self.printer.parm["last_printer_fatal_err_ext"], "1"))
return err
def dump_eeprom(self, ext: str="0", start: int = 0, end: int = 0xFF): def dump_eeprom(self, start: int = 0, end: int = 0xFF):
""" """
Dump EEPROM data from start to end (less significant byte). Dump EEPROM data from start to end (less significant byte).
ext = most significant byte
""" """
d = {} d = {}
for oid in range(start, end): for oid in range(start, end + 1):
d[oid] = int(self.read_eeprom(oid, ext), 16) d[oid] = int(self.read_eeprom(oid, label="dump_eeprom"), 16)
return d return d
def reset_waste_ink_levels(self) -> None: def reset_waste_ink_levels(self) -> None:
""" """
Set waste ink levels to 0. Set waste ink levels to 0.
""" """
for oid in self.printer.parm["main_waste"]["oids"] + self.printer.parm[ for oid in self.printer.parm["main_waste"]["oids"]:
"borderless_waste"]["oids"]: self.write_eeprom(oid, 0, label="main_waste")
self.write_eeprom(oid, 0) for oid in self.printer.parm["borderless_waste"]["oids"]:
self.write_eeprom(oid, 0, label="borderless_waste")
def write_first_ti_received_time( def write_first_ti_received_time(
self, year: int, month: int, day: int) -> None: self, year: int, month: int, day: int) -> None:
"""Update first TI received time""" """Update first TI received time"""
b = self.printer.parm["stats"]["First TI received time"][0][0] b = self.printer.parm["stats"]["First TI received time"][0]
l = self.printer.parm["stats"]["First TI received time"][0][1] l = self.printer.parm["stats"]["First TI received time"][1]
n = (year - 2000) * 16 * 32 + 32 * month + day n = (year - 2000) * 16 * 32 + 32 * month + day
if self.debug: if self.debug:
print("FTRT:", hex(n // 256), hex(n % 256)) print("FTRT:", hex(n // 256), hex(n % 256), "=", n // 256, n % 256)
self.write_eeprom(b, n // 256) self.write_eeprom(b, n // 256, label="First TI received time")
self.write_eeprom(l, n % 256) self.write_eeprom(l, n % 256, label="First TI received time")
def brute_force_read_key( def brute_force_read_key(
self, minimum: int = 0x00, maximum: int = 0xFF self, minimum: int = 0x00, maximum: int = 0xFF
@ -688,7 +734,7 @@ class EpsonSession(easysnmp.Session):
self.printer.parm['read_key'] = [x, y] self.printer.parm['read_key'] = [x, y]
print(f"Trying {self.printer.parm['read_key']}...") print(f"Trying {self.printer.parm['read_key']}...")
try: try:
self.read_eeprom(0x00) self.read_eeprom(0x00, label="brute_force_read_key")
print(f"read_key found: {self.printer.parm['read_key']}") print(f"read_key found: {self.printer.parm['read_key']}")
return self.printer.parm['read_key'] return self.printer.parm['read_key']
except IndexError: except IndexError:
@ -758,8 +804,8 @@ if __name__ == "__main__":
dest='dump_eeprom', dest='dump_eeprom',
action='store', action='store',
type=int, type=int,
nargs=3, nargs=2,
help='Dump EEPROM (arguments: extension, start, stop)') help='Dump EEPROM (arguments: start, stop)')
parser.add_argument( parser.add_argument(
'--dry-run', '--dry-run',
dest='dry_run', dest='dry_run',
@ -797,14 +843,20 @@ if __name__ == "__main__":
if args.dump_eeprom: if args.dump_eeprom:
print_opt = True print_opt = True
for addr, val in printer.session.dump_eeprom( for addr, val in printer.session.dump_eeprom(
str(args.dump_eeprom[0] % 256), args.dump_eeprom[0] % 256,
args.dump_eeprom[1] % 256, int(args.dump_eeprom[1] % 256)
int(args.dump_eeprom[2] % 256 + 1)
).items(): ).items():
print(f"{str(addr).rjust(3)}: {val:#04x} = {str(val).rjust(3)}") print(f"{str(addr).rjust(3)}: {val:#04x} = {str(val).rjust(3)}")
if args.query: if args.query:
print_opt = True print_opt = True
if args.query[0] in printer.snmp_info.keys(): if ("stats" in printer.parm and
args.query[0] in printer.parm["stats"]):
ret = printer.session.get_stats(args.query[0])
if ret:
pprint(ret)
else:
print("No information returned. Check printer definition.")
elif args.query[0] in printer.snmp_info.keys():
ret = printer.session.get_snmp_info(args.query[0]) ret = printer.session.get_snmp_info(args.query[0])
if ret: if ret:
pprint(ret) pprint(ret)
@ -826,10 +878,23 @@ if __name__ == "__main__":
print("Option error: unavailable query.\n" + print("Option error: unavailable query.\n" +
textwrap.fill( textwrap.fill(
"Available queries: " + "Available queries: " +
", ".join(printer.list_methods) + ", ".join(printer.list_methods),
"\nAvailable SNMP elements: " + initial_indent='', subsequent_indent=' '
) + "\n" +
(
(
textwrap.fill(
"Available statistics: " +
", ".join(printer.parm["stats"].keys()),
initial_indent='', subsequent_indent=' '
) + "\n"
) if "stats" in printer.parm else ""
) +
textwrap.fill(
"Available SNMP elements: " +
", ".join(printer.snmp_info.keys()), ", ".join(printer.snmp_info.keys()),
initial_indent='', subsequent_indent=' ') initial_indent='', subsequent_indent=' '
)
) )
if args.info or not print_opt: if args.info or not print_opt:
ret = printer.stats() ret = printer.stats()