Source code for transonic.compiler

"""Use Pythran to create extensions
===================================

User API
--------

.. autofunction:: wait_for_all_extensions

Internal API
------------

.. autofunction:: make_hex

.. autoclass:: SchedulerPopen
   :members:
   :private-members:

.. autofunction:: compile_extension

"""

import hashlib
import logging
import multiprocessing
import os
import subprocess
import sys
import sysconfig
import time
from datetime import datetime
from typing import Optional, Union

from transonic import mpi
from transonic.log import logger
from transonic.mpi import Path, PathSeq
from transonic.progress import Progress

# Filename suffix of compiled extension modules as reported by ``sysconfig``
# (config variable ``EXT_SUFFIX``); ".so" is used when that value is missing
# or empty.
ext_suffix = sysconfig.get_config_var("EXT_SUFFIX") or ".so"


def modification_date(pathfile):
    """Return the last modification time of a file as a ``datetime``

    ``pathfile`` is converted with ``str`` before being handed to
    ``os.path.getmtime``, so any path-like object with a meaningful string
    form is accepted.
    """
    timestamp = os.path.getmtime(str(pathfile))
    return datetime.fromtimestamp(timestamp)
def has_to_build(output_file: Path, input_file: Path):
    """Tell whether ``output_file`` has to be (re)built from ``input_file``

    Returns ``True`` when the output file does not exist or when its
    modification date is older than the one of the input file, and ``False``
    otherwise.
    """
    target = PathSeq(output_file)
    source = PathSeq(input_file)

    # A missing output always needs a build.
    if not target.exists():
        return True

    # Otherwise rebuild only if the output is strictly older than the input.
    return modification_date(target) < modification_date(source)
def make_hex(src):
    """Produce a hash from a string

    The string is encoded as UTF-8 and the hexadecimal MD5 digest of the
    resulting bytes is returned.
    """
    encoded = src.encode("utf8")
    digest = hashlib.md5(encoded)
    return digest.hexdigest()
class SchedulerPopen:
    """Limit the number of compilations performed in parallel

    The list of running processes is only created and maintained on the MPI
    root process (``mpi.rank == 0``); the other ranks only take part in the
    barriers.

    Parameters
    ----------

    parallel : bool

      If true, up to half of the CPUs (at least one) can be used for
      simultaneous compilations. Otherwise only one compilation runs at a
      time.

    """

    # Sleeping time (in seconds) between two checks of the running processes
    deltat = 0.2

    def __init__(self, parallel=True):
        self.progress = Progress(redirect_stdout=False, redirect_stderr=False)
        if mpi.rank > 0:
            # the bookkeeping attributes below are only used on the root rank
            return
        self.processes = []
        if parallel:
            self.limit_nb_processes = max(1, multiprocessing.cpu_count() // 2)
        else:
            self.limit_nb_processes = 1

    def _discard_finished_processes(self):
        """Keep only the processes for which ``is_alive_root()`` is true"""
        self.processes = [
            process for process in self.processes if process.is_alive_root()
        ]

    def block_until_avail(self, parallel=True):
        """Wait until a new compilation process can be started

        On the root rank, poll every ``deltat`` seconds until the number of
        running processes is below the limit (``limit_nb_processes`` if
        ``parallel`` is true, else 1). All ranks then meet at a barrier.
        """
        if mpi.rank == 0:
            if parallel:
                limit = self.limit_nb_processes
            else:
                limit = 1

            while len(self.processes) >= limit:
                time.sleep(self.deltat)
                self._discard_finished_processes()

        mpi.barrier(timeout=None)

    def wait_for_all_extensions(self):
        """Wait until all compilation processes are done"""
        if mpi.rank == 0:
            # Use the processes of *this* scheduler (not of the module-level
            # instance) so that the progress total matches the loop below.
            total = len(self.processes)
            task = self.progress.add_task("Wait for all extensions", total=total)

            while self.processes:
                time.sleep(self.deltat)
                self._discard_finished_processes()
                self.progress.update(task, completed=total - len(self.processes))

        mpi.barrier(timeout=None)

    def compile_extension(
        self,
        path: Path,
        backend: str,
        name_ext_file: str,
        native=False,
        xsimd=False,
        openmp=False,
        str_accelerator_flags: Optional[str] = None,
        parallel=True,
        force=True,
    ):
        """Schedule the compilation of one file in a subprocess

        Parameters
        ----------

        path : Path

          Path of the file to be compiled. The command is run from its parent
          directory.

        backend : str

          Name of the backend, given to ``transonic_cl.run_backend`` through
          the option ``-b``.

        name_ext_file : str

          Name of the extension file, given through the option ``-o``.

        native, xsimd, openmp : bool

          Add the flags ``-march=native`` (not on Windows), ``-DUSE_XSIMD``
          and ``-fopenmp``, respectively.

        str_accelerator_flags : str, optional

          Whitespace separated flags to which the flags above are appended
          (without duplicates).

        parallel : bool

          Forwarded to :meth:`block_until_avail`.

        force : bool

          If false, nothing is done (and ``None`` is returned) when
          ``has_to_build`` tells that the extension is up-to-date.

        Returns
        -------

        The result of ``mpi.ShellProcessMPI`` wrapping the started process
        (``None`` is wrapped on the non-root ranks), or ``None`` if the
        compilation is skipped.

        """
        if not force:
            path_out = path.with_name(name_ext_file)
            if not has_to_build(path_out, path):
                logger.warning(
                    f"Do not {backend}ize {path} because it seems up-to-date "
                    "(but the compilation options may have changed). "
                    "You can force the compilation with the option -f."
                )
                return

        if mpi.rank == 0:
            task = self.progress.add_task(
                f"Schedule {backend}ization: {path.name}"
            )

        def advance(value):
            # ``task`` only exists on the root rank
            if mpi.rank == 0:
                self.progress.update(task, advance=value)

        if str_accelerator_flags is not None:
            flags = str_accelerator_flags.strip().split()
        else:
            flags = []

        def update_flags(flag):
            # avoid duplicating a flag already given by the user
            if flag not in flags:
                flags.append(flag)

        if native and os.name != "nt":
            update_flags("-march=native")

        if xsimd:
            update_flags("-DUSE_XSIMD")

        if openmp:
            update_flags("-fopenmp")

        if logger.is_enable_for("debug"):
            update_flags("-v")

        # Below the INFO level, set TRANSONIC_DEBUG in the subprocess
        # environment.
        if logger.getEffectiveLevel() < logging.INFO:
            env = dict(os.environ, TRANSONIC_DEBUG="1")
        else:
            env = None

        words_command = [
            sys.executable,
            "-m",
            "transonic_cl.run_backend",
            path.name,
            "-b",
            backend,
        ]

        words_command.extend(("-o", name_ext_file))
        words_command.extend(flags)

        cwd = path.parent

        advance(10)
        self.block_until_avail(parallel)
        advance(20)

        process = None
        if mpi.rank == 0:
            # Output is inherited at the INFO level and below, and captured
            # through pipes otherwise.
            if logger.getEffectiveLevel() <= logging.INFO:
                stdout = stderr = None
            else:
                stdout = stderr = subprocess.PIPE

            process = subprocess.Popen(
                words_command,
                cwd=cwd,
                stdout=stdout,
                stderr=stderr,
                universal_newlines=True,
                env=env,
            )

        process = mpi.ShellProcessMPI(process)

        if mpi.rank == 0:
            self.processes.append(process)

        advance(70)
        return process
# Module-level scheduler used by the functions ``wait_for_all_extensions``
# and ``compile_extension`` of this module.
scheduler = SchedulerPopen()
def wait_for_all_extensions():
    """Wait until all compilation processes are done

    Thin wrapper delegating to the method of the same name of the
    module-level ``scheduler``.
    """
    return scheduler.wait_for_all_extensions()
def compile_extension(
    path: Union[Path, str],
    backend: str,
    name_ext_file: str,
    native=False,
    xsimd=False,
    openmp=False,
    str_accelerator_flags: Optional[str] = None,
    parallel=False,
    force=False,
):
    """Schedule the compilation of a file with the module-level scheduler

    ``path`` is converted to a ``Path`` if needed and all the other arguments
    are forwarded unchanged to ``scheduler.compile_extension``, whose return
    value (the process) is returned.
    """
    source_path = path if isinstance(path, Path) else Path(path)

    options = dict(
        native=native,
        xsimd=xsimd,
        openmp=openmp,
        str_accelerator_flags=str_accelerator_flags,
        parallel=parallel,
        force=force,
    )

    # return the process
    return scheduler.compile_extension(
        source_path, backend, name_ext_file, **options
    )