import datetime
import pytest
from airflow.models import BaseOperator, TaskInstance
from airflow.models.dag import DAG
from airflow.utils.types import DagRunType
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils import timezone
class SampleOperator(BaseOperator):
template_fields = ("_start_date", "_end_date")
def __init__(self, start_date, end_date, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._end_date = end_date
def execute(self, context):
return context
@pytest.fixture
def dag():
with DAG(
dag_id="test_dag_id",
schedule=None,
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2025, 4, 5),
"end_date": datetime.datetime(2025, 4, 6)
}
) as dag:
SampleOperator(
task_id="test_task_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}"
)
return dag
def test_execute(dag):
run_id = f"test_run_id"
dagrun = dag.create_dagrun(
data_interval=(
dag.default_args["start_date"],
dag.default_args["end_date"]
),
run_id=run_id,
start_date=dag.default_args["end_date"],
state=DagRunState.RUNNING,
run_type=DagRunType.MANUAL
)
ti = dagrun.get_task_instance(task_id="test_task_id")
assert ti is not None # error here, ti is None
< /code>
Но задача < /code> none
моя версия воздушного потока - 2.10.5 < /p>
Вот ошибка < /p>
ti = dagrun.get_task_instance(task_id="test_task_id")
> assert ti is not None
E assert None is not None
tests/test_instance_context.py:59: AssertionError
Я ссылался на код https://airflow.apache.org/docs/apache- ... ting-a-dag
Но я не знаю, почему ..
Я хочу проверить задачу. Когда я прошел переменные Jinja, затем воздушный поток автоматически отображает переменные методом запуска. Но я получил ошибку выше ..
При тестировании воздушного потока с Pytest я получил ошибку. < /p> [code]import datetime import pytest
from airflow.models import BaseOperator, TaskInstance from airflow.models.dag import DAG from airflow.utils.types import DagRunType from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils import timezone
class SampleOperator(BaseOperator): template_fields = ("_start_date", "_end_date")
ti = dagrun.get_task_instance(task_id="test_task_id") assert ti is not None # error here, ti is None
< /code> Но задача < /code> none моя версия воздушного потока - 2.10.5 < /p> Вот ошибка < /p> ti = dagrun.get_task_instance(task_id="test_task_id") > assert ti is not None E assert None is not None
tests/test_instance_context.py:59: AssertionError [/code] Я ссылался на код https://airflow.apache.org/docs/apache-airflow/2.10.5/best-practices.html#testing-a-dag Но я не знаю, почему .. Я хочу проверить задачу. Когда я прошел переменные Jinja, затем воздушный поток автоматически отображает переменные методом запуска. Но я получил ошибку выше ..
Допустим, мне нужен дополнительный парамет (запускаемый с помощью конфигурации), который может быть пустой или какая -то строка:
Param( , type= )
Если я оставлю его пустым, он будет отображаться, как никто не то, что я не ожидаю.
Почему такая...
The introduction of virtual threads in Java 21 has created a lot of buzz around the community. While I still try to wrap my head around actual benefit I will be getting by this feature, I stumble upto three terminologies
virtual thread carrier...
The introduction of virtual threads in Java 21 has created a lot of buzz around the community. While I still try to wrap my head around actual benefit I will be getting by this feature, I stumble upto three terminologies
virtual thread carrier...
Я внедрил простой задачи.factory.startnew () И мне интересно, как я могу сделать это с помощью task.run () вместо? Task.Factory.StartNew(new Action(
(x) =>
{
// Do something with 'x'
}), rawData);
Я ожидал задачи. waitall бросить, когда Semaphore достигает своего MaxCount, но в результате все задачи были повесены. SemaphoreSlim semaphore = new(initialCount: 3, maxCount: 10);
var tasks = Enumerable.Range(1, 20).Select(_ =>
{
return...