Commit 5607fabd authored by Stefan Behnel

reformat Plex code files

parent 727e57d9
......@@ -7,98 +7,101 @@
class Action(object):
    """Base class of the Plex actions defined in this module."""

    def perform(self, token_stream, text):
        """Abstract hook; the base implementation does nothing and returns None."""
        pass  # abstract

    def same_as(self, other):
        """Return True only when |other| is this very object."""
        return self is other
class Return(Action):
    """
    Internal Plex action which causes |value| to
    be returned as the value of the associated token
    """

    def __init__(self, value):
        self.value = value

    def perform(self, token_stream, text):
        """Return the stored value; the stream and the text are not consulted."""
        return self.value

    def same_as(self, other):
        """Two Return actions are the same when their values compare equal."""
        return isinstance(other, Return) and self.value == other.value

    def __repr__(self):
        # repr() is applied first so that a tuple value cannot be mistaken
        # for a tuple of %-format arguments.
        return "Return(%s)" % repr(self.value)
class Call(Action):
    """
    Internal Plex action which causes a function to be called.
    """

    def __init__(self, function):
        self.function = function

    def perform(self, token_stream, text):
        """Call the wrapped function with (token_stream, text) and return its result."""
        return self.function(token_stream, text)

    def __repr__(self):
        # Not every callable has a __name__ (e.g. functools.partial objects
        # or instances defining __call__); fall back to repr() for those
        # instead of raising AttributeError while printing.
        name = getattr(self.function, '__name__', None)
        if name is None:
            name = repr(self.function)
        return "Call(%s)" % name

    def same_as(self, other):
        """Two Call actions are the same when they wrap the identical function object."""
        return isinstance(other, Call) and self.function is other.function
class Begin(Action):
Begin(state_name) is a Plex action which causes the Scanner to
enter the state |state_name|. See the docstring of Plex.Lexicon
for more information.
Begin(state_name) is a Plex action which causes the Scanner to
enter the state |state_name|. See the docstring of Plex.Lexicon
for more information.
def __init__(self, state_name):
self.state_name = state_name
def __init__(self, state_name):
self.state_name = state_name
def perform(self, token_stream, text):
def perform(self, token_stream, text):
def __repr__(self):
return "Begin(%s)" % self.state_name
def __repr__(self):
return "Begin(%s)" % self.state_name
def same_as(self, other):
return isinstance(other, Begin) and self.state_name == other.state_name
def same_as(self, other):
return isinstance(other, Begin) and self.state_name == other.state_name
class Ignore(Action):
    """
    IGNORE is a Plex action which causes its associated token
    to be ignored. See the docstring of Plex.Lexicon for more
    information.
    """

    def perform(self, token_stream, text):
        """Always return None, i.e. produce no token value."""
        return None

    def __repr__(self):
        return "IGNORE"


# Module-level instance used as the IGNORE action.
IGNORE = Ignore()
#IGNORE.__doc__ = Ignore.__doc__
class Text(Action):
    """
    TEXT is a Plex action which causes the text of a token to
    be returned as the value of the token. See the docstring of
    Plex.Lexicon for more information.
    """

    def perform(self, token_stream, text):
        """Return the matched text unchanged."""
        return text

    def __repr__(self):
        return "TEXT"


# Module-level instance used as the TEXT action.
TEXT = Text()
#TEXT.__doc__ = Text.__doc__
......@@ -13,147 +13,152 @@ from .Machines import LOWEST_PRIORITY
from .Transitions import TransitionMap
def nfa_to_dfa(old_machine, debug = None):
Given a nondeterministic Machine, return a new equivalent
Machine which is deterministic.
# We build a new machine whose states correspond to sets of states
# in the old machine. Initially we add a new state corresponding to
# the epsilon-closure of each initial old state. Then we give transitions
# to each new state which are the union of all transitions out of any
# of the corresponding old states. The new state reached on a given
# character is the one corresponding to the set of states reachable
# on that character from any of the old states. As new combinations of
# old states are created, new states are added as needed until closure
# is reached.
new_machine = Machines.FastMachine()
state_map = StateMap(new_machine)
# Seed the process using the initial states of the old machine.
# Make the corresponding new states into initial states of the new
# machine with the same names.
for (key, old_state) in old_machine.initial_states.iteritems():
new_state = state_map.old_to_new(epsilon_closure(old_state))
new_machine.make_initial_state(key, new_state)
# Tricky bit here: we add things to the end of this list while we're
# iterating over it. The iteration stops when closure is achieved.
for new_state in new_machine.states:
transitions = TransitionMap()
for old_state in state_map.new_to_old(new_state):
for event, old_target_states in old_state.transitions.iteritems():
if event and old_target_states:
transitions.add_set(event, set_epsilon_closure(old_target_states))
for event, old_states in transitions.iteritems():
new_machine.add_transitions(new_state, event, state_map.old_to_new(old_states))
if debug:
debug.write("\n===== State Mapping =====\n")
return new_machine
def nfa_to_dfa(old_machine, debug=None):
Given a nondeterministic Machine, return a new equivalent
Machine which is deterministic.
# We build a new machine whose states correspond to sets of states
# in the old machine. Initially we add a new state corresponding to
# the epsilon-closure of each initial old state. Then we give transitions
# to each new state which are the union of all transitions out of any
# of the corresponding old states. The new state reached on a given
# character is the one corresponding to the set of states reachable
# on that character from any of the old states. As new combinations of
# old states are created, new states are added as needed until closure
# is reached.
new_machine = Machines.FastMachine()
state_map = StateMap(new_machine)
# Seed the process using the initial states of the old machine.
# Make the corresponding new states into initial states of the new
# machine with the same names.
for (key, old_state) in old_machine.initial_states.iteritems():
new_state = state_map.old_to_new(epsilon_closure(old_state))
new_machine.make_initial_state(key, new_state)
# Tricky bit here: we add things to the end of this list while we're
# iterating over it. The iteration stops when closure is achieved.
for new_state in new_machine.states:
transitions = TransitionMap()
for old_state in state_map.new_to_old(new_state):
for event, old_target_states in old_state.transitions.iteritems():
if event and old_target_states:
transitions.add_set(event, set_epsilon_closure(old_target_states))
for event, old_states in transitions.iteritems():
new_machine.add_transitions(new_state, event, state_map.old_to_new(old_states))
if debug:
debug.write("\n===== State Mapping =====\n")
return new_machine
def set_epsilon_closure(state_set):
    """
    Given a set of states, return the union of the epsilon
    closures of its member states.

    The result is a fresh dict used as a set: each reachable state is a
    key mapped to 1. The per-state closures returned by epsilon_closure()
    are only read here, never modified.
    """
    union = {}
    for member in state_set:
        for reachable in epsilon_closure(member):
            union[reachable] = 1
    return union
def epsilon_closure(state):
    """
    Return the set of states reachable from the given state
    by epsilon moves.

    The result is cached on the state in its |epsilon_closure| attribute,
    so repeated calls return the same dict object.
    """
    closure = state.epsilon_closure
    if closure is None:
        # Store the (still empty) dict on the state before filling it,
        # exactly as the cache was populated before.
        closure = {}
        state.epsilon_closure = closure
        add_to_epsilon_closure(closure, state)
    return closure
def add_to_epsilon_closure(state_set, state):
    """
    Add to |state_set| states reachable from the given state
    by epsilon moves.

    |state_set| is a dict used as a set (state -> 1) and is updated in
    place. States already present are not expanded again, which also
    makes epsilon cycles terminate.

    The traversal is the same depth-first pre-order walk as a recursive
    formulation, but it keeps an explicit stack of iterators so that a
    long chain of epsilon moves cannot exceed the interpreter's
    recursion limit.
    """
    if state in state_set:
        return
    state_set[state] = 1
    pending = [iter(state.transitions.get_epsilon() or ())]
    while pending:
        for target in pending[-1]:
            if target not in state_set:
                state_set[target] = 1
                # Descend into the newly found state before continuing
                # with its siblings (pre-order, like the recursion did).
                pending.append(iter(target.transitions.get_epsilon() or ()))
                break
        else:
            # Current iterator is exhausted: go back up one level.
            pending.pop()
class StateMap(object):
Helper class used by nfa_to_dfa() to map back and forth between
sets of states from the old machine and states of the new machine.
new_machine = None # Machine
old_to_new_dict = None # {(old_state,...) : new_state}
new_to_old_dict = None # {id(new_state) : old_state_set}
def __init__(self, new_machine):
self.new_machine = new_machine
self.old_to_new_dict = {}
self.new_to_old_dict= {}
def old_to_new(self, old_state_set):
def add_to_epsilon_closure(state_set, state):
Return the state of the new machine corresponding to the
set of old machine states represented by |state_set|. A new
state will be created if necessary. If any of the old states
are accepting states, the new state will be an accepting state
with the highest priority action from the old states.
Recursively add to |state_set| states reachable from the given state
by epsilon moves.
key = self.make_key(old_state_set)
new_state = self.old_to_new_dict.get(key, None)
if not new_state:
action = self.highest_priority_action(old_state_set)
new_state = self.new_machine.new_state(action)
self.old_to_new_dict[key] = new_state
self.new_to_old_dict[id(new_state)] = old_state_set
#for old_state in old_state_set.keys():
return new_state
def highest_priority_action(self, state_set):
best_action = None
best_priority = LOWEST_PRIORITY
for state in state_set:
priority = state.action_priority
if priority > best_priority:
best_action = state.action
best_priority = priority
return best_action
# def old_to_new_set(self, old_state_set):
# """
# Return the new state corresponding to a set of old states as
# a singleton set.
# """
# return {self.old_to_new(old_state_set):1}
def new_to_old(self, new_state):
"""Given a new state, return a set of corresponding old states."""
return self.new_to_old_dict[id(new_state)]
def make_key(self, state_set):
if not state_set.get(state, 0):
state_set[state] = 1
state_set_2 = state.transitions.get_epsilon()
if state_set_2:
for state2 in state_set_2:
add_to_epsilon_closure(state_set, state2)
class StateMap(object):
    """
    Helper class used by nfa_to_dfa() to map back and forth between
    sets of states from the old machine and states of the new machine.
    """
    new_machine = None      # Machine
    old_to_new_dict = None  # {(old_state,...) : new_state}
    new_to_old_dict = None  # {id(new_state) : old_state_set}

    def __init__(self, new_machine):
        self.new_machine = new_machine
        self.old_to_new_dict = {}
        self.new_to_old_dict = {}

    def old_to_new(self, old_state_set):
        """
        Return the state of the new machine corresponding to the
        set of old machine states represented by |state_set|. A new
        state will be created if necessary. If any of the old states
        are accepting states, the new state will be an accepting state
        with the highest priority action from the old states.
        """
        key = self.make_key(old_state_set)
        new_state = self.old_to_new_dict.get(key)
        # Test for absence explicitly rather than by truthiness, so a
        # falsy-but-present state object is never created twice.
        if new_state is None:
            action = self.highest_priority_action(old_state_set)
            new_state = self.new_machine.new_state(action)
            self.old_to_new_dict[key] = new_state
            self.new_to_old_dict[id(new_state)] = old_state_set
        return new_state

    def highest_priority_action(self, state_set):
        """
        Return the action of the state in |state_set| with the greatest
        action_priority, or None if no state has a priority above
        LOWEST_PRIORITY.
        """
        best_action = None
        best_priority = LOWEST_PRIORITY
        for state in state_set:
            priority = state.action_priority
            if priority > best_priority:
                best_action = state.action
                best_priority = priority
        return best_action

    def new_to_old(self, new_state):
        """Given a new state, return a set of corresponding old states."""
        return self.new_to_old_dict[id(new_state)]

    def make_key(self, state_set):
        """
        Convert a set of states into a uniquified
        sorted tuple suitable for use as a dictionary key.
        """
        # Sorting makes the key independent of the iteration order of
        # |state_set|, so equal sets always map to the same new state.
        return tuple(sorted(state_set))

    def dump(self, file):
        """Write one line per new state to |file|, showing its old state set."""
        from .Transitions import state_set_str

        for new_state in self.new_machine.states:
            old_state_set = self.new_to_old_dict[id(new_state)]
            file.write("   State %s <-- %s\n" % (
                new_state['number'], state_set_str(old_state_set)))
......@@ -6,45 +6,49 @@
class PlexError(Exception):
    """Common base class of the Plex exceptions defined here."""
    message = ""


class PlexTypeError(PlexError, TypeError):
    pass


class PlexValueError(PlexError, ValueError):
    pass


class InvalidRegex(PlexError):
    pass


class InvalidToken(PlexError):
    """
    Error in a token definition.

    May be constructed either as InvalidToken(token_number, message),
    which prefixes the message with the token number, or with a single
    already-formatted message string (several raise sites pass only a
    message, which previously failed with a TypeError in the constructor).
    """

    def __init__(self, token_number, message=None):
        if message is None:
            # Single-argument form: the argument is the complete message.
            PlexError.__init__(self, token_number)
        else:
            PlexError.__init__(self, "Token number %d: %s" % (token_number, message))


class InvalidScanner(PlexError):
    pass


class AmbiguousAction(PlexError):
    message = "Two tokens with different actions can match the same string"

    def __init__(self):
        # Hand the class-level message to the base class so that str(exc)
        # is not empty.
        PlexError.__init__(self, self.message)


class UnrecognizedInput(PlexError):
    """Raised for input that matches no token in the current scanner state."""
    scanner = None
    position = None
    state_name = None

    def __init__(self, scanner, state_name):
        self.scanner = scanner
        # Captured at construction time; __str__ formats it as
        # (name, line, char), so get_position() must yield a 3-tuple.
        self.position = scanner.get_position()
        self.state_name = state_name

    def __str__(self):
        return ("'%s', line %d, char %d: Token not recognised in state %s" % (
            self.position + (repr(self.state_name),)))
......@@ -22,177 +22,179 @@ DUMP_DFA = 2
class State(object):
    """
    This class is used as part of a Plex.Lexicon specification to
    introduce a user-defined state.

    Constructor:

       State(name, token_specifications)
    """

    name = None
    tokens = None

    def __init__(self, name, tokens):
        self.name = name
        self.tokens = tokens
class Lexicon(object):
Lexicon(specification) builds a lexical analyser from the given
|specification|. The specification consists of a list of
specification items. Each specification item may be either:
1) A token definition, which is a tuple:
(pattern, action)
The |pattern| is a regular expression built using the
constructors defined in the Plex module.
The |action| is the action to be performed when this pattern
is recognised (see below).
2) A state definition:
State(name, tokens)
where |name| is a character string naming the state,
and |tokens| is a list of token definitions as
above. The meaning and usage of states is described
The |action| in a token specification may be one of three things:
1) A function, which is called as follows:
function(scanner, text)
where |scanner| is the relevant Scanner instance, and |text|
is the matched text. If the function returns anything
other than None, that value is returned as the value of the
token. If it returns None, scanning continues as if the IGNORE
action were specified (see below).
2) One of the following special actions:
IGNORE means that the recognised characters will be treated as
white space and ignored. Scanning will continue until
the next non-ignored token is recognised before returning.
TEXT causes the scanned text itself to be returned as the
value of the token.
3) Any other value, which is returned as the value of the token.
At any given time, the scanner is in one of a number of states.
Associated with each state is a set of possible tokens. When scanning,
only tokens associated with the current state are recognised.
There is a default state, whose name is the empty string. Token
definitions which are not inside any State definition belong to
the default state.
The initial state of the scanner is the default state. The state can
be changed in one of two ways:
1) Using Begin(state_name) as the action of a token.
2) Calling the begin(state_name) method of the Scanner.
To change back to the default state, use '' as the state name.
machine = None # Machine
tables = None # StateTableMachine
def __init__(self, specifications, debug = None, debug_flags = 7, timings = None):
if type(specifications) != types.ListType:
raise Errors.InvalidScanner("Scanner definition is not a list")
if timings:
from .Timing import time
total_time = 0.0
time1 = time()
nfa = Machines.Machine()
default_initial_state = nfa.new_initial_state('')
token_number = 1
for spec in specifications:
if isinstance(spec, State):
user_initial_state = nfa.new_initial_state(
for token in spec.tokens:
nfa, user_initial_state, token, token_number)
token_number = token_number + 1
elif type(spec) == types.TupleType:
nfa, default_initial_state, spec, token_number)
token_number = token_number + 1
raise Errors.InvalidToken(
"Expected a token definition (tuple) or State instance")
if timings:
time2 = time()
total_time = total_time + (time2 - time1)
time3 = time()
if debug and (debug_flags & 1):
debug.write("\n============= NFA ===========\n")
dfa = DFA.nfa_to_dfa(nfa, debug = (debug_flags & 3) == 3 and debug)
if timings:
time4 = time()
total_time = total_time + (time4 - time3)
if debug and (debug_flags & 2):
debug.write("\n============= DFA ===========\n")
if timings:
timings.write("Constructing NFA : %5.2f\n" % (time2 - time1))
timings.write("Converting to DFA: %5.2f\n" % (time4 - time3))
timings.write("TOTAL : %5.2f\n" % total_time)
self.machine = dfa
def add_token_to_machine(self, machine, initial_state, token_spec, token_number):
(re, action_spec) = self.parse_token_definition(token_spec)
# Disabled this -- matching empty strings can be useful
#if re.nullable:
# raise Errors.InvalidToken(
# token_number, "Pattern can match 0 input symbols")
if isinstance(action_spec, Actions.Action):
action = action_spec
Lexicon(specification) builds a lexical analyser from the given
|specification|. The specification consists of a list of
specification items. Each specification item may be either:
1) A token definition, which is a tuple:
(pattern, action)
The |pattern| is a regular expression built using the
constructors defined in the Plex module.
The |action| is the action to be performed when this pattern
is recognised (see below).
2) A state definition:
State(name, tokens)
where |name| is a character string naming the state,
and |tokens| is a list of token definitions as
above. The meaning and usage of states is described
The |action| in a token specification may be one of three things:
1) A function, which is called as follows:
function(scanner, text)
where |scanner| is the relevant Scanner instance, and |text|
is the matched text. If the function returns anything
other than None, that value is returned as the value of the
token. If it returns None, scanning continues as if the IGNORE
action were specified (see below).
2) One of the following special actions:
IGNORE means that the recognised characters will be treated as
white space and ignored. Scanning will continue until
the next non-ignored token is recognised before returning.
TEXT causes the scanned text itself to be returned as the
value of the token.
3) Any other value, which is returned as the value of the token.
At any given time, the scanner is in one of a number of states.
Associated with each state is a set of possible tokens. When scanning,
only tokens associated with the current state are recognised.
There is a default state, whose name is the empty string. Token
definitions which are not inside any State definition belong to
the default state.
The initial state of the scanner is the default state. The state can
be changed in one of two ways:
1) Using Begin(state_name) as the action of a token.
2) Calling the begin(state_name) method of the Scanner.
To change back to the default state, use '' as the state name.
machine = None # Machine
tables = None # StateTableMachine
def __init__(self, specifications, debug=None, debug_flags=7, timings=None):
if type(specifications) != types.ListType:
raise Errors.InvalidScanner("Scanner definition is not a list")
if timings:
from .Timing import time
total_time = 0.0
time1 = time()
nfa = Machines.Machine()
default_initial_state = nfa.new_initial_state('')
token_number = 1
for spec in specifications:
if isinstance(spec, State):
user_initial_state = nfa.new_initial_state(
for token in spec.tokens:
nfa, user_initial_state, token, token_number)
token_number += 1
elif type(spec) == types.TupleType:
nfa, default_initial_state, spec, token_number)
token_number += 1
raise Errors.InvalidToken(
"Expected a token definition (tuple) or State instance")
if timings:
time2 = time()
total_time = total_time + (time2 - time1)
time3 = time()
if debug and (debug_flags & 1):
debug.write("\n============= NFA ===========\n")
dfa = DFA.nfa_to_dfa(nfa, debug=(debug_flags & 3) == 3 and debug)
if timings:
time4 = time()
total_time = total_time + (time4 - time3)
if debug and (debug_flags & 2):
debug.write("\n============= DFA ===========\n")
if timings:
timings.write("Constructing NFA : %5.2f\n" % (time2 - time1))
timings.write("Converting to DFA: %5.2f\n" % (time4 - time3))
timings.write("TOTAL : %5.2f\n" % total_time)
self.machine = dfa
def add_token_to_machine(self, machine, initial_state, token_spec, token_number):
except AttributeError:
action = Actions.Return(action_spec)
action = Actions.Call(action_spec)
final_state = machine.new_state()
re.build_machine(machine, initial_state, final_state,
match_bol = 1, nocase = 0)
final_state.set_action(action, priority = -token_number)
except Errors.PlexError, e:
raise e.__class__("Token number %d: %s" % (token_number, e))
def parse_token_definition(self, token_spec):
if type(token_spec) != types.TupleType:
raise Errors.InvalidToken("Token definition is not a tuple")
if len(token_spec) != 2:
raise Errors.InvalidToken("Wrong number of items in token definition")
pattern, action = token_spec
if not isinstance(pattern, Regexps.RE):
raise Errors.InvalidToken("Pattern is not an RE instance")
return (pattern, action)
def get_initial_state(self, name):
return self.machine.get_initial_state(name)
(re, action_spec) = self.parse_token_definition(token_spec)
# Disabled this -- matching empty strings can be useful
#if re.nullable:
# raise Errors.InvalidToken(
# token_number, "Pattern can match 0 input symbols")
if isinstance(action_spec, Actions.Action):
action = action_spec
except AttributeError:
action = Actions.Return(action_spec)
action = Actions.Call(action_spec)
final_state = machine.new_state()
re.build_machine(machine, initial_state, final_state,
match_bol=1, nocase=0)
final_state.set_action(action, priority=-token_number)
except Errors.PlexError, e:
raise e.__class__("Token number %d: %s" % (token_number, e))
def parse_token_definition(self, token_spec):
if type(token_spec) != types.TupleType:
raise Errors.InvalidToken("Token definition is not a tuple")
if len(token_spec) != 2:
raise Errors.InvalidToken("Wrong number of items in token definition")
pattern, action = token_spec
if not isinstance(pattern, Regexps.RE):
raise Errors.InvalidToken("Pattern is not an RE instance")
return (pattern, action)
def get_initial_state(self, name):
return self.machine.get_initial_state(name)
......@@ -16,244 +16,245 @@ LOWEST_PRIORITY = -sys.maxint
class Machine(object):
"""A collection of Nodes representing an NFA or DFA."""
states = None # [Node]
next_state_number = 1
initial_states = None # {(name, bol): Node}
def __init__(self):
self.states = []
self.initial_states = {}
def __del__(self):
#print "Destroying", self ###
for state in self.states:
def new_state(self):
"""Add a new state to the machine and return it."""
s = Node()
n = self.next_state_number
self.next_state_number = n + 1
s.number = n
return s
def new_initial_state(self, name):
state = self.new_state()
self.make_initial_state(name, state)
return state
def make_initial_state(self, name, state):
self.initial_states[name] = state
def get_initial_state(self, name):
return self.initial_states[name]
def dump(self, file):
if self.initial_states is not None:
file.write(" Initial states:\n")
for (name, state) in self.initial_states.iteritems():
file.write(" '%s': %d\n" % (name, state.number))
for s in self.states:
"""A collection of Nodes representing an NFA or DFA."""
states = None # [Node]
next_state_number = 1
initial_states = None # {(name, bol): Node}
def __init__(self):
self.states = []
self.initial_states = {}
def __del__(self):
#print "Destroying", self ###
for state in self.states:
def new_state(self):
"""Add a new state to the machine and return it."""
s = Node()
n = self.next_state_number
self.next_state_number = n + 1
s.number = n
return s
def new_initial_state(self, name):
state = self.new_state()
self.make_initial_state(name, state)
return state
def make_initial_state(self, name, state):
self.initial_states[name] = state
def get_initial_state(self, name):
return self.initial_states[name]
def dump(self, file):
if self.initial_states is not None:
file.write(" Initial states:\n")
for (name, state) in self.initial_states.iteritems():
file.write(" '%s': %d\n" % (name, state.number))
for s in self.states:
class Node(object):
"""A state of an NFA or DFA."""
transitions = None # TransitionMap
action = None # Action
action_priority = None # integer
number = 0 # for debug output
epsilon_closure = None # used by nfa_to_dfa()
def __init__(self):
# Preinitialise the list of empty transitions, because
# the nfa-to-dfa algorithm needs it
#self.transitions = {'':[]}
self.transitions = TransitionMap()
self.action_priority = LOWEST_PRIORITY
def destroy(self):
#print "Destroying", self ###
self.transitions = None
self.action = None
self.epsilon_closure = None
def add_transition(self, event, new_state):
self.transitions.add(event, new_state)
def link_to(self, state):
"""Add an epsilon-move from this state to another state."""
self.add_transition('', state)
def set_action(self, action, priority):
"""Make this an accepting state with the given action. If
there is already an action, choose the action with highest
if priority > self.action_priority:
self.action = action
self.action_priority = priority
def get_action(self):
return self.action
def get_action_priority(self):
return self.action_priority
def is_accepting(self):
return self.action is not None
def __str__(self):
return "State %d" % self.number
def dump(self, file):
# Header
file.write(" State %d:\n" % self.number)
# Transitions
# self.dump_transitions(file)
# Action
action = self.action
priority = self.action_priority
if action is not None:
file.write(" %s [priority %d]\n" % (action, priority))
def __lt__(self, other):
return self.number < other.number
"""A state of an NFA or DFA."""
transitions = None # TransitionMap
action = None # Action
action_priority = None # integer
number = 0 # for debug output
epsilon_closure = None # used by nfa_to_dfa()
def __init__(self):
# Preinitialise the list of empty transitions, because
# the nfa-to-dfa algorithm needs it
#self.transitions = {'':[]}
self.transitions = TransitionMap()
self.action_priority = LOWEST_PRIORITY
def destroy(self):
#print "Destroying", self ###
self.transitions = None
self.action = None
self.epsilon_closure = None
def add_transition(self, event, new_state):
self.transitions.add(event, new_state)
def link_to(self, state):
"""Add an epsilon-move from this state to another state."""
self.add_transition('', state)
def set_action(self, action, priority):
"""Make this an accepting state with the given action. If
there is already an action, choose the action with highest
if priority > self.action_priority:
self.action = action
self.action_priority = priority
def get_action(self):
return self.action
def get_action_priority(self):
return self.action_priority
def is_accepting(self):
return self.action is not None
def __str__(self):
return "State %d" % self.number
def dump(self, file):
# Header
file.write(" State %d:\n" % self.number)
# Transitions
# self.dump_transitions(file)
# Action
action = self.action
priority = self.action_priority
if action is not None:
file.write(" %s [priority %d]\n" % (action, priority))
def __lt__(self, other):
return self.number < other.number
class FastMachine(object):
FastMachine is a deterministic machine represented in a way that
allows fast scanning.
initial_states = None # {state_name:state}
states = None # [state]
# where state = {event:state, 'else':state, 'action':Action}
next_number = 1 # for debugging
new_state_template = {
'':None, 'bol':None, 'eol':None, 'eof':None, 'else':None
def __init__(self, old_machine = None):
self.initial_states = initial_states = {}
self.states = []
if old_machine:
self.old_to_new = old_to_new = {}
for old_state in old_machine.states:
new_state = self.new_state()
old_to_new[old_state] = new_state
for name, old_state in old_machine.initial_states.iteritems():
initial_states[name] = old_to_new[old_state]
for old_state in old_machine.states:
new_state = old_to_new[old_state]
for event, old_state_set in old_state.transitions.iteritems():
if old_state_set:
new_state[event] = old_to_new[old_state_set.keys()[0]]
new_state[event] = None
new_state['action'] = old_state.action
def __del__(self):
for state in self.states:
def new_state(self, action = None):
number = self.next_number
self.next_number = number + 1
result = self.new_state_template.copy()
result['number'] = number
result['action'] = action
return result
def make_initial_state(self, name, state):
self.initial_states[name] = state
def add_transitions(self, state, event, new_state, maxint=sys.maxint):
if type(event) is tuple:
code0, code1 = event
if code0 == -maxint:
state['else'] = new_state
elif code1 != maxint:
while code0 < code1:
state[unichr(code0)] = new_state
code0 = code0 + 1
state[event] = new_state
def get_initial_state(self, name):
return self.initial_states[name]
def dump(self, file):
file.write(" Initial states:\n")
for name, state in self.initial_states.iteritems():
file.write(" %s: %s\n" % (repr(name), state['number']))
for state in self.states:
self.dump_state(state, file)
def dump_state(self, state, file):
# Header
file.write(" State %d:\n" % state['number'])
# Transitions
self.dump_transitions(state, file)
# Action
action = state['action']
if action is not None:
file.write(" %s\n" % action)
def dump_transitions(self, state, file):
chars_leading_to_state = {}
special_to_state = {}
for (c, s) in state.iteritems():
if len(c) == 1:
chars = chars_leading_to_state.get(id(s), None)
if chars is None:
chars = []
chars_leading_to_state[id(s)] = chars
elif len(c) <= 4:
special_to_state[c] = s
ranges_to_state = {}
for state in self.states:
char_list = chars_leading_to_state.get(id(state), None)
if char_list:
ranges = self.chars_to_ranges(char_list)
ranges_to_state[ranges] = state
ranges_list = ranges_to_state.keys()
for ranges in ranges_list:
key = self.ranges_to_string(ranges)
state = ranges_to_state[ranges]
file.write(" %s --> State %d\n" % (key, state['number']))
for key in ('bol', 'eol', 'eof', 'else'):
state = special_to_state.get(key, None)
if state:
file.write(" %s --> State %d\n" % (key, state['number']))
def chars_to_ranges(self, char_list):
i = 0
n = len(char_list)
result = []
while i < n:
c1 = ord(char_list[i])
c2 = c1
i = i + 1
while i < n and ord(char_list[i]) == c2 + 1:
i = i + 1
c2 = c2 + 1
result.append((chr(c1), chr(c2)))
return tuple(result)
def ranges_to_string(self, range_list):
    """
    Return the comma-separated rendering of every range in
    |range_list|, each formatted by range_to_string().
    """
    return ','.join(self.range_to_string(r) for r in range_list)
def range_to_string(self, range_tuple):
    """
    Format a (first_char, last_char) pair: the repr of the character
    when both ends are equal, otherwise "first..last".
    """
    (c1, c2) = range_tuple
    if c1 == c2:
        return repr(c1)
    return "%s..%s" % (repr(c1), repr(c2))
FastMachine is a deterministic machine represented in a way that
allows fast scanning.
initial_states = None # {state_name:state}
states = None # [state] where state = {event:state, 'else':state, 'action':Action}
next_number = 1 # for debugging
new_state_template = {
'': None, 'bol': None, 'eol': None, 'eof': None, 'else': None
def __init__(self, old_machine=None):
self.initial_states = initial_states = {}
self.states = []
if old_machine:
self.old_to_new = old_to_new = {}
for old_state in old_machine.states:
new_state = self.new_state()
old_to_new[old_state] = new_state
for name, old_state in old_machine.initial_states.iteritems():
initial_states[name] = old_to_new[old_state]
for old_state in old_machine.states:
new_state = old_to_new[old_state]
for event, old_state_set in old_state.transitions.iteritems():
if old_state_set:
new_state[event] = old_to_new[old_state_set.keys()[0]]
new_state[event] = None
new_state['action'] = old_state.action
def __del__(self):
for state in self.states:
# Create and return a fresh state dict: a copy of new_state_template that
# carries the next sequential 'number' and the given 'action'.
# NOTE(review): nothing visible in this block adds the result to self.states,
# although dump() iterates that list -- a line may have been lost in this
# rendering; confirm against the pristine file.
def new_state(self, action=None):
number = self.next_number
# Assigning through self gives this instance its own running counter.
self.next_number = number + 1
result = self.new_state_template.copy()
result['number'] = number
result['action'] = action
return result
def make_initial_state(self, name, state):
    """
    Register |state| as the initial state for the scanner state
    named |name|, replacing any earlier entry for that name.
    """
    self.initial_states[name] = state
# Record in the dict |state| that |event| leads to |new_state|.
# A tuple event is unpacked as (code0, code1): a range that starts at -maxint
# is stored under the 'else' key; otherwise, unless code1 equals maxint, one
# entry is written per character code from code0 up to (not including) code1.
# NOTE(review): indentation was flattened in this rendering and an `else:`
# line appears to be missing before the final assignment -- presumably
# `state[event] = new_state` is the branch for non-tuple events. Confirm
# against the pristine file before relying on this.
def add_transitions(self, state, event, new_state, maxint=sys.maxint):
if type(event) is tuple:
code0, code1 = event
if code0 == -maxint:
state['else'] = new_state
elif code1 != maxint:
while code0 < code1:
# Keys for character transitions are single unicode characters.
state[unichr(code0)] = new_state
code0 += 1
state[event] = new_state
def get_initial_state(self, name):
    """
    Return the initial state registered under |name|.

    Raises KeyError if no state has been registered for that name.
    """
    return self.initial_states[name]
def dump(self, file):
    """
    Write a debugging listing of the machine to |file|: first the
    initial states by name, then every state via dump_state().
    """
    file.write(" Initial states:\n")
    # items() behaves the same on Python 2 and 3 (iteritems() is 2-only).
    for name, state in self.initial_states.items():
        file.write(" %s: %s\n" % (repr(name), state['number']))
    for state in self.states:
        self.dump_state(state, file)
def dump_state(self, state, file):
    """
    Write a description of one |state| to |file|: a header line with
    its number, its transitions, and its action if it has one.
    """
    # Header
    file.write(" State %d:\n" % state['number'])
    # Transitions
    self.dump_transitions(state, file)
    # Action
    action = state['action']
    if action is not None:
        file.write(" %s\n" % action)
def dump_transitions(self, state, file):
    """
    Write the outgoing transitions of |state| to |file|.

    Single-character events are grouped by target state and printed as
    character ranges; the special events 'bol', 'eol', 'eof' and 'else'
    are printed afterwards, one per line, when they have a target.
    """
    chars_leading_to_state = {}
    special_to_state = {}
    for (c, s) in state.items():
        if len(c) == 1:
            # Collect the character under its target state; without this
            # append the per-target lists stay empty and nothing is printed.
            chars_leading_to_state.setdefault(id(s), []).append(c)
        elif len(c) <= 4:
            special_to_state[c] = s
    ranges_to_state = {}
    for target in self.states:
        char_list = chars_leading_to_state.get(id(target))
        if char_list:
            # Sort first: range building only merges adjacent entries.
            ranges = self.chars_to_ranges(sorted(char_list))
            ranges_to_state[ranges] = target
    # Sorted for a stable listing regardless of dict ordering.
    for ranges in sorted(ranges_to_state):
        key = self.ranges_to_string(ranges)
        target = ranges_to_state[ranges]
        file.write(" %s --> State %d\n" % (key, target['number']))
    for key in ('bol', 'eol', 'eof', 'else'):
        target = special_to_state.get(key)
        if target:
            file.write(" %s --> State %d\n" % (key, target['number']))
def chars_to_ranges(self, char_list):
    """
    Collapse |char_list| into a tuple of (first_char, last_char) pairs,
    one pair per run of consecutive character codes.

    Only neighbouring entries are merged, so |char_list| should be
    ordered by character code for maximal ranges.
    """
    result = []
    n = len(char_list)
    i = 0
    while i < n:
        c1 = ord(char_list[i])
        c2 = c1
        i += 1
        # Extend the run while the next character follows directly.
        while i < n and ord(char_list[i]) == c2 + 1:
            i += 1
            c2 += 1
        result.append((chr(c1), chr(c2)))
    return tuple(result)
def ranges_to_string(self, range_list):
    """
    Return the comma-separated rendering of every range in
    |range_list|, each formatted by range_to_string().
    """
    return ','.join(self.range_to_string(r) for r in range_list)
def range_to_string(self, range_tuple):
    """
    Format a (first_char, last_char) pair: the repr of the character
    when both ends are equal, otherwise "first..last".
    """
    (c1, c2) = range_tuple
    if c1 == c2:
        return repr(c1)
    return "%s..%s" % (repr(c1), repr(c2))
......@@ -42,14 +42,15 @@ def chars_to_ranges(s):
while i < n:
code1 = ord(char_list[i])
code2 = code1 + 1
i = i + 1
i += 1
while i < n and code2 >= ord(char_list[i]):
code2 = code2 + 1
i = i + 1
code2 += 1
i += 1
return result
def uppercase_range(code1, code2):
If the range of characters from code1 to code2-1 includes any
......@@ -63,6 +64,7 @@ def uppercase_range(code1, code2):
return None
def lowercase_range(code1, code2):
If the range of characters from code1 to code2-1 includes any
......@@ -76,6 +78,7 @@ def lowercase_range(code1, code2):
return None
def CodeRanges(code_list):
Given a list of codes as returned by chars_to_ranges, return
......@@ -86,6 +89,7 @@ def CodeRanges(code_list):
re_list.append(CodeRange(code_list[i], code_list[i + 1]))
return Alt(*re_list)
def CodeRange(code1, code2):
CodeRange(code1, code2) is an RE which matches any character
......@@ -93,11 +97,12 @@ def CodeRange(code1, code2):
if code1 <= nl_code < code2:
return Alt(RawCodeRange(code1, nl_code),
RawCodeRange(nl_code + 1, code2))
RawCodeRange(nl_code + 1, code2))
return RawCodeRange(code1, code2)
# Abstract classes
......@@ -110,12 +115,12 @@ class RE(object):
re1 | re2 is an RE which matches either |re1| or |re2|
nullable = 1 # True if this RE can match 0 input symbols
match_nl = 1 # True if this RE can match a string ending with '\n'
str = None # Set to a string to override the class's __str__ result
nullable = 1 # True if this RE can match 0 input symbols
match_nl = 1 # True if this RE can match a string ending with '\n'
str = None # Set to a string to override the class's __str__ result
def build_machine(self, machine, initial_state, final_state,
match_bol, nocase):
match_bol, nocase):
This method should add states to |machine| to implement this
RE, starting at |initial_state| and ending at |final_state|.
......@@ -124,7 +129,7 @@ class RE(object):
letters should be treated as equivalent.
raise NotImplementedError("%s.build_machine not implemented" %
def build_opt(self, m, initial_state, c):
......@@ -160,18 +165,18 @@ class RE(object):
self.check_string(num, value)
if len(value) != 1:
raise Errors.PlexValueError("Invalid value for argument %d of Plex.%s."
"Expected a string of length 1, got: %s" % (
num, self.__class__.__name__, repr(value)))
"Expected a string of length 1, got: %s" % (
num, self.__class__.__name__, repr(value)))
def wrong_type(self, num, value, expected):
    """
    Raise PlexTypeError reporting that argument number |num| of this
    RE constructor has the wrong type.

    |value| is the offending argument and |expected| a description of
    the type that was required.
    """
    if type(value) == types.InstanceType:
        # Old-style instances all share one type; name their class instead.
        got = "%s.%s instance" % (
            value.__class__.__module__, value.__class__.__name__)
    else:
        got = type(value).__name__
    # The message now closes the parenthesis it opens.
    raise Errors.PlexTypeError("Invalid type for argument %d of Plex.%s "
                               "(expected %s, got %s)" % (
                                   num, self.__class__.__name__, expected, got))
# Primitive RE constructors
......@@ -211,6 +216,7 @@ class RE(object):
## def calc_str(self):
## return "Char(%s)" % repr(self.char)
def Char(c):
Char(c) is an RE which matches the character |c|.
......@@ -222,6 +228,7 @@ def Char(c):
result.str = "Char(%s)" % repr(c)
return result
class RawCodeRange(RE):
RawCodeRange(code1, code2) is a low-level RE which matches any character
......@@ -230,9 +237,9 @@ class RawCodeRange(RE):
nullable = 0
match_nl = 0
range = None # (code, code)
uppercase_range = None # (code, code) or None
lowercase_range = None # (code, code) or None
range = None # (code, code)
uppercase_range = None # (code, code) or None
lowercase_range = None # (code, code) or None
def __init__(self, code1, code2):
self.range = (code1, code2)
......@@ -252,6 +259,7 @@ class RawCodeRange(RE):
def calc_str(self):
return "CodeRange(%d,%d)" % (self.code1, self.code2)
class _RawNewline(RE):
RawNewline is a low-level RE which matches a newline character.
......@@ -266,6 +274,7 @@ class _RawNewline(RE):
s = self.build_opt(m, initial_state, EOL)
s.add_transition((nl_code, nl_code + 1), final_state)
RawNewline = _RawNewline()
......@@ -304,7 +313,7 @@ class Seq(RE):
i = len(re_list)
match_nl = 0
while i:
i = i - 1
i -= 1
re = re_list[i]
if re.match_nl:
match_nl = 1
......@@ -354,7 +363,7 @@ class Alt(RE):
if re.match_nl:
match_nl = 1
i = i + 1
i += 1
self.nullable_res = nullable_res
self.non_nullable_res = non_nullable_res
self.nullable = nullable
......@@ -411,7 +420,7 @@ class SwitchCase(RE):
def build_machine(self, m, initial_state, final_state, match_bol, nocase):, initial_state, final_state, match_bol,
def calc_str(self):
if self.nocase:
......@@ -434,6 +443,7 @@ Empty.__doc__ = \
Empty.str = "Empty"
def Str1(s):
Str1(s) is an RE which matches the literal string |s|.
......@@ -442,6 +452,7 @@ def Str1(s):
result.str = "Str(%s)" % repr(s)
return result
def Str(*strs):
Str(s) is an RE which matches the literal string |s|.
......@@ -454,6 +465,7 @@ def Str(*strs):
result.str = "Str(%s)" % ','.join(map(repr, strs))
return result
def Any(s):
Any(s) is an RE which matches any character in the string |s|.
......@@ -463,6 +475,7 @@ def Any(s):
result.str = "Any(%s)" % repr(s)
return result
def AnyBut(s):
AnyBut(s) is an RE which matches any character (including
......@@ -475,6 +488,7 @@ def AnyBut(s):
result.str = "AnyBut(%s)" % repr(s)
return result
AnyChar = AnyBut("")
AnyChar.__doc__ = \
......@@ -482,7 +496,8 @@ AnyChar.__doc__ = \
AnyChar.str = "AnyChar"
def Range(s1, s2 = None):
def Range(s1, s2=None):
Range(c1, c2) is an RE which matches any single character in the range
|c1| to |c2| inclusive.
......@@ -495,11 +510,12 @@ def Range(s1, s2 = None):
ranges = []
for i in range(0, len(s1), 2):
ranges.append(CodeRange(ord(s1[i]), ord(s1[i+1]) + 1))
ranges.append(CodeRange(ord(s1[i]), ord(s1[i + 1]) + 1))
result = Alt(*ranges)
result.str = "Range(%s)" % repr(s1)
return result
def Opt(re):
Opt(re) is an RE which matches either |re| or the empty string.
......@@ -508,6 +524,7 @@ def Opt(re):
result.str = "Opt(%s)" % re
return result
def Rep(re):
Rep(re) is an RE which matches zero or more repetitions of |re|.
......@@ -516,12 +533,14 @@ def Rep(re):
result.str = "Rep(%s)" % re
return result
def NoCase(re):
NoCase(re) is an RE which matches the same strings as RE, but treating
upper and lower case letters as equivalent.
return SwitchCase(re, nocase = 1)
return SwitchCase(re, nocase=1)
def Case(re):
......@@ -529,7 +548,7 @@ def Case(re):
upper and lower case letters as distinct, i.e. it cancels the effect
of any enclosing NoCase().
return SwitchCase(re, nocase = 0)
return SwitchCase(re, nocase=0)
# RE Constants
......@@ -10,6 +10,7 @@
from __future__ import absolute_import
import cython
cython.declare(BOL=object, EOL=object, EOF=object, NOT_FOUND=object)
from . import Errors
......@@ -19,317 +20,318 @@ NOT_FOUND = object()
class Scanner(object):
A Scanner is used to read tokens from a stream of characters
using the token set specified by a Plex.Lexicon.
Scanner(lexicon, stream, name = '')
A Scanner is used to read tokens from a stream of characters
using the token set specified by a Plex.Lexicon.
See the docstring of the __init__ method for details.
Scanner(lexicon, stream, name = '')
See the docstrings of the individual methods for more
See the docstring of the __init__ method for details.
read() --> (value, text)
Reads the next lexical token from the stream.
position() --> (name, line, col)
Returns the position of the last token read using the
read() method.
See the docstrings of the individual methods for more
Causes scanner to change state.
read() --> (value, text)
Reads the next lexical token from the stream.
produce(value [, text])
Causes return of a token value to the caller of the
position() --> (name, line, col)
Returns the position of the last token read using the
read() method.
Causes scanner to change state.
# lexicon = None # Lexicon
# stream = None # file-like object
# name = ''
# buffer = ''
# buf_start_pos = 0 # position in input of start of buffer
# next_pos = 0 # position in input of next char to read
# cur_pos = 0 # position in input of current char
# cur_line = 1 # line number of current char
# cur_line_start = 0 # position in input of start of current line
# start_pos = 0 # position in input of start of token
# start_line = 0 # line number of start of token
# start_col = 0 # position in line of start of token
# text = None # text of last token read
# initial_state = None # Node
# state_name = '' # Name of initial state
# queue = None # list of tokens to be returned
# trace = 0
produce(value [, text])
Causes return of a token value to the caller of the
def __init__(self, lexicon, stream, name = '', initial_pos = None):
Scanner(lexicon, stream, name = '')
|lexicon| is a Plex.Lexicon instance specifying the lexical tokens
to be recognised.
|stream| can be a file object or anything which implements a
compatible read() method.
|name| is optional, and may be the name of the file being
scanned or any other identifying string.
self.trace = 0
self.buffer = u''
self.buf_start_pos = 0
self.next_pos = 0
self.cur_pos = 0
self.cur_line = 1
self.start_pos = 0
self.start_line = 0
self.start_col = 0
self.text = None
self.state_name = None
self.lexicon = lexicon = stream = name
self.queue = []
self.initial_state = None
self.next_pos = 0
self.cur_pos = 0
self.cur_line_start = 0
self.cur_char = BOL
self.input_state = 1
if initial_pos is not None:
self.cur_line, self.cur_line_start = initial_pos[1], -initial_pos[2]
def read(self):
Read the next lexical token from the stream and return a
tuple (value, text), where |value| is the value associated with
the token as specified by the Lexicon, and |text| is the actual
string read from the stream. Returns (None, '') on end of file.
queue = self.queue
while not queue:
self.text, action = self.scan_a_token()
if action is None:
value = action.perform(self, self.text)
if value is not None:
result = queue[0]
del queue[0]
return result
def scan_a_token(self):
Read the next input sequence recognised by the machine
and return (text, action). Returns ('', None) on end of
self.start_pos = self.cur_pos
self.start_line = self.cur_line
self.start_col = self.cur_pos - self.cur_line_start
action = self.run_machine_inlined()
if action is not None:
if self.trace:
print("Scanner: read: Performing %s %d:%d" % (
action, self.start_pos, self.cur_pos))
text = self.buffer[self.start_pos - self.buf_start_pos :
self.cur_pos - self.buf_start_pos]
return (text, action)
if self.cur_pos == self.start_pos:
if self.cur_char is EOL:
if self.cur_char is None or self.cur_char is EOF:
return (u'', None)
raise Errors.UnrecognizedInput(self, self.state_name)
def run_machine_inlined(self):
Inlined version of run_machine for speed.
state = self.initial_state
cur_pos = self.cur_pos
cur_line = self.cur_line
cur_line_start = self.cur_line_start
cur_char = self.cur_char
input_state = self.input_state
next_pos = self.next_pos
buffer = self.buffer
buf_start_pos = self.buf_start_pos
buf_len = len(buffer)
b_action, b_cur_pos, b_cur_line, b_cur_line_start, b_cur_char, b_input_state, b_next_pos = \
None, 0, 0, 0, u'', 0, 0
trace = self.trace
while 1:
if trace: #TRACE#
print("State %d, %d/%d:%s -->" % ( #TRACE#
state['number'], input_state, cur_pos, repr(cur_char))) #TRACE#
# Begin inlined self.save_for_backup()
#action = state.action #@slow
action = state['action'] #@fast
if action is not None:
# lexicon = None # Lexicon
# stream = None # file-like object
# name = ''
# buffer = ''
# buf_start_pos = 0 # position in input of start of buffer
# next_pos = 0 # position in input of next char to read
# cur_pos = 0 # position in input of current char
# cur_line = 1 # line number of current char
# cur_line_start = 0 # position in input of start of current line
# start_pos = 0 # position in input of start of token
# start_line = 0 # line number of start of token
# start_col = 0 # position in line of start of token
# text = None # text of last token read
# initial_state = None # Node
# state_name = '' # Name of initial state
# queue = None # list of tokens to be returned
# trace = 0
def __init__(self, lexicon, stream, name='', initial_pos=None):
Scanner(lexicon, stream, name = '')
|lexicon| is a Plex.Lexicon instance specifying the lexical tokens
to be recognised.
|stream| can be a file object or anything which implements a
compatible read() method.
|name| is optional, and may be the name of the file being
scanned or any other identifying string.
self.trace = 0
self.buffer = u''
self.buf_start_pos = 0
self.next_pos = 0
self.cur_pos = 0
self.cur_line = 1
self.start_pos = 0
self.start_line = 0
self.start_col = 0
self.text = None
self.state_name = None
self.lexicon = lexicon = stream = name
self.queue = []
self.initial_state = None
self.next_pos = 0
self.cur_pos = 0
self.cur_line_start = 0
self.cur_char = BOL
self.input_state = 1
if initial_pos is not None:
self.cur_line, self.cur_line_start = initial_pos[1], -initial_pos[2]
def read(self):
Read the next lexical token from the stream and return a
tuple (value, text), where |value| is the value associated with
the token as specified by the Lexicon, and |text| is the actual
string read from the stream. Returns (None, '') on end of file.
queue = self.queue
while not queue:
self.text, action = self.scan_a_token()
if action is None:
value = action.perform(self, self.text)
if value is not None:
result = queue[0]
del queue[0]
return result
def scan_a_token(self):
Read the next input sequence recognised by the machine
and return (text, action). Returns ('', None) on end of
self.start_pos = self.cur_pos
self.start_line = self.cur_line
self.start_col = self.cur_pos - self.cur_line_start
action = self.run_machine_inlined()
if action is not None:
if self.trace:
print("Scanner: read: Performing %s %d:%d" % (
action, self.start_pos, self.cur_pos))
text = self.buffer[
self.start_pos - self.buf_start_pos:
self.cur_pos - self.buf_start_pos]
return (text, action)
if self.cur_pos == self.start_pos:
if self.cur_char is EOL:
if self.cur_char is None or self.cur_char is EOF:
return (u'', None)
raise Errors.UnrecognizedInput(self, self.state_name)
def run_machine_inlined(self):
Inlined version of run_machine for speed.
state = self.initial_state
cur_pos = self.cur_pos
cur_line = self.cur_line
cur_line_start = self.cur_line_start
cur_char = self.cur_char
input_state = self.input_state
next_pos = self.next_pos
buffer = self.buffer
buf_start_pos = self.buf_start_pos
buf_len = len(buffer)
b_action, b_cur_pos, b_cur_line, b_cur_line_start, b_cur_char, b_input_state, b_next_pos = \
action, cur_pos, cur_line, cur_line_start, cur_char, input_state, next_pos
# End inlined self.save_for_backup()
c = cur_char
#new_state = state.new_state(c) #@slow
new_state = state.get(c, NOT_FOUND) #@fast
if new_state is NOT_FOUND: #@fast
new_state = c and state.get('else') #@fast
if new_state:
if trace: #TRACE#
print("State %d" % new_state['number']) #TRACE#
state = new_state
# Begin inlined: self.next_char()
None, 0, 0, 0, u'', 0, 0
trace = self.trace
while 1:
if trace: #TRACE#
print("State %d, %d/%d:%s -->" % ( #TRACE#
state['number'], input_state, cur_pos, repr(cur_char))) #TRACE#
# Begin inlined self.save_for_backup()
#action = state.action #@slow
action = state['action'] #@fast
if action is not None:
b_action, b_cur_pos, b_cur_line, b_cur_line_start, b_cur_char, b_input_state, b_next_pos = \
action, cur_pos, cur_line, cur_line_start, cur_char, input_state, next_pos
# End inlined self.save_for_backup()
c = cur_char
#new_state = state.new_state(c) #@slow
new_state = state.get(c, NOT_FOUND) #@fast
if new_state is NOT_FOUND: #@fast
new_state = c and state.get('else') #@fast
if new_state:
if trace: #TRACE#
print("State %d" % new_state['number']) #TRACE#
state = new_state
# Begin inlined: self.next_char()
if input_state == 1:
cur_pos = next_pos
# Begin inlined: c = self.read_char()
buf_index = next_pos - buf_start_pos
if buf_index < buf_len:
c = buffer[buf_index]
next_pos += 1
discard = self.start_pos - buf_start_pos
data =
buffer = self.buffer[discard:] + data
self.buffer = buffer
buf_start_pos += discard
self.buf_start_pos = buf_start_pos
buf_len = len(buffer)
buf_index -= discard
if data:
c = buffer[buf_index]
next_pos += 1
c = u''
# End inlined: c = self.read_char()
if c == u'\n':
cur_char = EOL
input_state = 2
elif not c:
cur_char = EOL
input_state = 4
cur_char = c
elif input_state == 2:
cur_char = u'\n'
input_state = 3
elif input_state == 3:
cur_line += 1
cur_line_start = cur_pos = next_pos
cur_char = BOL
input_state = 1
elif input_state == 4:
cur_char = EOF
input_state = 5
else: # input_state = 5
cur_char = u''
# End inlined self.next_char()
else: # not new_state
if trace: #TRACE#
print("blocked") #TRACE#
# Begin inlined: action = self.back_up()
if b_action is not None:
(action, cur_pos, cur_line, cur_line_start,
cur_char, input_state, next_pos) = \
(b_action, b_cur_pos, b_cur_line, b_cur_line_start,
b_cur_char, b_input_state, b_next_pos)
action = None
break # while 1
# End inlined: action = self.back_up()
self.cur_pos = cur_pos
self.cur_line = cur_line
self.cur_line_start = cur_line_start
self.cur_char = cur_char
self.input_state = input_state
self.next_pos = next_pos
if trace: #TRACE#
if action is not None: #TRACE#
print("Doing %s" % action) #TRACE#
return action
def next_char(self):
input_state = self.input_state
if self.trace:
print("Scanner: next: %s [%d] %d" % (" " * 20, input_state, self.cur_pos))
if input_state == 1:
cur_pos = next_pos
# Begin inlined: c = self.read_char()
buf_index = next_pos - buf_start_pos
if buf_index < buf_len:
c = buffer[buf_index]
next_pos = next_pos + 1
discard = self.start_pos - buf_start_pos
data =
buffer = self.buffer[discard:] + data
self.buffer = buffer
buf_start_pos = buf_start_pos + discard
self.buf_start_pos = buf_start_pos
buf_len = len(buffer)
buf_index = buf_index - discard
if data:
c = buffer[buf_index]
next_pos = next_pos + 1
self.cur_pos = self.next_pos
c = self.read_char()
if c == u'\n':
self.cur_char = EOL
self.input_state = 2
elif not c:
self.cur_char = EOL
self.input_state = 4
c = u''
# End inlined: c = self.read_char()
if c == u'\n':
cur_char = EOL
input_state = 2
elif not c:
cur_char = EOL
input_state = 4
cur_char = c
self.cur_char = c
elif input_state == 2:
cur_char = u'\n'
input_state = 3
self.cur_char = u'\n'
self.input_state = 3
elif input_state == 3:
cur_line = cur_line + 1
cur_line_start = cur_pos = next_pos
cur_char = BOL
input_state = 1
self.cur_line += 1
self.cur_line_start = self.cur_pos = self.next_pos
self.cur_char = BOL
self.input_state = 1
elif input_state == 4:
cur_char = EOF
input_state = 5
else: # input_state = 5
cur_char = u''
# End inlined self.next_char()
else: # not new_state
if trace: #TRACE#
print("blocked") #TRACE#
# Begin inlined: action = self.back_up()
if b_action is not None:
(action, cur_pos, cur_line, cur_line_start,
cur_char, input_state, next_pos) = \
(b_action, b_cur_pos, b_cur_line, b_cur_line_start,
b_cur_char, b_input_state, b_next_pos)
action = None
break # while 1
# End inlined: action = self.back_up()
self.cur_pos = cur_pos
self.cur_line = cur_line
self.cur_line_start = cur_line_start
self.cur_char = cur_char
self.input_state = input_state
self.next_pos = next_pos
if trace: #TRACE#
if action is not None: #TRACE#
print("Doing %s" % action) #TRACE#
return action
def next_char(self):
input_state = self.input_state
if self.trace:
print("Scanner: next: %s [%d] %d" % (" "*20, input_state, self.cur_pos))
if input_state == 1:
self.cur_pos = self.next_pos
c = self.read_char()
if c == u'\n':
self.cur_char = EOL
self.input_state = 2
elif not c:
self.cur_char = EOL
self.input_state = 4
self.cur_char = c
elif input_state == 2:
self.cur_char = u'\n'
self.input_state = 3
elif input_state == 3:
self.cur_line = self.cur_line + 1
self.cur_line_start = self.cur_pos = self.next_pos
self.cur_char = BOL
self.input_state = 1
elif input_state == 4:
self.cur_char = EOF
self.input_state = 5
else: # input_state = 5
self.cur_char = u''
if self.trace:
print("--> [%d] %d %s" % (input_state, self.cur_pos, repr(self.cur_char)))
def position(self):
Return a tuple (name, line, col) representing the location of
the last token read using the read() method. |name| is the
name that was provided to the Scanner constructor; |line|
is the line number in the stream (1-based); |col| is the
position within the line of the first character of the token
return (, self.start_line, self.start_col)
def get_position(self):
    """
    Python accessible wrapper around position(), only for error reporting.
    """
    return self.position()
def begin(self, state_name):
"""Set the current state of the scanner to the named state."""
self.initial_state = (
self.state_name = state_name
def produce(self, value, text=None):
    """
    Called from an action procedure, causes |value| to be returned
    as the token value from read(). If |text| is supplied, it is
    returned in place of the scanned text.

    produce() can be called more than once during a single call to an action
    procedure, in which case the tokens are queued up and returned one
    at a time by subsequent calls to read(), until the queue is empty,
    whereupon scanning resumes.
    """
    if text is None:
        # Default to the text of the token that was just scanned.
        text = self.text
    self.queue.append((value, text))
def eof(self):
    """
    Override this method if you want something to be done at
    end of file.
    """
self.cur_char = EOF
self.input_state = 5
else: # input_state = 5
self.cur_char = u''
if self.trace:
print("--> [%d] %d %s" % (input_state, self.cur_pos, repr(self.cur_char)))
def position(self):
Return a tuple (name, line, col) representing the location of
the last token read using the read() method. |name| is the
name that was provided to the Scanner constructor; |line|
is the line number in the stream (1-based); |col| is the
position within the line of the first character of the token
return (, self.start_line, self.start_col)
def get_position(self):
    """
    Python accessible wrapper around position(), only for error reporting.
    """
    return self.position()
def begin(self, state_name):
"""Set the current state of the scanner to the named state."""
self.initial_state = (
self.state_name = state_name
def produce(self, value, text=None):
    """
    Called from an action procedure, causes |value| to be returned
    as the token value from read(). If |text| is supplied, it is
    returned in place of the scanned text.

    produce() can be called more than once during a single call to an action
    procedure, in which case the tokens are queued up and returned one
    at a time by subsequent calls to read(), until the queue is empty,
    whereupon scanning resumes.
    """
    if text is None:
        # Default to the text of the token that was just scanned.
        text = self.text
    self.queue.append((value, text))
def eof(self):
    """
    Override this method if you want something to be done at
    end of file.
    """
......@@ -13,147 +13,146 @@ from .Errors import PlexError
class RegexpSyntaxError(PlexError):
def re(s):
    """
    Convert traditional string representation of regular expression |s|
    into Plex representation.
    """
    return REParser(s).parse_re()
class REParser(object):
def __init__(self, s):
self.s = s
self.i = -1
self.end = 0
def parse_re(self):
    """
    Parse the whole regexp and return the resulting RE.

    Reports a syntax error through self.error() when input is left
    over after the top-level alternative has been parsed.
    """
    result = self.parse_alt()
    if not self.end:
        self.error("Unexpected %s" % repr(self.c))
    return result
def parse_alt(self):
"""Parse a set of alternative regexps."""
re = self.parse_seq()
if self.c == '|':
re_list = [re]
while self.c == '|':
def __init__(self, s):
self.s = s
self.i = -1
self.end = 0
re = Alt(*re_list)
return re
def parse_seq(self):
"""Parse a sequence of regexps."""
re_list = []
while not self.end and not self.c in "|)":
return Seq(*re_list)
def parse_mod(self):
"""Parse a primitive regexp followed by *, +, ? modifiers."""
re = self.parse_prim()
while not self.end and self.c in "*+?":
if self.c == '*':
re = Rep(re)
elif self.c == '+':
re = Rep1(re)
else: # self.c == '?'
re = Opt(re)
return re
def parse_prim(self):
"""Parse a primitive regexp."""
c = self.get()
if c == '.':
re = AnyBut("\n")
elif c == '^':
re = Bol
elif c == '$':
re = Eol
elif c == '(':
re = self.parse_alt()
elif c == '[':
re = self.parse_charset()
if c == '\\':
def parse_re(self):
    """
    Parse the whole regexp and return the resulting RE.

    Reports a syntax error through self.error() when input is left
    over after the top-level alternative has been parsed.
    """
    result = self.parse_alt()
    if not self.end:
        self.error("Unexpected %s" % repr(self.c))
    return result
def parse_alt(self):
"""Parse a set of alternative regexps."""
re = self.parse_seq()
if self.c == '|':
re_list = [re]
while self.c == '|':
re = Alt(*re_list)
return re
def parse_seq(self):
"""Parse a sequence of regexps."""
re_list = []
while not self.end and not self.c in "|)":
return Seq(*re_list)
def parse_mod(self):
"""Parse a primitive regexp followed by *, +, ? modifiers."""
re = self.parse_prim()
while not self.end and self.c in "*+?":
if self.c == '*':
re = Rep(re)
elif self.c == '+':
re = Rep1(re)
else: # self.c == '?'
re = Opt(re)
return re
def parse_prim(self):
"""Parse a primitive regexp."""
c = self.get()
re = Char(c)
return re
def parse_charset(self):
"""Parse a charset. Does not include the surrounding []."""
char_list = []
invert = 0
if self.c == '^':
invert = 1
if self.c == ']':
while not self.end and self.c != ']':
c1 = self.get()
if self.c == '-' and self.lookahead(1) != ']':
if c == '.':
re = AnyBut("\n")
elif c == '^':
re = Bol
elif c == '$':
re = Eol
elif c == '(':
re = self.parse_alt()
elif c == '[':
re = self.parse_charset()
if c == '\\':
c = self.get()
re = Char(c)
return re
def parse_charset(self):
"""Parse a charset. Does not include the surrounding []."""
char_list = []
invert = 0
if self.c == '^':
invert = 1
if self.c == ']':
while not self.end and self.c != ']':
c1 = self.get()
if self.c == '-' and self.lookahead(1) != ']':
c2 = self.get()
for a in xrange(ord(c1), ord(c2) + 1):
chars = ''.join(char_list)
if invert:
return AnyBut(chars)
return Any(chars)
def next(self):
    """Advance to the next char."""
    s = self.s
    i = self.i = self.i + 1
    if i < len(s):
        self.c = s[i]
    else:
        # Ran off the end: clear the current char and flag end of input.
        self.c = ''
        self.end = 1
def get(self):
if self.end:
self.error("Premature end of string")
c = self.c
c2 = self.get()
for a in xrange(ord(c1), ord(c2) + 1):
chars = ''.join(char_list)
if invert:
return AnyBut(chars)
return Any(chars)
def next(self):
    """Advance to the next char."""
    s = self.s
    i = self.i = self.i + 1
    if i < len(s):
        self.c = s[i]
    else:
        # Ran off the end: clear the current char and flag end of input.
        self.c = ''
        self.end = 1
def get(self):
if self.end:
self.error("Premature end of string")
c = self.c
return c
def lookahead(self, n):
    """
    Look ahead n chars.

    Returns the character |n| positions past the current index, or ''
    when that position lies outside the string.
    """
    j = self.i + n
    # The lower bound stops a negative index from wrapping to the end.
    if 0 <= j < len(self.s):
        return self.s[j]
    return ''
def expect(self, c):
Expect to find character |c| at current position.
Raises an exception otherwise.
if self.c == c:
self.error("Missing %s" % repr(c))
def error(self, mess):
    """Raise exception to signal syntax error in regexp."""
    raise RegexpSyntaxError("Syntax error in regexp %s at position %d: %s" % (
        repr(self.s), self.i, mess))
return c
def lookahead(self, n):
    """
    Look ahead n chars.

    Returns the character |n| positions past the current index, or ''
    when that position lies outside the string.
    """
    j = self.i + n
    # The lower bound stops a negative index from wrapping to the end.
    if 0 <= j < len(self.s):
        return self.s[j]
    return ''
def expect(self, c):
Expect to find character |c| at current position.
Raises an exception otherwise.
if self.c == c:
self.error("Missing %s" % repr(c))
def error(self, mess):
    """Raise exception to signal syntax error in regexp."""
    raise RegexpSyntaxError("Syntax error in regexp %s at position %d: %s" % (
        repr(self.s), self.i, mess))
# Plex - Transition Maps
# Plex - Transition Maps
# This version represents state sets directly as dicts for speed.
# This version represents state sets directly as dicts for speed.
from __future__ import absolute_import
......@@ -10,229 +10,231 @@ from sys import maxint as maxint
class TransitionMap(object):
A TransitionMap maps an input event to a set of states.
An input event is one of: a range of character codes,
the empty string (representing an epsilon move), or one
of the special symbols BOL, EOL, EOF.
For characters, this implementation compactly represents
the map by means of a list:
[code_0, states_0, code_1, states_1, code_2, states_2,
..., code_n-1, states_n-1, code_n]
where |code_i| is a character code, and |states_i| is a
set of states corresponding to characters with codes |c|
in the range |code_i| <= |c| <= |code_i+1|.
The following invariants hold:
n >= 1
code_0 == -maxint
code_n == maxint
code_i < code_i+1 for i in 0..n-1
states_0 == states_n-1
Mappings for the special events '', BOL, EOL, EOF are
kept separately in a dictionary.
map = None # The list of codes and states
special = None # Mapping for special events
def __init__(self, map = None, special = None):
if not map:
map = [-maxint, {}, maxint]
if not special:
special = {} = map
self.special = special
#self.check() ###
def add(self, event, new_state,
TupleType = tuple):
Add transition to |new_state| on |event|.
A TransitionMap maps an input event to a set of states.
An input event is one of: a range of character codes,
the empty string (representing an epsilon move), or one
of the special symbols BOL, EOL, EOF.
For characters, this implementation compactly represents
the map by means of a list:
[code_0, states_0, code_1, states_1, code_2, states_2,
..., code_n-1, states_n-1, code_n]
where |code_i| is a character code, and |states_i| is a
set of states corresponding to characters with codes |c|
in the range |code_i| <= |c| < |code_i+1|.
The following invariants hold:
n >= 1
code_0 == -maxint
code_n == maxint
code_i < code_i+1 for i in 0..n-1
states_0 == states_n-1
Mappings for the special events '', BOL, EOL, EOF are
kept separately in a dictionary.
if type(event) is TupleType:
code0, code1 = event
i = self.split(code0)
j = self.split(code1)
map =
while i < j:
map[i + 1][new_state] = 1
i = i + 2
self.get_special(event)[new_state] = 1
def add_set(self, event, new_set,
TupleType = tuple):
Add transitions to the states in |new_set| on |event|.
if type(event) is TupleType:
code0, code1 = event
i = self.split(code0)
j = self.split(code1)
map =
while i < j:
map[i + 1].update(new_set)
i = i + 2
def get_epsilon(self,
                none=None):
    """
    Return the mapping for epsilon, or None.

    The epsilon entry is looked up in self.special under the empty-string
    key; |none| is the value returned when no such entry exists.
    """
    return self.special.get('', none)
def iteritems(self,
              len=len):
    """
    Return the mapping as an iterable of ((code1, code2), state_set) and
    (special_event, state_set) pairs.

    A character range is reported when its own state set is non-empty or
    when the set of the first range (map[1]) is non-empty. Special events
    are reported only when their state set is non-empty.
    """
    map = self.map
    else_set = map[1]
    result = []
    code0 = map[0]
    # The list alternates code, states, code, ...; visit one range per step.
    for i in range(0, len(map) - 1, 2):
        states = map[i + 1]
        code1 = map[i + 2]
        if states or else_set:
            result.append(((code0, code1), states))
        code0 = code1
    # items() rather than iteritems(): same pairs, and not Python-2-only.
    for event, states in self.special.items():
        if states:
            result.append((event, states))
    return iter(result)
items = iteritems
# ------------------- Private methods --------------------
def split(self, code,
          len=len, maxint=maxint):
    """
    Search the list for the position of the split point for |code|,
    inserting a new split point if necessary. Returns index |i| such
    that |code| == |map[i]|.
    """
    # We use a funky variation on binary search.
    map = self.map
    hi = len(map) - 1
    # Special case: code == map[-1]
    if code == maxint:
        return hi
    # General case
    lo = 0
    # loop invariant: map[lo] <= code < map[hi] and hi - lo >= 2
    while hi - lo >= 4:
        # Find midpoint truncated to even index
        mid = ((lo + hi) // 2) & ~1
        if code < map[mid]:
            hi = mid
        else:
            lo = mid
    # map[lo] <= code < map[hi] and hi - lo == 2
    if map[lo] == code:
        return lo
    # No split point at |code| yet: insert one just before map[hi]. The new
    # range starts with its own copy of the state set of the range it splits.
    map[hi:hi] = [code, map[hi - 1].copy()]
    return hi
def get_special(self, event):
    """
    Get state set for special event, adding a new entry if necessary.

    A missing or empty entry for |event| is replaced by a fresh empty dict,
    which is stored in self.special and returned.
    """
    states = self.special.get(event)
    if not states:
        states = {}
        self.special[event] = states
    return states
# --------------------- Conversion methods -----------------------
def __str__(self):
map_strs = []
map =
n = len(map)
i = 0
while i < n:
code = map[i]
if code == -maxint:
code_str = "-inf"
elif code == maxint:
code_str = "inf"
code_str = str(code)
i = i + 1
if i < n:
i = i + 1
special_strs = {}
for event, set in self.special.iteritems():
special_strs[event] = state_set_str(set)
return "[%s]+%s" % (
# --------------------- Debugging methods -----------------------
def check(self):
"""Check data structure integrity."""
if not[-3] <[-1]:
assert 0
def dump(self, file):
    """
    Write every transition of this map to |file|.

    Each character range is handed to self.dump_range(); each special event
    with a non-empty state set is handed to self.dump_trans(), with a falsy
    event key (such as the empty string) labelled 'empty'.
    """
    map = self.map
    # The list alternates code, states, code, ...; visit one range per step.
    for i in range(0, len(map) - 1, 2):
        self.dump_range(map[i], map[i + 2], map[i + 1], file)
    # items() rather than iteritems(): same pairs, and not Python-2-only.
    for event, states in self.special.items():
        if states:
            self.dump_trans(event or 'empty', states, file)
def dump_range(self, code0, code1, set, file):
if set:
if code0 == -maxint:
if code1 == maxint:
k = "any"
map = None # The list of codes and states
special = None # Mapping for special events
def __init__(self, map=None, special=None):
if not map:
map = [-maxint, {}, maxint]
if not special:
special = {} = map
self.special = special
#self.check() ###
def add(self, event, new_state,
Add transition to |new_state| on |event|.
if type(event) is TupleType:
code0, code1 = event
i = self.split(code0)
j = self.split(code1)
map =
while i < j:
map[i + 1][new_state] = 1
i += 2
self.get_special(event)[new_state] = 1
def add_set(self, event, new_set,
Add transitions to the states in |new_set| on |event|.
if type(event) is TupleType:
code0, code1 = event
i = self.split(code0)
j = self.split(code1)
map =
while i < j:
map[i + 1].update(new_set)
i += 2
def get_epsilon(self,
                none=None):
    """
    Return the mapping for epsilon, or None.

    The epsilon entry is looked up in self.special under the empty-string
    key; |none| is the value returned when no such entry exists.
    """
    return self.special.get('', none)
def iteritems(self,
              len=len):
    """
    Return the mapping as an iterable of ((code1, code2), state_set) and
    (special_event, state_set) pairs.

    A character range is reported when its own state set is non-empty or
    when the set of the first range (map[1]) is non-empty. Special events
    are reported only when their state set is non-empty.
    """
    map = self.map
    else_set = map[1]
    result = []
    code0 = map[0]
    # The list alternates code, states, code, ...; visit one range per step.
    for i in range(0, len(map) - 1, 2):
        states = map[i + 1]
        code1 = map[i + 2]
        if states or else_set:
            result.append(((code0, code1), states))
        code0 = code1
    # items() rather than iteritems(): same pairs, and not Python-2-only.
    for event, states in self.special.items():
        if states:
            result.append((event, states))
    return iter(result)
items = iteritems
# ------------------- Private methods --------------------
def split(self, code,
len=len, maxint=maxint):
Search the list for the position of the split point for |code|,
inserting a new split point if necessary. Returns index |i| such
that |code| == |map[i]|.
# We use a funky variation on binary search.
map =
hi = len(map) - 1
# Special case: code == map[-1]
if code == maxint:
return hi
# General case
lo = 0
# loop invariant: map[lo] <= code < map[hi] and hi - lo >= 2
while hi - lo >= 4:
# Find midpoint truncated to even index
mid = ((lo + hi) // 2) & ~1
if code < map[mid]:
hi = mid
lo = mid
# map[lo] <= code < map[hi] and hi - lo == 2
if map[lo] == code:
return lo
k = "< %s" % self.dump_char(code1)
elif code1 == maxint:
k = "> %s" % self.dump_char(code0 - 1)
elif code0 == code1 - 1:
k = self.dump_char(code0)
k = "%s..%s" % (self.dump_char(code0),
self.dump_char(code1 - 1))
self.dump_trans(k, set, file)
def dump_char(self, code):
    """
    Return a printable form of character code |code|: the repr of the
    character for codes 0..255, otherwise the text "chr(<code>)".
    """
    in_byte_range = 0 <= code <= 255
    return repr(chr(code)) if in_byte_range else "chr(%d)" % code
def dump_trans(self, key, set, file):
    """
    Write one "key --> states" line to |file|, with the state set rendered
    by self.dump_set().
    """
    line = " %s --> %s\n" % (key, self.dump_set(set))
    file.write(line)
def dump_set(self, set):
    """Return the text for state set |set|, as produced by state_set_str()."""
    text = state_set_str(set)
    return text
map[hi:hi] = [code, map[hi - 1].copy()]
#self.check() ###
return hi
def get_special(self, event):
    """
    Get state set for special event, adding a new entry if necessary.

    A missing or empty entry for |event| is replaced by a fresh empty dict,
    which is stored in self.special and returned.
    """
    states = self.special.get(event)
    if not states:
        states = {}
        self.special[event] = states
    return states
# --------------------- Conversion methods -----------------------
def __str__(self):
map_strs = []
map =
n = len(map)
i = 0
while i < n:
code = map[i]
if code == -maxint:
code_str = "-inf"
elif code == maxint:
code_str = "inf"
code_str = str(code)
i += 1
if i < n:
i += 1
special_strs = {}
for event, set in self.special.iteritems():
special_strs[event] = state_set_str(set)
return "[%s]+%s" % (
# --------------------- Debugging methods -----------------------
def check(self):
"""Check data structure integrity."""
if not[-3] <[-1]:
assert 0
def dump(self, file):
    """
    Write every transition of this map to |file|.

    Each character range is handed to self.dump_range(); each special event
    with a non-empty state set is handed to self.dump_trans(), with a falsy
    event key (such as the empty string) labelled 'empty'.
    """
    map = self.map
    # The list alternates code, states, code, ...; visit one range per step.
    for i in range(0, len(map) - 1, 2):
        self.dump_range(map[i], map[i + 2], map[i + 1], file)
    # items() rather than iteritems(): same pairs, and not Python-2-only.
    for event, states in self.special.items():
        if states:
            self.dump_trans(event or 'empty', states, file)
def dump_range(self, code0, code1, set, file):
if set:
if code0 == -maxint:
if code1 == maxint:
k = "any"
k = "< %s" % self.dump_char(code1)
elif code1 == maxint:
k = "> %s" % self.dump_char(code0 - 1)
elif code0 == code1 - 1:
k = self.dump_char(code0)
k = "%s..%s" % (self.dump_char(code0),
self.dump_char(code1 - 1))
self.dump_trans(k, set, file)
def dump_char(self, code):
    """
    Return a printable form of character code |code|: the repr of the
    character for codes 0..255, otherwise the text "chr(<code>)".
    """
    in_byte_range = 0 <= code <= 255
    return repr(chr(code)) if in_byte_range else "chr(%d)" % code
def dump_trans(self, key, set, file):
    """
    Write one "key --> states" line to |file|, with the state set rendered
    by self.dump_set().
    """
    line = " %s --> %s\n" % (key, self.dump_set(set))
    file.write(line)
def dump_set(self, set):
    """Return the text for state set |set|, as produced by state_set_str()."""
    text = state_set_str(set)
    return text
# State set manipulation functions
......@@ -243,4 +245,4 @@ class TransitionMap(object):
# set1[state] = 1
def state_set_str(set):
    """
    Return a string such as "[S1,S2]" naming each state in |set| by its
    .number attribute, in the iteration order of |set|.
    """
    return "[%s]" % ','.join(["S%d" % state.number for state in set])
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment