Bài viết này sẽ giúp bạn giải quyết vấn đề thường gặp khi kiểm thử Airflow DAG bằng Pytest, cụ thể là lỗi ti is None
. Chúng ta sẽ đi sâu vào nguyên nhân gây ra lỗi, cách tái tạo nó và các giải pháp hiệu quả để đảm bảo quy trình kiểm thử Airflow của bạn diễn ra suôn sẻ. Nếu bạn đang gặp khó khăn trong việc kiểm thử các biến Jinja và muốn đảm bảo DAG của mình hoạt động chính xác, đây là bài viết dành cho bạn.
Khi kiểm thử Airflow DAG bằng Pytest, bạn có thể gặp phải lỗi ti is None
. Lỗi này thường xảy ra khi bạn cố gắng truy cập vào một Task Instance (ti
) không tồn tại hoặc chưa được khởi tạo đúng cách trong quá trình kiểm thử.
Để hiểu rõ hơn về lỗi này, hãy xem xét một ví dụ cụ thể. Giả sử bạn có một Airflow DAG đơn giản và bạn muốn kiểm tra xem các biến Jinja được truyền vào task có được render đúng cách hay không.
Dưới đây là một đoạn code ví dụ về một Airflow DAG và một test case sử dụng Pytest:
import datetime
import pytest
from airflow.models import BaseOperator, TaskInstance
from airflow.models.dag import DAG
from airflow.utils.types import DagRunType
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils import timezone
class SampleOperator(BaseOperator):
template_fields = ("_start_date", "_end_date")
def __init__(self, start_date, end_date, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._end_date = end_date
def execute(self, context):
return context
@pytest.fixture
def dag():
with DAG(
dag_id="test_dag_id",
schedule=None,
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2025, 4, 5),
"end_date": datetime.datetime(2025, 4, 6),
},
) as dag:
SampleOperator(
task_id="test_task_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
)
return dag
def test_execute(dag):
run_id = f"test_run_id"
dagrun = dag.create_dagrun(
data_interval=(
dag.default_args["start_date"],
dag.default_args["end_date"],
),
run_id=run_id,
start_date=dag.default_args["end_date"],
state=DagRunState.RUNNING,
run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="test_task_id")
assert ti is not None
# error here, ti is None
Trong ví dụ này, chúng ta tạo một DAG với một task duy nhất sử dụng SampleOperator
. Hàm test_execute
cố gắng lấy Task Instance bằng cách sử dụng dagrun.get_task_instance
. Tuy nhiên, trong một số trường hợp, ti
có thể là None
, dẫn đến lỗi.
Lỗi ti is None
thường xảy ra do một số nguyên nhân sau:
Dưới đây là một số giải pháp bạn có thể thử để khắc phục lỗi ti is None
:
Khi tạo một task bên trong khối with DAG(...) as dag:
, hãy đảm bảo rằng bạn truyền dag=dag
một cách rõ ràng cho operator. Điều này giúp Airflow biết rằng task thuộc về DAG này.
Ví dụ, thay vì:
SampleOperator(
task_id="test_task_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
)
Hãy sử dụng:
SampleOperator(
task_id="test_task_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
dag=dag
)
Đảm bảo rằng quá trình tạo DAG Run diễn ra thành công. Kiểm tra các tham số như start_date
, end_date
, state
và run_type
để đảm bảo chúng được thiết lập đúng cách.
Thay vì sử dụng datetime.datetime.now()
, hãy sử dụng airflow.utils.timezone.utcnow()
để đảm bảo tính nhất quán về múi giờ.
Đảm bảo bạn đang sử dụng phiên bản Airflow ổn định và tương thích với các thư viện và công cụ bạn đang sử dụng. Một số lỗi có thể đã được khắc phục trong các phiên bản mới hơn.
Cấu hình các rule triger , thời gian timeout hợp lý, điều này cũng có thể ảnh hưởng tới việc Task Instance có chạy hay không
Lỗi ti is None
là một vấn đề thường gặp khi kiểm thử Airflow DAG bằng Pytest. Bằng cách hiểu rõ nguyên nhân gây ra lỗi và áp dụng các giải pháp được đề xuất trong bài viết này, bạn có thể khắc phục vấn đề và đảm bảo quy trình kiểm thử Airflow của bạn diễn ra hiệu quả hơn. Đừng quên kiểm tra kỹ code, đảm bảo các task được thêm vào DAG đúng cách và cấu hình các tham số kiểm thử một cách chính xác.
Bài viết liên quan