Multiprocessing Pipelines
Processing astronomical images using multiple CPU cores can significantly accelerate data reduction, especially when working with large datasets. However, this approach requires careful consideration of how large files are accessed and shared between processes.
Concurrent Access to Large Files
When calibrating each image in a separate process, it is inefficient to repeatedly pass large master calibration files (typically three, each as large as a science image) between processes. Instead, these files can be stored on disk and accessed concurrently by each process. The following example demonstrates how to do this, starting with the creation of master calibration files as shown in the calibration tutorial.
Show code cell source
from astropy.io import fits
from dateutil import parser
from glob import glob
from collections import defaultdict
from pathlib import Path
from datetime import timedelta
from eloy import calibration
def load_calibration_files():
    """Build the master calibration frames and list the science images.

    Every FITS file found under ``./photometry_raw_data`` is classified by the
    name of the directory that contains it (``"Bias"``, ``"Darks"``,
    ``"Flats"`` or ``"ScienceImages"``) and dated from its ``DATE-OBS``
    header keyword.

    Returns
    -------
    tuple
        ``(bias, dark, flat, lights)`` where ``bias``, ``dark`` and ``flat``
        are the results of ``calibration.master_bias``,
        ``calibration.master_dark`` and ``calibration.master_flat``, and
        ``lights`` is the list of science image paths sorted by date
        (empty when no science image is found).
    """
    # NOTE: glob is called without ``recursive=True``, so ``**`` behaves like
    # ``*`` and matches exactly one directory level below the data folder.
    files = glob("./photometry_raw_data/**/*.fit*")

    files_meta = defaultdict(dict)
    for file in files:
        header = fits.getheader(file)
        file_date = parser.parse(header["DATE-OBS"])
        # because some observations are taken over midnight
        files_meta[file]["date"] = file_date - timedelta(hours=10)
        # the parent directory name identifies the kind of frame
        files_meta[file]["type"] = Path(file).parent.stem

    def of_type(file_type):
        # Paths whose parent directory name equals ``file_type``.
        return [f for f in files if files_meta[f]["type"] == file_type]

    # only the science images, sorted by date
    lights = sorted(of_type("ScienceImages"), key=lambda f: files_meta[f]["date"])

    # The dark needs the bias, and the flat needs both, hence this order.
    bias = calibration.master_bias(files=of_type("Bias"))
    dark = calibration.master_dark(files=of_type("Darks"), bias=bias)
    flat = calibration.master_flat(files=of_type("Flats"), bias=bias, dark=dark)

    return bias, dark, flat, lights
from eloy import utils

# Compute the master frames once; the list of science images is returned too.
bias, dark, flat, lights = load_calibration_files()

# Group the master frames by name and hand them to utils.share_data, whose
# result is what the processing functions index by "bias", "dark" and "flat".
master_files = dict(bias=bias, dark=dark, flat=flat)
shared_data = utils.share_data(master_files)
This approach gives every process shared access to large NumPy arrays saved on disk, instead of passing copies of them between processes. For more information, see the NumPy documentation on memory-mapped arrays (`numpy.memmap`).
shared_data["bias"]
memmap([[1286.5, 1287. , 1304. , ..., 1281. , 1281.5, 1278. ],
[1291.5, 1288.5, 1304. , ..., 1284.5, 1285.5, 1280.5],
[1292. , 1295. , 1296. , ..., 1288.5, 1284. , 1287. ],
...,
[1287.5, 1289. , 1288.5, ..., 1281. , 1278.5, 1284. ],
[1289.5, 1291. , 1298. , ..., 1278. , 1278.5, 1273.5],
[1283.5, 1296. , 1293.5, ..., 1278. , 1280. , 1279.5]])
Example: Multiprocessing Pipeline
Important
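This example will not work inside a Jupyter notebook: worker processes must be able to import the functions they run from the `__main__` module, which is not possible for functions defined interactively, as explained in the multiprocessing module documentation.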
Below is an example of how a multiprocessing pipeline might be implemented in a standalone Python script:
# Make sure to call the load_calibration_files function and import the necessary modules.
def process_image(index_file, shared_data=None):
    """Calibrate one science image with the shared master calibration frames.

    Parameters
    ----------
    index_file : tuple
        ``(i, file)`` pair: the index of the image and the path of the FITS
        file to calibrate.
    shared_data : mapping
        Must provide the ``"bias"``, ``"dark"`` and ``"flat"`` master frames.
        The ``None`` default only exists so the argument can be bound by
        keyword (e.g. with ``functools.partial``); a value is required.

    Returns
    -------
    tuple
        ``(i, calibrated_image)``, so the caller can match each result to the
        input it came from.

    Raises
    ------
    TypeError
        If ``shared_data`` is not provided.
    """
    i, file = index_file

    # Fail before any file I/O, with a message that names the actual problem
    # instead of "'NoneType' object is not subscriptable".
    if shared_data is None:
        raise TypeError(
            "process_image() requires shared_data, a mapping with the "
            "'bias', 'dark' and 'flat' master calibration frames"
        )

    image = fits.getdata(file)
    header = fits.getheader(file)

    # Apply the master calibration
    calibrated_image = calibration.calibrate(
        image,
        exposure=header["EXPTIME"],
        bias=shared_data["bias"],
        dark=shared_data["dark"],
        flat=shared_data["flat"],
    )
    return i, calibrated_image
if __name__ == "__main__":
    import multiprocessing as mp
    from functools import partial

    from tqdm import tqdm

    from eloy import utils

    # Build the master frames once in the parent process and pass them
    # through utils.share_data before any worker is started.
    bias, dark, flat, lights = load_calibration_files()
    master_files = dict(bias=bias, dark=dark, flat=flat)
    shared_data = utils.share_data(master_files)

    # Pair every science image with its index so each result can be keyed
    # by the position of its input.
    indexes_images = list(enumerate(lights))

    # NOTE(review): the partial, including shared_data, is sent to the workers
    # together with the tasks; confirm the shared arrays are cheap to pickle.
    worker = partial(process_image, shared_data=shared_data)

    with mp.Pool() as pool:
        # process_image returns (index, calibrated_image) pairs, which is
        # exactly what dict() consumes; tqdm only reports progress.
        progress = tqdm(pool.imap(worker, indexes_images), total=len(indexes_images))
        calibrated_images = dict(progress)