# A dive into the heart of S1Tiling
## a Python orchestrator dedicated to OTB

[OTB User Days, 21 November 2024](https://www.orfeo-toolbox.org/otb-user-days-2024/)

Luc Hermitte (CS Group FRANCE)

---

01

# The original design

~~~

## The original design
### Workflow

0. Downloads S1 products from PEPS
1. Scans for all S1 products
2. Calibrates all S1 products
3. Cuts margins on all calibrated images
4. Orthorectifies calibrated and cut images to target S2 MGRS tiles
5. Assembles up to 2 orthorectified images per target tile

~~~

## The original design
### Issues

- High I/O usage
  - Many `glob` calls at each step
  - Numerous undeleted files
  - Reliance on GPFS if not cautious
  - A file is produced after each OTB application
- Complex stop/start-over
  - Incomplete image files from previous executions
  - Reliance on `glob`...

---

02

# Goals turned into features

~~~

## Goals

- Industrialize
- Reduce I/O
- Handle start-over
- Be efficient
- Handle GeoTIFF metadata
- Support any S1 data provider
- Simplify installation
- Document
- Be easy to evolve and to reuse

---

## Goals turned into features
### Improve I/O

- Reduce I/O usage
  - Cache known filenames instead of globbing them
  - Use _in-memory_ pipelines from the OTB Python API (see the sketch at the end of this section)
  - Automate removal of old files
- Work on local SSDs instead of GPFS
  - Optional copy of static files (DEM, Geoid...)
  - But jobs on HPC clusters need to correctly set working directories

~~~

## Goals turned into features
### Start-over

- Never regenerate what we don't need anymore
  - Yet a `glob` on input products is required at start-up
- Distinguish incomplete `.tiff` images from complete ones

~~~

## Goals turned into features
### Efficiency

- All the above, plus
- Parallelised processing
- Factorised common steps

Note:
- "above" == the I/O improvements
- common steps, like cut+calibration before orthorectification, were already factorised

~~~

## Goals turned into features
### Other features

- Automated setting of GeoTIFF metadata
- Support of any data provider thanks to [EODAG](https://eodag.readthedocs.io/en/stable/)
- [Documented](https://s1-tiling.pages.orfeo-toolbox.org/s1tiling/latest/)
- S1Tiling [package on PyPI](https://pypi.org/project/s1tiling/), and module on TREX
- Ready-to-use [Docker images](https://gitlab.orfeo-toolbox.org/s1-tiling/s1tiling-dockers)

Note:
Other industrialisation-related features
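~~~

## Goals turned into features
### In-memory pipelines, illustrated

A minimal sketch -- not S1Tiling code, and with hypothetical filenames -- of how the OTB Python API chains two applications in memory, so that only the last one writes a file:

```python
import otbApplication as otb

calib = otb.Registry.CreateApplication("SARCalibration")
calib.SetParameterString("in", "s1_product.tiff")  # hypothetical input file
calib.Execute()  # Execute(): the result stays in memory, nothing is written

ortho = otb.Registry.CreateApplication("OrthoRectification")
# Connect the in-memory output of SARCalibration to OrthoRectification
ortho.SetParameterInputImage("io.in", calib.GetParameterOutputImage("out"))
ortho.SetParameterString("io.out", "ortho.tif")  # hypothetical output file
ortho.ExecuteAndWriteOutput()  # only this final step touches the disk
```

Note:
This is the standard OTB in-memory connection mechanism; S1Tiling hides it behind its `Step`s.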
---

03

# Zoom into the kernel

~~~

## Zoom into the kernel
### The core ideas

- _In-memory_ pipelines of OTB Applications
- Processing chains _à la make_ (sketched on the next slide)

Note:
- These two points are at the heart of reducing I/O and of supporting start-over
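~~~

## Zoom into the kernel
### _À la make_: the underlying idea

A minimal sketch of the make-like behaviour, assuming a write-then-rename scheme (an illustration, not the actual S1Tiling code): a file existing under its final name is complete, so it never needs to be regenerated:

```python
import os
from typing import Callable

def produce(final_name: str, producer: Callable[[str], None]) -> str:
    """Produce `final_name` unless a complete version already exists."""
    if os.path.exists(final_name):    # complete product from a previous run
        return final_name
    tmp_name = final_name + ".tmp"    # an interrupted run only leaves .tmp files behind
    producer(tmp_name)
    os.replace(tmp_name, final_name)  # atomic rename: final name <=> complete image
    return final_name
```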
~~~

## Zoom into the kernel
### The core ideas: task dependency analysis

- Given all existing inputs,
- Two passes:
  1. build the graph of all possible flows
  2. trim unrequired tasks
- Eventually, we have a DAG that we can pass to Dask (see the toy sketch on the next slide)
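~~~

## Zoom into the kernel
### The two passes, illustrated

A toy sketch of the idea, with hypothetical filenames and processing functions -- nodes are filenames (as detailed later), and trimming relies on what already exists on disk:

```python
import os
from dask.threaded import get

def calibrate(src: str) -> str:
    out = "calibrated.tif"
    # ... produce `out` from `src` (e.g. run SARCalibration) ...
    return out

def orthorectify(src: str) -> str:
    out = "ortho_33NWB.tif"
    # ... produce `out` from `src` (e.g. run OrthoRectification) ...
    return out

# Pass 1: graph of all possible flows -- keys are the filenames to produce
dsk = {
    "calibrated.tif":  (calibrate,    "s1_product.tiff"),
    "ortho_33NWB.tif": (orthorectify, "calibrated.tif"),
}

# Pass 2: trim the tasks whose product is already on disk (start-over!)
dsk = {out: task for out, task in dsk.items() if not os.path.exists(out)}

# The reduced DAG can now be handed over to a Dask scheduler
results = get(dsk, ["ortho_33NWB.tif"])
```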
~~~

## Zoom into the kernel
### Pipeline Example: the straightforward one

_(figure: dataflow graph of the straightforward pipeline)_

Note:
- The example is straightforward
- One output is consumed by only one kind of step
- Steps have only one kind of input -- even Concatenation

~~~

## Zoom into the kernel
### Pipeline Example: a bit more complex one

_(figure: dataflow graph of the LIA pipeline)_

Note:
- Given an S2 tile, we build one single LIA map that can be applied to all orthorectified images
- Some outputs (XYZ) are consumed by several kinds of steps
- Some steps (LIA, ApplyMap) consume several kinds of inputs
- Races can also be resolved (one LIA map could be generated for each date, but only one is kept!)

---

## Zoom into the kernel
### High level API

- Processing is done at each `Step`
- Exact `Step`s are instantiated by `StepFactories`
  - Cut, Calibrate, FixThermalNoiseRemoval,
  - Orthorectification, Concatenation...
- They are assembled into _pipelines_
- Exact starting points are instantiated by `FirstStepFactories`
  - S1 products
  - Precise orbit files
  - S2 MGRS information...

The kernel is (now) independent of any Sentinel-1/S1Tiling specificities

Note:
Dependency inversion thanks to the Factory Method design pattern (see the sketch after the registration examples).

~~~

## Zoom into the kernel
### Registration examples
#### the straightforward case

First we have to manually define the various factories, then their sequencing

```python [1|3-4|6-15]
pipelines = PipelineDescriptionSequence(config)

# Register input factory
pipelines.register_inputs('basename', s1_raster_first_inputs_factory)

# Register Step Factories: Calibration ... Concatenation
calib_seq = [ExtractSentinel1Metadata, AnalyseBorders, Calibrate,
             CorrectDenoising, CutBorders]
pipelines.register_pipeline(calib_seq, 'PrepareForOrtho',
                            product_required=False, is_name_incremental=True)
pipelines.register_pipeline([OrthoRectify], 'OrthoRectify',
                            product_required=False)
pipelines.register_pipeline([Concatenate],
                            product_required=True, is_name_incremental=True)
```

A _pipeline_ is a sequence of steps, and pipelines are also sequenced between themselves

Note:
- The example is straightforward
- Chaining between pipelines is implicit

~~~

## Zoom into the kernel
### Registration examples
#### or the bit more complex case

```python [1|3-4|6-19]
pipelines = PipelineDescriptionSequence(config)

pipelines.register_inputs('tilename', tilename_first_inputs_factory)
pipelines.register_inputs('eof',      eof_first_inputs_factory)

dem_vrt   = pipelines.register_pipeline([AgglomerateDEMOnS2], 'AgglomerateDEM',
                                        inputs={'tilename': 'tilename'})
s2_dem    = pipelines.register_pipeline([ProjectDEMToS2Tile], "ProjectDEMToS2Tile",
                                        inputs={"indem": dem_vrt})
s2_height = pipelines.register_pipeline([ProjectGeoidToS2Tile, SumAllHeights],
                                        "GenerateHeightForS2Tile",
                                        inputs={"in_s2_dem": s2_dem})
xyz       = pipelines.register_pipeline([ComputeGroundAndSatPositionsOnDEMFromEOF],
                                        "ComputeGroundAndSatPositionsOnDEM",
                                        inputs={'ineof': 'eof', 'inheight': s2_height})
lia       = pipelines.register_pipeline([ComputeNormalsOnS2, ComputeLIAOnS2],
                                        'ComputeLIAOnS2',
                                        inputs={'xyz': xyz}, product_required=True)
```

Note:
- The graph is much more complex, remember the graph a few slides ago...
- Inputs are NOT Sentinel-1 products
- Chaining between pipelines is explicit, to control the dataflow
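~~~

## Zoom into the kernel
### Dependency inversion, illustrated

A deliberately simplified sketch of the Factory Method idea -- none of these classes are the real S1Tiling ones: the kernel only drives abstract factories, and only the domain code knows about calibration, filenames, and so on:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

Meta = Dict[str, Any]

class Step:
    """All the kernel ever sees: an output name and execution parameters."""
    def __init__(self, out_filename: str, parameters: Dict[str, Any]) -> None:
        self.out_filename = out_filename
        self.parameters   = parameters

class StepFactory(ABC):
    """Factory Method: the domain code decides *how* a Step is built."""
    @abstractmethod
    def build_step(self, meta: Meta) -> Step: ...

class CalibrateFactory(StepFactory):
    """Domain-aware factory: derives filename & parameters from metadata."""
    def build_step(self, meta: Meta) -> Step:
        out = f"{meta['rootname']}_sigma_calOk.tiff"
        return Step(out, {'lut': 'sigma', 'in': meta['in_filename']})

def kernel_run(factories: List[StepFactory], meta: Meta) -> None:
    """The kernel is domain-agnostic: it only manipulates the abstractions."""
    for factory in factories:
        step = factory.build_step(meta)
        print("would produce", step.out_filename, "with", step.parameters)
```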
~~~

## Zoom into the kernel
### The binding code

```python [5-9 | 11-27 | 29-36]
# The preparation seen earlier
config = ...
pipelines = ...

# Extra preparation step: the input factories need some data
pipelines.register_extra_parameters_for_input_factories(
    dag=dag,                          # EODAG object used by s1_raster_first_inputs_factory
    s1_file_manager=s1_file_manager,  # Main object in charge of DL S1 products by the factory
)

# Now let's execute with Dask!
results : List[Outcome] = []
with s1tiling.libs.utils.dask.DaskContext(config) as dask_client:
    for idx, tile_it in enumerate(tiles_to_process):
        res = process_one_tile(  # Very S1Tiling specific: handle one S2 tile
            tile_it, idx, nb_tiles,
            config, pipelines, dask_client.client,
        )
        # ensure_tiled_workspaces_exist(cfg, tile_name, required_workspaces)
        # pipelines.register_extra_parameters_for_input_factories(tile_name=tile_name)
        # dsk, required_products, errors = pipelines.generate_tasks()
        # if errors:
        #     return errors
        # return client.get(dsk, required_products)
        results.extend(res)

# Reporting results
nb_issues = sum(not bool(res) for res in results)
if nb_issues > 0:
    logger.warning('Execution report: %s errors detected', nb_issues)
else:
    logger.info('Execution report: no error detected')
for res in results:
    logger.log(log_level(res), ' - %s', res)
```

Note:
- This is a simplified version without:
  - Creation of directories
  - Local copy of DEM & geoid
  - Search & download errors
  - Handling of return codes
- `Outcome` objects are monadic objects that store either a result or an exception (sketched on the next slide)
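~~~

## Zoom into the kernel
### `Outcome`, the gist

A simplified sketch of the monadic idea -- not the actual S1Tiling class: an `Outcome` wraps either a value or the exception that prevented producing it, and is falsy in the latter case (hence the `sum(not bool(res) ...)` on the previous slide):

```python
from typing import Generic, TypeVar, Union

T = TypeVar("T")

class Outcome(Generic[T]):
    """Either a result, or the exception explaining why there is none."""
    def __init__(self, value_or_error: Union[T, Exception]) -> None:
        self._value = value_or_error

    def __bool__(self) -> bool:
        # Falsy when it carries an error
        return not isinstance(self._value, Exception)

    def __str__(self) -> str:
        status = "SUCCESS" if self else "FAILURE"
        return f"{status}: {self._value}"
```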
---

## Zoom into the kernel
### High level API Usage

We define:

- a domain-aware `StepFactory` per processing (OTB app, Python function, external process)
- a `PipelineDescription` for a sequence of one or several steps
  - Multiple OTB steps will be chained in memory automatically
- All the `PipelineDescription` instances are registered in a `PipelineDescriptionSequence`

Then the kernel will take care of:

- Searching all possible inputs thanks to the `FirstStepFactories`...
- ...that'll be used to build the reduced DAG of Dask Tasks

~~~

## Zoom into the kernel
### High level API Usage: `StepFactories`

A concrete and specialized [`StepFactory`](https://s1-tiling.pages.orfeo-toolbox.org/s1tiling/latest/developers.html#step-factories)

- is configured from the execution configuration object
- relies on input product _metadata_ to
  - define what the output filename would be
  - determine the execution parameters -- according to the kind of `StepFactory`
  - determine what GeoTIFF metadata should be written in the product

Note:
Several kinds:
- File producers: `OTBStepFactory`, `ExecutableStepFactory`, `AnyProducerStepFactory`
- `StepFactory`s that extract metadata from files (may be required when starting new pipelines)

~~~

## Zoom into the kernel
### High level API Usage
#### A simple `StepFactory` example

```python[1|2-12|3,14-16|18-23|25-32]
class Calibrate(s1tiling.libs.otbpipeline.OTBStepFactory):
    def __init__(self, cfg: Configuration) -> None:
        fname_fmt = cfg.fname_fmt.get('calibration', '{rootname}_{calibration_type}_calOk.tiff')
        super().__init__(
            cfg,
            appname='SARCalibration',
            name='Calibration',
            gen_tmp_dir=os.path.join(cfg.tmpdir, 'S1'),
            gen_output_filename=TemplateOutputFilenameGenerator(fname_fmt),
            image_description='{calibration_type} calibrated Sentinel-{flying_unit_code_short} IW GRD',
        )
        self.__calibration_type   = cfg.calibration_type
        self.__removethermalnoise = cfg.removethermalnoise

    def _update_filename_meta_pre_hook(self, meta: Meta) -> Meta:
        meta['calibration_type'] = self.__calibration_type
        return meta

    def update_image_metadata(self, meta: Meta, all_inputs: InputList) -> None:
        super().update_image_metadata(meta, all_inputs)
        assert 'image_metadata' in meta
        imd = meta['image_metadata']
        imd['CALIBRATION']   = str(self.__calibration_type)
        imd['NOISE_REMOVED'] = str(self.__removethermalnoise)

    def parameters(self, meta: Meta) -> OTBParameters:
        params : OTBParameters = {
            'ram'         : ram(self.ram_per_process),
            self.param_in : s1tiling.libs.meta.in_filename(meta),
            'lut'         : self.__calibration_type,
            'removenoise' : self.__removethermalnoise,
        }
        return params
```

---

## Zoom into the kernel
### Let's dive a bit more

- Dask task nodes are actually filenames (not image rasters)
- and the edges are `Pipeline`s, instantiated from:
  - the `PipelineDescriptionSequence`
  - plus input metadata
- When Dask executes a `Pipeline`, it instantiates generic `Step` instances
- The [`Steps`](https://s1-tiling.pages.orfeo-toolbox.org/s1tiling/latest/developers.html#step-factories) will generate their output as specified by the related `StepFactory`, and also update the GeoTIFF metadata

But none of it is your concern

---

04

# What's next?

~~~

## What's next?

- New chains in S1Tiling (RTC, IA...)
- A new processing chain: SLC Time Series
  - => Extract the S1Tiling kernel into its own package
- YAML description of pipelines

---

# Q&A