Refinements

This commit is contained in:
Ircama 2023-08-08 17:35:34 +02:00
parent 6e5cac9316
commit 3571c365d9
2 changed files with 85 additions and 25 deletions

View file

@ -236,8 +236,8 @@ Example of advanced printer status with an XP-205 printer:
'Model': 'EPSON XP-205 207 Series',
'Model short': 'XP-205 207 Series',
'Name': '....',
'Print counter': '0',
'Print input': 'Auto sheet feeder',
'Total printed pages': '0',
'URL': 'http://192.168.1.87:631/Epson_IPP_Printer',
'URL_path': 'Epson_IPP_Printer',
'UpTime': '00:57:48',

View file

@ -480,10 +480,10 @@ class EpsonPrinter:
mib,
label
)
return False
return None, False
return self.mib_dict[mib]
if not self.hostname:
return False
return None, False
utt = UdpTransportTarget(
(self.hostname, 161),
)
@ -506,7 +506,7 @@ class EpsonPrinter:
)
if " timed out" in errorIndication:
raise TimeoutError(errorIndication)
return False
return None, False
elif errorStatus:
logging.info(
'snmp_mib PDU error: %s at %s. MIB: %s. Operation: %s',
@ -515,25 +515,31 @@ class EpsonPrinter:
mib,
label
)
return False
return None, False
else:
for varBind in varBinds:
if isinstance(varBind[1], OctetStringType):
return varBind[1].asOctets()
return(
varBind[1].__class__.__name__,
varBind[1].asOctets()
)
else:
return varBind[1].prettyPrint()
return(
varBind[1].__class__.__name__,
varBind[1].prettyPrint()
)
logging.info(
"snmp_mib value error: invalid multiple data. "
"MIB: %s. Operation: %s",
mib,
label
)
return False
return None, False
logging.info(
"snmp_mib value error: invalid data. MIB: %s. Operation: %s",
label
)
return False
return None, False
def invalid_response(self, response):
return len(response) < 2 or response[0] != 0 or response[-1] != 12
@ -549,7 +555,7 @@ class EpsonPrinter:
f"{self.eeprom_oid_read_address(oid, label=label)}\n"
f" OID: {oid}={hex(oid)}"
)
response = self.snmp_mib(
tag, response = self.snmp_mib(
self.eeprom_oid_read_address(oid, label=label), label=label)
if not response:
return None
@ -559,7 +565,7 @@ class EpsonPrinter:
repr(response), oid, label
)
return None
logging.debug(f" RESPONSE: {repr(response)}")
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response))
try:
response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:]
except (TypeError, IndexError):
@ -610,9 +616,9 @@ class EpsonPrinter:
f" OID: {oid}={hex(oid)}\n"
f" VALUE: {value} = {hex(int(value))}"
)
response = self.snmp_mib(oid_string, label=label)
tag, response = self.snmp_mib(oid_string, label=label)
if response:
logging.debug(f" RESPONSE: {repr(response)}")
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(response))
if not self.dry_run and response and not ":OK;" in repr(response):
logging.info(
"Write error. Oid=%s, value=%s, label=%s", oid, value, label)
@ -947,8 +953,8 @@ class EpsonPrinter:
f"SNMP_DUMP {name}:\n"
f" ADDRESS: {oid}"
)
result = self.snmp_mib(oid, label="get_snmp_info " + name)
logging.debug(f" RESPONSE: {repr(result)}")
tag, result = self.snmp_mib(oid, label="get_snmp_info " + name)
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(result))
if name == "hex_data" and result is not False:
sys_info[name] = result.hex(" ").upper()
elif name == "UpTime" and result is not False:
@ -1032,7 +1038,7 @@ class EpsonPrinter:
f"SNMP_DUMP {label}:\n"
f" ADDRESS: {oid}"
)
firmware_string = self.snmp_mib(oid, label=label)
tag, firmware_string = self.snmp_mib(oid, label=label)
if not firmware_string:
return None
if self.invalid_response(firmware_string):
@ -1040,7 +1046,7 @@ class EpsonPrinter:
f"Invalid response for %s: '%s'",
label, repr(firmware_string)
)
logging.debug(f" RESPONSE: {repr(firmware_string)}")
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(firmware_string))
firmware = re.sub(
r".*vi:00:(.{6}).*", r'\g<1>', firmware_string.decode())
year = ord(firmware[4:5]) + 1945
@ -1057,7 +1063,7 @@ class EpsonPrinter:
f"SNMP_DUMP {label}:\n"
f" ADDRESS: {oid}"
)
cartridges_string = self.snmp_mib(oid, label=label)
tag, cartridges_string = self.snmp_mib(oid, label=label)
if self.invalid_response(cartridges_string):
logging.error(
f"Invalid response for %s: '%s'",
@ -1065,7 +1071,8 @@ class EpsonPrinter:
)
if not cartridges_string:
return None
logging.debug(f" RESPONSE: {repr(cartridges_string)}")
logging.debug(
" TAG: %s\n RESPONSE: %s", tag, repr(cartridges_string))
cartridges = re.sub(
r".*IA:00;(.*);.*", r'\g<1>',
cartridges_string.decode(),
@ -1099,10 +1106,12 @@ class EpsonPrinter:
"""Return printer status and ink levels."""
address = f"{self.eeprom_link}.115.116.1.0.1"
logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}")
result = self.snmp_mib(address, label="get_printer_status")
tag, result = self.snmp_mib(address, label="get_printer_status")
if not result:
return None
logging.debug(f" RESPONSE: {repr(result[:20])}...\n%s",
logging.debug(" TAG: %s\n RESPONSE: %s...\n%s",
tag,
repr(result[:20]),
textwrap.fill(
result.hex(' '),
initial_indent=" ",
@ -1166,8 +1175,9 @@ class EpsonPrinter:
f"Cartridge {i}:\n"
f" ADDRESS: {mib}"
)
cartridge = self.snmp_mib(mib, label="get_cartridge_information")
logging.debug(f" RESPONSE: {repr(cartridge)}")
tag, cartridge = self.snmp_mib(
mib, label="get_cartridge_information")
logging.debug(" TAG: %s\n RESPONSE: %s", tag, repr(cartridge))
if not cartridge:
continue
if self.invalid_response(cartridge):
@ -1368,6 +1378,7 @@ class EpsonPrinter:
process = None
address_val = None
response_val = None
tag_val = None
response_val_bytes = None
if line.startswith("PRINTER_STATUS:"):
oid = False
@ -1424,6 +1435,15 @@ class EpsonPrinter:
"Missing VALUE: '%s'", value_line.rstrip())
next_line.pushline(value_line)
continue
# tag
tag_line = next_line.readline()
if tag_line.startswith(" TAG: "):
tag_val = tag_line[7:].rstrip()
if not tag_val:
logging.error(
"Invalid TAG '%s'", tag_line.rstrip())
next_line.pushline(tag_line)
continue
# response
response_line = next_line.readline()
if response_line.startswith(" RESPONSE: "):
@ -1459,7 +1479,7 @@ class EpsonPrinter:
next_line.pushline(dump_hex)
continue
if val:
mib_dict[address_val] = val
mib_dict[address_val] = tag_val, val
else:
try:
response_val_bytes = ast.literal_eval(
@ -1473,7 +1493,7 @@ class EpsonPrinter:
next_line.pushline(response_line)
continue
if response_val_bytes:
mib_dict[address_val] = response_val_bytes
mib_dict[address_val] = tag_val, response_val_bytes
else:
logging.error(
"Null value for response %s",
@ -1487,6 +1507,33 @@ class EpsonPrinter:
self.mib_dict = mib_dict
return mib_dict
def write_simdata(self, file):
tagnum = {
"OctetString": "4x",
"TimeTicks": "2", # 64
"Integer": "2",
}
try:
for key, (tag, value) in self.mib_dict.items():
if tag == "OctetString":
if isinstance(value, bytes):
write_line = f"{key}|{tagnum[tag]}|{value.hex()}\n"
else:
logging.error(
"OctetString is not byte type: key=%s, tag=%s, "
"value=%s, type=%s",
key, tag, value, type(value)
)
continue
else:
write_line = f"{key}|{tagnum[tag]}|{value}\n"
file.write(write_line)
file.close()
except Exception as e:
logging.error("simdata write error: %s", e)
return False
return True
if __name__ == "__main__":
import argparse
@ -1621,6 +1668,14 @@ if __name__ == "__main__":
default=0,
nargs=1,
metavar='CONFIG_FILE')
parser.add_argument(
"--simdata",
dest='simdata_file',
type=argparse.FileType('a'),
help="write SNMP dictionary map to simdata file",
default=0,
nargs=1,
metavar='SIMDATA_FILE')
args = parser.parse_args()
logging_level = logging.WARNING
@ -1656,6 +1711,11 @@ if __name__ == "__main__":
print("Error while reading configuration file")
quit(1)
args.config_file[0].close()
if args.simdata_file:
if not printer.write_simdata(args.simdata_file[0]):
print("Error while writing simdata file")
quit(1)
args.simdata_file[0].close()
if not printer.parm:
print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join(
printer.valid_printers),