# -*- coding: utf-8 -*-
import os
import fnmatch
import copy
import datetime
import pickle
import logging
import textwrap
import time
import types
import typing
import enum
import inspect
import numpy as np
from numpy.lib.format import open_memmap
import PIL
import PIL.PngImagePlugin
import numba
import fractalshades as fs
import fractalshades.settings
import fractalshades.utils
import fractalshades.postproc
import fractalshades.projection
from fractalshades.mthreading import Multithreading_iterator
import fractalshades.numpy_utils.expr_parser as fs_parser
logger = logging.getLogger(__name__)
class _Pillow_figure:
def __init__(self, img, pnginfo):
"""
This class is a wrapper that can be used to redirect a Fractal_plotter
output, for instance when generating the documentation.
"""
self.img = img
self.pnginfo = pnginfo
def save_png(self, im_path):
"""
        Saves as a png, applying a Lanczos resizing filter if the image exceeds the max width
"""
im = self.img
width, height = im.size
max_width = fs.settings.output_context["doc_max_width"]
if width > max_width:
ratio = float(width) / float(max_width)
new_height = int(height / ratio)
im = im.resize((max_width, new_height), PIL.Image.LANCZOS)
im.save(im_path, format="png", pnginfo=self.pnginfo)
SUPERSAMPLING_DIC = {
"2x2": 2,
"3x3": 3,
"4x4": 4,
"5x5": 5,
"6x6": 6,
"7x7": 7,
"None": None
}
SUPERSAMPLING_ENUM = enum.Enum(
"SUPERSAMPLING_ENUM",
    list(SUPERSAMPLING_DIC.keys()),
module=__name__
)
supersampling_type = typing.Literal[SUPERSAMPLING_ENUM]
class Fractal_plotter:
def __init__(self,
postproc_batch,
final_render: bool = False,
supersampling: supersampling_type = "None",
jitter: bool = False,
recovery_mode: bool = False,
):
"""
The base plotting class.
A Fractal plotter is a container for
`fractalshades.postproc.Postproc_batch` and fractal layers.
Parameters
----------
postproc_batch
            A single `fractalshades.postproc.Postproc_batch` or a list of
            these. They shall all point to the same unique `Fractal` object.
final_render : bool
- If ``False``, this is an exploration rendering, the raw arrays
will be stored to allow fast modification of the plotting
options for an already computed zoom - without recomputing.
High-quality rendering (supersampling, jitter) is disabled in
this case.
- If ``True``, this is the final rendering, the image tiles will
be directly computed by chunks on the fly to limit disk usage,
and stored in a memory-mapped array (.postdb extension).
              High-quality rendering options are available only in this case
              (antialiasing, jitter). Raw arrays are *not* stored in this
              case: any change made to the plotting parameters will require a
              new calculation. However an interrupted calculation might still
              be restarted, see parameter `recovery_mode`.
supersampling : None | "2x2" | "3x3" | ... | "7x7"
            Used only for the final render. If not None, the final image will
            leverage supersampling (from 4 to 49 pixels computed for each
            pixel in the saved image)
jitter : bool | float
            Used only for the final render. If not None, the final image will
            leverage jitter (default intensity: 1.). This can help reduce the
            moiré effect
recovery_mode : bool
            Used only for the final render. If True, will attempt to reload
            the image tiles already computed. Allows restarting an
            interrupted calculation in final render mode (this will result in
            a 'patchwork' if plotting parameters have been modified).
Notes
-----
.. warning::
When passed a list of `fractalshades.postproc.Postproc_batch`
objects, each postprocessing batch shall point to the same
unique `Fractal` object.
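        Examples
        --------
        A minimal sketch, assuming `f` is an already zoomed `Fractal`
        instance whose calculation was run under the (illustrative) name
        ``"calc-1"``::

            import fractalshades as fs
            import fractalshades.postproc as fs_pp

            batch = fs_pp.Postproc_batch(f, "calc-1")
            plotter = fs.Fractal_plotter(
                batch,
                final_render=False,   # exploration mode, raw arrays kept
                supersampling="3x3",  # only honoured when final_render=True
                jitter=False,
                recovery_mode=False
            )
            # ... layers are then registered with `add_layer`, and:
            plotter.plot()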
"""
if supersampling is None: # Nonetype not allowed in Enum...
supersampling = "None"
self.postproc_options = {
"final_render": final_render,
"supersampling": supersampling,
"jitter": jitter,
"recovery_mode": recovery_mode
}
# postproc_batches can be single or an enumeration
# At this stage it is unfrozen (more can be added),
# waiting for a register_postprocs call
postproc_batches = postproc_batch
if isinstance(postproc_batch, fs.postproc.Postproc_batch):
postproc_batches = [postproc_batch]
self._postproc_batches = postproc_batches # unfrozen
self.register_postprocs()
@property
def postnames(self):
return self.posts.keys()
@property
def size(self):
# The image size
f = self.fractal
return (f.nx, f.ny)
@property
def db_shape(self):
# The array shapes
f = self.fractal
return (f.ny, f.nx)
def add_postproc_batch(self, postproc_batch):
"""
        Adds another post-processing batch & registers the associated
        postproc names. `postproc_batch` shall map to the same fractal.
Note : several postproc_batches are needed whenever different
calculations need to be combined in an output plot, as a postproc
batch can only map to a unique calc_name.
:meta private:
"""
if postproc_batch.fractal != self.fractal:
            raise ValueError(
                "Attempt to add a postproc_batch from a different "
                "fractal: {} from {}".format(
                    postproc_batch.fractal, postproc_batch.calc_name
                )
            )
self.postproc_batches += [postproc_batch]
self.posts.update(postproc_batch.posts)
self.postnames_2d += postproc_batch.postnames_2d
def register_postprocs(self):
"""
        Registers the postprocs - the call may be delayed after plotter
        instantiation but shall occur before any coloring method.
"""
postproc_batches = self._postproc_batches
for i, postproc_batch in enumerate(postproc_batches):
if i == 0:
self.postproc_batches = [postproc_batch]
self.posts = copy.copy(postproc_batch.posts)
self.postnames_2d = postproc_batch.postnames_2d
# iterator :
self.fractal = postproc_batch.fractal
else:
self.add_postproc_batch(postproc_batch)
self.chunk_slices = self.fractal.chunk_slices
# layer data
self.layers = []
self.scalings = None # to be computed !
# Plotting directory
self.plot_dir = self.fractal.directory
# Mem mapping dtype
self.post_dtype = np.dtype(fs.settings.postproc_dtype)
def add_layer(self, layer):
"""
Adds a layer field to allow subsequent graphical operations or outputs.
Parameters
----------
layer : `fractalshades.colors.layers.Virtual_layer` or a derived class
The layer to add
Notes
-----
.. warning::
Layer `postname` shall have been already registered in one of the
plotter batches (see method
`fractalshades.postproc.Postproc_batch.add_postproc`).
.. warning::
            When a layer is added, a link layer -> Fractal_plotter is
            created; a layer can therefore only be added to a single
            `Fractal_plotter`.
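        Examples
        --------
        A hedged sketch - the postproc and layer arguments below are
        illustrative, the only requirement being that the layer `postname`
        was registered beforehand in one of this plotter's batches::

            from fractalshades.postproc import Continuous_iter_pp
            from fractalshades.colors.layers import Color_layer

            batch.add_postproc("cont_iter", Continuous_iter_pp())
            plotter = fs.Fractal_plotter(batch)
            plotter.add_layer(
                Color_layer("cont_iter", func=None, colormap=colormap,
                            probes_z=[0., 1.], output=True)
            )
            plotter["cont_iter"]   # layers are then retrieved by postname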
"""
postname = layer.postname
if postname not in (list(self.postnames) + self.postnames_2d):
raise ValueError(
"Layer `{}` shall be registered in Fractal_plotter "
"postproc_batches: {}".format(
postname, list(self.postnames) + self.postnames_2d
)
)
self.layers += [layer]
layer.link_plotter(self)
def __getitem__(self, layer_name):
""" Get the layer by its postname
"""
for layer in self.layers:
if layer.postname == layer_name:
return layer
raise KeyError("Layer {} not in available layers: {}".format(
layer_name, list(l.postname for l in self.layers)))
def plotter_info_str(self, mode):
if "db" in mode:
str_info = f"Output to database .{mode}: plotter options"
else:
str_info = "Plotting images: plotter options"
for k, v in self.postproc_options.items():
str_info += f"\n {k}: {v}"
str_info += ("\n /!\\ supersampling and jitter only activated "
+ "for final render")
return str_info
def zoom_info_str(self):
str_info = "Plotting images: zoom kwargs"
for k, v in self.fractal.zoom_kwargs.items():
str_info += f"\n {k}: {v}"
return str_info
def plot(self):
"""
The base method to produce images.
        When called, it will go through all the instance-registered layers
and plot each layer for which the `output` attribute is set to `True`.
"""
self._relpath = None # Use default locations for the output files
self._mode = "img"
self.process(mode="img")
def process(self, mode, postdb_layer=None, tile_validator=None):
"""
mode: "img" | "db" | "postdb"
postdb_layer: Optional Layer object, used if exporting a .postdb
        tile_validator: Optional, func: tile -> bool - used for large expmap
        reference recomputation
"""
logger.info(self.plotter_info_str(mode))
logger.info(self.zoom_info_str())
assert mode in ("img", "db", "postdb")
if postdb_layer is not None:
# Convert the str to its Layer object
postdb_layer = self[postdb_layer]
# Open the image memory mappings; open PIL images
if mode == "img":
self.open_images()
if self.final_render:
for i, layer in enumerate(self.layers):
if layer.output:
self.open_postdb(layer)
# self.create_img_mmap(layer)
else:
self.open_any_db(mode, postdb_layer)
if self.final_render:
            # We need to delete it because it will not be recomputed in the
            # calc process - so it is invalidated
self.fractal.delete_fingerprint()
self._raw_arr = dict()
self._current_tile = {
"value": getattr(self, "_tiles_stack", 0),
"time": 0. # time of last evt in seconds
}
        # Here we follow a 2-step approach:
        # - if "dev" (= not final) render, we first compute the 'raw' arrays.
        #   This is done batch by batch, due to the one-to-one relationship
        #   between a postproc_batch and a Fractal calculation
        # - if "final" render, we do not store raw data, but we still need to
        #   create a mmap for the "subset" arrays
f = self.fractal
for pbatch in self.postproc_batches:
calc_name = pbatch.calc_name
f.clean_postproc_attr(when="pre")
if self.postproc_options["final_render"]:
if (hasattr(f, "_subset_hook")
and calc_name in f._subset_hook.keys()):
logger.info(f"Adding subset output for {calc_name}")
f.init_subset_mmap(calc_name, self.supersampling)
else:
logger.info(f"Computing and storing raw data for {calc_name}")
f.calc_raw(calc_name, tile_validator)
# Now 2nd step approach:
# if "dev" (= not final) render, we already know the 'raw' arrays, so
# just a postprocessing step
# if "final" render, we compute on the fly
self.process_tiles(
chunk_slice=None,
mode=mode,
postdb_layer=postdb_layer,
tile_validator=tile_validator
)
if mode == "img":
self.save_images()
logger.info("Plotting images: done")
else:
logger.info(f"Saved layer database to {self.db_path}: done")
# Output data file
self.write_postproc_report()
if fs.settings.inspect_calc:
# Detailed debugging "inspect_calc" report
for pbatch in self.postproc_batches:
calc_name = pbatch.calc_name
f.write_inspect_calc(
calc_name, final=self.final_render
)
# Clean-up
        for pbatch in self.postproc_batches:
            calc_name = pbatch.calc_name
            if self.postproc_options["final_render"]:
                if (hasattr(f, "_subset_hook")
                        and calc_name in f._subset_hook.keys()):
                    f.close_subset_mmap(calc_name, self.supersampling)
f.clean_postproc_attr(when="post")
@property
def try_recover(self):
""" Will we try to reopen saved image chunks ?"""
# Nice to have ? Refactor of the logic: we could "try recover" if:
# - we have a .postdb
# - Its associated fingerprints collection matches with the fractal
# - Its associated postproc collection file matches too
if self._mode == "img":
return (
self.postproc_options["recovery_mode"]
and self.postproc_options["final_render"]
)
else:
return self.save_db_recovery_mode
@property
def final_render(self):
""" Just an alias"""
return self.postproc_options["final_render"]
@property
def supersampling(self):
""" Will really implement antialiasing ? """
if self.postproc_options["final_render"]:
return SUPERSAMPLING_DIC[self.postproc_options["supersampling"]]
# Note: implicitely None by default...
@Multithreading_iterator(
iterable_attr="chunk_slices", iter_kwargs="chunk_slice"
)
def process_tiles(self, chunk_slice, mode, postdb_layer, tile_validator):
"""
postdb_layer: target layer in .postdb mode
"""
if tile_validator is not None:
if not(tile_validator(chunk_slice)):
return
# 0) early exit if already computed: push the image tiles
if self.try_recover and mode == "img":
f = self.fractal
rank = f.chunk_rank(chunk_slice)
_mmap_status = open_memmap(
filename=self.postdb_status_path(None), mode="r+"
)
is_valid = (_mmap_status[rank] > 0)
del _mmap_status
if is_valid:
for i, layer in enumerate(self.layers):
# We need the postproc
self.push_reloaded(
chunk_slice, layer=layer, im=self._im[i]
)
# TODO 'update scaling' may be invalid as we lost the
# data... would need a separate mmap to store min / max
f.incr_tiles_status(which="Plot tiles")
self.incr_tiles_status(chunk_slice, mode)
return
if "db" in mode:
if mode == "db":
filename = self.db_status_path(self.db_path())
elif mode == "postdb":
filename = self.postdb_status_path(
self.postdb_path(postdb_layer)
)
rank = self.fractal.chunk_rank(chunk_slice)
_db_status = open_memmap(filename=filename, mode="r+")
is_valid = (_db_status[rank] > 0)
del _db_status
if is_valid:
# Nothing to compute here, skipping this CALC
logger.debug(f"Skipping db calculation for tile {chunk_slice}")
return
# 1) Compute the raw postprocs for this field
n_pp = len(self.posts)
(ix, ixx, iy, iyy) = chunk_slice
npts = (ixx - ix) * (iyy - iy)
if self.supersampling is not None:
npts = npts * self.supersampling ** 2
arr_shp = (n_pp, npts)
raw_arr = np.empty(shape=arr_shp, dtype=self.post_dtype)
batch_first_post = 0 # index of the first post for the current batch
is_ok = 0
for batch in self.postproc_batches:
# Do the real job
is_ok += self.fill_raw_arr(
raw_arr, chunk_slice, batch, batch_first_post
)
# Just count index
batch_first_post += len(batch.posts)
if is_ok != 0:
return
self._raw_arr[chunk_slice] = raw_arr
# 2) Push each layer crop to the relevant image
for i, layer in enumerate(self.layers):
layer.update_scaling(chunk_slice)
if mode == "img":
if layer.output:
self.push_cropped(
chunk_slice, layer=layer, im=self._im[i], ilayer=i
)
elif mode == "db":
self.push_db(chunk_slice, layer=layer)
# Push the postdb layer at the end only (in case some blending occurs)
if mode == "postdb":
self.push_postdb(chunk_slice, layer=postdb_layer)
# clean-up
self.incr_tiles_status(chunk_slice, mode, postdb_layer) # mmm not really but...
del self._raw_arr[chunk_slice]
def fill_raw_arr(self, raw_arr, chunk_slice, batch, batch_first_post):
""" Compute & store temporary arrays for this postproc batch
Note : inc_postproc_rank rank shift to take into account potential
other batches for this plotter.
(memory mapping version)
"""
f = self.fractal
inc = batch_first_post
ret = f.postproc(batch, chunk_slice, self.postproc_options)
if ret is None:
# Interrupted calculation
return 1
post_array, subset = ret
arr_1d = f.reshape1d(
post_array, subset, chunk_slice, self.supersampling
)
n_posts, _ = arr_1d.shape
raw_arr[inc: (inc + n_posts), :] = arr_1d
return 0 # OK
def incr_tiles_status(self, chunk_slice, mode, postdb_layer):
"""
Update the GUI and the tile-status tracking files
"""
f = self.fractal
curr_val = self._current_tile["value"] + 1
self._current_tile["value"] = curr_val
prev_log = self._current_tile["time"]
curr_time = time.time()
time_diff = curr_time - prev_log
ntiles = f.chunks_count
bool_log = ((time_diff > 1) or (curr_val == 1) or (curr_val == ntiles))
# Marking this chunk as valid (in case of 'final render')
if self.final_render and (mode == "img"):
rank = f.chunk_rank(chunk_slice)
_mmap_status = open_memmap(
filename=self.postdb_status_path(None), mode="r+"
)
_mmap_status[rank] = 1
del _mmap_status
elif mode == "db":
rank = f.chunk_rank(chunk_slice)
_db_status = open_memmap(
filename=self.db_status_path(self.db_path()), mode="r+"
)
_db_status[rank] = 1
del _db_status
elif mode == "postdb":
rank = f.chunk_rank(chunk_slice)
_db_status = open_memmap(
filename=self.postdb_status_path(
self.postdb_path(postdb_layer)
),
mode="r+"
)
_db_status[rank] = 1
del _db_status
if bool_log:
self._current_tile["time"] = curr_time
str_val = str(curr_val)+ " / " + str(f.chunks_count)
logger.info(f"Image output: {str_val}")
def get_2d_arr(self, post_index, chunk_slice):
"""
Returns a 2d view of a chunk for the given post-processed field
Note that the PIL convention is followed for the coordinates
"""
try:
arr = self._raw_arr[chunk_slice][post_index, :]
except KeyError:
return None
(ix, ixx, iy, iyy) = chunk_slice
nx, ny = ixx - ix, iyy - iy
ssg = self.supersampling
if ssg is not None:
nx *= ssg
ny *= ssg
return np.reshape(arr, (ny, nx))
def write_postproc_report(self):
txt_report_path = self.fractal.txt_report_path
def write_layer_report(i, layer, report):
report.write(" - Layer #{} :\n".format(i))
postname = layer.postname
report.write("post-processing: `{}`\n".format(postname))
report.write("kind: {}\n".format(type(layer).__name__))
report.write("func: {}\n".format(layer._func_arg))
mask = layer.mask
if mask is None:
mask_str = "None"
else:
mask_str = "{} `{}` with mask color: {}".format(
type(mask[0]).__name__,
mask[0].postname,
mask[1])
report.write("mask: {}\n".format(mask_str))
report.write("output: {}\n".format(layer.output))
report.write("min: {}\n".format(layer.min))
report.write("max: {}\n\n".format(layer.max))
with open(txt_report_path, 'w', encoding='utf-8') as report:
for i, layer in enumerate(self.layers):
write_layer_report(i, layer, report)
logger.info(textwrap.dedent(f"""\
Plotting image - postprocessing fields info saved to:
{txt_report_path}"""
))
def image_name(self, layer):
return "{}_{}".format(type(layer).__name__, layer.postname)
def open_images(self):
""" Open
- the image files
- the associated memory mappings in case of "final render"
("""
self._im = []
for layer in self.layers:
if layer.output:
self._im += [PIL.Image.new(mode=layer.mode, size=self.size)]
if self.try_recover:
self.open_postdb(layer)
else:
self._im += [None]
if self.final_render and (not self.try_recover):
            logger.info("Reloading option disabled, all images recomputed")
def save_images(self):
""" Writes the images to disk """
for i, layer in enumerate(self.layers):
if not(layer.output):
continue
file_name = self.image_name(layer)
base_img_path = os.path.join(self.plot_dir, file_name + ".png")
self.save_tagged(self._im[i], base_img_path, self.fractal.tag)
def save_tagged(self, img, img_path, tag_dict):
"""
        Saves *img* to png format at *img_path*, tagging with *tag_dict*.
https://dev.exiv2.org/projects/exiv2/wiki/The_Metadata_in_PNG_files
"""
pnginfo = PIL.PngImagePlugin.PngInfo()
for k, v in tag_dict.items():
pnginfo.add_text(k, str(v))
if (fs.settings.output_context["doc"]
and not(fs.settings.output_context["gui_iter"] > 0)):
fs.settings.add_figure(_Pillow_figure(img, pnginfo))
else:
img.save(img_path, pnginfo=pnginfo)
logger.info(textwrap.dedent(f"""\
Image of shape {self.size} saved to:
{img_path}"""
))
def push_cropped(self, chunk_slice, layer, im, ilayer):
""" push "cropped image" from layer for this chunk to the image"""
(ix, ixx, iy, iyy) = chunk_slice
crop_slice = (ix, iy, ixx, iyy)
# Key: Calling get_2d_arr
paste_crop = layer.crop(chunk_slice)
if paste_crop is None:
return
if self.supersampling:
# Here, we should apply a resizing filter
resample = PIL.Image.LANCZOS
paste_crop = paste_crop.resize(
size=(ixx - ix, iyy - iy),
resample=resample,
box=None,
reducing_gap=None
)
# left, upper, right, and lower pixel
im.paste(paste_crop, box=crop_slice)
if self.final_render:
# NOW let's also try to save this beast
paste_crop_arr = np.asarray(paste_crop)
layer_mmap = open_memmap(
filename=self.postdb_path(layer),
mode="r+"
)
if layer_mmap.shape[2] == 1:
# For a 1-channel image, PIL will remove the last dim...
layer_mmap[iy: iyy, ix: ixx, 0] = paste_crop_arr
else:
layer_mmap[iy: iyy, ix: ixx, :] = paste_crop_arr
del layer_mmap
def push_reloaded(self, chunk_slice, layer, im):
""" Copy the already computed pixels and paste them in the image"""
if im is None:
return
(ix, ixx, iy, iyy) = chunk_slice
crop_slice = (ix, iy, ixx, iyy)
layer_mmap = open_memmap(
filename=self.postdb_path(layer),
mode="r+"
)
crop_arr = layer_mmap[iy: iyy, ix: ixx, :]
# If crop_arr has only 1 channel, like grey or bool, Pillow won't
# handle it...
if crop_arr.shape[2] <= 1:
crop_arr = np.squeeze(crop_arr, axis=2)
paste_crop = PIL.Image.fromarray(crop_arr)
im.paste(paste_crop, box=crop_slice)
del layer_mmap
#==============================================================================
# Memory-mapping related operations
#==============================================================================
@property
def relpath(self):
""" Generic relative path for the db, might be set by save_db
defaults to None """
try:
return self._relpath
except AttributeError:
return None
# def image_name(self, layer, relpath=None):
def db_path(self):
""" Absolute path of the file storing layers data (as float)
"""
head = ""
tail = "layers"
relpath = self.relpath
if relpath is not None:
(head, tail) = os.path.split(relpath)
tail, user_ext = os.path.splitext(tail)
if user_ext != ".db":
raise ValueError(
f"Expecting a .db extension, given: {user_ext}"
)
fname = f"{tail}.db"
return os.path.normpath(
os.path.join(self.fractal.directory, head, fname)
)
# def image_name(self, layer, relpath=None):
def postdb_path(self, layer):
""" Absolute path for the file storing layer image data (as rgb array)
layer: Layer instance (not the postname)
"""
# Final render mmap backup files default to ./data directory
head = "data" if self._mode == "img" else ""
tail = ""
relpath = self.relpath
if relpath is not None:
(head, tail) = os.path.split(relpath)
tail, user_ext = os.path.splitext(tail)
if user_ext != ".postdb":
raise ValueError(
f"Expecting a .postdb extension, given: {user_ext}"
)
fname = "{}_{}_{}.postdb".format(
tail,
type(layer).__name__,
layer.postname # i.e.: layer_obj.postname
)
return os.path.normpath(
os.path.join(self.fractal.directory, head, fname)
)
def save_db(self, relpath=None, postdb_layer=None, recovery_mode=False):
"""
Saves the post-processed data in a numpy structured memmap.
Goes through all the registered layers and stores the results in a
(nposts, ny, nx) memory mapping - with PIL-compliant coordinate
indexing order.
In case of supersampling all data points are stored (Downsampling
filtering is delayed to the coloring stage), unless the `postdb` option
is activated.
A companion text file <relpath>.info is also written: it provides a
short description of the data structure.
Parameters
----------
relpath : Optional, str
path relative to self.fractal.directory. If not provided, defaults
to either:
            - :code:`"layers.db"` (when parameter `postdb_layer` is not
provided, and all layers are saved as float)
- :code:`f"{postdb_layer}.postdb"` (when parameter
`postdb_layer` is provided, and this layer is saved as rgb)
postdb_layer : Optional, str
If provided, instead of saving all the layers, saves the layer with
this name as a rgb array (ny, nx, nchannels) memory mapping, with
            extension \*.postdb. This offers less flexibility (post-processing
            is 'frozen') but optimises performance & disk use - especially in
            case of supersampling, as the L2 downsampling filter can be
            applied before storing the image data.
recovery_mode : bool
            If True, will attempt to reload the .db / .postdb tiles already
            computed. Allows restarting an interrupted calculation (this will
            result in a 'patchwork' if plotting parameters have been modified).
Notes
-----
The file extension is either :
- .db (denoting the float values of fields are saved)
- .postdb (denoting the rgb arrays are saved)
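        Examples
        --------
        A minimal sketch (relative paths are illustrative)::

            # Store every registered layer as float data
            db_path = plotter.save_db(relpath="db/layers.db")

            # Store a single layer as a rgb array, allowing restart
            postdb_path = plotter.save_db(
                relpath="db/cont_iter.postdb",
                postdb_layer="cont_iter",
                recovery_mode=True
            )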
"""
self._relpath = relpath
self._mode = mode = "postdb" if postdb_layer else "db"
if postdb_layer:
any_db_path = self.postdb_path(self[postdb_layer])
else:
any_db_path = self.db_path()
self.save_db_recovery_mode = recovery_mode
proj = self.fractal.projection
if isinstance(proj, fractalshades.projection.Expmap):
self.save_expdb_by_steps(postdb_layer)
else:
self.process(mode=mode, postdb_layer=postdb_layer)
# Writes a short description of the db
info_path = any_db_path + ".info"
with open(info_path, 'w+') as info_file:
info_file.write("Db file description\n")
now = datetime.datetime.now()
info_file.write(f"written time: {now}\n")
            info_file.write(f"recovery_mode: {recovery_mode}\n\n")
info_file.write("*array description*\n")
info_file.write(f" dtype: {self.post_dtype}\n")
shape = (len(self.postnames),) + self.db_shape
info_file.write(f" shape: {shape}\n")
ss = self.supersampling
info_file.write(f" supersampling: {ss}\n")
info_file.write(f" postdb_layer: {postdb_layer}\n\n")
info_file.write("*fields description*\n")
for pn in self.postnames:
info_file.write(f" {pn}\n")
return any_db_path
def save_expdb_by_steps(self, postdb_layer):
""" Specialised flow for large exp mappings using steps in zoom
mode is "db" or "postdb"
For an exponential zoom covering a very large range, it is more
efficient to evalutate the bilinear validity radius at successive
depths. This setting tells the program to recompute the billinear
validity radius for a tile whose ending pixels differs from the
last reference from more than `exp_zoom_step` (in the `h`
direction). Only valid with a
`fractalshades.projection.Expmap` projection and for perturbation
fractals.
"""
f = self.fractal
proj = f.projection
orientation = proj.orientation
exp_zoom_step = proj.nt(f)
hmin = proj.hmin
hmax = proj.hmax
nh = proj.nh(f)
stp = exp_zoom_step
chunk_size = fs.settings.chunk_size
for r in range(0, nh + 1, stp):
# Need to trigger a recalculation of BLA validity radius
# we will call reset_bla_tree and modify in place cycle_indep_args
i_max = min(r + stp, nh)
i_min = max(r - chunk_size, 0)
exp_step_hmax = (hmax * i_max + hmin * (nh - i_max)) / nh
exp_step_hmin = (hmax * i_min + hmin * (nh - i_min)) / nh
proj.set_exp_zoom_step(exp_step_hmax, exp_step_hmin)
self.reset_bla_tree()
def validates_h(chunk_slice):
""" Escapes the tiles not matching the target h range"""
(_, ixx, _, _) = chunk_slice
ret = r < ixx <= (r + stp)
return ret
def validates_v(chunk_slice):
""" Escapes the tiles not matching the target h range"""
(_, _, _, iyy) = chunk_slice
ret = r < iyy <= (r + stp)
return ret
validates = (validates_h if orientation == "horizontal"
else validates_v)
# manage the self._current_tile["value"] increment
try:
self._tiles_stack = self._current_tile["value"]
except AttributeError:
self._tiles_stack = 0
self.process(
mode=self._mode,
postdb_layer=postdb_layer,
tile_validator=validates
)
proj.del_exp_zoom_step()
def reset_bla_tree(self):
""" Reset the BLA tree used for calculation taking into account
the projection modifications
"""
f = self.fractal
for calc_name, data in f._calc_data.items():
cycle_indep_args = data["cycle_indep_args"]
data["cycle_indep_args"] = f.reset_bla_tree(cycle_indep_args)
def open_any_db(self, mode, postdb_layer):
""" Open a .db or .postdb according to mode, managing
- the database memory mappings
- its associated "progress tracking" [post]db_status
        Note that the .db size is inflated in case of supersampling (all data
        points are stored) - but not the .postdb size, as the .postdb is
        written after image post-processing
mode: "db" | "postdb"
postdb_layer: Layer instance (the object, not the str postname)
"""
if mode == "db":
assert postdb_layer is None
self.open_db()
        elif mode == "postdb":
self.open_postdb(postdb_layer)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Full mmap of the fields: .db
def open_db(self):
""" Specialized method for a .db memmap
"""
db_path = self.db_path()
self.open_db_status(db_path)
# The db shape
ssg = self.supersampling
ss_shape = self.db_shape
db_field_count = len(self.postnames) # Accounting for 2-fields layers
if ssg is not None:
ss_shape = tuple(ssg * x for x in ss_shape)
expected_shape = (db_field_count,) + ss_shape
try:
if not(self.try_recover):
raise ValueError(
"Invalidated db, as `recovery_mode` is set to False"
)
_mmap_db = open_memmap(
filename=db_path, mode='r'
)
# Checking that the size matches...
valid = (expected_shape == _mmap_db.shape)
if not(valid):
raise ValueError("Invalid db")
_db_status = open_memmap(
filename=self.db_status_path(db_path),
mode="r+"
)
valid_chunks = np.count_nonzero(_db_status)
del _db_status
del _mmap_db
n = self.fractal.chunks_count
logger.info(
"Attempt to restart interrupted calculation,\n"
f" Valid database tiles found: {valid_chunks} / {n}"
)
return
except (FileNotFoundError, ValueError) as e:
logger.info(
f"Invalidated db, recomputing full db:\n {e}"
)
        # Creates the new database (a structured memory mapping)
fs.utils.mkdir_p(os.path.dirname(db_path))
_mmap_db = open_memmap(
filename=db_path,
mode='w+',
dtype=self.post_dtype,
shape=expected_shape,
fortran_order=False,
version=None
)
        # As we didn't find valid db information, sadly the whole
        # memory mapping is invalidated
_db_status = open_memmap(
filename=self.db_status_path(db_path), mode="r+"
)
_db_status[:] = 0
del _mmap_db
def db_status_path(self, db_path):
""" returns the db_status_path if db_path is provided"""
root, ext = os.path.splitext(db_path)
return root + "_status" + ext
def open_db_status(self, db_path):
""" Small mmap array to flag the validated db tiles
"""
n_chunk = self.fractal.chunks_count
file_path = self.db_status_path(db_path)
fs.utils.mkdir_p(os.path.dirname(file_path))
try:
            # Does the mmap already exist, and does it seem to suit
            # our needs?
if not(self.try_recover):
raise ValueError(
"Invalidated db_status: `recovery_mode` is set to False"
)
_db_status = open_memmap(
filename=file_path, mode="r+"
)
if (_db_status.shape != (n_chunk,)):
raise ValueError("Incompatible shapes for plotter mmap_status")
except (FileNotFoundError, ValueError) as e:
            # Let's create it from scratch
logger.debug(
f"No valid db status file found - recompute db:\n {e}"
)
_db_status = open_memmap(
filename=file_path,
mode='w+',
dtype=np.int32,
shape=(n_chunk,),
fortran_order=False,
version=None
)
_db_status[:] = 0
del _db_status
def push_db(self, chunk_slice, layer):
""" push "postprocessed data" (from the layer's postproc field)
to the db memory mapping
        Note: Lanczos downsampling is delayed to the image-making stage; the
        size might be inflated in case of "final render + supersampling"
"""
(ix, ixx, iy, iyy) = chunk_slice
field_count, post_index = layer.get_postproc_index()
db_crop = layer.db_crop(chunk_slice)
s = self.supersampling
if s:
            # Here, we inflate all dims by s
ix *= s
ixx *= s
iy *= s
iyy *= s
db_mmap = open_memmap(filename=self.db_path(), mode='r+')
if field_count == 1:
db_mmap[post_index, iy:iyy, ix:ixx] = db_crop
elif field_count == 2:
db_mmap[post_index[0], iy:iyy, ix:ixx] = db_crop[0, :, :]
db_mmap[post_index[1], iy:iyy, ix:ixx] = db_crop[1, :, :]
del db_mmap
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## mmap of the image array: .postdb
def open_postdb(self, layer):
""" Specialized method for a .postdb memmap
        It needs the layer because we reuse the same codepath
        for different layers (final render restart data)
        Note: `layer` is the layer object, not its postname
"""
postdb_path = self.postdb_path(layer)
layer_name = layer.postname
self.open_postdb_status(postdb_path)
mode = layer.mode
dtype = fs.colors.layers.Virtual_layer.DTYPE_FROM_MODE[mode]
channel = fs.colors.layers.Virtual_layer.N_CHANNEL_FROM_MODE[mode]
expected_shape = self.db_shape + (channel,)
        # Does the mmap already exist, and does it seem to suit our needs?
        try:
            if not(self.try_recover):
raise ValueError(
"Invalidated postdb: `recovery_mode` is set to False"
)
_mmap_postdb = open_memmap(
filename=postdb_path, mode="r+"
)
if _mmap_postdb.shape != expected_shape:
del _mmap_postdb
raise ValueError("Incompatible shapes for mmap, recomputing")
if _mmap_postdb.dtype != dtype:
del _mmap_postdb
raise ValueError("Incompatible dtype for mmap, recomputing")
_db_status = open_memmap(
filename=self.postdb_status_path(postdb_path), mode="r+"
)
valid_chunks = np.count_nonzero(_db_status)
del _db_status
del _mmap_postdb
n = self.fractal.chunks_count
logger.info(
"Attempt to restart interrupted calculation,\n"
f" Valid postdb tiles found: {valid_chunks} / {n}"
)
return
except (FileNotFoundError, ValueError) as e:
logger.info(
f"No valid .postdb found for layer {layer_name}:\n {e}"
)
# Create a new one...
mmap = open_memmap(
filename=postdb_path,
mode='w+',
dtype=np.dtype(dtype),
shape=expected_shape,
fortran_order=False,
version=None
)
        # As we didn't find valid information for this layer, sadly the whole
        # memory mapping is invalidated
_db_status = open_memmap(
filename=self.postdb_status_path(postdb_path), mode="r+"
)
_db_status[:] = 0
del _db_status
del mmap
def postdb_status_path(self, postdb_path):
""" Return the postdb_status_path if db_path is provided """
if self._mode == "img":
            # Defaults to a unique status "guard" for an image layer set
postdb_path = os.path.join(
self.fractal.directory, "data", "final_render.postdb"
)
return self.db_status_path(postdb_path)
def open_postdb_status(self, postdb_path):
""" Open the postdb_status mmap if db_path is provided """
if self._mode == "img":
            # Defaults to a unique status "guard" for an image layer set
postdb_path = os.path.join(
self.fractal.directory, "data", "final_render.postdb"
)
self.open_db_status(postdb_path)
def push_postdb(self, chunk_slice, layer):
""" Push the image pixels to the back-up mmap (/!\Follows PIL block
order) """
(ix, ixx, iy, iyy) = chunk_slice
paste_crop = layer.crop(chunk_slice)
assert paste_crop is not None
postdb_mmap = open_memmap(
filename=self.postdb_path(layer),
mode="r+"
)
s = self.supersampling
if s:
resample = PIL.Image.LANCZOS
paste_crop = paste_crop.resize(
size=(ixx - ix, iyy - iy), # Note: Pillow order here
resample=resample,
box=None,
reducing_gap=None
)
postdb_mmap[iy:iyy, ix:ixx, :] = np.asarray(paste_crop)
del postdb_mmap
class _Null_status_wget:
def __init__(self, fractal):
""" Internal class
Emulates a status bar wget in case we do not use the GUI
"""
status = {}
status.update(fractal.new_status(self))
self._status = status
def update_status(self, key, str_val):
""" Update the text status
"""
self._status[key]["str_val"] = str_val
class Fractal:
REPORT_ITEMS = [
"chunk1d_begin",
"chunk1d_end",
"chunk_pts",
"done",
]
SAVE_ARRS = [
"Z",
"U",
"stop_reason",
"stop_iter"
]
USER_INTERRUPTED = 1 # return code for a user-interrupted computation
def __init__(self, directory: str):
"""
        The base class for all escape-time fractal calculations.
        Derived classes should implement the actual calculation methods used
        in the inner loop. This class provides the outer looping (calculation
        is run on successive tiles), enables multiprocessing, and manages
        raw-result storing and retrieving.
Parameters
----------
directory : str
Path for the working base directory
Notes
-----
These notes describe implementation details and should be useful mostly to
advanced users when subclassing.
.. note::
**Special methods**
this class and its subclasses may define several methods decorated with
specific tags:
`fractalshades.zoom_options`
decorates the method used to define the zooming
`fractalshades.calc_options`
decorates the methods defining the calculation inner-loop
`fractalshades.interactive_options`
decorates the methods that can be called
interactively from the GUI (right-click then context menu selection).
The coordinates of the click are passed to the called method.
.. note::
**Saved data**
The calculation raw results (raw output of the inner loop at exit) are
saved to disk and internally accessed during plotting phase through
memory-mapping. They are however not saved for a final render.
These arrays are:
            subset
                Boolean array (the point-selection mask, if any).
                Saved to disk in ``data`` folder
Z
Complex fields, several fields can be defined and accessed through
a field string identifier.
Saved to disk as ``calc_name``\_Z.arr in ``data`` folder
U
Integer fields, several fields can be defined and accessed through
a field string identifier.
Saved to disk as ``calc_name``\_U.arr in ``data`` folder
stop_reason
Byte codes: the reasons for loop exit (max iteration reached ?
overflow ? other ?)
                Saved to disk as ``calc_name``\_stop_reason.arr in ``data`` folder
stop_iter
Integer: iterations count at loop exit
Saved to disk as ``calc_name``\_stop_iter.arr in ``data`` folder
The string identifiers are stored in ``codes`` attributes.
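        Examples
        --------
        `Fractal` is not used directly; a concrete subclass is instantiated
        with its working directory (class and directory names below are
        illustrative)::

            import fractalshades.models as fsm

            f = fsm.Mandelbrot(plot_dir)
            f.zoom(x=-0.75, y=0., dx=3., nx=800, xy_ratio=1., theta_deg=0.)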
"""
self.directory = directory
self.subset = None
self._interrupted = np.array([0], dtype=np.bool_)
# datatypes used for raw data storing
self.float_postproc_type = np.dtype(fs.settings.postproc_dtype)
self.termination_type = np.int8
self.int_type = np.int32
# Default listener
self._status_wget = _Null_status_wget(self)
@property
def init_kwargs(self):
""" Return a dict of parameters used during __init__ call"""
init_kwargs = {}
for (
p_name, param
) in inspect.signature(self.__init__).parameters.items():
# By contract, __init__ params shall be stored as instance attrib.
# ('Fractal' interface)
init_kwargs[p_name] = getattr(self, p_name)
return init_kwargs
def init_signature(self):
return inspect.signature(self.__init__)
def __reduce__(self):
""" Serialisation of a Fractal object. The fractal state (zoom, calc)
is dropped and shall be re-instated externally if need be.
"""
vals = tuple(self.init_kwargs.values())
return (self.__class__, vals)
def script_repr(self, indent):
"""String used to serialize this instance in GUI-generated scripts
"""
# Mainly a call to __init__ with the directory tuned to be the
# local variable `plot_dir`
fullname = fs.utils.Code_writer.fullname(self.__class__)
kwargs = self.init_kwargs
kwargs["directory"] = fs.utils.Rawcode("plot_dir") # get rid of quotes
kwargs_code = fs.utils.Code_writer.func_args(
kwargs, indent + 1, False
)
shift = " " * 4 * indent
str_call_init = f"{fullname}(\n{kwargs_code}{shift})"
return str_call_init
@fs.utils.zoom_options
def zoom(self, *,
x: float = 0.,
y: float = 0.,
dx: float = 8.,
nx: int = 800,
xy_ratio: float = 1.,
theta_deg: float = 0.,
projection: fs.projection.Projection=fs.projection.Cartesian(),
has_skew: bool = False,
skew_00:float = 1.0,
skew_01: float = 0.0,
skew_10: float = 0.0,
skew_11: float = 1.0
):
"""
        Defines and stores as class-attributes the zoom parameters for the
        next calculation.
Parameters
----------
x : float
x-coordinate of the central point
y : float
y-coordinate of the central point
dx : float
span of the view rectangle along the x-axis
nx : int
number of pixels of the image along the x-axis
xy_ratio: float
ratio of dx / dy and nx / ny
theta_deg : float
Pre-rotation of the calculation domain, in degree
projection : `fractalshades.projection.Projection`
Kind of projection used (default to
`fractalshades.projection.Cartesian`)
has_skew : bool
            If True, unskew the view based on the skew coefficients skew_ij
skew_ij : float
            Components of the local skew matrix, ij = 00, 01, 10, 11
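        Examples
        --------
        Sketch of a rotated, non-square view (values are illustrative)::

            f.zoom(
                x=-0.74920, y=0.10065, dx=5.e-4,
                nx=1200, xy_ratio=16./9., theta_deg=30.,
                projection=fs.projection.Cartesian()
            )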
"""
# Safeguard in case the GUI inputs were strings
if isinstance(x, str) or isinstance(y, str) or isinstance(dx, str):
raise RuntimeError("Float expected for x, y, dx")
# Backward compatibility: also accept projection = "cartesian"
if isinstance(projection, str):
logger.warning(textwrap.dedent(
"""\
Use of str for projection is deprecated, and might be
removed. Please use `fractalshades.projection.Projection`
subclasses instead. Defaulting to
`fractalshades.projection.Cartesian()`"""
))
self.projection = projection = fs.projection.Cartesian()
# Stores the skew matrix
self._skew = None
if has_skew:
self._skew = np.array(
((skew_00, skew_01), (skew_10, skew_11)), dtype=np.float64
)
self.lin_mat = self.get_lin_mat()
projection.adjust_to_zoom(self)
self.proj_impl = projection.f
def get_lin_mat(self):
""" Returns a numba-jitted function which apply the linear part of the
transformation (rotation, skew, scale)
"""
theta = self.theta_deg / 180. * np.pi
skew = self._skew
# Defines the linear matrix
c = np.cos(theta)
s = np.sin(theta)
lin_mat = np.array(((c, -s), (s, c)), dtype=np.float64)
if skew is not None:
lin_mat = np.matmul(skew, lin_mat)
return lin_mat
def new_status(self, wget):
""" Return a dictionnary that can hold the current progress status """
self._status_wget = wget
status = {
"Calc tiles": {
"val": 0,
"str_val": "- / -",
"last_log": 0. # time of last logged change in seconds
},
"Plot tiles": {
"val": 0,
"str_val": "- / -",
"last_log": 0. # time of last logged change in seconds
}
}
return status
def set_status(self, key, str_val, bool_log=True):
""" Just a simple text status """
self._status_wget.update_status(key, str_val)
if bool_log:
logger.debug(f"Status update {key}: {str_val}")
def incr_tiles_status(self, which="Calc tiles"):
""" Dealing with more complex status :
Increase by 1 the number of computed tiles reported in status bar
"""
dic = self._status_wget._status
curr_val = dic[which]["val"] + 1
dic[which]["val"] = curr_val
prev_event = dic[which]["last_log"]
curr_time = time.time()
time_diff = curr_time - prev_event
ntiles = self.chunks_count
bool_log = ((time_diff > 1) or (curr_val == 1) or (curr_val == ntiles))
if bool_log:
dic[which]["last_log"] = curr_time
str_val = str(dic[which]["val"]) + " / " + str(ntiles)
self.set_status(which, str_val, bool_log)
@property
def ny(self):
return int(self.nx / self.xy_ratio + 0.5)
@property
def dy(self):
return self.dx / self.xy_ratio
@property
def px(self):
if not(hasattr(self, "_px")): # is None:
self._px = self.dx / self.nx
return self._px
@property
def skew(self):
if not(hasattr(self, "_skew")): # is None:
self._skew = None
return self._skew
@property
def multiprocess_dir(self):
""" Directory used for multiprocess stdout stderr streams redirection
:meta private:
"""
return os.path.join(self.directory, "multiproc_calc")
@property
def float_type(self):
select = {
np.dtype(np.float64): np.float64,
np.dtype(np.complex128): np.float64
}
return select[np.dtype(self.complex_type)]
def clean_up(self, calc_name=None):
"""
Deletes all saved data files associated with a given ``calc_name``.
Parameters
----------
calc_name : str | None
The string identifying the calculation run for which we want to
delete the files.
If None, delete all calculation files
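        Examples
        --------
        A short sketch (the calc_name is illustrative)::

            f.clean_up("my_calc")   # deletes the files of this run only
            f.clean_up()            # deletes all calculation files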
"""
if calc_name is None:
calc_name = "*"
patterns = (
calc_name + "_*.arr",
calc_name + ".report",
calc_name + ".fingerprint",
"*.postdb", # .postdb files are tagged by the output layer name
"ref_pt.dat",
"SA.dat"
)
data_dir = os.path.join(self.directory, "data")
if not os.path.isdir(data_dir):
logger.warning(textwrap.dedent(f"""\
Clean-up cancelled, directory not found:
{data_dir}"""
))
return
logger.info(textwrap.dedent(f"""\
Cleaning data directory:
{data_dir}"""
))
for pattern in patterns:
with os.scandir(data_dir) as it:
for entry in it:
if (fnmatch.fnmatch(entry.name, pattern)):
os.unlink(entry.path)
logger.debug(f"File deleted: {entry.name}")
        # Also delete the temporary attributes
temp_attrs = ("_FP_params", "_Z_path", "_SA_data")
for temp_attr in temp_attrs:
if hasattr(self, temp_attr):
delattr(self, temp_attr)
def delete_fingerprint(self):
""" Deletes the fingerprint files """
patterns = (
"*.fingerprint",
)
data_dir = os.path.join(self.directory, "data")
if not os.path.isdir(data_dir):
return
for pattern in patterns:
with os.scandir(data_dir) as it:
for entry in it:
if (fnmatch.fnmatch(entry.name, pattern)):
os.unlink(entry.path)
logger.debug(f"File deleted: {entry.name}")
def clean_postproc_attr(self, when="pre"):
# Deletes the postproc-related temporary attributes, thus forcing
# a recompute of invalidated data
if when == "pre":
postproc_attrs = ("_uncompressed_beg_end",)
elif when == "post":
postproc_attrs = ("_subset_hook",)
for temp_attr in postproc_attrs:
if hasattr(self, temp_attr):
delattr(self, temp_attr)
# The various method associated with chunk mgmt ===========================
def chunk_slices(self): #, chunk_size=None):
"""
Generator function
Yields the chunks spans (ix, ixx, iy, iyy)
with each chunk of size chunk_size x chunk_size
"""
# if chunk_size is None:
chunk_size = fs.settings.chunk_size
for ix in range(0, self.nx, chunk_size):
ixx = min(ix + chunk_size, self.nx)
for iy in range(0, self.ny, chunk_size):
iyy = min(iy + chunk_size, self.ny)
yield (ix, ixx, iy, iyy)
@property
def chunks_count(self):
"""
Return the total number of chunks (tiles) for the current image
"""
chunk_size = fs.settings.chunk_size
(cx, r) = divmod(self.nx, chunk_size)
if r != 0:
cx += 1
(cy, r) = divmod(self.ny, chunk_size)
if r != 0:
cy += 1
return cx * cy
def chunk_rank(self, chunk_slice):
"""
Return the generator yield index for chunk_slice
"""
chunk_size = fs.settings.chunk_size
(ix, _, iy, _) = chunk_slice
chunk_item_x = ix // chunk_size
chunk_item_y = iy // chunk_size
(cy, r) = divmod(self.ny, chunk_size)
if r != 0:
cy += 1
return chunk_item_x * cy + chunk_item_y
def chunk_from_rank(self, rank):
"""
Return the chunk_slice from the generator yield index
"""
chunk_size = fs.settings.chunk_size
(cy, r) = divmod(self.ny, chunk_size)
if r != 0: cy += 1
chunk_item_x, chunk_item_y = divmod(rank, cy)
ix = chunk_item_x * chunk_size
iy = chunk_item_y * chunk_size
ixx = min(ix + chunk_size, self.nx)
iyy = min(iy + chunk_size, self.ny)
return (ix, ixx, iy, iyy)
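    # Worked example of the rank <-> chunk mapping (illustrative values):
    # with nx=800, ny=600 and fs.settings.chunk_size=200, the image splits
    # into cx=4 columns and cy=3 rows, i.e. chunks_count = 12 tiles.
    # The tile (ix, ixx, iy, iyy) = (400, 600, 200, 400) gives
    #   chunk_item_x = 400 // 200 = 2, chunk_item_y = 200 // 200 = 1
    #   rank = chunk_item_x * cy + chunk_item_y = 2 * 3 + 1 = 7
    # and chunk_from_rank(7) -> divmod(7, 3) = (2, 1) -> (400, 600, 200, 400).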
def uncompressed_beg_end(self, rank):
""" Return the span for the boolean mask index for the chunk index
`rank` """
# Note: indep of the mask.
valid = False
if hasattr(self, "_uncompressed_beg_end"):
arr, in_nx, in_ny, in_csize = self._uncompressed_beg_end
            valid = (
                self.nx == in_nx
                and self.ny == in_ny
                and fs.settings.chunk_size == in_csize
            )
if not valid:
arr = self.recompute_uncompressed_beg_end(rank)
return arr[rank], arr[rank + 1]
def recompute_uncompressed_beg_end(self, rank):
""" A bit brutal but does the job
"""
arr = np.empty((self.chunks_count + 1,), dtype=np.int32)
arr[0] = 0
for i, chunk_slice in enumerate(self.chunk_slices()):
(ix, ixx, iy, iyy) = chunk_slice
arr[i + 1] = arr[i] + (ixx - ix) * (iyy - iy)
self._uncompressed_beg_end = (
arr, self.nx, self.ny, fs.settings.chunk_size
)
return arr
def compressed_beg_end(self, calc_name, rank):
""" Return the span for a stored array
"""
        # Note: depends on the mask... hence the calc_name
mmap = self.get_report_memmap(calc_name)
items = self.REPORT_ITEMS
beg = mmap[rank, items.index("chunk1d_begin")]
end = mmap[rank, items.index("chunk1d_end")]
del mmap
return beg, end
def pts_count(self, calc_name, chunk_slice=None):
""" Return the number of compressed 1d points for the current
calculation, and the chunk_slice
(if chunk_slice is None: return the summ for all chunks)
"""
state = self._calc_data[calc_name]["state"]
subset = state.subset
if chunk_slice is not None:
(ix, ixx, iy, iyy) = chunk_slice
if subset is not None:
if chunk_slice is None:
return np.count_nonzero(self.subset[None])
else:
subset_pts = np.count_nonzero(subset[chunk_slice])
return subset_pts
else:
if chunk_slice is None:
return self.nx * self.ny
else:
return (ixx - ix) * (iyy - iy)
def chunk_pixel_pos(self, chunk_slice, jitter, supersampling):
"""
        Return the image pixel offsets from the center, as a complex array
        expressed in fraction of the image width:
        pix = center + (pix_frac_x * dx, pix_frac_y * dy)
"""
data_type = self.float_type
(nx, ny) = (self.nx, self.ny)
(ix, ixx, iy, iyy) = chunk_slice
kx = 0.5 / (nx - 1) # interval width
ky = 0.5 / (ny - 1) # interval width
if supersampling is None:
x_1d = np.linspace(
kx * (2 * ix - nx + 1),
kx * (2 * ixx - nx - 1),
num=(ixx - ix),
dtype=data_type
)
y_1d = np.linspace(
ky * (2 * iy - ny + 1),
ky * (2 * iyy - ny - 1),
num=(iyy - iy),
dtype=data_type
)
else:
ssg = supersampling
ssg_gap = (ssg - 1.) / ssg
x_1d = np.linspace(
kx * (2 * ix - nx + 1 - ssg_gap),
kx * (2 * ixx - nx - 1 + ssg_gap),
num = (ixx - ix) * ssg,
dtype=data_type
)
y_1d = np.linspace(
ky * (2 * iy - ny + 1 - ssg_gap),
ky * (2 * iyy - ny - 1 + ssg_gap),
num = (iyy - iy) * ssg,
dtype=data_type
)
dx_screen, dy_screen = np.meshgrid(x_1d, -y_1d, indexing='xy')
if jitter:
rg = np.random.default_rng(0)
rand_x = rg.random(dx_screen.shape, dtype=data_type)
rand_y = rg.random(dy_screen.shape, dtype=data_type)
k = 0.7071067811865476
jitter_x = (0.5 - rand_x) * k / (nx - 1) * jitter
jitter_y = (0.5 - rand_y) * k / (ny - 1) * jitter
if supersampling is not None:
jitter_x /= supersampling
jitter_y /= supersampling
dx_screen += jitter_x
dy_screen += jitter_y
dy_screen /= self.xy_ratio
res = dx_screen + 1j * dy_screen
return res
@property
def tag(self):
""" Used to tag an output image
"""
tag = {
"Software": "fractalshades " + fs.__version__,
"datetime": datetime.datetime.today().strftime('%Y-%m-%d_%H:%M:%S')
}
# Adding detailed data per calculation
for calc_name in self._calc_data.keys():
state = self._calc_data[calc_name]["state"]
tag[calc_name] = state.fingerprint
# Adding the reference zoom parameters for GUI navigation
tag.update(self.zoom_kwargs)
return fs.utils.dic_flatten(tag)
def fingerprint_matching(self, calc_name, test_fingerprint, log=False):
"""
Test if the stored parameters match those of new calculation
/!\ modified in subclass
"""
flatten_fp = fs.utils.dic_flatten(test_fingerprint)
state = self._calc_data[calc_name]["state"]
expected_fp = fs.utils.dic_flatten(state.fingerprint)
if log:
logger.debug(f"fingerprint_matching flatten_fp:\n {flatten_fp}")
logger.debug(f"fingerprint_matching expected_fp:\n {expected_fp}")
# We currently do not have special keys to handle
# (Note: The software version and calculation info are now only added
# to the tagged image)
SPECIAL = []
for key, val in expected_fp.items():
if (key in SPECIAL):
continue
else:
if flatten_fp[key] != val:
if log:
logger.debug(textwrap.dedent(f"""\
Parameter mismatch ; will trigger a recalculation
{key}, {flatten_fp[key]} --> {val}"""
))
return False
return True
def res_available(self, calc_name, chunk_slice=None):
"""
If chunk_slice is None, check that stored calculation fingerprint
matches.
If chunk_slice is provided, checks that calculation results are
available
"""
try:
fingerprint = self.reload_fingerprint(calc_name)
except IOError:
logger.debug(textwrap.dedent(f"""\
No fingerprint file found for {calc_name};
will trigger a recalculation"""
))
return False
log = (chunk_slice is None)
matching = self.fingerprint_matching(calc_name, fingerprint, log)
if log:
logger.debug(
f"Fingerprint match for {chunk_slice}: {matching}"
)
if not(matching):
return False
if chunk_slice is None:
return matching # should be True
try:
report = self.reload_report(calc_name, chunk_slice)
except IOError:
return False
return report["done"] > 0
def calc_hook(self, calc_callable, calc_kwargs, return_dic):
"""
Called by a calculation wrapper (ref. utils.py)
Prepares & stores the data needed for future calculation of tiles
        /!\ If an error occurs here, check that the Fractal calculation
        implementation returns the expected dict of functions:
{
"set_state": set_state,
"initialize": initialize,
"iterate": iterate
}
"""
calc_name = calc_kwargs["calc_name"]
# Storage for future calculation
if not hasattr(self, "_calc_data"):
self._calc_data = dict()
# Setting internal state - needed to unwrap other data
        # /!\ also using a separate namespace, as a thread-safe option
state = types.SimpleNamespace()
for k, v in calc_kwargs.items():
setattr(state, k, v)
setattr(self, k, v)
set_state = return_dic["set_state"]()
set_state(state)
set_state(self)
initialize = return_dic["initialize"]()
iterate = return_dic["iterate"]()
cycle_indep_args = self.get_cycle_indep_args(initialize, iterate)
saved_codes = self.saved_codes(state.codes)
# stores the data for later use
self._calc_data[calc_name] = {
"calc_class": type(self).__name__,
"calc_callable": calc_callable,
"calc_kwargs": calc_kwargs,
"zoom_kwargs": self.zoom_kwargs,
"state": state,
"cycle_indep_args": cycle_indep_args,
"saved_codes": saved_codes,
"init_kwargs": self.init_kwargs
}
# Takes a 'fingerprint' of the calculation parameters
fp_items = (
"calc_class", "calc_callable", "calc_kwargs", "zoom_kwargs",
"init_kwargs"
)
state.fingerprint = {
k: self._calc_data[calc_name][k] for k in fp_items
}
# Adding a subset hook for future 'on the fly' computation
if "subset" in calc_kwargs.keys():
subset = calc_kwargs["subset"]
if subset is not None:
self.add_subset_hook(subset, calc_name)
                # We cannot open new memmaps at this early stage, because
                # subset may still be unknown - but we shall track it.
if self.res_available(calc_name):
            # There *should* be mmaps available but let's double-check this
try:
mmap = self.get_report_memmap(calc_name, mode="r+")
del mmap
for key in self.SAVE_ARRS:
mmap = self.get_data_memmap(calc_name, key, mode="r+")
del mmap
if subset is not None:
mmap = self.get_data_memmap(calc_name, "subset", mode="r+")
del mmap
self._calc_data[calc_name]["need_new_mmap"] = False
logger.info(
f"Found suitable raw results files set for {calc_name}\n"
" -> only the missing tiles will be recomputed"
)
except FileNotFoundError as e:
self._calc_data[calc_name]["need_new_mmap"] = True
logger.info(
f"Raw results files set for {calc_name} is incomplete\n"
f" -> all tiles will be recomputed: \n {e}"
)
else:
logger.info(
f"Missing up-to-date result file available for {calc_name}\n"
" -> all tiles will be recomputed"
)
self._calc_data[calc_name]["need_new_mmap"] = True
self.save_fingerprint(calc_name, state.fingerprint)
def raise_interruption(self):
self._interrupted[0] = True
def lower_interruption(self):
self._interrupted[0] = False
def is_interrupted(self):
""" Either programmatically 'interrupted' (from the GUI) or by the user
in batch mode through fs.settings.skip_calc """
return (self._interrupted[0] or fs.settings.skip_calc)
@staticmethod
def numba_cycle_call(cycle_dep_args, cycle_indep_args):
# Just a thin wrapper to allow customization in derived classes
ret_code = numba_cycles(*cycle_dep_args, *cycle_indep_args)
return ret_code
def get_cycle_indep_args(self, initialize, iterate):
"""
When not a perturbation rendering:
        This is just a digest of the zoom and calculation parameters
"""
center = self.x + 1j * self.y
# proj_dzndc_modifier = getattr(self.projection, "dzndc_modifier", None)
return (
initialize, iterate,
center, self.proj_impl, self.lin_mat, self.dx,
self._interrupted
)
def get_cycling_dep_args(
self, calc_name, chunk_slice,
final=False, jitter=False, supersampling=None):
"""
The actual input / output arrays
"""
c_pix = np.ravel(
self.chunk_pixel_pos(chunk_slice, jitter, supersampling)
)
state = self._calc_data[calc_name]["state"]
subset = state.subset
if subset is not None:
if final:
# Here we need a substitution - as raw arrays not stored
subset = self._subset_hook[calc_name]
chunk_subset = subset[chunk_slice]
c_pix = c_pix[chunk_subset]
else:
chunk_subset = None
# Initialise the result arrays
(n_pts,) = c_pix.shape
n_Z, n_U, n_stop = (len(code) for code in self.codes)
Z = np.zeros([n_Z, n_pts], dtype=self.complex_type)
U = np.zeros([n_U, n_pts], dtype=self.int_type)
stop_reason = - np.ones([1, n_pts], dtype=self.termination_type)
stop_iter = np.zeros([1, n_pts], dtype=self.int_type)
return (c_pix, Z, U, stop_reason, stop_iter), chunk_subset
#==============================================================================
def add_subset_hook(self, f_array, calc_name_to):
""" Create a namespace for a hook that shall be filled during
on-the fly computation - used for final calculations with subset on.
Hook structure: dict
hook[calcname_to] = [f_array_dat, supersampling_k, mmap]
"""
logger.debug(f"Add Subset hook for {calc_name_to}")
if not(hasattr(self, "_subset_hook")):
self._subset_hook = dict()
self._subset_hook[calc_name_to] = _Subset_temporary_array(
self, f_array.calc_name, calc_name_to,
f_array.key, f_array._func, f_array.inv
)
def init_subset_mmap(self, calc_name_to, supersampling): # private
""" Create the memory mapping
Format: 1-d UNcompressed per chunk, takes into account supersampling
as needed.
"""
logger.debug(f"Opening subset mmap for: {calc_name_to}")
self._subset_hook[calc_name_to].init_mmap(supersampling)
def close_subset_mmap(self, calc_name_to, supersampling): # private
""" Closed the memory mapping
Format: 1-d UNcompressed per chunk, takes into account supersampling
as needed.
"""
self._subset_hook[calc_name_to].close_mmap(supersampling)
def save_subset_arrays(
self, calc_name, chunk_slice, postproc_options, ret
):
""" Checks wether saving is needed"""
        for calc_name_to, sta in self._subset_hook.items():
            if sta.calc_name_from != calc_name:
                # Calc data not needed for this fractal array
                continue
logger.debug("Saving subset data chunk for on the fly plot:\n" +
f"{calc_name} -> {sta.calc_name_to}")
sta.save_array(chunk_slice, ret)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def fingerprint_path(self, calc_name):
return os.path.join(
self.directory, "data", calc_name + ".fingerprint"
)
def save_fingerprint(self, calc_name, fingerprint):
save_path = self.fingerprint_path(calc_name)
fs.utils.mkdir_p(os.path.dirname(save_path))
with open(save_path, 'wb+') as fp_file:
pickle.dump(fingerprint, fp_file, pickle.HIGHEST_PROTOCOL)
def reload_fingerprint(self, calc_name):
""" Reloading the fingerprint from the saved files """
save_path = self.fingerprint_path(calc_name)
with open(save_path, 'rb') as tmpfile:
fingerprint = pickle.load(tmpfile)
# Here we could finalize unpickling of Fractal arrays by binding
# again the Fractal object - it is however not needed (we only use
# the unpickled object for equality testing)
return fingerprint
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Codes filtering
def saved_codes(self, codes):
(complex_codes, int_codes, stop_codes) = codes
f_complex_codes = self.filter_stored_codes(complex_codes)
f_int_codes = self.filter_stored_codes(int_codes)
return (f_complex_codes, f_int_codes, stop_codes)
@staticmethod
def filter_stored_codes(codes):
""" Don't store temporary codes - i.e. those which starts with "_" """
return list(filter(lambda x: not(x.startswith("_")), codes))
#==============================================================================
@property
def txt_report_path(self):
"""The main report is written by the Fractal_plotter, only the name is
defined here"""
return os.path.join(
self.directory, self.__class__.__name__ + ".txt"
)
# Report path ".inspect" tracks the progress of the calculations
def report_path(self, calc_name): # public
return os.path.join(
self.directory, "data", calc_name + ".report"
)
def init_report_mmap(self, calc_name): # private
"""
Create the memory mapping for calculation reports by chunks
[chunk1d_begin, chunk1d_end, chunk_pts]
"""
items = self.REPORT_ITEMS
chunks_count = self.chunks_count
report_cols_count = len(items)
logger.debug(
"Create new report_mmap with chunks_count: "
f"{self.chunks_count}"
)
save_path = self.report_path(calc_name)
fs.utils.mkdir_p(os.path.dirname(save_path))
mmap = open_memmap(
filename=save_path,
mode='w+',
dtype=np.dtype(np.int32),
shape=(chunks_count, report_cols_count),
fortran_order=False,
version=None
)
mmap[:, items.index("done")] = 0 # Will be set to 1 when done
# Number of points per chunk
chunk_pts = np.empty((chunks_count,), dtype=np.int32)
for i, chunk_slice in enumerate(self.chunk_slices()):
chunk_pts[i] = self.pts_count(calc_name, chunk_slice) # (calc_name, chunk_slice)
mmap[:, items.index("chunk_pts")] = chunk_pts
# full_cumsum is np.cumsum(chunk_pts) with inserted 0
full_cumsum = np.empty((chunks_count + 1,), dtype=np.int32)
np.cumsum(chunk_pts, out=full_cumsum[1:])
full_cumsum[0] = 0
mmap[:, items.index("chunk1d_begin")] = full_cumsum[:-1]
mmap[:, items.index("chunk1d_end")] = full_cumsum[1:]
del mmap
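# Illustrative sketch, not part of the library API: shows how the
# chunk1d_begin / chunk1d_end / chunk_pts columns of the report memmap are
# derived from a cumulative sum with a leading 0, as done in init_report_mmap
# above. The chunk point counts below are made-up values.
@staticmethod
def _sketch_report_layout():
    chunk_pts = np.array([100, 80, 120], dtype=np.int32)  # hypothetical counts
    full_cumsum = np.empty((chunk_pts.size + 1,), dtype=np.int32)
    np.cumsum(chunk_pts, out=full_cumsum[1:])
    full_cumsum[0] = 0
    chunk1d_begin = full_cumsum[:-1]   # array([  0, 100, 180])
    chunk1d_end = full_cumsum[1:]      # array([100, 180, 300])
    # Each chunk owns the half-open 1d range [begin, end) in the data memmaps
    return chunk1d_begin, chunk1d_end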
def get_report_memmap(self, calc_name, mode='r+'):
# Development Note - Memory mapping
# From IEEE 1003.1:
# The mmap() function shall establish a mapping between a process'
# address space and a file, shared memory object, or [TYM] typed
# memory object.
# 1)
# Every instance of np.memmap creates its own mmap of the whole file
# (even if it only creates an array from part of the
# file). The implications of this are A) you can't use np.memmap's
# offset parameter to get around file size limitations, and B) you
# shouldn't create many numpy.memmaps of the same file. To work around
# B, you should create a single memmap, and dole out views and slices.
# 2)
# Yes, it allocates room for the whole file in your process's LOGICAL
# address space. However, it doesn't actually reserve any PHYSICAL
# memory, or read in any data from the disk, until you actually
# access the data. And then it only reads small chunks in, not the
# whole file.
val = open_memmap(
filename=self.report_path(calc_name), mode=mode
)
return val
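# Illustrative sketch, not part of the library API: demonstrates the
# "create a single memmap and dole out views" guideline from the development
# note above, on a throw-away temporary file (path and shape are made up).
@staticmethod
def _sketch_single_memmap_views():
    import tempfile
    path = os.path.join(tempfile.mkdtemp(), "demo.arr")
    mmap = open_memmap(path, mode='w+', dtype=np.int32, shape=(4, 3))
    mmap[:] = 0
    # Dole out views / slices of the single mapping instead of re-opening it
    row_view = mmap[2, :]
    row_view[:] = 7        # writes through to the file-backed array
    del row_view
    del mmap               # release the mapping, flushing to disk
    reloaded = open_memmap(path, mode='r')
    return reloaded[2, :].copy()   # array([7, 7, 7], dtype=int32)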
def report_memmap_attr(self, calc_name):
return "__report_mmap_" + "_" + calc_name
def update_report_mmap(self, calc_name, chunk_slice, stop_reason):
"""
"""
mmap = self.get_report_memmap(calc_name, mode="r+")
items = self.REPORT_ITEMS
chunk_rank = self.chunk_rank(chunk_slice)
mmap[chunk_rank, items.index("done")] = 1
del mmap
def reload_report(self, calc_name, chunk_slice): # public
""" Return a report extract for the given chunk, as a dict
If no chunk provided, return the full report (header, report)
"""
mmap = self.get_report_memmap(calc_name)
items = self.REPORT_ITEMS
if chunk_slice is None:
report = np.empty(mmap.shape, mmap.dtype)
report[:, :] = mmap[:, :]
return self.REPORT_ITEMS, report
rank = self.chunk_rank(chunk_slice)
report = dict(zip(
items,
(mmap[rank, items.index(it)] for it in items)
))
del mmap
return report
def write_inspect_calc(self, calc_name, final):
"""
Outputs a report for the current calculation
"""
outpath = os.path.join(self.directory, calc_name + ".inspect")
if final:
log_txt = ("Cannot write detailed output *.inspect_calc"
" for a final render")
if os.path.exists(outpath):
os.remove(outpath)
log_txt += f", deleting obsolete file {outpath}"
logger.info(log_txt)
return
REPORT_ITEMS, report = self.reload_report(calc_name, None)
report_header = ("chnk_beg|chnk_end|chnk_pts|done|")
# There are other interesting items to inspect
chunks_count = self.chunks_count
stop_ITEMS = ["min_stop_iter", "max_stop_iter", "mean_stop_iter"]
stop_report = np.zeros([chunks_count, 3], dtype = np.int32)
stop_header = "min_stop|max_stop|mean_stp|"
reason_ITEMS = []
reason_reports = []
reason_header = ""
reason_template = np.zeros([chunks_count, 1], dtype = np.int32)
for i, chunk_slice in enumerate(self.chunk_slices()):
subset, _, Z, U, stop_reason, stop_iter = self.reload_data(
chunk_slice, calc_name)
# Outputs a summary of the stop iter
has_item = (stop_iter.size != 0)
for j, it in enumerate(stop_ITEMS):
if it == "min_stop_iter" and has_item:
stop_report[i, j] = np.min(stop_iter)
elif it == "max_stop_iter" and has_item:
stop_report[i, j] = np.max(stop_iter)
elif it == "mean_stop_iter" and has_item:
stop_report[i, j] = int(np.mean(stop_iter))
else:
stop_report[i, j] = -1
# Outputs a summary of the stop reason
if (stop_reason.size == 0): # Nothing to report
continue
max_chunk_reason = np.max(stop_reason)
for r in range(len(reason_ITEMS), max_chunk_reason + 1):
reason_ITEMS += ["reason_" + str(r)]
reason_reports += [reason_template.copy()]
reason_header += ("reason_" + str(r) + "|")
bc = np.bincount(np.ravel(stop_reason))
for r, bc_r in enumerate(bc):
reason_reports[r][i, 0] = bc_r
# Stack the results
header = REPORT_ITEMS + stop_ITEMS + reason_ITEMS
n_header = len(header)
full_report = np.empty((chunks_count, n_header), dtype = np.int32)
l1 = len(REPORT_ITEMS)
l2 = l1 + len(stop_ITEMS)
full_report[:, :l1] = report
full_report[:, l1:l2] = stop_report
for i in range(l2, n_header):
r = i - l2
full_report[:, i] = reason_reports[r][:, 0]
np.savetxt(
outpath,
full_report,
fmt=('%8i|%8i|%8i|%4i|%8i|%8i|%8i|' + '%8i|' * len(reason_ITEMS)),
header=(report_header + stop_header + reason_header),
comments=''
)
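# Illustrative sketch, not part of the library API: shows how np.bincount
# turns a chunk's stop_reason vector into the per-reason counts written to
# the *.inspect report above. The stop_reason values are made up.
@staticmethod
def _sketch_stop_reason_counts():
    stop_reason = np.array([[0, 1, 1, 2, 0, 1]], dtype=np.int32)  # (1, n_pts)
    bc = np.bincount(np.ravel(stop_reason))
    # bc[r] is the number of points of the chunk which stopped for reason r
    return bc   # array([2, 3, 1])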
def data_path(self, calc_name):
keys = ["subset"] + self.SAVE_ARRS
def file_map(key):
return os.path.join(self.directory, "data",
calc_name + "_" + key + ".arr")
return dict(zip(keys, map(file_map, keys)))
def init_data_mmaps(self, calc_name):
"""
Creates the memory mappings for calculated arrays
[subset, Z, U, stop_reason, stop_iter]
Note : subset can be initialized here
"""
state = self._calc_data[calc_name]["state"]
keys = self.SAVE_ARRS
data_type = {
"Z": self.complex_type,
"U": self.int_type,
"stop_reason": self.termination_type,
"stop_iter": self.int_type,
}
data_path = self.data_path(calc_name)
pts_count = self.pts_count(calc_name) # the memmap 1st dim
(complex_codes, int_codes, stop_codes) = state.codes
# keep only the ones which do not start with "_"
f_complex_codes = self.filter_stored_codes(complex_codes)
f_int_codes = self.filter_stored_codes(int_codes)
n_Z, n_U, n_stop = (len(codes) for codes in
(f_complex_codes, f_int_codes, stop_codes))
# Following C row-major order --> arr[x, :] shall be fast
data_dim = {
"Z": (n_Z, pts_count),
"U": (n_U, pts_count),
"stop_reason": (1, pts_count),
"stop_iter": (1, pts_count),
}
for key in keys:
mmap = open_memmap(
filename=data_path[key],
mode='w+',
dtype=np.dtype(data_type[key]),
shape=data_dim[key],
fortran_order=False,
version=None
)
del mmap
# Store the subset (if there is one) at this stage : it is already
# known
# /!\ the size of the subset is always the same, irrespective of
# the number of items masked
state = self._calc_data[calc_name]["state"]
subset = state.subset
if subset is not None:
mmap = open_memmap(
filename=data_path["subset"],
mode='w+',
dtype=bool,
shape=(self.ny * self.nx,), # PIL xy inversion
fortran_order=False,
version=None
)
for rank, chunk_slice in enumerate(self.chunk_slices()):
beg, end = self.uncompressed_beg_end(rank) # indep of calc_name
mmap[beg:end] = subset[chunk_slice]
del mmap
def get_data_memmap(self, calc_name, key, mode='r+'):
# See "Development Note - Memory mapping "
data_path = self.data_path(calc_name)
mmap = open_memmap(
filename=data_path[key], mode=mode
)
return mmap
def update_data_mmaps(
self, calc_name, chunk_slice, Z, U, stop_reason, stop_iter
):
state = self._calc_data[calc_name]["state"]
keys = self.SAVE_ARRS
arr_map = {
"Z": Z,
"U": U,
"stop_reason": stop_reason,
"stop_iter": stop_iter,
}
rank = self.chunk_rank(chunk_slice)
beg, end = self.compressed_beg_end(calc_name, rank)
# codes mapping - taking into account suppressed fields
# (those starting with "_")
(complex_codes, int_codes, stop_codes) = state.codes
# keep only the ones which do not start with "_"
f_complex_codes = self.filter_stored_codes(complex_codes)
f_int_codes = self.filter_stored_codes(int_codes)
n_Z, n_U, n_stop = (len(codes) for codes in
(f_complex_codes, f_int_codes, stop_codes))
codes_index_map = {
"Z": (range(n_Z), list(complex_codes.index(f_complex_codes[i])
for i in range(n_Z))),
"U": (range(n_U), list(int_codes.index(f_int_codes[i])
for i in range(n_U))),
"stop_reason": (range(1), range(1)),
"stop_iter": (range(1), range(1))
}
for key in keys:
mmap = self.get_data_memmap(calc_name, key, mode="r+")
arr = arr_map[key]
for (field, f_field) in zip(*codes_index_map[key]):
mmap[field, beg:end] = arr[f_field, :]
def reload_data(self, chunk_slice, calc_name):
""" Reload all stored raw arrays for this chunk :
raw_data = subset, Z, U, stop_reason, stop_iter
"""
keys = self.SAVE_ARRS
rank = self.chunk_rank(chunk_slice)
beg, end = self.compressed_beg_end(calc_name, rank)
arr = dict()
for key in keys:
mmap = self.get_data_memmap(calc_name, key)
arr[key] = mmap[:, beg:end]
del mmap
state = self._calc_data[calc_name]["state"]
subset = state.subset
if subset is not None:
# /!\ fixed-size irrespective of the mask
beg, end = self.uncompressed_beg_end(rank) # indep of calc_name
mmap = self.get_data_memmap(calc_name, "subset")
arr["subset"] = mmap[beg: end]
del mmap
else:
arr["subset"] = None
# Reload c_pix - Note: we are in a "not final" (aka development)
# rendering; jitter / supersampling are deactivated
c_pix = np.ravel(
self.chunk_pixel_pos(chunk_slice, jitter=False, supersampling=None)
)
if subset is not None:
c_pix = c_pix[arr["subset"]]
return (
arr["subset"], c_pix, arr["Z"], arr["U"], arr["stop_reason"],
arr["stop_iter"]
)
@Multithreading_iterator(
iterable_attr="chunk_slices", iter_kwargs="chunk_slice"
)
def compute_rawdata_dev(self, calc_name, chunk_slice, tile_validator=None):
""" In dev mode we follow a 2-step approach : here the compute step
"""
if tile_validator is not None:
if not(tile_validator(chunk_slice)):
return
if self.is_interrupted():
logger.debug(
"Interrupted - skipping calc for:\n"
f" {calc_name} ; {chunk_slice}"
)
return
if self.res_available(calc_name, chunk_slice):
logger.debug(
"Result already available - skipping calc for:\n"
f" {calc_name} ; {chunk_slice}"
)
self.incr_tiles_status(which="Calc tiles")
return
(cycle_dep_args, chunk_subset
) = self.get_cycling_dep_args(calc_name, chunk_slice)
cycle_indep_args = self._calc_data[calc_name]["cycle_indep_args"]
ret_code = self.numba_cycle_call(cycle_dep_args, cycle_indep_args)
if ret_code == self.USER_INTERRUPTED:
logger.warning("Interruption signal received")
return
(c_pix, Z, U, stop_reason, stop_iter) = cycle_dep_args
self.update_report_mmap(calc_name, chunk_slice, stop_reason)
self.update_data_mmaps(
calc_name, chunk_slice, Z, U, stop_reason, stop_iter
)
self.incr_tiles_status(which="Calc tiles")
def reload_rawdata_dev(self, calc_name, chunk_slice):
""" In dev mode we follow a 2-step approach : here the reload step
"""
if self.res_available(calc_name, chunk_slice):
self.incr_tiles_status(which="Plot tiles")
return self.reload_data(chunk_slice, calc_name)
else:
# Interrupted calculation ?
if self.res_available(calc_name, None):
return None
raise RuntimeError(f"Results unavailable for {calc_name}")
def evaluate_rawdata_final(self, calc_name, chunk_slice, postproc_options):
# we ARE in final render
# - take into account postproc_options
# - do not save raw data arrays to avoid too much disk usage (e.g.,
# in case of supersampling...)
jitter = float(postproc_options["jitter"]) # casting to float
supersampling = SUPERSAMPLING_DIC[postproc_options["supersampling"]]
(cycle_dep_args, chunk_subset) = self.get_cycling_dep_args(
calc_name, chunk_slice,
final=True, jitter=jitter, supersampling=supersampling
)
cycle_indep_args = self._calc_data[calc_name]["cycle_indep_args"]
ret_code = self.numba_cycle_call(cycle_dep_args, cycle_indep_args)
self.incr_tiles_status(which="Plot tiles")
if ret_code == self.USER_INTERRUPTED:
logger.error("Interruption signal received")
return
(c_pix, Z, U, stop_reason, stop_iter) = cycle_dep_args
return (chunk_subset, c_pix, Z, U, stop_reason, stop_iter)
def evaluate_data(self, calc_name, chunk_slice, postproc_options):
""" Compute on the fly the raw arrays for this chunk :
raw_data = subset, Z, U, stop_reason, stop_iter
Note: this is normally activated only for a final rendering
"""
if postproc_options["final_render"]:
ret = self.evaluate_rawdata_final(
calc_name, chunk_slice, postproc_options
)
# Post calc saving
if hasattr(self, "_subset_hook"):
self.save_subset_arrays(
calc_name, chunk_slice, postproc_options, ret
)
else:
ret = self.reload_rawdata_dev(calc_name, chunk_slice)
return ret
@staticmethod
def kind_from_code(code, codes):
"""
codes as returned by
(params, codes) = self.reload_data_chunk(chunk_slice, calc_name)
Used for "raw" post-processing
"""
complex_codes, int_codes, _ = codes
if code in complex_codes:
kind = "complex"
elif code in int_codes:
kind = "int"
elif code == "stop_reason":
kind = code
elif code == "stop_iter":
kind = code
else:
raise KeyError(
"raw data code unknow: " + code, complex_codes,
int_codes, "stop_reason", "stop_iter"
)
return kind
@staticmethod
def reshape2d(chunk_array, subset, chunk_slice):
"""
Returns 2d versions of the 1d stored vecs
chunk_array of shape (n_post, n_pts)
Note: when a subset mask is active, the stored 1d data is compressed;
it is first scattered back to the full chunk (see reshape1d), then
reshaped to (n_post, nx, ny).
"""
(ix, ixx, iy, iyy) = chunk_slice
nx, ny = ixx - ix, iyy - iy
n_post, n_pts = chunk_array.shape
chunk_1d = Fractal.reshape1d(chunk_array, subset, chunk_slice)
return np.reshape(chunk_1d, [n_post, nx, ny])
@staticmethod
def reshape1d(chunk_array, subset, chunk_slice, supersampling=None):
"""
Returns unmasked 1d versions of the 1d stored vecs
"""
(ix, ixx, iy, iyy) = chunk_slice
nx, ny = ixx - ix, iyy - iy
n_post, n_pts = chunk_array.shape
if subset is None:
chunk_1d = chunk_array
else:
npts = nx * ny
if supersampling is not None:
npts *= supersampling ** 2
indices = np.arange(npts)[subset]
chunk_1d = np.empty([n_post, npts], dtype=chunk_array.dtype)
chunk_1d[:] = np.nan
chunk_1d[:, indices] = chunk_array
return chunk_1d
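# Illustrative sketch, not part of the library API: mirrors the scatter
# performed by reshape1d above - compressed (masked) values are pushed back
# to their uncompressed positions, the remaining positions being filled with
# NaN. The mask and values below are made up.
@staticmethod
def _sketch_reshape1d_scatter():
    subset = np.array([True, False, True, True, False])  # uncompressed mask
    chunk_array = np.array([[1.0, 2.0, 3.0]])             # (n_post, n_kept)
    npts = subset.size
    indices = np.arange(npts)[subset]
    chunk_1d = np.full((chunk_array.shape[0], npts), np.nan)
    chunk_1d[:, indices] = chunk_array
    return chunk_1d   # array([[ 1., nan,  2.,  3., nan]])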
@staticmethod
def index2d(index_1d, subset, chunk_slice):
""" Return the 2d-indexing from 1d + mask
subset = None | self.subset[chunk_slice]
"""
# chunk_size = fssettings.chunk_size
(ix, ixx, iy, iyy) = chunk_slice
nx, ny = ixx - ix, iyy - iy
ix, iy = np.indices((nx, ny))
ix = np.ravel(ix)
iy = np.ravel(iy)
if subset is not None:
ix = ix[subset]
iy = iy[subset]
return ix[index_1d], iy[index_1d]
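# Illustrative sketch, not part of the library API: reproduces the 1d -> 2d
# index mapping of index2d above on a tiny 2 x 3 chunk, without a subset
# mask. The chunk extents and flat indices are made-up values.
@staticmethod
def _sketch_index2d():
    nx, ny = 2, 3
    ix, iy = np.indices((nx, ny))
    ix, iy = np.ravel(ix), np.ravel(iy)
    index_1d = np.array([0, 4])        # two flat indices inside the chunk
    return ix[index_1d], iy[index_1d]  # (array([0, 1]), array([0, 1]))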
@staticmethod
def codes_mapping(complex_codes, int_codes, termination_codes):
"""
Utility function, returns the inverse mapping code -> int
"""
complex_dic, int_dic, termination_dic = [dict(
zip(tab, range(len(tab)))) for tab in [
complex_codes, int_codes, termination_codes]]
return complex_dic, int_dic, termination_dic
@staticmethod
def subsubset(bool_set, bool_subset_of_set):
"""
Returns boolean array for a subset
Parameters
- *bool_set* bool array of shape N, defines a set
- *bool_subset_of_set* bool array of shape Card(set)
Returns
- *bool_subset* bool array of shape N
"""
set_count = np.sum(bool_set)
Card_set, = np.shape(bool_subset_of_set)
if Card_set != set_count:
raise ValueError("Expected bool_subset_of_set of shape"
" [Card(set)]")
bool_subset = np.copy(bool_set)
bool_subset[bool_set] = bool_subset_of_set
return bool_subset
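# Illustrative sketch, not part of the library API: composes a subset of a
# subset with the same boolean scatter as subsubset above. bool_set keeps 3
# of 5 points, bool_subset_of_set then keeps 2 of those 3 (made-up values).
@staticmethod
def _sketch_subsubset():
    bool_set = np.array([True, False, True, True, False])
    bool_subset_of_set = np.array([True, False, True])  # shape Card(set) == 3
    bool_subset = np.copy(bool_set)
    bool_subset[bool_set] = bool_subset_of_set
    return bool_subset   # array([ True, False, False,  True, False])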
def calc_raw(self, calc_name, tile_validator=None):
""" Here we can create the memory mappings and launch the calculation
loop"""
# Note: at this point res_available(calc_name) IS True, however
# mmaps might not have been created.
if self._calc_data[calc_name]["need_new_mmap"]:
self.init_report_mmap(calc_name)
self.init_data_mmaps(calc_name)
# Launching the calculation + mmap storing multithreading loop
self.compute_rawdata_dev(
calc_name, chunk_slice=None, tile_validator=tile_validator
)
def postproc(self, postproc_batch, chunk_slice, postproc_options):
""" Computes the output of ``postproc_batch`` for chunk_slice
Return
post_array of shape(nposts, chunk_n_pts)
subset
"""
if postproc_batch.fractal is not self:
raise ValueError("Postproc batch from a different factal provided")
calc_name = postproc_batch.calc_name
if postproc_options["final_render"]:
self.set_status("Calc tiles", "No [final]", bool_log=True)
ret = self.evaluate_data(calc_name, chunk_slice, postproc_options)
if ret is None:
# Unexpected interruption
return None
(subset, c_pix, Z, U, stop_reason, stop_iter) = ret
codes = self._calc_data[calc_name]["saved_codes"]
complex_dic, int_dic, termination_dic = self.codes_mapping(*codes)
postproc_batch.set_chunk_data(chunk_slice, subset, c_pix, Z, U,
stop_reason, stop_iter, complex_dic, int_dic, termination_dic)
# Output data
n_pts = Z.shape[1] # Z of shape [n_Z, n_pts]
post_array = np.empty(
(len(postproc_batch.posts), n_pts), dtype=self.float_postproc_type
)
for i, postproc in enumerate(postproc_batch.posts.values()):
val, context_update = postproc[chunk_slice]
post_array[i, :] = val
postproc_batch.update_context(chunk_slice, context_update)
postproc_batch.clear_chunk_data(chunk_slice)
return post_array, subset
def get_std_cpt(self, c_pix):
""" Return the c complex value from c_pix """
n_pts, = c_pix.shape  # c_pix of shape [n_pts]
# Explicit casting to complex
center = complex(self.x + 1j * self.y)
cpt = np.empty((n_pts,), dtype=c_pix.dtype)
fill1d_c_from_pix(
c_pix, self.proj_impl, self.lin_mat, self.dx, center, cpt
)
return cpt
#==============================================================================
# GUI : "interactive options"
#==============================================================================
def coords(self, x, y, pix, dps):
""" x, y : coordinates of the event """
x_str = str(x)
y_str = str(y)
res_str = f"""
coords = {{
"x": "{x_str}"
"y": "{y_str}"
}}
"""
return res_str
#==============================================================================
class _Subset_temporary_array:
def __init__(self, fractal, calc_name_from, calc_name_to, key, func, inv):
""" Drop-in replacement for Fractal_array in case of on-the-fly
computation: used for a final calculation with a subset activated """
self.fractal = fractal
self.calc_name_from = calc_name_from
self.calc_name_to = calc_name_to
self.key = key
self._func = func
self.inv = inv
# Parsing the func string if needed
self.func = func
if func is not None:
if isinstance(func, str):
self.func = fs_parser.func_parser(["x"], func)
def path(self):
""" Memory mapping used in case of on-fly calculation
"""
return os.path.join(
self.fractal.directory, "data", self.calc_name_to + "_subset._img"
)
def init_mmap(self, supersampling):
""" Create the memory mapping
Format: 1-d UNcompressed per chunk, takes into account supersampling
as needed.
"""
subset_path = self.path()
logger.debug(f"Opening subset mmap at: {subset_path}")
npts = self.fractal.nx * self.fractal.ny
if supersampling is not None:
npts *= supersampling ** 2
mmap = open_memmap(
filename=subset_path,
mode='w+',
dtype=np.dtype(bool),
shape=(npts,),
fortran_order=False,
version=None
)
del mmap
# Storing status (/!\ not thread safe)
self.supersampling = supersampling
# del mmap
# self._mmap = mmap
def close_mmap(self, supersampling):
try:
del self._mmap
except AttributeError:
pass
def __setitem__(self, chunk_slice, bool_arr):
f = self.fractal
rank = f.chunk_rank(chunk_slice)
beg, end = f.uncompressed_beg_end(rank)
ssg = self.supersampling
if ssg is not None:
beg *= ssg ** 2
end *= ssg ** 2
_mmap = open_memmap(self.path(), mode='r+')
_mmap[beg: end] = bool_arr
del _mmap
def __getitem__(self, chunk_slice):
f = self.fractal
rank = f.chunk_rank(chunk_slice)
beg, end = f.uncompressed_beg_end(rank)
ssg = self.supersampling
if ssg is not None:
beg *= ssg ** 2
end *= ssg ** 2
_mmap = open_memmap(self.path(), mode='r+')
ret = _mmap[beg: end]
del _mmap
return ret
def save_array(self, chunk_slice, ret):
""" Compute on the fly the boolean array & save it in memory mapping"""
(subset, c_pix, Z, U, stop_reason, stop_iter) = ret
f = self.fractal
if subset is not None:
raise NotImplementedError(
"Chained subset 'on the fly', currently not implemented"
)
# Identify the base 1d array
codes = f._calc_data[self.calc_name_from]["saved_codes"]
kind = f.kind_from_code(self.key, codes)
complex_dic, int_dic, termination_dic = f.codes_mapping(*codes)
if kind == "complex":
arr_base = Z[complex_dic[self.key]]
elif kind == "int":
arr_base = U[int_dic[self.key]]
elif kind in "stop_reason":
arr_base = stop_reason[0]
elif kind == "stop_iter":
arr_base = stop_iter[0]
else:
raise ValueError(kind)
# Apply func + inv as needed
if self.func is not None:
arr_base = self.func(arr_base)
if self.inv:
arr_base = ~arr_base
self[chunk_slice] = arr_base
#==============================================================================
# Numba JIT functions
#==============================================================================
USER_INTERRUPTED = 1
@numba.njit(nogil=True)
def numba_cycles(
c_pix, Z, U, stop_reason, stop_iter,
initialize, iterate,
center, proj_impl, lin_mat, dx,
_interrupted
):
# Full iteration for a set of points - calls numba_cycle
npts = c_pix.size
for ipt in range(npts):
Zpt = Z[:, ipt]
Upt = U[:, ipt]
cpt = c_from_pix(
c_pix[ipt], proj_impl, lin_mat, dx, center
)
stop_pt = stop_reason[:, ipt]
initialize(Zpt, Upt, cpt)
n_iter = iterate(
cpt, Zpt, Upt, stop_pt
)
stop_iter[0, ipt] = n_iter
stop_reason[0, ipt] = stop_pt[0]
if _interrupted[0]:
return USER_INTERRUPTED
return 0
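# Illustrative sketch, not part of the library API: shows the dispatch pattern
# used by numba_cycles above - a jitted driver receiving other jitted functions
# as arguments (here a toy initialize / iterate pair for z -> z**2 + c). Array
# layouts are simplified and all _sketch_* names are made up.
@numba.njit(nogil=True)
def _sketch_toy_initialize(Zpt, cpt):
    Zpt[0] = 0j

@numba.njit(nogil=True)
def _sketch_toy_iterate(cpt, Zpt, max_iter):
    n_iter = 0
    while (abs(Zpt[0]) <= 2.) and (n_iter < max_iter):
        Zpt[0] = Zpt[0] * Zpt[0] + cpt
        n_iter += 1
    return n_iter

@numba.njit(nogil=True)
def _sketch_toy_cycles(c_vec, Z, stop_iter, initialize, iterate, max_iter):
    # Full iteration for a set of points, one point at a time
    for ipt in range(c_vec.size):
        Zpt = Z[:, ipt]
        initialize(Zpt, c_vec[ipt])
        stop_iter[0, ipt] = iterate(c_vec[ipt], Zpt, max_iter)
    return 0

# Hypothetical call:
#   c_vec = np.linspace(-2., 0.5, 8) + 0j
#   Z = np.zeros((1, c_vec.size), dtype=np.complex128)
#   stop_iter = np.zeros((1, c_vec.size), dtype=np.int64)
#   _sketch_toy_cycles(c_vec, Z, stop_iter,
#                      _sketch_toy_initialize, _sketch_toy_iterate, 100)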
def numba_iterate(
calc_orbit, i_znorbit, backshift, zn, iterate_once, zn_iterate
):
""" Numba implementation - recompiled if options change """
@numba.njit(nogil=True)
def numba_impl(c, Z, U, stop):
n_iter = 0
if calc_orbit:
div_shift = 0
orbit_zn1 = Z[zn]
orbit_zn2 = Z[zn]
orbit_i1 = 0
orbit_i2 = 0
while True:
n_iter += 1
ret = iterate_once(c, Z, U, stop, n_iter)
if calc_orbit:
div = n_iter // backshift
if div > div_shift:
div_shift = div
orbit_i2 = orbit_i1
orbit_zn2 = orbit_zn1
orbit_i1 = n_iter
orbit_zn1 = Z[zn]
if ret != 0:
break
if calc_orbit:
zn_orbit = orbit_zn2
while orbit_i2 < n_iter - backshift:
zn_orbit = zn_iterate(zn_orbit, c)
orbit_i2 += 1
Z[i_znorbit] = zn_orbit
return n_iter
return numba_impl
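# Illustrative sketch, not part of the library API: pure-python check of the
# "backshift" bookkeeping used in numba_iterate above. Two lagged samples
# (orbit_zn1 at iteration orbit_i1, orbit_zn2 at orbit_i2) are refreshed every
# `backshift` iterations of z -> z**2 + c so that, once the loop stops at
# n_iter, the orbit value at iteration n_iter - backshift can be recovered
# with at most a few extra iterations. The c value and counts are made up.
def _sketch_backshift(c=0.3 + 0.1j, n_iter=25, backshift=4):
    def zn_iterate(z):
        return z * z + c
    z = 0j
    orbit_zn1 = orbit_zn2 = z
    orbit_i1 = orbit_i2 = 0
    div_shift = 0
    for n in range(1, n_iter + 1):
        z = zn_iterate(z)
        div = n // backshift
        if div > div_shift:    # every `backshift` iterations, shift samples
            div_shift = div
            orbit_i2, orbit_zn2 = orbit_i1, orbit_zn1
            orbit_i1, orbit_zn1 = n, z
    # Re-run the few missing iterations from the older sample
    zn_orbit, i = orbit_zn2, orbit_i2
    while i < n_iter - backshift:
        zn_orbit = zn_iterate(zn_orbit)
        i += 1
    return zn_orbit   # orbit value at iteration n_iter - backshift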
def numba_iterate_BS(
calc_orbit, i_xnorbit, i_ynorbit, backshift, xn, yn,
iterate_once, xnyn_iterate
):
""" Numba implementation - recompiled if options change """
@numba.njit(nogil=True)
def numba_impl(c, Z, U, stop):
n_iter = 0
if calc_orbit:
div_shift = 0
orbit_xn1 = Z[xn]
orbit_xn2 = Z[xn]
orbit_yn1 = Z[yn]
orbit_yn2 = Z[yn]
orbit_i1 = 0
orbit_i2 = 0
a = c.real
b = c.imag
while True:
n_iter += 1
ret = iterate_once(c, Z, U, stop, n_iter)
if calc_orbit:
div = n_iter // backshift
if div > div_shift:
div_shift = div
orbit_i2 = orbit_i1
orbit_xn2 = orbit_xn1
orbit_yn2 = orbit_yn1
orbit_i1 = n_iter
orbit_xn1 = Z[xn]
orbit_yn1 = Z[yn]
if ret != 0:
break
if calc_orbit:
xn_orbit = orbit_xn2
yn_orbit = orbit_yn2
while orbit_i2 < n_iter - backshift:
xn_orbit, yn_orbit = xnyn_iterate(xn_orbit, yn_orbit, a, b)
orbit_i2 += 1
Z[i_xnorbit] = xn_orbit
Z[i_ynorbit] = yn_orbit
return n_iter
return numba_impl
def numba_Newton(
known_orders, max_order, max_newton, eps_newton_cv,
reason_max_order, reason_order_confirmed,
izr, idzrdz, idzrdc, id2zrdzdc, i_partial, i_zn, iorder,
zn_iterate, iterate_newton_search
):
""" Numba implementation
Run Newton search to find z0 so that f^n(z0) == z0 (n being the order)
"""
@numba.njit(nogil=True)
def numba_impl(c, Z, U, stop):
n_iter = 0
while True:
n_iter += 1
if n_iter > max_order:
stop[0] = reason_max_order
break
# If n is not a 'partial' for this point it cannot be the
# cycle order : early exit
Z[i_zn] = zn_iterate(Z[i_zn], c)
m = np.abs(Z[i_zn])
if m < Z[i_partial].real:
# This is a new partial, might be an attracting cycle
Z[i_partial] = m # Cannot assign to the real part
else:
continue
# Early exit if n is not a multiple of one of the
# known_orders (provided by the user)
if known_orders is not None:
valid = False
for order in known_orders:
if n_iter % order == 0:
valid = True
if not valid:
continue
z0_loop = complex(0.) # Z[0]
dz0dc_loop = complex(0.) # Z[2]
for i_newton in range(max_newton):
zr = z0_loop
dzrdc = dz0dc_loop
dzrdz = complex(1.)
d2zrdzdc = complex(0.)
for i in range(n_iter):
# It seems it is not possible to split and compute the
# derivatives in postprocessing when convergence is reached
# --> we do it during the Newton iterations
d2zrdzdc, dzrdc, dzrdz, zr = iterate_newton_search(
d2zrdzdc, dzrdc, dzrdz, zr, c
)
delta = (zr - z0_loop) / (dzrdz - 1.)
newton_cv = (np.abs(delta) < eps_newton_cv)
zz = z0_loop - delta
dz0dc_loop = dz0dc_loop - (
(dzrdc - dz0dc_loop) / (dzrdz - 1.)
- (zr - z0_loop) * d2zrdzdc / (dzrdz - 1.)**2
)
z0_loop = zz
if newton_cv:
break
# We have a candidate but is it the right one?
is_confirmed = (np.abs(dzrdz) <= 1.) & newton_cv
if not(is_confirmed): # not found, try next
continue
Z[izr] = zr
Z[idzrdz] = dzrdz # attr (cycle attractivity)
Z[idzrdc] = dzrdc
Z[id2zrdzdc] = d2zrdzdc # dattrdc
U[iorder] = n_iter
stop[0] = reason_order_confirmed
break
# End of while loop
return n_iter
return numba_impl
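# Illustrative sketch, not part of the library API: a bare-bones Newton search
# for a periodic point z0 with f^n(z0) == z0, here for f(z) = z**2 + c with
# c = -1 and n = 2 (the 0 <-> -1 superattracting cycle). Only zr and dzrdz are
# tracked; the library version above also propagates dzrdc and d2zrdzdc. The
# starting guess and tolerances are made-up values.
def _sketch_newton_cycle(c=-1. + 0j, order=2, max_newton=30, eps=1.e-12):
    z0 = 0.5 + 0.1j    # starting guess
    dzrdz = 0j
    for _ in range(max_newton):
        zr, dzrdz = z0, 1. + 0j
        for _ in range(order):
            dzrdz = 2. * zr * dzrdz   # chain rule: d/dz0 of f^k(z0)
            zr = zr * zr + c
        delta = (zr - z0) / (dzrdz - 1.)
        z0 = z0 - delta
        if abs(delta) < eps:
            break
    return z0, abs(dzrdz)   # cycle point and attractivity |d f^n / dz|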
@numba.njit(fastmath=True, nogil=True, cache=True)
def apply_unskew_1d(skew, arrx, arry):
"""Unskews the view for contravariant coordinates e.g. normal vec
Used in postproc.py to keep the right orientation for the shadings
"""
n = arrx.shape[0]
for i in range(n):
nx = arrx[i]
ny = arry[i]
# Note: this is a product by the transposed matrix
# *Contravariant* indexing
arrx[i] = skew[0, 0] * nx + skew[1, 0] * ny
arry[i] = skew[0, 1] * nx + skew[1, 1] * ny
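# Illustrative sketch, not part of the library API: checks that the loop in
# apply_unskew_1d above is a product by the transposed skew matrix, comparing
# it with skew.T @ v on a single made-up vector.
def _sketch_unskew_check():
    skew = np.array([[1.0, 0.3], [0.0, 1.0]])
    arrx = np.array([0.5])
    arry = np.array([2.0])
    apply_unskew_1d(skew, arrx, arry)
    expected = skew.T @ np.array([0.5, 2.0])
    return np.allclose([arrx[0], arry[0]], expected)   # True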
@numba.njit(fastmath=True, nogil=True, cache=True) # (numba.complex128(numba.complex128))
def lin_proj_impl(lin_mat, dx, z):
x = z.real
y = z.imag
x1 = lin_mat[0, 0] * x + lin_mat[0, 1] * y
y1 = lin_mat[1, 0] * x + lin_mat[1, 1] * y
return dx * complex(x1, y1)
@numba.njit(fastmath=True, nogil=True)
def c_from_pix(pix, proj_impl, lin_mat, dx, center):
"""
Returns the true c from the pixel coords
Note: to be re-implemented for perturbation theory, as C = cref + dc
Parameters
----------
pix : complex
pixel location in fraction of dx
center : complex
coordinates of the image center
proj_impl : numba func
Non-linear part of the projection (for a "cartesian projection" this
is simply a pass-through)
lin_mat, dx
Linear part of the mapping. Includes the following effects:
- scaling (with dx)
- rotation (with theta)
- skew
Returns
-------
c : c value as complex
"""
return center + lin_proj_impl(lin_mat, dx, proj_impl(pix))
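# Illustrative sketch, not part of the library API: maps one pixel offset to
# a c value with c_from_pix above, using a pass-through projection (the
# "cartesian" case) and a pure rotation as the linear part. All numeric values
# and the _sketch_* names are made up.
@numba.njit(nogil=True)
def _sketch_identity_proj(z):
    return z

def _sketch_c_from_pix(theta=0.25):
    lin_mat = np.array([
        [np.cos(theta), -np.sin(theta)],
        [np.sin(theta),  np.cos(theta)]
    ])
    dx = 2.0               # width of the view in the complex plane
    center = -0.75 + 0j    # image center
    pix = 0.25 + 0.1j      # pixel position, as a fraction of dx
    return c_from_pix(pix, _sketch_identity_proj, lin_mat, dx, center)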
@numba.njit(fastmath=True, nogil=True)
def fill1d_c_from_pix(c_pix, proj_impl, lin_mat, dx, center, c_out):
""" Same as c_from_pix but fills in-place a 1d vec """
nx = c_pix.shape[0]
for i in range(nx):
c_out[i] = c_from_pix(
c_pix[i], proj_impl, lin_mat, dx, center
)