개요
MLOps 과정 'Airflow 구성하기 - 예제 2 _ 파트 2' 에서 메모리 문제로 error 발생한 부분에 대한 디버깅 및 수정 내용입니다.
디버깅 후 수정이라 Airflow 개발 내용은 생략 합니다.
문제점
- 최종 Airflow HuggingFace 데이터셋 등록 과정에서 에러 발생
- Hugging Face 에 gugalove/mlops_gsod 로 create_repo 는 되나 데이터셋이 업로드 안됨
수정내용
- 도커의 메모리 부족 의심되어 아래와 같이 Memory 확장 하였으나 failed 되는 시간만 늘어나고 결국 에러 발생 함
디버깅 코드 추가 하여 문제점 확인
- mlops-quicklab/airflow/basic/dags/bigquery_to_huggingface.py
- 18Gb 로 확장해도 결국 메모리 터지는 문제 발생함
- 115 개 중 46번째에서 터짐
- 전체 blobs 가 너무 많아서(115개) 다 커버하기 위해서는 메모리를 아주 많이 확보하던가 코드 구조를 변경해야 함
for blob_name in blobs:
blob_content = gcs_hook.download(gcs_bucket_name, blob_name)
print(f"Blob {blob_name} content: {blob_content[:100]}")
dfs.append(pd.read_csv(io.BytesIO(blob_content), sep=","))
print(f"DataFrame shape: {dfs[-1].shape}")
merged_df = pd.concat(dfs, ignore_index=True)
print(merged_df.head())
print(f"Merged DataFrame shape: {merged_df.shape}")
with io.BytesIO() as byte_stream:
merged_df.to_csv(byte_stream)
byte_stream.seek(0)
print(f"Byte stream size: {len(byte_stream.getvalue())} bytes")
print(byte_stream.getvalue()[:100]) # 파일의 첫 100바이트 출력
api.upload_file(
path_or_fileobj=byte_stream,
path_in_repo="train.csv",
repo_id=f"{hf_username}/{dataset_repo_name}",
repo_type="dataset",
token=hf_token,
)
[2025-01-14, 10:10:51 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000044.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:10:52 UTC] {logging_mixin.py:188} INFO - DataFrame shape: (994914, 31)
[2025-01-14, 10:11:05 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000045.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:11:07 UTC] {logging_mixin.py:188} INFO - DataFrame shape: (994883, 31)
[2025-01-14, 10:11:18 UTC] {logging_mixin.py:188} INFO - Blob dataset__13fcecec-550c-4106-ba5c-6a744992b492_000000000046.csv content: b'station_number,wban_number,year,month,day,mean_temp,num_mean_temp_samples,mean_dew_point,num_mean_de'
[2025-01-14, 10:11:25 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code -9
[2025-01-14, 10:11:25 UTC] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-01-14, 10:11:32 UTC] {scheduler_job_runner.py:1754} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/bigquery_to_huggingface.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'bigquery_to_huggingface', 'Task Id': 'register_dataset_to_huggingface', 'Run Id': 'manual__2025-01-14T09:59:58.278108+00:00', 'Hostname': '3ef93533ae5d', 'External Executor Id': '28071ddb-a4e1-437d-8a4b-cb01567a53d5'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0xffff976fb400>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
실습용 코드 수정
- mlops-quicklab/airflow/basic/dags/bigquery_to_huggingface.py
- blobs이 5개 이상일 경우 5개만 처리 하도록 수정함
gcs_hook = GCSHook()
blobs = gcs_hook.list(gcs_bucket_name, prefix=gcs_object_prefix)
if not blobs:
raise ValueError(f"No files found in bucket {gcs_bucket_name} with prefix {gcs_object_prefix}")
# blobs 리스트에서 최대 5개만 처리
if len(blobs) > 5:
print(f"Found {len(blobs)} blobs. Processing the first 5 only.")
blobs = blobs[:5]
else:
print(f"Found {len(blobs)} blobs. Processing all.")
dfs = []
for blob_name in blobs:
blob_content = gcs_hook.download(gcs_bucket_name, blob_name)
print(f"Blob {blob_name} content: {blob_content[:100]}")
dfs.append(pd.read_csv(io.BytesIO(blob_content), sep=","))
print(f"DataFrame shape: {dfs[-1].shape}")
merged_df = pd.concat(dfs, ignore_index=True)
print(merged_df.head())
print(f"Merged DataFrame shape: {merged_df.shape}")
with io.BytesIO() as byte_stream:
merged_df.to_csv(byte_stream)
byte_stream.seek(0)
print(f"Byte stream size: {len(byte_stream.getvalue())} bytes")
print(byte_stream.getvalue()[:100]) # 파일의 첫 100바이트 출력
api.upload_file(
path_or_fileobj=byte_stream,
path_in_repo="train.csv",
repo_id=f"{hf_username}/{dataset_repo_name}",
repo_type="dataset",
token=hf_token,
)
Airflow 에 수정 code 반영 학인
task 정상 확인
log 메세지 정상 확인
- 파일 크기 : [2025-01-14, 11:08:10 UTC] {logging_mixin.py:188} INFO - Byte stream size: 1051994356 bytes
hugging face 에 train.csv 파일 업로드 확인(약 1.05 GB)
'AI > 아이펠_리서치' 카테고리의 다른 글
[논문리뷰] Masked Autoencoders Are Scalable Vision Learners (1) | 2025.02.14 |
---|---|
[논문리뷰] ComKD-CLIP: Comprehensive Knowledge Distillation for ContrastiveLanguage-Image Pre-traning Model (0) | 2025.02.11 |
Transformer 를 사용한 seq2seq 모델 실습 (4) | 2024.11.27 |
Transformer Decoder 구현 및 학습 (1) | 2024.11.26 |
Transformer Encoder 구현 및 학습 (2) | 2024.11.25 |