Текущая реализация строит родословную с использованием поиска в ширину (BFS) до фиксированного числа поколений и вычисляет F по классической формуле:
𝐹=∑[(1/2)^(𝑛1+𝑛2+1)] (1+𝐹𝐴)
где:
- n1 — количество поколений между отцом и общим предком,
- n2 — количество поколений между матерью и общим предком предка,
- 𝐹𝐴 — коэффициент инбридинга общего предка.
- рассматривает только кратчайший путь от отца и матери к каждому общему предку,
- не перебирает все возможные циклы в родословная.
- множественные независимые петли,
- повторяющиеся предки по разным путям,
- более глубокие или перекрывающиеся структуры инбридинга.
- алгоритм недооценивает F, когда общий предок появляется через более чем один допустимый цикл,
- вклады от дополнительных путей игнорируются,
- рекурсивное вычисление 𝐹𝐴 распространяет то же ограничение.
- коэффициент Райта-Малеко может быть правильно вычислен только с использованием кратчайших путей, или
- все допустимые исходные циклы должны быть явно перечислены (или иным образом учтены),
- и какой алгоритмический подход рекомендуется для правильной и вычислительно осуществимой реализации в этом контексте.
#inbreeding.py
from collections import deque
from typing import Dict, Tuple, Optional
from decimal import Decimal
import logging
logger = logging.getLogger(__name__)
class InbreedingCalculator:
"""
Correct Wright–Malécot Inbreeding Coefficient Calculator.
Uses SIMPLE LOOPS, not all paths.
"""
def __init__(self, max_generations: int = 10):
self.max_generations = max_generations
self._fa_cache: Dict[int, float] = {}
# ------------------------------------------------------------
# PUBLIC API
# ------------------------------------------------------------
def calculate_coefficient(self, pigeon_id, sire_id, dam_id) -> Decimal:
if not sire_id or not dam_id:
return Decimal("0.0000")
try:
from .models import Pigeon
pedigree = self._build_pedigree(sire_id, dam_id, Pigeon)
f = self._compute_f(sire_id, dam_id, pedigree)
return Decimal(str(round(f, 4)))
except Exception as e:
logger.error(f"Error calculating F for pigeon {pigeon_id}: {e}", exc_info=True)
return Decimal("0.0000")
# ------------------------------------------------------------
# BUILD PEDIGREE (BFS, batched)
# ------------------------------------------------------------
def _build_pedigree(self, sire_id, dam_id, Pigeon):
pedigree = {}
visited = set()
queue = deque([(sire_id, 0), (dam_id, 0)])
while queue:
pid, depth = queue.popleft()
if pid in visited or depth > self.max_generations:
continue
visited.add(pid)
try:
p = Pigeon.objects.only("id", "sire_id", "dam_id").get(pk=pid)
s, d = p.sire_id, p.dam_id
except:
s, d = None, None
pedigree[pid] = (s, d)
if s:
queue.append((s, depth + 1))
if d:
queue.append((d, depth + 1))
return pedigree
# ------------------------------------------------------------
# SIMPLE LOOP METHOD (Wright–Malécot)
# ------------------------------------------------------------
def _compute_f(self, sire_id, dam_id, pedigree):
ancestors_sire = self._shortest_paths(sire_id, pedigree)
ancestors_dam = self._shortest_paths(dam_id, pedigree)
common = set(ancestors_sire.keys()) & set(ancestors_dam.keys())
if not common:
return 0.0
total = 0.0
for A in common:
n1 = ancestors_sire[A]
n2 = ancestors_dam[A]
Fa = self._get_fa(A, pedigree)
contrib = (0.5 ** (n1 + n2 + 1)) * (1 + Fa)
total += contrib
return total
# ------------------------------------------------------------
# SHORTEST PATHS (NOT all paths!)
# ------------------------------------------------------------
def _shortest_paths(self, start_id, pedigree):
"""
Returns: {ancestor_id: distance}
"""
dist = {start_id: 0}
queue = deque([start_id])
while queue:
current = queue.popleft()
depth = dist[current]
if depth >= self.max_generations:
continue
if current not in pedigree:
continue
s, d = pedigree[current]
for parent in (s, d):
if parent and parent not in dist:
dist[parent] = depth + 1
queue.append(parent)
return dist
# ------------------------------------------------------------
# FA CALCULATION (recursive)
# ------------------------------------------------------------
def _get_fa(self, ancestor_id, pedigree):
if ancestor_id in self._fa_cache:
return self._fa_cache[ancestor_id]
s, d = pedigree.get(ancestor_id, (None, None))
if not s or not d:
self._fa_cache[ancestor_id] = 0.0
return 0.0
Fa = self._compute_f(s, d, pedigree)
self._fa_cache[ancestor_id] = Fa
return Fa
def clear_cache(self):
self._fa_cache.clear()
_calculator = InbreedingCalculator(max_generations=10)
def get_calculator():
return _calculator
Подробнее здесь: https://stackoverflow.com/questions/798 ... shortest-p
Мобильная версия