이슈

[Python] 1. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API)

Master potato 2024. 4. 7. 15:00

 

대신증권 API를 활용해서 해외 주식의 주가를 가져오는 코드를 작성 중이였는데 문제가 발생했다.

 

2800 사이즈의 데이터를 넣는 과정

 

너무 오래 걸린다..

 

한 종목당 2 ~ 5초 정도는 걸리는 듯하다. 한 종목당 2800개의 데이터를 가져오는데, 종목 전체가 11,464개이다.

위 이미지에 보이는 4000개 정도 넣은 것도 귀찮아서 반나절은 기다려서 받은 것이다.

 

문제가 되는 코드를 보자.

    # 모든 종목코드 순회
    for idx, stock_info in enumerate(all_stock_code):
        logging.info(f"{idx} / {len(all_stock_code)} .... {round((idx/len(all_stock_code)) * 100, 2)}%")
        
        # 특정 종목코드의 금액 정보 가져오기
        maria_connection.clean_bind()
        maria_connection.add_bind(stock_info['stock_code'])
        curr_price_days_diff = maria_connection.select(StockPrice.SELECT_CURR_PRICE_DAYS_DIFF) # 테이블에 있는 데이터와 차이나는 날짜
        days_diff = curr_price_days_diff[0].get('DAYS_DIFF', 10000)
        data_size = days_diff-1

        # 어제 데이터까지 존재한다면 1
        if days_diff > 1:
            stock_price_list = stock_price_fetcher.get_stock_price(stock_code=stock_info['stock_code'], data_size=data_size)
            bind_stock_list = []

            logging.info(f"종목: {stock_info['stock_name']}, 넣을 데이터양: {len(stock_price_list)}")
            for stock_price in stock_price_list:
                bind_stock = (
                    stock_info['stock_code'],   # 종목 코드
                    stock_price['date'],        # 날짜(YYYYMMDD)
                    stock_price['open'],        # 시가
                    stock_price['high'],        # 고가
                    stock_price['low'],         # 저가
                    stock_price['close'],       # 종가
                    stock_price['volume']       # 거래량
                )
                bind_stock_list.append(bind_stock)
            maria_connection.clean_bind()
            maria_connection.add_bind(bind_stock_list)
            maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
        else:
            logging.info(f"종목: {stock_info['stock_name']}  [PASS]")
    maria_connection.close()

 

모든 종목코드의 주식 데이터를 넣기 위해 모든 종목에 대해서 순회를 돌며 데이터를 삽입한다.

커넥션 하나로 11,464(종목) X 2800(주가) 만큼의 데이터를 넣으려고 하니 시간이 오래 걸리는 것 같다.

 

내가 생각한 개선 방법으로는

1. async(비동기) 처리

2. 멀티스레딩 처리

 

위 두 가지 방법을 생각했는데, 공통적으로 결국 커넥션 풀이 필요하다.

 

[커넥션 풀 코드]

# Connection pool

import queue
import threading
import mysql.connector

from common.database.MySQL import MySQL


class MySQLPool:
    def __init__(self):
        self.acquire_lock = threading.Lock()
        self.release_lock = threading.Lock()
        self.connection_pool = queue.Queue()

    def connect(self, host: str, user: str, passwd: str, database: str,  max_count: int = 10, port: int = 3306) -> None:
        for _ in range(max_count):
            self.connection_pool.put(
                mysql.connector.connect(
                    host=host,
                    port=port,
                    user=user,
                    passwd=passwd,
                    database=database
                )
            )

    def close(self):
        for _ in range(self.connection_pool.qsize()):
            connection: MySQL = self.connection_pool.get()
            connection.close()

    def get_connection(self):
        with self.acquire_lock:
            try:
                # 모두 꺼낸 경우 무한대기 상태 방지
                connection = self.connection_pool.get(block=False)
            except queue.Empty:
                connection = None
        return connection

    def put_connection(self, connection: MySQL) -> None:
        with self.release_lock:
            self.connection_pool.put(connection)

    def get_size(self) -> int:
        return self.connection_pool.qsize()

 

[테스트 코드]

# Client code

from common.database.MySQLPool import MySQLPool


if __name__ == '__main__':
    maria_pool = MySQLPool()
    maria_pool.connect(
        host='192.168.35.175',
        user='root',
        passwd='1234',
        database='daesin',
        max_count=10
    )
    # 최초 생성시 풀 사이즈 체크(10)
    pool_size = maria_pool.get_size()
    print(f"pool_size: {pool_size}")

    # 커넥션 획득 후 mysql.connector 오브젝트 확인
    connection = maria_pool.get_connection()
    print(connection.__class__)

    # 하나의 커넥션을 꺼낸 후 풀 사이즈 체크(9)
    pool_size = maria_pool.get_size()
    print(f"pool_size: {pool_size}")

    # 커넥션 반환 후 풀 사이즈 체크(10)
    maria_pool.put_connection(connection)

    pool_size = maria_pool.get_size()
    print(f"pool_size: {pool_size}")

    maria_pool.close()

 

<실행 결과>

 

위와 같이 간단하게 커넥션 풀을 만들어보았다.

 

이제 하루종일 걸리는 데이터 삽입 처리를 개선해보자.