Migrate from easysnmp to pysnmp

This commit is contained in:
Ircama 2023-08-04 00:09:11 +02:00
parent c6187cf56d
commit 6857f37097
2 changed files with 193 additions and 131 deletions

View file

@ -18,16 +18,24 @@ The software also provides a configurable printer dictionary, which can be easil
```
git clone https://github.com/Ircama/epson_print_conf
pip3 install easysnmp
pip3 install pyasn1==0.4.8
pip3 install git+https://github.com/etingof/pysnmp.git
cd epson_print_conf
```
Notes (at the time of writing):
- [before pysnmp, install pyasn1 with version 0.4.8 and not 0.5](https://github.com/etingof/pysnmp/issues/440#issuecomment-1544341598)
- [pull pysnmp from the GitHub master branch, not from PyPI](https://stackoverflow.com/questions/54868134/snmp-reading-from-an-oid-with-three-libraries-gives-different-execution-times#comment96532761_54869361)
Tested with Ubuntu / Windows Subsystem for Linux, Windows.
## Usage
```
usage: epson_print_conf.py [-h] -m MODEL -a HOSTNAME [-i] [-q QUERY] [--reset_waste_ink] [--detect-key] [-d]
[-e DUMP_EEPROM DUMP_EEPROM] [--dry-run] [--write-first-ti-received-time FTRT FTRT FTRT]
[-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING]
[-R READ_EEPROM] [-W WRITE_EEPROM] [-S WS_TO_STRING] [-t TIMEOUT] [-r RETRIES]
optional arguments:
-h, --help show this help message and exit
@ -52,6 +60,10 @@ optional arguments:
Write related values to a list of printer EEPROM addresses. Format is: address: value [, ...]
-S WS_TO_STRING, --write-sequence-to-string WS_TO_STRING
Convert write sequence of numbers to string.
-t TIMEOUT, --timeout TIMEOUT
SNMP GET timeout (floating point argument)
-r RETRIES, --retries RETRIES
SNMP GET retries (floating point argument)
Epson Printer Configuration accessed via SNMP (TCP/IP)
```
@ -100,37 +112,30 @@ if not printer.parm:
stats = printer.stats()
print("stats:", stats)
ret = printer.session.get_snmp_info()
ret = printer.get_snmp_info()
print("get_snmp_info:", ret)
ret = printer.session.get_serial_number()
ret = printer.get_serial_number()
print("get_serial_number:", ret)
ret = printer.session.get_firmware_version()
ret = printer.get_firmware_version()
print("get_firmware_version:", ret)
ret = printer.session.get_printer_head_id()
ret = printer.get_printer_head_id()
print("get_printer_head_id:", ret)
ret = printer.session.get_cartridges()
ret = printer.get_cartridges()
print("get_cartridges:", ret)
ret = printer.session.get_printer_status()
ret = printer.get_printer_status()
print("get_printer_status:", ret)
ret = printer.session.get_ink_replacement_counters()
ret = printer.get_ink_replacement_counters()
print("get_ink_replacement_counters:", ret)
ret = printer.session.get_waste_ink_levels()
ret = printer.get_waste_ink_levels()
print("get_waste_ink_levels:", ret)
ret = printer.session.get_last_printer_fatal_errors()
ret = printer.get_last_printer_fatal_errors()
print("get_last_printer_fatal_errors:", ret)
ret = printer.session.get_stats()
ret = printer.get_stats()
print("get_stats:", ret)
printer.session.reset_waste_ink_levels()
printer.session.detect_key()
printer.session.write_first_ti_received_time(2000, 1, 2)
```
### Exceptions
```
TimeoutError
ValueError
printer.reset_waste_ink_levels()
printer.brute_force_read_key()
printer.write_first_ti_received_time(2000, 1, 2)
```
## Output example
@ -191,8 +196,8 @@ Example of advanced printer status with an XP-205 printer:
'URL_path': 'Epson_IPP_Printer',
'UpTime': '00:57:48',
'WiFi': '....',
'device_id': 'MFG:EPSON;CMD:ESCPL2,BDC,D4,D4PX,ESCPR1;MDL:XP-205 207 '
'Series;CLS:PRINTER;DES:EPSON XP-205 207 '
'device_id': 'MFG:EPSON;CMD:ESCPL2,BDC,D4,D4PX,ESCPR1;MDL:XP-205 '
'207 Series;CLS:PRINTER;DES:EPSON XP-205 207 '
'Series;CID:EpsonRGB;FID:FXN,DPN,WFA,ETN,AFN,DAN;RID:40;',
'hex_data': 'A4 EE 57 DE FD 03'},
'stats': {'First TI received time': '...',
@ -209,7 +214,7 @@ Example of advanced printer status with an XP-205 printer:
## Resources
### snmpget
### snmpget (Linux)
Installation:

View file

@ -12,7 +12,7 @@ import datetime
import time
import textwrap
import ast
import easysnmp # pip3 install easysnmp
from pysnmp.hlapi.v1arch import *
class EpsonPrinter:
@ -239,6 +239,13 @@ class EpsonPrinter:
"raw_waste_reset": {24: 0, 25: 0, 30: 0, 28: 0, 29: 0, 46: 94}
# to be completed
},
"XP-3150": {
"read_key": [80, 9],
"stats": {
"Total print page counter": [133, 132, 131, 130],
},
# draft
},
}
snmp_info = {
@ -284,6 +291,8 @@ class EpsonPrinter:
self,
printer_model:
str, hostname: str,
timeout: (None, float) = None,
retries: (None, float) = None,
debug: bool = False,
dry_run: bool = False) -> None:
"""Initialise printer model."""
@ -300,13 +309,14 @@ class EpsonPrinter:
self.PRINTER_CONFIG[alias_name] = printer_data
self.printer_model = printer_model
self.hostname = hostname
self.timeout = timeout
self.retries = retries
self.debug = debug
self.dry_run = dry_run
if self.printer_model in self.valid_printers:
self.parm = self.PRINTER_CONFIG[self.printer_model]
else:
self.parm = None
self.session = EpsonSession(printer=self, debug=debug, dry_run=dry_run)
@property
def valid_printers(self):
@ -320,14 +330,13 @@ class EpsonPrinter:
@property
def list_methods(self):
"""Return list of available information methods about a printer."""
return(filter(lambda x: x.startswith("get_") and x not in dir(
easysnmp.Session), dir(self.session)))
return(filter(lambda x: x.startswith("get_"), dir(self)))
def stats(self):
"""Return all available information about a printer."""
stat_set = {}
for method in self.list_methods:
ret = self.session.__getattribute__(method)()
ret = self.__getattribute__(method)()
if ret:
stat_set[method[4:]] = ret
else:
@ -339,26 +348,6 @@ class EpsonPrinter:
"""Convert the string write key to a sequence of numbers"""
return ".".join(str(b + 1) for b in key)
class EpsonSession(easysnmp.Session):
"""SNMP session wrapper for Epson printer."""
def __init__(
self,
printer: EpsonPrinter,
community: str = "public",
version: int = 1,
debug: bool = False,
dry_run: bool = False
) -> None:
"""Initialise session."""
self.printer = printer
self.debug = debug
self.dry_run = dry_run
super().__init__(
hostname=self.printer.hostname, community=community, version=version
)
def eeprom_oid_read_address(
self,
oid: int,
@ -374,14 +363,14 @@ class EpsonSession(easysnmp.Session):
oid = oid % 256
if msb > 255:
return None
if 'read_key' not in self.printer.parm:
if 'read_key' not in self.parm:
return None
return (
f"{self.printer.eeprom_link}"
f"{self.eeprom_link}"
".124.124" # || (0x7C 0x7C)
".7.0" # read
f".{self.printer.parm['read_key'][0]}"
f".{self.printer.parm['read_key'][1]}"
f".{self.parm['read_key'][0]}"
f".{self.parm['read_key'][1]}"
".65.190.160"
f".{oid}.{msb}"
)
@ -402,18 +391,18 @@ class EpsonSession(easysnmp.Session):
oid = oid % 256
if msb > 255:
return None
if ('write_key' not in self.printer.parm
or 'read_key' not in self.printer.parm):
if ('write_key' not in self.parm
or 'read_key' not in self.parm):
return None
write_op = (
f"{self.printer.eeprom_link}"
f"{self.eeprom_link}"
".124.124" # || (0x7C 0x7C)
".16.0" # write
f".{self.printer.parm['read_key'][0]}"
f".{self.printer.parm['read_key'][1]}"
f".{self.parm['read_key'][0]}"
f".{self.parm['read_key'][1]}"
".66.189.33"
f".{oid}.{msb}.{value}"
f".{self.printer.caesar(self.printer.parm['write_key'])}"
f".{self.caesar(self.parm['write_key'])}"
)
if self.dry_run:
print("WRITE_DRY_RUN:", write_op)
@ -421,15 +410,48 @@ class EpsonSession(easysnmp.Session):
else:
return write_op
def read_value(self, oids: str):
"""Generic SNMP query, returning value of OIDs."""
try:
value = self.get(oids).value
except easysnmp.exceptions.EasySNMPTimeoutError as e:
raise TimeoutError(str(e))
except Exception as e:
raise ValueError(str(e))
return value
def snmp_mib(self, mib):
"""Generic SNMP query, returning value of a MIB."""
utt = UdpTransportTarget(
(self.hostname, 161),
)
if self.timeout is not None:
utt.timeout = self.timeout
if self.retries is not None:
utt.retries = self.retries
iterator = getCmd(
SnmpDispatcher(),
CommunityData('public', mpModel=0),
utt,
(mib, None)
)
for response in iterator:
errorIndication, errorStatus, errorIndex, varBinds = response
if errorIndication:
if self.debug:
print("snmp_mib error", errorIndication)
return False
elif errorStatus:
if self.debug:
print(
'snmp_mib error: %s at %s' % (
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?')
)
return False
else:
for varBind in varBinds:
try:
return "".join(
[chr(x) for x in varBind[1].asNumbers()])
except Exception:
return varBind[1].prettyPrint()
if self.debug:
print("snmp_mib error: invalid multiple data")
return False
if self.debug:
print("snmp_mib error: invalid multiple data")
return False
def read_eeprom(
self,
@ -443,13 +465,13 @@ class EpsonSession(easysnmp.Session):
f"{self.eeprom_oid_read_address(oid, label=label)}\n"
f" OID: {oid}={hex(oid)}"
)
response = self.read_value(
response = self.snmp_mib(
self.eeprom_oid_read_address(oid, label=label))
if self.debug:
print(f" RESPONSE: {repr(response)}")
try:
response = re.findall(r"EE:[0-9A-F]{6}", response)[0][3:]
except IndexError:
except (TypeError, IndexError):
if self.debug:
print(f"Invalid read key.")
return None
@ -477,7 +499,7 @@ class EpsonSession(easysnmp.Session):
value: int,
label: str = "unknown method") -> None:
"""Write a single byte 'value' to the Epson EEPROM address 'oid'."""
if "write_key" not in self.printer.parm:
if "write_key" not in self.parm:
if self.debug:
print(f"Missing 'write_key' parameter in configuration.")
return False
@ -663,49 +685,54 @@ class EpsonSession(easysnmp.Session):
def get_snmp_info(self, mib_name: str = None) -> str:
"""Return general SNMP information of printer."""
sys_info = {}
if mib_name and mib_name in self.printer.snmp_info.keys():
snmp_info = {mib_name: self.printer.snmp_info[mib_name]}
if mib_name and mib_name in self.snmp_info.keys():
snmp_info = {mib_name: self.snmp_info[mib_name]}
else:
snmp_info = self.printer.snmp_info
snmp_info = self.snmp_info
for name, oid in snmp_info.items():
try:
sys_info[name] = self.read_value(oid)
except Exception:
sys_info[name] = self.snmp_mib(oid)
except Exception as e:
if self.debug:
print(f"No value for SNMP OID '{name}'.")
if "hex_data" in sys_info:
print(
f"No value for SNMP OID '{name}'. "
f"MIB: {oid}. Error: {e}"
)
if "hex_data" in sys_info and sys_info["hex_data"] is not False:
sys_info["hex_data"] = bytes(
[ord(i) for i in sys_info["hex_data"]]).hex(" ").upper()
if "UpTime" in sys_info:
if "UpTime" in sys_info and sys_info["UpTime"] is not False:
sys_info["UpTime"] = time.strftime(
'%H:%M:%S', time.gmtime(int(sys_info["UpTime"])/100))
if "MAC Address" in sys_info:
if "MAC Address" in sys_info and sys_info["MAC Address"] is not False:
sys_info["MAC Address"] = bytes(
[ord(i) for i in sys_info["MAC Address"]]).hex("-").upper()
return sys_info
def get_serial_number(self) -> str:
"""Return serial number of printer."""
if "serial_number" not in self.printer.parm:
if "serial_number" not in self.parm:
return None
return "".join(
chr(int(value, 16))
chr(int(value or "0", 16))
for value in self.read_eeprom_many(
self.printer.parm["serial_number"], label="serial_number")
self.parm["serial_number"], label="serial_number")
)
def get_stats(self, stat_name: str = None) -> str:
"""Return printer statistics."""
if "stats" not in self.printer.parm:
if "stats" not in self.parm:
return None
if stat_name and stat_name in self.printer.parm["stats"].keys():
stat_info = {stat_name: self.printer.parm["stats"][stat_name]}
if stat_name and stat_name in self.parm["stats"].keys():
stat_info = {stat_name: self.parm["stats"][stat_name]}
else:
stat_info = self.printer.parm["stats"]
stat_info = self.parm["stats"]
stats_result = {}
for stat_name, oids in stat_info.items():
total = 0
for val in self.read_eeprom_many(oids, label=stat_name):
if val is None:
return None
total = (total << 8) + int(val, 16)
stats_result[stat_name] = total
ftrt = stats_result["First TI received time"]
@ -718,20 +745,25 @@ class EpsonSession(easysnmp.Session):
def get_printer_head_id(self) -> str: # to be revised
"""Return printer head id."""
if "printer_head_id_h" not in self.printer.parm:
if "printer_head_id_h" not in self.parm:
return None
if "printer_head_id_f" not in self.printer.parm:
if "printer_head_id_f" not in self.parm:
return None
a = self.read_eeprom_many(
self.printer.parm["printer_head_id_h"], label="printer_head_id_h")
self.parm["printer_head_id_h"], label="printer_head_id_h")
b = self.read_eeprom_many(
self.printer.parm["printer_head_id_f"], label="printer_head_id_f")
self.parm["printer_head_id_f"], label="printer_head_id_f")
if (
a == [None, None, None, None, None]
or b == [None, None, None, None, None]
):
return None
return(f'{"".join(a)} - {"".join(b)}')
def get_firmware_version(self) -> str:
"""Return firmware version."""
firmware_string = self.read_value(
f"{self.printer.eeprom_link}.118.105.1.0.0")
firmware_string = self.snmp_mib(
f"{self.eeprom_link}.118.105.1.0.0")
if not firmware_string:
return None
firmware = re.sub(r".*vi:00:(.{6}).*", r'\g<1>', firmware_string)
@ -743,8 +775,8 @@ class EpsonSession(easysnmp.Session):
def get_cartridges(self) -> str:
"""Return list of cartridge types."""
cartridges_string = self.read_value(
f"{self.printer.eeprom_link}.105.97.1.0.0")
cartridges_string = self.snmp_mib(
f"{self.eeprom_link}.105.97.1.0.0")
if not cartridges_string:
return None
cartridges = re.sub(
@ -753,16 +785,18 @@ class EpsonSession(easysnmp.Session):
def get_ink_replacement_counters(self) -> str:
"""Return list of ink replacement counters."""
if "ink_replacement_counters" not in self.printer.parm:
if "ink_replacement_counters" not in self.parm:
return None
irc = {
(
color,
counter,
int(self.read_eeprom(value, label="ink_replacement_counters"),
16),
int(
self.read_eeprom(
value, label="ink_replacement_counters") or "-1", 16
),
)
for color, data in self.printer.parm[
for color, data in self.parm[
"ink_replacement_counters"].items()
for counter, value in data.items()
}
@ -770,7 +804,7 @@ class EpsonSession(easysnmp.Session):
def get_printer_status(self):
"""Return printer status and ink levels."""
result = self.read_value(f"{self.printer.eeprom_link}.115.116.1.0.1")
result = self.snmp_mib(f"{self.eeprom_link}.115.116.1.0.1")
if not result:
return None
if self.debug:
@ -786,26 +820,28 @@ class EpsonSession(easysnmp.Session):
def get_waste_ink_levels(self):
"""Return waste ink levels as a percentage."""
if "main_waste" not in self.printer.parm:
if "main_waste" not in self.parm:
return None
results = {}
for waste_type in ["main_waste", "borderless_waste", "first_waste",
"second_waste", "third_waste"]:
if waste_type not in self.printer.parm:
if waste_type not in self.parm:
continue
level = self.read_eeprom_many(
self.printer.parm[waste_type]["oids"], label=waste_type)
self.parm[waste_type]["oids"], label=waste_type)
if level == [None, None, None]:
return None
level_b10 = int("".join(reversed(level)), 16)
results[waste_type] = round(
level_b10 / self.printer.parm[waste_type]["divider"], 2)
level_b10 / self.parm[waste_type]["divider"], 2)
return results
def get_last_printer_fatal_errors(self) -> str:
"""Return list of last printer fatal errors in hex format."""
if "last_printer_fatal_errors" not in self.printer.parm:
if "last_printer_fatal_errors" not in self.parm:
return None
return self.read_eeprom_many(
self.printer.parm["last_printer_fatal_errors"],
self.parm["last_printer_fatal_errors"],
label="last_printer_fatal_errors"
)
@ -822,19 +858,19 @@ class EpsonSession(easysnmp.Session):
"""
Set waste ink levels to 0.
"""
if "raw_waste_reset" in self.printer.parm:
for oid, value in self.printer.parm["raw_waste_reset"].items():
if "raw_waste_reset" in self.parm:
for oid, value in self.parm["raw_waste_reset"].items():
if not self.write_eeprom(oid, value, label="raw_waste_reset"):
return False
return True
if "main_waste" not in self.printer.parm:
if "main_waste" not in self.parm:
return None
for oid in self.printer.parm["main_waste"]["oids"]:
for oid in self.parm["main_waste"]["oids"]:
if not self.write_eeprom(oid, 0, label="main_waste"):
return False
if "borderless_waste" not in self.printer.parm:
if "borderless_waste" not in self.parm:
return True
for oid in self.printer.parm["borderless_waste"]["oids"]:
for oid in self.parm["borderless_waste"]["oids"]:
if not self.write_eeprom(oid, 0, label="borderless_waste"):
return False
return True
@ -843,8 +879,8 @@ class EpsonSession(easysnmp.Session):
self, year: int, month: int, day: int) -> bool:
"""Update first TI received time"""
try:
msb = self.printer.parm["stats"]["First TI received time"][0]
lsb = self.printer.parm["stats"]["First TI received time"][1]
msb = self.parm["stats"]["First TI received time"][0]
lsb = self.parm["stats"]["First TI received time"][1]
except KeyError:
return False
n = (year - 2000) * 16 * 32 + 32 * month + day
@ -857,7 +893,7 @@ class EpsonSession(easysnmp.Session):
return True
def list_known_keys(self, debug=False):
for model, chars in self.printer.PRINTER_CONFIG.items():
for model, chars in self.PRINTER_CONFIG.items():
if 'write_key' in chars:
print(f"{repr(model).rjust(25)}: {repr(chars['read_key']).rjust(10)} - {repr(chars['write_key'])[1:]}")
else:
@ -868,13 +904,13 @@ class EpsonSession(easysnmp.Session):
):
"""Brute force read_key for printer."""
for x, y in itertools.permutations(range(minimum, maximum + 1), r=2):
self.printer.parm['read_key'] = [x, y]
self.parm['read_key'] = [x, y]
if debug:
print(f"Trying {self.printer.parm['read_key']}...")
print(f"Trying {self.parm['read_key']}...")
val = self.read_eeprom(0x00, label="brute_force_read_key")
if val is None:
continue
return self.printer.parm['read_key']
return self.parm['read_key']
return None
def write_sequence_to_string(self, write_sequence):
@ -985,10 +1021,31 @@ if __name__ == "__main__":
nargs=1,
help='Convert write sequence of numbers to string.'
)
parser.add_argument(
'-t',
'--timeout',
dest='timeout',
type=float,
default=None,
help='SNMP GET timeout (floating point argument)',
)
parser.add_argument(
'-r',
'--retries',
dest='retries',
type=float,
default=None,
help='SNMP GET retries (floating point argument)',
)
args = parser.parse_args()
printer = EpsonPrinter(
args.model, args.hostname, debug=args.debug, dry_run=args.dry_run)
args.model,
args.hostname,
timeout=args.timeout,
retries=args.retries,
debug=args.debug,
dry_run=args.dry_run)
if not printer.parm:
print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join(
printer.valid_printers),
@ -999,25 +1056,25 @@ if __name__ == "__main__":
try:
if args.ws_to_string:
print_opt = True
print(printer.session.write_sequence_to_string(args.ws_to_string))
print(self.write_sequence_to_string(args.ws_to_string))
if args.reset_waste_ink:
print_opt = True
if printer.session.reset_waste_ink_levels():
if self.reset_waste_ink_levels():
print("Reset waste ink levels done.")
else:
print("Failed to reset waste ink levels. Check configuration.")
if args.detect_key:
print_opt = True
read_key = printer.session.brute_force_read_key(debug=True)
read_key = self.brute_force_read_key(debug=True)
if read_key:
print(f"read_key found: {read_key}")
print("List of known keys:")
printer.session.list_known_keys(debug=True)
self.list_known_keys(debug=True)
else:
print(f"Cannot found read_key")
if args.ftrt:
print_opt = True
if printer.session.write_first_ti_received_time(
if self.write_first_ti_received_time(
int(args.ftrt[0]), int(args.ftrt[1]), int(args.ftrt[2])):
print("Write first TI received time done.")
else:
@ -1027,7 +1084,7 @@ if __name__ == "__main__":
)
if args.dump_eeprom:
print_opt = True
for addr, val in printer.session.dump_eeprom(
for addr, val in self.dump_eeprom(
args.dump_eeprom[0] % 256,
int(args.dump_eeprom[1] % 256)
).items():
@ -1036,13 +1093,13 @@ if __name__ == "__main__":
print_opt = True
if ("stats" in printer.parm and
args.query[0] in printer.parm["stats"]):
ret = printer.session.get_stats(args.query[0])
ret = self.get_stats(args.query[0])
if ret:
pprint(ret)
else:
print("No information returned. Check printer definition.")
elif args.query[0] in printer.snmp_info.keys():
ret = printer.session.get_snmp_info(args.query[0])
ret = self.get_snmp_info(args.query[0])
if ret:
pprint(ret)
else:
@ -1053,7 +1110,7 @@ if __name__ == "__main__":
else:
method = "get_" + args.query[0]
if method in printer.list_methods:
ret = printer.session.__getattribute__(method)()
ret = self.__getattribute__(method)()
if ret:
pprint(ret)
else:
@ -1089,7 +1146,7 @@ if __name__ == "__main__":
read_list = re.split(',\s*', args.read_eeprom[0])
for value in read_list:
try:
val = printer.session.read_eeprom(
val = self.read_eeprom(
ast.literal_eval(value), label='read_eeprom')
if val is None:
print("EEPROM read error.")
@ -1105,7 +1162,7 @@ if __name__ == "__main__":
key, val = re.split(':|=', key_val)
try:
val_int = ast.literal_eval(val)
if not printer.session.write_eeprom(
if not self.write_eeprom(
ast.literal_eval(key),
str(val_int), label='write_eeprom'
):