본문 바로가기

AI/아이펠_리서치

Airflow 디버깅 및 수정내용

개요

MLOps 과정 'Airflow 구성하기 - 예제 2 _ 파트 2' 에서 메모리 문제로 error 발생한 부분에 대한 디버깅 및 수정 내용입니다.

디버깅 후 수정이라 Airflow 개발 내용은 생략 합니다.

 

문제점

  • 최종 Airflow HuggingFace 데이터셋 등록 과정에서 에러 발생
  • Hugging Face 에 gugalove/mlops_gsod 로 create_repo 는 되나 데이터셋이 업로드 안됨

Airflow Logs
Hugging Face repo

 

수정내용

  • 도커의 메모리 부족 의심되어 아래와 같이 Memory 확장 하였으나 failed 되는 시간만 늘어나고 결국 에러 발생 함

docker 환경 설정
Airflow DAG

디버깅 코드 추가 하여 문제점 확인

  • mlops-quicklab/airflow/basic/dags/bigquery_to_huggingface.py
  • 18Gb 로 확장해도 결국 메모리 터지는 문제 발생함
  • 115 개 중 46번째에서 터짐
  • 전체 blobs 가 너무 많아서(115개) 다 커버하기 위해서는 메모리를 아주 많이 확보하던가 코드 구조를 변경해야 함
for blob_name in blobs:
      blob_content = gcs_hook.download(gcs_bucket_name, blob_name)
      print(f"Blob {blob_name} content: {blob_content[:100]}")
      dfs.append(pd.read_csv(io.BytesIO(blob_content), sep=","))
      print(f"DataFrame shape: {dfs[-1].shape}")
  merged_df = pd.concat(dfs, ignore_index=True)
  print(merged_df.head())
  print(f"Merged DataFrame shape: {merged_df.shape}")

  with io.BytesIO() as byte_stream:
      merged_df.to_csv(byte_stream)
      byte_stream.seek(0)
      print(f"Byte stream size: {len(byte_stream.getvalue())} bytes")
      print(byte_stream.getvalue()[:100])  # 파일의 첫 100바이트 출력

      api.upload_file(
          path_or_fileobj=byte_stream,
          path_in_repo="train.csv",
          repo_id=f"{hf_username}/{dataset_repo_name}",
          repo_type="dataset",
          token=hf_token,
      )
[2025-01-14, 10:10:51 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000044.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:10:52 UTC] {logging_mixin.py:188} INFO - DataFrame shape: (994914, 31)
[2025-01-14, 10:11:05 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000045.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:11:07 UTC] {logging_mixin.py:188} INFO - DataFrame shape: (994883, 31)
[2025-01-14, 10:11:18 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000046.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:11:25 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code -9
[2025-01-14, 10:11:25 UTC] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-01-14, 10:11:32 UTC] {scheduler_job_runner.py:1754} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/bigquery_to_huggingface.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'bigquery_to_huggingface', 'Task Id': 'register_dataset_to_huggingface', 'Run Id': 'manual__2025-01-14T09:59:58.278108+00:00', 'Hostname': '3ef93533ae5d', 'External Executor Id': '28071ddb-a4e1-437d-8a4b-cb01567a53d5'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0xffff976fb400>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)

 

실습용 코드 수정

  • mlops-quicklab/airflow/basic/dags/bigquery_to_huggingface.py
  • blobs이 5개 이상일 경우 5개만 처리 하도록 수정함
gcs_hook = GCSHook()
blobs = gcs_hook.list(gcs_bucket_name, prefix=gcs_object_prefix)
if not blobs:
    raise ValueError(f"No files found in bucket {gcs_bucket_name} with prefix {gcs_object_prefix}")

# blobs 리스트에서 최대 5개만 처리
if len(blobs) > 5:
    print(f"Found {len(blobs)} blobs. Processing the first 5 only.")
    blobs = blobs[:5]
else:
    print(f"Found {len(blobs)} blobs. Processing all.")

dfs = []
for blob_name in blobs:
    blob_content = gcs_hook.download(gcs_bucket_name, blob_name)
    print(f"Blob {blob_name} content: {blob_content[:100]}")
    dfs.append(pd.read_csv(io.BytesIO(blob_content), sep=","))
    print(f"DataFrame shape: {dfs[-1].shape}")
merged_df = pd.concat(dfs, ignore_index=True)
print(merged_df.head())
print(f"Merged DataFrame shape: {merged_df.shape}")

with io.BytesIO() as byte_stream:
    merged_df.to_csv(byte_stream)
    byte_stream.seek(0)
    print(f"Byte stream size: {len(byte_stream.getvalue())} bytes")
    print(byte_stream.getvalue()[:100])  # 파일의 첫 100바이트 출력

    api.upload_file(
        path_or_fileobj=byte_stream,
        path_in_repo="train.csv",
        repo_id=f"{hf_username}/{dataset_repo_name}",
        repo_type="dataset",
        token=hf_token,
    )

 

Airflow 에 수정 code 반영 학인

task 정상 확인

log 메세지 정상 확인

  • 파일 크기 : [2025-01-14, 11:08:10 UTC] {logging_mixin.py:188} INFO - Byte stream size: 1051994356 bytes

hugging face 에 train.csv 파일 업로드 확인(약 1.05 GB)