# Source code for transonic.aheadoftime

"""User runtime API for the ahead-of-time compilation
=====================================================

User API
--------

.. autofunction:: boost

.. autoclass:: Transonic
   :members:
   :private-members:

Internal API
------------

.. autofunction:: _get_transonic_calling_module

.. autoclass:: CheckCompiling
   :members:
   :private-members:

"""

import functools
import inspect
import os
import subprocess
import sys
import time
from importlib import import_module

from transonic import mpi
from transonic.backends import backends, get_backend_name_module
from transonic.config import backend_default, has_to_replace
from transonic.log import logger
from transonic.mpi import Path
from transonic.util import (
    _get_pathfile_from_frame,
    find_module_name_from_path,
    get_frame,
    get_module_name,
    has_to_build,
    has_to_compile_at_import,
    import_from_path,
    is_method,
    modification_date,
    write_if_has_to_write,
)

# When running with a single process, make `mpi.has_to_build` and
# `mpi.modification_date` point to the implementations imported from
# `transonic.util`. The rest of this module calls them through `mpi.`.
# NOTE(review): presumably the versions defined in transonic.mpi synchronise
# between processes and are unnecessary here -- confirm in transonic.mpi.
if mpi.nb_proc == 1:
    mpi.has_to_build = has_to_build
    mpi.modification_date = modification_date

# Module-level flag copied into each Transonic instance at creation and read
# by the decorators; when it is true they return the decorated object as is.
# NOTE(review): presumably switched to True from outside this module during
# transpilation -- confirm against the callers.
is_transpiling = False
# One registry per backend name: {backend_name: {module_name: Transonic}}.
# The inner dicts are filled at the end of `Transonic.__init__`.
modules_backends = {backend_name: {} for backend_name in backends.keys()}
# Registry of the default backend (same dict object as the one stored in
# `modules_backends[backend_default]`).
modules = modules_backends[backend_default]


def _get_data_from_func_or_data(data_or_func):
    """
    Before 0.7.2, some metadata was saved in module variables,
    which leads to issue with Pythran on Windows.
    They are now saved in functions, but we continue supporting the "old" extensions.
    """
    if callable(data_or_func):
        return data_or_func()
    else:
        # extension compiled with Transonic < 0.7.2
        return data_or_func


def _get_transonic_calling_module(backend_name: str = None):
    """Get the Transonic instance corresponding to the calling module

    The instance registered for the module is reused unless it is out of
    date, in which case (or if there is none) a new one is created.

    Parameters
    ----------

    backend_name: str

      Name of the backend. If None, it is obtained from the module name with
      ``get_backend_name_module``.

    """
    # `get_frame` has to be called directly from this function: the depth (2)
    # is relative to it.
    frame = get_frame(2)
    module_name = get_module_name(frame)

    if backend_name is None:
        backend_name = get_backend_name_module(module_name)

    registry = modules_backends[backend_name]

    if module_name not in registry:
        return Transonic(frame=frame, reuse=False, backend=backend_name)

    ts = registry[module_name]

    # The checks are done in this order and stop at the first positive one.
    if ts.is_transpiling != is_transpiling:
        outdated = True
    elif ts._compile_at_import_at_creation != has_to_compile_at_import():
        outdated = True
    else:
        # the backend file exists but has to be rebuilt from the module
        outdated = (
            hasattr(ts, "path_mod")
            and ts.path_backend.exists()
            and mpi.has_to_build(ts.path_backend, ts.path_mod)
        )

    if outdated:
        return Transonic(frame=frame, reuse=False, backend=backend_name)

    return ts
def boost(
    obj=None,
    backend: str = None,
    inline=False,
    boundscheck=True,
    wraparound=True,
    cdivision=False,
    nonecheck=True,
    nogil=False,
):
    """Decorator to declare that an object can be accelerated

    Can be used directly (``@boost``) or with arguments (``@boost(...)``).

    Parameters
    ----------

    obj: a function, a method or a class

      The object to decorate. If it is not callable (in particular ``None``),
      the decorator itself is returned.

    backend: str (optional)

      Name of the backend. If None, the backend is chosen by
      ``_get_transonic_calling_module``.

    inline, boundscheck, wraparound, cdivision, nonecheck, nogil:

      Forwarded unchanged to ``Transonic.boost``.

    Raises
    ------

    TypeError

      If ``backend`` is neither None nor a string.

    """
    if backend is not None and not isinstance(backend, str):
        raise TypeError(
            f"backend has to be a str or None, not {type(backend).__name__}"
        )

    # Has to be called directly from here: the calling module is found by
    # going up a fixed number of frames.
    ts = _get_transonic_calling_module(backend_name=backend)

    decor = ts.boost(
        inline=inline,
        nogil=nogil,
        boundscheck=boundscheck,
        wraparound=wraparound,
        cdivision=cdivision,
        nonecheck=nonecheck,
    )
    if callable(obj) or isinstance(obj, type):
        # used as ``@boost``: decorate right now (direct call, no extra frame)
        return decor(obj)
    # used as ``@boost(...)``: return the decorator
    return decor
class CheckCompiling:
    """Check if the module is being compiled and replace the module and the function

    Parameters
    ----------

    ts: the Transonic object of the module

    func: the function called as long as the extension is not available

    """

    def __init__(self, ts, func):
        self.ts = ts
        self.func = func
        self.has_been_replaced = False

    def __call__(self, *args, **kwargs):
        if not self.has_been_replaced:
            self._replace_if_possible()
        return self.func(*args, **kwargs)

    def _replace_if_possible(self):
        """Replace ``self.func`` by the backend function when it is ready"""
        ts = self.ts

        compilation_just_finished = ts.is_compiling and not ts.process.is_alive(
            raise_if_error=True
        )
        if compilation_just_finished:
            ts.is_compiling = False
            # short pause before importing the new extension
            time.sleep(0.1)
            ts.module_backend = import_from_path(
                ts.path_extension, ts.module_backend.__name__
            )
            assert ts.backend.check_if_compiled(self.ts.module_backend)
            ts.is_compiled = True

        if ts.is_compiling:
            # still compiling: continue with the current function
            return

        self.func = getattr(ts.module_backend, self.func.__name__)
        self.has_been_replaced = True
class Transonic:
    """
    Representation of a module using ahead-of-time transonic commands

    Parameters
    ----------

    use_transonified : bool (optional, default True)

      If False, don't use the pythranized versions at run time

    frame : frame object (optional)

      (Internal) Frame of ``__init__`` caller. If None, it is obtained with
      ``get_frame(1)``.

    reuse : bool (optional, default True)

      (Internal) If True, do not recreate an instance.

    backend : str or backend object (optional)

      Name of the backend (a key of ``backends``) or backend object. If None,
      the name is obtained with ``get_backend_name_module``.

    """

    def __init__(
        self, use_transonified=True, frame=None, reuse=True, backend=None
    ):
        if frame is None:
            # has to stay in ``__init__``: the depth is relative to it
            frame = get_frame(1)

        self.module_name = module_name = get_module_name(frame)

        if backend is None:
            backend = get_backend_name_module(module_name)
        if isinstance(backend, str):
            backend = backends[backend]
        self.backend = backend

        modules = modules_backends[backend.name]
        self._compile_at_import_at_creation = has_to_compile_at_import()

        if reuse and module_name in modules:
            # share the state of the instance already registered
            self.__dict__.update(modules[module_name].__dict__)
            return

        self.is_transpiling = is_transpiling
        self.has_to_replace = has_to_replace

        if is_transpiling:
            self.functions = {}
            self.classes = {}
            self.signatures_func = {}
            self.is_transpiled = False
            self.is_compiled = False
            return

        self.is_compiling = False

        if not use_transonified or not has_to_replace:
            self.is_transpiled = False
            self.is_compiled = False
            return

        # name of the backend module: "<package>.__<backend>__.<module>"
        if "." in module_name:
            package, module_short_name = module_name.rsplit(".", 1)
            module_backend_name = package + "."
        else:
            module_short_name = module_name
            module_backend_name = ""

        module_backend_name += f"__{backend.name}__." + module_short_name

        self.path_mod = path_mod = Path(_get_pathfile_from_frame(frame))

        suffix = ".py"
        self.path_backend = path_backend = (
            path_mod.parent / f"__{backend.name}__" / (module_short_name + suffix)
        )

        # for Meson, we try to import module_backend_name
        try:
            _module_backend = import_module(module_backend_name)
        except ImportError:
            path_ext = None
        else:
            if backend.check_if_compiled(_module_backend):
                path_ext = Path(_module_backend.__file__)
                if not path_ext.exists():
                    path_ext = None
            else:
                path_ext = None

        if has_to_compile_at_import() and path_mod.exists() and path_ext is None:
            if mpi.has_to_build(path_backend, path_mod):
                if path_backend.exists():
                    time_backend = mpi.modification_date(path_backend)
                else:
                    time_backend = 0

                returncode = None
                if mpi.rank == 0:
                    print(f"Running transonic on file {path_mod}... ", end="")
                    # better to do this in another process because the file
                    # is already run...
                    # TRANSONIC_NO_MPI is set only in the environment of the
                    # child process: the environment of this process is never
                    # modified, even if the call fails, and a value set by
                    # the user is not deleted.
                    returncode = subprocess.call(
                        [
                            sys.executable,
                            "-m",
                            "transonic.run",
                            "-nc",
                            str(path_mod),
                        ],
                        env=dict(os.environ, TRANSONIC_NO_MPI="1"),
                    )
                # called by all processes, with the value of the rank 0
                returncode = mpi.bcast(returncode)

                if returncode != 0:
                    raise RuntimeError(
                        f"transonic does not manage to produce the {backend.name_capitalized} "
                        f"file for {path_mod}"
                    )

                if mpi.rank == 0:
                    print("Done!")

                path_ext = path_backend.with_name(
                    backend.name_ext_from_path_backend(path_backend)
                )

                time_backend_after = mpi.modification_date(path_backend)
                # We have to touch the files to signal that they are up-to-date
                if time_backend_after == time_backend and mpi.rank == 0:
                    if not has_to_build(path_ext, path_backend):
                        path_backend.touch()
                        if path_ext.exists():
                            path_ext.touch()
                    else:
                        path_backend.touch()

        path_ext = path_ext or path_backend.with_name(
            backend.name_ext_from_path_backend(path_backend)
        )

        self.path_extension = path_ext
        if (
            has_to_compile_at_import()
            and path_mod.exists()
            and not self.path_extension.exists()
        ):
            if mpi.rank == 0:
                print(
                    f"Launching {backend.name_capitalized} to compile a new extension..."
                )
            self.is_compiling, self.process = backend.compile_extension(
                path_backend, name_ext_file=self.path_extension.name
            )
            self.is_compiled = not self.is_compiling

        self.is_transpiled = True

        if not path_ext.exists() and not self.is_compiling:
            # try the alternative name of the extension
            path_ext_alt = path_backend.with_suffix(backend.suffix_extension)
            if path_ext_alt.exists():
                self.path_extension = path_ext = path_ext_alt

        # can set ``is_transpiled`` to False if no file can be imported
        self.reload_module_backend(module_backend_name)

        if not self.is_transpiled:
            logger.warning(
                f"Module {path_mod} has not been compiled for "
                f"Transonic-{backend.name_capitalized}"
            )
        else:
            self.is_compiled = backend.check_if_compiled(self.module_backend)
            if self.is_compiled:
                module = inspect.getmodule(frame)
                # module can be None if (at least) it has been run with runpy
                if module is not None:
                    if backend.name == "pythran":
                        module.__pythran__ = self.module_backend.__pythran__
                    module.__transonic__ = _get_data_from_func_or_data(
                        self.module_backend.__transonic__
                    )

            if hasattr(self.module_backend, "arguments_blocks"):
                self.arguments_blocks = _get_data_from_func_or_data(
                    getattr(self.module_backend, "arguments_blocks")
                )

        modules[module_name] = self

    def reload_module_backend(self, module_backend_name=None):
        """(Re)import the backend module

        The extension file is imported if it exists and no compilation is
        running, otherwise the backend file is imported if it exists. If
        nothing can be imported, ``is_transpiled`` and ``is_compiled`` are set
        to False.

        Parameters
        ----------

        module_backend_name : str (optional)

          Name given to the imported module. If None, the name of the current
          backend module is used.

        """
        if module_backend_name is None:
            module_backend_name = self.module_backend.__name__
        if self.path_extension.exists() and not self.is_compiling:
            self.module_backend = import_from_path(
                self.path_extension, module_backend_name
            )
        elif self.path_backend.exists():
            self.module_backend = import_from_path(
                self.path_backend, module_backend_name
            )
        else:
            self.is_transpiled = False
            self.is_compiled = False

    def transonic_def(self, func):
        """Decorator used for functions

        Returns the function of the backend module with the same name, or
        ``func`` itself when nothing has to be (or can be) replaced. While
        the extension is being compiled, a ``CheckCompiling`` object is
        returned.

        Parameters
        ----------

        func: a function

        """
        if is_method(func):
            return self.transonic_def_method(func)

        if is_transpiling or not has_to_replace or not self.is_transpiled:
            return func

        if not hasattr(self.module_backend, func.__name__):
            self.reload_module_backend()

        try:
            func_tmp = getattr(self.module_backend, func.__name__)
        except AttributeError:
            # TODO: improve what happens in this case
            logger.warning(
                f"{self.backend.name_capitalized} file does not seem to be up-to-date:\n"
                f"{self.module_backend}\nfunc: {func.__name__}"
            )
            func_tmp = func

        if self.is_compiling:
            return functools.wraps(func)(CheckCompiling(self, func_tmp))

        return func_tmp

    def transonic_def_method(self, func):
        """Decorator used for methods

        The method is wrapped in a ``TransonicTemporaryMethod``, which is
        replaced when the class is decorated (see ``transonic_class``).

        Parameters
        ----------

        func: a function

        """
        if is_transpiling or not has_to_replace or not self.is_transpiled:
            return func

        return TransonicTemporaryMethod(func)

    def boost(self, **kwargs):
        """Universal decorator for AOT compilation

        Used for functions, methods and classes.

        The keyword arguments are accepted but not used by this method.

        """
        return self._boost_decor

    def _boost_decor(self, obj):
        """Universal decorator for AOT compilation

        Used for functions, methods and classes.

        """
        # Direct calls (no intermediate function): ``jit_class``, called by
        # ``transonic_class``, goes up a fixed number of frames.
        if isinstance(obj, type):
            return self.transonic_class(obj)
        else:
            return self.transonic_def(obj)

    def transonic_class(self, cls: type):
        """Decorator used for classes

        Replaces the temporary JIT methods (with ``jit_class``) and the
        temporary methods created by ``transonic_def_method``.

        Parameters
        ----------

        cls: a class

        Raises
        ------

        RuntimeError

          If the backend module does not contain what is needed for one of
          the methods.

        """
        if is_transpiling:
            return cls

        jit_methods = {
            key: value
            for key, value in cls.__dict__.items()
            if isinstance(value, TransonicTemporaryJITMethod)
        }

        if jit_methods:
            cls = jit_class(cls, jit_methods, self.backend)

        if not has_to_replace or not self.is_transpiled:
            return cls

        cls_name = cls.__name__

        # iterate over a copy since attributes of the class are set in the loop
        for key, value in list(cls.__dict__.items()):
            if not isinstance(value, TransonicTemporaryMethod):
                continue
            func = value.func
            func_name = func.__name__

            name_backend_func = f"__for_method__{cls_name}__{func_name}"
            name_var_code_new_method = (
                f"__code_new_method__{cls_name}__{func_name}"
            )

            if not hasattr(self.module_backend, name_backend_func):
                self.reload_module_backend()

            try:
                backend_func = getattr(self.module_backend, name_backend_func)
                code_new_method = _get_data_from_func_or_data(
                    getattr(self.module_backend, name_var_code_new_method)
                )
            except AttributeError as error:
                # TODO: improve what happens in this case
                raise RuntimeError(
                    f"{self.backend.name_capitalized} file does not seem to be up-to-date."
                ) from error
            else:
                # NOTE(review): the code executed here comes from the backend
                # module, presumably produced by transonic itself -- it has
                # to be trusted.
                namespace = {"backend_func": backend_func}
                exec(code_new_method, namespace)
                setattr(cls, key, functools.wraps(func)(namespace["new_method"]))

        return cls

    def use_block(self, name):
        """Use the pythranized version of a code block

        The arguments of the block are taken from the local variables of the
        caller.

        Parameters
        ----------

        name : str

          The name of the block.

        Raises
        ------

        ValueError

          If the module is not transpiled.

        """
        if not self.is_transpiled:
            raise ValueError(
                "`use_block` has to be used protected by `if ts.is_transpiled`"
            )

        if self.is_compiling and not self.process.is_alive(raise_if_error=True):
            # the compilation has just finished: import the new extension
            self.is_compiling = False
            time.sleep(0.1)
            self.module_backend = import_from_path(
                self.path_extension, self.module_backend.__name__
            )
            assert self.backend.check_if_compiled(self.module_backend)
            self.is_compiled = True

        func = getattr(self.module_backend, name)
        argument_names = self.arguments_blocks[name]

        # has to stay in ``use_block``: the depth is relative to it
        locals_caller = get_frame(1).f_locals

        arguments = [locals_caller[arg_name] for arg_name in argument_names]
        return func(*arguments)
class TransonicTemporaryMethod:
    """Internal temporary class for methods

    Keeps the decorated function in ``func`` until the class is decorated.
    Calling the object is an error.

    """

    def __init__(self, func):
        self.func = func

    def __call__(self, self_bis, *args, **kwargs):
        message = (
            "Did you forget to decorate a class using methods decorated "
            "with transonic? Please decorate it with @boost."
        )
        raise RuntimeError(message)


class TransonicTemporaryJITMethod:
    """Internal temporary class for JIT methods

    Keeps the decorated function and the options ``native``, ``xsimd`` and
    ``openmp`` until the class is decorated. Calling the object is an error.

    """

    __transonic__ = "jit_method"

    def __init__(self, func, native, xsimd, openmp):
        self.func, self.native = func, native
        self.xsimd, self.openmp = xsimd, openmp

    def __call__(self, self_bis, *args, **kwargs):
        message = (
            "Did you forget to decorate a class using methods decorated "
            "with transonic? Please decorate it with @boost."
        )
        raise RuntimeError(message)


def jit_class(cls, jit_methods, backend):
    """Modify the class by replacing jit methods

    1. create a Python file with @jit functions and methods
    2. import the file
    3. replace the methods

    Parameters
    ----------

    cls: the class to modify

    jit_methods: dict

      The temporary JIT methods of the class (name -> temporary object).

    backend: the backend object

    """
    if not has_to_replace:
        return cls

    class_name = cls.__name__
    module_name = cls.__module__
    defining_module = sys.modules[module_name]

    if module_name == "__main__":
        module_name = find_module_name_from_path(defining_module.__file__)

    root_dir = mpi.Path(backend.jit.path_class)

    # 1. create a Python file with @jit functions and methods
    source_path = (
        root_dir / module_name.replace(".", os.path.sep) / (class_name + ".py")
    )

    if mpi.has_to_build(source_path, defining_module.__file__):
        from transonic.justintime import _get_module_jit

        # NOTE(review): `depth_frame=5` presumably counts the frames up to
        # the module defining the class, so this has to stay a direct call
        # -- confirm in transonic.justintime.
        jit_module = _get_module_jit(backend_name=backend.name, depth_frame=5)
        if mpi.rank == 0:
            source_path = mpi.PathSeq(source_path)
            code = (
                jit_module.info_analysis["codes_dependance_classes"][class_name]
                + "\n"
                + backend.jit.produce_code_class(cls)
            )
            write_if_has_to_write(source_path, code)
            source_path = mpi.Path(source_path)
        mpi.barrier()

    # 2. import the file
    generated_name = ".".join((root_dir.name, module_name, class_name))
    generated_module = import_from_path(source_path, generated_name)

    # 3. replace the methods
    for method_name, temporary in jit_methods.items():
        original = temporary.func
        replacement = getattr(
            generated_module, f"__new_method__{cls.__name__}__{method_name}"
        )
        setattr(cls, method_name, functools.wraps(original)(replacement))

    return cls