Source code for imagesplit.file.metaio_reader
# coding=utf-8
"""
Utility for reading and writing data to metaio (mhd/mha) files
Author: Tom Doel
Copyright UCL 2017
"""
import copy
import os
from collections import OrderedDict
import numpy as np
from imagesplit.file.data_type import DataType
from imagesplit.file.file_image_descriptor import FileImageDescriptor
from imagesplit.file.file_wrapper import FileWrapper, FileStreamer
from imagesplit.file.image_file_reader import LinearImageFileReader
from imagesplit.image.combined_image import Axis
from imagesplit.utils.utilities import compute_bytes_per_voxel, \
get_numpy_datatype
[docs]class MetaIoFile(LinearImageFileReader):
"""A class for reading or writing 3D imaging data to/from a MetaIO file
pair (.mhd and .raw). """
# pylint: disable=too-many-instance-attributes, super-with-arguments
def __init__(self, local_file_size, header_filename,
file_handle_factory, header_template):
super(MetaIoFile, self).__init__(local_file_size)
self._file_handle_factory = file_handle_factory
self._header_filename = header_filename
self._input_path = os.path.dirname(os.path.abspath(header_filename))
self._file_wrapper = None
self._file_streamer = None
if header_template:
# File is for writing
self._mode = 'wb'
# Force the raw filename to match the header filename
base_filename = os.path.splitext(
os.path.basename(header_filename))[0]
header = copy.deepcopy(header_template)
header['ElementDataFile'] = base_filename + '.raw'
save_mhd_header(header_filename, header)
self._header = header
else:
# File is for reading
self._mode = 'rb'
self._header = load_mhd_header(header_filename)
self._bytes_per_voxel = compute_bytes_per_voxel(
self._header["ElementType"]) # ToDo: set this based on output format
self._numpy_format = get_numpy_datatype(
self._header["ElementType"],
self._header["BinaryDataByteOrderMSB"])
self._subimage_size = self._header["DimSize"]
self._dimension_ordering = get_condensed_dim_order(self._header)
[docs] @classmethod
def load_and_parse_header(cls, filename):
"""Reads a MetaIO header file and parses"""
header = load_mhd_header(filename)
return parse_mhd(header)
[docs] @classmethod
def create_read_file(cls, subimage_descriptor, file_handle_factory):
"""Create a MetaIoFile class for writing"""
filename = subimage_descriptor.filename
local_file_size = subimage_descriptor.ranges.image_size
return cls(local_file_size, filename, file_handle_factory, None)
[docs] @classmethod
def create_write_file(cls, subimage_descriptor, file_handle_factory):
"""Create a MetaIoFile class for this filename and template"""
header_template = cls._create_meta_header(subimage_descriptor)
# header_template = copy.deepcopy(subimage_descriptor.template)
local_file_size = subimage_descriptor.get_local_size()
local_origin = subimage_descriptor.get_local_origin()
if subimage_descriptor.data_type:
header_template["ElementType"] = \
DataType.metaio_from_name(subimage_descriptor.data_type)
header_template["DimSize"] = local_file_size
header_template["Origin"] = local_origin
filename = subimage_descriptor.filename
return cls(local_file_size, filename, file_handle_factory,
header_template)
[docs] def write_line(self, start_coords, image_line, rescale_limits):
"""Write consecutive voxels to the raw binary file."""
return self._get_file_streamer().write_line(
start_coords, image_line, rescale_limits)
[docs] def read_line(self, start_coords, num_voxels_to_read):
"""Read consecutive voxels of image data from the raw binary file
starting at the specified coordinates. """
return self._get_file_streamer().read_line(start_coords,
num_voxels_to_read)
[docs] def get_bytes_per_voxel(self):
"""Return the number of bytes used to represent a single voxel in
this image. """
header = self._get_header()
return compute_bytes_per_voxel(header["ElementType"])
[docs] def get_dimension_ordering(self):
"""
Return the preferred dimension ordering for writing data.
Returns an array of 3 element, where each element represents a
dimension in the global coordinate system numbered from 1 to 3 and
is positive if data are to be written in ascending coordinates (in
the global system) or negative if to be written in descending global
coordinates along that dimension
"""
return self._dimension_ordering
@classmethod
def _create_meta_header(cls, subimage_descriptor):
local_file_size = subimage_descriptor.get_local_size()
local_origin = subimage_descriptor.get_local_origin()
local_voxel_size = subimage_descriptor.get_local_voxel_size()
transform_matrix = condensed_to_cosine(
subimage_descriptor.axis.to_condensed_format())
header = get_default_metadata()
header["ObjectType"] = 'Image'
header["NDims"] = np.size(local_file_size)
header["BinaryData"] = 'True'
header["BinaryDataByteOrderMSB"] = subimage_descriptor.msb
header["CompressedData"] = 'False'
header["TransformMatrix"] = transform_matrix
header["ElementSize"] = local_voxel_size
header["DimSize"] = local_file_size
header["ElementType"] = \
DataType.metaio_from_name(subimage_descriptor.data_type)
header["Origin"] = local_origin
return header
def _get_header(self):
"""Return an OrderedDict containing the MetaIO metaheader metadata
for this image. """
if not self._header:
self._header = load_mhd_header(self._header_filename)
return self._header
def _get_file_wrapper(self):
"""Return the FileWrapper representing this image, creating it if
it does not already exist. """
if not self._file_wrapper:
header = self._get_header()
filename_raw = os.path.join(self._input_path,
header["ElementDataFile"])
self._file_wrapper = FileWrapper(filename_raw,
self._file_handle_factory,
self._mode)
return self._file_wrapper
def _get_file_streamer(self):
"""Return the FileStreamer representing this image, creating it
if it does not already exist. """
if not self._file_streamer:
self._file_streamer = FileStreamer(self._get_file_wrapper(),
self._subimage_size,
self._bytes_per_voxel,
self._numpy_format,
self._dimension_ordering)
return self._file_streamer
[docs] def close(self):
"""Close the files associated with this image, if they are not
already closed. """
if self._file_streamer:
self._file_streamer.close()
self._file_streamer = None
if self._file_wrapper:
self._file_wrapper.close()
self._file_wrapper = None
[docs]def load_mhd_header(filename):
"""Return an OrderedDict containing metadata loaded from an mhd file."""
metadata = OrderedDict()
with open(filename) as header_file:
for line in header_file:
(key, val) = [x.strip() for x in line.split("=")]
if key in ['ElementSpacing', 'Offset', 'CenterOfRotation',
'TransformMatrix', 'ElementSize']:
new_val = [float(s) for s in val.split()]
elif key in ['NDims', 'ElementNumberOfChannels']:
new_val = int(val)
elif key in ['DimSize']:
new_val = [int(s) for s in val.split()]
elif key in ['BinaryData', 'BinaryDataByteOrderMSB',
'CompressedData']:
# pylint: disable=simplifiable-if-statement
if val.lower() == "true":
new_val = True
else:
new_val = False
else:
new_val = val
metadata[key] = new_val
return metadata
[docs]def save_mhd_header(filename, metadata):
"""Saves a mhd header file to disk using the given metadata"""
# Add default metadata, replacing with custom specified values
header = ''
default_metadata = get_default_metadata()
for key, val in default_metadata.items():
if key in metadata.keys():
value = metadata[key]
else:
value = val
if value or value == 'False' or value == 0:
value = str(value)
value = value.replace("[", "").replace("]", "").replace(",", "")
header += '%s = %s\n' % (key, value)
# Add any custom metadata tags
for key, val in metadata.items():
if key not in default_metadata.keys():
value = str(metadata[key])
value = value.replace("[", "").replace("]", "").replace(",", "")
header += '%s = %s\n' % (key, value)
file_handle = open(filename, 'w')
file_handle.write(header)
file_handle.close()
[docs]def get_default_metadata():
"""Return an OrderedDict containing default mhd file metadata"""
return OrderedDict(
[('ObjectType', 'Image'), ('NDims', '3'), ('BinaryData', 'True'),
('BinaryDataByteOrderMSB', 'True'),
('CompressedData', []), ('CompressedDataSize', []),
('TransformMatrix', []), ('Offset', []),
('CenterOfRotation', []), ('AnatomicalOrientation', []),
('ElementSpacing', []), ('DimSize', []),
('ElementNumberOfChannels', []), ('ElementSize', []),
('ElementType', 'MET_FLOAT'), ('ElementDataFile', []),
('Comment', []), ('SeriesDescription', []), ('AcquisitionDate', []),
('AcquisitionTime', []),
('StudyDate', []), ('StudyTime', [])])
[docs]def get_condensed_dim_order(header):
"""Return the condensed dimension order and flip string for this header"""
if "TransformMatrix" in header and header["TransformMatrix"]:
transform = header["TransformMatrix"]
new_dimension_order, flip_orientation = \
mhd_cosines_to_permutation(
transform[0:3], transform[3:6], transform[6:9])
elif "AnatomicalOrientation" in header and header["AnatomicalOrientation"]:
new_dimension_order, flip_orientation = \
anatomical_to_permutation(
header["AnatomicalOrientation"])
else:
new_dimension_order = [0, 1, 2]
flip_orientation = [False, False, False]
return Axis(new_dimension_order, flip_orientation).to_condensed_format()
[docs]def anatomical_to_permutation(anatomical_orientation_string):
"""Dimension permutation vector corresponding to this orientation string"""
direction_cosine_1 = \
anatomical_to_cosine(anatomical_orientation_string[0])
direction_cosine_2 = \
anatomical_to_cosine(anatomical_orientation_string[1])
direction_cosine_3 = \
anatomical_to_cosine(anatomical_orientation_string[2])
permutation_vector, flip_orientation = \
mhd_cosines_to_permutation(
direction_cosine_1, direction_cosine_2, direction_cosine_3)
return permutation_vector, flip_orientation
[docs]def anatomical_to_cosine(anatomical_orientation_char):
"""Get Dicom direction cosine for this orientation string"""
if anatomical_orientation_char == 'R':
return [1, 0, 0]
elif anatomical_orientation_char == 'L':
return [-1, 0, 0]
elif anatomical_orientation_char == 'A':
return [0, 1, 0]
elif anatomical_orientation_char == 'P':
return [0, -1, 0]
elif anatomical_orientation_char == 'I':
return [0, 0, 1]
elif anatomical_orientation_char == 'S':
return [0, 0, -1]
raise ValueError('No implementation yet for anatomical orientation ' +
anatomical_orientation_char + '.')
[docs]def permutation_to_cosine(permutation, flip):
"""Get mhd direction cosine for this dimension permutation"""
dir_cosine = [0, 0, 0, 0, 0, 0, 0, 0, 0]
dir_cosine[permutation[0]*3] = -1 if flip[0] else 1
dir_cosine[permutation[1]*3 + 1] = -1 if flip[1] else 1
dir_cosine[permutation[2]*3 + 2] = -1 if flip[2] else 1
return dir_cosine
[docs]def condensed_to_cosine(condensed_format):
"""Get mhd direction cosine for this condensed format axis"""
axis = Axis.from_condensed_format(condensed_format)
return permutation_to_cosine(axis.dim_order, axis.dim_flip)
[docs]def mhd_cosines_to_permutation(direction_cosine_1,
direction_cosine_2,
direction_cosine_3):
"""Get dimension permutation vectors for these mhd direction cosines"""
orientation_1 = direction_cosine_1
orientation_2 = direction_cosine_2
orientation_3 = direction_cosine_3
permutation_vector, dimension_1, dimension_2, dimension_3 = \
permutation_from_orientations(orientation_1, orientation_2)
flip_orientation = get_flip_from_orientations(
orientation_1, orientation_2, orientation_3, dimension_1, dimension_2,
dimension_3)
if np.sum(np.equal(permutation_vector, 0)) != 1 or \
np.sum(np.equal(permutation_vector, 1)) != 1 or \
np.sum(np.equal(permutation_vector, 2)) != 1 or \
np.size(np.setdiff1d(permutation_vector, [0, 1, 2])) != 0:
raise ValueError('Invalid permutation vector')
return permutation_vector, flip_orientation
[docs]def permutation_from_orientations(orientation_1, orientation_2):
"""Get dimension permutation vectors for these orientation vectors"""
dimension_1, dimension_2, dimension_3 = \
dimensions_from_orientations(orientation_1, orientation_2)
permutation_vector = [2, 2, 2]
permutation_vector[dimension_1] = 0
permutation_vector[dimension_2] = 1
return permutation_vector, dimension_1, dimension_2, dimension_3
[docs]def dimensions_from_orientations(orientation_vector_1,
orientation_vector_2):
"""Get dimension permutation vectors for these orientation vectors"""
dimension_number_1 = np.argmax(np.abs(orientation_vector_1))
remaining_dimensions = np.setdiff1d([0, 1, 2], dimension_number_1)
reduced_orientation_vector_2 = np.take(orientation_vector_2,
remaining_dimensions)
dim2_from_reduced_set = np.argmax(np.abs(reduced_orientation_vector_2))
dimension_number_2 = remaining_dimensions[dim2_from_reduced_set]
dimension_number_3 = np.setdiff1d([0, 1, 2],
[dimension_number_1, dimension_number_2])
dimension_number_3 = dimension_number_3[0]
return dimension_number_1, dimension_number_2, dimension_number_3
[docs]def get_flip_from_orientations(orientation_1, orientation_2, orientation_3,
dimension_1, dimension_2, dimension_3):
"""Get dimension flip vectors for these orientation vectors"""
flip = [False, False, False]
flip[dimension_1] = orientation_1[dimension_1] < 0
flip[dimension_2] = orientation_2[dimension_2] < 0
flip[dimension_3] = orientation_3[dimension_3] < 0
return flip
[docs]def parse_mhd(header):
"""Read a metaheader and returns a FileImageDescriptor"""
file_format = "mhd"
dim_order = get_condensed_dim_order(header)
data_type = DataType.name_from_metaio(header["ElementType"])
image_size = header["DimSize"]
msb = header["BinaryDataByteOrderMSB"]
compression = None
voxel_size = header['ElementSize']
return (FileImageDescriptor(file_format=file_format,
dim_order=dim_order,
data_type=data_type,
image_size=image_size,
msb=msb,
compression=compression,
voxel_size=voxel_size), header)