"""
Defines the time Context that constructs unit types and manages their
relationship to other unit types.
The contents of this module are largely the mechanics under the hood and should
not be used directly.
.. danger:: NOTHING IN THIS FILE SHOULD BE RELIED UPON.
.. danger:: THESE INTERFACES ARE SUBJECT TO CHANGE WITHOUT WARNING
"""
import collections
import fractions
import functools
import operator
from . import abstract
[docs]class Unit(int):
"""
The base class for Measures and Points subclasses across all Time Contexts.
"""
__slots__ = ()
@classmethod
def construct(typ,
units, parts, start = 0,
op = operator.add,
Queue = collections.deque, int = int
):
d = Queue() # for opening containers
popleft = d.popleft
append = d.append
prepend = d.appendleft
terms = {}
# Keyword processing. First, combine like terms.
append(parts.items())
containers = typ.context.containers
convert = typ.context.convert
getterm = typ.context.terms.get
target_unit = typ.unit
target_term = getterm(target_unit)
total = start
# currently containers can return containers,
# thus this. :(
while d:
parts = popleft()
for unit, value in parts:
# a given unit is an expression of a "term".
# years are an expression of months
# seconds are an expression of days
term = getterm(unit)
if term is None:
# not a term, assume that it's a container.
prepend(containers[unit][1](typ, value)) # open container unit
elif term == target_term:
# simple conversion is needed for like-term units.
total = op(total, convert(unit, target_unit, value))
else:
# not a like term. sum up all the unlike terms for later conversion.
terms[term] = terms.get(term, 0) + convert(unit, term, value)
# apply units
for x in units:
total = op(total, convert(x.unit, target_unit, (int(x) + x.datum)))
for term, value in terms.items():
# first convert the existing total to the unlike-term units.
# this gives context for the term's value.
ctx = convert(target_unit, term, total)
# maintain the difference (conversion remainder)
dif = total - convert(term, target_unit, ctx)
# apply the like term value to the ctx
local = op(ctx, value)
# convert both back to the target unit and
# apply the difference to the actual total
total = convert(term, target_unit, local) + dif
return typ(int(total) - typ.datum)
@classmethod
def of(typ, *units, **parts):
return typ.construct(units, parts)
def update(self, part = None, replacement = None, of = None, align = 0):
# adjust self by the difference of the new value and the selection.
return self.construct((self,), {
part: replacement - self.select(part, of = of, align = align)
})
def truncate(self, unit, int = int):
return self.construct((), {unit: self.select(unit)})
def select(self, part, of = None, align = 0):
if part in self.context.containers:
# container type? no need for conversions.
return self.context.containers[part][0](self, of)
elif of is None:
# no of-whole? just convert and return
return int(self.context.convert(self.unit, part, self + self.datum))
convert = self.context.convert
# A few significant factors in selection.
this_unit = self.unit
# What is the term of the part and of-whole?
part_term = self.context.terms[part]
of_term = self.context.terms[of]
if of_term == self.liketerm:
# The requested part is of the same term as the instance.
# A simple modulus does the trick.
boundary = convert(of, this_unit, 1)
selection = self % boundary
else:
# the of_term is not a liketerm, convert self to the of_term.
total = self + self.datum
unlike_selection = convert(this_unit, of, total)
if part_term == of_term:
# the part term is the same as the of_term,
# so get the base selection.
boundary = convert(of, part, 1)
selection = convert(of, part, unlike_selection) % boundary
this_unit = part
else:
boundary = convert(of, this_unit, unlike_selection)
selection = total - boundary
if align:
# after getting the selection
offset = convert(part, this_unit, 1) * align
selection = (selection + offset) % boundary
return int(selection * self.context.compose(this_unit, part))
class Measure(Unit):
__slots__ = ()
@property
def start(self):
return self.__class__(0)
@property
def stop(self):
return self
def __neg__(self):
return self.__class__(super().__neg__())
def __abs__(self):
return self.__class__(super().__abs__())
def __str__(self):
return str(self.select(self.unit))
def __repr__(self, format = "{2}{0}.of({1})".format):
# XXX: this is clearly stupid slow
if self < 0:
sign = '-'
sub = -self
else:
sign = ''
sub = self
seq = self.context.measure_repr[self.liketerm]
prev = None
fields = []
for x in seq:
y = sub.select(x, prev)
prev = x
if y:
sub = sub.elapse(**{x: -y})
fields.append('{0}={1!s}'.format(x,y))
return format(
self.__name__,
', '.join(fields),
sign
)
def __contains__(self, t):
return 0 <= t < self
def elapse(self, *args, **parts):
return self.of(self, *args, **parts)
adjust = elapse
def increase(self, *units, **parts):
return self.construct(units, parts, start = self)
def decrease(self, *units, **parts):
return self.construct(units, parts, start = self, op = operator.sub)
abstract.Measure.register(Measure)
class Point(Unit):
@property
def start(self):
return self
@property
def stop(self):
return self.__class__(self + self.magnitude)
def __contains__(self, t):
return (self + self.datum) == t.select(self.unit)
# XXX: depending on a definition in .lib; stop it!
def __str__(self):
return self.select('iso')
def __repr__(self, format = "{0}.of(iso={1})".format):
return format(self.__name__, repr(self.select('iso')))
def rollback(self, *units, **parts):
return self.construct(units, parts, start = self + self.datum, op = operator.sub)
def elapse(self, *units, **parts):
return self.construct(units, parts, start = self + self.datum)
def measure(self, pit):
return self.Measure(pit - self)
abstract.Measure.register(Point)
[docs]class Range(tuple):
"""
A range between two points, inclusive on the start, but non-inclusive on the end.
If the start is greater than the end, the direction is implied to be
negative.
"""
__slots__ = ()
@property
def magnitude(self):
return abs(self.start.measure(self.stop))
@property
def direction(self):
return self.start.measure(self.stop) // abs(self.magnitude)
def __contains__(self, pit):
# XXX: doesn't consider direction
point = pit.measure(self.start)
if point < 0:
return False
mag = self.stop.measure(self.start)
return point < mag
def __iter__(self):
return self.points()
@property
def start(self):
return self[0]
@property
def stop(self):
return self[1]
[docs] def points(self, step = None):
"""
Iterate through all the points between range according to the given step.
"""
start = self.start
stop = self.stop
# init the step
if step is None:
# default to the start's type
step = start.Measure(start.magnitude)
dir = self.direction # if start < stop, -1 else 1
# adjust step according to the direction
step = step.__class__(dir * step)
pos = start
while pos < stop:
yield pos
pos = pos.elapse(step)
[docs]class Context(object):
"""
A container for time units and transformations.
.. warning:: **The APIs here are subject to change.**
"""
def __init__(self):
# opaque transformations
self.datums = {}
self.terms = {} # grouping of like-terms
self.bridges = {} # convert between unlike-terms
self.ratios = {} # specifies ratios between like-terms
self.containers = {} # "terms" containing sets of terms.
self.measures = {} # scalar types
self.measure_repr = {} # term to unit sequence to build out Scalar repr()
self.points = {} # PointInTime types
self.names = {} # unit names
self.constants = {}
[docs] def declare(self, id, datum):
"""
Declare a fundamental unit for use in a context.
All defined, :py:meth:`rhythm.libunit.Context.define`, units are defined
in terms of a declared unit.
"""
if not id.isidentifier():
raise ValueError("unit names must be valid identifiers")
self.ratios[id] = {id : fractions.Fraction(1,1)} # unit-to-unit is 1-to-1
self.terms[id] = id
self.measures[id] = {}
self.points[id] = {}
self.datums[id] = datum
[docs] def define(self, id, term, exponent, base = 10):
"""
Defines a Unit in terms of another unit.
"""
if not id.isidentifier():
raise ValueError("unit names must be valid identifiers")
termu = self.terms[term]
self.terms[id] = termu
self.ratios[termu][id] = self.ratios[termu][term] * (base ** exponent)
[docs] def bridge(self, from_unit, to_unit, transformer):
"""
Note a "bridge" between two units.
In the case where a unit cannot not be resolved from its definitions,
bridges can be used to perform the conversion.
"""
self.bridges[(from_unit,to_unit)] = transformer
def container(self, id, pack, unpack):
if not id.isidentifier():
raise ValueError("container names must be valid identifiers")
self.containers[id] = (pack, unpack)
def constant(self, id, value):
self.constants[id] = value
@functools.lru_cache()
[docs] def compose(self, from_unit, to_unit):
"""
Compose two ratios into another so that the `from_unit` can be converted
into the `to_unit`.
"""
ratios = self.ratios[self.terms[from_unit]]
r = ratios[from_unit] / ratios[to_unit]
ir = int(r)
if ir == r:
return ir
else:
return r
[docs] def convert(self, from_unit, to_unit, value):
"""
Convert the `value` into `to_unit` from the `from_unit`.
"""
if from_unit in self.containers:
pkg = self.containers[from_unit][1](value)
return sum([
self.convert(part, to_unit, value)
for part, value in pkg
])
else:
from_term = self.terms[from_unit]
to_term = self.terms[to_unit]
if from_term == to_term:
# like terms, multiple by the composed ratio
return value * self.compose(from_unit, to_unit)
else:
# convert to bridge type, bridge, then from bridge type.
bu = value * self.compose(from_unit, from_term)
bdu = self.bridges[(from_term, to_term)](bu)
return bdu * self.compose(to_term, to_unit)
def new_measure_class(self, id, qname = None):
Measure = self.measure_factory(id, qname)
# ABC registration
abstract.Time.register(Measure)
# Associate with related unit.
self.measures[Measure.liketerm][Measure.unit] = Measure
return Measure
def new_point_class(self, Measure, qname = None):
Point = self.point_factory(Measure, qname)
# ABC registration
abstract.Point.register(Point)
# Associate with related unit.
self.points[Point.liketerm][Point.unit] = Point
return Point
def datum_for_point(self, to_unit):
return int(self.convert(
self.terms[to_unit], to_unit,
self.datums[self.terms[to_unit]]
))
def point_from_unit(self, unit):
return self.points[self.terms[unit]][unit]
def represent(self, term, unitseq):
self.measure_repr[term] = unitseq
[docs] def point_factory(self, Measure, qname, Class = Point, point_magnitude = 1):
"""
Construct a Point class from the given scalar.
"""
class Point(Class):
__slots__ = ()
__name__ = qname
unit = Measure.unit
name = Measure.name
datum = self.datum_for_point(Measure.unit)
# vector properties
magnitude = point_magnitude # Vector is: Point -> Point+magnitude
context = self
liketerm = self.terms[Measure.unit]
Point.Measure = Measure
return Point
[docs] def measure_factory(self, id, qname, Class = Measure, name = None, address = None):
"""
Construct a measure with the designated unit identifier and class.
"""
proper_name = name or id
# Build constructors/classes for the parameterized unit.
class Measure(Class):
__slots__ = ()
__name__ = qname
unit = id
name = proper_name
datum = 0
context = self
liketerm = self.terms[id]
return Measure
[docs]def standard_context(qname):
"""
Construct the standard time context from the modules in rhythm.
"""
from . import earth
from . import metric
from . import week
from . import gregorian
from . import libformat
from . import libzone
context = Context()
# Most practical time units are actually related to a day.
# Here, we declare the datum for day based PiTs
context.declare('day', 5 * ((((365*4) + 1) * 100) - 3) + 1)
# Likewise, Month PiTs are relative to Y2K
context.declare('month', 2000*12)
# NOTE: The month offset and day offset are *not* equal.
# Day offsets are relative to the beginning of the first week
# in Y2K in order to aid week updates.
earth.context(context)
gregorian.context(context)
week.context(context)
metric.context(context)
libformat.context(context) # 'iso' and 'rfc' containers
context.represent('day', [
'petasecond',
'annum',
'week',
'day',
'hour',
'minute',
'second',
'millisecond',
'microsecond',
'nanosecond',
])
context.represent('month', [
'millennium',
'century',
'decade',
'year',
'month',
])
measures = (
context.new_measure_class(
'nanosecond', qname = (qname + '.Measure')),
context.new_measure_class(
'day', qname = qname + '.Days'),
context.new_measure_class(
'week', qname = qname + '.Weeks'),
context.new_measure_class(
'month', qname = qname + '.Months'),
)
points = (
context.new_point_class(
measures[0], qname = qname + '.Timestamp'),
context.new_point_class(
measures[1], qname = qname + '.Date'),
context.new_point_class(
measures[2], qname = qname + '.Week'),
context.new_point_class(
measures[3], qname = qname + '.GregorianMonth')
)
unix_delta = (
points[0].datum - points[0].of(date=(1970,1,1)).measure(
points[0].of(date=(2000,1,2))
).select(points[0].unit)
)
# XXX: pretty much assuming the desired/possible precision of `x` here..
def unpack_unix(typ, x, delta = unix_delta):
return ('nanosecond', int(x * 1000000000) + delta),
def pack_unix(pit, arg, delta = unix_delta):
return (pit.select(pit.unit) - delta) / 1000000
context.container('unix', pack_unix, unpack_unix)
context.constant('unix', unix_delta)
return (context, measures, points)