# -*- coding: utf-8 -*-
import pickle
import inspect
import logging
import numpy as np
import numba
from numpy.lib.format import open_memmap
import fractalshades as fs
#import fractalshades.numpy_utils.xrange as fsx
import fractalshades.numpy_utils.expr_parser as fs_parser
logger = logging.getLogger(__name__)
[docs]
class Postproc_batch:
[docs]
def __init__(self, fractal, calc_name):
"""
Container for several postprocessing objects (instances of `Postproc`)
which share the same Fractal & calculation (*calc_name*)
It is usefull to have such container to avoid re-calculation of data
that can be shared between several post-processing objects.
Parameters
==========
fractal : fractalshades.Fractal instance
The shared fractal object
calc_name : str
The string identifier for the shared calculation
"""
self.fractal = fractal
self.calc_name = calc_name
self.posts = dict()
self.postnames_2d = []
# Temporary data used when iterating chunk_slices
self.raw_data = dict()
self.context = dict()
# self.clear_chunk_data()
@property
def postnames(self):
return self.posts.keys()
[docs]
def add_postproc(self, postname, postproc):
"""
Add successive postprocessing and ensure they share identical
*fractal* & *calc_name*
Parameters
==========
postname : str
The string identifier that will be associated with this
post-processing object
postproc : `Postproc` instance
The post-processing object to be added
Notes
=====
.. warning::
A Postproc can only be added to one batch.
"""
if postproc.batch is not None:
raise ValueError("Postproc can only be linked to one batch")
postproc.link_batch(self) # Give access to batch members from postproc
if postproc.field_count == 1:
self.posts[postname] = postproc
elif postproc.field_count == 2:
# We need to add 2 separate fields
x_field = postproc.x
y_field = postproc.y
x_field.link_sibling(y_field)
y_field.link_sibling(x_field)
x_field.link_batch(self)
y_field.link_batch(self)
self.posts[postname + "_x"] = x_field
self.posts[postname + "_y"] = y_field
self.postnames_2d += [postname]
else:
raise ValueError(
f"postproc.field_count bigger than 2: {postproc.field_count}"
)
def set_chunk_data(self, chunk_slice, chunk_mask, c_pix,
Z, U, stop_reason, stop_iter, complex_dic, int_dic, termination_dic
):
# self.chunk_slice = chunk_slice
self.raw_data[chunk_slice] = (
chunk_mask, c_pix, Z, U, stop_reason, stop_iter,
complex_dic, int_dic, termination_dic
)
self.context[chunk_slice] = dict()
def update_context(self, chunk_slice, context_update):
self.context[chunk_slice].update(context_update)
def clear_chunk_data(self, chunk_slice):
""" Temporary data shared by postproc items when iterating a given
chunk """
del self.raw_data[chunk_slice]
del self.context[chunk_slice]
[docs]
class Postproc:
"""
Abstract base class for all postprocessing objects.
"""
field_count = 1
def __init__(self):
""" Filter None values : removing them from post_dic.
"""
self.batch = None
self._holomorphic = None
def link_batch(self, batch):
"""
Link to Postproc group
:meta private:
"""
self.batch = batch
@property
def fractal(self):
return self.batch.fractal
@property
def calc_name(self):
return self.batch.calc_name
@property
def raw_data(self):
return self.batch.raw_data
@property
def context(self):
return self.batch.context
@property
def state(self):
# The fractal state object
return self.fractal._calc_data[self.calc_name]["state"]
@property
def holomorphic(self):
if self._holomorphic is None:
complex_type = self.batch.fractal.complex_type
select = {
np.dtype(np.float64): False,
np.dtype(np.complex128): True
}
self._holomorphic = select[np.dtype(complex_type)]
return self._holomorphic
def ensure_context(self, chunk_slice, key):
"""
Check that the provided context contains the expected data
:meta private:
"""
try:
return self.context[chunk_slice][key]
except KeyError:
msg = ("{} should be computed before {}. "
"Please add the relevant item to this post-processing "
"batch.")
raise RuntimeError(msg.format(key, type(self).__name__))
def __getitem__(self, chunk_slice):
""" Subclasses should implement
"""
raise NotImplementedError()
def get_zn(self, Z, complex_dic):
if self.holomorphic:
return Z[complex_dic["zn"], :]
else:
return Z[complex_dic["xn"], :] + 1j * Z[complex_dic["yn"], :]
def get_dzndc(self, Z, c_pix, complex_dic):
""" Returns the derivatives taking into account potential scaling
due to transfomation
"""
proj = self.fractal.projection
if self.holomorphic:
dzndc = Z[complex_dic["dzndc"], :]
if proj.df is not None:
return apply_df(proj.df, c_pix, dzndc)
else:
return dzndc
else:
dXdA = Z[complex_dic["dxnda"], :]
dXdB = Z[complex_dic["dxndb"], :]
dYdA = Z[complex_dic["dynda"], :]
dYdB = Z[complex_dic["dyndb"], :]
if proj.dfBS is not None:
return apply_dfBS(proj.dfBS, c_pix, dXdA, dXdB, dYdA, dYdB)
else:
return dXdA, dXdB, dYdA, dYdB
def get_std_cpt(self, c_pix):
""" image pixel -> c mapping
"""
# Note: we *could* store this at pp batch level but for the moment it
# is only used for 1 postproc kind (Fieldlines) so not worth the effort
return self.fractal.get_std_cpt(c_pix)
[docs]
class Raw_pp(Postproc):
[docs]
def __init__(self, key, func=None):
"""A raw postproc provide direct access to the res data stored (memmap)
during calculation. (Similar to what does `Fractal_array` but
here within a `Postproc_batch`)
Parameters
==========
key: str
The raw data key. Should be one of the *complex_codes*,
*int_codes*, *termination_codes* for this calculation.
func: None | callable | a str of variable x (e.g. "np.sin(x)")
will be applied as a pre-processing step to the raw data if not
`None`
"""
super().__init__()
self.key = key
self.func = func
if isinstance(func, str):
self.func = fs_parser.func_parser(["x"], func)
def __getitem__(self, chunk_slice):
""" Returns the raw array, with self.func applied if not None """
fractal = self.fractal
calc_name = self.calc_name
func = self.func
key = self.key
codes = fractal._calc_data[calc_name]["saved_codes"]
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
kind = fractal.kind_from_code(self.key, codes)
if kind == "complex":
if func is None:
raise ValueError("for batch post-processing of {}, complex "
"vals should be mapped to float, provide `func` ({}).".format(
type(self).__name__, self.key))
arr = Z[complex_dic[key], :]
elif kind == "int":
arr = U[int_dic[key], :]
elif key == "stop_reason":
arr = stop_reason[0, :]
elif key == "stop_iter":
arr = stop_iter[0, :]
if func is None:
return arr, {}
return func(arr), {}
[docs]
class Continuous_iter_pp(Postproc):
[docs]
def __init__(self, kind=None, d=None, a_d=None, M=None, epsilon_cv=None,
floor_iter=0):
"""Return a continuous iteration number: this is the iteration count at
bailout, plus a fractionnal part to allow smooth coloring.
Implementation based on potential, for details see:
- *On Smooth Fractal Coloring Techniques*,
**Jussi Härkönenen**, Abo University, 2007
Parameters
==========
kind : str "infinity" | "convergent" | "transcendent"
the classification of potential function.
d : None | int
degree of polynome d >= 2 if kind = "infinity"
a_d : None | float
coeff of higher order monome if kind = "infinity"
M : None | float
High number corresponding to the criteria for stopping iteration
if kind = "infinity"
epsilon_cv : None | float
Small number corresponding to the criteria for stopping iteration
if kind = "convergent"
floor_iter : int
If not 0, a shift will be applied and floor_iter become the 0th
iteration.
Used to avoids loss of precision (banding) for very large iteration
numbers, usually above several milions. For
reference, the largest integer that cannot be accurately
represented with a float32 is > 16 M), banding will start to be
noticeable before.
Notes
=====
.. note::
Usually it is not needed to set any parameter ;
if a parameter ``param``
is not provided, it will defaut to the attribute of
the fractal with name ``potential_<param>``, which should be the
recommended value.
"""
super().__init__()
post_dic = {
"kind": kind,
"d": d,
"a_d": a_d,
"M": M,
"epsilon_cv": epsilon_cv
}
self.post_dic = {k: v for k, v in post_dic.items() if v is not None}
self._potential_dic = None
self._floor_iter = floor_iter
@property
def potential_dic(self):
"""
Returns the potential parameters properties
priority order :
1) user input to Continuous_iter_pp constructor
2) fractal (or fractal 'state') defaut potential attributes
("potential_" + prop)
3) default to None if 1) and 2) fail. """
if self._potential_dic is not None:
return self._potential_dic
post_dic = self.post_dic
_potential_dic = {}
fractal = self.fractal # The factal
for prop in ["kind", "d", "a_d", "M_cutoff"]:
_potential_dic[prop] = post_dic.get(
prop, getattr(fractal, "potential_" + prop, None)
)
state = self.state # The fractal state
for prop in ["M", "epsilon_cv"]:
_potential_dic[prop] = post_dic.get(
prop, getattr(state, "potential_" + prop, None)
)
self._potential_dic = _potential_dic
return _potential_dic
def __getitem__(self, chunk_slice):
""" Returns the real Iteration number
"""
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
n = stop_iter[0, :]
potential_dic = self.potential_dic
zn = self.get_zn(Z, complex_dic) #["zn"], :]
if potential_dic["kind"] == "infinity":
d = potential_dic["d"]
a_d = potential_dic["a_d"]
M = potential_dic["M"]
nu_frac = Continuous_iter_pp_infinity(zn, d, a_d, M)
elif potential_dic["kind"] == "convergent":
eps = potential_dic["epsilon_cv"]
attractivity = Z[complex_dic["attractivity"]]
z_star = Z[complex_dic["zr"]]
nu_frac = Continuous_iter_pp_convergent(
zn, eps, attractivity, z_star
)
elif potential_dic["kind"] == "transcendent":
# Not possible to define a proper potential for a
# transcendental fonction
nu_frac = 0.
else:
raise NotImplementedError(
f"Potential kind: {potential_dic['kind']} unsupported"
)
# We need to take care of special cases to ensure that
# -1 < nu_frac <= 0.
# This happen e.g. when the pixel hasn't reach the limit circle
# at current max_iter, so its status is undefined.
nu_div, nu_mod = np.divmod(-nu_frac, 1.)
nu_frac = - nu_mod
n = n - nu_div.astype(n.dtype) # need explicit casting to int
nu = (n - self._floor_iter) + nu_frac
val = nu
context_update = {
"potential_dic": self.potential_dic,
"nu_frac": nu_frac,
"n": n, # + jump,
"backshift": getattr(self.state, "backshift", None)
}
return val, context_update
[docs]
class Fieldlines_pp(Postproc):
[docs]
def __init__(self, n_iter=5, swirl=0., endpoint_k=1.0):
"""
Return a continuous orbit-averaged angular value, allowing to reveal
fieldlines. Implementation based on [#f2]_. The averaging orbit starts
at the computed `zn_orbit` field if it exists, otherwise at the exit
`zn` value, and is truncated after `n_iter` iterations.
Fieldlines approximate the external rays which are very important
in the study of the Mandelbrot set and more generally in
holomorphic dynamics.
.. [#f2] *On Smooth Fractal Coloring Techniques*,
**Jussi Härkönenen**, Abo University, 2007
Parameters
==========
n_iter : int > 0
the number of orbit points used for the averageing
swirl : float between -1. and 1.
adds a random dephasing effect between each successive orbit point
endpoint_k : float > 0.
A geometric serie is used for scaling individual orbit points,
this parameters is the ratio between the last and the first scaling
(For an arithmetic mean over the truncated orbit, use 1.)
Notes
=====
.. note::
The Continuous iteration number shall have been calculated before
in the same `Postproc_batch`
"""
super().__init__()
self.n_iter = n_iter
self.swirl = swirl
self.endpoint_k = endpoint_k
def __getitem__(self, chunk_slice):
""" Returns the post arr
"""
nu_frac = self.ensure_context(chunk_slice, "nu_frac")
potential_dic = self.ensure_context(chunk_slice, "potential_dic")
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
c_pt = self.get_std_cpt(c_pix)
if potential_dic["kind"] == "infinity":
# Following Jussi Härkönenen - 2007
backshift = self.ensure_context(chunk_slice, "backshift")
n = self.ensure_context(chunk_slice, "n")
n_iter_fl = self.n_iter
swirl_fl = self.swirl
# Geometric serie, last term being damping_ratio * first term
k_arr = np.geomspace(1., self.endpoint_k, num=n_iter_fl)
k_arr = k_arr / np.sum(k_arr)
# randomizing the phse angle
rg = np.random.default_rng(0)
phi_arr = rg.random(n_iter_fl) * swirl_fl * np.pi
if self.fractal.holomorphic:
try:
zn = Z[complex_dic["zn_orbit"], :]
is_backward = True
assert backshift is not None
except KeyError:
logger.warning(
"No stored backshift field found for key: \"zn_orbit\""
"\ndefaulting to key: \"zn\""
)
zn = Z[complex_dic["zn"], :]
is_backward = False
backshift = 0 # Need the right datatype for jit compiler
zn_iterate = self.fractal.zn_iterate
val = Fieldlines_pp_infinity(
c_pt, zn, nu_frac, k_arr, phi_arr, zn_iterate,
is_backward, backshift, n
)
else:
# Non-holomorphic case
try:
xn = Z[complex_dic["xn_orbit"], :]
yn = Z[complex_dic["yn_orbit"], :]
is_backward = True
assert backshift is not None
except KeyError:
logger.warning(
"No stored backshift field found for key: \"zn_orbit\""
"\ndefaulting to key: \"zn\""
)
xn = Z[complex_dic["xn"], :]
yn = Z[complex_dic["yn"], :]
is_backward = False
backshift = 0 # Need the right datatype for jit compiler
xnyn_iterate = self.fractal.xnyn_iterate
val = Fieldlines_pp_infinity_BS(
c_pt, xn, yn, nu_frac, k_arr, phi_arr, xnyn_iterate,
is_backward, backshift, n
)
elif potential_dic["kind"] == "convergent":
zn = Z[complex_dic["zn"], :]
alpha = 1. / Z[complex_dic["attractivity"]]
z_star = Z[complex_dic["zr"]]
beta = np.angle(alpha)
val = np.angle(z_star - zn) + nu_frac * beta
# We have an issue if z_star == zn...
val = val * 2.
else:
raise ValueError(
"Unsupported potential '{}' for field lines".format(
potential_dic["kind"]))
return val, {}
[docs]
class DEM_normal_pp(Postproc):
field_count = 2
[docs]
def __init__(self, kind="potential"):
"""
Return the 2 components of the distance estimation method derivative
(*normal map*).
This postproc is normally used in combination with a
`fractalshades.colors.layers.Normal_map_layer`
Parameters
==========
`kind`: str "potential" | "Milnor" | "convergent"
if "potential" (default) DEM is base on the potential.
For Mandelbrot power 2, "Milnor" option is also available which
gives slightly different results (sharper edges).
Use "convergent" for convergent fractals...
Notes
=====
.. note::
The Continuous iteration number shall have been calculated before
in the same `Postproc_batch`
If kind="Milnor", the following raw complex fields
must be available from the calculation:
"zn", "dzndc", "d2zndc2"
"""
super().__init__()
self.kind = kind
@property
def x(self):
""" Postproc for the real part """
return XYCoord_wrapper_pp(self, "x")
@property
def y(self):
""" Postproc for the imag part """
return XYCoord_wrapper_pp(self, "y")
def __getitem__(self, chunk_slice):
""" Returns the normal as a complex (x, y, 1) is the normal vec
"""
potential_dic = self.ensure_context(chunk_slice, "potential_dic")
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
zn = self.get_zn(Z, complex_dic) #["zn"], :]
if self.kind == "Milnor": # use only for z -> z**n + c
# https://www.math.univ-toulouse.fr/~cheritat/wiki-draw/index.php/Mandelbrot_set
# dzndc = Z[complex_dic["dzndc"], :]
dzndc = self.get_dzndc(Z, c_pix, complex_dic)
# TODO: d2zndc2 may fail if a projection is used - not tested
d2zndc2 = Z[complex_dic["d2zndc2"], :]
abs_zn = np.abs(zn)
lo = np.log(abs_zn)
normal = zn * dzndc * ((1+lo)*np.conj(dzndc * dzndc)
-lo * np.conj(zn * d2zndc2))
elif self.kind == "potential":
if potential_dic["kind"] == "infinity":
# pulls back the normal direction from an approximation of
# potential: phi = log(|zn|) / 2**n
if self.holomorphic:
dzndc = self.get_dzndc(Z, c_pix, complex_dic) #, :]
normal = zn / dzndc
else:
(dXdA, dXdB, dYdA, dYdB) = self.get_dzndc(
Z, c_pix, complex_dic
)
# J = [dXdA dXdB] J.T = [dXdA dYdA] n = J.T * zn
# [dYdA dYdB] [dXdB dYdB]
normal = (
(dXdA * zn.real + dYdA * zn.imag)
+ 1j * (dXdB * zn.real + dYdB * zn.imag)
)
elif potential_dic["kind"] == "convergent":
# pulls back the normal direction from an approximation of
# potential (/!\ need to derivate zr w.r.t. c...)
# phi = 1. / (abs(zn - zr) * abs(alpha))
# dzndc = Z[complex_dic["dzndc"], :]
dzndc = self.get_dzndc(Z, c_pix, complex_dic)
zr = Z[complex_dic["zr"], :]
dzrdc = Z[complex_dic["dzrdc"], :]
normal = - (zr - zn) / (dzrdc - dzndc)
skew = self.fractal.skew
if skew is not None:
nx = normal.real
ny = normal.imag
# These are contravariant coordinates -> we UNskew
fs.core.apply_unskew_1d(skew, nx, ny)
normal = normal / np.abs(normal)
return normal, None
class XYCoord_wrapper_pp(Postproc):
def __init__(self, pp_2d, coord):
""" Helper class to split one complex Postproc into 2 real & imag parts
"""
self.pp_2d = pp_2d
self.coord = coord
@property
def context_getitem_key(self):
return type(self.pp_2d).__name__ + ".__getitem__"
def __getitem__(self, chunk_slice):
if self.coord == "x":
normal, _ = self.pp_2d[chunk_slice]
# Save imaginary part in context dict
context_update = {self.context_getitem_key: normal.imag}
return normal.real, context_update
elif self.coord == "y":
# Retrieve imaginary part from context,
val = self.context[chunk_slice][self.context_getitem_key]
# Then delete it
del self.context[chunk_slice][self.context_getitem_key]
return val, {}
else:
raise ValueError(self.coord)
def link_sibling(self, sibling):
""" The other coord """
self.sibling = sibling
[docs]
class DEM_pp(Postproc):
field_count = 1
[docs]
def __init__(self, px_snap=None):
"""
Return a distance estimation of the pixel to the fractal set.
Parameters
==========
`px_snap`: None | float
If not None, pixels at a distance from the fractal lower
than `px_snap` (expressed in image pixel) will be snapped to 0.
Notes
=====
.. note::
The Continuous iteration number shall have been calculated before
in the same `Postproc_batch`
"""
self.px_snap = px_snap
super().__init__()
def __getitem__(self, chunk_slice):
""" Returns the DEM - """
potential_dic = self.ensure_context(chunk_slice,"potential_dic")
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
zn = self.get_zn(Z, complex_dic) #Z[complex_dic["zn"], :]
if potential_dic["kind"] == "infinity":
abs_zn = np.abs(zn)
if self.holomorphic:
abs_dzndc = np.abs(self.get_dzndc(Z, c_pix, complex_dic)) #, :]
# val = abs_zn * np.log(abs_zn) / abs_dzndc
else:
(dXdA, dXdB, dYdA, dYdB) = self.get_dzndc(
Z, c_pix, complex_dic
)
# In which direction for an abs value ?
# Lets take the maximal singular value
# https://scicomp.stackexchange.com/questions/8899/robust-algorithm-for-2-times-2-svd
# J = [dXdA dXdB] = [a b]
# [dYdA dYdB] [c d]
Q = np.hypot(dXdA + dYdB, dXdB - dYdA)
R = np.hypot(dXdA - dYdB, dXdB + dYdA)
abs_dzndc = 0.5 * (Q + R)
def nan_frac(arr):
return np.count_nonzero(np.isnan(arr)) / len(arr)
val = abs_zn * np.log(abs_zn) / abs_dzndc
elif potential_dic["kind"] == "convergent":
assert self.holomorphic
dzndc = self.get_dzndc(Z, c_pix, complex_dic)
zr = Z[complex_dic["zr"]]
dzrdc = Z[complex_dic["dzrdc"], :]
val = np.abs((zr - zn) / (dzrdc - dzndc))
else:
raise ValueError(potential_dic["kind"])
px_snap = self.px_snap
if px_snap is not None:
val = np.where(val < px_snap, 0., val)
return val, {}
[docs]
class Attr_normal_pp(Postproc):
field_count = 2
[docs]
def __init__(self):
"""
Return the 2 components of the cycle attractivity derivative (*normal
map*).
This postproc is normally used in combination with a
`fractalshades.colors.layers.Normal_map_layer`
Notes
=====
.. note::
The following complex fields must be available from a previously
run calculation:
- "attractivity"
- "order" (optionnal)
"""
super().__init__()
@property
def x(self):
""" Postproc for the real part """
return XYCoord_wrapper_pp(self, "x")
@property
def y(self):
""" Postproc for the imag part """
return XYCoord_wrapper_pp(self, "y")
def __getitem__(self, chunk_slice):
""" Returns the normal as a complex (x, y, 1) is the normal vec
"""
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
attr = np.copy(Z[complex_dic["attractivity"], :])
dattrdc = np.copy(Z[complex_dic["dattrdc"], :])
proj = self.fractal.projection
if proj.df is not None:
dattrdc = apply_df(proj.df, c_pix, dattrdc)
invalid = (np.abs(attr) > 1.)
np.putmask(attr, invalid, 1.)
np.putmask(dattrdc, invalid, 1.)
# val = np.sqrt(1. - attr * np.conj(attr)) / r
# Now let's take the total differential of this
# While not totally exact this gives good results :
normal = attr * np.conj(dattrdc) / np.abs(dattrdc)
return normal, None
[docs]
class Attr_pp(Postproc):
field_count = 1
[docs]
def __init__(self, scale_by_order=False):
"""
Return the cycle attractivity
Parameters
==========
scale_by_order : bool
If True return the attractivity divided by ccyle order (this
allows more realistic 3d-view exports)
Notes
=====
.. note::
The following complex fields must be available from a previously
run calculation:
- "attractivity"
- "order" (optionnal)
"""
super().__init__()
self.scale_by_order = scale_by_order
def __getitem__(self, chunk_slice):
""" Returns the normal as a complex (x, y, 1) is the normal vec
"""
(chunk_mask, c_pix, Z, U, stop_reason, stop_iter, complex_dic,
int_dic, termination_dic) = self.raw_data[chunk_slice]
# Plotting the 'domed' height map for the cycle attractivity
attr = Z[complex_dic["attractivity"], :]
abs2_attr = np.real(attr)**2 + np.imag(attr)**2
abs2_attr = np.clip(abs2_attr, None, 1.)
# divide by the cycle order
val = np.sqrt(1. - abs2_attr) # / r
if self.scale_by_order:
r = U[int_dic["order"], :]
val *= (1. / r)
return val, {}
[docs]
class Fractal_array:
[docs]
def __init__(self, fractal, calc_name, key, func=None, inv=False):
"""
Class which provide a direct access to the res data stored (memory
mapping) during calculation.
One application, it can be passed as a *subset* parameter to most
calculation models, e.g. : `fractalshades.models.Mandelbrot.base_calc`
Parameters
==========
fractal : `fractalshades.Fractal` derived class
The reference fracal object
calc_name : str
The calculation name
key :
The raw data key. Should be one of the *complex_codes*,
*int_codes*, *termination_codes* for this calculation.
func: None | callable | a str of variable x (e.g. "np.sin(x)")
will be applied as a pre-processing step to the raw data if not
`None` - use of str form is recommended
"""
self.fractal = fractal
self.calc_name = calc_name
self.key = key
try:
pickle.dumps(func)
except:
# Note: possible improvement, see:
# https://stackoverflow.com/questions/11878300/serializing-and-deserializing-lambdas
# Seems over-the top here, just raising a detailed error
source_code = inspect.getsource(func)
raise ValueError(
"func is not provied in a serializable form:\n"
+ f"{source_code}\n"
+ "Consider passing func definition by string instead"
+ " (e.g. \"np.sin(x)\")"
)
self._func = func # stored for future serialization
self.func = self.parse_func(func)
# Manage ~ notation
self.inv = inv
@staticmethod
def parse_func(func):
if isinstance(func, str):
return fs_parser.func_parser(["x"], func)
else:
return func
def __reduce__(self):
""" Enable standard Python serialization mechanism
"""
args = (self.fractal, self.calc_name, self.key, self._func, self.inv)
return (self.__class__, args, None, None, None, None)
def __getitem__(self, chunk_slice):
""" Returns the raw array, with self.func applied if not None """
fractal = self.fractal
if fractal is None:
raise ValueError(
"Fractal_array without valid Fractal, "
"might be an unbound state from unppickling. Use "
"self.bind_fractal to rebind"
)
f_class = fractal.__class__
calc_name = self.calc_name
codes = fractal._calc_data[calc_name]["saved_codes"]
kind = f_class.kind_from_code(self.key, codes)
# localise the pts range for the chunk /!\ use the right calc_name
if chunk_slice is not None:
items = fractal.REPORT_ITEMS
report_mmap = open_memmap(filename=fractal.report_path(calc_name),
mode='r')
rank = fractal.chunk_rank(chunk_slice)
beg = report_mmap[rank, items.index("chunk1d_begin")]
end = report_mmap[rank, items.index("chunk1d_end")]
# load the data
arr_from_kind = {
"complex": "Z",
"int": "U",
"stop_reason": "stop_reason",
"stop_iter": "stop_iter"
}
arr_str = arr_from_kind[kind]
data_path = fractal.data_path(calc_name)
mmap = open_memmap(filename=data_path[arr_str], mode='r')
# field from key :
key = self.key
(complex_dic, int_dic, termination_dic) = f_class.codes_mapping(*codes)
if kind == "complex":
field = complex_dic[key]
elif kind == "int":
field = int_dic[key]
elif kind in ["stop_reason", "stop_iter"]:
field = 0
else:
raise ValueError(kind)
if chunk_slice is None:
# We return the full pts range
arr = mmap[field, :]
else:
arr = mmap[field, beg:end]
# Apply last steps (func / inversion) and returns
if self.func is not None:
arr = self.func(arr)
if self.inv:
arr = ~arr
return arr
def __invert__(self):
""" Allows use of the ~ notation 'ala numpy' """
ret = Fractal_array(
self.fractal, self.calc_name, self.key, self.func, not(self.inv)
)
return ret
def __eq__(self, other):
""" Equality testing, useful when pickling / unpickling"""
if isinstance(other, self.__class__):
eq = (
(self._func == other._func)
and (self.calc_name == other.calc_name)
and (self.key == other.key)
and (self.inv == other.inv)
)
return eq
else:
return NotImplemented
#==============================================================================
# Numba Nogil implementations
@numba.njit(nogil=True, fastmath=False)
def apply_df(df, c_pix, dzndc):
nvec, = c_pix.shape
ret = np.empty((nvec,), dtype=dzndc.dtype)
for i in range(nvec):
ret[i] = df(c_pix[i]) * dzndc[i]
return ret
@numba.njit(nogil=True, fastmath=False)
def apply_dfBS(dfBS, c_pix, dXdA, dXdB, dYdA, dYdB):
nvec, = c_pix.shape
dt = dXdA.dtype
dXdpixa = np.empty((nvec,), dtype=dt)
dXdpixb = np.empty((nvec,), dtype=dt)
dYdpixa = np.empty((nvec,), dtype=dt)
dYdpixb = np.empty((nvec,), dtype=dt)
for i in range(nvec):
m00, m01, m10, m11 = dfBS(c_pix[i])
# mapping is defined as : pixa, pixb -f-> AB
# we want dXdpixa
# See fs.core.apply_unskew_1d(skew, nx, ny)
# Matricial product - contravariant
dXdpixa[i] = dXdA[i] * m00 + dXdB[i] * m10
dXdpixb[i] = dXdA[i] * m01 + dXdB[i] * m11
dYdpixa[i] = dYdA[i] * m00 + dYdB[i] * m10
dYdpixb[i] = dYdA[i] * m01 + dYdB[i] * m11
return dXdpixa, dXdpixb, dYdpixa, dYdpixb
@numba.njit(nogil=True, fastmath=True)
def Continuous_iter_pp_infinity(zn, d, a_d, M):
k = np.abs(a_d) ** (1. / (d - 1.))
# k normaliszation corefficient, because the formula given
# in https://en.wikipedia.org/wiki/Julia_set
# suppose the highest order monome is normalized
nu_frac = -(np.log(np.log(np.abs(zn * k)) / np.log(M * k))
/ np.log(d))
return nu_frac
@numba.njit(nogil=True, fastmath=True)
def Continuous_iter_pp_convergent(zn, eps, attractivity, z_star):
alpha = 1. / attractivity
nu_frac = + np.log(eps / np.abs(zn - z_star) # nu frac > 0
) / np.log(np.abs(alpha))
return nu_frac
# Catmull-Rom polynomials - C1 Smoothing of an orbit value
@numba.njit(nogil=True, fastmath=True)
def cm_H0(x):
return 0.5 * x * (-x + x ** 2)
@numba.njit(nogil=True, fastmath=True)
def cm_H1(x):
return 0.5 * x * (1. + 4.* x - 3. * x ** 2)
@numba.njit(nogil=True, fastmath=True)
def cm_H2(x):
return 1. + 0.5 * x * (-5. * x + 3. * x ** 2)
@numba.njit(nogil=True, fastmath=True)
def cm_H3(x):
return 0.5 * x * (-1. + 2. * x - x ** 2)
@numba.njit(nogil=True, fastmath=True)
def Fieldlines_pp_infinity(
c_pt, zn, nu_frac, k_arr, phi_arr, zn_iterate,
is_backward, backshift, n
):
M_cutoff = 100000. # To be tuned...
n_iter_fl = len(k_arr)
assert len(phi_arr) == n_iter_fl
nvec, = zn.shape
orbit_z = np.empty((nvec, n_iter_fl + 3), dtype=zn.dtype)
orbit_angle = np.empty((nvec, n_iter_fl + 3), dtype=np.float64) #zn.dtype)
val = np.zeros((nvec,), dtype=np.float64)
for j in range(nvec):
z_loc = zn[j]
orbit_z[j, 0] = z_loc
orbit_angle[j, 0] = np.angle(z_loc)
for i in range(n_iter_fl + 2):
for j in range(nvec):
z_loc = orbit_z[j, i]
if abs(z_loc) > M_cutoff:
z_loc = np.exp(orbit_angle[j, i] * 1j) * M_cutoff
z_loc = zn_iterate(z_loc, complex(0.))
orbit_z[j, i + 1] = z_loc # flag
orbit_angle[j, i + 1] = np.angle(z_loc)
else:
z_loc = zn_iterate(orbit_z[j, i], c_pt[j])
orbit_z[j, i + 1] = z_loc
orbit_angle[j, i + 1] = np.angle(z_loc)
# Now lets do some smoothing with Catmull-Rom polynomials
for j in range(nvec):
d = -nu_frac[j]
# Correction if applying backshift would yield a negative index
# in this case we just keep a constant nu_frac to avoid banding
if is_backward:
if backshift > n[j]:
d = 1.
# Catmull-Rom spline weighting polynomials.
a0 = cm_H0(d)
a1 = cm_H1(d)
a2 = cm_H2(d)
a3 = cm_H3(d)
for i in range(n_iter_fl):
val_loc = k_arr[i] * (
a0 * np.sin(orbit_angle[j, i] + phi_arr[i])
+ a1 * np.sin(orbit_angle[j, i + 1] + phi_arr[i])
+ a2 * np.sin(orbit_angle[j, i + 2] + phi_arr[i])
+ a3 * np.sin(orbit_angle[j, i + 3] + phi_arr[i])
)
val[j] += val_loc
return val
@numba.njit(nogil=True, fastmath=True)
def Fieldlines_pp_infinity_BS(
c_pt, xn, yn, nu_frac, k_arr, phi_arr, xnyn_iterate,
is_backward, backshift, n
):
M_cutoff = 100000. # To be tuned...
n_iter_fl = len(k_arr)
assert len(phi_arr) == n_iter_fl
nvec, = xn.shape
orbit_x = np.empty((nvec, n_iter_fl + 3), dtype=xn.dtype)
orbit_y = np.empty((nvec, n_iter_fl + 3), dtype=yn.dtype)
orbit_angle = np.empty((nvec, n_iter_fl + 3), dtype=np.float64)
val = np.zeros((nvec,), dtype=np.float64)
for j in range(nvec):
x_loc = xn[j]
y_loc = yn[j]
orbit_x[j, 0] = x_loc
orbit_y[j, 0] = y_loc
orbit_angle[j, 0] = np.arctan2(y_loc, x_loc)
for i in range(n_iter_fl + 2):
for j in range(nvec):
aj = c_pt[j].real
bj = c_pt[j].imag
x_loc = orbit_x[j, i]
y_loc = orbit_y[j, i]
if np.hypot(x_loc, y_loc) > M_cutoff:
x_loc = np.cos(orbit_angle[j, i]) * M_cutoff
y_loc = np.sin(orbit_angle[j, i]) * M_cutoff
x_loc, y_loc = xnyn_iterate(x_loc, y_loc, 0., 0.)
orbit_x[j, i + 1] = x_loc # flag
orbit_y[j, i + 1] = y_loc # flag
orbit_angle[j, i + 1] = np.arctan2(y_loc, x_loc)
else:
x_loc, y_loc = xnyn_iterate(
orbit_x[j, i], orbit_y[j, i], aj, bj
)
orbit_x[j, i + 1] = x_loc
orbit_y[j, i + 1] = y_loc
orbit_angle[j, i + 1] = np.arctan2(y_loc, x_loc)
# Now lets do some smoothing with Catmull-Rom polynomials
for j in range(nvec):
d = -nu_frac[j]
# Correction if applying backshift would yield a negative index
# in this case we just keep a constant nu_frac to avoid banding
if is_backward:
if backshift > n[j]:
d = 1.
# Catmull-Rom spline weighting polynomials.
a0 = cm_H0(d)
a1 = cm_H1(d)
a2 = cm_H2(d)
a3 = cm_H3(d)
for i in range(n_iter_fl):
val_loc = k_arr[i] * (
a0 * np.sin(orbit_angle[j, i] + phi_arr[i])
+ a1 * np.sin(orbit_angle[j, i + 1] + phi_arr[i])
+ a2 * np.sin(orbit_angle[j, i + 2] + phi_arr[i])
+ a3 * np.sin(orbit_angle[j, i + 3] + phi_arr[i])
)
val[j] += val_loc
return val