Made py-kms Gui working again

This commit is contained in:
Matteo ℱan 2019-11-26 21:30:58 +01:00
parent 7c14eb10dc
commit 59fe27744e
No known key found for this signature in database
GPG key ID: 3C30A05BC133D9B6
14 changed files with 1208 additions and 335 deletions

View file

@ -11,8 +11,7 @@ from pykms_DB2Dict import kmsDB2Dict
from pykms_PidGenerator import epidGenerator
from pykms_Filetimes import filetime_to_dt
from pykms_Sql import sql_initialize, sql_update, sql_update_epid
from pykms_Format import justify, byterize, enco, deco
from pykms_Misc import pretty_printer
from pykms_Format import justify, byterize, enco, deco, pretty_printer
#--------------------------------------------------------------------------------------------------------------------------------------------------------
@ -111,7 +110,7 @@ class kmsBase:
if self.srv_config['sqlite'] and self.srv_config['dbSupport']:
self.dbName = sql_initialize()
pretty_printer(num_text = 15)
pretty_printer(num_text = 15, where = "srv")
kmsRequest = byterize(kmsRequest)
loggersrv.debug("KMS Request Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(kmsRequest), 'latin-1')), 'latin-1')))
loggersrv.debug("KMS Request: \n%s\n" % justify(kmsRequest.dump(print_to_stdout = False)))

View file

@ -20,9 +20,9 @@ from pykms_RequestV5 import kmsRequestV5
from pykms_RequestV6 import kmsRequestV6
from pykms_RpcBase import rpcBase
from pykms_DB2Dict import kmsDB2Dict
from pykms_Misc import logger_create, check_logfile, pretty_printer
from pykms_Misc import logger_create, check_logfile
from pykms_Misc import KmsParser, KmsException
from pykms_Format import justify, byterize, enco, deco, ShellMessage
from pykms_Format import justify, byterize, enco, deco, ShellMessage, pretty_printer
clt_description = 'KMS Client Emulator written in Python'
clt_version = 'py-kms_2019-05-15'
@ -128,8 +128,8 @@ def client_create():
try:
loggerclt.info("Sending RPC bind request...")
pretty_printer(num_text = [-1, 1], where = "clt")
s.send(RPC_Bind)
pretty_printer(num_text = [-1, 1])
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True,
put_text = "{reverse}{red}{bold}While sending: %s{end}" %str(e))
@ -138,7 +138,7 @@ def client_create():
if bindResponse == '' or not bindResponse:
pretty_printer(log_obj = loggerclt.warning, to_exit = True,
put_text = "{reverse}{yellow}{bold}No data received.{end}")
pretty_printer(num_text = [-4, 7])
pretty_printer(num_text = [-4, 7], where = "clt")
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True,
put_text = "{reverse}{red}{bold}While receiving: %s{end}" %str(e))
@ -146,20 +146,21 @@ def client_create():
packetType = MSRPCHeader(bindResponse)['type']
if packetType == rpcBase.packetType['bindAck']:
loggerclt.info("RPC bind acknowledged.")
pretty_printer(num_text = 8)
pretty_printer(num_text = 8, where = "clt")
kmsRequest = createKmsRequest()
requester = pykms_RpcRequest.handler(kmsRequest, clt_config)
try:
loggerclt.info("Sending RPC activation request...")
s.send(enco(str(requester.generateRequest()), 'latin-1'))
pretty_printer(num_text = [-1, 12])
RPC_Actv = enco(str(requester.generateRequest()), 'latin-1')
pretty_printer(num_text = [-1, 12], where = "clt")
s.send(RPC_Actv)
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True,
put_text = "{reverse}{red}{bold}While sending: %s{end}" %str(e))
try:
response = s.recv(1024)
pretty_printer(num_text = [-4, 20])
pretty_printer(num_text = [-4, 20], where = "clt")
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True,
put_text = "{reverse}{red}{bold}While receiving: %s{end}" %str(e))
@ -184,7 +185,7 @@ def client_create():
'status' : "Activated",
'product' : clt_config["mode"]})
pretty_printer(num_text = 21)
pretty_printer(num_text = 21, where = "clt")
elif packetType == rpcBase.packetType['bindNak']:
loggerclt.info(justify(MSRPCBindNak(bindResponse).dump(print_to_stdout = False)))
@ -224,7 +225,7 @@ def createKmsRequestBase():
requestDict['mnPad'] = '\0'.encode('utf-16le') * (63 - len(requestDict['machineName'].decode('utf-16le')))
# Debug Stuff
pretty_printer(num_text = 9)
pretty_printer(num_text = 9, where = "clt")
requestDict = byterize(requestDict)
loggerclt.debug("Request Base Dictionary: \n%s\n" % justify(requestDict.dump(print_to_stdout = False)))

View file

@ -4,7 +4,7 @@ from __future__ import print_function, unicode_literals
import re
import sys
import os
import threading
from collections import OrderedDict
try:
# Python 2.x imports
@ -21,19 +21,16 @@ pyver = sys.version_info[:2]
def enco(strg, typ = 'latin-1'):
if pyver >= (3, 0):
if isinstance(strg, str):
strgenc = strg.encode(typ)
return strgenc
return strg.encode(typ)
else:
return strg
return strg
def deco(strg, typ = 'latin-1'):
if pyver >= (3, 0):
if isinstance(strg, bytes):
strgdec = strg.decode(typ)
return strgdec
return strg.decode(typ)
else:
return strg
return strg
def byterize(obj):
@ -95,59 +92,96 @@ ExtraMap = {'end' : '\x1b[0m',
}
ColorExtraMap = dict(ColorMap, **ExtraMap)
ColorMapReversed = dict(zip(ColorMap.values(), ColorMap.keys()))
ExtraMapReversed = dict(zip(ExtraMap.values(), ExtraMap.keys()))
MsgMap = {0 : {'text' : "{yellow}\n\t\t\tClient generating RPC Bind Request...{end}", 'where' : "clt"},
1 : {'text' : "{white}<==============={end}{yellow}\tClient sending RPC Bind Request...{end}", 'where' : "clt"},
2 : {'text' : "{yellow}Server received RPC Bind Request !!!\t\t\t\t{end}{white}<==============={end}", 'where' : "srv"},
3 : {'text' : "{yellow}Server parsing RPC Bind Request...{end}", 'where' : "srv"},
4 : {'text' : "{yellow}Server generating RPC Bind Response...{end}", 'where' : "srv"},
5 : {'text' : "{yellow}Server sending RPC Bind Response...\t\t\t\t{end}{white}===============>{end}", 'where' : "srv"},
6 : {'text' : "{green}{bold}\nRPC Bind acknowledged !!!{end}", 'where' : "srv"},
7 : {'text' : "{white}===============>{end}{yellow}\tClient received RPC Bind Response !!!{end}", 'where' : "clt"},
8 : {'text' : "{green}{bold}\t\t\tRPC Bind acknowledged !!!{end}", 'where' : "clt"},
9 : {'text' : "{blue}\t\t\tClient generating Activation Request dictionary...{end}", 'where' : "clt"},
10 : {'text' : "{blue}\t\t\tClient generating Activation Request data...{end}", 'where' : "clt"},
11 : {'text' : "{blue}\t\t\tClient generating RPC Activation Request...{end}", 'where' : "clt"},
12 : {'text' : "{white}<==============={end}{blue}\tClient sending RPC Activation Request...{end}", 'where' : "clt"},
13 : {'text' : "{blue}Server received RPC Activation Request !!!\t\t\t{end}{white}<==============={end}", 'where' : "srv"},
14 : {'text' : "{blue}Server parsing RPC Activation Request...{end}", 'where' : "srv"},
15 : {'text' : "{blue}Server processing KMS Activation Request...{end}", 'where' : "srv"},
16 : {'text' : "{blue}Server processing KMS Activation Response...{end}", 'where' : "srv"},
17 : {'text' : "{blue}Server generating RPC Activation Response...{end}", 'where' : "srv"},
18 : {'text' : "{blue}Server sending RPC Activation Response...\t\t\t{end}{white}===============>{end}", 'where' : "srv"},
19 : {'text' : "{green}{bold}\nServer responded, now in Stand by...\n{end}", 'where' : "srv"},
20 : {'text' : "{white}===============>{end}{blue}\tClient received Response !!!{end}", 'where' : "clt"},
21 : {'text' : "{green}{bold}\t\t\tActivation Done !!!{end}", 'where' : "clt"},
-1 : {'text' : "{white}Server receiving{end}", 'where' : "clt"},
-2 : {'text' : "{white}\t\t\t\t\t\t\t\tClient sending{end}", 'where' : "srv"},
-3 : {'text' : "{white}\t\t\t\t\t\t\t\tClient receiving{end}", 'where' : "srv"},
-4 : {'text' : "{white}Server sending{end}", 'where' : "clt"},
MsgMap = {0 : {'text' : "{yellow}\n\t\t\tClient generating RPC Bind Request...{end}", 'align' : ()},
1 : {'text' : "{white}<==============={end}{yellow}\tClient sending RPC Bind Request...{end}", 'align' : ()},
2 : {'text' : "{yellow}Server received RPC Bind Request !!!\t\t\t\t{end}{white}<==============={end}", 'align' : ()},
3 : {'text' : "{yellow}Server parsing RPC Bind Request...{end}", 'align' : ()},
4 : {'text' : "{yellow}Server generating RPC Bind Response...{end}", 'align' : ()},
5 : {'text' : "{yellow}Server sending RPC Bind Response...\t\t\t\t{end}{white}===============>{end}", 'align' : ()},
6 : {'text' : "{green}{bold}\nRPC Bind acknowledged !!!{end}", 'align' : ()},
7 : {'text' : "{white}===============>{end}{yellow}\tClient received RPC Bind Response !!!{end}", 'align' : ()},
8 : {'text' : "{green}{bold}\t\t\tRPC Bind acknowledged !!!{end}", 'align' : ()},
9 : {'text' : "{blue}\t\t\tClient generating Activation Request dictionary...{end}", 'align' : ()},
10 : {'text' : "{blue}\t\t\tClient generating Activation Request data...{end}", 'align' : ()},
11 : {'text' : "{blue}\t\t\tClient generating RPC Activation Request...{end}", 'align' : ()},
12 : {'text' : "{white}<==============={end}{blue}\tClient sending RPC Activation Request...{end}", 'align' : ()},
13 : {'text' : "{blue}Server received RPC Activation Request !!!\t\t\t{end}{white}<==============={end}", 'align' : ()},
14 : {'text' : "{blue}Server parsing RPC Activation Request...{end}", 'align' : ()},
15 : {'text' : "{blue}Server processing KMS Activation Request...{end}", 'align' : ()},
16 : {'text' : "{blue}Server processing KMS Activation Response...{end}", 'align' : ()},
17 : {'text' : "{blue}Server generating RPC Activation Response...{end}", 'align' : ()},
18 : {'text' : "{blue}Server sending RPC Activation Response...\t\t\t{end}{white}===============>{end}", 'align' : ()},
19 : {'text' : "{green}{bold}\nServer responded, now in Stand by...\n{end}", 'align' : ()},
20 : {'text' : "{white}===============>{end}{blue}\tClient received Response !!!{end}", 'align' : ()},
21 : {'text' : "{green}{bold}\t\t\tActivation Done !!!{end}", 'align' : ()},
-1 : {'text' : "{white}Server receiving{end}", 'align' : ()},
-2 : {'text' : "{white}\t\t\t\t\t\t\t\tClient sending{end}", 'align' : ()},
-3 : {'text' : "{white}\t\t\t\t\t\t\t\tClient receiving{end}", 'align' : ()},
-4 : {'text' : "{white}Server sending{end}", 'align' : ()},
}
def MsgMap_unformat(messagelist):
def unformat_message(symbolic_string_list):
""" `symbolic_string_list` : a list of strings with symbolic formattation, example:
symbolic_string_list = ["{yellow}\tPluto\n{end}",
"{reverse}{blue}======>{end}{red}\t\tPaperino{end}"]
>>> unformat_message(symbolic_string_list)
>>> [['\tPluto\n'], ['======>', '\t\tPaperino']]
"""
pattern = r"(?<!\{)\{([^}]+)\}(?!\})"
picktxt, pickarrw = [ [] for _ in range(2) ]
for messageitem in messagelist:
picklist = re.sub(pattern, '*', messageitem['text'])
picklist = list(filter(None, picklist.split('*')))
picktxt.append(picklist[0])
for item in symbolic_string_list:
try:
pickarrw.append(picklist[1])
except IndexError:
pass
return picktxt, pickarrw
# only for py-kms MsgMap.
picklist = re.sub(pattern, '*', item['text'])
except:
# generalization.
picklist = re.sub(pattern, '*', item)
picklist = list(filter(None, picklist.split('*')))
picktxt.append(picklist)
return picktxt
def MsgMap_unshell(arrows):
unMsgMap = {}
for key, values in MsgMap.items():
txt = MsgMap_unformat([values])
def unshell_message(ansi_string, count):
""" `ansi_string` : a string with ansi formattation, example:
ansi_string = '\x1b[97mPippo\x1b[0m\n\x1b[94mPluto\t\t\x1b[0m\n\x1b[92m\x1b[1m\nPaperino\n\x1b[0m\n
`count` : int progressive increment for tag.
>>> unshell_message(ansi_string count = 0)
>>> ({'tag00': {'color': 'white', 'extra': [], 'text': 'Pippo'},
'tag01': {'color': 'blue', 'extra': [], 'text': 'Pluto\t\t'}
'tag02': {'color': 'green', 'extra': ['bold'], 'text': '\nPaperino\n'}
}, 3)
"""
ansi_find = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
ansi_list = re.findall(ansi_find, ansi_string)
ansi_indx_start = [ n for n in range(len(ansi_string)) for ansi in list(set(ansi_list)) if ansi_string.find(ansi, n) == n ]
ansi_indx_stop = [ n + len(value) for n, value in zip(ansi_indx_start, ansi_list)]
ansi_indx = sorted(list(set(ansi_indx_start + ansi_indx_stop)))
if txt[0][0] in arrows:
unMsgMap.update({txt[1][0] : values['where']})
else:
unMsgMap.update({txt[0][0] : values['where']})
return unMsgMap
msgcolored = {}
for k in range(len(ansi_indx) - 1):
ansi_value = ansi_string[ansi_indx[k] : ansi_indx[k + 1]]
if ansi_value not in ['\x1b[0m', '\n']:
tagname = "tag" + str(count).zfill(2)
if tagname not in msgcolored:
msgcolored[tagname] = {'color' : '', 'extra' : [], 'text' : ''}
if ansi_value in ColorMapReversed.keys():
msgcolored[tagname]['color'] = ColorMapReversed[ansi_value]
elif ansi_value in ExtraMapReversed.keys():
msgcolored[tagname]['extra'].append(ExtraMapReversed[ansi_value])
else:
msgcolored[tagname]['text'] = ansi_value
else:
if ansi_value != '\n':
count += 1
# Ordering.
msgcolored = OrderedDict(sorted(msgcolored.items()))
return msgcolored, count
#-------------------------------------------------------------------------------------------------------------------------------------------------------
# https://stackoverflow.com/questions/230751/how-to-flush-output-of-print-function
@ -161,7 +195,10 @@ if pyver < (3, 3):
file = kwargs.get('file', sys.stdout)
file.flush() if file is not None else sys.stdout.flush()
# https://ryanjoneil.github.io/posts/2014-02-14-capturing-stdout-in-a-python-child-process.html
# based on: https://ryanjoneil.github.io/posts/2014-02-14-capturing-stdout-in-a-python-child-process.html,
# but not using threading/multiprocessing so:
# 1) message visualization order preserved.
# 2) newlines_count function output not wrong.
class ShellMessage(object):
view = True
count, remain, numlist = (0, 0, [])
@ -170,19 +207,20 @@ class ShellMessage(object):
# Capture string sent to stdout.
def write(self, s):
StringIO.write(self, s)
class Process(object):
def __init__(self, nshell, get_text = False, put_text = None):
def __init__(self, nshell, get_text = False, put_text = None, where = 'srv'):
self.nshell = nshell
self.get_text = get_text
self.put_text = put_text
self.print_queue = Queue.Queue()
self.where = where
self.plaintext = []
self.path = os.path.dirname(os.path.abspath( __file__ )) + '/newlines.txt'
self.print_queue = Queue.Queue()
def formatter(self, msgtofrmt):
if self.newlines:
text = MsgMap_unformat([msgtofrmt])[0][0]
text = unformat_message([msgtofrmt])[0][0]
msgtofrmt = msgtofrmt['text'].replace(text, self.newlines * '\n' + text)
self.newlines = 0
else:
@ -194,40 +232,7 @@ class ShellMessage(object):
pass
self.msgfrmt = msgtofrmt.format(**ColorExtraMap)
if self.get_text:
self.plaintext.append(unshell_message(self.msgfrmt, m = 0)[0]["tag00"]['text'])
def run(self):
if not ShellMessage.view:
if self.get_text:
self.newlines = 0
if self.put_text is not None:
for msg in self.put_text:
self.formatter(msg)
else:
for num in self.nshell:
self.formatter(MsgMap[num])
return self.plaintext
else:
return
# Start thread process.
print_thread = threading.Thread(target = self.spawn(), args=(self.print_queue,))
print_thread.setDaemon(True)
print_thread.start()
# Do something with output.
toprint = self.read(0.1) # 0.1 s to let the shell output the result
# Redirect output.
if sys.stdout.isatty():
print(toprint)
else:
try:
from pykms_GuiBase import gui_redirect # Import after variables creation.
gui_redirect(toprint)
except:
print(toprint)
# Get string/s printed.
if self.get_text:
return self.plaintext
self.plaintext.append(unshell_message(self.msgfrmt, count = 0)[0]["tag00"]['text'])
def newlines_file(self, mode, *args):
try:
@ -238,15 +243,16 @@ class ShellMessage(object):
data = [int(i) for i in [line.rstrip('\n') for line in file.readlines()]]
self.newlines, ShellMessage.remain = data[0], sum(data[1:])
except:
with open(self.path, 'w') as file: pass
with open(self.path, 'w') as file:
pass
def newlines_count(self, num):
ShellMessage.count += MsgMap[num]['text'].count('\n')
if num >= 0:
ShellMessage.numlist.append(num)
if self.continuecount:
# Note: bypassed '\n' counted after message with arrow,
# isn't: str(len(ShellMessage.numlist) + ShellMessage.count)
# Note: is bypassed '\n' counted after message with arrow,
# so isn't: str(len(ShellMessage.numlist) + ShellMessage.count)
towrite = str(len(ShellMessage.numlist)) + '\n'
self.newlines_file('a', towrite)
ShellMessage.count, ShellMessage.numlist = (0, [])
@ -260,9 +266,41 @@ class ShellMessage(object):
elif num in [-2 ,-4]:
self.newlines_file('r')
if num == 21:
ShellMessage.count, ShellMessage.remain, ShellMessage.numlist = (0, 0, [])
os.remove(self.path)
def spawn(self):
def run(self):
# view = False part.
if not ShellMessage.view:
if self.get_text:
self.newlines = 0
if self.put_text is not None:
for msg in self.put_text:
self.formatter(msg)
else:
for num in self.nshell:
self.formatter(MsgMap[num])
return self.plaintext
else:
return
# Do job.
self.produce()
toprint = self.consume(timeout = 0.1)
# Redirect output.
if sys.stdout.isatty():
print(toprint)
else:
try:
# Import after variables creation.
from pykms_GuiBase import gui_redirect
gui_redirect(toprint, self.where)
except:
print(toprint)
# Get string/s printed.
if self.get_text:
return self.plaintext
def produce(self):
# Save everything that would otherwise go to stdout.
outstream = ShellMessage.Collect()
sys.stdout = outstream
@ -275,6 +313,7 @@ class ShellMessage(object):
if self.put_text is not None:
for msg in self.put_text:
ShellMessage.count += msg.count('\n')
# Append a dummy element.
ShellMessage.numlist.append('put')
self.formatter(msg)
print(self.msgfrmt, end = '\n', flush = True)
@ -283,6 +322,8 @@ class ShellMessage(object):
self.newlines_count(num)
self.formatter(MsgMap[num])
print(self.msgfrmt, end = '\n', flush = True)
except Exception as e:
print(e, end = '\n', flush = True)
finally:
# Restore stdout and send content.
sys.stdout = sys.__stdout__
@ -291,7 +332,7 @@ class ShellMessage(object):
except Queue.Full:
pass
def read(self, timeout = None):
def consume(self, timeout = None):
try:
toprint = self.print_queue.get(block = timeout is not None, timeout = timeout)
self.print_queue.task_done()
@ -299,34 +340,54 @@ class ShellMessage(object):
except Queue.Empty:
return None
def pretty_printer(**kwargs):
"""kwargs:
`log_obj` --> if logging object specified the text not ansi
formatted is logged.
`get_text` --> if True obtain text not ansi formatted,
after printing it with ansi formattation.
`put_text` --> a string or list of strings with ansi formattation.
if None refer to `num_text` for printing process.
`num_text` --> a number or list of numbers refering numbered message map.
if None `put_text` must be defined for printing process.
`to_exit ` --> if True system exit is called.
`where` --> specifies if message is server-side or client-side
(useful for GUI redirect).
"""
# Set defaults for not defined options.
options = {'log_obj' : None,
'get_text' : False,
'put_text' : None,
'num_text' : None,
'to_exit' : False,
'where' : 'srv'
}
options.update(kwargs)
# Check options.
if (options['num_text'] is None) and (options['put_text'] is None):
raise ValueError('One of `num_text` and `put_text` must be provided.')
elif (options['num_text'] is not None) and (options['put_text'] is not None):
raise ValueError('These parameters are mutually exclusive.')
def unshell_message(ansi_string, m):
ansi_find = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
ansi_list = re.findall(ansi_find, ansi_string)
ansi_indx_start = [ n for n in range(len(ansi_string)) for ansi in list(set(ansi_list)) if ansi_string.find(ansi, n) == n ]
ansi_indx_stop = [ n + len(value) for n, value in zip(ansi_indx_start, ansi_list)]
ansi_indx = sorted(list(set(ansi_indx_start + ansi_indx_stop)))
if (options['num_text'] is not None) and (not isinstance(options['num_text'], list)):
options['num_text'] = [options['num_text']]
if (options['put_text'] is not None) and (not isinstance(options['put_text'], list)):
options['put_text'] = [options['put_text']]
msgcolored = {}
ColorMapReversed = dict(zip(ColorMap.values(), ColorMap.keys()))
ExtraMapReversed = dict(zip(ExtraMap.values(), ExtraMap.keys()))
# Overwrite `get_text` (used as hidden).
if options['put_text']:
options['get_text'] = True
elif options['num_text']:
options['get_text'] = False
for k in range(len(ansi_indx) - 1):
ansi_value = ansi_string[ansi_indx[k] : ansi_indx[k + 1]]
if ansi_value != '\x1b[0m':
tagname = "tag" + str(m).zfill(2)
if tagname not in msgcolored:
msgcolored[tagname] = {'color' : '', 'extra' : [], 'text' : ''}
if ansi_value in ColorMapReversed.keys():
msgcolored[tagname]['color'] = ColorMapReversed[ansi_value]
elif ansi_value in ExtraMapReversed.keys():
msgcolored[tagname]['extra'].append(ExtraMapReversed[ansi_value])
else:
msgcolored[tagname]['text'] = ansi_value
else:
m += 1
# Ordering.
msgcolored = dict(sorted(msgcolored.items()))
return msgcolored, m
# Process messages.
plain_messages = ShellMessage.Process(options['num_text'],
get_text = options['get_text'],
put_text = options['put_text'],
where = options['where']).run()
if options['log_obj']:
for plain_message in plain_messages:
options['log_obj'](plain_message)
if options['to_exit']:
sys.exit(1)

View file

@ -3,6 +3,7 @@
import os
import sys
import threading
from time import sleep
try:
# Python 2.x imports
@ -19,10 +20,11 @@ except ImportError:
from tkinter import filedialog
import tkinter.font as tkFont
from pykms_Server import srv_options, srv_version, srv_config, serverqueue, serverthread
from pykms_GuiMisc import ToolTip, TextDoubleScroll, TextRedirect, custom_background, make_clear
from pykms_Server import srv_options, srv_version, srv_config, srv_terminate, serverqueue, serverthread
from pykms_GuiMisc import ToolTip, TextDoubleScroll, TextRedirect, custom_background
from pykms_Client import clt_options, clt_version, clt_config, clt_main
from pykms_Misc import check_logfile
gui_description = 'py-kms GUI'
gui_version = 'v1.0'
@ -55,11 +57,11 @@ def switch_dir(path):
else:
return
def gui_redirect(str_to_print):
def gui_redirect(str_to_print, where):
global txsrv, txclt, txcol, rclt
try:
TextRedirect.StdoutRedirect(txsrv, txclt, txcol, rclt, str_to_print)
TextRedirect.StdoutRedirect(txsrv, txclt, txcol, rclt, str_to_print, where)
except:
print(str_to_print)
@ -76,6 +78,7 @@ class KmsGui(tk.Tk):
def __init__(self, *args, **kwargs):
tk.Tk.__init__(self, *args, **kwargs)
self.wraplength = 200
serverthread.with_gui = True
## Define fonts and colors.
self.btnwinfont = tkFont.Font(family = 'Times', size = 12, weight = 'bold')
@ -131,14 +134,14 @@ class KmsGui(tk.Tk):
## Create widgets (btnsrvwin) -----------------------------------------------------------------------------------------------------------
self.statesrv = tk.Label(self.btnsrvwin, text = 'Server\nState:\nStopped', font = self.othfont, foreground = self.customcolors['red'])
self.runbtnsrv = tk.Button(self.btnsrvwin, text = 'START\nSERVER', background = self.customcolors['green'],
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.srv_clickedmain)
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.srv_on_start)
self.shbtnclt = tk.Button(self.btnsrvwin, text = 'SHOW\nCLIENT', background = self.customcolors['magenta'],
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.clt_showhide)
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.clt_on_show)
self.clearbtnsrv = tk.Button(self.btnsrvwin, text = 'CLEAR', background = self.customcolors['orange'],
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont,
command = lambda: make_clear([txsrv, txclt]))
command = lambda: self.on_clear([txsrv, txclt]))
self.exitbtnsrv = tk.Button(self.btnsrvwin, text = 'EXIT', background = self.customcolors['black'],
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.destroy)
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont, command = self.on_exit)
## Layout widgets (btnsrvwin)
self.statesrv.grid(row = 0, column = 0, padx = 2, pady = 2, sticky = 'ew')
@ -197,6 +200,7 @@ class KmsGui(tk.Tk):
filelbl = tk.Label(self.optsrvwin, text = 'Logfile Path / Name: ', font = self.optfont)
self.file = tk.Entry(self.optsrvwin, width = 10, font = self.optfont)
self.file.insert('end', srv_options['lfile']['def'])
self.file.xview_moveto(1)
ToolTip(self.file, text = srv_options['lfile']['help'], wraplength = self.wraplength)
filebtnwin = tk.Button(self.optsrvwin, text = 'Browse', command = lambda: self.browse(self.file, srv_options))
# Loglevel.
@ -286,7 +290,7 @@ class KmsGui(tk.Tk):
# Create widgets (btncltwin) ------------------------------------------------------------------------------------------------------------
self.runbtnclt = tk.Button(self.btncltwin, text = 'START\nCLIENT', background = self.customcolors['blue'],
foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont,
state = 'disabled', command = self.clt_clickedstart)
state = 'disabled', command = self.clt_on_start)
#self.othbutt = tk.Button(self.btncltwin, text = 'Botton\n2', background = self.customcolors['green'],
# foreground = self.customcolors['white'], relief = 'flat', font = self.btnwinfont)
@ -328,6 +332,7 @@ class KmsGui(tk.Tk):
cltfilelbl = tk.Label(self.optcltwin, text = 'Logfile Path / Name: ', font = self.optfont)
self.cltfile = tk.Entry(self.optcltwin, width = 10, font = self.optfont)
self.cltfile.insert('end', clt_options['lfile']['def'])
self.cltfile.xview_moveto(1)
ToolTip(self.cltfile, text = clt_options['lfile']['help'], wraplength = self.wraplength)
cltfilebtnwin = tk.Button(self.optcltwin, text = 'Browse', command = lambda: self.browse(self.cltfile, clt_options))
# Loglevel.
@ -366,7 +371,7 @@ class KmsGui(tk.Tk):
except TypeError:
return value
def clt_showhide(self, force = False):
def clt_on_show(self, force = False):
if self.optcltwin.winfo_ismapped() or force:
self.shbtnclt['text'] = 'SHOW\nCLIENT'
self.optcltwin.grid_remove()
@ -378,19 +383,28 @@ class KmsGui(tk.Tk):
self.msgcltwin.grid()
self.btncltwin.place(x = self.btncltwin_X, y = self.btncltwin_Y, bordermode = 'inside', anchor = 'nw')
def srv_clickedmain(self):
def srv_on_start(self):
if self.runbtnsrv['text'] == 'START\nSERVER':
if self.srv_clickedstart():
if self.srv_actions_start():
self.on_clear([txsrv, txclt])
self.runbtnsrv.configure(text = 'STOP\nSERVER', background = self.customcolors['red'],
foreground = self.customcolors['white'])
self.runbtnclt.configure(state = 'normal')
# run thread for interrupting.
self.ejectthread = threading.Thread(target = self.srv_eject, name = "Thread-Ejt")
self.ejectthread.setDaemon(True)
self.ejectthread.start()
elif self.runbtnsrv['text'] == 'STOP\nSERVER':
self.srv_clickedstop()
self.runbtnsrv.configure(text = 'START\nSERVER', background = self.customcolors['green'],
foreground = self.customcolors['white'])
self.runbtnclt.configure(state = 'disabled')
def srv_clickedstart(self):
serverthread.terminate_eject()
def srv_eject(self):
while not serverthread.eject:
sleep(0.1)
self.srv_actions_stop()
def srv_actions_start(self):
ok = False
if switch_dir(os.path.dirname(self.file.get())):
if self.file.get().lower().endswith('.log'):
@ -403,44 +417,49 @@ class KmsGui(tk.Tk):
srv_config[srv_options['count']['des']] = self.proper_none(self.count.get())
srv_config[srv_options['activation']['des']] = int(self.activ.get())
srv_config[srv_options['renewal']['des']] = int(self.renew.get())
srv_config[srv_options['lfile']['des']] = self.file.get()
srv_config[srv_options['lfile']['des']] = check_logfile(self.file.get(), srv_options['lfile']['def'])
srv_config[srv_options['llevel']['des']] = self.level.get()
srv_config[srv_options['sql']['des']] = self.chkval.get()
## TODO.
srv_config[srv_options['lsize']['des']] = 0
srv_config[srv_options['time']['des']] = 30
srv_config[srv_options['time']['des']] = None
serverqueue.put('start')
# wait for switch.
while not serverthread.is_running:
while not serverthread.is_running_server:
pass
self.srv_togglestate()
self.srv_toggle()
ok = True
else:
messagebox.showerror('Invalid extension', 'Not a .log file !')
else:
messagebox.showerror('Invalid path', 'Path you have provided not found !')
return ok
def srv_clickedstop(self):
if serverthread.is_running:
serverqueue.put('stop')
serverthread.server.shutdown()
def srv_actions_stop(self):
if serverthread.is_running_server:
srv_terminate(exit_server = True)
# wait for switch.
while serverthread.is_running:
while serverthread.is_running_server:
pass
self.srv_togglestate()
self.srv_toggle()
self.runbtnsrv.configure(text = 'START\nSERVER', background = self.customcolors['green'],
foreground = self.customcolors['white'])
self.runbtnclt.configure(state = 'disabled')
def srv_togglestate(self):
if serverthread.is_running:
def srv_toggle(self):
if serverthread.is_running_server:
txt, color = ('Server\nState:\nServing', self.customcolors['green'])
else:
txt, color = ('Server\nState:\nStopped', self.customcolors['red'])
self.statesrv.configure(text = txt, foreground = color)
def clt_clickedstart(self):
def clt_on_start(self):
self.on_clear([txsrv, txclt])
self.clt_actions_start()
def clt_actions_start(self):
if switch_dir(os.path.dirname(self.cltfile.get())):
if self.cltfile.get().lower().endswith('.log'):
# Load dict.
@ -449,16 +468,28 @@ class KmsGui(tk.Tk):
clt_config[clt_options['mode']['des']] = self.cltmode.get()
clt_config[clt_options['cmid']['des']] = self.proper_none(self.cltcmid.get())
clt_config[clt_options['name']['des']] = self.proper_none(self.cltname.get())
clt_config[clt_options['lfile']['des']] = self.cltfile.get()
clt_config[clt_options['lfile']['des']] = check_logfile(self.cltfile.get(), clt_options['lfile']['def'])
clt_config[clt_options['llevel']['des']] = self.cltlevel.get()
## TODO
clt_config[clt_options['lsize']['des']] = 0
clientthread = threading.Thread(target = clt_main, args=(True,))
clientthread.setDaemon(True)
clientthread.start()
self.clientthread = threading.Thread(target = clt_main, name = 'Thread-Clt', args=(True,))
self.clientthread.setDaemon(True)
self.clientthread.start()
else:
messagebox.showerror('Invalid extension', 'Not a .log file !')
else:
messagebox.showerror('Invalid path', 'Path you have provided not found !')
def on_exit(self):
if serverthread.is_running_server:
srv_terminate(exit_server = True)
srv_terminate(exit_thread = True)
self.destroy()
def on_clear(self, widgetlist):
for widget in widgetlist:
widget.configure(state = 'normal')
widget.delete('1.0', 'end')
widget.configure(state = 'disabled')

View file

@ -16,7 +16,7 @@ except ImportError:
from tkinter import ttk
import tkinter.font as tkFont
from pykms_Format import unshell_message, MsgMap, pick_MsgMap, unshell_MsgMap
from pykms_Format import MsgMap, unshell_message, unformat_message
#---------------------------------------------------------------------------------------------------------------------------------------------------------
@ -122,73 +122,51 @@ class ToolTip(object):
# https://stackoverflow.com/questions/7217715/threadsafe-printing-across-multiple-processes-python-2-x
# https://stackoverflow.com/questions/3029816/how-do-i-get-a-thread-safe-print-in-python-2-6
# https://stackoverflow.com/questions/20303291/issue-with-redirecting-stdout-to-tkinter-text-widget-with-threads
def make_clear(widgetlist):
for widget in widgetlist:
widget.configure(state = 'normal')
widget.delete('1.0', 'end')
widget.configure(state = 'disabled')
class TextRedirect(object):
class StdoutRedirect(object):
tag_num = 0
listwhere = []
arrows, clt_msg_nonewline = pick_MsgMap([MsgMap[1], MsgMap[7], MsgMap[12], MsgMap[20]])
srv_msg_nonewline, _ = pick_MsgMap([MsgMap[2], MsgMap[5], MsgMap[13], MsgMap[18]])
grpmsg = unformat_message([MsgMap[1], MsgMap[7], MsgMap[12], MsgMap[20]])
arrows = [ item[0] for item in grpmsg ]
clt_msg_nonewline = [ item[1] for item in grpmsg ]
arrows = list(set(arrows))
lenarrow = len(arrows[0])
unMsgMap = unshell_MsgMap(arrows)
def __init__(self, srv_text_space, clt_text_space, customcolors, runclt, str_to_print):
srv_msg_nonewline = [ item[0] for item in unformat_message([MsgMap[2], MsgMap[5], MsgMap[13], MsgMap[18]]) ]
terminator = unformat_message([MsgMap[21]])[0][0]
def __init__(self, srv_text_space, clt_text_space, customcolors, runclt, str_to_print, where):
self.srv_text_space = srv_text_space
self.clt_text_space = clt_text_space
self.customcolors = customcolors
self.runclt = runclt
self.runclt.configure(state = 'disabled')
self.str_to_print = str_to_print
self.where = where
self.textbox_do()
def textbox_finish(self, message):
if all(x == "srv" for x in TextRedirect.StdoutRedirect.listwhere):
terminator = pick_MsgMap([MsgMap[19]])[0]
else:
terminator = pick_MsgMap([MsgMap[21]])[0]
if message in terminator:
TextRedirect.StdoutRedirect.tag_num = 0
self.runclt.configure(state = 'normal')
def textbox_clear(self):
if TextRedirect.StdoutRedirect.tag_num == 0:
# Clear "srv" and "clt" textboxs.
make_clear([self.srv_text_space, self.clt_text_space])
if message == self.terminator:
TextRedirect.StdoutRedirect.tag_num = 0
self.runclt.configure(state = 'normal')
def textbox_write(self, tag, message, color, extras):
widget = self.textbox_choose(message)
TextRedirect.StdoutRedirect.listwhere.append(self.where)
self.maxchar = widget['width']
self.textbox_color(tag, widget, color, self.customcolors['black'], extras)
widget.configure(state = 'normal')
widget.insert('end', self.textbox_format(message), tag)
widget.see('end')
self.textbox_color(tag, widget, color, self.customcolors['black'], extras)
widget.after(100, widget.see('end'))
widget.configure(state = 'disabled')
self.textbox_finish(message)
def textbox_choose(self, message):
if message not in self.arrows:
self.remind = message
self.where = self.unMsgMap[message]
if self.where == "srv":
return self.srv_text_space
elif self.where == "clt":
return self.clt_text_space
else:
if self.remind in self.srv_msg_nonewline:
self.where = "srv"
return self.srv_text_space
else:
self.where = "clt"
return self.clt_text_space
if self.where == "srv":
self.srv_text_space.focus_set()
return self.srv_text_space
elif self.where == "clt":
self.clt_text_space.focus_set()
return self.clt_text_space
def textbox_color(self, tag, widget, forecolor = 'white', backcolor = 'black', extras = []):
xfont = tkFont.Font(font = widget['font'])
@ -202,8 +180,9 @@ class TextRedirect(object):
xfont.text_font.configure(underline = True)
elif extra == 'strike':
xfont.configure(overstrike = True)
widget.tag_configure(tag, foreground = forecolor, background = backcolor, font = xfont)
widget.tag_add(tag, "insert linestart", "insert lineend")
def textbox_format(self, message):
lenfixed = self.maxchar - len(message.replace('\t', ''))
@ -231,7 +210,6 @@ class TextRedirect(object):
return message
def textbox_do(self):
self.textbox_clear()
msgs, TextRedirect.StdoutRedirect.tag_num = unshell_message(self.str_to_print, TextRedirect.StdoutRedirect.tag_num)
for tag in msgs:
self.textbox_write(tag, msgs[tag]['text'], self.customcolors[msgs[tag]['color']], msgs[tag]['extra'])
@ -327,7 +305,7 @@ def custom_background(window):
widget.configure(background = window.customcolors['lavender'])
# Hide client.
window.clt_showhide(force = True)
window.clt_on_show(force = True)
# Show Gui.
window.deiconify()

View file

@ -5,7 +5,7 @@ import logging
import os
import argparse
from logging.handlers import RotatingFileHandler
from pykms_Format import ColorExtraMap, ShellMessage
from pykms_Format import ColorExtraMap, pretty_printer
#-----------------------------------------------------------------------------------------------------------------------------------------------------------
@ -163,53 +163,6 @@ def check_logfile(optionlog, defaultlog):
checkdir(optionlog[0])
return optionlog
def pretty_printer(**kwargs):
"""kwargs:
`log_obj` --> if logging object specified the text not ansi
formatted is logged.
`get_text` --> if True obtain text not ansi formatted,
after printing it with ansi formattation.
`put_text` --> a string or list of strings with ansi formattation.
if None refer to `num_text` for printing process.
`num_text` --> a number or list of numbers refering numbered message map.
if None `put_text` must be defined for printing process.
`to_exit ` --> if True system exit is called.
"""
# Set defaults for not defined options.
options = {'log_obj' : None,
'get_text' : False,
'put_text' : None,
'num_text' : None,
'to_exit' : False,
}
options.update(kwargs)
# Check options.
if (options['num_text'] is None) and (options['put_text'] is None):
raise ValueError('One of `num_text` and `put_text` must be provided.')
elif (options['num_text'] is not None) and (options['put_text'] is not None):
raise ValueError('These parameters are mutually exclusive.')
if (options['num_text'] is not None) and (not isinstance(options['num_text'], list)):
options['num_text'] = [options['num_text']]
if (options['put_text'] is not None) and (not isinstance(options['put_text'], list)):
options['put_text'] = [options['put_text']]
# Overwrite `get_text` (used as hidden).
if options['put_text']:
options['get_text'] = True
elif options['num_text']: # further check.
options['get_text'] = False
# Process messages.
plain_messages = ShellMessage.Process(options['num_text'], get_text = options['get_text'], put_text = options['put_text']).run()
if options['log_obj']:
for plain_message in plain_messages:
options['log_obj'](plain_message)
if options['to_exit']:
sys.exit(1)
#----------------------------------------------------------------------------------------------------------------------------------------------------------
# Valid language identifiers to be used in the EPID (see "kms.c" in vlmcsd)

View file

@ -7,8 +7,7 @@ import logging
from pykms_Base import kmsBase
from pykms_Structure import Structure
from pykms_Aes import AES
from pykms_Format import justify, byterize, enco, deco
from pykms_Misc import pretty_printer
from pykms_Format import justify, byterize, enco, deco, pretty_printer
#---------------------------------------------------------------------------------------------------------------------------------------------------------
@ -106,7 +105,7 @@ class kmsRequestV4(kmsBase):
response['padding'] = bytes(bytearray(self.getPadding(bodyLength)))
## Debug stuff.
pretty_printer(num_text = 16)
pretty_printer(num_text = 16, where = "srv")
response = byterize(response)
loggersrv.debug("KMS V4 Response: \n%s\n" % justify(response.dump(print_to_stdout = False)))
loggersrv.debug("KMS V4 Response Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(response), 'latin-1')), 'utf-8')))
@ -125,7 +124,7 @@ class kmsRequestV4(kmsBase):
request['padding'] = bytes(bytearray(self.getPadding(bodyLength)))
## Debug stuff.
pretty_printer(num_text = 10)
pretty_printer(num_text = 10, where = "clt")
request = byterize(request)
loggersrv.debug("Request V4 Data: \n%s\n" % justify(request.dump(print_to_stdout = False)))
loggersrv.debug("Request V4: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(request), 'latin-1')), 'utf-8')))

View file

@ -8,8 +8,7 @@ import random
import pykms_Aes as aes
from pykms_Base import kmsBase
from pykms_Structure import Structure
from pykms_Format import justify, byterize, enco, deco
from pykms_Misc import pretty_printer
from pykms_Format import justify, byterize, enco, deco, pretty_printer
#--------------------------------------------------------------------------------------------------------------------------------------------------------
@ -141,7 +140,7 @@ class kmsRequestV5(kmsBase):
response['encrypted'] = bytes(bytearray(encryptedResponse))
response['padding'] = bytes(bytearray(self.getPadding(bodyLength)))
pretty_printer(num_text = 16)
pretty_printer(num_text = 16, where = "srv")
response = byterize(response)
loggersrv.info("KMS V%d Response: \n%s\n" % (self.ver, justify(response.dump(print_to_stdout = False))))
loggersrv.info("KMS V%d Structure Bytes: \n%s\n" % (self.ver, justify(deco(binascii.b2a_hex(enco(str(response), 'latin-1')), 'utf-8'))))
@ -173,7 +172,7 @@ class kmsRequestV5(kmsBase):
request['versionMajor'] = requestBase['versionMajor']
request['message'] = message
pretty_printer(num_text = 10)
pretty_printer(num_text = 10, where = "clt")
request = byterize(request)
loggersrv.info("Request V%d Data: \n%s\n" % (self.ver, justify(request.dump(print_to_stdout = False))))
loggersrv.info("Request V%d: \n%s\n" % (self.ver, justify(deco(binascii.b2a_hex(enco(str(request), 'latin-1')), 'utf-8'))))

View file

@ -7,8 +7,7 @@ import uuid
import pykms_RpcBase
from pykms_Dcerpc import MSRPCHeader, MSRPCBindAck
from pykms_Structure import Structure
from pykms_Format import justify, byterize, enco, deco
from pykms_Misc import pretty_printer
from pykms_Format import justify, byterize, enco, deco, pretty_printer
#--------------------------------------------------------------------------------------------------------------------------------------------------------
@ -78,7 +77,7 @@ class MSRPCBind(Structure):
class handler(pykms_RpcBase.rpcBase):
def parseRequest(self):
request = MSRPCHeader(self.data)
pretty_printer(num_text = 3)
pretty_printer(num_text = 3, where = "srv")
request = byterize(request)
loggersrv.debug("RPC Bind Request Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(self.data), 'utf-8')))
loggersrv.debug("RPC Bind Request: \n%s\n%s\n" % (justify(request.dump(print_to_stdout = False)),
@ -122,7 +121,7 @@ class handler(pykms_RpcBase.rpcBase):
resp = preparedResponses[ts_uuid]
response['ctx_items'] += str(resp)
pretty_printer(num_text = 4)
pretty_printer(num_text = 4, where = "srv")
response = byterize(response)
loggersrv.debug("RPC Bind Response: \n%s\n" % justify(response.dump(print_to_stdout = False)))
loggersrv.debug("RPC Bind Response Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(response), 'latin-1')), 'utf-8')))
@ -163,7 +162,7 @@ class handler(pykms_RpcBase.rpcBase):
request['call_id'] = self.srv_config['call_id']
request['pduData'] = str(bind)
pretty_printer(num_text = 0)
pretty_printer(num_text = 0, where = "clt")
bind = byterize(bind)
request = byterize(request)
loggersrv.debug("RPC Bind Request: \n%s\n%s\n" % (justify(request.dump(print_to_stdout = False)),

View file

@ -6,8 +6,7 @@ import logging
import pykms_Base
import pykms_RpcBase
from pykms_Dcerpc import MSRPCRequestHeader, MSRPCRespHeader
from pykms_Format import justify, byterize, enco, deco
from pykms_Misc import pretty_printer
from pykms_Format import justify, byterize, enco, deco, pretty_printer
#----------------------------------------------------------------------------------------------------------------------------------------------------------
@ -16,7 +15,7 @@ loggersrv = logging.getLogger('logsrv')
class handler(pykms_RpcBase.rpcBase):
def parseRequest(self):
request = MSRPCRequestHeader(self.data)
pretty_printer(num_text = 14)
pretty_printer(num_text = 14, where = "srv")
request = byterize(request)
loggersrv.debug("RPC Message Request Bytes: \n%s\n" % justify(binascii.b2a_hex(self.data).decode('utf-8')))
loggersrv.debug("RPC Message Request: \n%s\n" % justify(request.dump(print_to_stdout = False)))
@ -41,7 +40,7 @@ class handler(pykms_RpcBase.rpcBase):
response['pduData'] = responseData
pretty_printer(num_text = 17)
pretty_printer(num_text = 17, where = "srv")
response = byterize(response)
loggersrv.debug("RPC Message Response: \n%s\n" % justify(response.dump(print_to_stdout = False)))
loggersrv.debug("RPC Message Response Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(response), 'latin-1')), 'utf-8')))
@ -60,7 +59,7 @@ class handler(pykms_RpcBase.rpcBase):
request['alloc_hint'] = len(self.data)
request['pduData'] = str(self.data)
pretty_printer(num_text = 11)
pretty_printer(num_text = 11, where = "clt")
request = byterize(request)
loggersrv.debug("RPC Message Request: \n%s\n" % justify(request.dump(print_to_stdout = False)))
loggersrv.debug("RPC Message Request Bytes: \n%s\n" % justify(deco(binascii.b2a_hex(enco(str(request), 'latin-1')), 'utf-8')))

588
py-kms/pykms_Selectors.py Normal file
View file

@ -0,0 +1,588 @@
#!/usr/bin/env python3
"""
SPDX-License-Identifier: MIT
Backport of selectors.py from Python 3.5+ to support Python < 3.4
Also has the behavior specified in PEP 475 which is to retry syscalls
in the case of an EINTR error. This module is required because selectors34
does not follow this behavior and instead returns that no dile descriptor
events have occurred rather than retry the syscall. The decision to drop
support for select.devpoll is made to maintain 100% test coverage.
link: https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/python_modules/urllib3/util/selectors.py
"""
import errno
import math
import select
import socket
import sys
import time
from collections import namedtuple, Mapping
try:
monotonic = time.monotonic
except (AttributeError, ImportError): # Python 3.3<
from pykms_Time import monotonic
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)
HAS_SELECT = True # Variable that shows whether the platform has a selector.
_SYSCALL_SENTINEL = object() # Sentinel in case a system call returns None.
_DEFAULT_SELECTOR = None
class SelectorError(Exception):
def __init__(self, errcode):
super(SelectorError, self).__init__()
self.errno = errcode
def __repr__(self):
return "<SelectorError errno={0}>".format(self.errno)
def __str__(self):
return self.__repr__()
def _fileobj_to_fd(fileobj):
""" Return a file descriptor from a file object. If
given an integer will simply return that integer back. """
if isinstance(fileobj, int):
fd = fileobj
else:
try:
fd = int(fileobj.fileno())
except (AttributeError, TypeError, ValueError):
raise ValueError("Invalid file object: {0!r}".format(fileobj))
if fd < 0:
raise ValueError("Invalid file descriptor: {0}".format(fd))
return fd
# Determine which function to use to wrap system calls because Python 3.5+
# already handles the case when system calls are interrupted.
if sys.version_info >= (3, 5):
def _syscall_wrapper(func, _, *args, **kwargs):
""" This is the short-circuit version of the below logic
because in Python 3.5+ all system calls automatically restart
and recalculate their timeouts. """
try:
return func(*args, **kwargs)
except (OSError, IOError, select.error) as e:
errcode = None
if hasattr(e, "errno"):
errcode = e.errno
raise SelectorError(errcode)
else:
def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
""" Wrapper function for syscalls that could fail due to EINTR.
All functions should be retried if there is time left in the timeout
in accordance with PEP 475. """
timeout = kwargs.get("timeout", None)
if timeout is None:
expires = None
recalc_timeout = False
else:
timeout = float(timeout)
if timeout < 0.0: # Timeout less than 0 treated as no timeout.
expires = None
else:
expires = monotonic() + timeout
args = list(args)
if recalc_timeout and "timeout" not in kwargs:
raise ValueError(
"Timeout must be in args or kwargs to be recalculated")
result = _SYSCALL_SENTINEL
while result is _SYSCALL_SENTINEL:
try:
result = func(*args, **kwargs)
# OSError is thrown by select.select
# IOError is thrown by select.epoll.poll
# select.error is thrown by select.poll.poll
# Aren't we thankful for Python 3.x rework for exceptions?
except (OSError, IOError, select.error) as e:
# select.error wasn't a subclass of OSError in the past.
errcode = None
if hasattr(e, "errno"):
errcode = e.errno
elif hasattr(e, "args"):
errcode = e.args[0]
# Also test for the Windows equivalent of EINTR.
is_interrupt = (errcode == errno.EINTR or (hasattr(errno, "WSAEINTR") and
errcode == errno.WSAEINTR))
if is_interrupt:
if expires is not None:
current_time = monotonic()
if current_time > expires:
raise OSError(errno=errno.ETIMEDOUT)
if recalc_timeout:
if "timeout" in kwargs:
kwargs["timeout"] = expires - current_time
continue
if errcode:
raise SelectorError(errcode)
else:
raise
return result
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
class _SelectorMapping(Mapping):
""" Mapping of file objects to selector keys """
def __init__(self, selector):
self._selector = selector
def __len__(self):
return len(self._selector._fd_to_key)
def __getitem__(self, fileobj):
try:
fd = self._selector._fileobj_lookup(fileobj)
return self._selector._fd_to_key[fd]
except KeyError:
raise KeyError("{0!r} is not registered.".format(fileobj))
def __iter__(self):
return iter(self._selector._fd_to_key)
class BaseSelector(object):
""" Abstract Selector class
A selector supports registering file objects to be monitored
for specific I/O events.
A file object is a file descriptor or any object with a
`fileno()` method. An arbitrary object can be attached to the
file object which can be used for example to store context info,
a callback, etc.
A selector can use various implementations (select(), poll(), epoll(),
and kqueue()) depending on the platform. The 'DefaultSelector' class uses
the most efficient implementation for the current platform.
"""
def __init__(self):
# Maps file descriptors to keys.
self._fd_to_key = {}
# Read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
def _fileobj_lookup(self, fileobj):
""" Return a file descriptor from a file object.
This wraps _fileobj_to_fd() to do an exhaustive
search in case the object is invalid but we still
have it in our map. Used by unregister() so we can
unregister an object that was previously registered
even if it is closed. It is also used by _SelectorMapping
"""
try:
return _fileobj_to_fd(fileobj)
except ValueError:
# Search through all our mapped keys.
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
return key.fd
# Raise ValueError after all.
raise
def register(self, fileobj, events, data=None):
""" Register a file object for a set of events to monitor. """
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {0!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
if key.fd in self._fd_to_key:
raise KeyError("{0!r} (FD {1}) is already registered"
.format(fileobj, key.fd))
self._fd_to_key[key.fd] = key
return key
def unregister(self, fileobj):
""" Unregister a file object from being monitored. """
try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
# Getting the fileno of a closed socket on Windows errors with EBADF.
except socket.error as e: # Platform-specific: Windows.
if e.errno != errno.EBADF:
raise
else:
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
self._fd_to_key.pop(key.fd)
break
else:
raise KeyError("{0!r} is not registered".format(fileobj))
return key
def modify(self, fileobj, events, data=None):
""" Change a registered file object monitored events and data. """
# NOTE: Some subclasses optimize this operation even further.
try:
key = self._fd_to_key[self._fileobj_lookup(fileobj)]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
if events != key.events:
self.unregister(fileobj)
key = self.register(fileobj, events, data)
elif data != key.data:
# Use a shortcut to update the data.
key = key._replace(data=data)
self._fd_to_key[key.fd] = key
return key
def select(self, timeout=None):
""" Perform the actual selection until some monitored file objects
are ready or the timeout expires. """
raise NotImplementedError()
def close(self):
""" Close the selector. This must be called to ensure that all
underlying resources are freed. """
self._fd_to_key.clear()
self._map = None
def get_key(self, fileobj):
""" Return the key associated with a registered file object. """
mapping = self.get_map()
if mapping is None:
raise RuntimeError("Selector is closed")
try:
return mapping[fileobj]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
def get_map(self):
""" Return a mapping of file objects to selector keys """
return self._map
def _key_from_fd(self, fd):
""" Return the key associated to a given file descriptor
Return None if it is not found. """
try:
return self._fd_to_key[fd]
except KeyError:
return None
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
# Almost all platforms have select.select()
if hasattr(select, "select"):
class SelectSelector(BaseSelector):
""" Select-based selector. """
def __init__(self):
super(SelectSelector, self).__init__()
self._readers = set()
self._writers = set()
def register(self, fileobj, events, data=None):
key = super(SelectSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
self._readers.add(key.fd)
if events & EVENT_WRITE:
self._writers.add(key.fd)
return key
def unregister(self, fileobj):
key = super(SelectSelector, self).unregister(fileobj)
self._readers.discard(key.fd)
self._writers.discard(key.fd)
return key
def _select(self, r, w, timeout=None):
""" Wrapper for select.select because timeout is a positional arg """
return select.select(r, w, [], timeout)
def select(self, timeout=None):
# Selecting on empty lists on Windows errors out.
if not len(self._readers) and not len(self._writers):
return []
timeout = None if timeout is None else max(timeout, 0.0)
ready = []
r, w, _ = _syscall_wrapper(self._select, True, self._readers,
self._writers, timeout)
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
if hasattr(select, "poll"):
class PollSelector(BaseSelector):
""" Poll-based selector """
def __init__(self):
super(PollSelector, self).__init__()
self._poll = select.poll()
def register(self, fileobj, events, data=None):
key = super(PollSelector, self).register(fileobj, events, data)
event_mask = 0
if events & EVENT_READ:
event_mask |= select.POLLIN
if events & EVENT_WRITE:
event_mask |= select.POLLOUT
self._poll.register(key.fd, event_mask)
return key
def unregister(self, fileobj):
key = super(PollSelector, self).unregister(fileobj)
self._poll.unregister(key.fd)
return key
def _wrap_poll(self, timeout=None):
""" Wrapper function for select.poll.poll() so that
_syscall_wrapper can work with only seconds. """
if timeout is not None:
if timeout <= 0:
timeout = 0
else:
# select.poll.poll() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3)
result = self._poll.poll(timeout)
return result
def select(self, timeout=None):
ready = []
fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.POLLIN:
events |= EVENT_WRITE
if event_mask & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
if hasattr(select, "epoll"):
class EpollSelector(BaseSelector):
""" Epoll-based selector """
def __init__(self):
super(EpollSelector, self).__init__()
self._epoll = select.epoll()
def fileno(self):
return self._epoll.fileno()
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def unregister(self, fileobj):
key = super(EpollSelector, self).unregister(fileobj)
try:
_syscall_wrapper(self._epoll.unregister, False, key.fd)
except SelectorError:
# This can occur when the fd was closed since registry.
pass
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1e3) * 1e-3
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._epoll.close()
super(EpollSelector, self).close()
if hasattr(select, "kqueue"):
class KqueueSelector(BaseSelector):
""" Kqueue / Kevent-based selector """
def __init__(self):
super(KqueueSelector, self).__init__()
self._kqueue = select.kqueue()
def fileno(self):
return self._kqueue.fileno()
def register(self, fileobj, events, data=None):
key = super(KqueueSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_ADD)
_syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
if events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)
_syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
return key
def unregister(self, fileobj):
key = super(KqueueSelector, self).unregister(fileobj)
if key.events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
except SelectorError:
pass
if key.events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
except SelectorError:
pass
return key
def select(self, timeout=None):
if timeout is not None:
timeout = max(timeout, 0)
max_events = len(self._fd_to_key) * 2
ready_fds = {}
kevent_list = _syscall_wrapper(self._kqueue.control, True,
None, max_events, timeout)
for kevent in kevent_list:
fd = kevent.ident
event_mask = kevent.filter
events = 0
if event_mask == select.KQ_FILTER_READ:
events |= EVENT_READ
if event_mask == select.KQ_FILTER_WRITE:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
if key.fd not in ready_fds:
ready_fds[key.fd] = (key, events & key.events)
else:
old_events = ready_fds[key.fd][1]
ready_fds[key.fd] = (key, (events | old_events) & key.events)
return list(ready_fds.values())
def close(self):
self._kqueue.close()
super(KqueueSelector, self).close()
if not hasattr(select, 'select'): # Platform-specific: AppEngine
HAS_SELECT = False
def _can_allocate(struct):
""" Checks that select structs can be allocated by the underlying
operating system, not just advertised by the select module. We don't
check select() because we'll be hopeful that most platforms that
don't have it available will not advertise it. (ie: GAE) """
try:
# select.poll() objects won't fail until used.
if struct == 'poll':
p = select.poll()
p.poll(0)
# All others will fail on allocation.
else:
getattr(select, struct)().close()
return True
except (OSError, AttributeError) as e:
return False
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
""" This function serves as a first call for DefaultSelector to
detect if the select module is being monkey-patched incorrectly
by eventlet, greenlet, and preserve proper behavior. """
global _DEFAULT_SELECTOR
if _DEFAULT_SELECTOR is None:
if _can_allocate('kqueue'):
_DEFAULT_SELECTOR = KqueueSelector
elif _can_allocate('epoll'):
_DEFAULT_SELECTOR = EpollSelector
elif _can_allocate('poll'):
_DEFAULT_SELECTOR = PollSelector
elif hasattr(select, 'select'):
_DEFAULT_SELECTOR = SelectSelector
else: # Platform-specific: AppEngine
raise ValueError('Platform does not have a selector')
return _DEFAULT_SELECTOR()

View file

@ -1,4 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import binascii
import re
@ -13,17 +14,21 @@ try:
# Python 2 import.
import SocketServer as socketserver
import Queue as Queue
import pykms_Selectors as selectors
from pykms_Time import monotonic as time
except ImportError:
# Python 3 import.
import socketserver
import queue as Queue
import selectors
from time import monotonic as time
import pykms_RpcBind, pykms_RpcRequest
from pykms_RpcBase import rpcBase
from pykms_Dcerpc import MSRPCHeader
from pykms_Misc import logger_create, check_logfile, check_lcid, pretty_printer
from pykms_Misc import logger_create, check_logfile, check_lcid
from pykms_Misc import KmsParser, KmsException
from pykms_Format import enco, deco, ShellMessage
from pykms_Format import enco, deco, ShellMessage, pretty_printer
srv_description = 'KMS Server Emulator written in Python'
srv_version = 'py-kms_2019-05-15'
@ -33,43 +38,120 @@ srv_config = {}
class KeyServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
daemon_threads = True
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass):
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass)
self.__shutdown_request = False
self.r_service, self.w_service = os.pipe()
if hasattr(selectors, 'PollSelector'):
self._ServerSelector = selectors.PollSelector
else:
self._ServerSelector = selectors.SelectSelector
def pykms_serve(self):
""" Mixing of socketserver serve_forever() and handle_request() functions,
without elements blocking tkinter.
Handle one request at a time, possibly blocking.
Respects self.timeout.
"""
# Support people who used socket.settimeout() to escape
# pykms_serve() before self.timeout was available.
timeout = self.socket.gettimeout()
if timeout is None:
timeout = self.timeout
elif self.timeout is not None:
timeout = min(timeout, self.timeout)
if timeout is not None:
deadline = time() + timeout
try:
# Wait until a request arrives or the timeout expires.
with self._ServerSelector() as selector:
selector.register(fileobj = self, events = selectors.EVENT_READ)
# self-pipe trick.
selector.register(fileobj = self.r_service, events = selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(timeout)
if self.__shutdown_request:
break
if ready == []:
if timeout is not None:
timeout = deadline - time()
if timeout < 0:
return self.handle_timeout()
else:
for key, mask in ready:
if key.fileobj is self:
self._handle_request_noblock()
elif key.fileobj is self.r_service:
# only to clean buffer.
msgkill = os.read(self.r_service, 8).decode('utf-8')
sys.exit(0)
finally:
self.__shutdown_request = False
def shutdown(self):
self.__shutdown_request = True
def handle_timeout(self):
pretty_printer(log_obj = loggersrv.error, to_exit = True,
put_text = "{reverse}{red}{bold}Server connection timed out. Exiting...{end}")
def handle_error(self, request, client_address):
pass
class server_thread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.queue = serverqueue
self.is_running = False
self.daemon = True
def run(self):
while True:
if not self.queue.empty():
item = self.queue.get()
if item == 'start':
self.is_running = True
# Check options.
server_check()
# Create and run threaded server.
self.server = server_create()
try:
while True:
self.server.handle_request()
except KeyboardInterrupt:
pass
finally:
self.server.server_close()
elif item == 'stop':
self.is_running = False
self.server = None
self.queue.task_done()
##-----------------------------------------------------------------------------------------------------------------------------------------------
class server_thread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.server = None
self.is_running_server, self.with_gui = [False for _ in range(2)]
self.is_running_thread = threading.Event()
def terminate_serve(self):
self.server.shutdown()
self.server.server_close()
def terminate_thread(self):
self.is_running_thread.set()
def terminate_eject(self):
os.write(self.server.w_service, u''.encode('utf-8'))
def run(self):
while not self.is_running_thread.is_set():
try:
item = self.queue.get(block = True, timeout = 0.1)
self.queue.task_done()
except Queue.Empty:
continue
else:
try:
if item == 'start':
self.eject = False
self.is_running_server = True
# Check options.
server_check()
# Create and run server.
self.server = server_create()
self.server.pykms_serve()
elif item == 'stop':
self.server = None
self.is_running_server = False
elif item == 'exit':
self.terminate_thread()
except SystemExit as e:
self.eject = True
if not self.with_gui:
raise
else:
continue
##---------------------------------------------------------------------------------------------------------------------------------------------------------
loggersrv = logging.getLogger('logsrv')
@ -190,17 +272,29 @@ def server_create():
loggersrv.info("TCP server listening at %s on port %d." % (srv_config['ip'], srv_config['port']))
loggersrv.info("HWID: %s" % deco(binascii.b2a_hex(srv_config['hwid']), 'utf-8').upper())
return server
def srv_terminate(exit_server = False, exit_thread = False):
if exit_server:
serverthread.terminate_serve()
serverqueue.put('stop')
if exit_thread:
serverqueue.put('exit')
def srv_main_without_gui():
# Parse options.
server_options()
# Run threaded server.
serverqueue.put('start')
serverthread.join()
# Wait to finish.
try:
while serverthread.is_alive():
serverthread.join(timeout = 0.5)
except (KeyboardInterrupt, SystemExit):
srv_terminate(exit_server = True, exit_thread = True)
def srv_main_with_gui(width = 950, height = 660):
import pykms_GuiBase
root = pykms_GuiBase.KmsGui()
root.title(pykms_GuiBase.gui_description + ' ' + pykms_GuiBase.gui_version)
# Main window initial position.
@ -210,10 +304,8 @@ def srv_main_with_gui(width = 950, height = 660):
x = (ws / 2) - (width / 2)
y = (hs / 2) - (height / 2)
root.geometry('+%d+%d' %(x, y))
root.mainloop()
class kmsServerHandler(socketserver.BaseRequestHandler):
def setup(self):
loggersrv.info("Connection accepted: %s:%d" % (self.client_address[0], self.client_address[1]))
@ -235,11 +327,11 @@ class kmsServerHandler(socketserver.BaseRequestHandler):
packetType = MSRPCHeader(self.data)['type']
if packetType == rpcBase.packetType['bindReq']:
loggersrv.info("RPC bind request received.")
pretty_printer(num_text = [-2, 2])
pretty_printer(num_text = [-2, 2], where = "srv")
handler = pykms_RpcBind.handler(self.data, srv_config)
elif packetType == rpcBase.packetType['request']:
loggersrv.info("Received activation request.")
pretty_printer(num_text = [-2, 13])
pretty_printer(num_text = [-2, 13], where = "srv")
handler = pykms_RpcRequest.handler(self.data, srv_config)
else:
pretty_printer(log_obj = loggersrv.error,
@ -250,28 +342,28 @@ class kmsServerHandler(socketserver.BaseRequestHandler):
if packetType == rpcBase.packetType['bindReq']:
loggersrv.info("RPC bind acknowledged.")
pretty_printer(num_text = [-3, 5, 6])
pretty_printer(num_text = [-3, 5, 6], where = "srv")
elif packetType == rpcBase.packetType['request']:
loggersrv.info("Responded to activation request.")
pretty_printer(num_text = [-3, 18, 19])
pretty_printer(num_text = [-3, 18, 19], where = "srv")
try:
self.request.send(res)
if packetType == rpcBase.packetType['request']:
break
except socket.error as e:
pretty_printer(log_obj = loggersrv.error,
put_text = "{reverse}{red}{bold}While sending: %s{end}" %str(e))
break
if packetType == rpcBase.packetType['request']:
break
def finish(self):
self.request.close()
loggersrv.info("Connection closed: %s:%d" % (self.client_address[0], self.client_address[1]))
serverqueue = Queue.Queue(maxsize = 0)
serverthread = server_thread()
serverthread = server_thread(serverqueue)
serverthread.setDaemon(True)
serverthread.start()
if __name__ == "__main__":

View file

@ -9,7 +9,7 @@ try:
except ImportError:
pass
from pykms_Misc import pretty_printer
from pykms_Format import pretty_printer
#--------------------------------------------------------------------------------------------------------------------------------------------------------

174
py-kms/pykms_Time.py Normal file
View file

@ -0,0 +1,174 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
monotonic
~~~~~~~~~
This module provides a ``monotonic()`` function which returns the
value (in fractional seconds) of a clock which never goes backwards.
On Python 3.3 or newer, ``monotonic`` will be an alias of
``time.monotonic`` from the standard library. On older versions,
it will fall back to an equivalent implementation:
+-------------+----------------------------------------+
| Linux, BSD | ``clock_gettime(3)`` |
+-------------+----------------------------------------+
| Windows | ``GetTickCount`` or ``GetTickCount64`` |
+-------------+----------------------------------------+
| OS X | ``mach_absolute_time`` |
+-------------+----------------------------------------+
If no suitable implementation exists for the current platform,
attempting to import this module (or to import from it) will
cause a ``RuntimeError`` exception to be raised.
Copyright 2014, 2015, 2016 Ori Livneh <ori@wikimedia.org>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
link: https://github.com/atdt/monotonic/blob/master/monotonic.py
"""
import time
__all__ = ('monotonic',)
try:
monotonic = time.monotonic
except AttributeError:
import ctypes
import ctypes.util
import os
import sys
import threading
try:
if sys.platform == 'darwin': # OS X, iOS
# See Technical Q&A QA1398 of the Mac Developer Library:
# <https://developer.apple.com/library/mac/qa/qa1398/>
libc = ctypes.CDLL('/usr/lib/libc.dylib', use_errno=True)
class mach_timebase_info_data_t(ctypes.Structure):
"""System timebase info. Defined in <mach/mach_time.h>."""
_fields_ = (('numer', ctypes.c_uint32),
('denom', ctypes.c_uint32))
mach_absolute_time = libc.mach_absolute_time
mach_absolute_time.restype = ctypes.c_uint64
timebase = mach_timebase_info_data_t()
libc.mach_timebase_info(ctypes.byref(timebase))
ticks_per_second = timebase.numer / timebase.denom * 1.0e9
def monotonic():
"""Monotonic clock, cannot go backward."""
return mach_absolute_time() / ticks_per_second
elif sys.platform.startswith('win32') or sys.platform.startswith('cygwin'):
if sys.platform.startswith('cygwin'):
# Note: cygwin implements clock_gettime (CLOCK_MONOTONIC = 4) since
# version 1.7.6. Using raw WinAPI for maximum version compatibility.
# Ugly hack using the wrong calling convention (in 32-bit mode)
# because ctypes has no windll under cygwin (and it also seems that
# the code letting you select stdcall in _ctypes doesn't exist under
# the preprocessor definitions relevant to cygwin).
# This is 'safe' because:
# 1. The ABI of GetTickCount and GetTickCount64 is identical for
# both calling conventions because they both have no parameters.
# 2. libffi masks the problem because after making the call it doesn't
# touch anything through esp and epilogue code restores a correct
# esp from ebp afterwards.
try:
kernel32 = ctypes.cdll.kernel32
except OSError: # 'No such file or directory'
kernel32 = ctypes.cdll.LoadLibrary('kernel32.dll')
else:
kernel32 = ctypes.windll.kernel32
GetTickCount64 = getattr(kernel32, 'GetTickCount64', None)
if GetTickCount64:
# Windows Vista / Windows Server 2008 or newer.
GetTickCount64.restype = ctypes.c_ulonglong
def monotonic():
"""Monotonic clock, cannot go backward."""
return GetTickCount64() / 1000.0
else:
# Before Windows Vista.
GetTickCount = kernel32.GetTickCount
GetTickCount.restype = ctypes.c_uint32
get_tick_count_lock = threading.Lock()
get_tick_count_last_sample = 0
get_tick_count_wraparounds = 0
def monotonic():
"""Monotonic clock, cannot go backward."""
global get_tick_count_last_sample
global get_tick_count_wraparounds
with get_tick_count_lock:
current_sample = GetTickCount()
if current_sample < get_tick_count_last_sample:
get_tick_count_wraparounds += 1
get_tick_count_last_sample = current_sample
final_milliseconds = get_tick_count_wraparounds << 32
final_milliseconds += get_tick_count_last_sample
return final_milliseconds / 1000.0
else:
try:
clock_gettime = ctypes.CDLL(ctypes.util.find_library('c'),
use_errno=True).clock_gettime
except Exception:
clock_gettime = ctypes.CDLL(ctypes.util.find_library('rt'),
use_errno=True).clock_gettime
class timespec(ctypes.Structure):
"""Time specification, as described in clock_gettime(3)."""
_fields_ = (('tv_sec', ctypes.c_long),
('tv_nsec', ctypes.c_long))
if sys.platform.startswith('linux'):
CLOCK_MONOTONIC = 1
elif sys.platform.startswith('freebsd'):
CLOCK_MONOTONIC = 4
elif sys.platform.startswith('sunos5'):
CLOCK_MONOTONIC = 4
elif 'bsd' in sys.platform:
CLOCK_MONOTONIC = 3
elif sys.platform.startswith('aix'):
CLOCK_MONOTONIC = ctypes.c_longlong(10)
def monotonic():
"""Monotonic clock, cannot go backward."""
ts = timespec()
if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(ts)):
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
return ts.tv_sec + ts.tv_nsec / 1.0e9
# Perform a sanity-check.
if monotonic() - monotonic() > 0:
raise ValueError('monotonic() is not monotonic!')
except Exception as e:
raise RuntimeError('no suitable implementation for this system: ' + repr(e))