#
# Copyright (C) 2010-2017 PyTRiP98 Developers.
#
# This file is part of PyTRiP98.
#
# PyTRiP98 is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyTRiP98 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyTRiP98. If not, see <http://www.gnu.org/licenses/>.
#
"""
This module provides the DensityCube class and some special class to find robust angles for treament
based on a quality index factor defined in http://dx.doi.org/10.3109/0284186X.2015.1067720.
"""
import gc
import os
from math import sin, cos
from multiprocessing import Process, Queue
from functools import cmp_to_key
import numpy as np
from pytrip.cube import Cube
from pytrip.res.point import get_basis_from_angles
from pytrip.res.interpolate import RegularInterpolator
import pytriplib
[docs]def cmp_sort(a, b):
""" Sorting key. If gantry angle is equal, then sort by couch angle.
"""
if a["gantry"] == b["gantry"]:
return a["couch"] - b["couch"]
return a["gantry"] - b["gantry"]
[docs]class DensityProjections:
"""
Functions here were mosly used buy for the publication http://dx.doi.org/10.3109/0284186X.2015.1067720
"""
def __init__(self, cube):
""" TODO: Documentation
"""
self.cube = cube
[docs] def calculate_quality_grid(self, voi, gantry, couch, calculate_from=0, stepsize=1.0, avoid=[], gradient=True):
""" TODO: Documentation
"""
l = self.calculate_quality_list(voi, gantry, couch, calculate_from, stepsize, avoid=avoid, gradient=gradient)
l = sorted(l, key=cmp_to_key(cmp_sort))
grid_data = []
for x in l:
grid_data.append(x["data"][0])
l = np.reshape(grid_data, (len(gantry), len(couch)))
return l
[docs] def calculate_quality_list(self, voi, gantry, couch, calculate_from=0, stepsize=1.0, avoid=[], gradient=True):
""" TODO: Documentation
"""
q = Queue(32767)
process = []
d = voi.get_voi_cube()
d.cube = np.array(d.cube, dtype=np.float32)
voi_cube = DensityProjections(d)
result = []
for gantry_angle in gantry:
p = Process(
target=self.calculate_angle_quality_thread,
args=(voi, gantry_angle, couch, calculate_from, stepsize, q, avoid, voi_cube, gradient))
p.start()
p.deamon = True
process.append(p)
if len(process) > 2:
tmp = q.get()
result.append(tmp)
for p in process:
if not p.is_alive():
process.remove(p)
while not len(result) == len(gantry) * len(couch):
tmp = q.get()
result.append(tmp)
return result
[docs] def calculate_best_angles(self, voi, fields, min_dist=20, gantry_limits=[-90, 90], couch_limits=[0, 359], avoid=[]):
""" TODO: Documentation
"""
grid = self.calculate_quality_list(
voi,
range(gantry_limits[0], gantry_limits[1], 20),
range(couch_limits[0], couch_limits[1], 20),
avoid=avoid)
grid = sorted(grid, key=lambda x: x["data"][0])
best_angles = []
i = 0
while len(best_angles) < fields or i >= len(grid):
is_ok = True
point = np.array([grid[i]["gantry"], grid[i]["couch"]])
for angle in best_angles:
if np.linalg.norm(angle - point) < 2 * min_dist:
is_ok = False
if is_ok:
best_angles.append(self.optimize_angle(voi, point[0], point[1], 20, 3, avoid=avoid))
i += 1
return best_angles
[docs] def optimize_angle(self, voi, couch, gantry, margin, iteration, avoid=[]):
""" TODO: Documentation
"""
if iteration is 0:
return [gantry, couch]
grid = self.calculate_quality_list(
voi, np.linspace(gantry, gantry + margin, 3), np.linspace(couch, couch + margin, 3), avoid=avoid)
min_item = min(grid, key=lambda x: x["data"][0])
return self.optimize_angle(voi, min_item["gantry"], min_item["couch"], margin / 2, iteration - 1, avoid=avoid)
[docs] def calculate_angle_quality_thread(self,
voi,
gantry,
couch,
calculate_from=0,
stepsize=1.0,
q=None,
avoid=[],
voi_cube=None,
gradient=True):
""" TODO: Documentation
"""
os.nice(1)
for couch_angle in couch:
qual = self.calculate_angle_quality(voi, gantry, couch_angle, calculate_from, stepsize, avoid, voi_cube,
gradient)
q.put({"couch": couch_angle, "gantry": gantry, "data": qual})
[docs] def calculate_angle_quality(self,
voi,
gantry,
couch,
calculate_from=0,
stepsize=1.0,
avoid=[],
voi_cube=None,
gradient=True):
"""
Calculates a quality index for a given gantry/couch combination.
"""
voi_min, voi_max = voi.get_min_max()
for v in avoid:
v_min, v_max = v.get_min_max()
if voi_cube is None:
d = voi.get_voi_cube()
d.cube = np.array(d.cube, dtype=np.float32)
voi_cube = DensityProjections(d)
data, start, basis = self.calculate_projection(voi, gantry, couch, calculate_from, stepsize)
voi_proj, t1, t2 = voi_cube.calculate_projection(voi, gantry, couch, 1, stepsize)
if gradient:
gradient = np.gradient(data)
data = (gradient[0]**2 + gradient[1]**2)**0.5
a = data * (voi_proj > 0.0)
quality = sum(a)
area = sum(voi_proj > 0.0)
# ~ area = sum(data>0.0)/10
if gradient:
mean_quality = 10 - abs(quality / area)
else:
mean_quality = abs(quality / area)
return mean_quality, quality, area
[docs] def calculate_projection(self, voi, gantry, couch, calculate_from=0, stepsize=1.0):
"""
TODO: documentation
:param Voi voi: tumortarget, type is Voi
:param float gantry: angle in degrees
:param float couch: angle in degrees
:param int calculate_from: 0 is mass center 1 is the most distant point in the tumor from the beamaxis
:param float stepsize: relative to pixelsize, 1 is a step of 1 pixel
:return:
"""
# min_structure = 5 TODO why not used ?
basis = get_basis_from_angles(gantry, couch)
# Convert angles from degrees to radians
gantry /= 180.0 / np.pi
couch /= 180.0 / np.pi
# calculate surface normal
step_vec = -stepsize * self.cube.pixel_size * np.array(basis[0])
step_length = stepsize * self.cube.pixel_size
center = voi.calculate_center()
# ~ (b,c) = self.calculate_plane_vectors(gantry,couch)
b = basis[1]
c = basis[2]
min_window, max_window = voi.get_min_max()
size = np.array(max_window) - np.array(min_window)
window_size = np.array([((sin(couch) * size[1])**2 + (cos(couch) * size[2])**2)**0.5, (
(sin(gantry) * size[0])**2 + (cos(gantry) * size[1])**2 + (sin(couch) * size[2])**2)**0.5]) * 2
dimension = window_size / self.cube.pixel_size
dimension = np.int16(dimension)
start = center - self.cube.pixel_size * 0.5 * np.array(dimension[0] * b + dimension[1] * c)
if calculate_from is 1:
start = self.calculate_back_start_voi(voi, start, step_vec)
if calculate_from is 2:
start = self.calculate_front_start_voi(voi, start, step_vec)
data = pytriplib.calculate_wepl(
self.cube.cube, np.array(start), np.array(basis) * step_length, dimension,
np.array([self.cube.pixel_size, self.cube.pixel_size, self.cube.slice_distance]))
data *= step_length
return data, start, [b, c]
[docs] def calculate_back_start_voi(self, voi, start, beam_axis):
""" TODO: Documentation
:params voi:
:params start:
:params beam axis:
:returns:
"""
points = voi.get_3d_polygon() - start
distance = min(np.dot(points, beam_axis))
return start + distance * beam_axis
[docs] def calculate_front_start_voi(self, voi, start, beam_axis):
""" TODO: Documentation
:params voi:
:params start:
:params beam axis:
:returns:
"""
points = voi.get_3d_polygon() - start
distance = max(np.dot(points, beam_axis))
return start + distance * beam_axis
[docs]class DensityCube(Cube):
""" Class for working with denisity cubes [g/cm3]
"""
def __init__(self, ctxcube, hlut_path="/data/hlut_den.dat"):
""" Creates a DensityCube based on a CTX cube and a Hounsfield lookup table.
:params CtxCube ctxcube:
:params str hlut_path: path to Hounsfield lookup table, relative from where pytrip was executed.
"""
self.ctxcube = ctxcube
super(DensityCube, self).__init__(ctxcube)
self.type = "Density"
self.directory = os.path.dirname(os.path.abspath(__file__))
self.hlut_file = os.path.join(self.directory, hlut_path)
self.import_hlut()
self.calculate_cube()
self.ctxcube = None
[docs] def calculate_cube(self):
"""
Calculate the density values from HU table and interpolating the loaded hlut_data.
"""
ctxdata = self.ctxcube.cube
ctxdata = ctxdata.reshape(self.dimx * self.dimy * self.dimz)
gc.collect()
cube = self.hlut_data(ctxdata)
cube = cube.astype(np.float32)
self.cube = np.reshape(cube, (self.dimz, self.dimy, self.dimx))
[docs] def import_hlut(self):
""" Imports the Hounsfield lookup table and stores it into self.hlut_data object
self.hlut_data is trained linear interpolator, it can be later called to get interpolated values
"""
fp = open(self.hlut_file, "r")
lines = fp.read()
fp.close()
lines = lines.split('\n')
x_data = []
y_data = []
for line in lines:
a = line.split()
if len(a):
x_data.append(float(a[0]))
y_data.append(float(a[3]))
self.hlut_data = RegularInterpolator(np.array(x_data), np.array(y_data), kind='linear')