Я внедряю механизм очередей за лазурными изделиями, чтобы одновременно запустить несколько пиблейнов. У меня есть два функциональных приложения,
functiona_app1, function_app2. В каждом из этих функциональных приложений у меня есть две трубопроводы, созданные для нынешнего клиента и один для нового клиента. Мое требование состоит в том, что один трубопровод должен быть запускается в каждом функциональном приложении одновременно независимо от типа клиента. Другие запускаемые трубопроводы должны быть в очереди до тех пор, пока не станет доступно какое -либо из функционального приложения. Если я запускаю другого текущего клиента, он должен выполнить другой трубопровод в другом приложении функции. И если я запускаю нового клиента, поскольку оба приложения функциональных приложений используются, он должен ждать в очереди.class PipelineManager:
def __init__(self):
self.data_factory_client = DataFactoryManagementClient(
credential=DefaultAzureCredential(), subscription_id=SUBSCRIPTION_ID
)
# Reorganize pipeline configurations by function app
self.pipeline_configs = {
"function_app1": {
"max_concurrent": 1, # Only one pipeline per function app
"pipelines": {
"new_customer": {
"name": "new customer report with validation",
"priority": 1,
},
"current_customer": {
"name": "final current customer optimization",
"priority": 1,
},
}
},
"function_app2": {
"max_concurrent": 1, # Only one pipeline per function app
"pipelines": {
"new_customer": {
"name": "new customer pipeline 02",
"priority": 1,
},
"current_customer": {
"name": "current customer pipeline 02",
"priority": 1,
},
}
}
}
# Track running pipelines with timestamps and status
self.running_pipelines = {}
# Track pipeline usage history for load balancing
self.pipeline_usage = {} # {pipeline_name: last_used_timestamp}
# Track function app usage
self.function_app_running = {} # {function_app: {run_id, start_time}}
def _cleanup_running_pipelines(self):
"""Clean up completed or old pipeline entries and function app tracking"""
current_time = datetime.now()
to_remove = []
for run_id, info in self.running_pipelines.items():
if info["status"] in ["Succeeded", "Failed", "Cancelled"]:
if current_time - info["start_time"] > timedelta(hours=1):
to_remove.append(run_id)
# Clean up function app tracking
for func_app, running_info in self.function_app_running.items():
if running_info.get("run_id") == run_id:
del self.function_app_running[func_app]
elif current_time - info["start_time"] > timedelta(hours=24): # Safety cleanup
to_remove.append(run_id)
for run_id in to_remove:
del self.running_pipelines[run_id]
def _get_function_app_availability(self):
"""Check which function apps are available"""
self._cleanup_running_pipelines()
available_apps = {}
for func_app in self.pipeline_configs.keys():
is_available = func_app not in self.function_app_running or \
self.function_app_running[func_app]["run_id"] not in self.running_pipelines
available_apps[func_app] = is_available
return available_apps
def _calculate_pipeline_score(self, func_app, pipeline_name, priority):
"""Calculate a score for pipeline selection based on multiple factors"""
current_time = datetime.now()
# Time since last use score (0-1, higher is better)
last_used = self.pipeline_usage.get(pipeline_name, current_time - timedelta(hours=24))
time_since_use = (current_time - last_used).total_seconds()
time_score = min(time_since_use / 3600, 1.0) # Cap at 1 hour
# Priority score (0-1, lower is better)
priority_score = priority / 10 # Assuming max priority is 10
# Function app usage score (0-1, higher is better)
app_last_used = self.function_app_running.get(func_app, {}).get("start_time", current_time - timedelta(hours=24))
app_time_score = min((current_time - app_last_used).total_seconds() / 3600, 1.0)
# Add small random factor for tie-breaking (0-0.1)
randomization = random.uniform(0, 0.1)
# Combine scores (lower is better)
final_score = (
(1 - time_score) * 0.3 + # 30% weight to time since last use
priority_score * 0.2 + # 20% weight to priority
(1 - app_time_score) * 0.4 + # 40% weight to function app availability
randomization * 0.1 # 10% weight to randomization
)
return final_score
def get_available_pipeline(self, analysis_type):
"""Choose the most suitable pipeline based on function app availability"""
logger.info(f"\nSelecting pipeline for {analysis_type}")
# Get function app availability
available_apps = self._get_function_app_availability()
logger.info("Function app availability:")
for app, available in available_apps.items():
logger.info(f" {app}: {'Available' if available else 'In Use'}")
available_pipelines = []
for func_app, config in self.pipeline_configs.items():
if not available_apps[func_app]:
continue
if analysis_type in config["pipelines"]:
pipeline_config = config["pipelines"][analysis_type]
score = self._calculate_pipeline_score(
func_app,
pipeline_config["name"],
pipeline_config["priority"]
)
available_pipelines.append({
"name": pipeline_config["name"],
"score": score,
"function_app": func_app
})
if not available_pipelines:
logger.warning("No available pipelines found!")
return None
# Sort by score (lower is better)
available_pipelines.sort(key=lambda x: x["score"])
selected = available_pipelines[0]
logger.info(f"\nSelected pipeline: {selected['name']}")
logger.info(f" Function App: {selected['function_app']}")
logger.info(f" Score: {selected['score']:.3f}")
# Update usage timestamps
self.pipeline_usage[selected['name']] = datetime.now()
return selected["name"]
def get_pipeline_status(self, run_id):
"""Get the status of a pipeline run"""
try:
run_response = self.data_factory_client.pipeline_runs.get(
resource_group_name=RESOURCE_GROUP_NAME,
factory_name=DATA_FACTORY_NAME,
run_id=run_id,
)
status = run_response.status
# Update status in our tracking
if run_id in self.running_pipelines:
self.running_pipelines[run_id]["status"] = status
# If pipeline is complete, clean up function app tracking
if status in ["Succeeded", "Failed", "Cancelled"]:
for func_app, running_info in self.function_app_running.items():
if running_info.get("run_id") == run_id:
del self.function_app_running[func_app]
return status
except Exception as e:
logger.error(f"Failed to get pipeline status: {e}")
if run_id in self.running_pipelines:
# Clean up tracking on error
func_app = self.running_pipelines[run_id].get("function_app")
if func_app and func_app in self.function_app_running:
del self.function_app_running[func_app]
del self.running_pipelines[run_id]
return "Failed"
def run_pipeline(self, analysis_type, parameters):
try:
# Log current pipeline loads and metrics
logger.info("\nStarting pipeline run")
logger.info("Current pipeline metrics:")
metrics = self.get_pipeline_metrics()
logger.info(json.dumps(metrics, indent=2))
pipeline_name = self.get_available_pipeline(analysis_type)
if not pipeline_name:
raise ValueError(f"No available pipeline for analysis type: {analysis_type}")
# Find which function app this pipeline belongs to
function_app = None
for app, config in self.pipeline_configs.items():
if any(p["name"] == pipeline_name for p in config["pipelines"].values()):
function_app = app
break
run_response = self.data_factory_client.pipelines.create_run(
resource_group_name=RESOURCE_GROUP_NAME,
factory_name=DATA_FACTORY_NAME,
pipeline_name=pipeline_name,
parameters=parameters,
)
# Track the running pipeline
self.running_pipelines[run_response.run_id] = {
"pipeline_name": pipeline_name,
"start_time": datetime.now(),
"status": "InProgress",
"function_app": function_app
}
# Track function app usage
self.function_app_running[function_app] = {
"run_id": run_response.run_id,
"start_time": datetime.now()
}
logger.info(f"Started pipeline '{pipeline_name}' on {function_app} with run_id: {run_response.run_id}")
return run_response.run_id
except Exception as e:
logger.error(f"Failed to start pipeline: {e}")
return None
def get_pipeline_metrics(self):
"""Get current metrics for all function apps and pipelines"""
self._cleanup_running_pipelines()
metrics = {}
for func_app, config in self.pipeline_configs.items():
metrics[func_app] = {
"is_available": func_app not in self.function_app_running,
"current_run": None,
"pipelines": {}
}
if func_app in self.function_app_running:
run_id = self.function_app_running[func_app]["run_id"]
if run_id in self.running_pipelines:
metrics[func_app]["current_run"] = {
"pipeline": self.running_pipelines[run_id]["pipeline_name"],
"run_id": run_id,
"start_time": self.running_pipelines[run_id]["start_time"].isoformat(),
"status": self.running_pipelines[run_id]["status"]
}
for analysis_type, pipeline_config in config["pipelines"].items():
pipeline_name = pipeline_config["name"]
last_used = self.pipeline_usage.get(pipeline_name)
metrics[func_app]["pipelines"][analysis_type] = {
"name": pipeline_name,
"last_used": last_used.isoformat() if last_used else None
}
return metrics
< /code>
Я использую механизм оценки, чтобы попробовать, если в зависимости от наличия трубопровода, трубопровод будет выполнен. Но я сталкиваюсь с проблемой, в которой приложение Pipelines из той же функции работают одновременно. Не уверен, где проблема. Пожалуйста, помогите!
Подробнее здесь: https://stackoverflow.com/questions/793 ... he-same-ad