# Source code for transonic.aheadoftime

"""User runtime API for the ahead-of-time compilation
=====================================================

User API
--------

.. autofunction:: boost

.. autoclass:: Transonic
   :members:
   :private-members:

Internal API
------------

.. autofunction:: _get_transonic_calling_module

.. autoclass:: CheckCompiling
   :members:
   :private-members:

"""

import inspect
import time
import subprocess
import os
import functools
import sys
from importlib import import_module

from transonic.backends import backends, get_backend_name_module
from transonic.config import has_to_replace, backend_default
from transonic.log import logger
from transonic import mpi
from transonic.mpi import Path

from transonic.util import (
    get_module_name,
    has_to_compile_at_import,
    import_from_path,
    has_to_build,
    modification_date,
    is_method,
    write_if_has_to_write,
    find_module_name_from_path,
    _get_pathfile_from_frame,
    get_frame,
)

# When running with a single process, expose the plain helpers imported from
# ``transonic.util`` through the ``mpi`` namespace, so that the rest of this
# module can always call ``mpi.has_to_build`` / ``mpi.modification_date``.
if mpi.nb_proc == 1:
    mpi.modification_date = modification_date
    mpi.has_to_build = has_to_build

# Module-level flag compared with the value stored on each Transonic instance.
is_transpiling = False

# One registry per backend name; ``Transonic.__init__`` stores the instances
# in it, keyed by module name.
modules_backends = {name: {} for name in backends}

# Registry associated with the default backend.
modules = modules_backends[backend_default]


def _get_transonic_calling_module(backend_name: str = None):
    """Get the Transonic instance corresponding to the calling module

    A registered instance is returned as is, unless it is considered
    outdated, in which case (or when nothing is registered) a new
    ``Transonic`` object is created with ``reuse=False``.

    Parameters
    ----------

    backend_name: str

      Name of the backend. If None, it is obtained from
      ``get_backend_name_module`` for the calling module.

    Returns
    -------

    The ``Transonic`` instance to be used for the calling module.

    """
    # NOTE: the depth (2) is relative to this function, so this call must
    # not be moved into a helper.
    frame = get_frame(2)
    module_name = get_module_name(frame)

    if backend_name is None:
        backend_name = get_backend_name_module(module_name)

    registry = modules_backends[backend_name]

    if module_name in registry:
        cached = registry[module_name]
        # Evaluated lazily (``or`` / ``and``) in the same order as before:
        # transpiling state changed, compile-at-import setting changed, or
        # the backend file has to be rebuilt from the module file.
        outdated = (
            cached.is_transpiling != is_transpiling
            or cached._compile_at_import_at_creation
            != has_to_compile_at_import()
            or (
                hasattr(cached, "path_mod")
                and cached.path_backend.exists()
                and mpi.has_to_build(cached.path_backend, cached.path_mod)
            )
        )
        if not outdated:
            return cached

    return Transonic(frame=frame, reuse=False, backend=backend_name)
def boost(
    obj=None,
    backend: str = None,
    inline=False,
    boundscheck=True,
    wraparound=True,
    cdivision=False,
    nonecheck=True,
    nogil=False,
):
    """Decorator to declare that an object can be accelerated

    Can be used directly (``@boost``) or called with keyword arguments
    (``@boost(backend=...)``), in which case the actual decorator is
    returned.

    Parameters
    ----------

    obj: a function, a method or a class

      Object to be decorated. If it is not callable (in particular None),
      the decorator itself is returned.

    backend: str (optional)

      Name of the backend. Has to be a str or None.

    inline, boundscheck, wraparound, cdivision, nonecheck, nogil:

      Forwarded unchanged as keyword arguments to ``Transonic.boost``.

    Raises
    ------

    TypeError

      If ``backend`` is neither None nor a str.

    """
    if backend is not None and not isinstance(backend, str):
        raise TypeError(
            "`backend` has to be a str or None, "
            f"not an object of type {type(backend).__name__!r}"
        )

    # NOTE: has to be called directly from here, the callee looks up the
    # calling module at a fixed frame depth.
    ts = _get_transonic_calling_module(backend_name=backend)

    decor = ts.boost(
        inline=inline,
        nogil=nogil,
        boundscheck=boundscheck,
        wraparound=wraparound,
        cdivision=cdivision,
        nonecheck=nonecheck,
    )
    if callable(obj) or isinstance(obj, type):
        return decor(obj)
    else:
        return decor
class CheckCompiling:
    """Check if the module is being compiled and replace the module and the function

    Instances are callable. Until the function has been replaced, each call
    first checks the state of the compilation tracked by ``ts``; once the
    compilation is over, the wrapped function is replaced by the attribute
    of the same name of ``ts.module_backend``.

    Parameters
    ----------

    ts : Transonic

      Object tracking the compilation (``is_compiling``, ``process``,
      ``module_backend``, ...).

    func : callable

      Function called as long as the replacement has not been done.

    """

    def __init__(self, ts, func):
        self.ts = ts
        self.func = func
        self.has_been_replaced = False

    def __call__(self, *args, **kwargs):
        if not self.has_been_replaced:
            self._try_to_replace()
        return self.func(*args, **kwargs)

    def _try_to_replace(self):
        """Replace ``self.func`` if the compilation is not (or no longer) running"""
        ts = self.ts

        compilation_just_finished = ts.is_compiling and not ts.process.is_alive(
            raise_if_error=True
        )
        if compilation_just_finished:
            ts.is_compiling = False
            time.sleep(0.1)
            # import the freshly built extension under the same module name
            ts.module_backend = import_from_path(
                ts.path_extension, ts.module_backend.__name__
            )
            assert ts.backend.check_if_compiled(self.ts.module_backend)
            ts.is_compiled = True

        if ts.is_compiling:
            # still compiling: keep using the current function
            return

        self.func = getattr(ts.module_backend, self.func.__name__)
        self.has_been_replaced = True
class Transonic:
    """
    Representation of a module using ahead-of-time transonic commands

    Parameters
    ----------

    use_transonified : bool (optional, default True)

      If False, don't use the pythranized versions at run time

    frame : frame (optional)

      (Internal) Frame of ``__init__`` caller.

    reuse : bool (optional, default True)

      (Internal) If True, do not recreate an instance.

    backend : str or backend object (optional)

      Backend to be used. A str is used as a key of ``backends``. If None,
      the name is obtained from ``get_backend_name_module``.

    """

    def __init__(
        self, use_transonified=True, frame=None, reuse=True, backend=None
    ):
        # NOTE: the depth (1) is relative to ``__init__``, so this call must
        # not be moved into a helper.
        if frame is None:
            frame = get_frame(1)

        self.module_name = module_name = get_module_name(frame)

        if backend is None:
            backend = get_backend_name_module(module_name)
        if isinstance(backend, str):
            backend = backends[backend]

        self.backend = backend
        modules = modules_backends[backend.name]
        self._compile_at_import_at_creation = has_to_compile_at_import()

        if reuse and module_name in modules:
            # shallow copy of the state of the registered instance
            self.__dict__.update(modules[module_name].__dict__)
            return

        self.is_transpiling = is_transpiling
        self.has_to_replace = has_to_replace

        if is_transpiling:
            self.functions = {}
            self.classes = {}
            self.signatures_func = {}
            self.is_transpiled = False
            self.is_compiled = False
            return

        self.is_compiling = False

        if not use_transonified or not has_to_replace:
            self.is_transpiled = False
            self.is_compiled = False
            return

        # name of the backend module: "<package>.__<backend>__.<module>"
        if "." in module_name:
            package, module_short_name = module_name.rsplit(".", 1)
            module_backend_name = package + "."
        else:
            module_short_name = module_name
            module_backend_name = ""

        module_backend_name += f"__{backend.name}__." + module_short_name

        self.path_mod = path_mod = Path(_get_pathfile_from_frame(frame))

        suffix = ".py"
        self.path_backend = path_backend = (
            path_mod.parent / f"__{backend.name}__" / (module_short_name + suffix)
        )

        # for Meson, we try to import module_backend_name
        try:
            _module_backend = import_module(module_backend_name)
        except ImportError:
            path_ext = None
        else:
            if backend.check_if_compiled(_module_backend):
                path_ext = Path(_module_backend.__file__)
                if not path_ext.exists():
                    path_ext = None
            else:
                path_ext = None

        if has_to_compile_at_import() and path_mod.exists() and path_ext is None:
            if mpi.has_to_build(path_backend, path_mod):
                if path_backend.exists():
                    time_backend = mpi.modification_date(path_backend)
                else:
                    time_backend = 0

                returncode = None
                if mpi.rank == 0:
                    print(f"Running transonic on file {path_mod}... ", end="")
                    # better to do this in another process because the file
                    # is already run...
                    # TRANSONIC_NO_MPI is set only in the environment given
                    # to the child: the environment of the current process
                    # is never modified, so a value set by the user is kept
                    # and nothing has to be cleaned up if the call raises.
                    env = dict(os.environ, TRANSONIC_NO_MPI="1")
                    returncode = subprocess.call(
                        [
                            sys.executable,
                            "-m",
                            "transonic.run",
                            "-nc",
                            str(path_mod),
                        ],
                        env=env,
                    )
                returncode = mpi.bcast(returncode)

                if returncode != 0:
                    raise RuntimeError(
                        f"transonic does not manage to produce the {backend.name_capitalized} "
                        f"file for {path_mod}"
                    )

                if mpi.rank == 0:
                    print("Done!")

                path_ext = path_backend.with_name(
                    backend.name_ext_from_path_backend(path_backend)
                )

                time_backend_after = mpi.modification_date(path_backend)
                # We have to touch the files to signal that they are up-to-date
                if time_backend_after == time_backend and mpi.rank == 0:
                    # ``has_to_build`` has to be evaluated before touching
                    # ``path_backend`` (touching changes the modification
                    # date it compares).
                    if not has_to_build(path_ext, path_backend):
                        path_backend.touch()
                        if path_ext.exists():
                            path_ext.touch()
                    else:
                        path_backend.touch()

        path_ext = path_ext or path_backend.with_name(
            backend.name_ext_from_path_backend(path_backend)
        )

        self.path_extension = path_ext
        if (
            has_to_compile_at_import()
            and path_mod.exists()
            and not self.path_extension.exists()
        ):
            if mpi.rank == 0:
                print(
                    f"Launching {backend.name_capitalized} to compile a new extension..."
                )
            self.is_compiling, self.process = backend.compile_extension(
                path_backend, name_ext_file=self.path_extension.name
            )
            self.is_compiled = not self.is_compiling

        self.is_transpiled = True

        if not path_ext.exists() and not self.is_compiling:
            # fall back to an extension named after the backend file
            path_ext_alt = path_backend.with_suffix(backend.suffix_extension)
            if path_ext_alt.exists():
                self.path_extension = path_ext = path_ext_alt

        # may reset ``is_transpiled`` to False if no file can be imported
        self.reload_module_backend(module_backend_name)

        if not self.is_transpiled:
            logger.warning(
                f"Module {path_mod} has not been compiled for "
                f"Transonic-{backend.name_capitalized}"
            )
        else:
            self.is_compiled = backend.check_if_compiled(self.module_backend)
            if self.is_compiled:
                module = inspect.getmodule(frame)
                # module can be None if (at least) it has been run with runpy
                if module is not None:
                    if backend.name == "pythran":
                        module.__pythran__ = self.module_backend.__pythran__
                    module.__transonic__ = self.module_backend.__transonic__

            if hasattr(self.module_backend, "arguments_blocks"):
                self.arguments_blocks = getattr(
                    self.module_backend, "arguments_blocks"
                )

        modules[module_name] = self

    def reload_module_backend(self, module_backend_name=None):
        """(Re)import the backend module

        The extension file is imported if it exists and no compilation is
        running, otherwise the backend file if it exists. If none of them
        can be used, ``is_transpiled`` and ``is_compiled`` are set to False.

        Parameters
        ----------

        module_backend_name : str (optional)

          Name given to the imported module. If None, the name of the
          current ``module_backend`` is reused.

        """
        if module_backend_name is None:
            module_backend_name = self.module_backend.__name__
        if self.path_extension.exists() and not self.is_compiling:
            self.module_backend = import_from_path(
                self.path_extension, module_backend_name
            )
        elif self.path_backend.exists():
            self.module_backend = import_from_path(
                self.path_backend, module_backend_name
            )
        else:
            self.is_transpiled = False
            self.is_compiled = False

    def transonic_def(self, func):
        """Decorator used for functions

        Parameters
        ----------

        func: a function

        Returns
        -------

        ``func`` itself when nothing has to be replaced, otherwise the
        object of the same name found in the backend module (wrapped in a
        ``CheckCompiling`` object while the extension is being compiled).

        """
        if is_method(func):
            return self.transonic_def_method(func)

        if is_transpiling or not has_to_replace or not self.is_transpiled:
            return func

        if not hasattr(self.module_backend, func.__name__):
            self.reload_module_backend()

        try:
            func_tmp = getattr(self.module_backend, func.__name__)
        except AttributeError:
            # TODO: improve what happens in this case
            logger.warning(
                f"{self.backend.name_capitalized} file does not seem to be up-to-date:\n"
                f"{self.module_backend}\nfunc: {func.__name__}"
            )
            func_tmp = func

        if self.is_compiling:
            return functools.wraps(func)(CheckCompiling(self, func_tmp))

        return func_tmp

    def transonic_def_method(self, func):
        """Decorator used for methods

        Parameters
        ----------

        func: a function

        Returns
        -------

        ``func`` itself when nothing has to be replaced, otherwise a
        ``TransonicTemporaryMethod`` wrapping it.

        """
        if is_transpiling or not has_to_replace or not self.is_transpiled:
            return func

        return TransonicTemporaryMethod(func)

    def boost(self, **kwargs):
        """Universal decorator for AOT compilation

        Used for functions, methods and classes.

        The keyword arguments are accepted but not used by this method.

        """
        return self._boost_decor

    def _boost_decor(self, obj):
        """Universal decorator for AOT compilation

        Used for functions, methods and classes.

        Classes are handled by ``transonic_class``, any other object by
        ``transonic_def``.

        """
        if isinstance(obj, type):
            return self.transonic_class(obj)
        else:
            return self.transonic_def(obj)

    def transonic_class(self, cls: type):
        """Decorator used for classes

        Parameters
        ----------

        cls: a class

        Raises
        ------

        RuntimeError

          If the backend module does not provide the objects needed to
          replace a method.

        """
        if is_transpiling:
            return cls

        jit_methods = {
            key: value
            for key, value in cls.__dict__.items()
            if isinstance(value, TransonicTemporaryJITMethod)
        }

        if jit_methods:
            cls = jit_class(cls, jit_methods, self.backend)

        if not has_to_replace or not self.is_transpiled:
            return cls

        cls_name = cls.__name__

        # Iterate over a snapshot since attributes of the class are replaced
        # (``setattr``) inside the loop.
        for key, value in list(cls.__dict__.items()):
            if not isinstance(value, TransonicTemporaryMethod):
                continue
            func = value.func
            func_name = func.__name__

            name_backend_func = f"__for_method__{cls_name}__{func_name}"
            name_var_code_new_method = (
                f"__code_new_method__{cls_name}__{func_name}"
            )

            if not hasattr(self.module_backend, name_backend_func):
                self.reload_module_backend()

            try:
                backend_func = getattr(self.module_backend, name_backend_func)
                code_new_method = getattr(
                    self.module_backend, name_var_code_new_method
                )
            except AttributeError as err:
                # TODO: improve what happens in this case
                raise RuntimeError(
                    f"{self.backend.name_capitalized} file does not seem to be up-to-date."
                ) from err
                # setattr(cls, key, func)
            else:
                # NOTE(review): the code executed here is read from the
                # backend module; presumably generated by transonic itself.
                namespace = {"backend_func": backend_func}
                exec(code_new_method, namespace)
                setattr(cls, key, functools.wraps(func)(namespace["new_method"]))

        return cls

    def use_block(self, name):
        """Use the pythranized version of a code block

        The arguments given to the block are looked up by name in the local
        variables of the caller.

        Parameters
        ----------

        name : str

          The name of the block.

        Raises
        ------

        ValueError

          If the module is not transpiled.

        """
        if not self.is_transpiled:
            raise ValueError(
                "`use_block` has to be used protected by `if ts.is_transpiled`"
            )

        if self.is_compiling and not self.process.is_alive(raise_if_error=True):
            # the compilation is over: import the new extension
            self.is_compiling = False
            time.sleep(0.1)
            self.module_backend = import_from_path(
                self.path_extension, self.module_backend.__name__
            )
            assert self.backend.check_if_compiled(self.module_backend)
            self.is_compiled = True

        func = getattr(self.module_backend, name)
        argument_names = self.arguments_blocks[name]

        # NOTE: the depth (1) is relative to ``use_block``, so this call
        # must not be moved into a helper.
        locals_caller = get_frame(1).f_locals
        arguments = [locals_caller[arg_name] for arg_name in argument_names]
        return func(*arguments)
class TransonicTemporaryMethod:
    """Internal temporary class for methods

    Holds the decorated function in ``func`` until the class is decorated.
    Calling an instance always raises a RuntimeError.

    """

    def __init__(self, func):
        self.func = func

    def __call__(self, self_bis, *args, **kwargs):
        message = (
            "Did you forget to decorate a class using methods decorated "
            "with transonic? Please decorate it with @boost."
        )
        raise RuntimeError(message)


class TransonicTemporaryJITMethod:
    """Internal temporary class for JIT methods

    Holds the decorated function and the given options until the class is
    decorated. Calling an instance always raises a RuntimeError.

    """

    __transonic__ = "jit_method"

    def __init__(self, func, native, xsimd, openmp):
        self.func = func
        self.native = native
        self.xsimd = xsimd
        self.openmp = openmp

    def __call__(self, self_bis, *args, **kwargs):
        message = (
            "Did you forget to decorate a class using methods decorated "
            "with transonic? Please decorate it with @boost."
        )
        raise RuntimeError(message)


def jit_class(cls, jit_methods, backend):
    """Modify the class by replacing jit methods

    1. create a Python file with @jit functions and methods
    2. import the file
    3. replace the methods

    Parameters
    ----------

    cls : a class

    jit_methods : dict

      Maps names of methods to objects with a ``func`` attribute.

    backend : backend object

      Its attributes ``name`` and ``jit`` are used.

    Returns
    -------

    The class ``cls``, modified in place (unchanged if ``has_to_replace``
    is false).

    """
    if not has_to_replace:
        return cls

    class_name = cls.__name__
    module_name = cls.__module__
    defining_module = sys.modules[module_name]

    if module_name == "__main__":
        module_name = find_module_name_from_path(defining_module.__file__)

    root_dir = mpi.Path(backend.jit.path_class)

    # 1. create a Python file with @jit functions and methods
    target_dir = root_dir / module_name.replace(".", os.path.sep)
    target_path = target_dir / (class_name + ".py")

    if mpi.has_to_build(target_path, defining_module.__file__):
        from transonic.justintime import _get_module_jit

        # NOTE: ``depth_frame`` is relative to this function, so this call
        # must not be moved into a helper.
        mod = _get_module_jit(backend_name=backend.name, depth_frame=5)
        if mpi.rank == 0:
            target_path = mpi.PathSeq(target_path)
            dependance_code = mod.info_analysis["codes_dependance_classes"][
                class_name
            ]
            class_code = backend.jit.produce_code_class(cls)
            write_if_has_to_write(
                target_path, dependance_code + "\n" + class_code
            )
            target_path = mpi.Path(target_path)
        mpi.barrier()

    # 2. import the file
    jit_module_name = f"{root_dir.name}.{module_name}.{class_name}"
    jit_module = import_from_path(target_path, jit_module_name)

    # 3. replace the methods
    for method_name, temporary in jit_methods.items():
        original = temporary.func
        replacement = getattr(
            jit_module, f"__new_method__{cls.__name__}__{method_name}"
        )
        setattr(cls, method_name, functools.wraps(original)(replacement))

    return cls