import os
import logging
import json
from astropy.io import fits
from astropy.nddata.utils import Cutout2D
import numpy as np
import copy
import numpy as np
import numpy as np
[docs]
def open_fits_image(file_path):
"""
Opens a FITS file and returns its image data and header.
Parameters:
----------
file_path : str
Path to the FITS file.
Returns:
-------
tuple
A tuple containing the image data and the header.
If the file cannot be opened or lacks data, returns (None, None).
"""
try:
# Open the FITS file
with fits.open(file_path) as hdul:
# Extract the image data and header from the first HDU
image_data = hdul[0].data
header = hdul[0].header
return image_data, header
except FileNotFoundError:
print(f"Error: File not found at '{file_path}'.")
return None, None
except Exception as e:
print(f"Error: Unable to open the FITS file. {e}")
return None, None
[docs]
def recenter_image(image, x, y, final_size=None):
"""
Recenters the image around the coordinates (xm, ym) and returns a square image with odd size.
Parameters:
- image: 2D numpy array.
- xm, ym: floats or ints. Coordinates to be the new center (in image coordinates).
- final_size: optional. Final size of the image. If None, will be the largest odd size possible
that fits in the image given (xm, ym) as center.
Returns:
- centered_image: 2D numpy array cropped and centered at (xm, ym).
"""
h, w = image.shape
# Round center to int
xc, yc = int(round(x)), int(round(y))
# Compute max size if not specified
if final_size is None:
max_half = min(xc, yc, w - xc - 1, h - yc - 1)
final_size = 2 * max_half + 1 # Ensures odd size
elif final_size % 2 == 0:
final_size -= 1 # Make sure it's odd
half = final_size // 2
# Crop region
x1, x2 = xc - half, xc + half + 1
y1, y2 = yc - half, yc + half + 1
# Check boundaries
if x1 < 0 or y1 < 0 or x2 > w or y2 > h:
raise ValueError("Requested recentered image would exceed original image bounds.")
return image[y1:y2, x1:x2]
[docs]
def vary_galaxy_image(image, sigma_bkg = None, num_realizations=100):
"""
Generates multiple realizations of an image with Poisson noise (photon noise) and Gaussian background noise.
Parameters:
- image (ndarray): Background-subtracted galaxy image.
- sigma_bkg (float): Standard deviation of the background noise.
- num_realizations (int): Number of noise realizations to generate.
Returns:
- noisy_images (ndarray): Stack of noisy images with shape (num_realizations, H, W).
"""
image_nonnegative = np.maximum(image, 0) # Ensure non-negative values for Poisson noise
# Generate Poisson noise realizations
poisson_noisy_images = np.random.poisson(image_nonnegative[:, :, np.newaxis],
size=(image.shape[0], image.shape[1], num_realizations))
if sigma_bkg is not None:
# Generate Gaussian background noise realizations
gaussian_noise = np.random.normal(0, sigma_bkg, size=poisson_noisy_images.shape)
# Combine Poisson and Gaussian noise
total_noisy_images = poisson_noisy_images + gaussian_noise
return np.moveaxis(total_noisy_images, -1, 0)
else:
return np.moveaxis(poisson_noisy_images, -1, 0) # Move realization axis to first dimension
def _load_config(config_file):
"""
Load a JSON configuration file.
Parameters:
-----------
config_file : str
Path to the configuration file.
Returns:
--------
config : dict
Configuration dictionary.
"""
if not os.path.exists(config_file):
raise FileNotFoundError(f"Configuration file not found: {config_file}")
with open(config_file, 'r') as f:
config = json.load(f)
return config
def _initialize_logger(log_file):
"""
Initialize a logger to log pipeline events.
Parameters:
-----------
log_file : str
Path to the log file.
"""
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w'
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
def _ensure_directory_exists(directory):
"""
Ensure that a directory exists, creating it if necessary.
Parameters:
-----------
directory : str
Path to the directory.
"""
if not os.path.exists(directory):
os.makedirs(directory)
import numpy as np
def _flux_weighted_center(image, x_center, y_center, a, b, theta):
"""
Calculate the flux-weighted center within an elliptical aperture.
Parameters:
----------
image : ndarray
2D array representing the image.
x_center : float
x-coordinate of the initial center.
y_center : float
y-coordinate of the initial center.
a : float
Semi-major axis of the ellipse.
b : float
Semi-minor axis of the ellipse.
theta : float
Orientation angle of the ellipse in radians (measured counterclockwise from the x-axis).
Returns:
-------
new_x_center : float
x-coordinate of the flux-weighted center.
new_y_center : float
y-coordinate of the flux-weighted center.
"""
# Create a grid of pixel coordinates
y, x = np.indices(image.shape)
# Shift the grid to the center
x_shifted = x - x_center
y_shifted = y - y_center
# Rotate the grid to align with the ellipse's orientation
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
x_rotated = cos_theta * x_shifted + sin_theta * y_shifted
y_rotated = -sin_theta * x_shifted + cos_theta * y_shifted
# Define the elliptical aperture mask
ellipse_mask = (x_rotated / a) ** 2 + (y_rotated / b) ** 2 <= 1
# Isolate the flux within the aperture
aperture_flux = image * ellipse_mask
# Calculate the total flux within the aperture
total_flux = np.sum(aperture_flux)
if total_flux == 0:
raise ValueError("Total flux within the aperture is zero. Cannot calculate the flux-weighted center.")
# Calculate the flux-weighted coordinates
new_x_center = np.sum(aperture_flux * x) / total_flux
new_y_center = np.sum(aperture_flux * y) / total_flux
return new_x_center, new_y_center
def _flatten_dict(d, parent_key='', sep='.'):
"""
Flatten a nested dictionary into a single level with dot-separated keys.
Parameters:
-----------
d : dict
The nested dictionary to flatten.
parent_key : str, optional
The base key for recursion (default is '').
sep : str, optional
Separator for flattened keys (default is '.').
Returns:
--------
flat_dict : dict
The flattened dictionary.
"""
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def _validate_config(config, required_keys):
"""
Validate that a configuration dictionary contains all required keys.
Parameters:
-----------
config : dict
The configuration dictionary.
required_keys : list
List of required keys.
Raises:
-------
KeyError
If a required key is missing.
"""
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise KeyError(f"Missing required config keys: {', '.join(missing_keys)}")
def _open_fits_image(file_path):
"""
Opens a FITS file and returns its image data and header.
Parameters:
----------
file_path : str
Path to the FITS file.
Returns:
-------
tuple
A tuple containing the image data and the header.
If the file cannot be opened or lacks data, returns (None, None).
"""
try:
# Open the FITS file
with fits.open(file_path) as hdul:
# Extract the image data and header from the first HDU
image_data = hdul[0].data
header = hdul[0].header
return image_data, header
except FileNotFoundError:
print(f"Error: File not found at '{file_path}'.")
return None, None
except Exception as e:
print(f"Error: Unable to open the FITS file. {e}")
return None, None
def _centralize_on_main_obj(xc, yc, image, size_method="auto", size=0):
"""
Center an image around a specified main object with guaranteed NxN odd dimensions.
Extra pixels are filled with zeros if the cutout extends beyond the image boundaries.
Parameters:
----------
xc : int
x-coordinate of the main object.
yc : int
y-coordinate of the main object.
image : ndarray
Input 2D image array.
size_method : str, optional
Method for determining cutout size. Options: "auto" or "manual". Default is "auto".
size : int, optional
Size for the cutout (only used if size_method="manual").
Returns:
-------
image_recentered : ndarray
Cutout of the input image, centered around the main object, with NxN odd dimensions.
"""
if size_method == "auto":
# Calculate the largest possible square cutout size around (xc, yc)
l1 = len(image[0]) # x-dimension
l2 = len(image) # y-dimension
new_length = 2 * min(yc, xc, l2 - yc - 1, l1 - xc - 1) + 1 # Always odd size
elif size_method == "input":
if size <= 0:
raise ValueError("Size must be a positive integer when size_method='manual'.")
new_length = size
if new_length % 2 == 0: # Ensure odd size
new_length += 1
else:
raise ValueError("Invalid size_method. Options are 'auto' or 'manual'.")
# Create the cutout using astropy's Cutout2D
cutout = Cutout2D(data=image, position=(xc, yc), size=new_length, mode='partial', fill_value=0)
image_recentered = np.array(cutout.data, dtype=np.float32)
return image_recentered
def _prepare_final_image(image, image_to_use, rp):
if image_to_use == 'smoothed':
box_kernel = convolution.Box2DKernel(round(rp/5))
final_image = convolve(image,
box_kernel,
normalize_kernel=True)
return(final_image)
import copy
def _merge_configs(default_config, user_config):
"""
Merge the user-provided configuration into the default configuration.
Keeps missing keys from the default config.
Parameters:
----------
default_config : dict
The default configuration dictionary.
user_config : dict
The user-provided configuration dictionary.
Returns:
-------
dict
Merged configuration dictionary.
"""
def recursive_update(default, user):
"""
Recursively update the default dictionary with user-provided values.
Parameters:
----------
default : dict
Default dictionary to update.
user : dict
User-provided dictionary.
Returns:
-------
dict
Updated dictionary.
"""
for key, value in user.items():
if isinstance(value, dict) and key in default:
# If the value is a dictionary, recurse
default[key] = recursive_update(default[key], value)
else:
# Otherwise, overwrite the value
default[key] = value
return default
# Create a deep copy to ensure the original default config is not modified
merged_config = copy.deepcopy(default_config)
return recursive_update(merged_config, user_config)
[docs]
def remove_central_region(image, remove_radius, xc, yc):
"""
Removes a circular region centered at ``(xc, yc)`` from a segmentation image.
Parameters:
----------
image : np.ndarray
The segmentation image.
remove_radius : float
Radius of the circular region, in pixels, that will be set to zero.
xc, yc : float
Coordinates of the center of the region to remove.
Returns:
-------
image_segmented : np.ndarray
The updated segmentation image with the central region removed.
"""
# Copy the image
image_copy = np.copy(image)
# Create coordinate grids
N1, N2 = image_copy.shape
y, x = np.meshgrid(np.arange(N1), np.arange(N2), indexing='ij')
x0 = (x + 0.5) - xc
y0 = (y + 0.5) - yc
# Calculate distances
D = np.sqrt(x0**2 + y0**2)
# Apply the condition and mask the central ellipse
mask = D < remove_radius
image_copy[mask] = 0
return image_copy