Multiprocessing Pipelines

Processing astronomical images using multiple CPU cores can significantly accelerate data reduction, especially when working with large datasets. However, this approach requires careful consideration of how large files are accessed and shared between processes.

Concurrent Access to Large Files

When each image is calibrated in a separate process, it is inefficient to repeatedly pass the large master calibration files (typically three: a bias, a dark and a flat, each as large as a science image) to the worker processes, because every argument sent to a worker is pickled and copied. Instead, these arrays can be stored on disk once and read concurrently by every process. The following example shows how to do this, starting with the creation of the master calibration files as shown in the calibration tutorial.
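To get a feel for the cost, the sketch below pickles a single frame the way multiprocessing does before sending it to a worker. The random 2048 x 2048 float64 array is an assumed stand-in for a master frame, not data from this tutorial.

```python
import pickle

import numpy as np

# Stand-in for a master calibration frame; the 2048 x 2048 size is an assumption.
master_flat = np.random.default_rng(0).normal(1.0, 0.01, size=(2048, 2048))

# multiprocessing pickles every argument it sends to a worker process.
payload = pickle.dumps(master_flat)

print(f"array:   {master_flat.nbytes / 1e6:.1f} MB")
print(f"pickled: {len(payload) / 1e6:.1f} MB")
```

The pickled payload is at least as large as the array itself, so three such frames add roughly 100 MB of serialization and copying to every task they are attached to.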

from astropy.io import fits
from dateutil import parser
from glob import glob
from collections import defaultdict
from pathlib import Path
from datetime import timedelta
from eloy import calibration


def load_calibration_files():

    files = glob("./photometry_raw_data/**/*.fit*")
    files_meta = defaultdict(dict)
    observations = defaultdict(lambda: defaultdict(int))

    for file in files:
        header = fits.getheader(file)
        file_date = parser.parse(header["DATE-OBS"])
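        # Shift by 10 hours so that frames taken before and after midnight
        # during the same night end up with the same calendar date
        file_date = file_date - timedelta(hours=10)
        files_meta[file]["date"] = file_date
        # The frame type is the name of the parent directory (e.g. "Bias")
        files_meta[file]["type"] = Path(file).parent.name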
        observations[file_date.date()][files_meta[file]["type"]] += 1

    # only picking up the science images
    lights = list(filter(lambda f: files_meta[f]["type"] == "ScienceImages", files))
    # sorting them by date
    lights = sorted(lights, key=lambda f: files_meta[f]["date"])

    def filter_files(files, file_type):
        return list(filter(lambda f: files_meta[f]["type"] == file_type, files))

    biases = filter_files(files, "Bias")
    darks = filter_files(files, "Darks")
    flats = filter_files(files, "Flats")

    bias = calibration.master_bias(files=biases)
    dark = calibration.master_dark(files=darks, bias=bias)
    flat = calibration.master_flat(files=flats, bias=bias, dark=dark)

    return bias, dark, flat, lights
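The 10-hour shift in load_calibration_files moves every timestamp back so that frames from one observing night share a calendar date. A quick check with two made-up DATE-OBS values (hypothetical, not taken from the dataset) taken on either side of midnight:

```python
from datetime import datetime, timedelta

# Hypothetical DATE-OBS values from a single night, either side of midnight.
evening = datetime(2023, 5, 1, 21, 30)
after_midnight = datetime(2023, 5, 2, 3, 15)

# Same shift as in load_calibration_files.
shift = timedelta(hours=10)
night_evening = (evening - shift).date()
night_after_midnight = (after_midnight - shift).date()

print(night_evening, night_after_midnight)  # prints: 2023-05-01 2023-05-01
```

This assumes the night's frames fall between 10:00 on one day and 10:00 on the next in the time standard used by DATE-OBS; a different observatory may need a different offset.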
from eloy import utils

bias, dark, flat, lights = load_calibration_files()

master_files = {
    "bias": bias,
    "dark": dark,
    "flat": flat,
}

shared_data = utils.share_data(master_files)

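utils.share_data stores each array on disk and gives back the same keys mapped to numpy memory-mapped arrays (numpy.memmap), as the output below shows. Any process can then read these arrays concurrently from the file instead of receiving its own pickled copy. For more information, see the NumPy documentation on memory-mapped arrays.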
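The internals of utils.share_data are not shown here. As a rough sketch of the same idea using only NumPy, each array can be written once with numpy.save and reopened read-only with numpy.load(..., mmap_mode="r"), which returns a numpy.memmap backed by the file. The helper name share_arrays and the temporary-directory layout are hypothetical.

```python
import tempfile
from pathlib import Path

import numpy as np


def share_arrays(arrays, directory=None):
    """Hypothetical helper: save each array to disk and return read-only memmaps."""
    directory = Path(directory or tempfile.mkdtemp(prefix="shared_"))
    shared = {}
    for name, array in arrays.items():
        path = directory / f"{name}.npy"
        np.save(path, array)
        # mmap_mode="r" maps the file into memory instead of reading it all at once.
        shared[name] = np.load(path, mmap_mode="r")
    return shared


masters = {
    "bias": np.full((4, 4), 1286.5),
    "flat": np.ones((4, 4)),
}
shared = share_arrays(masters)
print(type(shared["bias"]).__name__, shared["bias"].shape)  # prints: memmap (4, 4)
```

Read-only mode fits this use case: the master frames are only read during calibration, and it prevents a worker from modifying them by accident. Removing the temporary directory afterwards is left out for brevity.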

shared_data["bias"]
memmap([[1286.5, 1287. , 1304. , ..., 1281. , 1281.5, 1278. ],
        [1291.5, 1288.5, 1304. , ..., 1284.5, 1285.5, 1280.5],
        [1292. , 1295. , 1296. , ..., 1288.5, 1284. , 1287. ],
        ...,
        [1287.5, 1289. , 1288.5, ..., 1281. , 1278.5, 1284. ],
        [1289.5, 1291. , 1298. , ..., 1278. , 1278.5, 1273.5],
        [1283.5, 1296. , 1293.5, ..., 1278. , 1280. , 1279.5]])

Example: Multiprocessing Pipeline

Important

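This example will not work inside a Jupyter notebook: multiprocessing requires the __main__ module to be importable by the worker processes, which is not the case for code defined interactively, as explained in the multiprocessing module documentation.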
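The underlying reason is that multiprocessing sends functions to workers by reference. For a top-level function, pickle stores only its module and qualified name, and the worker has to import that module to find it. Under the spawn start method (the default on Windows and macOS), the worker re-imports the main module, which a notebook does not provide as an importable file. The check below uses a placeholder stand-in for process_image to show that the pickled function carries its name, not its code.

```python
import pickle


def process_image(index_file):
    # Placeholder worker; only its pickled form matters here.
    return index_file


payload = pickle.dumps(process_image)

# The payload names the module and the function but contains none of its bytecode.
print(payload)
print(b"process_image" in payload)
```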

Below is an example of how a multiprocessing pipeline might be implemented in a standalone Python script:

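# Worker processes use fits and calibration, so import them at module level
# rather than inside the __main__ block below.
from astropy.io import fits
from eloy import calibration

# Paste the load_calibration_files function defined above (with its imports) here.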

def process_image(index_file, shared_data=None):
    i, file = index_file
    # Read the pixel data and the header in a single call
    image, header = fits.getdata(file, header=True)

    # Apply the master calibration
    calibrated_image = calibration.calibrate(
        image,
        exposure=header["EXPTIME"],
        bias=shared_data["bias"],
        dark=shared_data["dark"],
        flat=shared_data["flat"],
    )

    return i, calibrated_image

if __name__ == "__main__":

    import multiprocessing as mp
    from tqdm import tqdm
    from functools import partial
    from eloy import utils

    bias, dark, flat, lights = load_calibration_files()

    master_files = {
        "bias": bias,
        "dark": dark,
        "flat": flat,
    }

    shared_data = utils.share_data(master_files)
    indexes_images = list(enumerate(lights))
    calibrated_images = {}

    with mp.Pool() as pool:
        for i, calibrated_image in tqdm(
            pool.imap(partial(process_image, shared_data=shared_data), indexes_images),
            total=len(indexes_images),
        ):
            calibrated_images[i] = calibrated_image