카테고리 없음

Trafilatura - requsets - 병렬처리로 시간 단축하기

catalystmind 2025. 6. 7. 21:20
728x90

TL;DR

  • Worker 5개 병렬 처리로 웹 스크래핑 시간을 5분→2분(60% 단축), 처리량 2.5배 향상 달성
  • 이론상 5배 개선 대비 실제 2.5배는 50% 효율이나, 웹 스크래핑 분야에서 준수한 수준
  • 병목 원인은 네트워크 I/O, 서버 rate limiting, 시스템 오버헤드 등으로 예상됨
  • 실용적 가치로 일일 처리량 2.5배 증가, 개발 시간 60% 절약, 서비스 응답성 60% 향상 효과 예상

requsets의 병렬화 필요성

trafilatura와 requests를 활용하여 500개의 기사를 순차적으로 처리할 경우, 기사 하나당 평균 처리 시간은 0.6초로 전체 처리시간은 약 5분이 소요되었다. 분당 100개의 웹사이트 처리하는 것이 느리지는 않으나, 시간을 더 단축해보기로 했다. request도 I/O 바운드 작업이기에 requests + threadpoolexecutor 조합으로 병렬화를 시도했다. (병렬화에 대한 내용은 이전 글을 참고)

 

 

 

Googlenewsdecoder - 병렬처리로 시간 단축하기

TL;DR GoogleNewsDecoder는 I/O 바운드 작업으로 병렬 처리에 적합한 특성을 가짐 Python의 concurrent.futures 라이브러리를 활용해 ThreadPoolExecutor 기반의 병렬 처리 시스템 구현 submit()과 as_completed() 메서드를

catalystmind.tistory.com


requsets와 ThreadPoolExecutor의 통합

requsets는  URL 요청으로 이루어진 I/O 바운드 작업이므로 멀티스레드를 이용한 병렬화 효과가 뛰어나다. 병렬처리를 위한 파이썬 코드는 이전 코드를 조합하여 작성하였다.(전체 파이썬 스크립트 및 사용법은 아래를 참고)

📄 request Batch Processor (w/ Trafilatura) 🔽 펼치기

import pandas as pd
import trafilatura
from trafilatura.metadata import extract_metadata
import requests
import time
from datetime import datetime, timedelta
import csv
import argparse
import concurrent.futures
from typing import List, Optional
import os


def guess_best_decode(data: bytes, encodings: List[str]) -> str:
    """Pick the decoding that yields the most Hangul characters."""
    best_text: Optional[str] = None
    best_score = -1
    for enc in encodings:
        if not enc:
            continue
        try:
            text = data.decode(enc, errors="replace")
        except LookupError:
            continue
        # Score by Hangul character count
        score = sum(0xAC00 <= ord(ch) <= 0xD7A3 for ch in text)
        if score > best_score:
            best_text, best_score = text, score
        # Early exit if score is very high (heuristic)
        if score > 10:
            break
    if best_text is None:
        best_text = data.decode("utf-8", errors="replace")
    return best_text

def fetch_html_with_requests(url: str, timeout: int = 10) -> dict:
    """
    requests로 HTML만 다운로드하는 함수 (본문 추출은 별도로 수행)
    guess_best_decode를 활용한 강력한 인코딩 자동 보정
    """
    result = {
        'url': url,
        'html': '',
        'success': False,
        'error': None,
        'method_used': 'requests_only'
    }

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()

        # ✅ guess_best_decode를 통해 인코딩 복원
        candidates = [
            response.encoding,
            response.apparent_encoding,
            'utf-8',
            'euc-kr',
            'cp949'
        ]
        result['html'] = guess_best_decode(response.content, candidates)
        result['success'] = True

    except Exception as e:
        result['error'] = str(e)

    return result

def process_single_url(url, index=None):
    """단일 URL 처리 함수 - 병렬처리에 최적화"""
    start_time = time.time()
    result = {
        "index": index,
        "url": url,
        "status": "failed",
        "title": None,
        "date": None,
        "content": None,
        "processing_time": 0,
        "error_message": None,
    }
    
    try:
        # HTML 다운로드
        html_result = fetch_html_with_requests(url)
        
        if html_result['success']:
            html_content = html_result['html']
            
            # trafilatura로 메타데이터 추출
            metadata = trafilatura.extract_metadata(html_content)
            
            # trafilatura로 본문 추출
            text = trafilatura.extract(
                html_content,
                output_format="txt",
                include_comments=False,
                favor_precision=True,
            )
            
            if text and len(text.strip()) > 0:
                result.update({
                    "status": "success",
                    "title": metadata.title if metadata and metadata.title else "No title",
                    "date": str(metadata.date) if metadata and metadata.date else "No date",
                    "content": text[:200] + "..." if len(text) > 200 else text,
                })
            else:
                result["error_message"] = "Empty content extracted"
        else:
            result["error_message"] = f"Failed to download page: {html_result['error']}"
            
    except Exception as e:
        result["error_message"] = str(e)
    
    # 처리 시간 계산
    processing_time = time.time() - start_time
    result["processing_time"] = processing_time
    
    return result


def save_results_to_csv(results, output_file=None):
    """결과를 CSV 파일로 저장"""
    # 파일명이 지정되지 않으면 현재 시간을 포함한 파일명 생성
    if output_file is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"url_processing_results_{timestamp}.csv"

    try:
        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            fieldnames = [
                "index",
                "url",
                "status",
                "title",
                "date",
                "content",
                "processing_time",
                "error_message",
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

            writer.writeheader()
            for result in results:
                writer.writerow(result)

        print(f"✅ 결과가 '{output_file}' 파일에 저장되었습니다.")
        return output_file  # 실제 저장된 파일명 반환

    except Exception as e:
        print(f"❌ 결과 저장 실패: {e}")
        return None


def print_summary(results, total_time, skipped_count=0, workers_used=1):
    """처리 결과 요약 출력"""
    total_urls = len(results)
    successful_count = sum(1 for r in results if r["status"] == "success")
    failed_count = total_urls - successful_count
    processing_times = [r["processing_time"] for r in results]

    # 시간 포맷팅 함수
    def format_time(seconds):
        if seconds < 60:
            return f"{seconds:.2f} seconds"
        else:
            return str(timedelta(seconds=round(seconds)))

    print("\n" + "=" * 7 + " SUMMARY " + "=" * 7)
    print(f"Total URLs processed: {total_urls}")
    print(f"Workers used: {workers_used} (Parallel processing: {'Enabled' if workers_used > 1 else 'Disabled'})")
    print(f"Successfully decoded: {successful_count} ({successful_count/total_urls*100:.1f}%)")
    print(f"Failed to decode: {failed_count} ({failed_count/total_urls*100:.1f}%)")
    print(f"Skipped (Google News URLs): {skipped_count} ({skipped_count/(total_urls + skipped_count)*100:.1f}%)")

    if processing_times:
        avg_time = sum(processing_times) / len(processing_times)

        print("\n" + "-" * 5 + " TIMING INFORMATION " + "-" * 5)
        print(f"Total processing time: {format_time(total_time)}")
        print(f"Average processing time per URL: {format_time(avg_time)}")
        print(f"Fastest URL processing time: {format_time(min(processing_times))}")
        print(f"Slowest URL processing time: {format_time(max(processing_times))}")

    print("\nProcess completed successfully. Results saved to CSV file.")


def process_urls_from_csv(csv_file_path, url_column="decoded_url", output_dir=None, 
                         batch_size=10, workers=1):
    """CSV 파일에서 URL들을 읽어서 병렬 처리"""

    print("=" * 50)
    print("🚀 URL 배치 처리 시작")
    print("=" * 50)

    # CSV 파일 읽기
    try:
        df = pd.read_csv(csv_file_path)
        print(f"📋 CSV 파일 컬럼들: {list(df.columns)}")
        print(f"📋 총 행 수: {len(df)}")

        if url_column not in df.columns:
            raise ValueError(f"Column '{url_column}' not found in CSV file")

        print(f"📋 '{url_column}' 컬럼의 NULL이 아닌 값 개수: {df[url_column].notna().sum()}")

        all_urls = df[url_column].dropna().tolist()
        print(f"📋 첫 번째 URL 샘플: {all_urls[0] if all_urls else 'None'}")

        # news.google.com이 포함되지 않은 URL만 필터링
        urls = [url for url in all_urls if "news.google.com" not in str(url)]

        total_urls = len(urls)
        skipped_urls = len(all_urls) - total_urls

        print(f"📊 전체 URL: {len(all_urls)}개")
        print(f"📊 처리 대상 URL (decoded URLs): {total_urls}개")
        print(f"📊 건너뛴 URL (Google News URLs): {skipped_urls}개")
        print(f"📊 배치 크기: {batch_size}")
        print(f"📊 작업자 수: {workers} (병렬 처리: {'활성화' if workers > 1 else '비활성화'})")

        if urls:
            print(f"📋 첫 번째 디코딩된 URL 샘플: {urls[0]}")

        print("-" * 30)

    except Exception as e:
        print(f"❌ CSV 파일 읽기 실패: {e}")
        return None, None

    # 출력 파일 설정
    if output_dir is None:
        output_dir = os.path.dirname(csv_file_path) or '.'
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    input_filename = os.path.basename(csv_file_path).split('.')[0]
    output_filename = f"{input_filename}_processed_{timestamp}.csv"
    output_path = os.path.join(output_dir, output_filename)
    
    print(f"📁 결과 저장 경로: {output_path}")

    total_start_time = time.time()
    results = []
    completed = 0

    # 병렬 처리 사용 여부에 따라 처리 방식 결정
    if workers > 1:
        print(f"🔄 병렬 처리 모드로 실행 중...")
        
        # URL 배치로 나누기 (한 번에 batch_size만큼 병렬 처리)
        for batch_start in range(0, total_urls, batch_size):
            batch_end = min(batch_start + batch_size, total_urls)
            batch_urls = urls[batch_start:batch_end]
            
            print(f"처리 중: URLs {batch_start+1}-{batch_end} of {total_urls}...")
            
            # 병렬 처리 실행
            with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
                # URL을 병렬로 처리 (인덱스도 함께 전달)
                future_to_data = {
                    executor.submit(process_single_url, url, batch_start + i): (url, batch_start + i) 
                    for i, url in enumerate(batch_urls)
                }
                
                # 결과 수집
                for future in concurrent.futures.as_completed(future_to_data):
                    result = future.result()
                    results.append(result)
                    completed += 1
                    
                    # 진행 상황 표시 (10개마다 또는 배치 완료 시)
                    if completed % 10 == 0 or completed == total_urls:
                        successful_in_batch = sum(1 for r in results[-len(batch_urls):] if r["status"] == "success")
                        print(f"진행: {completed}/{total_urls} ({completed/total_urls*100:.1f}%) "
                              f"배치 성공: {successful_in_batch}/{len(batch_urls)}")
            
            # 현재까지의 결과를 CSV 파일로 저장
            save_results_to_csv(results, output_path)
            print(f"💾 {len(results)}개 결과 저장 완료")
            
    else:
        print(f"🔄 순차 처리 모드로 실행 중...")
        
        # 직렬 처리 사용 (기존 방식)
        for i, url in enumerate(urls):
            if i % 10 == 0 or i == total_urls - 1:
                print(f"처리 중: URL {i+1}/{total_urls}...")
            
            # 단일 URL 처리
            result = process_single_url(url, i)
            results.append(result)
            completed += 1
            
            # batch_size마다 중간 결과 저장
            if (i + 1) % batch_size == 0 or i == total_urls - 1:
                save_results_to_csv(results, output_path)
                successful_count = sum(1 for r in results if r["status"] == "success")
                print(f"진행: {completed}/{total_urls} ({completed/total_urls*100:.1f}%) "
                      f"성공: {successful_count}, 실패: {completed-successful_count}")

    total_processing_time = time.time() - total_start_time
    print_summary(results, total_processing_time, skipped_urls, workers)

    return results, output_path


def main():
    # 명령줄 인자 파서 설정
    parser = argparse.ArgumentParser(description='URL content extractor with parallel processing')
    parser.add_argument('csv_file_path', nargs='?', 
                       help='Path to CSV file containing URLs to process')
    parser.add_argument('--url_column', '-c', default='decoded_url',
                       help='Name of the column containing URLs (default: decoded_url)')
    parser.add_argument('--output_dir', '-o', 
                       help='Output directory for results (default: same as input file)')
    parser.add_argument('--batch_size', '-b', type=int, default=10,
                       help='Batch size for saving interim results (default: 10)')
    parser.add_argument('--workers', '-w', type=int, default=1,
                       help='Number of worker threads for parallel processing (default: 1)')
    
    args = parser.parse_args()
    
    # 기본 파일 경로 (명령줄에서 제공되지 않은 경우)
    if args.csv_file_path is None:
        csv_file_path = r"C:\Users\yhsur\Downloads\특징주\sample_data\Combined_sample_data_500_decoded_2025-05-20_224740.csv"
        print(f"⚠️  기본 파일 경로 사용: {csv_file_path}")
    else:
        csv_file_path = args.csv_file_path
    
    # 설정 정보 출력
    print(f"📁 입력 파일: {csv_file_path}")
    print(f"📋 URL 컬럼명: {args.url_column}")
    print(f"📁 출력 디렉토리: {args.output_dir or '입력 파일과 동일'}")
    print(f"📦 배치 크기: {args.batch_size}")
    print(f"👥 작업자 수: {args.workers}")
    
    # 함수 실행
    results, saved_file = process_urls_from_csv(
        csv_file_path, 
        url_column=args.url_column,
        output_dir=args.output_dir,
        batch_size=args.batch_size,
        workers=args.workers
    )

    if results:
        print(f"\n📁 저장된 파일: {saved_file}")

        print("\n🔍 처리 결과 샘플:")
        for i, result in enumerate(results[:3]):
            print(f"\n[{i+1}] {result['url'][:50]}...")
            print(f"    상태: {result['status']}")
            print(f"    제목: {result['title']}")
            print(f"    처리시간: {result['processing_time']:.2f}초")
            if result["status"] == "failed":
                print(f"    오류: {result['error_message']}")
        
        print(f"\n✅ 처리 완료! 총 {len(results)}개 URL 처리됨")
    else:
        print(f"\n❌ 처리 실패. 위의 오류 메시지를 확인하세요.")


if __name__ == "__main__":
    main()
python script.py input.csv --workers 5 --batch_size 20 --url_column "link" --output_dir "./results"
input.csv: 처리할 CSV 파일 경로 (필수 파라미터)
--workers 5: 병렬 처리 스레드 수 (기본값: 1, 추천: CPU 코어 수의 1-2배)
--batch_size 20: 중간 저장 단위 (기본값: 10, 대용량 처리 시 50-100 권장)
--url_column "link": URL이 들어있는 컬럼명 (기본값: "decoded_url")
--output_dir "./results": 결과 파일 저장 디렉토리 (기본값: 입력 파일과 동일한 폴더)

 

🖨️ 실행결과

📊 성능 평가 및 결과 해석

병렬 처리를 적용한 결과, 개당 평균 처리 속도는 0.61 → 0.79초로 증가하였으나, 병렬 처리 효과로 인해 전체 처리 시간은 60% 감소하였다.

성능 지표 순차 처리
(Sequential)
병렬 처리
(Parallel)
비교 결과
총 처리 시간
(500개 기사 수집)
5분 05초 1분 59초 ⏱ 60% 감소
평균 개별
처리 시간
0.61초 0.79초 🐢 30% 감소
전체 처리
효율성
낮음
(순차 대기)
높음
(동시 처리)
2.5배 향상
성공률 81.4% 81.2% 동등 수준
사용된
작업자 수
1개 5개 병렬 처리 활용

 

스레드 수를 5개로 늘렸지만 처리 시간이 정확히 1/5로 줄지는 않았는데, 그 이유는 다음과 같은 요인들 때문일 것으로 추정:

  • 네트워크 지연: 전체 작업 시간 중 대부분이 요청/응답 대기일 수 있음
  • 서버 반응 속도 제한: 일부 서버는 동시 요청을 제한하거나 봇으로 인식해 지연 응답할 가능성
  • 스레드 오버헤드: 스레드 관리 및 메모리 할당 등의 부하가 효율을 일부 상쇄했을 수 있음

※ 실제 병목 원인은 작업 대상의 구조와 환경에 따라 달라질 수 있으며, 위 내용은 일반적인 웹 크롤링 병렬 처리에서 자주 관찰되는 패턴임


마무리

웹 크롤링 작업에서 request와 threadpoolexecutor를 활용해 실행 시간을 5분에서 2분으로 단축하는 데 성공했다. 간단한 병렬화 코드만 추가하여 60% 이상의 시간 절약, 그리고 2.5배 처리 속도 향상이라는 결과를 얻었을 수 있었다. 다음 글에는 requests에서 html 수집에 실패한 동적 웹사이트에서 정보를 가져오는 방법을 다룰 예정이다. 주로 자바스크립트를 이용한 동적 웹사이트에서 html을 수집하기 위해서 playwright와 같은 브라우저 자동화를 적용할 예정이다.

728x90