Короче, мне нужно сопоставить документы с разными кодами причины операции (в коде ниже — TD):
2: Goods receipt (DDT) from the store (PDV)
67: Invoice related to that DDT (from the CEDI)
5: Product return from the store
22: Product return requested by the supplier
20: Discrepancy (e.g. missing products)
68: Credit note
Документы, загружаемые магазинами вручную (2, 5, 20, 22), часто содержат ошибки ввода данных, такие как неверная дата, номер документа, поставщик или сведения о строке (неверный идентификатор товара, количество, скидки и т. д.).
Документы, поступающие из CEDI (67 и 68), обычно верны (счета-фактуры и кредит-ноты в формате XML), за исключением редких расхождений на уровне отдельных товаров.
Для каждого документа у меня есть следующие данные:
store_id, supplier_id, number, date, total_amount, articles
Каждый элемент articles (товарная позиция) содержит:
article_id, quantity, value, possible discount or gift flags
Что я сделал на данный момент
Я написал скрипт Python, который пытается найти совпадения:
- Если в документе магазина указана неправильная дата, он выполняет поиск по store_id, поставщику и номеру.
- Если у него неправильный номер, он выполняет поиск по store_id, поставщику и дате.
- Если существует несколько счетов-фактур за один и тот же день, он выдает неправильные совпадения.
- Если и дата, и номер неверны, он ничего не находит.
- Это работает только для прямых пар, например:
20 ↔ 68
22 ↔ 68
5 ↔ 68
Чего я хочу
Я хотел бы разработать более разумный алгоритм сопоставления, который сможет:
- Принимать все документы со всеми данными, упомянутыми выше.
- Автоматически находить наилучшее возможное совпадение.
- Агрегировать документы, которые принадлежат одному и тому же «событию» (например: 2 + 67 + 20 + 68 или 2 + 67 + 22 + 68), чтобы замкнуть цикл согласования.
Ниже приведён фрагмент кода, переведённый с итальянского на английский с помощью искусственного интеллекта, чтобы он был понятен всем:
class ReconciliazioneTeste:
def __init__(self):
# Colors for different worksheet types
self.sheet_colors = {
'PDV Inversions': 'FFFFA500', # Orange
'Number Errors': 'FFFF6347', # Tomato red
'Date Errors': 'FF4169E1', # Royal blue
'5-Field Matches': 'FF32CD32', # Lime green
'Perfect Matches': 'FF228B22' # Forest green
}
# Scoring system to prioritize match types
self.match_scores = {
'PDV Inversion': 1,
'Number Error': 2,
'Date Error': 3,
'5-Field Match': 4,
'Perfect Match': 5
}
# Input data
self.documenti_pdv = []
self.documenti_grd = []
self.excluded_suppliers = set()
# Reconciliation results
self.results = {
'inversions': [],
'numberErrors': [],
'dateErrors': [],
'fiveFieldMatches': [],
'perfectMatches': [],
'unreconciledPDV': [],
'unreconciledGRD': [],
'supplierErrors': []
}
# Normalize document number
def normalize_doc_number(self, numero):
    """Build the comparison key for a document number.

    A missing or empty value gives ''. Anything else is converted to a
    string, trimmed, reduced to its trailing six characters and stripped
    of leading zeros; a value made only of zeros collapses to '0'.
    """
    if pd.isna(numero) or numero == '':
        return ''

    trimmed = str(numero).strip()
    tail = trimmed[-6:] if len(trimmed) > 6 else trimmed
    without_zeros = tail.lstrip('0')
    return without_zeros if without_zeros else '0'
# Format document date
def format_date(self, date_str):
    """Normalize a date value to the ``DD/MM/YYYY`` text form.

    The result is used as an equality key, so every spelling of the same
    day has to collapse to the same string.

    Args:
        date_str: A date given as text (``D/M/YYYY``, ``D-M-YYYY`` or
            year-first ``YYYY-MM-DD`` / ``YYYY/MM/DD``, optionally
            followed by a time), an object exposing ``strftime`` (date,
            datetime, pandas Timestamp), or a missing value.

    Returns:
        ``DD/MM/YYYY`` when the input is recognised, '' for missing or
        blank input, otherwise the trimmed text form of the input.
    """
    if pd.isna(date_str) or date_str == '':
        return ''

    # Date-like objects would otherwise stringify as "YYYY-MM-DD ...".
    if hasattr(date_str, 'strftime'):
        return date_str.strftime('%d/%m/%Y')
    if not isinstance(date_str, str):
        return str(date_str)

    text = date_str.strip()
    for separator in ('/', '-'):
        parts = text.split(separator)
        if len(parts) != 3:
            continue
        first, middle, last = (part.strip() for part in parts)
        if len(first) == 4 and first.isdigit():
            # Year-first input: reorder, and drop any trailing time part
            # ("15T00:00:00" or "15 00:00:00") from the day field.
            day = last.replace('T', ' ').split(' ', 1)[0]
            return f"{day.zfill(2)}/{middle.zfill(2)}/{first}"
        return f"{first.zfill(2)}/{middle.zfill(2)}/{last}"
    return text
# Compute document sign
def calculate_sign(self, td, imponibile):
    """Return the sign of a document as '+' or '-'.

    The transaction type alone decides the sign when it belongs to the
    always-negative or always-positive list. The taxable amount is only
    parsed for the remaining types (TD 20, unknown codes, missing TD), so
    a malformed amount can no longer flip the sign of a fixed-sign type.

    Args:
        td: Transaction type code; any value ``int()`` accepts, or a
            missing value (treated as 0).
        imponibile: Taxable amount as a number or as text that may use a
            decimal comma; a missing value counts as 0.

    Returns:
        '+' or '-'. A TD or an amount that cannot be parsed yields '+'.
    """
    negative_tds = {22, 25, 4, 40, 5, 51, 68, 8}
    positive_tds = {1, 2, 41, 50, 55, 67}

    try:
        td_num = int(td) if pd.notna(td) else 0
    except (TypeError, ValueError):
        return '+'

    if td_num in negative_tds:
        return '-'
    if td_num in positive_tds:
        return '+'

    # Neutral (TD 20) and unlisted types follow the sign of the amount.
    try:
        merce = float(str(imponibile).replace(',', '.')) if pd.notna(imponibile) else 0.0
    except (TypeError, ValueError):
        return '+'
    return '+' if merce >= 0 else '-'
# Classify document type
def get_document_type(self, td, d):
    """Classify a document as 'PDV' or 'GRD'.

    Column D takes priority: 'S' means GRD and 'N' means PDV. The
    transaction type is parsed only when D gives no answer, so an
    unreadable TD cannot override an explicit D flag.

    Args:
        td: Transaction type code; any value ``int()`` accepts, or a
            missing value (treated as 0).
        d: Content of column D, or a missing value.

    Returns:
        'GRD' when D is 'S', or when D is undecided and TD is 67 or 68;
        'PDV' in every other case, including an unparseable TD.
    """
    d_val = str(d).strip() if pd.notna(d) else ''
    if d_val == 'S':
        return 'GRD'
    if d_val == 'N':
        return 'PDV'

    try:
        td_num = int(td) if pd.notna(td) else 0
    except (TypeError, ValueError):
        return 'PDV'
    return 'GRD' if td_num in (67, 68) else 'PDV'
def load_csv_data(self, headers_file, excluded_suppliers_file):
    """Load the excluded-supplier list and the document headers from CSV.

    Both files are read as ';'-separated UTF-8. The header file uses a
    decimal comma and its first column is consumed as the index.

    On success this sets ``self.excluded_suppliers``,
    ``self.documenti_pdv``, ``self.documenti_grd`` and
    ``self.results['supplierErrors']`` (rows whose supplier is excluded).

    Args:
        headers_file: Source of the document-header CSV.
        excluded_suppliers_file: Source of a CSV with a ``CodFor`` column.

    Raises:
        Exception: Any error while reading or processing the header file
            is reported and re-raised. A failure to read the excluded
            suppliers is only reported; loading continues with an empty
            exclusion set.
    """
    def clean_number_to_string(val):
        """Stringify a code, rendering integer-valued floats without '.0'."""
        if pd.isna(val) or val == '':
            return ''
        if isinstance(val, float) and val.is_integer():
            return str(int(val))
        return str(val)

    def to_float(val):
        """Convert an amount to float, treating a missing value as 0."""
        return float(val) if pd.notna(val) else 0.0

    print("Loading CSV data...")

    # Load excluded suppliers (best effort: a failure is not fatal).
    try:
        df_suppliers = pd.read_csv(excluded_suppliers_file, sep=';', encoding='utf-8')
        # Normalize codes the same way as the documents' 'Anag.' so that a
        # column read as float ("999.0") still matches supplier "999".
        codes = {clean_number_to_string(code) for code in df_suppliers['CodFor']}
        codes.discard('')  # a blank code must not exclude documents without supplier
        self.excluded_suppliers = codes
        print(f"Excluded suppliers loaded: {len(self.excluded_suppliers)}")
    except Exception as e:
        print(f"Warning: could not load excluded suppliers: {e}")
        self.excluded_suppliers = set()

    # Load document headers (Italian format with comma as decimal separator)
    try:
        # CSV has an initial empty column, so use index_col=0 to skip it
        df_headers = pd.read_csv(
            headers_file, sep=';', encoding='utf-8', decimal=',',
            index_col=0,
            dtype={'TD': 'Int64', 'Pdv': 'Int64', 'Anag.': 'Int64',
                   'Numero': 'str', 'Destinatar': 'Int64', 'Consegnata': 'Int64',
                   'Pdv Dest': 'Int64'},
        )
        print(f"Header rows loaded: {len(df_headers)}")

        valid_documents = []
        supplier_errors = []
        for _, row in df_headers.iterrows():
            # Rows of excluded suppliers are set aside, not reconciled.
            supplier_code = clean_number_to_string(row.get('Anag.', ''))
            if supplier_code in self.excluded_suppliers:
                supplier_errors.append(dict(row))
                continue

            valid_documents.append({
                'TD': clean_number_to_string(row.get('TD', '')),
                'Pdv': clean_number_to_string(row.get('Pdv', '')),
                'Anag.': supplier_code,
                'Ragione Sociale': str(row.get('Ragione Sociale', '')),
                'Est': str(row.get('Est', '')),
                'Numero': clean_number_to_string(row.get('Numero', '')),
                'Data': self.format_date(row.get('Data', '')),
                'T.Imponibile': to_float(row.get('T.Imponibile', 0)),
                'Totale Iva': to_float(row.get('Totale Iva', 0)),
                'Totale': to_float(row.get('Totale', 0)),
                'Destinatar': clean_number_to_string(row.get('Destinatar', '')),
                'Consegnata': clean_number_to_string(row.get('Consegnata', '')),
                'D': str(row.get('D', '')),
                'DT Cons.': str(row.get('DT Cons.', '')),
                'Pdv Dest': clean_number_to_string(row.get('Pdv Dest', '')),
                'Merce': to_float(row.get('Merce', 0)),
                # Computed fields
                'NumNorm': self.normalize_doc_number(row.get('Numero', '')),
                'Segno': self.calculate_sign(row.get('TD', ''), row.get('T.Imponibile', 0)),
                'TipoDoc': self.get_document_type(row.get('TD', ''), row.get('D', '')),
            })

        # Separate PDV and GRD documents
        self.documenti_pdv = [d for d in valid_documents if d['TipoDoc'] == 'PDV']
        self.documenti_grd = [d for d in valid_documents if d['TipoDoc'] == 'GRD']
        self.results['supplierErrors'] = supplier_errors
        print(f"PDV documents: {len(self.documenti_pdv)}")
        print(f"GRD documents: {len(self.documenti_grd)}")
        print(f"Documents skipped (excluded suppliers): {len(supplier_errors)}")
    except Exception as e:
        print(f"Error loading document headers: {e}")
        raise
def perform_reconciliation(self):
    """Pair each PDV document with at most one GRD document.

    Five rules are applied, strongest first according to
    ``self.match_scores``:

    * Perfect Match: Pdv, Anag., Data, NumNorm, Segno equal and Merce
      within 0.01.
    * 5-Field Match: Pdv, Anag., Data, NumNorm, Segno equal.
    * Date Error: NumNorm, Anag., Pdv, Segno equal, Data different.
    * Number Error: Anag., Pdv, Data, Segno equal, NumNorm different.
    * PDV Inversion: Data, Anag., NumNorm, Segno equal, Pdv different.

    Running the strongest rule first means a pair, once claimed, never
    has to be displaced. Each GRD is therefore held by exactly one PDV,
    and a PDV that loses a GRD to a stronger match can still take
    another candidate under a weaker rule. Within a rule, PDV documents
    are served in list order and take the first free GRD candidate.

    The reconciliation buckets of ``self.results`` are rebuilt on every
    call; ``supplierErrors`` is left untouched.

    Returns:
        ``self.results``. Matched entries are dicts with keys ``pdv``,
        ``grd`` (document copies carrying their list position in
        ``_index``) and ``tipoErrore``; unreconciled entries are
        document copies with ``_index``.
    """
    print("\nStarting reconciliation...")

    doc_pdv = self.documenti_pdv
    doc_grd = self.documenti_grd
    print(f"PDV documents: {len(doc_pdv)} | GRD documents: {len(doc_grd)}")

    five_fields = ('Pdv', 'Anag.', 'Data', 'NumNorm', 'Segno')

    def differs_on(field):
        """Build a predicate that is true when *field* differs between the two documents."""
        return lambda pdv, grd: pdv[field] != grd[field]

    def same_merce(pdv, grd):
        """True when the merchandise amounts agree within 0.01."""
        return abs(pdv['Merce'] - grd['Merce']) < 0.01

    # (match type, result bucket, fields that must be equal, extra condition)
    rules = [
        ('PDV Inversion', 'inversions',
         ('Data', 'Anag.', 'NumNorm', 'Segno'), differs_on('Pdv')),
        ('Number Error', 'numberErrors',
         ('Anag.', 'Pdv', 'Data', 'Segno'), differs_on('NumNorm')),
        ('Date Error', 'dateErrors',
         ('NumNorm', 'Anag.', 'Pdv', 'Segno'), differs_on('Data')),
        ('5-Field Match', 'fiveFieldMatches',
         five_fields, lambda pdv, grd: True),
        ('Perfect Match', 'perfectMatches',
         five_fields, same_merce),
    ]
    rules.sort(key=lambda rule: self.match_scores[rule[0]], reverse=True)

    # Start from empty buckets so a repeated call does not duplicate entries.
    for _, bucket, _, _ in rules:
        self.results[bucket] = []
    self.results['unreconciledPDV'] = []
    self.results['unreconciledGRD'] = []

    pdv_matches = {}   # pdv_index -> {grd_index, tipo_errore, score}
    grd_used = {}      # grd_index -> pdv_index that holds it
    index_cache = {}   # key fields -> {key values -> [grd_indices]}

    for match_type, _, fields, accepts in rules:
        print(f"Searching: {match_type}...")

        # Rules sharing the same key fields share one GRD index.
        if fields not in index_cache:
            grd_index = {}
            for grd_idx, grd in enumerate(doc_grd):
                key = tuple(grd[field] for field in fields)
                grd_index.setdefault(key, []).append(grd_idx)
            index_cache[fields] = grd_index
        grd_index = index_cache[fields]

        score = self.match_scores[match_type]
        for pdv_idx, pdv in enumerate(doc_pdv):
            if pdv_idx in pdv_matches:
                continue  # already paired by a stronger rule
            key = tuple(pdv[field] for field in fields)
            for grd_idx in grd_index.get(key, ()):
                if grd_idx in grd_used:
                    continue
                if accepts(pdv, doc_grd[grd_idx]):
                    pdv_matches[pdv_idx] = {
                        'grd_index': grd_idx,
                        'tipo_errore': match_type,
                        'score': score,
                    }
                    grd_used[grd_idx] = pdv_idx
                    break

    # Build final results
    print("Building final results...")
    bucket_of = {match_type: bucket for match_type, bucket, _, _ in rules}
    for pdv_idx in sorted(pdv_matches):
        match = pdv_matches[pdv_idx]
        self.results[bucket_of[match['tipo_errore']]].append({
            'pdv': {**doc_pdv[pdv_idx], '_index': pdv_idx},
            'grd': {**doc_grd[match['grd_index']], '_index': match['grd_index']},
            'tipoErrore': match['tipo_errore'],
        })

    # Find unreconciled documents
    self.results['unreconciledPDV'] = [
        {**pdv, '_index': pdv_idx}
        for pdv_idx, pdv in enumerate(doc_pdv)
        if pdv_idx not in pdv_matches
    ]
    self.results['unreconciledGRD'] = [
        {**grd, '_index': grd_idx}
        for grd_idx, grd in enumerate(doc_grd)
        if grd_idx not in grd_used
    ]

    # Final stats
    total_matched = len(pdv_matches)
    total_pdv = len(doc_pdv)
    total_grd = len(doc_grd)
    matched_pct = 100 * total_matched / total_pdv if total_pdv else 0.0
    print(f"Reconciliation completed: {total_matched} pairs "
          f"({matched_pct:.1f}% of {total_pdv} PDV, {total_grd} GRD)")
    print(f"Perfect matches: {len(self.results['perfectMatches'])}")
    print(f"5-field matches: {len(self.results['fiveFieldMatches'])}")
    print(f"Date errors: {len(self.results['dateErrors'])}")
    print(f"Number errors: {len(self.results['numberErrors'])}")
    print(f"PDV inversions: {len(self.results['inversions'])}")
    print(f"Unreconciled PDV: {len(self.results['unreconciledPDV'])}")
    print(f"Unreconciled GRD: {len(self.results['unreconciledGRD'])}")
    return self.results
def create_excel_output(self, output_file):
Подробнее здесь: https://stackoverflow.com/questions/798 ... -credit-no
Мобильная версия