Refinements

This commit is contained in:
Ircama 2023-08-04 09:47:53 +02:00
parent c8e79fe6f2
commit 388f3a7e56
2 changed files with 75 additions and 58 deletions

View file

@ -28,7 +28,9 @@ Notes (at the time of writing):
- [before pysnmp, install pyasn1 with version 0.4.8 and not 0.5](https://github.com/etingof/pysnmp/issues/440#issuecomment-1544341598)
- [pull pysnmp from the GitHub master branch, not from PyPI](https://stackoverflow.com/questions/54868134/snmp-reading-from-an-oid-with-three-libraries-gives-different-execution-times#comment96532761_54869361)
Tested with Ubuntu / Windows Subsystem for Linux, Windows.
This program exploits [pysnmp](https://github.com/etingof/pysnmp), with related [documentation](https://pysnmp.readthedocs.io/).
It is tested with Ubuntu / Windows Subsystem for Linux, Windows.
## Usage
@ -65,7 +67,7 @@ optional arguments:
-r RETRIES, --retries RETRIES
SNMP GET retries (floating point argument)
Epson Printer Configuration accessed via SNMP (TCP/IP)
Epson Printer Configuration via SNMP (TCP/IP)
```
Examples:
@ -138,6 +140,13 @@ printer.brute_force_read_key()
printer.write_first_ti_received_time(2000, 1, 2)
```
### Exception
```
TimeoutError
(pysnmp exceptions)
```
## Output example
Example of advanced printer status with an XP-205 printer:

View file

@ -13,6 +13,7 @@ import time
import textwrap
import ast
from pysnmp.hlapi.v1arch import *
from pyasn1.type.univ import OctetString as OctetStringType
class EpsonPrinter:
@ -429,28 +430,29 @@ class EpsonPrinter:
errorIndication, errorStatus, errorIndex, varBinds = response
if errorIndication:
if self.debug:
print("snmp_mib error", errorIndication)
print("snmp_mib error:", errorIndication)
if " timed out" in errorIndication:
raise TimeoutError(errorIndication)
return False
elif errorStatus:
if self.debug:
print(
'snmp_mib error: %s at %s' % (
'snmp_mib PDU error: %s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?')
)
return False
else:
for varBind in varBinds:
try:
return "".join(
[chr(x) for x in varBind[1].asNumbers()])
except Exception:
if isinstance(varBind[1], OctetStringType):
return varBind[1].asOctets()
else:
return varBind[1].prettyPrint()
if self.debug:
print("snmp_mib error: invalid multiple data")
print("snmp_mib value error: invalid multiple data")
return False
if self.debug:
print("snmp_mib error: invalid multiple data")
print("snmp_mib value error: invalid data")
return False
def read_eeprom(
@ -467,10 +469,12 @@ class EpsonPrinter:
)
response = self.snmp_mib(
self.eeprom_oid_read_address(oid, label=label))
if not response:
return None
if self.debug:
print(f" RESPONSE: {repr(response)}")
try:
response = re.findall(r"EE:[0-9A-F]{6}", response)[0][3:]
response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:]
except (TypeError, IndexError):
if self.debug:
print(f"Invalid read key.")
@ -507,14 +511,7 @@ class EpsonPrinter:
response = self.read_eeprom(oid, label=label)
print(f"Previous value for {label}: {response}")
oid_string = self.eeprom_oid_write_address(oid, value, label=label)
response = None
try:
response = self.get(oid_string)
except easysnmp.exceptions.EasySNMPTimeoutError as e:
if not self.dry_run:
raise TimeoutError(str(e))
except Exception as e:
raise ValueError(str(e))
response = self.snmp_mib(oid_string)
if self.debug:
print(
f"EEPROM_WRITE {label}:\n"
@ -522,9 +519,11 @@ class EpsonPrinter:
f" OID: {oid}={hex(oid)}"
)
if self.debug and response:
print(f" RESPONSE: {repr(response.value)}")
if response and not ":OK;" in repr(response.value): # ":NA;" is an error
return False
print(f" RESPONSE: {repr(response)}")
if not self.dry_run and response and not ":OK;" in repr(response):
if self.debug:
print("Write error")
return False # ":NA;" is an error
return True
def status_parser(self, data):
@ -690,23 +689,21 @@ class EpsonPrinter:
else:
snmp_info = self.snmp_info
for name, oid in snmp_info.items():
try:
sys_info[name] = self.snmp_mib(oid)
except Exception as e:
result = self.snmp_mib(oid)
if name == "hex_data" and result is not False:
sys_info[name] = result.hex(" ").upper()
elif name == "UpTime" and result is not False:
sys_info[name] = time.strftime(
'%H:%M:%S', time.gmtime(int(result)/100))
elif name == "MAC Address" and result is not False:
sys_info[name] = result.hex("-").upper()
elif isinstance(result, bytes):
sys_info[name] = result.decode()
elif isinstance(result, str):
sys_info[name] = result
else:
if self.debug:
print(
f"No value for SNMP OID '{name}'. "
f"MIB: {oid}. Error: {e}"
)
if "hex_data" in sys_info and sys_info["hex_data"] is not False:
sys_info["hex_data"] = bytes(
[ord(i) for i in sys_info["hex_data"]]).hex(" ").upper()
if "UpTime" in sys_info and sys_info["UpTime"] is not False:
sys_info["UpTime"] = time.strftime(
'%H:%M:%S', time.gmtime(int(sys_info["UpTime"])/100))
if "MAC Address" in sys_info and sys_info["MAC Address"] is not False:
sys_info["MAC Address"] = bytes(
[ord(i) for i in sys_info["MAC Address"]]).hex("-").upper()
print(f"No value for SNMP OID '{name}'. MIB: {oid}.")
return sys_info
def get_serial_number(self) -> str:
@ -766,7 +763,8 @@ class EpsonPrinter:
f"{self.eeprom_link}.118.105.1.0.0")
if not firmware_string:
return None
firmware = re.sub(r".*vi:00:(.{6}).*", r'\g<1>', firmware_string)
firmware = re.sub(
r".*vi:00:(.{6}).*", r'\g<1>', firmware_string.decode())
year = ord(firmware[4:5]) + 1945
month = int(firmware[5:], 16)
day = int(firmware[2:4])
@ -780,7 +778,10 @@ class EpsonPrinter:
if not cartridges_string:
return None
cartridges = re.sub(
r".*IA:00;(.*);.*", r'\g<1>', cartridges_string, flags=re.S)
r".*IA:00;(.*);.*", r'\g<1>',
cartridges_string.decode(),
flags=re.S
)
return [i.strip() for i in cartridges.split(',')]
def get_ink_replacement_counters(self) -> str:
@ -810,13 +811,12 @@ class EpsonPrinter:
if self.debug:
print(
textwrap.fill(
"PRINTER_STATUS: " + bytes(
[ord(i) for i in result]).hex(" "),
"PRINTER_STATUS: " + repr(result),
initial_indent="",
subsequent_indent=" ",
)
)
return self.status_parser(bytes([ord(i) for i in result]))
return self.status_parser(result)
def get_waste_ink_levels(self):
"""Return waste ink levels as a percentage."""
@ -892,12 +892,20 @@ class EpsonPrinter:
return False
return True
def list_known_keys(self, debug=False):
def list_known_keys(self):
for model, chars in self.PRINTER_CONFIG.items():
if 'write_key' in chars:
print(f"{repr(model).rjust(25)}: {repr(chars['read_key']).rjust(10)} - {repr(chars['write_key'])[1:]}")
print(
f"{repr(model).rjust(25)}: "
f"{repr(chars['read_key']).rjust(10)} - "
f"{repr(chars['write_key'])[1:]}"
)
else:
print(f"{repr(model).rjust(25)}: {repr(chars['read_key']).rjust(10)} (unknown write key)")
print(
f"{repr(model).rjust(25)}: "
f"{repr(chars['read_key']).rjust(10)} "
f"(unknown write key)"
)
def brute_force_read_key(
self, minimum: int = 0x00, maximum: int = 0xFF, debug=False
@ -926,7 +934,7 @@ if __name__ == "__main__":
from pprint import pprint
parser = argparse.ArgumentParser(
epilog='Epson Printer Configuration accessed via SNMP (TCP/IP)')
epilog='Epson Printer Configuration via SNMP (TCP/IP)')
parser.add_argument(
'-m',
@ -1056,25 +1064,25 @@ if __name__ == "__main__":
try:
if args.ws_to_string:
print_opt = True
print(self.write_sequence_to_string(args.ws_to_string))
print(printer.write_sequence_to_string(args.ws_to_string))
if args.reset_waste_ink:
print_opt = True
if self.reset_waste_ink_levels():
if printer.reset_waste_ink_levels():
print("Reset waste ink levels done.")
else:
print("Failed to reset waste ink levels. Check configuration.")
if args.detect_key:
print_opt = True
read_key = self.brute_force_read_key(debug=True)
read_key = printer.brute_force_read_key(debug=True)
if read_key:
print(f"read_key found: {read_key}")
print("List of known keys:")
self.list_known_keys(debug=True)
printer.list_known_keys()
else:
print(f"Cannot found read_key")
if args.ftrt:
print_opt = True
if self.write_first_ti_received_time(
if printer.write_first_ti_received_time(
int(args.ftrt[0]), int(args.ftrt[1]), int(args.ftrt[2])):
print("Write first TI received time done.")
else:
@ -1084,7 +1092,7 @@ if __name__ == "__main__":
)
if args.dump_eeprom:
print_opt = True
for addr, val in self.dump_eeprom(
for addr, val in printer.dump_eeprom(
args.dump_eeprom[0] % 256,
int(args.dump_eeprom[1] % 256)
).items():
@ -1093,13 +1101,13 @@ if __name__ == "__main__":
print_opt = True
if ("stats" in printer.parm and
args.query[0] in printer.parm["stats"]):
ret = self.get_stats(args.query[0])
ret = printer.get_stats(args.query[0])
if ret:
pprint(ret)
else:
print("No information returned. Check printer definition.")
elif args.query[0] in printer.snmp_info.keys():
ret = self.get_snmp_info(args.query[0])
ret = printer.get_snmp_info(args.query[0])
if ret:
pprint(ret)
else:
@ -1110,7 +1118,7 @@ if __name__ == "__main__":
else:
method = "get_" + args.query[0]
if method in printer.list_methods:
ret = self.__getattribute__(method)()
ret = printer.__getattribute__(method)()
if ret:
pprint(ret)
else:
@ -1146,7 +1154,7 @@ if __name__ == "__main__":
read_list = re.split(',\s*', args.read_eeprom[0])
for value in read_list:
try:
val = self.read_eeprom(
val = printer.read_eeprom(
ast.literal_eval(value), label='read_eeprom')
if val is None:
print("EEPROM read error.")
@ -1162,7 +1170,7 @@ if __name__ == "__main__":
key, val = re.split(':|=', key_val)
try:
val_int = ast.literal_eval(val)
if not self.write_eeprom(
if not printer.write_eeprom(
ast.literal_eval(key),
str(val_int), label='write_eeprom'
):