Как проверить и улучшить трубопровод предварительной обработки данных для поисковой системы электронной коммерции?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как проверить и улучшить трубопровод предварительной обработки данных для поисковой системы электронной коммерции?

Сообщение Anonymous »

Я новичок в разработке данных, и недавно мне было поручено создать Production Data Data Pripheling с Python и MySQL для поисковой системы на платформе электронной коммерции . Трубопровод включает в себя качество данных и проверки проверки , но, поскольку моя команда и я относительно новы в этом домене, мы не уверены в следующем:

Как мы можем проверить, правильно ли реализуется наш текущий конвей Рассмотрим для лучшей производительности, масштабируемости и надежности? > < /li>
< /ol>
Я прикрепил код для трубопровода ниже. Я был бы очень признателен за любую конструктивную обратную связь, критические замечания или предложения
для улучшения. Заранее спасибо за вашу помощь! < /P>
# Database connection details
db_config = {
'user': 'user', # Replace with your database username
'password': 'password', # Replace with your database password
'host': 'host', # Replace with your database host
'port': 3306, # Replace with your database port
'database': 'mysql'
}

# Create connection string and engine
connection_string = f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}"
engine = create_engine(connection_string)

# SQL query
query = """
insert the SQL query here
"""

# Function: Fetch Data from Database with Chunking
def fetch_data(query, engine, chunksize=10000):
"""Fetches data from the database using the given SQL query in chunks."""
print("Connecting to database and fetching data...")
chunks = [] # List to store individual chunks
with engine.connect() as connection:
# Use pd.read_sql with chunksize to fetch data in smaller chunks
for chunk in pd.read_sql(query, connection, chunksize=chunksize):
chunks.append(chunk) # Append each chunk to the list
print("Data fetched successfully.")
# Concatenate all chunks into a single DataFrame
data = pd.concat(chunks, ignore_index=True)
return data

# Function: Preprocess Data
# optimize clean_text to regex pass for better performance -- 2
# and add check for outliers, statistical summary and basic data summaries, data type checks, null/missing value check and many others \
# which then should processed into a report before proceeding with preprocess changes -- 3
def analyze_data(df: pd.DataFrame) -> Dict:
"""Generate statistical summary and data quality report."""
stats = {
'missing_values': df.isnull().sum().to_dict(),
'dtypes': df.dtypes.to_dict(),
'outliers': {},
'unique_counts': df.nunique().to_dict(),
'memory_usage': df.memory_usage(deep=True).sum() / 1024**2 # MB
}

# Detect outliers in numeric columns using IQR method
for col in df.select_dtypes(include=['float64', 'int64']):
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
outliers = df[(df[col] < Q1 - 1.5 * IQR) | (df[col] > Q3 + 1.5 * IQR)]
stats['outliers'][col] = len(outliers)

return stats

def clean_text(text: str) -> str:
"""Optimized text cleaning using compiled regex patterns."""
if pd.isnull(text):
return ""
patterns = {
'html': re.compile(r']+>'),
'special_chars': re.compile(r'[^a-zA-Z0-9\s]'),
'whitespace': re.compile(r'\s+')
}
text = patterns['html'].sub('', text)
text = patterns['special_chars'].sub('', text)
text = patterns['whitespace'].sub(' ', text).strip()
return text.lower()

def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
"""Enhanced data preprocessing with quality checks and optimized cleaning."""
# Create working copy
df = df.copy()

# Generate initial data quality report
initial_stats = analyze_data(df)

# Clean text columns in one operation
text_columns = ['description', 'variant', 'store', 'store_description']
df[text_columns] = df[text_columns].apply(lambda x: x.apply(clean_text))

# Handle numeric nulls with column-specific strategies
numeric_fills = {
'price': df['price'].fillna(0)
}
# Handle categorical nulls with empty strings
var_fills = {
'country':df['country'].fillna("0"),
'state':df['state'].fillna("0"),
'city':df['city'].fillna("0")
}
df.fillna(numeric_fills, inplace=True)
df.fillna(var_fills, inplace=True)

# Check for invalid values
df = df[df['price'] > 0] # Ensure prices are greater than 0

# Convert text columns to lowercase
object_columns = df.select_dtypes(include=['object']).columns
df[object_columns] = df[object_columns].apply(lambda x: x.str.lower())

# Generate final data quality report
final_stats = analyze_data(df)

# Add processing metadata
df.attrs['preprocessing_stats'] = {
'initial': initial_stats,
'final': final_stats,
'rows_processed': len(df),
'columns_processed': len(df.columns)
}

return df

def get_preprocessing_summary(df: pd.DataFrame) -> Dict:
"""Return preprocessing statistics and data quality metrics."""
if 'preprocessing_stats' not in df.attrs:
return {'error': 'No preprocessing statistics available'}
return df.attrs['preprocessing_stats']

# Function: Separate and Save Products and Services
def preprocess_and_separate(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Optimized preprocessing and separation of data into products, services, and shops.
Returns tuple of (products_df, services_df, shops_df)
"""
# Configuration dictionaries
config = {
'products': {
'mask': df['digital'] == 0,
'id_col': 'product_id',
'name_col': 'product_name'
},
'services': {
'mask': df['digital'] == 1,
'id_col': 'service_id',
'name_col': 'service_name'
}
}

# Common columns to drop
drop_cols = ['digital', 'user_id', 'discount', 'class', 'store_id', 'store_description']

# Common column renames
base_rename = {
'unique_id': None, # Will be updated dynamically
'display_name': None, # Will be updated dynamically
'category_names': 'category',
'subcategory_names': 'subcategory',
'subsubcategory_name': 'subsubcategory'
}

# Process products and services in a single loop
processed_dfs = {}
for df_type, params in config.items():
# Update dynamic rename mappings
rename_map = base_rename.copy()
rename_map['unique_id'] = params['id_col']
rename_map['display_name'] = params['name_col']

# Process dataframe
processed_dfs[df_type] = (
df[params['mask']]
.drop(columns=drop_cols)
.rename(columns=rename_map)
.reset_index(drop=True)
)

# Process shops
shop_columns = {
'store': 'first',
'store_id': 'first',
'variant': lambda x: ', '.join(filter(bool, x)),
'store_description': 'first',
'supplier': 'first',
'country': 'first',
'state': 'first',
'city': 'first',
'business_source': 'first'
}

# Create product/service name columns efficiently
name_df = pd.DataFrame({
'product_name': df.apply(lambda x: x['display_name'] if x['digital'] == 0 else "", axis=1),
'service_name': df.apply(lambda x: x['display_name'] if x['digital'] == 1 else "", axis=1)
})

# Add name columns to aggregation config
shop_columns.update({
'product_name': lambda x: ', '.join(filter(bool, x)),
'service_name': lambda x: ', '.join(filter(bool, x))
})

# Process shops dataframe
shops_df = (pd.concat([df[list(shop_columns.keys())], name_df], axis=1)
.groupby('store', as_index=False)
.agg(shop_columns))

# Add metadata to all dataframes
dfs = {
'products': processed_dfs['products'],
'services': processed_dfs['services'],
'shops': shops_df
}

for name, df_obj in dfs.items():
df_obj.attrs['preprocessing_info'] = {
'rows': len(df_obj),
'columns': list(df_obj.columns),
'memory_usage_mb': df_obj.memory_usage(deep=True).sum() / 1024**2
}
# Save each dataframe
filename = f"{'shop' if name == 'shops' else name + '_data'}.csv"
df_obj.to_csv(filename, index=False)
print(f"Saved {len(df_obj)} {name} to '{filename}'")

return tuple(dfs.values())

def get_processing_summary(dfs: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]) -> Dict:
"""Returns summary statistics for the processed dataframes."""
return {
name: df.attrs.get('preprocessing_info', {})
for name, df in zip(['products', 'services', 'shops'], dfs)
}

# data quality checks
def data_quality_checks(df):
"""Checks for missing values, invalid data, and consistency in the dataframe."""
issues = []
critical_columns = ['display_name', 'price']
for col in critical_columns:
if df[col].isnull().any():
issues.append(f"Column '{col}' contains missing values.")

# Check for missing critical values
if df['display_name'].isnull().any():
issues.append("Missing values found in 'display_name' (product/service name).")
if df['price'].isnull().any() or (df['price']
Обработка недостающих значений с использованием замены с помощью нулей.
Нормализация категориальных функций для подготовки данных для поисковой системы.
Применение проверки проверки, таких как проверка для дубликаты и неправильные типы данных.
Я ожидал, что трубопровод будет эффективным, масштабируемым и подходящим для производственной среды, но я не уверен, что: < /p>
следует за лучшими практиками для Система на уровне производства.
Проверки проверки являются достаточными, или если есть лучшие альтернативы.
Текущий подход (без Scikit-learn) является адекватным, или, если интеграция, это улучшит преобразование.
С нетерпением жду отзывов о том, является ли эта реализация звучной и как ее можно оптимизировать.

Подробнее здесь: https://stackoverflow.com/questions/793 ... merce-sear
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»