이슈

[Python] 2. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API)

Master potato 2024. 4. 12. 15:47

 

 

2024.04.07 - [Python] - [Python] 1. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API)

 

[Python] 1. 멀티스레딩으로 MariaDB 병렬 삽입(with 대신증권 API)

대신증권 API를 활용해서 해외 주식의 주가를 가져오는 코드를 작성 중이였는데 문제가 발생했다. 너무 오래 걸린다.. 한 종목당 2 ~ 5초 정도는 걸리는 듯하다. 한 종목당 2800개의 데이터를 가져

potatoparadox.tistory.com

 

전에 대신증권 API를 통해서 받은 주식 데이터를 MariaDB에 넣으려고 했으나 오래걸리는 이슈가 있었다.

그래서 병렬작업으로 데이터를 밀어넣기 위해 커넥션 풀을 만들어줬다.

 

 

그 후 작업으로는 생성한 커넥션 풀을 가지고 병렬작업을 해주어야 하는데

방법은

1. 멀티스레딩

2. 비동기 처리

이정도로 생각해볼 수 있다.

 

 

일단 나는 스레딩 처리가 익숙해서 멀티스레딩 처리로 코드를 수정했고, 코드를 돌려보니 에러가 발생했다.

대신증권 API 에러

 

아..

 

찾아보니 누군가 문의한 글이 있다.

문의글 답변

출처: https://money2.creontrade.com/e5/mboard/ptype_basic/Basic_018/DW_Basic_Read_Page.aspx?boardseq=60&seq=26737&page=1&searchString=&p=8829&v=8637&m=9505

 

클레온플러스Q&A - 크레온

전체 Re : InteliClient 생성 실패 800401f0 문의 작성일 2024-01-24 오후 4:22:07 조회수 38 파이썬 운영 관** 오류 메시지 Cputil.Cybos의 GetLimitRemainCount 사용중 CPUTIL오류 팝업창이 아래 메시지로 나타남. InteliCli

money2.creontrade.com

 

API 함수 호출을 스레딩으로는 지원하진 않는다고 한다.

 

 

내가 하려는 방식은

종목 코드(API, 11,000) -> 코드에 대한 주가 데이터(API, 2800) -> 데이터 삽입(Maria)

위와 같은 순서이기 때문에 가져온 주가 데이터만 스레딩 처리한다면 문제는 없겠지만...

 

 

음... 그래도 2800건에 대한 데이터를 쪼개서 처리하기보단 11,000건에 대해서 쪼개는 게 훨씬 효과적이지 않을까?

 

 

일단 다른 방안으로는 비동기처리하는 방법이 있으니 비동기 방법으로도 해봤다.

227.7 / 50(batch_size) = 4.5

=> 한 종목당 4.5초

??

 

API에서 데이터를 가져오는데 시간은 크게 걸리지 않는데 RDB에 데이터를 넣는데 오래 걸린다.

동시성 코드에서 I/O 바운드 작업이 일어날 땐 블럭킹이 되어 의도한대로 동작하지 못하는 경우가 있다.

이런 경우 run_in_executor() 메서드를 통해서 개선할 수 있다.

 

35.2 / 50(batch_size) = 0.7

 

from DaesinAPI.StockCodeFetcher import StockCodeFetcher
from DaesinAPI.StockPriceFetcher import StockPriceFetcher

from common.database.MySQL import MySQL
from common.database.MySQLPool import MySQLPool
from resources.queries import StockPrice

import asyncio
import logging, traceback
import time


class InsertDaesinStockPrice:
    def __init__(self):
        self.maria_pool = MySQLPool()
        # 종목가격 인스턴스 생성
        self.stock_price_fetcher = StockPriceFetcher()

    def maria_pool_connect(self):
        self.maria_pool.connect(
            host='192.168.35.175',
            user='root',
            passwd='1234',
            database='daesin',
            max_count=50
        )

    def maria_pool_close(self):
        self.maria_pool.close()

    async def process_stock_price(self, stock_code):
        def insert_stock_price():
            maria_connection.clean_bind()
            maria_connection.add_bind(bind_stock_list)
            maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
        try:
            # 특정 종목코드의 금액 정보 가져오기
            maria_connection: MySQL = self.maria_pool.get_connection()
            try:
                maria_connection.clean_bind()
                maria_connection.add_bind(stock_code)
                curr_price_days_diff = maria_connection.select(StockPrice.SELECT_CURR_PRICE_DAYS_DIFF) # 테이블에 있는 데이터와 차이나는 날짜
                days_diff = curr_price_days_diff[0].get('DAYS_DIFF', 10000)
                data_size = days_diff-1

                # 어제 데이터까지 존재한다면 1
                if days_diff > 1:
                    stock_price_list = self.stock_price_fetcher.get_stock_price(stock_code=stock_code,
                                                                           data_size=data_size)
                    bind_stock_list = []

                    logging.info(f"종목코드: {stock_code}, 넣을 데이터양: {len(stock_price_list)}")
                    for stock_price in stock_price_list:
                        bind_stock = (
                            stock_code,  # 종목 코드
                            stock_price['date'],  # 날짜(YYYYMMDD)
                            stock_price['open'],  # 시가
                            stock_price['high'],  # 고가
                            stock_price['low'],  # 저가
                            stock_price['close'],  # 종가
                            stock_price['volume']  # 거래량
                        )
                        bind_stock_list.append(bind_stock)
                    maria_connection.clean_bind()
                    maria_connection.add_bind(bind_stock_list)
                    await asyncio.get_running_loop().run_in_executor(None, insert_stock_price)
                    # maria_connection.insert_many(StockPrice.INSERT_STOCK_PRICE)
                else:
                    logging.info(f"종목코드: {stock_code}  [PASS]")
            except Exception as error_message:
                maria_connection.rollback()
                logging.info(traceback.print_exc())
                # log.info(error_message)
            finally:
                self.maria_pool.put_connection(maria_connection)
        except Exception as e:
            logging.error(e)

    async def process_stock_batch(self, batch_data):
        # 사이즈만큼의 데이터를 한 번에 동작
        tasks = []
        for stock_info in batch_data:
            task = asyncio.create_task(self.process_stock_price(stock_info['stock_code']))
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def insert_all_stock_price(self):
        # 마리아 커넥션 풀 생성
        self.maria_pool_connect()

        # 종목코드 인스턴스 생성
        stock_code_fetcher = StockCodeFetcher()

        # 종목코드 모두 가져오기
        all_stock_code = stock_code_fetcher.select_code_all()

        # 50개씩 동작
        batch_size = 50
        for idx in range(0, len(all_stock_code), batch_size):
            start_time = time.time()
            logging.info(f"{idx} / {len(all_stock_code)} .... {round((idx/len(all_stock_code)) * 100, 2)}%")
            batch_data = all_stock_code[idx:idx+batch_size]
            await self.process_stock_batch(batch_data)
            logging.info(f"It took {round(time.time() - start_time, 1)} seconds")

        # 마리아 커넥션 풀 해제
        self.maria_pool_close()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s [%(filename)s:%(lineno)d]')

    insert_daesin_stock_price = InsertDaesinStockPrice()
    asyncio.run(insert_daesin_stock_price.insert_all_stock_price())