Как я могу согласовать несколько связанных документов (счета-фактуры, возвраты и кредит-ноты) с противоречивыми данными?Python

Программы на Python
Ответить
Anonymous
 Как я могу согласовать несколько связанных документов (счета-фактуры, возвраты и кредит-ноты) с противоречивыми данными?

Сообщение Anonymous »

Мне нужна помощь с довольно сложной задачей, которую мне поручил: сверка документов между различными типами записей.
Короче, мне нужно сопоставить документы с разными «причинными кодами»:
2: Goods receipt (DDT) from the store (PDV)
67: Invoice related to that DDT (from the CEDI)
5: Product return from the store
22: Product return requested by the supplier
20: Discrepancy (e.g. missing products)
68: Credit note

Документы, загружаемые магазинами вручную (2, 5, 20, 22), часто содержат ошибки ввода данных, такие как неверная дата, номер документа, поставщик или сведения о строке (неверный идентификатор товара, количество, скидки и т. д.).
Документы, поступающие из CEDI (67 и 68), обычно верны (счета-фактуры и кредит-ноты в формате XML), за исключением нескольких редких товаров на уровне несоответствия.
Для каждого документа у меня есть следующие данные:
store_id, supplier_id, number, date, total_amount, articles

Каждая статья содержит:
article_id, quantity, value, possible discount or gift flags


Что я сделал на данный момент
Я написал скрипт Python, который пытается найти совпадения:
  • Если в документе магазина указана неправильная дата, он выполняет поиск по store_id, поставщику и номеру.
  • Если у него неправильный номер, он выполняет поиск по store_id, поставщику и дате.
Однако текущая логика имеет несколько проблем:
  • Если существует несколько счетов-фактур за один и тот же день, он выдает неправильные совпадения.
  • Если и дата, и номер неверны, он находит ничего.
  • Это работает только для прямых пар, например:
2 ↔ 67
20 ↔ 68
22 ↔ 68
5 ↔ 68


Чего я хочу
Я хотел бы разработать более разумный алгоритм сопоставления, который сможет:
  • Принимать все документы со всеми данными, упомянутыми выше.
  • Автоматически находить наилучшее возможное совпадение.
  • Агрегировать документы, которые принадлежат одному и тому же «событию» (например: 2 + 67 + 20 + 68 или 2 + 67 + 22 + 68), чтобы замкнуть цикл согласования.
Кто-нибудь сталкивался с подобной проблемой или мог предложить логический подход или структуру данных для обработки такого рода сопоставления и агрегирования?
Ниже есть отрывок, переведенный с итальянского на английский искусственным интеллектом, чтобы сделать его доступным для всех:
class ReconciliazioneTeste:
def __init__(self):
# Colors for different worksheet types
self.sheet_colors = {
'PDV Inversions': 'FFFFA500', # Orange
'Number Errors': 'FFFF6347', # Tomato red
'Date Errors': 'FF4169E1', # Royal blue
'5-Field Matches': 'FF32CD32', # Lime green
'Perfect Matches': 'FF228B22' # Forest green
}

# Scoring system to prioritize match types
self.match_scores = {
'PDV Inversion': 1,
'Number Error': 2,
'Date Error': 3,
'5-Field Match': 4,
'Perfect Match': 5
}

# Input data
self.documenti_pdv = []
self.documenti_grd = []
self.excluded_suppliers = set()

# Reconciliation results
self.results = {
'inversions': [],
'numberErrors': [],
'dateErrors': [],
'fiveFieldMatches': [],
'perfectMatches': [],
'unreconciledPDV': [],
'unreconciledGRD': [],
'supplierErrors': []
}

# Normalize document number
def normalize_doc_number(self, numero):
"""Normalize document number to last 6 digits"""
if pd.isna(numero) or numero == '':
return ''

# Convert to string and strip spaces
num_str = str(numero).strip()

# Keep only last 6 digits
if len(num_str) > 6:
num_str = num_str[-6:]

# Remove leading zeros
num_str = num_str.lstrip('0') or '0'

return num_str

# Format document date
def format_date(self, date_str):
"""Format date as DD/MM/YYYY"""
if pd.isna(date_str) or date_str == '':
return ''

try:
# Try several date formats
if isinstance(date_str, str):
# DD/MM/YYYY format
if '/' in date_str:
parts = date_str.split('/')
if len(parts) == 3:
return f"{parts[0].zfill(2)}/{parts[1].zfill(2)}/{parts[2]}"

# DD-MM-YYYY format
if '-' in date_str:
parts = date_str.split('-')
if len(parts) == 3:
return f"{parts[0].zfill(2)}/{parts[1].zfill(2)}/{parts[2]}"

return str(date_str)
except:
return str(date_str)

# Compute document sign
def calculate_sign(self, td, imponibile):
"""Calculate document sign (+/-)"""
try:
td_num = int(td) if pd.notna(td) else 0
merce = float(str(imponibile).replace(',', '.')) if pd.notna(imponibile) else 0

# Transaction Type (TD) lists
negative_tds = [22, 25, 4, 40, 5, 51, 68, 8]
positive_tds = [1, 2, 41, 50, 55, 67]
neutral_tds = [20]

if td_num in negative_tds:
return '-'
elif td_num in positive_tds:
return '+'
elif td_num in neutral_tds:
return '+' if merce >= 0 else '-'
else:
# Default: use merchandise amount sign
return '+' if merce >= 0 else '-'
except:
return '+'

# Classify document type
def get_document_type(self, td, d):
"""Determine document type (PDV or GRD)"""
try:
td_num = int(td) if pd.notna(td) else 0
d_val = str(d).strip() if pd.notna(d) else ''

# Priority to column D if present
if d_val == 'S':
return 'GRD'
elif d_val == 'N':
return 'PDV'

# Otherwise use TD
grd_tds = [67, 68]
return 'GRD' if td_num in grd_tds else 'PDV'
except:
return 'PDV'

def load_csv_data(self, headers_file, excluded_suppliers_file):
"""Load data from CSV files"""
print("📁 Loading CSV files...")

# Load excluded suppliers
try:
df_suppliers = pd.read_csv(excluded_suppliers_file, sep=';', encoding='utf-8')
self.excluded_suppliers = set(df_suppliers['CodFor'].astype(str))
print(f"✅ Loaded {len(self.excluded_suppliers)} excluded suppliers")
except Exception as e:
print(f"⚠️ Error loading excluded suppliers: {e}")
self.excluded_suppliers = set()

# Load document headers (Italian format with comma as decimal separator)
try:
# CSV has an initial empty column, so use index_col=0 to skip it
df_headers = pd.read_csv(headers_file, sep=';', encoding='utf-8', decimal=',',
index_col=0, # Skip first empty column
dtype={'TD': 'Int64', 'Pdv': 'Int64', 'Anag.': 'Int64',
'Numero': 'str', 'Destinatar': 'Int64', 'Consegnata': 'Int64',
'Pdv Dest': 'Int64'})
print(f"✅ Loaded {len(df_headers)} document headers")

# Process each document
valid_documents = []
supplier_errors = []

for _, row in df_headers.iterrows():
# Check if supplier is excluded
supplier_code = str(row.get('Anag.', ''))
if supplier_code in self.excluded_suppliers:
supplier_errors.append(dict(row))
continue

# ".0" fix
def clean_number_to_string(val):
"""Convert number to string, removing .0 if applicable"""
if pd.isna(val) or val == '':
return ''
try:
# If it's a float representing an integer, convert it
if isinstance(val, float) and val.is_integer():
return str(int(val))
return str(val)
except:
return str(val)

# Create normalized document
doc = {
'TD': clean_number_to_string(row.get('TD', '')),
'Pdv': clean_number_to_string(row.get('Pdv', '')),
'Anag.': clean_number_to_string(row.get('Anag.', '')),
'Ragione Sociale': str(row.get('Ragione Sociale', '')),
'Est': str(row.get('Est', '')),
'Numero': clean_number_to_string(row.get('Numero', '')),
'Data': self.format_date(row.get('Data', '')),
'T.Imponibile': float(row.get('T.Imponibile', 0)) if pd.notna(row.get('T.Imponibile', 0)) else 0,
'Totale Iva': float(row.get('Totale Iva', 0)) if pd.notna(row.get('Totale Iva', 0)) else 0,
'Totale': float(row.get('Totale', 0)) if pd.notna(row.get('Totale', 0)) else 0,
'Destinatar': clean_number_to_string(row.get('Destinatar', '')),
'Consegnata': clean_number_to_string(row.get('Consegnata', '')),
'D': str(row.get('D', '')),
'DT Cons.': str(row.get('DT Cons.', '')),
'Pdv Dest': clean_number_to_string(row.get('Pdv Dest', '')),
'Merce': float(row.get('Merce', 0)) if pd.notna(row.get('Merce', 0)) else 0,

# Computed fields
'NumNorm': self.normalize_doc_number(row.get('Numero', '')),
'Segno': self.calculate_sign(row.get('TD', ''), row.get('T.Imponibile', 0)),
'TipoDoc': self.get_document_type(row.get('TD', ''), row.get('D', ''))
}

valid_documents.append(doc)

# Separate PDV and GRD documents
self.documenti_pdv = [d for d in valid_documents if d['TipoDoc'] == 'PDV']
self.documenti_grd = [d for d in valid_documents if d['TipoDoc'] == 'GRD']
self.results['supplierErrors'] = supplier_errors

print(f"📊 PDV Documents: {len(self.documenti_pdv)}")
print(f"📊 GRD Documents: {len(self.documenti_grd)}")
print(f"❌ Supplier errors: {len(supplier_errors)}")

except Exception as e:
print(f"❌ Error loading document headers: {e}")
raise

def perform_reconciliation(self):
"""Perform reconciliation using the defined logic"""
print("\n🔄 Starting reconciliation...")

# Map to track best match for each PDV document
pdv_matches = {} # pdv_index -> {grd_index, error_type, score}
grd_used = {} # grd_index -> pdv_index that uses it

doc_pdv = self.documenti_pdv
doc_grd = self.documenti_grd

print(f"📋 Documents to reconcile: {len(doc_pdv)} PDV vs {len(doc_grd)} GRD")

# Build indexes to optimize searches
print("🔍 Building indexes for optimization...")

# GRD index for fast lookup: (Date, Anag, NumNorm, Segno) -> [grd_indices]
grd_index = {}
for grd_idx, grd in enumerate(doc_grd):
key = (grd['Data'], grd['Anag.'], grd['NumNorm'], grd['Segno'])
if key not in grd_index:
grd_index[key] = []
grd_index[key].append(grd_idx)

def add_match_if_better(pdv_idx, grd_idx, match_type, score):
"""Add match if it has higher priority than existing one"""
current_match = pdv_matches.get(pdv_idx)

if not current_match or score > current_match['score']:
current_grd_user = grd_used.get(grd_idx)
if (current_grd_user is None or current_grd_user == pdv_idx or
(current_grd_user in pdv_matches and score > pdv_matches[current_grd_user]['score'])):

if current_grd_user is not None and current_grd_user != pdv_idx:
if current_grd_user in pdv_matches:
del pdv_matches[current_grd_user]

pdv_matches[pdv_idx] = {
'grd_index': grd_idx,
'tipo_errore': match_type,
'score': score
}
grd_used[grd_idx] = pdv_idx
return True
return False

# 1. PDV INVERSION (4 fields: Date, Anag, NumNorm, Segno — BUT different PDVs)
print("🔄 Searching for PDV inversions...")
for pdv_idx, pdv in enumerate(doc_pdv):
key = (pdv['Data'], pdv['Anag.'], pdv['NumNorm'], pdv['Segno'])
if key in grd_index:
for grd_idx in grd_index[key]:
grd = doc_grd[grd_idx]
if pdv['Pdv'] != grd['Pdv']: # Different PDVs required for inversion
add_match_if_better(pdv_idx, grd_idx, 'PDV Inversion', self.match_scores['PDV Inversion'])

# 2. DOCUMENT NUMBER ERRORS (4 fields: Anag, PDV, Date, Segno — BUT different NumNorm)
print("❌ Searching for number errors...")
# Specific index for number errors: (Anag, PDV, Date, Segno) -> [grd_indices]
grd_index_numero = {}
for grd_idx, grd in enumerate(doc_grd):
key = (grd['Anag.'], grd['Pdv'], grd['Data'], grd['Segno'])
if key not in grd_index_numero:
grd_index_numero[key] = []
grd_index_numero[key].append(grd_idx)

for pdv_idx, pdv in enumerate(doc_pdv):
key = (pdv['Anag.'], pdv['Pdv'], pdv['Data'], pdv['Segno'])
if key in grd_index_numero:
for grd_idx in grd_index_numero[key]:
grd = doc_grd[grd_idx]
if pdv['NumNorm'] != grd['NumNorm']: # Different NumNorm required for number error
add_match_if_better(pdv_idx, grd_idx, 'Number Error', self.match_scores['Number Error'])

# 3. DATE ERRORS (4 fields: NumNorm, Anag, PDV, Segno — BUT different Dates)
print("📅 Searching for date errors...")
# Specific index for date errors: (NumNorm, Anag, PDV, Segno) -> [grd_indices]
grd_index_data = {}
for grd_idx, grd in enumerate(doc_grd):
key = (grd['NumNorm'], grd['Anag.'], grd['Pdv'], grd['Segno'])
if key not in grd_index_data:
grd_index_data[key] = []
grd_index_data[key].append(grd_idx)

for pdv_idx, pdv in enumerate(doc_pdv):
key = (pdv['NumNorm'], pdv['Anag.'], pdv['Pdv'], pdv['Segno'])
if key in grd_index_data:
for grd_idx in grd_index_data[key]:
grd = doc_grd[grd_idx]
if pdv['Data'] != grd['Data']: # Different dates required for date error
add_match_if_better(pdv_idx, grd_idx, 'Date Error', self.match_scores['Date Error'])

# 4. 5-FIELD MATCH (PDV, Anag, Date, NumNorm, Segno all identical)
print("🔍 Searching for 5-field matches...")
# Specific index for 5-field matches: (PDV, Anag, Date, NumNorm, Segno) -> [grd_indices]
grd_index_5campi = {}
for grd_idx, grd in enumerate(doc_grd):
key = (grd['Pdv'], grd['Anag.'], grd['Data'], grd['NumNorm'], grd['Segno'])
if key not in grd_index_5campi:
grd_index_5campi[key] = []
grd_index_5campi[key].append(grd_idx)

for pdv_idx, pdv in enumerate(doc_pdv):
key = (pdv['Pdv'], pdv['Anag.'], pdv['Data'], pdv['NumNorm'], pdv['Segno'])
if key in grd_index_5campi:
for grd_idx in grd_index_5campi[key]:
add_match_if_better(pdv_idx, grd_idx, '5-Field Match', self.match_scores['5-Field Match'])

# 5. PERFECT MATCH (PDV, Anag, Date, NumNorm, Merce, Segno all identical)
print("💯 Searching for perfect matches...")
for pdv_idx, pdv in enumerate(doc_pdv):
key = (pdv['Pdv'], pdv['Anag.'], pdv['Data'], pdv['NumNorm'], pdv['Segno'])
if key in grd_index_5campi:
for grd_idx in grd_index_5campi[key]:
grd = doc_grd[grd_idx]
if abs(pdv['Merce'] - grd['Merce']) < 0.01: # Same merchandise amount for perfect match
add_match_if_better(pdv_idx, grd_idx, 'Perfect Match', self.match_scores['Perfect Match'])

# Build final results
print("📋 Compiling final results...")

# Categorize final matches by type
for pdv_idx, match in pdv_matches.items():
match_data = {
'pdv': {**doc_pdv[pdv_idx], '_index': pdv_idx},
'grd': {**doc_grd[match['grd_index']], '_index': match['grd_index']},
'tipoErrore': match['tipo_errore']
}

if match['tipo_errore'] == 'PDV Inversion':
self.results['inversions'].append(match_data)
elif match['tipo_errore'] == 'Number Error':
self.results['numberErrors'].append(match_data)
elif match['tipo_errore'] == 'Date Error':
self.results['dateErrors'].append(match_data)
elif match['tipo_errore'] == '5-Field Match':
self.results['fiveFieldMatches'].append(match_data)
elif match['tipo_errore'] == 'Perfect Match':
self.results['perfectMatches'].append(match_data)

# Find unreconciled documents
for pdv_idx, pdv in enumerate(doc_pdv):
if pdv_idx not in pdv_matches:
self.results['unreconciledPDV'].append({**pdv, '_index': pdv_idx})

grd_used_set = set(grd_used.keys())
for grd_idx, grd in enumerate(doc_grd):
if grd_idx not in grd_used_set:
self.results['unreconciledGRD'].append({**grd, '_index': grd_idx})

# Final stats
total_matched = len(pdv_matches)
total_pdv = len(doc_pdv)
total_grd = len(doc_grd)

print(f"✅ Reconciliation completed!")
print(f"📊 Matches found: {total_matched}/{total_pdv} PDV ({total_matched/total_pdv*100:.1f}%)")
print(f"📊 PDV Inversions: {len(self.results['inversions'])}")
print(f"📊 Number Errors: {len(self.results['numberErrors'])}")
print(f"📊 Date Errors: {len(self.results['dateErrors'])}")
print(f"📊 5-Field Matches: {len(self.results['fiveFieldMatches'])}")
print(f"📊 Perfect Matches: {len(self.results['perfectMatches'])}")
print(f"📊 Unreconciled PDV: {len(self.results['unreconciledPDV'])}")
print(f"📊 Unreconciled GRD: {len(self.results['unreconciledGRD'])}")

return self.results

def create_excel_output(self, output_file):


Подробнее здесь: https://stackoverflow.com/questions/798 ... -credit-no
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»