Source code for transonic.compiler

"""Use Pythran to create extensions
===================================

User API
--------

.. autofunction:: wait_for_all_extensions

Internal API
------------

.. autofunction:: make_hex

.. autoclass:: SchedulerPopen
   :members:
   :private-members:

.. autofunction:: compile_extension

"""

import multiprocessing
import subprocess
import time
from typing import Union, Optional
import sysconfig
import hashlib
import sys
import os
from datetime import datetime
import logging

from transonic import mpi
from transonic.mpi import Path, PathSeq
from transonic.log import logger
from transonic.progress import Progress

ext_suffix = sysconfig.get_config_var("EXT_SUFFIX") or ".so"


[docs]def modification_date(pathfile): """Get the modification date of a file""" return datetime.fromtimestamp(os.path.getmtime(str(pathfile)))
[docs]def has_to_build(output_file: Path, input_file: Path): """Check if a file has to be (re)built""" output_file = PathSeq(output_file) input_file = PathSeq(input_file) if not output_file.exists(): return True mod_date_output = modification_date(output_file) if mod_date_output < modification_date(input_file): return True return False
[docs]def make_hex(src): """Produce a hash from a sting""" return hashlib.md5(src.encode("utf8")).hexdigest()
[docs]class SchedulerPopen: """Limit the number of compilations performed in parallel""" deltat = 0.2 def __init__(self, parallel=True): self.progress = Progress(redirect_stdout=False, redirect_stderr=False) if mpi.rank > 0: return self.processes = [] if parallel: self.limit_nb_processes = max(1, multiprocessing.cpu_count() // 2) else: self.limit_nb_processes = 1 def block_until_avail(self, parallel=True): if mpi.rank == 0: if parallel: limit = self.limit_nb_processes else: limit = 1 while len(self.processes) >= limit: time.sleep(self.deltat) self.processes = [ process for process in self.processes if process.is_alive_root() ] mpi.barrier(timeout=None)
[docs] def wait_for_all_extensions(self): """Wait until all compilation processes are done""" if mpi.rank == 0: total = len(scheduler.processes) task = self.progress.add_task("Wait for all extensions", total=total) while self.processes: time.sleep(self.deltat) self.processes = [ process for process in self.processes if process.is_alive_root() ] self.progress.update(task, completed=total - len(self.processes)) mpi.barrier(timeout=None)
def compile_extension( self, path: Path, backend: str, name_ext_file: str, native=False, xsimd=False, openmp=False, str_accelerator_flags: Optional[str] = None, parallel=True, force=True, ): if not force: path_out = path.with_name(name_ext_file) if not has_to_build(path_out, path): logger.warning( f"Do not {backend}ize {path} because it seems up-to-date " "(but the compilation options may have changed). " "You can force the compilation with the option -f." ) return if mpi.rank == 0: task = self.progress.add_task( f"Schedule {backend}ization: {path.name}" ) def advance(value): if mpi.rank == 0: self.progress.update(task, advance=value) if str_accelerator_flags is not None: flags = str_accelerator_flags.strip().split() else: flags = [] def update_flags(flag): if flag not in flags: flags.append(flag) if native and os.name != "nt": update_flags("-march=native") if xsimd: update_flags("-DUSE_XSIMD") if openmp: update_flags("-fopenmp") if logger.is_enable_for("debug"): update_flags("-v") if logger.getEffectiveLevel() < logging.INFO: env = dict(os.environ, TRANSONIC_DEBUG="1") else: env = None words_command = [ sys.executable, "-m", "transonic_cl.run_backend", path.name, "-b", backend, ] words_command.extend(("-o", name_ext_file)) words_command.extend(flags) cwd = path.parent advance(10) self.block_until_avail(parallel) advance(20) process = None if mpi.rank == 0: if logger.getEffectiveLevel() <= logging.INFO: stdout = stderr = None else: stdout = stderr = subprocess.PIPE process = subprocess.Popen( words_command, cwd=cwd, stdout=stdout, stderr=stderr, universal_newlines=True, env=env, ) process = mpi.ShellProcessMPI(process) if mpi.rank == 0: self.processes.append(process) advance(70) return process
scheduler = SchedulerPopen()
[docs]def wait_for_all_extensions(): """Wait until all compilation processes are done""" scheduler.wait_for_all_extensions()
[docs]def compile_extension( path: Union[Path, str], backend: str, name_ext_file: str, native=False, xsimd=False, openmp=False, str_accelerator_flags: Optional[str] = None, parallel=False, force=False, ): if not isinstance(path, Path): path = Path(path) # return the process return scheduler.compile_extension( path, backend, name_ext_file, native=native, xsimd=xsimd, openmp=openmp, str_accelerator_flags=str_accelerator_flags, parallel=parallel, force=force, )