# -*- coding: utf-8 -*-
import os
#import typing
import pickle
#import warnings
import logging
import textwrap
import copy
import mpmath
import numpy as np
import numba
import fractalshades as fs
import fractalshades.numpy_utils.xrange as fsx
import fractalshades.numpy_utils.numba_xr as fsxn
logger = logging.getLogger(__name__)
class PerturbationFractal(fs.Fractal):
def __init__(self, directory):
"""Base class for escape-time fractals implementing the
perturbation technique, with an iteration matching:
.. math::
z_0 &= 0 \\\\
z_{n+1} &= f ( z_{n} ) + c
Where :math:`f` has a critical point at 0.
Derived classes should provide the implementation for the actual function
:math:`f`.
Parameters
----------
directory : str
Path for the working base directory"""
super().__init__(directory)
@fs.utils.zoom_options
def zoom(
self, *,
precision: int,
x: mpmath.mpf,
y: mpmath.mpf,
dx: mpmath.mpf,
nx: int,
xy_ratio: float,
theta_deg: float,
projection: fs.projection.Projection=fs.projection.Cartesian(),
has_skew: bool=False,
skew_00: float=1.,
skew_01: float=0.,
skew_10: float=0.,
skew_11: float=1.
):
"""
Define the zoom parameters for the next calculation.
Parameters
----------
precision : int
number of significant base-10 digits to use for full precision
calculations.
x : str or mpmath.mpf
x-coordinate of the central point
y : str or mpmath.mpf
y-coordinate of the central point
dx : str or mpmath.mpf
span of the view rectangle along the x-axis
nx : int
number of pixels of the image along the x-axis
xy_ratio: float
ratio of dx / dy and nx / ny
theta_deg : float
Pre-rotation of the calculation domain, in degrees
projection : `fractalshades.projection.Projection`
Kind of projection used (default to
`fractalshades.projection.Cartesian`)
has_skew : bool
If True, unskew the view based on the skew coefficients skew_ij
skew_ij : float
Components of the local skew matrix, with ij = 00, 01, 10, 11
"""
mpmath.mp.dps = precision  # in base-10 digits
# In case the user inputs were strings, we override with mpmath scalars
self.x = mpmath.mpf(x)
self.y = mpmath.mpf(y)
self.dx = dx = mpmath.mpf(dx)
# Backward compatibility: also accept projection = "cartesian"
if isinstance(projection, str):
logger.warning(textwrap.dedent(
"""\
Use of str for projection is deprecated, and might be
removed. Please use `fractalshades.projection.Projection`
subclasses instead. Defaulting to
`fractalshades.projection.Cartesian()`"""
))
self.projection = projection = fs.projection.Cartesian()
# Stores the skew matrix
self._skew = None
if has_skew:
self._skew = np.array(
((skew_00, skew_01), (skew_10, skew_11)), dtype=np.float64
)
self.dx_std = dx_std = float(dx)
self.dx_xr = dx_xr = fsx.mpf_to_Xrange(dx, dtype=np.float64).ravel()
# Here we store the linear scale as a 1d, 1-element array
self.lin_scale_xr = np.asanyarray(dx_xr).reshape(-1)
self.lin_scale_std = np.asanyarray(dx_std).reshape(-1)
self.lin_mat = self.get_lin_mat()
projection.adjust_to_zoom(self)
self.proj_impl = projection.f
# check the dps...
pix = projection.min_local_scale * self.dx / self.nx
# pix = projection.min_local_scale * projection.scale * self.dx / self.nx
with mpmath.workdps(6):
# Required dps to resolve ~0.1 x the min. pixel size
required_dps = int(-mpmath.log10(pix / nx) + 1)
if required_dps > precision:
raise ValueError(
"Precision is too low for min. pixel size and shall be "
f"increased to {required_dps} (current setting: {precision})."
"This might be a consequence of Exponential mapping or "
"other non-cartesian projections."
)
def new_status(self, wget):
""" Return a dictionnary that can hold the current progress status """
base_status = super().new_status(wget)
status = {
"Reference": {
"str_val": "--"
},
"Bilin. approx": {
"str_val": "--"
},
}
status.update(base_status)
return status
@property
def xr_detect_activated(self):
""" Triggers use of special dataype to avoid underflow in double """
return (self.dx < fs.settings.xrange_zoom_level)
def ref_point_file(self):
"""
Returns the file path to store or retrieve data arrays associated to a
reference orbit
"""
return os.path.join(self.directory, "data", "ref_pt.dat")
def ref_point_kc(self):
"""
Return a bound for dc in image pixels, used as a criterion for BLA
validity
Returns
-------
kc: full precision, scaling coefficient
"""
c0 = self.x + 1j * self.y
w, h = self.projection.bounding_box(self.xy_ratio)
dx = self.dx # * self.projection.scale
mat = self.lin_mat
corner_a = mpc_lin_proj_impl_noscale(mat, 0.5 * w, 0.5 * h) * dx
corner_b = mpc_lin_proj_impl_noscale(mat, -0.5 * w , 0.5 * h) * dx
corner_c = mpc_lin_proj_impl_noscale(mat, -0.5 * w, -0.5 * h) * dx
corner_d = mpc_lin_proj_impl_noscale(mat, 0.5 * w, -0.5 * h) * dx
ref = self.FP_params["ref_point"]
shift = ref - c0
# Take the largest distance plus some margin
kc = max(abs(shift - corner_a), abs(shift - corner_b),
abs(shift - corner_c), abs(shift - corner_d)) * 1.1
return fsx.mpf_to_Xrange(kc, dtype=np.float64).ravel() # 1d for numba
def get_std_cpt(self, c_pix):
""" Return the c complex value from c_pix - standard precision """
n_pts, = c_pix.shape # Z of shape [n_Z, n_pts]
# Explicit casting to complex / float
center = complex(self.x + 1j * self.y)
cpt = np.empty((n_pts,), dtype=c_pix.dtype)
fill1d_std_C_from_pix(
c_pix, self.proj_impl, self.lin_mat, self.lin_scale_std,
center, cpt
)
return cpt
def ref_point_matching(self):
"""
Test if the ref point can be used for this calculation, i.e.:
- same or higher max_iter
- not too far from the image center
- a suitable dps
- same fractal __init__ params
"""
init_kwargs = self.init_kwargs
del init_kwargs["directory"]
try:
ref_point = self.FP_params["ref_point"]
max_iter_ref = self.FP_params["max_iter"]
init_kwargs_ref = self.FP_params["init_kwargs"]
except FileNotFoundError:
logger.debug("No data file found for ref point")
return False
# Parameters 'max_iter' borrowed from last "@fsutils.calc_options" call
# calc_options = self.calc_options
max_iter = self.max_iter #calc_options["max_iter"]
drift_xr = fsx.mpc_to_Xrange((self.x + 1j * self.y) - ref_point)
dx_xr = fsx.mpf_to_Xrange(self.dx)
matching_init_kwargs = True
for k, v in init_kwargs_ref.items():
if init_kwargs[k] != v:
matching_init_kwargs = False
dic_match = {
"dps": mpmath.mp.dps <= self.FP_params["dps"] + 3,
"location": (drift_xr / dx_xr).abs2() < 1.e6,
"max_iter": max_iter_ref >= max_iter,
"init_kwargs": matching_init_kwargs
}
for item, match in dic_match.items():
if not match:
logger.info(f"Updating ref point: {item} not matching")
break
return all(match for match in dic_match.values())
def save_ref_point(self, FP_params, Zn_path):
"""
Write to a data file the following data:
- params = main parameters used for the calculation
- codes = complex_codes, int_codes, termination_codes
- arrays : [Z, U, stop_reason, stop_iter]
"""
self._FP_params = FP_params
self._Zn_path = Zn_path
save_path = self.ref_point_file()
fs.utils.mkdir_p(os.path.dirname(save_path))
with open(save_path, 'wb+') as tmpfile:
logger.info(textwrap.dedent(f"""\
Full precision path computed, saving to:
{save_path}"""
))
pickle.dump(FP_params, tmpfile, pickle.HIGHEST_PROTOCOL)
pickle.dump(Zn_path, tmpfile, pickle.HIGHEST_PROTOCOL)
def reload_ref_point(self, scan_only=False):
"""
Reload arrays from a data file
- params = main parameters used for the calculation
- codes = complex_codes, int_codes, termination_codes
- arrays : [Z, U, stop_reason, stop_iter]
"""
save_path = self.ref_point_file()
with open(save_path, 'rb') as tmpfile:
FP_params = pickle.load(tmpfile)
if scan_only:
return FP_params
Zn_path = pickle.load(tmpfile)
return FP_params, Zn_path
@property
def FP_params(self):
"""
Return the FP_params attribute, if not available try to reload it
from file
"""
if hasattr(self, "_FP_params"):
return self._FP_params
FP_params = self.reload_ref_point(scan_only=True)
self._FP_params = FP_params
return FP_params
@property
def Zn_path(self):
"""
Return the Zn_path attribute, if not available try to reload it
from file
"""
if hasattr(self, "_Zn_path"):
return self._Zn_path
FP_params, Zn_path = self.reload_ref_point()
self._Zn_path = Zn_path
return Zn_path
def get_path_data(self):
""" Builds a Zn_path data tuple from FP_params and Zn_path
This object will be used in numba jitted functions
"""
FP_params = self.FP_params
Zn_path = self.Zn_path
ref_xr_python = FP_params["xr"]
has_xr = (len(ref_xr_python) > 0)
ref_order = FP_params["order"]
ref_div_iter = FP_params["div_iter"] # The first invalid iter
# ref_div_iter should be the divergence iteration of the FP orbit, but
# only if it actually diverges; otherwise, the max_iter from calc params
if ref_order is not None: # The reference orbit is a cycle
ref_div_iter = self.max_iter + 1
dx_xr = fsx.mpf_to_Xrange(self.dx, dtype=self.float_type).ravel()
# Note: uses ravel to change the shape from () - numpy 0-d array - to
# (1,) - 1-d array - for numba compatibility
ref_index_xr = np.empty([len(ref_xr_python)], dtype=np.int32)
if self.holomorphic:
# Complex distance between image center and ref point
drift_xr = fsx.mpc_to_Xrange(
(self.x + 1j * self.y) - FP_params["ref_point"],
dtype=np.complex128
).ravel()
# Build 2 arrays to avoid using a dict in numba
# /!\ ref_xr at least len 1 to ensure typing as complex
ref_xr = fsx.Xrange_array([0j] * max(len(ref_xr_python), 1))
for i, xr_index in enumerate(ref_xr_python.keys()):
ref_index_xr[i] = xr_index
ref_xr[i] = ref_xr_python[xr_index][0]
return (Zn_path, has_xr, ref_index_xr, ref_xr,
ref_div_iter, ref_order, drift_xr, dx_xr)
else:
# Complex distance between image center and ref point
refp_ptx = (FP_params["ref_point"]).real
refp_pty = (FP_params["ref_point"]).imag
driftx_xr = fsx.mpf_to_Xrange(self.x - refp_ptx, dtype=np.float64
).ravel()
drifty_xr = fsx.mpf_to_Xrange(self.y - refp_pty, dtype=np.float64
).ravel()
# Build 2 arrays to avoid using a dict in numba
# /!\ ref_xr at least len 1 to ensure typing as complex
refx_xr = fsx.Xrange_array([0.] * max(len(ref_xr_python), 1))
refy_xr = fsx.Xrange_array([0.] * max(len(ref_xr_python), 1))
for i, xr_index in enumerate(ref_xr_python.keys()):
ref_index_xr[i] = xr_index
tmpx, tmpy = ref_xr_python[xr_index]
refx_xr[i] = tmpx[0]
refy_xr[i] = tmpy[0]
return (Zn_path, has_xr, ref_index_xr, refx_xr, refy_xr,
ref_div_iter, ref_order, driftx_xr, drifty_xr,
dx_xr)
#==============================================================================
# Printing - export functions
@staticmethod
def print_FP(FP_params, Zn_path):
"""
Just a pretty-print of the reference path, for debugging purposes
"""
pp = "-------------------------------------------------------------"
pp += " Full precision orbit loaded, FP_params:"
for k, v in FP_params.items():
try:
for kv, vv in v.items():
pp += f" {k}, ({kv}) --> {vv}"
except AttributeError:
pp += f" {k} --> {v}"
pp += f"ref_path, shape: {Zn_path.shape}, {Zn_path.dtype}"
pp += str(Zn_path)
pp += " -------------------------------------------------------------"
logger.info(pp)
@staticmethod
def write_FP(path, out_file):
"""
Export to csv format a complex path, for debugging purposes
"""
path_real = np.empty((path.shape[0], 2), dtype = np.float64)
path_real[:, 0] = path.real
path_real[:, 1] = path.imag
np.savetxt(out_file, path_real, delimiter=",")
#==============================================================================
# Calculation functions
@staticmethod
def numba_cycle_call(cycle_dep_args, cycle_indep_args):
""" Here we customize for perturbation iterations """
# We still have a case switch on indep parameter "holomorphic"
holomorphic = cycle_indep_args[0]
cycle_indep_args = cycle_indep_args[1:]
if holomorphic:
return numba_cycles_perturb(
*cycle_dep_args, *cycle_indep_args
)
else:
return numba_cycles_perturb_BS(
*cycle_dep_args, *cycle_indep_args
)
def get_cycle_indep_args(self, initialize, iterate):
"""
Parameters independent of the cycle.
This is where the hard work is done.
"""
# ====================================================================
# CUSTOM class impl
# Initialise the reference path
holomorphic = self.holomorphic
calc_deriv_c = self.calc_dZndc if holomorphic else self.calc_hessian
calc_dZndz = self.calc_dZndz if holomorphic else False
# 1) compute or retrieve the reference orbit
self.set_status("Reference", "running")
self.get_FP_orbit()
if holomorphic:
(Zn_path, has_xr, ref_index_xr, ref_xr, ref_div_iter, ref_order,
drift_xr, dx_xr) = self.get_path_data()
else:
(Zn_path, has_xr, ref_index_xr, refx_xr, refy_xr,
ref_div_iter, ref_order, driftx_xr, drifty_xr,
dx_xr) = self.get_path_data()
if ref_order is None:
ref_order = (1 << 62) # a quite large int64
self.set_status("Reference", "completed")
# 2) compute the orbit derivatives if needed
# Note: this is where the "scale" of the derivative is chosen
scale_deriv_xr = dx_xr
if self.projection.scale != 1.:
# If there is a projection-induced scale, we need to use it here.
scale_xr = fsx.mpf_to_Xrange(
self.projection.scale, dtype=self.float_type
).ravel()
scale_deriv_xr = dx_xr * scale_xr
logger.debug(f"Derivative ref scale set at: {dx_xr}")
dZndc_path = None
(dXnda_path, dXndb_path, dYnda_path, dYndb_path) = (None,) * 4
if calc_deriv_c:
xr_detect_activated = self.xr_detect_activated
if holomorphic:
dZndc_path, dZndc_xr_path = numba_dZndc_path(
Zn_path, has_xr, ref_index_xr, ref_xr,
ref_div_iter, ref_order,
self.dfdz, scale_deriv_xr, xr_detect_activated
)
if xr_detect_activated:
dZndc_path = dZndc_xr_path
else:
(dXnda_path, dXndb_path, dYnda_path, dYndb_path,
dXnda_xr_path, dXndb_xr_path, dYnda_xr_path, dYndb_xr_path
) = numba_dZndc_path_BS(
Zn_path, has_xr, ref_index_xr, refx_xr, refy_xr,
ref_div_iter, ref_order,
self.dfxdx, self.dfxdy, self.dfydx, self.dfydy,
scale_deriv_xr, xr_detect_activated #, self.reverse_y
)
if xr_detect_activated:
dXnda_path = dXnda_xr_path
dXndb_path = dXndb_xr_path
dYnda_path = dYnda_xr_path
dYndb_path = dYndb_xr_path
# 2') compute the orbit derivatives wrt z if needed
# (interior detection - only for holomorphic case)
dZndz_path = None
if calc_dZndz:
xr_detect_activated = self.xr_detect_activated
dZndz_path, dZndz_xr_path = numba_dZndz_path(
Zn_path, has_xr, ref_index_xr, ref_xr,
ref_div_iter, ref_order,
self.dfdz, xr_detect_activated
)
if xr_detect_activated:
dZndz_path = dZndz_xr_path
self.kc = kc = self.ref_point_kc()
if kc == 0.:
raise RuntimeError(
"Resolution is too low for this zoom depth. Try to increase"
"the reference calculation precicion."
)
logger.debug(f"Bounding box defined, with kc: {self.kc }")
# Initialize BLA interpolation
if self.BLA_eps is None:
M_bla = None
r_bla = None
bla_len = None
stages_bla = None
self.set_status("Bilin. approx", "N.A.")
else:
self.set_status("Bilin. approx", "running")
eps = self.BLA_eps
M_bla, r_bla, bla_len, stages_bla = self.get_BLA_tree(Zn_path, eps)
self.set_status("Bilin. approx", "completed")
proj_dzndc_modifier = getattr(self.projection, "dzndc_modifier", None)
if holomorphic:
cycle_indep_args = (
holomorphic,
initialize, iterate,
Zn_path, dZndc_path, dZndz_path,
has_xr, ref_index_xr, ref_xr, ref_div_iter, ref_order,
drift_xr, dx_xr, self.proj_impl, self.lin_mat, self.lin_scale_xr,
kc, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier,
self._interrupted
)
else:
cycle_indep_args = (
holomorphic,
initialize, iterate,
Zn_path, dXnda_path, dXndb_path, dYnda_path, dYndb_path,
has_xr, ref_index_xr, refx_xr, refy_xr, ref_div_iter, ref_order,
driftx_xr, drifty_xr, dx_xr, self.proj_impl, self.lin_mat, self.lin_scale_xr,
kc, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier,
self._interrupted
)
return cycle_indep_args
def reset_bla_tree(self, cycle_indep_args):
"""
Returns a new cycle_indep_args with modification of:
- BLA validity radius
- derivative scaling (dZndc_path)
for deep expmap zooms
cycle_indep_args : the data to modify in place
Note: the fractal projection shall have been modified through
its ``set_exp_zoom_step`` method
"""
# self.kc = self.ref_point_kc()
logger.debug(f"Bounding box reset with kc: {self.kc }")
initialize = cycle_indep_args[1]
iterate = cycle_indep_args[2]
return self.get_cycle_indep_args(initialize, iterate)
def fingerprint_matching(self, calc_name, test_fingerprint, log=False):
"""
Test if the stored parameters match those of the new calculation
/!\ modified in subclass
"""
flatten_fp = fs.utils.dic_flatten(test_fingerprint)
state = self._calc_data[calc_name]["state"]
expected_fp = fs.utils.dic_flatten(state.fingerprint)
if log:
logger.debug(f"flatten_fp:\n {flatten_fp}")
logger.debug(f"expected_fp:\n {expected_fp}")
precision_key = f"zoom_kwargs@precision"
SPECIAL = [precision_key,]
for key, val in expected_fp.items():
if (key in SPECIAL):
if key == precision_key:
# We allow precision to *decrease* without rerunning
# the whole calc
if flatten_fp[key] < val:
if log:
logger.debug(textwrap.dedent(f"""\
Higher precision needed: will trigger a recalculation
{key}, {flatten_fp[key]} --> {val}"""
))
return False
elif flatten_fp[key] > val:
if log:
logger.debug(textwrap.dedent(f"""\
Lower precision requested: no need for recalculation
{key}, {flatten_fp[key]} --> {val}"""
))
continue
else:
if flatten_fp[key] != val:
if log:
logger.debug(textwrap.dedent(f"""\
Parameter mismatch ; will trigger a recalculation
{key}, {flatten_fp[key]} --> {val}"""
))
return False
return True
def get_BLA_tree(self, Zn_path, eps):
"""
Returns
-------
M_bla, r_bla, bla_len, stages_bla
"""
kc = self.kc
if self.holomorphic:
dfdz = self.dfdz
return numba_make_BLA(Zn_path, dfdz, kc, eps)
else:
dfxdx = self.dfxdx
dfxdy = self.dfxdy
dfydx = self.dfydx
dfydy = self.dfydy
return numba_make_BLA_BS(
Zn_path, dfxdx, dfxdy, dfydx, dfydy, kc, eps
)
def get_FP_orbit(self, c0=None, newton="cv", order=None,
max_newton=None):
"""
Check if we have a reference point stored; otherwise compute it
and store it in a file.
newton: ["cv", "step", None]
"""
if newton == "step":
raise NotImplementedError("step option not Implemented (yet)")
# Early escape if file exists
if self.ref_point_matching():
logger.info("Reference point already stored, skipping calc")
return
# no newton if zoom level is low. TODO: early escape possible
if self.dx > fs.settings.newton_zoom_level:
c0 = self.critical_pt
self.compute_critical_orbit(c0)
return
if c0 is None:
c0 = self.x + 1j * self.y
# skip Newton if settings impose it
if fs.settings.no_newton or (newton is None) or (newton == "None"):
logger.info(textwrap.dedent(f"""\
Skipping all Newton calculation according to calc options
fs.settings.no_newton: {fs.settings.no_newton}
newton arg: {newton}"""
))
self.compute_FP_orbit(c0, None)
return
# Here we will compute a Newton iteration.
# First, try Ball method to guess the order
if order is None:
k_ball = 1.0
order = self.ball_method(c0, self.dx * k_ball)
if order is None:
logger.warning(
"Ball method failed - Default to image center"
)
self.compute_FP_orbit(c0, None)
return
if order is None:
raise ValueError("Order must be specified for Newton iteration")
# Now launch Newton descent
max_attempt = 2
logger.info(textwrap.dedent(f"""\
"Launch Newton descent with parameters:
order: {order}, max newton attempts: {max_attempt + 1}"""
))
eps_pixel = self.dx * (1. / self.nx)
try:
newton_cv, nucleus = self.find_nucleus(
c0, order, eps_pixel, max_newton=max_newton
)
except NotImplementedError:
newton_cv, nucleus = self.find_any_nucleus(
c0, order, eps_pixel, max_newton=max_newton
)
# If Newton did not converge, we try to boost the dps / precision
attempt = 1
if not(newton_cv):
while not(newton_cv) and attempt <= max_attempt:
attempt += 1
old_dps = mpmath.mp.dps
mpmath.mp.dps = int(1.25 * mpmath.mp.dps)
logger.info(textwrap.dedent(f"""\
Newton attempt {attempt - 1} failed
Increasing dps for next attempt: {old_dps} -> {mpmath.mp.dps}"""
))
eps_pixel = self.dx * (1. / self.nx)
try:
no_div = True
newton_cv, nucleus = self.find_nucleus(
c0, order, eps_pixel, max_newton=max_newton
)
except NotImplementedError:
no_div = False
newton_cv, nucleus = self.find_any_nucleus(
c0, order, eps_pixel, max_newton=max_newton
)
if no_div and not(newton_cv) and (attempt == max_attempt):
# Last try, we just release constraint on the cycle order
# and consider also divisors.
newton_cv, nucleus = self.find_any_nucleus(
c0, order, eps_pixel, max_newton=max_newton
)
# Still not converged? We default to the center of the image
if newton_cv:
logger.info(f"Newton descent converged at attempt {attempt}.")
else:
order = None # We cannot wrap the ref point here...
nucleus = c0
logger.warning("Newton descent failed - Default to image center")
shift = nucleus - (self.x + self.y * 1j)
shift_x = shift.real / self.dx
shift_y = shift.imag / self.dx
logger.info(textwrap.dedent(f"""\
Reference nucleus found at shift from center:
({shift_x}, {shift_y}) in dx units
with order {order}"""
))
self.compute_FP_orbit(nucleus, order)
def compute_critical_orbit(self, crit):
"""
Basically nothing to 'compute' here, just short-cutting
"""
logger.debug("Skipping ref pt orbit calc for low res")
FP_code = self.FP_code
# Parameters 'max_iter' borrowed from last "@fsutils.calc_options" call
# max_iter = self.max_iter
FP_params = {
"ref_point": crit,
"dps": mpmath.mp.dps,
"order": 100,
"max_iter": self.max_iter,
"FP_code": FP_code
}
# Given the "single reference" implementation the loop will wrap when
# we reach div_iter - this is probably better not to do it too often
# Let's just pick a reasonnable figure
div_iter = 100 # min(100, self.max_iter)
FP_params["partials"] = {}
FP_params["xr"] = {}
FP_params["div_iter"] = div_iter
# Also, store the *init_kwargs* because if the Fractal is regenerated
# from new initial inputs, we shall obvs invalidate the orbit
init_kwargs = self.init_kwargs
del init_kwargs["directory"]
FP_params["init_kwargs"] = init_kwargs
Zn_path = np.zeros([div_iter + 1], dtype=np.complex128)
Zn_path[:] = crit
logger.debug(f"Storing critical orbit {crit}:\n {Zn_path}")
self.save_ref_point(FP_params, Zn_path)
def compute_FP_orbit(self, ref_point, order=None):
"""
Computes full precision orbit, and stores path in normal precision
FP_params keys:
ref_point : starting point for the FP orbit
order : ref cycle order or None
max_iter : max iteration possible
FP_code : orbit stored fields
div_iter : First invalid iteration (either diverging or not stored)
partials : dictionary iteration -> partial value
xr : dictionary iteration -> xr_value
"""
# Parameter 'max_iter' from last "@fsutils.calc_options" call
max_iter = self.max_iter
logger.info(f"Computing full precision path with max_iter {max_iter}")
FP_code = self.FP_code
FP_params = {
"ref_point": ref_point,
"dps": mpmath.mp.dps,
"order": order,
"max_iter": max_iter,
"FP_code": FP_code
}
# If order is defined, we wrap
ref_orbit_len = max_iter + 1
if order is not None:
ref_orbit_len = min(order, ref_orbit_len) # at order + 1, we wrap
FP_params["ref_orbit_len"] = ref_orbit_len
Zn_path = np.empty([ref_orbit_len], dtype=np.complex128)
if order is not None:
logger.info(f"Order is known, wraping at {ref_orbit_len}")
i, partial_dict, xr_dict = self.FP_loop(Zn_path, ref_point)
FP_params["partials"] = partial_dict
FP_params["xr"] = xr_dict
FP_params["div_iter"] = i
# Also, store the *init_kwargs* because if the Fractal is regenerated
# from new initial inputs, we shall obvs invalidate the orbit
init_kwargs = self.init_kwargs
del init_kwargs["directory"]
FP_params["init_kwargs"] = init_kwargs
logger.info(f"Storing orbit for pt: \n {ref_point}\n {Zn_path}")
self.save_ref_point(FP_params, Zn_path)
def ball_method(self, c, px, kind=1, M_divergence=1.e5):
"""
Use a ball centered on c = x + i y to find the first period (up to
max_iter) of the nucleus
"""
max_iter = self.max_iter
if kind == 1:
return self._ball_method(c, px, max_iter, M_divergence)
elif kind == 2:
return self._ball_method2(c, px, max_iter, M_divergence)
def ball_method_order(self, x, y, pix, dps,
maxiter: int=100000,
radius_pixels: int=25):
""" x, y : coordinates of the event """
c = x + 1j * y
radius = pix * radius_pixels
M_divergence = 1.e3
order = self._ball_method(c, radius, maxiter, M_divergence)
x_str = str(x)
y_str = str(y)
radius_str = str(radius)
res_str = f"""
ball_order = {{
"x": "{x_str}",
"y": "{y_str}",
"maxiter": {maxiter},
"radius_pixels": {radius_pixels},
"radius": "{radius_str}",
"M_divergence": {M_divergence},
"order": {order}
}}
"""
return res_str
def _newton_search(self, x, y, pix, dps,
maxiter: int=100000,
radius_pixels: int=3):
""" x, y : coordinates of the event """
c = x + 1j * y
radius = pix * radius_pixels
radius_str = str(radius)
M_divergence = 1.e3
order = self._ball_method(c, radius, maxiter, M_divergence)
newton_cv = False
max_attempt = 2
attempt = 0
while not(newton_cv) and attempt < max_attempt:
if order is None:
break
attempt += 1
dps = int(1.5 * dps)
logger.info(f"Newton, decimal precision (dps) boost to: {dps}")
with mpmath.workdps(dps):
try:
newton_cv, c_newton = self.find_nucleus(
c, order, pix, max_newton=None, eps_cv=None
)
except NotImplementedError:
newton_cv, c_newton = self.find_any_nucleus(
c, order, pix, max_newton=None, eps_cv=None
)
if newton_cv:
xn_str = str(c_newton.real)
yn_str = str(c_newton.imag)
if newton_cv:
size_estimates = self._nucleus_size_estimate(
c_newton, order
)
else:
size_estimates = None
xn_str = ""
yn_str = ""
x_str = str(x)
y_str = str(y)
return (
x_str, y_str, maxiter, radius_pixels, radius_str, dps, order,
xn_str, yn_str, size_estimates
)
def newton_search(self, x, y, pix, dps, maxiter: int=100000,
radius_pixels: int=3):
""" x, y : coordinates of the event """
(
x_str, y_str, maxiter, radius_pixels, radius_str, dps, order,
xn_str, yn_str, size_estimates
) = self._newton_search(
x, y, pix, dps, maxiter, radius_pixels
)
if size_estimates is not None:
(nucleus_size, julia_size) = size_estimates
else:
nucleus_size = None
julia_size = None
res_str = f"""
newton_search = {{
"x_start": "{x_str}",
"y_start": "{y_str}",
"maxiter": {maxiter},
"radius_pixels": {radius_pixels},
"radius": "{radius_str}",
"calculation dps": {dps}
"order": {order}
"x_nucleus": "{xn_str}",
"y_nucleus": "{yn_str}",
"nucleus_size": "{nucleus_size}",
"julia_size": "{julia_size}",
}}
"""
return res_str
#==============================================================================
# Numba JIT functions
#==============================================================================
Xr_template = fsx.Xrange_array.zeros([1], dtype=np.complex128)
Xr_float_template = fsx.Xrange_array.zeros([1], dtype=np.float64)
USER_INTERRUPTED = 1
STG_COMPRESSED = fs.settings.BLA_compression
STG_SKIP_MASK = (1 << STG_COMPRESSED) - 1
@numba.njit(nogil=True, fastmath=True, error_model="numpy")
def numba_cycles_perturb(
c_pix, Z, U, stop_reason, stop_iter,
initialize, iterate,
Zn_path, dZndc_path, dZndz_path,
has_xr, ref_index_xr, ref_xr, ref_div_iter, ref_order,
drift_xr, dx_xr, proj_impl, lin_mat, lin_scale_xr,
kc, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier,
_interrupted
):
"""
Run the perturbation cycles
Parameters
----------
c_pix, Z, U, stop_reason, stop_iter
pixel inputs and result arrays
initialize, iterate :
numba jitted functions
Zn_path, ..., dx_xr :
reference path data, as built by get_cycle_indep_args
"""
nz, npts = Z.shape
Z_xr = Xr_template.repeat(nz)
Z_xr_trigger = np.ones((nz,), dtype=np.bool_)
for ipt in range(npts):
refpath_ptr = np.zeros((2,), dtype=np.int32)
out_is_xr = np.zeros((2,), dtype=numba.bool_)
out_xr = Xr_template.repeat(2)
Zpt = Z[:, ipt]
Upt = U[:, ipt]
cpt, c_xr = ref_path_c_from_pix(
c_pix[ipt], proj_impl, lin_mat, lin_scale_xr, drift_xr
)
stop_pt = stop_reason[:, ipt]
initialize(c_xr, Zpt, Z_xr, Z_xr_trigger, Upt, kc, dx_xr)
n_iter = iterate(
cpt, c_xr, Zpt, Z_xr, Z_xr_trigger, Upt, stop_pt, # n_iter_init,
Zn_path, dZndc_path, dZndz_path, has_xr, ref_index_xr, ref_xr, ref_div_iter, ref_order,
refpath_ptr, out_is_xr, out_xr, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier, c_pix[ipt]
)
stop_iter[0, ipt] = n_iter
stop_reason[0, ipt] = stop_pt[0]
if _interrupted[0]:
return USER_INTERRUPTED
return 0
def numba_initialize(zn, dzndc, dzndz):
@numba.njit(fastmath=True, error_model="numpy")
def numba_init_impl(c_xr, Z, Z_xr, Z_xr_trigger, U, kc, dx_xr):
"""
Initialize 'in place' :
Z[zn], Z[dzndz], Z[dzndc]
"""
Z_xr[zn] = fsxn.to_Xrange_scalar(Z[zn])
if dzndc != -1:
Z_xr[dzndc] = fsxn.to_Xrange_scalar(Z[dzndc])
if (dzndz != -1):
Z[dzndz] = 0.
return numba_init_impl
# Defines iterate via a function factory - jitted implementation
def numba_iterate(
max_iter, M_divergence_sq, epsilon_stationnary_sq,
reason_max_iter, reason_M_divergence, reason_stationnary,
xr_detect_activated, BLA_activated,
zn, dzndc, dzndz,
p_iter_zn, p_iter_dzndz, p_iter_dzndc,
calc_dzndc, calc_dzndz,
calc_orbit, i_znorbit, backshift, zn_iterate # Added args
):
@numba.njit(fastmath=True, error_model="numpy")
def numba_impl(
c, c_xr, Z, Z_xr, Z_xr_trigger, U, stop,
Zn_path, dZndc_path, dZndz_path, has_xr, ref_index_xr, ref_xr, ref_div_iter, ref_order,
refpath_ptr, out_is_xr, out_xr, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier, c_pix
):
"""
Parameters
----------
c, c_xr: c and its "Xrange" counterparts
Z, Z_xr: idem for result vector Z
Z_xr_trigger : boolean, activated when Z_xr needs to be used
"""
record_zero = Xr_template.repeat(1)[0]
w_iter = 0
n_iter = 0
if w_iter >= ref_order:
w_iter = w_iter % ref_order
if calc_orbit:
div_shift = 0
orbit_zn1 = Z[zn]
orbit_zn2 = Z[zn]
orbit_i1 = 0
orbit_i2 = 0
if calc_dzndz:
nullify_dZndz = False
w_wraped = len(dZndz_path) - 1
# We know that :
# ref_orbit_len = max_iter + 1 >= ref_div_iter
# if order is not None:
# ref_orbit_len = min(order, ref_orbit_len)
ref_orbit_len = Zn_path.shape[0]
first_invalid_index = min(ref_orbit_len, ref_div_iter, ref_order)
M_out = np.empty((2,), dtype=np.complex128)
# Flag to keep track of whether we use the wrapped index of dzndc
bool_dyn_rebase = True
while True:
#==========================================================
# Try a BLA_step
if BLA_activated and (w_iter & STG_SKIP_MASK) == 0:
# [ A 0 0]
# M = [ 0 A B]
# [ 0 0 1]
#
# [dzndc]
# Zn = [ zn]
# [ c]
#
# Z_(n+1) = M * Zn
#
step = ref_BLA_get(
M_bla, r_bla, bla_len, stages_bla, Z[zn], w_iter,
first_invalid_index, M_out, True
)
if step != 0:
n_iter += step
w_iter = (w_iter + step) % ref_order
if xr_detect_activated:
Z_xr[zn] = M_out[0] * Z_xr[zn] + M_out[1] * c_xr
# /!\ keep this, needed for next BLA step
Z[zn] = fsxn.to_standard(Z_xr[zn])
if calc_dzndc:
Z_xr[dzndc] = M_out[0] * Z_xr[dzndc]
if calc_dzndz:
Z_xr[dzndz] = M_out[0] * Z_xr[dzndz]
else:
# just the usual BLA step
Z[zn] = M_out[0] * Z[zn] + M_out[1] * c
if calc_dzndc:
Z[dzndc] = M_out[0] * Z[dzndc]
if calc_dzndz:
Z[dzndz] = M_out[0] * Z[dzndz]
continue
#==================================================================
# BLA failed, launching a full perturbation iteration
n_iter += 1  # the index we are going to compute now
# Load reference point value @ w_iter
# refpath_ptr = [prev_idx, curr_xr]
if xr_detect_activated:
ref_zn = ref_path_get(
Zn_path, w_iter,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
ref_zn_xr = ensure_xr(ref_zn, out_xr[0], out_is_xr[0])
else:
ref_zn = Zn_path[w_iter]
#==================================================================
# Perturbation iter block
#------------------------------------------------------------------
# dzndc subblock
if calc_dzndc:
ref_dzndc = dZndc_path[w_iter]
if bool_dyn_rebase: # Avoid wrapped value at 0
if xr_detect_activated:
ref_dzndc = record_zero # Record type
else:
ref_dzndc = 0.
if xr_detect_activated:
p_iter_dzndc(Z_xr, ref_zn_xr, ref_dzndc)
else:
p_iter_dzndc(Z, ref_zn, ref_dzndc)
#------------------------------------------------------------------
# Interior detection
if calc_dzndz:
if nullify_dZndz:
ref_dzndz = dZndz_path[0] # This may be Xrange
else:
# Note: no need of special case for n_iter == order here
# It is managed before during glitch correction step
ref_dzndz = dZndz_path[w_iter]
if xr_detect_activated:
p_iter_dzndz(Z_xr, ref_zn_xr, ref_dzndz)
else:
p_iter_dzndz(Z, ref_zn, ref_dzndz)
#------------------------------------------------------------------
# zn subblock
if xr_detect_activated:
p_iter_zn(Z_xr, ref_zn_xr, c_xr)  # in-place modification
# std is used for div condition
Z[zn] = fsxn.to_standard(Z_xr[zn])
else:
p_iter_zn(Z, ref_zn, c)
# Increment w_iter just before the stopping conditions
w_iter += 1
if w_iter >= ref_order:
w_iter = w_iter % ref_order
#==================================================================
# Stopping condition: maximum iter reached
if n_iter >= max_iter:
stop[0] = reason_max_iter
break
#==================================================================
# Stopping condition: Interior points detection (dzndz)
if calc_dzndz:
if nullify_dZndz:
ref_dzndz_next = dZndz_path[0]
else:
ref_dzndz_next = dZndz_path[w_iter]
if n_iter == ref_order:
ref_dzndz_next = dZndz_path[w_wraped]
if xr_detect_activated:
ZdZ = Z_xr[dzndz] + ref_dzndz_next
bool_stationnary = (
fsxn.extended_abs2(ZdZ)
< epsilon_stationnary_sq
)
else:
ZdZ = Z[dzndz] + ref_dzndz_next
bool_stationnary = (
ZdZ.real ** 2 + ZdZ.imag ** 2
< epsilon_stationnary_sq
)
if bool_stationnary:
stop[0] = reason_stationnary
break
#==================================================================
# Stopping condition: divergence
# ZZ = "Total" z + dz
if xr_detect_activated:
ref_zn_next = fs.perturbation.ref_path_get(
Zn_path, w_iter,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
else:
ref_zn_next = Zn_path[w_iter]
# div condition computation with std only
ZZ = Z[zn] + ref_zn_next
full_sq_norm = ZZ.real ** 2 + ZZ.imag ** 2
# Storing the orbit for future use
if calc_orbit:
div = n_iter // backshift
if div > div_shift:
div_shift = div
orbit_i2 = orbit_i1
orbit_zn2 = orbit_zn1
orbit_i1 = n_iter
orbit_zn1 = ZZ
# Flagged as 'diverging'
bool_infty = (full_sq_norm > M_divergence_sq)
if bool_infty:
stop[0] = reason_M_divergence
break
#==================================================================
# Glitch correction - reference point diverging
if (w_iter >= ref_div_iter - 1):
# Rebasing - we are already big, no underflow risk
Z[zn] = ZZ
if xr_detect_activated:
Z_xr[zn] = fsxn.to_Xrange_scalar(ZZ)
if calc_dzndc:
Z_xr[dzndc] = Z_xr[dzndc] + dZndc_path[w_iter]
if calc_dzndz:
if not(nullify_dZndz):
added_index = w_iter
if n_iter == ref_order:
added_index = w_wraped
Z_xr[dzndz] = (
Z_xr[dzndz] + dZndz_path[added_index]
)
nullify_dZndz = True
else:
if calc_dzndc:
# not a cycle, dZndc_path[0] == 0
Z[dzndc] += dZndc_path[w_iter]
if calc_dzndz:
if not(nullify_dZndz):
if n_iter == ref_order:
Z[dzndz] += dZndz_path[w_wraped]
else:
Z[dzndz] += dZndz_path[w_iter]
nullify_dZndz = True
w_iter = 0
continue
#==================================================================
# Glitch correction - "Dynamic glitch"
bool_dyn_rebase = (
(abs(ZZ.real) <= abs(Z[zn].real))
and (abs(ZZ.imag) <= abs(Z[zn].imag))
)
if bool_dyn_rebase:# and False:
if xr_detect_activated:
# Can we *really* rebase ??
# Note: if Z[zn] underflows we might miss a rebase
# So we cast everything to xr
Z_xrn = Z_xr[zn]
if out_is_xr[0]:
# Reference underflows, use available xr ref
ZZ_xr = Z_xrn + out_xr[0]
else:
ZZ_xr = Z_xrn + ref_zn_next
bool_dyn_rebase_xr = (
fsxn.extended_abs2(ZZ_xr)
<= fsxn.extended_abs2(Z_xrn)
)
if bool_dyn_rebase_xr:
Z_xr[zn] = ZZ_xr
# /!\ keep this, needed for next BLA step
Z[zn] = fsxn.to_standard(ZZ_xr)
if calc_dzndc:
Z_xr[dzndc] = (
Z_xr[dzndc] + dZndc_path[w_iter]
)
if calc_dzndz:
if not(nullify_dZndz):
added_index = w_iter
if n_iter == ref_order:
added_index = w_wraped
Z_xr[dzndz] = (
Z_xr[dzndz] + dZndz_path[added_index]
)
nullify_dZndz = True
w_iter = 0
continue
else:
# No risk of underflow - safe to rebase
Z[zn] = ZZ
if calc_dzndc:
Z[dzndc] += dZndc_path[w_iter]
if calc_dzndz:
if not(nullify_dZndz):
if n_iter == ref_order:
Z[dzndz] += dZndz_path[w_wraped]
else:
Z[dzndz] += dZndz_path[w_iter]
nullify_dZndz = True
w_iter = 0
continue
# End of iterations for this point
U[0] = w_iter
if xr_detect_activated:
Z[zn] = fsxn.to_standard(Z_xr[zn]) + Zn_path[w_iter]
if calc_dzndc: # and n_iter > 1:
Z[dzndc] = fsxn.to_standard(Z_xr[dzndc] + dZndc_path[w_iter])
else:
Z[zn] += Zn_path[w_iter]
if calc_dzndc: # and n_iter > 1:
# if n_iter != 1:
Z[dzndc] += dZndc_path[w_iter]
if proj_dzndc_modifier is not None:
Z[dzndc] *= proj_dzndc_modifier(c_pix)
if calc_orbit: # Finalizing the orbit
zn_orbit = orbit_zn2
CC = c + Zn_path[1]
while orbit_i2 < n_iter - backshift:
zn_orbit = zn_iterate(zn_orbit, CC)
orbit_i2 += 1
Z[i_znorbit] = zn_orbit
return n_iter
return numba_impl
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Non-holomorphic perturbation iterations
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@numba.njit(nogil=True, fastmath=True, error_model="numpy")
def numba_cycles_perturb_BS(
c_pix, Z, U, stop_reason, stop_iter,
initialize, iterate,
Zn_path, dXnda_path, dXndb_path, dYnda_path, dYndb_path,
has_xr, ref_index_xr, refx_xr, refy_xr, ref_div_iter, ref_order,
driftx_xr, drifty_xr, dx_xr, proj_impl, lin_mat, lin_scale_xr,
kc, M_bla, r_bla, bla_len, stages_bla, # suppressed P, n_iter_init
proj_dzndc_modifier,
_interrupted
):
nz, npts = Z.shape
Z_xr = Xr_float_template.repeat(nz)
Z_xr_trigger = np.ones((nz,), dtype=np.bool_)
for ipt in range(npts):
refpath_ptr = np.zeros((2,), dtype=np.int32)
out_is_xr = np.zeros((2,), dtype=numba.bool_)
out_xr = Xr_float_template.repeat(4)
Zpt = Z[:, ipt]
Upt = U[:, ipt]
apt, bpt, a_xr, b_xr = ref_path_c_from_pix_BS(
c_pix[ipt], proj_impl, lin_mat, lin_scale_xr, driftx_xr, drifty_xr
)
stop_pt = stop_reason[:, ipt]
initialize(Zpt, Z_xr)
n_iter = iterate(
apt, bpt, a_xr, b_xr, Zpt, Z_xr, Z_xr_trigger,
Upt, stop_pt,
Zn_path, dXnda_path, dXndb_path, dYnda_path, dYndb_path,
has_xr, ref_index_xr, refx_xr, refy_xr, ref_div_iter, ref_order,
refpath_ptr, out_is_xr, out_xr, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier, c_pix[ipt]
)
stop_iter[0, ipt] = n_iter
stop_reason[0, ipt] = stop_pt[0]
if _interrupted[0]:
return USER_INTERRUPTED
return 0
def numba_initialize_BS(xn, yn, dxnda, dxndb, dynda, dyndb):
@numba.njit
def numba_init_impl(Z, Z_xr):
"""
Only initialize the Xrange counterparts
"""
for key in (xn, yn, dxnda, dxndb, dynda, dyndb):
if key != -1:
Z_xr[key] = fsxn.to_Xrange_scalar(Z[key])
return numba_init_impl
# Defines iterate for non-holomorphic function via a function factory
# jitted implementation
def numba_iterate_BS(
M_divergence_sq, max_iter, reason_max_iter, reason_M_divergence,
xr_detect_activated, BLA_activated,
calc_hessian,
xn, yn, dxnda, dxndb, dynda, dyndb,
p_iter_zn, p_iter_hessian,
calc_orbit, i_xnorbit, i_ynorbit, backshift, xnyn_iterate # new args
):
@numba.njit
def numba_impl(
a, b, a_xr, b_xr, Z, Z_xr, Z_xr_trigger,
U, stop,
Zn_path, dXnda_path, dXndb_path, dYnda_path, dYndb_path,
has_xr, ref_index_xr, refx_xr, refy_xr, ref_div_iter, ref_order,
refpath_ptr, out_is_xr, out_xr, M_bla, r_bla, bla_len, stages_bla,
proj_dzndc_modifier, c_pix
):
"""
Parameters
----------
a, b, a_xr, b_xr: a, b and their "Xrange" counterparts
Z, Z_xr: idem for result vector Z
Z_xr_trigger : boolean, activated when Z_xr needs to be used
"""
record_zero = Xr_float_template.repeat(1)[0]
# Wrapped iteration if we reach the cycle order
w_iter = 0
n_iter = 0
if w_iter >= ref_order:
w_iter = w_iter % ref_order
if calc_orbit:
div_shift = 0
orbit_xn1 = Z[xn]
orbit_xn2 = Z[xn]
orbit_yn1 = Z[yn]
orbit_yn2 = Z[yn]
orbit_i1 = 0
orbit_i2 = 0
# We know that :
# ref_orbit_len = max_iter + 1 >= ref_div_iter
# if order is not None:
# ref_orbit_len = min(order, ref_orbit_len)
ref_orbit_len = Zn_path.shape[0]
first_invalid_index = min(ref_orbit_len, ref_div_iter, ref_order)
M_out = np.empty((8,), dtype=np.float64)
# Flag to keep track of whether we use the wrapped index of dzndc
bool_dyn_rebase = True
while True:
#==========================================================
# Try a BLA_step
if BLA_activated and (w_iter & STG_SKIP_MASK) == 0: # and False:
Zn = Z[xn] + 1j * Z[yn]
step = ref_BLA_get(
M_bla, r_bla, bla_len, stages_bla, Zn, w_iter,
first_invalid_index, M_out, False
)
if step != 0:
n_iter += step
w_iter = (w_iter + step) % ref_order
if xr_detect_activated:
apply_BLA_BS(M_out, Z_xr, a_xr, b_xr, xn, yn)
# /!\ keep this, needed for next BLA step
Z[xn] = fsxn.to_standard(Z_xr[xn])
Z[yn] = fsxn.to_standard(Z_xr[yn])
if calc_hessian:
apply_BLA_deriv_BS(M_out, Z_xr, a_xr, b_xr,
dxnda, dxndb, dynda, dyndb)
else:
# just the usual BLA step
apply_BLA_BS(M_out, Z, a, b, xn, yn)
if calc_hessian:
apply_BLA_deriv_BS(M_out, Z, a, b,
dxnda, dxndb, dynda, dyndb)
continue
#==================================================================
# BLA failed, launching a full perturbation iteration
n_iter += 1  # the index we are going to compute now
# Load reference point value @ w_iter
# refpath_ptr = [prev_idx, curr_xr]
if xr_detect_activated:
ref_zn = ref_path_get_BS(
Zn_path, w_iter,
has_xr, ref_index_xr, refx_xr, refy_xr, refpath_ptr,
out_is_xr, out_xr, 0, 1
)
ref_xn_xr, ref_yn_xr = ensure_xr_BS(
ref_zn, out_xr[0], out_xr[1], out_is_xr[0]
)
else:
ref_zn = Zn_path[w_iter]
ref_xn = ref_zn.real
ref_yn = ref_zn.imag
#==================================================================
# Perturbation iter block
#------------------------------------------------------------------
# dzndc subblock
if calc_hessian:
ref_dxnda = dXnda_path[w_iter] # Note this may be Xrange
ref_dxndb = dXndb_path[w_iter]
ref_dynda = dYnda_path[w_iter]
ref_dyndb = dYndb_path[w_iter]
if bool_dyn_rebase: # Avoid the wrapped value at 0
if xr_detect_activated:
ref_dxnda = record_zero # Record type
ref_dxndb = record_zero # Record type
ref_dynda = record_zero # Record type
ref_dyndb = record_zero # Record type
else:
ref_dxnda = 0.
ref_dxndb = 0.
ref_dynda = 0.
ref_dyndb = 0.
if xr_detect_activated:
p_iter_hessian(
Z_xr, ref_xn_xr, ref_yn_xr,
ref_dxnda, ref_dxndb, ref_dynda, ref_dyndb
)
else:
p_iter_hessian(
Z, ref_xn, ref_yn,
ref_dxnda, ref_dxndb, ref_dynda, ref_dyndb
)
#------------------------------------------------------------------
# zn subblock
if xr_detect_activated:
# Z_xr[zn] = p_iter_zn(Z_xr, ref_zn_xr, c_xr)# in place mod
p_iter_zn(Z_xr, ref_xn_xr, ref_yn_xr, a_xr, b_xr)  # in-place modification
# std is used for div condition
Z[xn] = fsxn.to_standard(Z_xr[xn])
Z[yn] = fsxn.to_standard(Z_xr[yn])
else:
# Z[zn] = p_iter_zn(Z, ref_zn, c)
p_iter_zn(Z, ref_xn, ref_yn, a, b)
#==================================================================
# Stopping condition: maximum iter reached
if n_iter >= max_iter:
stop[0] = reason_max_iter
break
#==================================================================
# Stopping condition: divergence
# ZZ = "Total" z + dz
w_iter += 1
if w_iter >= ref_order:
w_iter = w_iter % ref_order
if xr_detect_activated:
ref_zn_next = ref_path_get_BS(
Zn_path, w_iter,
has_xr, ref_index_xr, refx_xr, refy_xr, refpath_ptr,
out_is_xr, out_xr, 0, 1
)
else:
ref_zn_next = Zn_path[w_iter]
# div condition computation with std only
XX = Z[xn] + ref_zn_next.real
YY = Z[yn] + ref_zn_next.imag
full_sq_norm = XX ** 2 + YY ** 2
# Storing the orbit for future use
if calc_orbit:
div = n_iter // backshift
if div > div_shift:
div_shift = div
orbit_i2 = orbit_i1
orbit_xn2 = orbit_xn1
orbit_yn2 = orbit_yn1
orbit_i1 = n_iter
orbit_xn1 = XX
orbit_yn1 = YY
# Flagged as 'diverging'
bool_infty = (full_sq_norm > M_divergence_sq)
if bool_infty:
stop[0] = reason_M_divergence
break
#==================================================================
# Glitch correction - reference point diverging
if (w_iter >= ref_div_iter - 1):
# Rebasing - we are already big, no underflow risk
Z[xn] = XX
Z[yn] = YY
if xr_detect_activated:
Z_xr[xn] = fsxn.to_Xrange_scalar(XX)
Z_xr[yn] = fsxn.to_Xrange_scalar(YY)
# not a cycle, dZndc_path[0] == 0
# assert (dXnda_path[w_iter] == 0.)
if calc_hessian:
Z_xr[dxnda] = Z_xr[dxnda] + dXnda_path[w_iter]
Z_xr[dxndb] = Z_xr[dxndb] + dXndb_path[w_iter]
Z_xr[dynda] = Z_xr[dynda] + dYnda_path[w_iter]
Z_xr[dyndb] = Z_xr[dyndb] + dYndb_path[w_iter]
else:
if calc_hessian:
# not a cycle, dZndc_path[0] == 0
Z[dxnda] += dXnda_path[w_iter]
Z[dxndb] += dXndb_path[w_iter]
Z[dynda] += dYnda_path[w_iter]
Z[dyndb] += dYndb_path[w_iter]
w_iter = 0
continue
#==================================================================
# Glitch correction - "dynamic glitch"
bool_dyn_rebase = (
(abs(XX) <= abs(Z[xn])) and (abs(YY) <= abs(Z[yn]))
)
if bool_dyn_rebase:
if xr_detect_activated:
# Can we *really* rebase ??
# Note: if Z[zn] underflows we might miss a rebase
# So we cast everything to xr
X_xrn = Z_xr[xn]
Y_xrn = Z_xr[yn]
if out_is_xr[0]:
# Reference underflows, use available xr ref
XX_xr = X_xrn + out_xr[0]
YY_xr = Y_xrn + out_xr[1]
else:
XX_xr = X_xrn + ref_zn_next.real
YY_xr = Y_xrn + ref_zn_next.imag
bool_dyn_rebase_xr = (
(XX_xr * XX_xr + YY_xr * YY_xr)
<= (X_xrn * X_xrn + Y_xrn * Y_xrn)
)
if bool_dyn_rebase_xr:
Z_xr[xn] = XX_xr
Z_xr[yn] = YY_xr
# /!\ keep this, needed for next BLA step
Z[xn] = fsxn.to_standard(XX_xr)
Z[yn] = fsxn.to_standard(YY_xr)
if calc_hessian:
Z_xr[dxnda] = (
Z_xr[dxnda] + dXnda_path[w_iter]
)
Z_xr[dxndb] = (
Z_xr[dxndb] + dXndb_path[w_iter]
)
Z_xr[dynda] = (
Z_xr[dynda] + dYnda_path[w_iter]
)
Z_xr[dyndb] = (
Z_xr[dyndb] + dYndb_path[w_iter]
)
w_iter = 0
continue
else:
# No risk of underflow - safe to rebase
Z[xn] = XX
Z[yn] = YY
if calc_hessian:
Z[dxnda] += dXnda_path[w_iter]
Z[dxndb] += dXndb_path[w_iter]
Z[dynda] += dYnda_path[w_iter]
Z[dyndb] += dYndb_path[w_iter]
w_iter = 0
continue
# End of iterations for this point
U[0] = w_iter
# Total zn = Zn + zn
ref_zn = Zn_path[w_iter]
if xr_detect_activated:
Z[xn] = fsxn.to_standard(Z_xr[xn] + ref_zn.real)
Z[yn] = fsxn.to_standard(Z_xr[yn] + ref_zn.imag)
if calc_hessian:
Z[dxnda] = fsxn.to_standard(
Z_xr[dxnda] + dXnda_path[w_iter])
Z[dxndb] = fsxn.to_standard(
Z_xr[dxndb] + dXndb_path[w_iter])
Z[dynda] = fsxn.to_standard(
Z_xr[dynda] + dYnda_path[w_iter])
Z[dyndb] = fsxn.to_standard(
Z_xr[dyndb] + dYndb_path[w_iter])
else:
Z[xn] += ref_zn.real
Z[yn] += ref_zn.imag
if calc_hessian:
Z[dxnda] += dXnda_path[w_iter]
Z[dxndb] += dXndb_path[w_iter]
Z[dynda] += dYnda_path[w_iter]
Z[dyndb] += dYndb_path[w_iter]
if proj_dzndc_modifier is not None:
Z[dxnda] *= proj_dzndc_modifier(c_pix)
Z[dxndb] *= proj_dzndc_modifier(c_pix)
Z[dynda] *= proj_dzndc_modifier(c_pix)
Z[dyndb] *= proj_dzndc_modifier(c_pix)
if calc_orbit: # Finalizing the orbit
xn_orbit = orbit_xn2
yn_orbit = orbit_yn2
AA = a + np.real(Zn_path[1])
BB = b + np.imag(Zn_path[1])
while orbit_i2 < n_iter - backshift:
xn_orbit, yn_orbit = xnyn_iterate(xn_orbit, yn_orbit, AA, BB)
orbit_i2 += 1
Z[i_xnorbit] = xn_orbit
Z[i_ynorbit] = yn_orbit
return n_iter
return numba_impl
@numba.njit(nogil=True, cache=True)
def apply_BLA_BS(M, Z, a, b, xn, yn):
Z_xn = M[0] * Z[xn] + M[1] * Z[yn] + M[4] * a + M[5] * b
Z_yn = M[2] * Z[xn] + M[3] * Z[yn] + M[6] * a + M[7] * b
Z[xn] = Z_xn
Z[yn] = Z_yn
@numba.njit(nogil=True, cache=True)
def apply_BLA_deriv_BS(M, Z, a, b, dxnda, dxndb, dynda, dyndb):
# assert dxnda >= 0
# assert dxndb < len(Z)
Z_dxnda = M[0] * Z[dxnda] + M[1] * Z[dynda]
Z_dxndb = M[0] * Z[dxndb] + M[1] * Z[dyndb]
Z_dynda = M[2] * Z[dxnda] + M[3] * Z[dynda]
Z_dyndb = M[2] * Z[dxndb] + M[3] * Z[dyndb]
Z[dxnda] = Z_dxnda
Z[dxndb] = Z_dxndb
Z[dynda] = Z_dynda
Z[dyndb] = Z_dyndb
#------------------------------------------------------------------------------
# Bilinear approximation
# Note: the bilinear arrays being cheap, they will not be stored but
# re-computed if needed
@numba.njit(nogil=True, cache=True, fastmath=True, error_model="numpy")
def numba_make_BLA(Zn_path, dfdz, kc, eps):
"""
Generates a compressed BLA tree with
- bilinear approximation coefficients A and B
- validity radius
z_{n + 2**stg} = f^(2**stg)(z_n, c) with |c| < r_stg_n is approximated by
z_{n + 2**stg} = A_stg_n * z_n + B_stg_n * c
"""
# number of needed "stages" is (ref_orbit_len).bit_length()
kc_std = fsxn.to_standard(kc[0])
ref_orbit_len = Zn_path.shape[0]
# Compressed quantities
k_comp = 1 << STG_COMPRESSED
comp_bla_len = ref_orbit_len // k_comp
bla_dim = 2
M_bla = np.zeros((2 * comp_bla_len, bla_dim), dtype=numba.complex128)
r_bla = np.zeros((2 * comp_bla_len,), dtype=numba.float64)
# The temporary tree structure to get to the stored index
temp_M_bla = np.zeros((2 * k_comp, bla_dim), dtype=numba.complex128)
temp_r_bla = np.zeros((2 * k_comp,), dtype=numba.float64)
temp_M_bla_template = np.copy(temp_M_bla)
temp_r_bla_template = np.copy(temp_r_bla)
for i in range(0, comp_bla_len):
i_0 = 2 * i # BLA index for (i, 0)
temp_M_bla = np.copy(temp_M_bla_template)
temp_r_bla = np.copy(temp_r_bla_template)
for j in range(0, k_comp):
j_0 = 2 * j # BLA index for (j, 0)
# Define a BLA_step by:
# [ M[0] 0 0] [dzndc]
# M = [ 0 M[0] M[1]] Zn = [ zn]
# [ 0 0 1] [ c]
#
# Z_(n+1) = M * Zn
Zn_i = Zn_path[i * k_comp + j]
temp_M_bla[j_0, 0] = dfdz(Zn_i)
temp_M_bla[j_0, 1] = 1.
temp_r_bla[j_0] = eps * abs(temp_M_bla[j_0, 0])
# Combine step for temporary tree 'compression'
for stg in range(1, STG_COMPRESSED + 1):
combine_BLA(temp_M_bla, temp_r_bla, kc_std, stg, k_comp, eps)
stored_index = k_comp - 1
for d in range(bla_dim):
M_bla[i_0, d] = temp_M_bla[stored_index, d]
r_bla[i_0] = temp_r_bla[stored_index]
# Combine step for stored data
stages = _stages_bla(ref_orbit_len)
for stg in range(1, stages - STG_COMPRESSED):
combine_BLA(M_bla, r_bla, kc_std, stg, comp_bla_len, eps)
return M_bla.reshape(-1), r_bla, 2 * comp_bla_len, stages
@numba.njit(nogil=True, cache=True, fastmath=True, error_model="numpy")
def numba_make_BLA_BS(Zn_path, dfxdx, dfxdy, dfydx, dfydy, kc, eps):
"""
Generates a compressed BLA tree with
- bilinear approximation coefficients A and B
- validity radius
z_{n + 2**stg} = f^(2**stg)(z_n, c) with |c| < r_stg_n is approximated by
z_{n + 2**stg} = A_stg_n * z_n + B_stg_n * c
"""
# number of needed "stages" is (ref_orbit_len).bit_length()
kc_std = fsxn.to_standard(kc[0])
ref_orbit_len = Zn_path.shape[0]
# Compressed quantities
k_comp = 1 << STG_COMPRESSED
comp_bla_len = ref_orbit_len // k_comp
bla_dim = 8
M_bla = np.zeros((2 * comp_bla_len, bla_dim), dtype=numba.float64)
r_bla = np.zeros((2 * comp_bla_len,), dtype=numba.float64)
# The temporary tree structure to get to the stored index
temp_M_bla = np.zeros((2 * k_comp, bla_dim), dtype=numba.float64)
temp_r_bla = np.zeros((2 * k_comp,), dtype=numba.float64)
temp_M_bla_template = np.copy(temp_M_bla)
temp_r_bla_template = np.copy(temp_r_bla)
for i in range(0, comp_bla_len):
i_0 = 2 * i # BLA index for (i, 0)
temp_M_bla = np.copy(temp_M_bla_template)
temp_r_bla = np.copy(temp_r_bla_template)
for j in range(0, k_comp):
j_0 = 2 * j # BLA index for (j, 0)
# Define a BLA_step by:
# Z_(n+1) = M * Zn where
# [dxnda]
# [dxndb]
# [dynda] [ M_0 0 0]
# Zn = [dyndb] M = [ 0 M_1 M_2]
# [ xn] [ 0 0 I]
# [ yn]
# [ a]
# [ b]
# [M[0] 0 M[1] 0 ]
# M_0 = [0 M[0] 0 M[1]]
# [M[2] 0 M[3] 0 ]
# [0 M[2] 0 M[3]]
#
# M_1 = [M[0] M[1]] M_1_init = [dfxdx dfxdy]
# [M[2] M[3]] [dfydx dfydy]
#
# M_2 = [M[4] M[5]] M_2_init = [1 0]
# [M[6] M[7]] [0 -1]
Zn_i = Zn_path[i * k_comp + j]
Xn_i = Zn_i.real
Yn_i = Zn_i.imag
temp_M_bla[j_0, 0] = dfxdx(Xn_i, Yn_i)
temp_M_bla[j_0, 1] = dfxdy(Xn_i, Yn_i)
temp_M_bla[j_0, 2] = dfydx(Xn_i, Yn_i)
temp_M_bla[j_0, 3] = dfydy(Xn_i, Yn_i)
temp_M_bla[j_0, 4] = 1.
temp_M_bla[j_0, 5] = 0.
temp_M_bla[j_0, 6] = 0.
temp_M_bla[j_0, 7] = -1.
temp_r_bla[j_0] = eps * min(abs(Xn_i), abs(Yn_i))
# Combine step for temporary tree 'compression'
for stg in range(1, STG_COMPRESSED + 1):
combine_BLA_BS(temp_M_bla, temp_r_bla, kc_std, stg, k_comp, eps)
stored_index = k_comp - 1
for d in range(bla_dim):
M_bla[i_0, d] = temp_M_bla[stored_index, d]
r_bla[i_0] = temp_r_bla[stored_index]
# Combine step for stored data
stages = _stages_bla(ref_orbit_len)
for stg in range(1, stages - STG_COMPRESSED):
combine_BLA_BS(M_bla, r_bla, kc_std, stg, comp_bla_len, eps)
return M_bla.reshape(-1), r_bla, 2 * comp_bla_len, stages
@numba.njit(nogil=True, cache=True)
def _stages_bla(ref_orbit_len):
"""
Number of needed "stages", i.e. (ref_orbit_len).bit_length()
"""
return int(np.ceil(np.log2(ref_orbit_len)))
@numba.njit(nogil=True, cache=True, fastmath=True, error_model="numpy")
def combine_BLA(M, r, kc_std, stg, ref_orbit_len, eps):
""" Populate successive stages of a BLA tree
A_bla, B_bla, r_bla : data of the BLA tree
kc : upper bound (majorant) of |c|
stg : stage of the tree that is populated by merging (stg - 1) items
ref_orbit_len : the length of the reference orbit
"""
# Combine all BLA at stage stg-1 to make stage stg, with stg > 0
step = (1 << stg)
for i in range(0, ref_orbit_len - step + 1, step): # +1 for power of 2 case
ii = i + (step // 2)
# If ref_orbit_len is not a power of 2, we might get outside the array
if ii >= ref_orbit_len:
break
index1 = BLA_index(i, stg - 1)
index2 = BLA_index(ii, stg - 1)
index_res = BLA_index(i, stg)
# Combines linear approximations
# M_res = [ M2[0] M2[1]] * [ M1[0] M1[1]]
# [ 0 1] [ 0 1]
M[index_res, 0] = M[index2, 0] * M[index1, 0]
M[index_res, 1] = M[index2, 0] * M[index1, 1] + M[index2, 1]
# Combines the validity radii
r1 = r[index1]
r2 = r[index2]
# r1 is a direct criterion; for r2 we need to go 'backwards along
# the flow': z0 -> z1 -> z2 with z1 = A1 z0 + B1 c, |z1| < r2.
# Valid if:
# |A1 z0| + |B1 c| < r2
# i.e. |z0| < (r2 - |B1 c|) / |A1|
# Note: "Strict inequality factor" of 0.95 is mandatory to pass the
# glitch test
mA1 = np.abs(M[index1, 0])
mB1 = np.abs(M[index1, 1])
r2_backw = 0.95 * max(0., (r2 - mB1 * kc_std) / max(mA1, eps))
r[index_res] = min(r1, r2_backw)
@numba.njit(nogil=True, cache=True)
def combine_BLA_BS(M, r, kc_std, stg, ref_orbit_len, eps):
""" Populate successive stages of a BLA tree
A_bla, B_bla, r_bla : data of the BLA tree
kc : upper bound (majorant) of |c|
stg : stage of the tree that is populated by merging (stg - 1) items
ref_orbit_len : the length of the reference orbit
"""
# Combine all BLA at stage stg-1 to make stage stg, with stg > 0
step = (1 << stg)
for i in range(0, ref_orbit_len - step + 1, step):
ii = i + (step // 2)
# If ref_orbit_len is not a power of 2, we might get outside the array
if ii >= ref_orbit_len:
break
index1 = BLA_index(i, stg - 1)
index2 = BLA_index(ii, stg - 1)
index_res = BLA_index(i, stg)
# Combines linear approximations
# M = [M2_1 M2_2] x [M1_1 M1_2]
# [ 0 I] [ 0 I]
# Mx_1 = [M[0] M[1]] Mx_2 = [M[4] M[5]]
# [M[2] M[3]] [M[6] M[7]]
# Mres_1 = M2_1 * M1_1
# Mres_2 = M2_1 * M1_1 + M2_2
# Mres_1 = M2_1 * M1_1 :
M[index_res, 0] = (
M[index2, 0] * M[index1, 0] + M[index2, 1] * M[index1, 2]
)
M[index_res, 1] = (
M[index2, 0] * M[index1, 1] + M[index2, 1] * M[index1, 3]
)
M[index_res, 2] = (
M[index2, 2] * M[index1, 0] + M[index2, 3] * M[index1, 2]
)
M[index_res, 3] = (
M[index2, 2] * M[index1, 1] + M[index2, 3] * M[index1, 3]
)
# Mres_2 = M2_1 * M1_1 + M2_2
M[index_res, 4] = (
M[index2, 0] * M[index1, 4] + M[index2, 1] * M[index1, 6]
+ M[index2, 4]
)
M[index_res, 5] = (
M[index2, 0] * M[index1, 5] + M[index2, 1] * M[index1, 7]
+ M[index2, 5]
)
M[index_res, 6] = (
M[index2, 2] * M[index1, 4] + M[index2, 3] * M[index1, 6]
+ M[index2, 6]
)
M[index_res, 7] = (
M[index2, 2] * M[index1, 5] + M[index2, 3] * M[index1, 7]
+ M[index2, 7]
)
# Combines the validity radii
r1 = r[index1]
r2 = r[index2]
# r1 is a direct criterion; for r2 we need to go 'backwards along
# the flow': z0 -> z1 -> z2 with z1 = A1 z0 + B1 c, |z1| < r2
mA1 = max(
np.abs(M[index1, 0]),
np.abs(M[index1, 1]),
np.abs(M[index1, 2]),
np.abs(M[index1, 3]),
)
mB1 = max(
np.abs(M[index1, 4]),
np.abs(M[index1, 5]),
np.abs(M[index1, 6]),
np.abs(M[index1, 7]),
)
r2_backw = 0.95 * max(0., (r2 - mB1 * kc_std) / max(mA1, eps))
r[index_res] = min(r1, r2_backw)
@numba.njit(nogil=True, cache=True)
def BLA_index(i, stg):
"""
Return the index in the BLA table for this iteration and stage,
i.e. the jump from i to j = i + (1 << stg)
"""
return (2 * i) + ((1 << stg) - 1)
@numba.njit(nogil=True, cache=True)
def ref_BLA_get(M_bla, r_bla, bla_len, stages_bla, zn, n_iter,
first_invalid_index, M_out, holomorphic):
"""
Parameters
----------
A_bla, B_bla, r_bla: arrays
Bilinear approx tree
zn :
The current value of dz
n_iter :
The current iteration for ref pt
M_out :
Container for the BLA coefficients
holomorphic: boolean
True if the base function is holomorphic
Returns:
--------
step
The iteration "jump" provided by this bilinear approximation
"""
k_comp = (1 << STG_COMPRESSED)
_iter = (n_iter >> STG_COMPRESSED)
for stages in range(STG_COMPRESSED, stages_bla):
if _iter & 1:
break
_iter = _iter >> 1
# The first invalid step /!\
invalid_step = first_invalid_index - n_iter
# numba version of reversed(range(stages_bla)):
for stg in range(stages, STG_COMPRESSED - 1, -1):
step = (1 << stg)
if step >= invalid_step:
continue
index_bla = BLA_index(n_iter // k_comp, stg - STG_COMPRESSED)
r = r_bla[index_bla]
# /!\ Use strict comparisons here: to rule out underflow
if (abs(zn) < r):
if holomorphic:
loc = 2 * index_bla
M_out[0] = M_bla[loc]
M_out[1] = M_bla[loc + 1]
return step
else:
for i in range(8):
loc = 8 * index_bla
M_out[i] = M_bla[loc + i]
return step
return 0 # No BLA applicable
@numba.njit(nogil=True, cache=True)
def need_xr(x_std):
"""
True if norm L-inf of std is lower than xrange_zoom_level
"""
return (
(abs(np.real(x_std)) < fs.settings.xrange_zoom_level)
and (abs(np.imag(x_std)) < fs.settings.xrange_zoom_level)
)
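# Example: with fs.settings.xrange_zoom_level at its assumed default of
# 1.e-300, need_xr(1.e-320 + 0.j) is True (both components are below the
# reliable float64 range) whereas need_xr(1.e-10 + 1.e-10j) is False.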
@numba.njit(nogil=True, cache=True)
def ensure_xr(val_std, val_xr, is_xr):
"""
Return a valid Xrange. if not(Z_xr_trigger) we return x_std
converted
val_xr : complex128_Xrange_scalar or float64_Xrange_scalar
"""
if is_xr:
return fsxn.to_Xrange_scalar(val_xr)
else:
return fsxn.to_Xrange_scalar(val_std)
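# Minimal dispatch sketch (illustrative values):
#
#     val = ensure_xr(z_std, z_xr, is_xr)
#     # -> fsxn.to_Xrange_scalar(z_xr)  if is_xr (extended range needed)
#     # -> fsxn.to_Xrange_scalar(z_std) otherwise
#
# Either way the caller receives an Xrange scalar, so downstream numba
# code compiles for a single, stable type.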
@numba.njit(nogil=True, cache=True)
def ensure_xr_BS(val_std, valx_xr, valy_xr, is_xr):
"""
Return a valid Xrange. if not(Z_xr_trigger) we return x_std
converted
val_xr : complex128_Xrange_scalar or float64_Xrange_scalar
"""
if is_xr:
return (
fsxn.to_Xrange_scalar(valx_xr),
fsxn.to_Xrange_scalar(valy_xr),
)
else:
return (
fsxn.to_Xrange_scalar(np.real(val_std)),
fsxn.to_Xrange_scalar(np.imag(val_std))
)
@numba.njit(nogil=True, cache=True)
def ref_path_c_from_pix(pix, proj_impl, lin_mat, lin_scale_xr, drift_xr):
"""
    Returns the true c (coordinates relative to the reference point) from
    the pixel coordinates, i.e. c = C - C_ref
Parameters
----------
pix : complex
pixel location in fraction of dx
Returns
-------
c, c_xr : c value as complex and as Xrange
"""
c_xr = lin_proj_impl(lin_mat, lin_scale_xr, proj_impl(pix)) + drift_xr[0]
return fsxn.to_standard(c_xr), c_xr
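# The mapping implemented above, written out (drift_xr is assumed to hold
# the offset between the image center and the reference point):
#
#     c = lin_scale * (lin_mat @ proj(pix)) + drift
#
# where proj applies the (possibly non-cartesian) projection, lin_mat the
# rotation / skew matrix and lin_scale the zoom width.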
@numba.njit(nogil=True, cache=True)
def std_C_from_pix(pix, proj_impl, lin_mat, lin_scale_std, center):
"""
Returns the true C (C = cref + dc) from the pixel coords
Parameters
----------
pix : complex
pixel location in fraction of dx
Returns
-------
C : Full C value, as complex
"""
return center + lin_proj_impl(lin_mat, lin_scale_std, proj_impl(pix))
@numba.njit(nogil=True, cache=True)
def fill1d_std_C_from_pix(
c_pix, proj_impl, lin_mat, lin_scale_std, center, c_out):
""" same as std_C_from_pix but fills in-place a 1d vec """
nx = c_pix.shape[0]
for i in range(nx):
c_out[i] = std_C_from_pix(
c_pix[i], proj_impl, lin_mat, lin_scale_std, center
)
@numba.njit(nogil=True, cache=True)
def ref_path_c_from_pix_BS(
pix, proj_impl, lin_mat, lin_scale_xr, driftx_xr, drifty_xr):
"""
Returns the true a + i b (coords from ref point) from the pixel coords
Parameters
----------
pix : complex
pixel location in fraction of dx
dx : Xrange float
width of the image
Returns
-------
a, b, a_xr, b_xr : c value as complex and as Xrange
"""
c_xr = lin_proj_impl(lin_mat, lin_scale_xr, proj_impl(pix))
a_xr = np.real(c_xr) + driftx_xr[0]
b_xr = np.imag(c_xr) + drifty_xr[0]
return fsxn.to_standard(a_xr), fsxn.to_standard(b_xr), a_xr, b_xr
@numba.njit(nogil=True, cache=True)
def numba_dZndc_path(Zn_path, has_xr, ref_index_xr, ref_xr,
ref_div_iter, ref_order, dfdz,
scale_deriv_xr, xr_detect_activated):
"""
Compute dZndc in Xr, or std precision, depending on xr_detect_activated
"""
    # Development note: the scale_deriv_xr increment imposed at each step
    # sets the derivative scaling
ref_orbit_len = Zn_path.shape[0]
valid_pts = min(ref_orbit_len, ref_div_iter)
xr_act = xr_detect_activated
scale_deriv = fsxn.to_standard(scale_deriv_xr[0])
if xr_act:
dZndc_path = np.zeros((1,), dtype=numba.complex128) # dummy
dZndc_xr_path = Xr_template.repeat(ref_orbit_len)
refpath_ptr = np.zeros((2,), dtype=numba.int32)
out_is_xr = np.zeros((1,), dtype=numba.bool_)
out_xr = Xr_template.repeat(1)
for i in range(1, valid_pts):
ref_zn = ref_path_get(
Zn_path, i - 1,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
ref_zn_xr = ensure_xr(ref_zn, out_xr[0], out_is_xr[0])
dZndc_xr_path[i] = dfdz(ref_zn_xr) * dZndc_xr_path[i - 1] + scale_deriv_xr[0]
if (i == ref_order - 1):
# /!\ We have a cycle, use the "wrapped" value at 0
# Note that this value will be used... a lot !
ref_zn = ref_path_get(
Zn_path, i,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
ref_zn_xr = ensure_xr(ref_zn, out_xr[0], out_is_xr[0])
                dZndc_xr_path[0] = (
                    dfdz(ref_zn_xr) * dZndc_xr_path[i] + scale_deriv_xr[0]
                )  # Note: the wrapped value is stored at [0]
else:
dZndc_path = np.zeros((ref_orbit_len,), dtype=numba.complex128)
dZndc_xr_path = Xr_template.repeat(1) # dummy
for i in range(1, valid_pts):
dZndc_path[i] = dfdz(Zn_path[i - 1]) * dZndc_path[i - 1] + scale_deriv
if (i == ref_order - 1):
# /!\ We have a cycle, use the "wrapped" value at 0
# Note that this value will be used... a lot !
                dZndc_path[0] = dfdz(Zn_path[i]) * dZndc_path[i] + scale_deriv  # Note: the wrapped value is stored at [0]
return dZndc_path, dZndc_xr_path
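# The loops above implement the perturbation derivative recurrence along
# the reference orbit:
#
#     dZ_{n+1}/dc = dfdz(Z_n) * dZ_n/dc + scale
#
# A minimal self-contained sketch for f(z) = z**2 + c (so dfdz(z) = 2 z),
# standard precision only, ignoring the Xrange and cycle handling:
#
#     def dZndc_sketch(Zn_path, scale):
#         out = np.zeros_like(Zn_path)
#         for i in range(1, len(Zn_path)):
#             out[i] = 2. * Zn_path[i - 1] * out[i - 1] + scale
#         return out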
@numba.njit(nogil=True, cache=True)
def numba_dZndc_path_BS(Zn_path, has_xr, ref_index_xr, refx_xr, refy_xr,
ref_div_iter, ref_order, dfxdx, dfxdy, dfydx, dfydy,
scale_deriv_xr, xr_detect_activated):
"""
Compute dXndb, dXnda, dYnda , dYndb in Xr, or std precision, depending on
xr_detect_activated
"""
ref_orbit_len = Zn_path.shape[0]
valid_pts = min(ref_orbit_len, ref_div_iter)
scale_deriv = fsxn.to_standard(scale_deriv_xr[0])
if xr_detect_activated:
dXnda_path = np.zeros((1,), dtype=numba.float64) # dummy
dXndb_path = np.zeros((1,), dtype=numba.float64) # dummy
dYnda_path = np.zeros((1,), dtype=numba.float64) # dummy
dYndb_path = np.zeros((1,), dtype=numba.float64) # dummy
dXnda_xr_path = Xr_float_template.repeat(ref_orbit_len)
dXndb_xr_path = Xr_float_template.repeat(ref_orbit_len)
dYnda_xr_path = Xr_float_template.repeat(ref_orbit_len)
dYndb_xr_path = Xr_float_template.repeat(ref_orbit_len)
refpath_ptr = np.zeros((2,), dtype=numba.int32)
out_is_xr = np.zeros((1,), dtype=numba.bool_)
out_xr = Xr_float_template.repeat(2) # coord X, coord Y
for i in range(1, valid_pts):
from_i = i - 1
to_i = i
ref_zn = ref_path_get_BS(
                Zn_path, from_i,
has_xr, ref_index_xr, refx_xr, refy_xr, refpath_ptr,
out_is_xr, out_xr, 0, 1
)
ref_xn_xr, ref_yn_xr = ensure_xr_BS(
ref_zn, out_xr[0], out_xr[1], out_is_xr[0]
)
incr_deriv_ref_BS(
dXnda_xr_path, dXndb_xr_path, dYnda_xr_path, dYndb_xr_path,
from_i, to_i, scale_deriv_xr[0],
ref_xn_xr, ref_yn_xr, dfxdx, dfxdy, dfydx, dfydy
)
if (i == ref_order - 1):
# /!\ We have a cycle, use the "wrapped" value at 0
# Note that this value will be used... a lot !
from_i = i
to_i = 0
ref_zn = ref_path_get_BS(
Zn_path, from_i,
has_xr, ref_index_xr, refx_xr, refy_xr, refpath_ptr,
out_is_xr, out_xr, 0, 1
)
ref_xn_xr, ref_yn_xr = ensure_xr_BS(
ref_zn, out_xr[0], out_xr[1], out_is_xr[0]
)
incr_deriv_ref_BS(
dXnda_xr_path, dXndb_xr_path, dYnda_xr_path, dYndb_xr_path,
from_i, to_i, scale_deriv_xr[0],
ref_xn_xr, ref_yn_xr, dfxdx, dfxdy, dfydx, dfydy
)
else:
dXnda_path = np.zeros((ref_orbit_len,), dtype=numba.float64)
dXndb_path = np.zeros((ref_orbit_len,), dtype=numba.float64)
dYnda_path = np.zeros((ref_orbit_len,), dtype=numba.float64)
dYndb_path = np.zeros((ref_orbit_len,), dtype=numba.float64)
dXnda_xr_path = Xr_float_template.repeat(1) # dummy
dXndb_xr_path = Xr_float_template.repeat(1) # dummy
dYnda_xr_path = Xr_float_template.repeat(1) # dummy
dYndb_xr_path = Xr_float_template.repeat(1) # dummy
for i in range(1, valid_pts):
from_i = i - 1
to_i = i
            Xn = np.real(Zn_path[from_i])
            Yn = np.imag(Zn_path[from_i])
incr_deriv_ref_BS(
dXnda_path, dXndb_path, dYnda_path, dYndb_path,
from_i, to_i, scale_deriv,
Xn, Yn, dfxdx, dfxdy, dfydx, dfydy
)
if (i == ref_order - 1):
# /!\ We have a cycle, use the "wrapped" value at 0
# Note that this value will be used... a lot !
from_i = i
to_i = 0
                Xn = np.real(Zn_path[from_i])
                Yn = np.imag(Zn_path[from_i])
incr_deriv_ref_BS(
dXnda_path, dXndb_path, dYnda_path, dYndb_path,
from_i, to_i, scale_deriv,
Xn, Yn, dfxdx, dfxdy, dfydx, dfydy
)
return (
dXnda_path, dXndb_path, dYnda_path, dYndb_path,
dXnda_xr_path, dXndb_xr_path, dYnda_xr_path, dYndb_xr_path
)
@numba.njit(nogil=True, cache=True)
def incr_deriv_ref_BS(
dXnda_path, dXndb_path, dYnda_path, dYndb_path,
from_i, to_i, dx,
Xn, Yn, dfxdx, dfxdy, dfydx, dfydy,
):
"""
H = [dfxdx dfxdy] [dfx] = H x [dx]
[dfydx dfydy] [dfy] [dy]
"""
dfxdx = dfxdx(Xn, Yn)
dfxdy = dfxdy(Xn, Yn)
dfydx = dfydx(Xn, Yn)
dfydy = dfydy(Xn, Yn)
dXnda = dXnda_path[from_i]
dXndb = dXndb_path[from_i]
dYnda = dYnda_path[from_i]
dYndb = dYndb_path[from_i]
dXnda_path[to_i] = dfxdx * dXnda + dfxdy * dYnda + dx
dXndb_path[to_i] = dfxdx * dXndb + dfxdy * dYndb
dYnda_path[to_i] = dfydx * dXnda + dfydy * dYnda
dYndb_path[to_i] = dfydx * dXndb + dfydy * dYndb - dx
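# Equivalent matrix form of the update above (illustrative numpy, with
# D = [[dXnda, dXndb], [dYnda, dYndb]] and J the Jacobian at (Xn, Yn)):
#
#     J = np.array([[dfxdx, dfxdy],
#                   [dfydx, dfydy]])
#     D_next = J @ D + dx * np.diag([1., -1.])
#
# The +dx / -dx increments match an iteration whose real part gains +a
# and whose imaginary part gains -b at each step.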
@numba.njit(nogil=True, cache=True)
def numba_dZndz_path(Zn_path, has_xr, ref_index_xr, ref_xr,
ref_div_iter, ref_order, dfdz, xr_detect_activated):
"""
Compute dZndz in Xr, or std precision, depending on xr_detect_activated
Same as dZndc except the +dx is missing
"""
ref_orbit_len = Zn_path.shape[0]
valid_pts = min(ref_orbit_len, ref_div_iter)
xr_act = xr_detect_activated
if xr_act:
dZndz_path = np.zeros((1,), dtype=numba.complex128) # dummy
dZndz_xr_path = Xr_template.repeat(ref_orbit_len + 1)
dZndz_xr_path[1] = fsxn.one()
refpath_ptr = np.zeros((2,), dtype=numba.int32)
out_is_xr = np.zeros((1,), dtype=numba.bool_)
out_xr = Xr_template.repeat(1)
for i in range(2, valid_pts):
ref_zn = ref_path_get(
Zn_path, i - 1,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
ref_zn_xr = ensure_xr(ref_zn, out_xr[0], out_is_xr[0])
dZndz_xr_path[i] = dfdz(ref_zn_xr) * dZndz_xr_path[i - 1]
# /!\ Store the "wrapped" value at last pos
ref_zn = ref_path_get(
Zn_path, i,
has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, 0
)
ref_zn_xr = ensure_xr(ref_zn, out_xr[0], out_is_xr[0])
dZndz_xr_path[ref_orbit_len] = dfdz(ref_zn_xr) * dZndz_xr_path[i]
else:
dZndz_path = np.zeros((ref_orbit_len + 1,), dtype=numba.complex128)
dZndz_xr_path = Xr_template.repeat(1) # dummy
dZndz_path[1] = 1.
for i in range(2, valid_pts):
dZndz_path[i] = dfdz(Zn_path[i - 1]) * dZndz_path[i - 1]
# /!\ Store the "wrapped" value at last pos
dZndz_path[ref_orbit_len] = dfdz(Zn_path[i]) * dZndz_path[i]
return dZndz_path, dZndz_xr_path
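# Recurrence implemented above (note: no +scale term, compare with
# numba_dZndc_path):
#
#     dZ_{n+1}/dz_1 = dfdz(Z_n) * dZ_n/dz_1,    with dZ_1/dz_1 = 1
#
# The arrays hold one extra trailing slot: position [ref_orbit_len]
# stores the value "wrapped" past the last stored iterate, as set at the
# end of each branch.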
@numba.njit(nogil=True, cache=True)
def ref_path_get(ref_path, idx, has_xr, ref_index_xr, ref_xr, refpath_ptr,
out_is_xr, out_xr, out_index):
"""
Alternative to getitem which also takes as input prev_idx, curr_xr :
allows to optimize the look-up of Xrange values in case of successive calls
with increasing idx.
idx :
index requested
(prev_idx, curr_xr) :
couple returned from last call, last index requested + next xr target
Contract : curr_xr the smallest integer that verify :
prev_idx <= ref_index_xr[curr_xr]
or curr_xr = ref_index_xr.size (No more xr)
Returns
-------
(val, xr_val, is_xr, prev_idx, curr_xr)
val : np.complex128
Modify in place:
xr_val : complex128_Xrange_scalar -> pushed to out_xr[out_index]
is_xr : bool -> pushed to out_is_xr[out_index]
prev_idx == refpath_ptr[0] : int
curr_xr == refpath_ptr[1] : int (index in path ref_xr)
"""
if not(has_xr):
return ref_path[idx]
# Not an increasing sequence, reset to restart a new sequence
if idx < refpath_ptr[0]:
# Rewind to 0
refpath_ptr[0] = 0 # prev_idx = 0
refpath_ptr[1] = 0 # curr_xr = 0
# In increasing sequence (idx >= prev_idx)
if (
(refpath_ptr[1] >= ref_index_xr.size)
or (idx < ref_index_xr[refpath_ptr[1]])
):
refpath_ptr[0] = idx
out_is_xr[out_index] = False
return ref_path[idx]
elif idx == ref_index_xr[refpath_ptr[1]]:
refpath_ptr[0] = idx
out_is_xr[out_index] = True
out_xr[out_index] = ref_xr[refpath_ptr[1]]
return ref_path[idx]
else:
# Here we have idx > ref_index_xr[curr_xr]:
while (
(idx > ref_index_xr[refpath_ptr[1]])
and (refpath_ptr[1] < ref_index_xr.size)
):
refpath_ptr[1] += 1
if (
(refpath_ptr[1] == ref_index_xr.size)
or (idx < ref_index_xr[refpath_ptr[1]])
):
refpath_ptr[0] = idx
out_is_xr[out_index] = False
return ref_path[idx]
# Here idx == ref_index_xr[refpath_ptr[1]]
refpath_ptr[0] = idx
out_is_xr[out_index] = True
out_xr[out_index] = ref_xr[refpath_ptr[1]]
return ref_path[idx]
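# Usage sketch (hypothetical caller): the small state buffers are owned
# by the caller and reused across monotonically increasing idx values:
#
#     refpath_ptr = np.zeros((2,), dtype=np.int32)
#     out_is_xr = np.zeros((1,), dtype=np.bool_)
#     out_xr = Xr_template.repeat(1)
#     for i in range(n_pts):
#         zn = ref_path_get(Zn_path, i, has_xr, ref_index_xr, ref_xr,
#                           refpath_ptr, out_is_xr, out_xr, 0)
#         # out_is_xr[0] tells whether out_xr[0] holds an Xrange value
#
# Because idx increases, each entry of ref_index_xr is visited at most
# once across the whole scan, instead of requiring a search per call.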
@numba.njit(nogil=True, cache=True)
def ref_path_get_BS(ref_path, idx, has_xr, ref_index_xr, refx_xr, refy_xr, refpath_ptr,
out_is_xr, out_xr, outx_index, outy_index):
"""
Alternative to getitem which also takes as input prev_idx, curr_xr :
allows to optimize the look-up of Xrange values in case of successive calls
with increasing idx.
idx :
index requested
refpath_ptr = (prev_idx, curr_xr) :
couple returned from last call, last index requested + next xr target
Contract : curr_xr the smallest integer that verify :
prev_idx <= ref_index_xr[curr_xr]
or curr_xr = ref_index_xr.size (No more xr)
Returns
-------
val np.complex128
satndard value
Modify in place:
xr_val : complex128_Xrange_scalar -> pushed to out_xr[out_index]
is_xr : bool -> pushed to out_is_xr[out_index]
prev_idx == refpath_ptr[0] : int
curr_xr == refpath_ptr[1] : int (index in path ref_xr)
"""
if not(has_xr):
return ref_path[idx]
# Not an increasing sequence, reset to restart a new sequence
if idx < refpath_ptr[0]:
# Rewind to 0
refpath_ptr[0] = 0 # prev_idx = 0
refpath_ptr[1] = 0 # curr_xr = 0
# In increasing sequence (idx >= prev_idx)
if (
(refpath_ptr[1] >= ref_index_xr.size)
or (idx < ref_index_xr[refpath_ptr[1]])
):
refpath_ptr[0] = idx
out_is_xr[outx_index] = False
return ref_path[idx]
elif idx == ref_index_xr[refpath_ptr[1]]:
refpath_ptr[0] = idx
out_is_xr[outx_index] = True
out_xr[outx_index] = refx_xr[refpath_ptr[1]]
out_xr[outy_index] = refy_xr[refpath_ptr[1]]
return ref_path[idx]
else:
# Here we have idx > ref_index_xr[curr_xr]:
while (
(idx > ref_index_xr[refpath_ptr[1]])
and (refpath_ptr[1] < ref_index_xr.size)
):
refpath_ptr[1] += 1
if (
(refpath_ptr[1] == ref_index_xr.size)
or (idx < ref_index_xr[refpath_ptr[1]])
):
refpath_ptr[0] = idx
out_is_xr[outx_index] = False
return ref_path[idx]
# Here idx == ref_index_xr[refpath_ptr[1]]
refpath_ptr[0] = idx
out_is_xr[outx_index] = True
out_xr[outx_index] = refx_xr[refpath_ptr[1]]
out_xr[outy_index] = refy_xr[refpath_ptr[1]]
return ref_path[idx]
# Linear projection routines
@numba.njit(cache=True, fastmath=True, nogil=True)
def lin_proj_impl(lin_mat, linscale, z):
x = z.real
y = z.imag
x1 = lin_mat[0, 0] * x + lin_mat[0, 1] * y
y1 = lin_mat[1, 0] * x + lin_mat[1, 1] * y
return linscale[0] * complex(x1, y1)
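# Reference formulation in plain numpy (illustrative only - the explicit
# scalar version above is what numba compiles efficiently; assumes a
# float linscale):
#
#     def lin_proj_ref(lin_mat, linscale, z):
#         v = lin_mat @ np.array([z.real, z.imag])
#         return linscale[0] * complex(v[0], v[1])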
def mpc_lin_proj_impl_noscale(lin_mat, x, y):
    """ Applies the linear (rotation / skew) matrix at full mpmath
    precision, without the zoom scaling """
x1 = lin_mat[0, 0] * x + lin_mat[0, 1] * y
y1 = lin_mat[1, 0] * x + lin_mat[1, 1] * y
return mpmath.mpc(x1, y1)
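# Example (arbitrary precision, hypothetical values): unlike the float64
# variant above, this keeps full mpmath precision:
#
#     with mpmath.workdps(60):
#         mat = np.eye(2)
#         z = mpc_lin_proj_impl_noscale(
#             mat, mpmath.mpf("0.1"), mpmath.mpf("0.2")
#         )
#         # -> mpmath.mpc carrying ~60 significant digits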