epson_print_conf/epson_print_conf.py
2023-07-31 13:41:37 +02:00

1092 lines
40 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Epson Printer Configuration accessed via SNMP (TCP/IP)
"""
import itertools
import re
from typing import Any
import datetime
import easysnmp # pip3 install easysnmp
import time
import textwrap
import ast
class EpsonPrinter:
"""SNMP Epson Printer Configuration."""
# PRINTER_CONFIG maps a model name to the parameters the session code needs.
# Keys consumed by the code in this file:
#   alias                     other model names sharing the entry (expanded
#                             by __init__ into entries of their own)
#   read_key                  two integers embedded in every EEPROM read and
#                             write OID; a model without it is not listed by
#                             valid_printers
#   write_key                 byte string appended (each byte + 1, see
#                             caesar()) to every EEPROM write OID
#   main_waste /              "oids": EEPROM addresses of the counter bytes,
#   borderless_waste          "divider": raw value / divider = reported level
#   serial_number             EEPROM addresses holding one character each
#   printer_head_id_h / _f    EEPROM addresses joined by get_printer_head_id
#   stats                     label -> EEPROM addresses, most significant
#                             byte first (see get_stats)
#   raw_waste_reset           EEPROM address -> value written by
#                             reset_waste_ink_levels
#   ink_replacement_counters  colour -> {counter name: EEPROM address}
#   last_printer_fatal_errors EEPROM addresses read as-is
# "idProduct" is stored for one model but not read by the code in this file.
PRINTER_CONFIG = { # Known Epson models
"XP-205": {
"alias": ["XP-200", "XP-207"],
"read_key": [25, 7],
"write_key": b'Wakatobi',
"main_waste": {"oids": [24, 25, 30], "divider": 73.5},
"borderless_waste": {"oids": [26, 27, 34], "divider": 34.34},
"serial_number": range(192, 202),
"printer_head_id_h": range(122, 127),
"printer_head_id_f": [136, 137, 138, 129],
"stats": {
"Manual cleaning counter": [147],
"Timer cleaning counter": [149],
"Ink replacement cleaning counter": [148],
"Total print pass counter": [171, 170, 169, 168],
"Total print page counter": [167, 166, 165, 164],
"Total scan counter": [0x01d7, 0x01d6, 0x01d5, 0x01d4],
"First TI received time": [173, 172],
"Maintenance required level of 1st waste ink counter": [46],
"Maintenance required level of 2nd waste ink counter": [47],
},
"raw_waste_reset": {
24: 0, 25: 0, 30: 0, # Data of 1st counter
28: 0, 29: 0, # another store of 1st counter
46: 94, # Maintenance required level of 1st counter
26: 0, 27: 0, 34: 0, # Data of 2nd counter
47: 94, # Maintenance required level of 2nd counter
49: 0 # ?
},
"ink_replacement_counters": {
"Black": {"1B": 242, "1S": 208, "1L": 209},
"Yellow": {"1B": 248, "1S": 246, "1L": 247},
"Magenta": {"1B": 251, "1S": 249, "1L": 250},
"Cyan": {"1B": 245, "1S": 243, "1L": 244},
},
"last_printer_fatal_errors": [60, 203, 204, 205, 206, 0x01d3],
},
"Stylus Photo PX730WD": {
"alias": ["Epson Artisan 730"],
"read_key": [0x8, 0x77],
"write_key": b'Cattleya',
"main_waste": {"oids": [0xe, 0xf], "divider": 81.82},
"borderless_waste": {"oids": [0x10, 0x11], "divider": 122.88},
"stats": {
"Manual cleaning counter": [0x7e],
"Timer cleaning counter": [0x61],
"Total print pass counter": [0x2C, 0x2D, 0x2E, 0x2F],
"Total print page counter": [0x9E, 0x9F],
"Total print page counter (duplex)": [0xA0, 0xA1],
"Total print CD-R counter": [0x4A, 0x4B],
"Total print CD-R tray open/close counter": [0xA2, 0xA3],
"Total scan counter": [0x01DA, 0x01DB, 0x01DC, 0x01DD],
},
"last_printer_fatal_errors": [0x3B, 0xC0, 0xC1, 0xC2, 0xC3, 0x5C],
"ink_replacement_counters": {
"Black": {"1S": 0x66, "2S": 0x67, "3S": 0x62},
"Yellow": {"1S": 0x70, "2S": 0x71, "3S": 0xAB},
"Magenta": {"1S": 0x68, "2S": 0x69, "3S": 0x63},
"Cyan": {"1S": 0x6C, "2S": 0x6D, "3S": 0x65},
"Light magenta": {"1S": 0x6A, "2S": 0x6B, "3S": 0x64},
"Light cyan": {"1S": 0x6E, "2S": 0x6F, "3S": 0x9B},
},
"serial_number": range(0xE7, 0xF0),
# untested
},
"WF-7525": {
"read_key": [101, 0],
"write_key": b'Sasanqua',
"main_waste": {"oids": [20, 21], "divider": 196.5},
"borderless_waste": {"oids": [22, 23], "divider": 52.05},
"serial_number": range(192, 202),
"stats": {
"Maintenance required level of 1st waste ink counter": [60],
"Maintenance required level of 2nd waste ink counter": [61],
},
"raw_waste_reset": {
20: 0, 21: 0, 22: 0, 23: 0, 24: 0, 25: 0, 59: 0, 60: 94, 61: 94
}
# to be completed
},
"L355": {
"read_key": [65, 9],
# to be completed
},
"L3160": {
"read_key": [151, 7],
"write_key": b'Maribaya',
"stats": {
"Maintenance required level of 1st waste ink counter": [54],
"Maintenance required level of 2nd waste ink counter": [55],
},
"raw_waste_reset": {
48: 0, 49: 0, 47: 0, 52: 0, 53: 0,
54: 94, 50: 0, 51: 0, 55: 94, 28: 0
}
# to be completed
},
"L4160": {
"read_key": [73, 8],
"write_key": b'Arantifo',
"stats": {
"Maintenance required level of 1st waste ink counter": [54],
"Maintenance required level of 2nd waste ink counter": [55],
},
"raw_waste_reset": {
48: 0, 49: 0, 47: 0, 52: 0, 53: 0,
54: 94, 50: 0, 51: 0, 55: 94, 28: 0
}
# to be completed
},
"XP-315": {
"read_key": [129, 8],
"write_key": b'Wakatobi',
"main_waste": {"oids": [24, 25, 30], "divider": 196.5},
"borderless_waste": {"oids": [26, 27, 34], "divider": 52.05},
"stats": {
"Maintenance required level of 1st waste ink counter": [46],
"Maintenance required level of 2nd waste ink counter": [47],
},
"raw_waste_reset": {
24: 0, 25: 0, 30: 0, 28: 0, 29: 0,
46: 94, 26: 0, 27: 0, 34: 0, 47: 94, 49: 0
}
# to be completed
},
"XP-422": {
"read_key": [85, 5],
"write_key": b'Muscari.',
# to be completed
"main_waste": {"oids": [24, 25, 30], "divider": 196.5},
"borderless_waste": {"oids": [26, 27, 34], "divider": 52.05},
"stats": {
"Maintenance required level of 1st waste ink counter": [46],
"Maintenance required level of 2nd waste ink counter": [47],
},
"raw_waste_reset": {
24: 0, 25: 0, 26: 0, 27: 0, 28: 0,
29: 0, 30: 0, 34: 0, 46: 94, 47: 94, 49: 0
}
},
"XP-435": {
"read_key": [133, 5],
"write_key": b'Polyxena',
# to be completed
},
"XP-540": {
"read_key": [20, 4],
"write_key": b'Firmiana',
"main_waste": {"oids": [0x10, 0x11], "divider": 84.5}, # To be changed
"borderless_waste": {"oids": [0x12, 0x13], "divider": 33.7}, # To be changed
# to be completed
},
"XP-610": {
"read_key": [121, 4],
"write_key": b'Gossypiu',
"alias": ["XP-611", "XP-615", "XP-510"],
"main_waste": {"oids": [16, 17], "divider": 84.5}, # divider to be changed
"borderless_waste": {"oids": [18, 19], "divider": 33.7}, # divider to be changed
# to be completed
},
"XP-620": {
"read_key": [57, 5],
"write_key": b'Althaea.',
# to be completed
},
"XP-700": {
"read_key": [40, 0],
# to be completed
},
"XP-760": {
"read_key": [87, 5],
# to be completed
},
"XP-830": {
"read_key": [40, 9],
"write_key": b'Irisgarm', # (Iris graminea with typo?)
"alias": ["XP-530", "XP-630", "XP-635"],
"main_waste": {"oids": [0x10, 0x11], "divider": 84.5}, # To be changed
"borderless_waste": {"oids": [0x12, 0x13], "divider": 33.7}, # To be changed
"idProduct": 0x110b,
# to be completed
},
"XP-850": {
"read_key": [40, 0],
"write_key": b'Hibiscus',
"main_waste": {"oids": [16, 17], "divider": 84.5}, # divider to be changed
"borderless_waste": {"oids": [18, 19], "divider": 33.7}, # divider to be changed
# to be completed
},
"XP-7100": {
"read_key": [40, 5],
"write_key": b'Leucojum',
"main_waste": {"oids": [0x10, 0x11], "divider": 84.5}, # To be changed
"borderless_waste": {"oids": [0x12, 0x13], "divider": 33.7}, # To be changed
# to be completed
},
"ET-2500": {
"read_key": [68, 1],
"write_key": b'Gerbera*', # NOTE(review): '*' presumably pads the key to 8 bytes, like '.' in other keys - confirm
"stats": {
"Maintenance required level of waste ink counter": [46],
},
"raw_waste_reset": {24: 0, 25: 0, 30: 0, 28: 0, 29: 0, 46: 94}
# to be completed
},
}
# Label -> SNMP OID; every entry is read by EpsonSession.get_snmp_info, which
# post-processes "UpTime" and "MAC Address" into readable strings.
snmp_info = {
"Model": "1.3.6.1.2.1.25.3.2.1.3.1",
"Model short": "1.3.6.1.4.1.1248.1.1.3.1.3.8.0",
"EEPS2 firmware version": "1.3.6.1.2.1.2.2.1.2.1",
"Descr": "1.3.6.1.2.1.1.1.0",
"UpTime": "1.3.6.1.2.1.1.3.0",
"Name": "1.3.6.1.2.1.1.5.0",
"MAC Address": "1.3.6.1.2.1.2.2.1.6.1",
"Print input": "1.3.6.1.2.1.43.8.2.1.13.1.1",
"Lang 1": "1.3.6.1.2.1.43.15.1.1.3.1.1",
"Lang 2": "1.3.6.1.2.1.43.15.1.1.3.1.2",
"Lang 3": "1.3.6.1.2.1.43.15.1.1.3.1.3",
"Lang 4": "1.3.6.1.2.1.43.15.1.1.3.1.4",
"Lang 5": "1.3.6.1.2.1.43.15.1.1.3.1.5",
"Emulation 1": "1.3.6.1.2.1.43.15.1.1.5.1.1",
"Emulation 2": "1.3.6.1.2.1.43.15.1.1.5.1.2",
"Emulation 3": "1.3.6.1.2.1.43.15.1.1.5.1.3",
"Emulation 4": "1.3.6.1.2.1.43.15.1.1.5.1.4",
"Emulation 5": "1.3.6.1.2.1.43.15.1.1.5.1.5",
"Print counter": "1.3.6.1.2.1.43.10.2.1.4.1.1",
}
# Pieces of the OID prefix assembled into eeprom_link below.
SNMP_OID_ENTERPRISE = "1.3.6.1.4.1"
SNMP_EPSON = "1248"
OID_PRV_CTRL = "1.2.2.44.1.1.2"
# Common OID prefix of the EEPROM read/write, firmware version, cartridge
# and printer status requests built by EpsonSession.
eeprom_link: str = f'{SNMP_OID_ENTERPRISE}.{SNMP_EPSON}.{OID_PRV_CTRL}.1'
# Instance attributes assigned in __init__.
session: object  # EpsonSession bound to this printer
printer_model: str  # model name as passed by the caller
hostname: str  # host name or IP address of the printer
parm: dict  # PRINTER_CONFIG entry of the model, or None if unknown
def __init__(
        self,
        printer_model: str,
        hostname: str,
        debug: bool = False,
        dry_run: bool = False) -> None:
    """Expand model aliases, select the model parameters, open a session.

    Every PRINTER_CONFIG entry that carries an "alias" list has that key
    removed and is registered again under each alias name. When an alias
    already has an entry of its own, the two are merged and the alias'
    own keys win. ``parm`` is the entry of ``printer_model`` or None when
    the model is unknown or has no "read_key".
    """
    config = self.PRINTER_CONFIG
    # Iterate over a snapshot: alias entries are added while looping.
    for printer_data in list(config.values()):
        if "alias" not in printer_data:
            continue
        for alias_name in printer_data.pop("alias"):
            if alias_name not in config:
                config[alias_name] = printer_data
            else:
                config[alias_name] = {**printer_data, **config[alias_name]}

    self.printer_model = printer_model
    self.hostname = hostname
    self.debug = debug
    self.dry_run = dry_run

    known = self.printer_model in self.valid_printers
    self.parm = self.PRINTER_CONFIG[self.printer_model] if known else None
    self.session = EpsonSession(printer=self, debug=debug, dry_run=dry_run)
@property
def valid_printers(self):
    """Return the set of model names whose configuration has a read_key."""
    usable = set()
    for printer_name, printer_data in self.PRINTER_CONFIG.items():
        if "read_key" in printer_data:
            usable.add(printer_name)
    return usable
@property
def list_methods(self):
    """Return an iterator over the session's own ``get_*`` method names.

    Names that easysnmp.Session itself exposes (for instance ``get_next``)
    are excluded, so only the printer queries defined on the session
    remain. The result is a one-shot lazy iterator; read the property
    again to iterate a second time.
    """
    # Computed once instead of once per candidate name.
    inherited = set(dir(easysnmp.Session))
    return filter(
        lambda name: name.startswith("get_") and name not in inherited,
        dir(self.session))
def stats(self):
    """Call every query in ``list_methods`` and collect the results.

    Returns a dict keyed by the method name without its ``get_`` prefix.
    Queries returning a falsy value are left out (and reported on stdout
    when debug is enabled).
    """
    collected = {}
    for method in self.list_methods:
        outcome = getattr(self.session, method)()
        if outcome:
            collected[method[4:]] = outcome
        elif self.debug:
            print(f"No value for method '{method}'.")
    return collected
def caesar(self, key):
    """Return the items of ``key``, each incremented by one, dot-joined."""
    shifted = [str(value + 1) for value in key]
    return ".".join(shifted)
class EpsonSession(easysnmp.Session):
"""SNMP session wrapper for Epson printer.

Adds, on top of easysnmp.Session, the helpers that build the EEPROM
read/write OIDs from the printer parameters, the ``get_*`` queries
collected by EpsonPrinter.stats(), and the EEPROM write operations.
"""
def __init__(
        self,
        printer: EpsonPrinter,
        community: str = "public",
        version: int = 1,
        debug: bool = False,
        dry_run: bool = False) -> None:
    """Remember the printer and flags, then open the SNMP session.

    The session targets ``printer.hostname`` with the given community
    string and SNMP version.
    """
    self.printer = printer
    self.debug = debug
    self.dry_run = dry_run
    session_options = {
        "hostname": self.printer.hostname,
        "community": community,
        "version": version,
    }
    super().__init__(**session_options)
def eeprom_oid_read_address(
        self,
        oid: int,
        msb: int = 0,
        label: str = "unknown method") -> str:
    """Build the SNMP OID that reads one EEPROM address.

    Addresses above 255 are split into a low byte and ``msb``, overriding
    the ``msb`` argument. Returns None when the printer parameters have
    no 'read_key'. ``label`` is accepted but not used here.
    """
    if oid > 255:
        msb, oid = divmod(oid, 256)
    parm = self.printer.parm
    if 'read_key' not in parm:
        return None
    read_key = parm['read_key']
    # ".124.124" is "||" (0x7C 0x7C); ".7.0" selects the read operation.
    prefix = f"{self.printer.eeprom_link}.124.124.7.0"
    return f"{prefix}.{read_key[0]}.{read_key[1]}.65.190.160.{oid}.{msb}"
def eeprom_oid_write_address(
        self,
        oid: int,
        value: Any,
        msb: int = 0,
        label: str = "unknown method") -> str:
    """Build the SNMP OID that writes ``value`` to one EEPROM address.

    Addresses above 255 are split into a low byte and ``msb``, overriding
    the ``msb`` argument. Returns None when the printer parameters lack
    'write_key' or 'read_key'.

    In dry-run mode the write OID is only printed and the matching *read*
    OID is returned instead, so that the caller's request leaves the
    EEPROM untouched.
    """
    if oid > 255:
        msb, oid = divmod(oid, 256)
    parm = self.printer.parm
    if 'write_key' not in parm or 'read_key' not in parm:
        return None
    write_op = (
        f"{self.printer.eeprom_link}"
        ".124.124"  # || (0x7C 0x7C)
        ".16.0"  # write
        f".{parm['read_key'][0]}"
        f".{parm['read_key'][1]}"
        ".66.189.33"
        f".{oid}.{msb}.{value}"
        f".{self.printer.caesar(parm['write_key'])}"
    )
    if not self.dry_run:
        return write_op
    print("WRITE_DRY_RUN:", write_op)
    # oid was already reduced to its low byte above, so the high byte has
    # to be forwarded explicitly or the read would target another address.
    return self.eeprom_oid_read_address(oid, msb=msb, label=label)
def read_value(self, oids: str):
    """Return the value of an SNMP GET on ``oids``.

    Raises:
        TimeoutError: the request timed out.
        ValueError: any other failure of the request.

    The original exception is chained as ``__cause__`` in both cases.
    """
    try:
        return self.get(oids).value
    except easysnmp.exceptions.EasySNMPTimeoutError as e:
        raise TimeoutError(str(e)) from e
    except Exception as e:
        raise ValueError(str(e)) from e
def read_eeprom(
        self,
        oid: int,
        label: str = "unknown method") -> str:
    """Read one EEPROM address and return its value as two hex digits.

    Returns None when the response carries no ``EE:`` record (reported as
    an invalid read key in debug mode). Raises ValueError when the address
    echoed in the response differs from ``oid``.
    """
    address = self.eeprom_oid_read_address(oid, label=label)
    if self.debug:
        print(
            f"EEPROM_DUMP {label}:\n"
            f" ADDRESS: {address}\n"
            f" OID: {oid}={hex(oid)}"
        )
    response = self.read_value(address)
    if self.debug:
        print(f" RESPONSE: {repr(response)}")
    records = re.findall(r"EE:[0-9A-F]{6}", response)
    if not records:
        if self.debug:
            print("Invalid read key.")
        return None
    # Record layout after "EE:": 4 hex digits address, 2 hex digits value.
    payload = records[0][3:]
    chk_addr, value = payload[0:4], payload[4:6]
    if int(chk_addr, 16) != oid:
        raise ValueError(
            f"Address and response address are"
            f" not equal: {oid} != {chk_addr}"
        )
    return value
def read_eeprom_many(
        self,
        oids: list,
        label: str = "unknown method"):
    """Read several EEPROM addresses; return their values in the same order."""
    values = []
    for address in oids:
        values.append(self.read_eeprom(address, label=label))
    return values
def write_eeprom(
        self,
        oid: int,
        value: int,
        label: str = "unknown method") -> bool:
    """Write ``value`` to one EEPROM address.

    Returns:
        False when the printer parameters have no 'write_key', when no
        write address can be built, or when the printer answers without
        ":OK;"; True otherwise.

    Raises:
        TimeoutError: the request timed out (ignored in dry-run mode).
        ValueError: any other failure of the request.
    """
    if "write_key" not in self.printer.parm:
        if self.debug:
            print("Missing 'write_key' parameter in configuration.")
        return False
    if not self.dry_run and self.debug:
        previous = self.read_eeprom(oid, label=label)
        print(f"Previous value for {label}: {previous}")
    oid_string = self.eeprom_oid_write_address(oid, value, label=label)
    if oid_string is None:
        # No address could be built (incomplete key configuration):
        # report failure instead of issuing a request for None.
        if self.debug:
            print(f"Cannot build the write address for {label}.")
        return False
    response = None
    try:
        response = self.get(oid_string)
    except easysnmp.exceptions.EasySNMPTimeoutError as e:
        if not self.dry_run:
            raise TimeoutError(str(e)) from e
    except Exception as e:
        raise ValueError(str(e)) from e
    if self.debug:
        print(
            f"EEPROM_WRITE {label}:\n"
            f" ADDRESS: {oid_string}\n"
            f" OID: {oid}={hex(oid)}"
        )
        if response:
            print(f" RESPONSE: {repr(response.value)}")
    # ":NA;" is an error
    if response and ":OK;" not in repr(response.value):
        return False
    return True
def status_parser(self, data):
    """Parse an ST2 status response and decode as much as possible.

    ``data`` is the raw reply as bytes: an 11-byte ``\\x00@BDC ST2\\r\\n``
    header, a 2-byte little-endian payload length, then a sequence of
    elements made of a type byte, a length byte and ``length`` data bytes.

    Returns a dict of decoded fields, or one of the strings
    "invalid packet", "printer status error", "message error",
    "invalid element", "invalid element length" when the framing is wrong.
    Elements of unrecognised type, and recognised elements too short or
    malformed to be decoded, are collected raw under the "unknown" key as
    ``(hex(type), item)`` tuples.
    """
    colour_ids = {
        0x01: 'Black',
        0x03: 'Cyan',
        0x04: 'Magenta',
        0x05: 'Yellow',
        0x06: 'Light Cyan',
        0x07: 'Light Magenta',
        0x0a: 'Light Black',
        0x0b: 'Matte Black',
        0x0f: 'Light Light Black',
        0x10: 'Orange',
        0x11: 'Green',
    }
    status_ids = {
        0: 'Error',
        1: 'Self Printing',
        2: 'Busy',
        3: 'Waiting',
        4: 'Idle',
        5: 'Paused',
        7: 'Cleaning',
        15: 'Nozzle Check',
    }
    cancel_codes = {
        b'\x01': "No request",
        b'\xA1': "Received cancel command and printer initialization",
        b'\x81': "Request",
    }
    box_states = {0: "not full", 1: "near full", 2: "full"}

    if len(data) < 16:
        return "invalid packet"
    if data[:11] != b'\x00@BDC ST2\r\n':
        return "printer status error"
    len_p = int.from_bytes(data[11:13], byteorder='little')
    if len(data) - 13 != len_p:
        return "message error"

    buf = data[13:]
    data_set = {}
    while buf:
        if len(buf) < 3:
            return "invalid element"
        ftype, length = buf[0], buf[1]
        item = buf[2:2 + length]
        if len(item) != length:
            return "invalid element length"
        buf = buf[2 + length:]
        if self.debug:
            print(
                "Processing status - ftype", hex(ftype),
                "length:", length, "item:", item.hex(' ')
            )
        try:
            if ftype == 0x0f:  # ink
                colourlen = item[0]
                if colourlen < 1:
                    # A zero entry size would never advance the offset.
                    raise ValueError("ink entry size must be positive")
                inks = []
                offset = 1
                while offset < length:
                    colour = item[offset]
                    level = item[offset + 2]
                    offset += colourlen
                    name = colour_ids.get(colour, "0x%X" % colour)
                    inks.append((colour, level, name))
                data_set["ink_level"] = inks
            elif ftype == 0x0d:  # maintenance tanks
                (tank1, tank2) = item[0:2]
                data_set["tanks"] = (tank1, tank2)
            elif ftype == 0x19:  # current job name
                if item == b'\x00\x00\x00\x00\x00unknown':
                    data_set["jobname"] = "Not defined"
                else:
                    data_set["jobname"] = item
            elif ftype == 0x1f:  # serial
                # str(item) would yield the bytes repr ("b'...'"); decode
                # instead and keep any non-ASCII byte visible as \xNN.
                data_set["serial"] = bytes(item).decode(
                    "ascii", errors="backslashreplace")
            elif ftype == 0x01:  # status
                printer_status = item[0]
                status_text = status_ids.get(
                    printer_status, 'unknown: %d' % printer_status)
                data_set["ready"] = printer_status in (3, 4)
                data_set["status"] = (printer_status, status_text)
            elif ftype == 0x02:  # errcode
                data_set["errcode"] = item
            elif ftype == 0x03:  # Self print code
                data_set["self_print_code"] = item
            elif ftype == 0x04:  # warning
                data_set["warning_code"] = item
            elif ftype == 0x06:  # Paper path
                if item == b'\x01\xff':
                    data_set["paper_path"] = "Cut sheet (Rear)"
                else:
                    data_set["paper_path"] = item
            elif ftype == 0x0e:  # Replace cartridge information
                data_set["replace_cartridge"] = "{:08b}".format(item[0])
            elif ftype == 0x10:  # Loading path information
                loading_path = item.hex().upper()
                if loading_path == "01094E":
                    loading_path = "fixed"
                data_set["loading_path"] = loading_path
            elif ftype == 0x13:  # Cancel code
                data_set["cancel_code"] = cancel_codes.get(bytes(item), item)
            elif ftype == 0x37:  # Maintenance box information
                # Decoded into a scratch dict so that a truncated element
                # does not leave a partial result behind.
                boxes = {}
                i = 1
                for j in range(item[0]):
                    state = box_states.get(item[i], "unknown")
                    boxes[f"maintenance_box_{j}"] = (
                        f"{state} ({item[i + 1]})"
                    )
                    i += 2
                data_set.update(boxes)
            else:  # unknown stuff
                data_set.setdefault("unknown", []).append(
                    (hex(ftype), item))
        except (IndexError, ValueError):
            # Known element type, but too short or malformed to decode.
            data_set.setdefault("unknown", []).append((hex(ftype), item))
    return data_set
def get_snmp_info(self, mib_name: str = None) -> dict:
    """Return general SNMP information of printer.

    Reads every OID of ``printer.snmp_info`` (or only ``mib_name`` when it
    names one of them) and returns a dict label -> value. Entries whose
    read fails are left out (and reported on stdout in debug mode).

    "UpTime" is converted from hundredths of a second to ``HH:MM:SS``;
    the hour count keeps growing past 24 instead of wrapping around. A
    value that is not a number is returned unchanged. "MAC Address" is
    rendered as dash-separated upper-case hex bytes.
    """
    known = self.printer.snmp_info
    if mib_name and mib_name in known:
        selected = {mib_name: known[mib_name]}
    else:
        selected = known
    sys_info = {}
    for name, oid in selected.items():
        try:
            sys_info[name] = self.read_value(oid)
        except Exception:
            # Best effort: an OID the printer does not serve is skipped.
            if self.debug:
                print(f"No value for SNMP OID '{name}'.")
    if "UpTime" in sys_info:
        try:
            seconds = int(sys_info["UpTime"]) // 100
        except (TypeError, ValueError):
            pass  # not a tick count: keep the raw value
        else:
            hours, remainder = divmod(seconds, 3600)
            minutes, secs = divmod(remainder, 60)
            sys_info["UpTime"] = f"{hours:02d}:{minutes:02d}:{secs:02d}"
    if "MAC Address" in sys_info:
        sys_info["MAC Address"] = bytes(
            [ord(i) for i in sys_info["MAC Address"]]).hex("-").upper()
    return sys_info
def get_serial_number(self) -> str:
    """Return the serial number stored in EEPROM, one character per address.

    Returns None when the printer parameters have no "serial_number".
    """
    parm = self.printer.parm
    if "serial_number" not in parm:
        return None
    hex_values = self.read_eeprom_many(
        parm["serial_number"], label="serial_number")
    characters = [chr(int(value, 16)) for value in hex_values]
    return "".join(characters)
def get_stats(self, stat_name: str = None) -> dict:
    """Return printer statistics.

    Reads every entry of the model's "stats" table (or only ``stat_name``
    when it names one of them). The bytes of each entry are combined most
    significant first into one integer. Returns a dict label -> value, or
    None when the model has no "stats" table.

    "First TI received time", when present, is decoded from its packed
    form ``(year - 2000) * 512 + month * 32 + day`` into a ``DD Mon YYYY``
    string; a stored value that is not a valid date is left as the raw
    integer.
    """
    parm = self.printer.parm
    if "stats" not in parm:
        return None
    if stat_name and stat_name in parm["stats"]:
        stat_info = {stat_name: parm["stats"][stat_name]}
    else:
        stat_info = parm["stats"]
    stats_result = {}
    for name, oids in stat_info.items():
        total = 0
        for val in self.read_eeprom_many(oids, label=name):
            total = (total << 8) + int(val, 16)
        stats_result[name] = total

    ftrt_label = "First TI received time"
    # Not every model (or single-stat query) provides this entry.
    if ftrt_label in stats_result:
        ftrt = stats_result[ftrt_label]
        year = 2000 + ftrt // (16 * 32)
        month = (ftrt % (16 * 32)) // 32
        day = ftrt % 32
        try:
            stats_result[ftrt_label] = datetime.datetime(
                year, month, day).strftime('%d %b %Y')
        except ValueError:
            pass  # e.g. an erased EEPROM cell: keep the raw number
    return stats_result
def get_printer_head_id(self) -> str:  # to be revised
    """Return the printer head id as "<h part> - <f part>".

    Each part is the concatenation of the hex values read from the
    addresses listed under "printer_head_id_h" and "printer_head_id_f".
    Returns None when either list is missing from the parameters.
    """
    parm = self.printer.parm
    keys = ("printer_head_id_h", "printer_head_id_f")
    if any(key not in parm for key in keys):
        return None
    # Both reads are done before any joining takes place.
    part_h, part_f = [
        self.read_eeprom_many(parm[key], label=key) for key in keys
    ]
    return "".join(part_h) + " - " + "".join(part_f)
def get_firmware_version(self) -> str:
    """Return firmware version.

    The six characters following ``vi:00:`` in the printer's reply are the
    firmware code. Characters 3-4 are read as the day, character 5 as the
    year (its character code + 1945) and character 6 as the month in hex.
    Returns "<code> <DD Mon YYYY>", the bare code when those characters do
    not form a valid date, or None when the reply is empty or holds no
    ``vi:00:`` marker.
    """
    firmware_string = self.read_value(
        f"{self.printer.eeprom_link}.118.105.1.0.0")
    if not firmware_string:
        return None
    # search() locates the marker wherever it is, also in a multi-line
    # reply, and tells us when it is absent.
    found = re.search(r"vi:00:(.{6})", firmware_string)
    if found is None:
        return None
    firmware = found.group(1)
    try:
        year = ord(firmware[4:5]) + 1945
        month = int(firmware[5:], 16)
        day = int(firmware[2:4])
        build_date = datetime.datetime(year, month, day)
    except ValueError:
        return firmware
    return firmware + " " + build_date.strftime('%d %b %Y')
def get_cartridges(self) -> str:
    """Return the cartridge types reported by the printer as a list.

    The comma-separated text between ``IA:00;`` and the last ``;`` of the
    reply is split and stripped. Returns None when the reply is empty.
    """
    oid = f"{self.printer.eeprom_link}.105.97.1.0.0"
    cartridges_string = self.read_value(oid)
    if not cartridges_string:
        return None
    listing = re.sub(
        r".*IA:00;(.*);.*", r'\g<1>', cartridges_string, flags=re.S)
    names = []
    for entry in listing.split(','):
        names.append(entry.strip())
    return names
def get_ink_replacement_counters(self) -> str:
    """Return the ink replacement counters as a set of tuples.

    Each tuple is ``(colour, counter name, value)`` with the value read
    from the EEPROM address configured for that counter. Returns None
    when the parameters have no "ink_replacement_counters".
    """
    parm = self.printer.parm
    if "ink_replacement_counters" not in parm:
        return None
    irc = set()
    for color, data in parm["ink_replacement_counters"].items():
        for counter, address in data.items():
            raw = self.read_eeprom(address, label="ink_replacement_counters")
            irc.add((color, counter, int(raw, 16)))
    return irc
def get_printer_status(self):
    """Query the printer status and return it decoded by status_parser.

    The reply string is converted character by character into bytes
    before parsing. Returns None when the reply is empty.
    """
    result = self.read_value(f"{self.printer.eeprom_link}.115.116.1.0.1")
    if not result:
        return None
    if self.debug:
        dump = bytes(map(ord, result)).hex(" ")
        print(
            textwrap.fill(
                "PRINTER_STATUS: " + dump,
                initial_indent="",
                subsequent_indent=" ",
            )
        )
    raw = bytes(map(ord, result))
    return self.status_parser(raw)
def get_waste_ink_levels(self):
    """Return the waste ink levels as a list of rounded values.

    The first item is the main counter, the second (when configured) the
    borderless one. Each counter is read from its "oids", the bytes are
    taken in reverse order to form the raw number, which is divided by
    the counter's "divider" and rounded to two decimals. Returns None
    when the parameters have no "main_waste".
    """
    parm = self.printer.parm
    if "main_waste" not in parm:
        return None
    results = []
    for counter in ("main_waste", "borderless_waste"):
        if counter not in parm:
            continue
        level = self.read_eeprom_many(parm[counter]["oids"], label=counter)
        level_b10 = int("".join(reversed(level)), 16)
        results.append(round(level_b10 / parm[counter]["divider"], 2))
    return results
def get_last_printer_fatal_errors(self) -> str:
    """Return the last printer fatal errors as a list of hex strings.

    Returns None when the parameters have no "last_printer_fatal_errors".
    """
    key = "last_printer_fatal_errors"
    parm = self.printer.parm
    if key in parm:
        return self.read_eeprom_many(parm[key], label=key)
    return None
def dump_eeprom(self, start: int = 0, end: int = 0xFF):
    """
    Dump EEPROM data from start to end (less significant byte).

    Returns a dict mapping every address of the inclusive range to its
    value as an integer.
    """
    return {
        address: int(self.read_eeprom(address, label="dump_eeprom"), 16)
        for address in range(start, end + 1)
    }
def reset_waste_ink_levels(self) -> bool:
    """
    Set waste ink levels to 0.

    When the parameters define "raw_waste_reset", exactly those
    address/value pairs are written. Otherwise 0 is written to the
    "main_waste" addresses and, when configured, to the "borderless_waste"
    ones. Stops at the first failing write and returns False; returns
    True when every write succeeded and None when neither table exists.
    """
    parm = self.printer.parm
    if "raw_waste_reset" in parm:
        return all(
            self.write_eeprom(oid, value, label="raw_waste_reset")
            for oid, value in parm["raw_waste_reset"].items()
        )
    if "main_waste" not in parm:
        return None
    for counter in ("main_waste", "borderless_waste"):
        if counter not in parm:
            # Only reachable for the optional borderless counter.
            return True
        for oid in parm[counter]["oids"]:
            if not self.write_eeprom(oid, 0, label=counter):
                return False
    return True
def write_first_ti_received_time(
        self, year: int, month: int, day: int) -> bool:
    """Store a new "First TI received time" date in EEPROM.

    The date is packed as ``(year - 2000) * 512 + month * 32 + day`` and
    written as two bytes to the addresses configured for that statistic
    (high byte to the first address). Returns False when the statistic is
    not configured or a write fails, True otherwise.
    """
    stat = "First TI received time"
    try:
        addresses = self.printer.parm["stats"][stat]
        msb = addresses[0]
        lsb = addresses[1]
    except KeyError:
        return False
    n = (year - 2000) * 16 * 32 + 32 * month + day
    high, low = divmod(n, 256)
    if self.debug:
        print("FTRT:", hex(high), hex(low), "=", high, low)
    for address, byte in ((msb, high), (lsb, low)):
        if not self.write_eeprom(address, byte, label=stat):
            return False
    return True
def detect_write_key(self, debug=False):
    """Print the model name and write_key of every configured model that has one.

    ``debug`` is accepted but not used.
    """
    configured = (
        (model, chars['write_key'])
        for model, chars in self.printer.PRINTER_CONFIG.items()
        if 'write_key' in chars
    )
    for model, write_key in configured:
        print(model, write_key)
def brute_force_read_key(
    self, minimum: int = 0x00, maximum: int = 0xFF, debug=False
):
    """Brute force read_key for printer.

    Tries every pair ``[x, y]`` with both values in ``minimum..maximum``
    (bounds included, equal values allowed) as the read_key and reads
    EEPROM address 0 with it. The first pair for which read_eeprom returns
    a value is left in ``printer.parm['read_key']`` and returned.

    Returns None when no pair works or the search is interrupted with
    Ctrl-C; in those cases, and when an error propagates, the read_key
    that was configured before the search is put back.
    """
    parm = self.printer.parm
    had_key = 'read_key' in parm
    previous_key = parm.get('read_key')
    found = None
    try:
        for x, y in itertools.product(
                range(minimum, maximum + 1), repeat=2):
            parm['read_key'] = [x, y]
            if debug:
                print(f"Trying {parm['read_key']}...")
            try:
                value = self.read_eeprom(
                    0x00, label="brute_force_read_key")
            except IndexError:
                continue
            # read_eeprom reports a rejected key by returning None, not
            # by raising.
            if value is not None:
                found = parm['read_key']
                return found
    except KeyboardInterrupt:
        return None
    finally:
        if found is None:
            # Do not leave the last attempted pair configured.
            if had_key:
                parm['read_key'] = previous_key
            else:
                parm.pop('read_key', None)
    return None
def write_sequence_to_string(self, write_sequence):
    """Convert a dotted write sequence back to its string.

    ``write_sequence`` is a sequence whose first item is a string of
    dot-separated numbers; each number minus one is taken as a character
    code.
    """
    numbers = map(int, write_sequence[0].split("."))
    return "".join(chr(number - 1) for number in numbers)
if __name__ == "__main__":
    # Command line front end: parse the options, build the printer object
    # and run every requested operation in a fixed order.
    import argparse
    import sys
    from pprint import pprint

    def show(result):
        """Pretty-print a query result, or a hint when nothing came back."""
        if result:
            pprint(result)
        else:
            print("No information returned. Check printer definition.")

    parser = argparse.ArgumentParser(
        epilog='Epson Printer Configuration accessed via SNMP (TCP/IP)')
    parser.add_argument(
        '-m',
        '--model',
        dest='model',
        action="store",
        help='Printer model. Example: -m XP-205'
             ' (use ? to print all supported models)',
        required=True)
    parser.add_argument(
        '-a',
        '--address',
        dest='hostname',
        action="store",
        help='Printer host name or IP address. (Example: -a 192.168.1.87)',
        required=True)
    parser.add_argument(
        '-i',
        '--info',
        dest='info',
        action='store_true',
        help='Print all available information and statistics (default option)')
    parser.add_argument(
        '-q',
        '--query',
        dest='query',
        action='store',
        type=str,
        nargs=1,
        help='Print specific information.'
             ' (Use ? to list all available queries)')
    parser.add_argument(
        '--reset_waste_ink',
        dest='reset_waste_ink',
        action='store_true',
        help='Reset all waste ink levels to 0')
    parser.add_argument(
        "--detect-key",
        dest='detect_key',
        action='store_true',
        help="Detect the read_key via brute force")
    parser.add_argument(
        '-d',
        '--debug',
        dest='debug',
        action='store_true',
        help='Print debug information')
    parser.add_argument(
        '-e',
        '--eeprom-dump',
        dest='dump_eeprom',
        action='store',
        type=int,
        nargs=2,
        help='Dump EEPROM (arguments: start, stop)')
    parser.add_argument(
        '--dry-run',
        dest='dry_run',
        action='store_true',
        help='Dry-run change operations')
    parser.add_argument(
        '--write-first-ti-received-time',
        dest='ftrt',
        type=int,
        help='Change the first TI received time (arguments: year, month, day)',
        nargs=3,
    )
    parser.add_argument(
        '-R',
        '--read-eeprom',
        dest='read_eeprom',
        action='store',
        type=str,
        nargs=1,
        help='Read the values of a list of printer EEPROM addreses.'
             ' Format is: address [, ...]')
    parser.add_argument(
        '-W',
        '--write-eeprom',
        dest='write_eeprom',
        action='store',
        type=str,
        nargs=1,
        help='Write related values to a list of printer EEPROM addresses.'
             ' Format is: address: value [, ...]')
    parser.add_argument(
        '-S',
        '--write-sequence-to-string',
        dest='ws_to_string',
        action='store',
        type=str,
        nargs=1,
        help='Convert write sequence of numbers to string.'
    )
    args = parser.parse_args()

    printer = EpsonPrinter(
        args.model, args.hostname, debug=args.debug, dry_run=args.dry_run)
    if not printer.parm:
        # Also reached with "-m ?": lists the supported models.
        print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join(
            printer.valid_printers),
            initial_indent='', subsequent_indent=' ')
        )
        sys.exit(1)

    # Set as soon as one explicit operation is requested; when it stays
    # False the full information dump is printed.
    print_opt = False
    try:
        if args.ws_to_string:
            print_opt = True
            print(printer.session.write_sequence_to_string(args.ws_to_string))
        if args.reset_waste_ink:
            print_opt = True
            if printer.session.reset_waste_ink_levels():
                print("Reset waste ink levels done.")
            else:
                print("Failed to reset waste ink levels. Check configuration.")
        if args.detect_key:
            print_opt = True
            read_key = printer.session.brute_force_read_key(debug=True)
            if read_key:
                print(f"read_key found: {read_key}")
            else:
                print("Cannot found read_key")
        if args.ftrt:
            print_opt = True
            if printer.session.write_first_ti_received_time(
                    int(args.ftrt[0]), int(args.ftrt[1]), int(args.ftrt[2])):
                print("Write first TI received time done.")
            else:
                print(
                    "Failed to write first TI received time."
                    " Check configuration."
                )
        if args.dump_eeprom:
            print_opt = True
            for addr, val in printer.session.dump_eeprom(
                        args.dump_eeprom[0] % 256,
                        int(args.dump_eeprom[1] % 256)
                    ).items():
                print(f"{str(addr).rjust(3)}: {val:#04x} = {str(val).rjust(3)}")
        if args.query:
            print_opt = True
            query = args.query[0]
            if "stats" in printer.parm and query in printer.parm["stats"]:
                show(printer.session.get_stats(query))
            elif query in printer.snmp_info:
                show(printer.session.get_snmp_info(query))
            else:
                method = query if query.startswith("get_") else "get_" + query
                if method in printer.list_methods:
                    show(getattr(printer.session, method)())
                else:
                    print(
                        "Option error: unavailable query.\n" +
                        textwrap.fill(
                            "Available queries: " +
                            ", ".join(printer.list_methods),
                            initial_indent='', subsequent_indent=' '
                        ) + "\n" +
                        (
                            (
                                textwrap.fill(
                                    "Available statistics: " +
                                    ", ".join(printer.parm["stats"].keys()),
                                    initial_indent='', subsequent_indent=' '
                                ) + "\n"
                            ) if "stats" in printer.parm else ""
                        ) +
                        textwrap.fill(
                            "Available SNMP elements: " +
                            ", ".join(printer.snmp_info.keys()),
                            initial_indent='', subsequent_indent=' '
                        )
                    )
        if args.read_eeprom:
            print_opt = True
            for value in re.split(r',\s*', args.read_eeprom[0]):
                try:
                    val = printer.session.read_eeprom(
                        ast.literal_eval(value), label='read_eeprom')
                    if val is None:
                        print("EEPROM read error.")
                    else:
                        print(f"0x{val}={int(val, 16)}")
                except (ValueError, SyntaxError):
                    print("invalid argument for read_eeprom")
                    sys.exit(1)
        if args.write_eeprom:
            print_opt = True
            for key_val in re.split(r',\s*|;\s*|\|\s*', args.write_eeprom[0]):
                try:
                    # Inside the try: an item without exactly one
                    # "address: value" separator is an invalid argument too.
                    key, val = re.split(':|=', key_val)
                    val_int = ast.literal_eval(val)
                    if not printer.session.write_eeprom(
                        ast.literal_eval(key),
                        str(val_int), label='write_eeprom'
                    ):
                        print("invalid write operation")
                        sys.exit(1)
                except (ValueError, SyntaxError):
                    print("invalid argument for write_eeprom")
                    sys.exit(1)
        if args.info or not print_opt:
            show(printer.stats())
    except TimeoutError as e:
        print(f"Timeout error: {str(e)}")
    except ValueError as e:
        # Raising a plain string is itself a TypeError; report the error
        # and keep the failing exit status.
        print(f"Generic error: {str(e)}")
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(2)