mirror of
https://github.com/Ircama/epson_print_conf.git
synced 2024-10-18 01:00:36 -04:00
Refinements
This commit is contained in:
parent
09c67ac7c7
commit
2d6d498b41
2 changed files with 245 additions and 27 deletions
23
README.md
23
README.md
|
@ -35,19 +35,23 @@ It is tested with Ubuntu / Windows Subsystem for Linux, Windows.
|
|||
## Usage
|
||||
|
||||
```
|
||||
usage: epson_print_conf.py [-h] -m MODEL -a HOSTNAME [-i] [-q QUERY] [--reset_waste_ink] [--detect-key] [-d]
|
||||
[-e DUMP_EEPROM DUMP_EEPROM] [--dry-run] [--write-first-ti-received-time FTRT FTRT FTRT]
|
||||
[-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING] [-t TIMEOUT] [-r RETRIES]
|
||||
usage: epson_print_conf.py [-h] -m MODEL -a HOSTNAME [-i] [-q QUERY] [--reset_waste_ink]
|
||||
[--detect-key] [-d] [-e DUMP_EEPROM DUMP_EEPROM] [--dry-run]
|
||||
[--write-first-ti-received-time FTRT FTRT FTRT]
|
||||
[-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING]
|
||||
[-t TIMEOUT] [-r RETRIES] [-c CONFIG_FILE]
|
||||
|
||||
optional arguments:
|
||||
-h, --help show this help message and exit
|
||||
-m MODEL, --model MODEL
|
||||
Printer model. Example: -m XP-205 (use ? to print all supported models)
|
||||
Printer model. Example: -m XP-205 (use ? to print all supported
|
||||
models)
|
||||
-a HOSTNAME, --address HOSTNAME
|
||||
Printer host name or IP address. (Example: -m 192.168.1.87)
|
||||
-i, --info Print all available information and statistics (default option)
|
||||
-q QUERY, --query QUERY
|
||||
Print specific information. (Use ? to list all available queries)
|
||||
Print specific information. (Use ? to list all available
|
||||
queries)
|
||||
--reset_waste_ink Reset all waste ink levels to 0
|
||||
--detect-key Detect the read_key via brute force
|
||||
-d, --debug Print debug information
|
||||
|
@ -57,15 +61,20 @@ optional arguments:
|
|||
--write-first-ti-received-time FTRT FTRT FTRT
|
||||
Change the first TI received time (arguments: year, month, day)
|
||||
-R READ_EEPROM, --read-eeprom READ_EEPROM
|
||||
Read the values of a list of printer EEPROM addreses. Format is: address [, ...]
|
||||
Read the values of a list of printer EEPROM addreses. Format is:
|
||||
address [, ...]
|
||||
-W WRITE_EEPROM, --write-eeprom WRITE_EEPROM
|
||||
Write related values to a list of printer EEPROM addresses. Format is: address: value [, ...]
|
||||
Write related values to a list of printer EEPROM addresses.
|
||||
Format is: address: value [, ...]
|
||||
-S WS_TO_STRING, --write-sequence-to-string WS_TO_STRING
|
||||
Convert write sequence of numbers to string.
|
||||
-t TIMEOUT, --timeout TIMEOUT
|
||||
SNMP GET timeout (floating point argument)
|
||||
-r RETRIES, --retries RETRIES
|
||||
SNMP GET retries (floating point argument)
|
||||
-c CONFIG_FILE, --config CONFIG_FILE
|
||||
read a configuration file including the full log dump of a
|
||||
previous operation instead of accessing the printer via SNMP
|
||||
|
||||
Epson Printer Configuration via SNMP (TCP/IP)
|
||||
```
|
||||
|
|
|
@ -320,6 +320,7 @@ class EpsonPrinter:
|
|||
printer_model: str
|
||||
hostname: str
|
||||
parm: dict
|
||||
mib_dict: dict = {}
|
||||
|
||||
ink_color_ids = { # Ink color
|
||||
0x00: 'Black',
|
||||
|
@ -459,8 +460,18 @@ class EpsonPrinter:
|
|||
else:
|
||||
return write_op
|
||||
|
||||
def snmp_mib(self, mib):
|
||||
def snmp_mib(self, mib, label="unknown"):
|
||||
"""Generic SNMP query, returning value of a MIB."""
|
||||
if self.mib_dict:
|
||||
if mib not in self.mib_dict:
|
||||
logging.error(
|
||||
"MIB '%s' not valued in the configuration file. "
|
||||
"Operation: %s",
|
||||
mib,
|
||||
label
|
||||
)
|
||||
return False
|
||||
return self.mib_dict[mib]
|
||||
if not self.hostname:
|
||||
return False
|
||||
utt = UdpTransportTarget(
|
||||
|
@ -479,15 +490,20 @@ class EpsonPrinter:
|
|||
for response in iterator:
|
||||
errorIndication, errorStatus, errorIndex, varBinds = response
|
||||
if errorIndication:
|
||||
logging.info("snmp_mib error: %s", errorIndication)
|
||||
logging.info(
|
||||
"snmp_mib error: %s. MIB: %s. Operation: %s",
|
||||
errorIndication, mib, label
|
||||
)
|
||||
if " timed out" in errorIndication:
|
||||
raise TimeoutError(errorIndication)
|
||||
return False
|
||||
elif errorStatus:
|
||||
logging.info(
|
||||
'snmp_mib PDU error: %s at %s',
|
||||
'snmp_mib PDU error: %s at %s. MIB: %s. Operation: %s',
|
||||
errorStatus.prettyPrint(),
|
||||
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
|
||||
errorIndex and varBinds[int(errorIndex) - 1][0] or '?',
|
||||
mib,
|
||||
label
|
||||
)
|
||||
return False
|
||||
else:
|
||||
|
@ -496,9 +512,17 @@ class EpsonPrinter:
|
|||
return varBind[1].asOctets()
|
||||
else:
|
||||
return varBind[1].prettyPrint()
|
||||
logging.info("snmp_mib value error: invalid multiple data")
|
||||
logging.info(
|
||||
"snmp_mib value error: invalid multiple data. "
|
||||
"MIB: %s. Operation: %s",
|
||||
mib,
|
||||
label
|
||||
)
|
||||
return False
|
||||
logging.info("snmp_mib value error: invalid data")
|
||||
logging.info(
|
||||
"snmp_mib value error: invalid data. MIB: %s. Operation: %s",
|
||||
label
|
||||
)
|
||||
return False
|
||||
|
||||
def read_eeprom(
|
||||
|
@ -513,7 +537,7 @@ class EpsonPrinter:
|
|||
f" OID: {oid}={hex(oid)}"
|
||||
)
|
||||
response = self.snmp_mib(
|
||||
self.eeprom_oid_read_address(oid, label=label))
|
||||
self.eeprom_oid_read_address(oid, label=label), label=label)
|
||||
if not response:
|
||||
return None
|
||||
logging.debug(f" RESPONSE: {repr(response)}")
|
||||
|
@ -557,12 +581,13 @@ class EpsonPrinter:
|
|||
response = self.read_eeprom(oid, label=label)
|
||||
logging.debug(f"Previous value for {label}: {response}")
|
||||
oid_string = self.eeprom_oid_write_address(oid, value, label=label)
|
||||
response = self.snmp_mib(oid_string)
|
||||
logging.debug(
|
||||
f"EEPROM_WRITE {label}:\n"
|
||||
f" ADDRESS: {oid_string}\n"
|
||||
f" OID: {oid}={hex(oid)}"
|
||||
f" OID: {oid}={hex(oid)}\n"
|
||||
f" VALUE: {value} = {hex(int(value))}"
|
||||
)
|
||||
response = self.snmp_mib(oid_string, label=label)
|
||||
if response:
|
||||
logging.debug(f" RESPONSE: {repr(response)}")
|
||||
if not self.dry_run and response and not ":OK;" in repr(response):
|
||||
|
@ -888,7 +913,12 @@ class EpsonPrinter:
|
|||
else:
|
||||
snmp_info = self.snmp_info
|
||||
for name, oid in snmp_info.items():
|
||||
result = self.snmp_mib(oid)
|
||||
logging.debug(
|
||||
f"SNMP_DUMP {name}:\n"
|
||||
f" ADDRESS: {oid}"
|
||||
)
|
||||
result = self.snmp_mib(oid, label="get_snmp_info " + name)
|
||||
logging.debug(f" RESPONSE: {repr(result)}")
|
||||
if name == "hex_data" and result is not False:
|
||||
sys_info[name] = result.hex(" ").upper()
|
||||
elif name == "UpTime" and result is not False:
|
||||
|
@ -969,10 +999,16 @@ class EpsonPrinter:
|
|||
|
||||
def get_firmware_version(self) -> str:
|
||||
"""Return firmware version."""
|
||||
firmware_string = self.snmp_mib(
|
||||
f"{self.eeprom_link}.118.105.1.0.0")
|
||||
oid = f"{self.eeprom_link}.118.105.1.0.0"
|
||||
label = "get_firmware_version"
|
||||
logging.debug(
|
||||
f"SNMP_DUMP {label}:\n"
|
||||
f" ADDRESS: {oid}"
|
||||
)
|
||||
firmware_string = self.snmp_mib(oid, label=label)
|
||||
if not firmware_string:
|
||||
return None
|
||||
logging.debug(f" RESPONSE: {repr(firmware_string)}")
|
||||
firmware = re.sub(
|
||||
r".*vi:00:(.{6}).*", r'\g<1>', firmware_string.decode())
|
||||
year = ord(firmware[4:5]) + 1945
|
||||
|
@ -983,10 +1019,16 @@ class EpsonPrinter:
|
|||
|
||||
def get_cartridges(self) -> str:
|
||||
"""Return list of cartridge types."""
|
||||
cartridges_string = self.snmp_mib(
|
||||
f"{self.eeprom_link}.105.97.1.0.0")
|
||||
oid = f"{self.eeprom_link}.105.97.1.0.0"
|
||||
label = "get_cartridges"
|
||||
logging.debug(
|
||||
f"SNMP_DUMP {label}:\n"
|
||||
f" ADDRESS: {oid}"
|
||||
)
|
||||
cartridges_string = self.snmp_mib(oid, label=label)
|
||||
if not cartridges_string:
|
||||
return None
|
||||
logging.debug(f" RESPONSE: {repr(cartridges_string)}")
|
||||
cartridges = re.sub(
|
||||
r".*IA:00;(.*);.*", r'\g<1>',
|
||||
cartridges_string.decode(),
|
||||
|
@ -1020,7 +1062,7 @@ class EpsonPrinter:
|
|||
"""Return printer status and ink levels."""
|
||||
address = f"{self.eeprom_link}.115.116.1.0.1"
|
||||
logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}")
|
||||
result = self.snmp_mib(address)
|
||||
result = self.snmp_mib(address, label="get_printer_status")
|
||||
if not result:
|
||||
return None
|
||||
logging.debug(f" RESPONSE: {repr(result[:20])}...\n%s",
|
||||
|
@ -1080,7 +1122,7 @@ class EpsonPrinter:
|
|||
f"Cartridge {i}:\n"
|
||||
f" ADDRESS: {mib}"
|
||||
)
|
||||
cartridge = self.snmp_mib(mib)
|
||||
cartridge = self.snmp_mib(mib, label="get_cartridge_information")
|
||||
logging.debug(f" RESPONSE: {repr(cartridge)}")
|
||||
if not cartridge:
|
||||
continue
|
||||
|
@ -1208,6 +1250,150 @@ class EpsonPrinter:
|
|||
except Exception:
|
||||
return None
|
||||
|
||||
def read_config_file(self, file):
|
||||
mib_dict = {}
|
||||
next_line = None
|
||||
try:
|
||||
while True:
|
||||
line = next_line if next_line != None else next(file)
|
||||
next_line = None
|
||||
oid = None
|
||||
value = None
|
||||
process = None
|
||||
address_val = None
|
||||
response_val = None
|
||||
response_val_bytes = None
|
||||
if line.startswith("PRINTER_STATUS:"):
|
||||
oid = False
|
||||
value = False
|
||||
process = True
|
||||
response_next = True
|
||||
if line.startswith("Cartridge "):
|
||||
oid = False
|
||||
value = False
|
||||
process = True
|
||||
response_next = False
|
||||
if line.startswith("SNMP_DUMP "):
|
||||
oid = False
|
||||
value = False
|
||||
process = True
|
||||
response_next = False
|
||||
if line.startswith("EEPROM_DUMP "):
|
||||
oid = True
|
||||
value = False
|
||||
process = True
|
||||
response_next = False
|
||||
if line.startswith("EEPROM_WRITE "):
|
||||
oid = True
|
||||
value = True
|
||||
process = True
|
||||
response_next = False
|
||||
if process:
|
||||
# address
|
||||
address_line = (
|
||||
next_line if next_line != None else next(file)
|
||||
)
|
||||
next_line = None
|
||||
if not address_line.startswith(" ADDRESS: "):
|
||||
logging.error(
|
||||
"Missing ADDRESS: '%s'", address_line.rstrip())
|
||||
next_line = address_line
|
||||
continue
|
||||
address_val = address_line[11:].rstrip()
|
||||
if not address_val:
|
||||
logging.error(
|
||||
"Invalid ADDRESS: '%s'", address_line.rstrip())
|
||||
next_line = address_line
|
||||
continue
|
||||
# oid
|
||||
if oid:
|
||||
oid_line = (
|
||||
next_line if next_line != None else next(file)
|
||||
)
|
||||
next_line = None
|
||||
if not oid_line.startswith(" OID: "):
|
||||
logging.error(
|
||||
"Missing OID: '%s'", oid_line.rstrip())
|
||||
next_line = oid_line
|
||||
continue
|
||||
# value
|
||||
if value:
|
||||
value_line = (
|
||||
next_line if next_line != None else next(file)
|
||||
)
|
||||
next_line = None
|
||||
if not value_line.startswith(" VALUE: "):
|
||||
logging.error(
|
||||
"Missing VALUE: '%s'", value_line.rstrip())
|
||||
next_line = value_line
|
||||
continue
|
||||
# response
|
||||
response_line = (
|
||||
next_line if next_line != None else next(file)
|
||||
)
|
||||
next_line = None
|
||||
if response_line.startswith(" RESPONSE: "):
|
||||
response_val = response_line[12:].rstrip()
|
||||
if not response_val:
|
||||
logging.error(
|
||||
"Invalid RESPONSE '%s'", response_line.rstrip())
|
||||
next_line = response_line
|
||||
continue
|
||||
if response_next:
|
||||
dump_hex_str = ""
|
||||
while True:
|
||||
dump_hex = (
|
||||
next_line if next_line != None
|
||||
else next(file)
|
||||
)
|
||||
next_line = None
|
||||
if not dump_hex.startswith(" "):
|
||||
next_line = dump_hex
|
||||
break
|
||||
try:
|
||||
val = bytes.fromhex(dump_hex)
|
||||
except ValueError:
|
||||
next_line = dump_hex
|
||||
continue
|
||||
dump_hex_str += dump_hex
|
||||
if not dump_hex_str:
|
||||
logging.error(
|
||||
"Invalid DUMP: '%s'", dump_hex.rstrip())
|
||||
next_line = dump_hex
|
||||
continue
|
||||
try:
|
||||
val = bytes.fromhex(dump_hex_str)
|
||||
except ValueError:
|
||||
logging.error(
|
||||
"Invalid DUMP %s", dump_hex_str.rstrip())
|
||||
next_line = dump_hex
|
||||
continue
|
||||
if val:
|
||||
mib_dict[address_val] = val
|
||||
else:
|
||||
try:
|
||||
response_val_bytes = ast.literal_eval(
|
||||
response_val)
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Invalid response %s: %s",
|
||||
response_line.rstrip(),
|
||||
e
|
||||
)
|
||||
next_line = response_line
|
||||
continue
|
||||
if response_val_bytes:
|
||||
mib_dict[address_val] = response_val_bytes
|
||||
else:
|
||||
logging.error(
|
||||
"Null value for response %s",
|
||||
response_line.rstrip()
|
||||
)
|
||||
next_line = response_line
|
||||
except StopIteration:
|
||||
pass
|
||||
return mib_dict
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
@ -1325,6 +1511,16 @@ if __name__ == "__main__":
|
|||
default=None,
|
||||
help='SNMP GET retries (floating point argument)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
"--config",
|
||||
dest='config_file',
|
||||
type=argparse.FileType('r'),
|
||||
help="read a configuration file including the full log dump of a "
|
||||
"previous operation instead of accessing the printer via SNMP",
|
||||
default=0,
|
||||
nargs=1,
|
||||
metavar='CONFIG_FILE')
|
||||
args = parser.parse_args()
|
||||
|
||||
logging_level = logging.WARNING
|
||||
|
@ -1355,6 +1551,11 @@ if __name__ == "__main__":
|
|||
timeout=args.timeout,
|
||||
retries=args.retries,
|
||||
dry_run=args.dry_run)
|
||||
if args.config_file:
|
||||
printer.mib_dict = printer.read_config_file(args.config_file[0])
|
||||
if not printer.mib_dict:
|
||||
print("Error while reading configuration file")
|
||||
quit(1)
|
||||
if not printer.parm:
|
||||
print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join(
|
||||
printer.valid_printers),
|
||||
|
@ -1397,7 +1598,11 @@ if __name__ == "__main__":
|
|||
args.dump_eeprom[0] % 256,
|
||||
int(args.dump_eeprom[1] % 256)
|
||||
).items():
|
||||
print(f"{str(addr).rjust(3)}: {val:#04x} = {str(val).rjust(3)}")
|
||||
print(
|
||||
f"EEPROM_ADDR {hex(addr).rjust(4)} = "
|
||||
f"{str(addr).rjust(3)}: "
|
||||
f"{val:#04x} = {str(val).rjust(3)}"
|
||||
)
|
||||
if args.query:
|
||||
print_opt = True
|
||||
if ("stats" in printer.parm and
|
||||
|
@ -1455,12 +1660,16 @@ if __name__ == "__main__":
|
|||
read_list = re.split(',\s*', args.read_eeprom[0])
|
||||
for value in read_list:
|
||||
try:
|
||||
val = printer.read_eeprom(
|
||||
ast.literal_eval(value), label='read_eeprom')
|
||||
addr = int(ast.literal_eval(value))
|
||||
val = printer.read_eeprom(addr, label='read_eeprom')
|
||||
if val is None:
|
||||
print("EEPROM read error.")
|
||||
else:
|
||||
print(f"0x{val}={int(val, 16)}")
|
||||
print(
|
||||
f"EEPROM_ADDR {hex(addr).rjust(4)} = "
|
||||
f"{str(addr).rjust(3)}: "
|
||||
f"{int(val):#04x} = {val.rjust(3)}"
|
||||
)
|
||||
except (ValueError, SyntaxError):
|
||||
print("invalid argument for read_eeprom")
|
||||
quit(1)
|
||||
|
|
Loading…
Reference in a new issue