본문 바로가기
python/금융데이터분석

python 증권데이터 분석 - DB Updata 모듈만들기#2, 네이버증권 일별주식시세 DB로 업데이트하기

by orangecode 2022. 12. 25.
728x90
주식시세 매일 DB에 업데이트 하기

네이버 금융의 주식 시세와 BeautifulSouf, Pandas를 사용해서

Maria DB에 매일 자동으로 업데이트하는 DB Updater 모듈을 만들어보자.

 

지난 포스팅에서는 KRX 한국거래소에서 상장법인 목록을 가져와 DATABASE에 넣고 매일 1번씩 업데이트 되는 기능까지 구현해보았다.

 

 

python 증권데이터 분석 - DB Updata 모듈만들기#1, krx 종목주식시세 DB로 업데이트하기

주식시세 매일 DB에 업데이트 하기 네이버 금융의 주식 시세와 BeautifulSouf, Pandas를 사용해서 Maria DB에 매일 자동으로 업데이트하는 DB Updater 모듈을 만들어보자. DB Updater 클래스 구조 C 드라이버 하

kwonkai.tistory.com

 

 

이번 포스팅에서는 네이버 증권에서 주식별 일별 시세 정보를 가져와 DATABASE에 넣고 매일 1번씩 업데이트 되는 기능을 구현해보고자한다.

 

DB Updater 클래스 구조

C 드라이버 하부에 새로운 폴더(디렉터리)를 만들고 investar 패키지를 생성할 [DB구축] 디렉터리를 만들어줍니다.

1. DBUpdater 스텁(stub) code 작성하기

스텁코드란 인터페이스는 정의되어 있으나 실제 코드가 구현되어 있지 않은 상태의 코드

# DB STUB 코드

class DBUpdater:
    # 생성자 = MariaDB 연결 및 종목코드 딕셔너리 생성
    def __init__(self):
    
    # 소멸자 : MariaDB 연결 해제
    def __del__(self):

    # KRX로부터  상장법인 목록 파일을 읽어와 데이터 프레임 변환
    def read_krx_code(self):

    # 종목코드를 conpany_info 테이블에 업데이트 한 후 딕셔너리에 저장
    def update_comp_info(self):

    # 네이버금융에서 주식 시세를 읽어서 데이터프레임으로 변환
    def read_naver(self, code, company, pages_to_fetch):

    # 네이버 금융에서 읽어온 주식 시세를 DB에 REPLACE
    def replace_into_db(self, df, num, code, company):
    
    # KRX 상장 법인의 주식 시세를 네이버로 부터 읽어 DB에 업데이트
    def update_daily_price(self, pages_to_fetch):

    # 실행 즉시 매일 오후 5시에 daily_price 테이블 업데이트
    def execute_daily(self):

if __name == '__main__':
    dbu = DBUpdater()
    dbu.execute_daily()

 

네이버증권 시세 데이터 읽어오기

네이버 시세 페이지를 스크래핑하여 데이터를 읽어온 뒤, 시세 정보를 investar DATABASE의 daily_prise table 에 업데이트 해보자

 

try_except 처리

네이버 금융에서 웹 스크래핑 하는 코드를 그대로 이용하는데

BeautifulSouf로 전체 페이지 수를 구하고

Pandas의 read_html() 함수를 이용해서 첫 페이지 ~ 마지막페이지까지 순차적으로 주식 시세 데이터를 읽어온다.

 

Pandas의 read_html()함수는 HTTP Error 가 발생하면서 프로그램이 종료될 수 있으므로 try_except 구문을 이용해서 예외처리로 error를 피해준다.

 

 

python 증권데이터 분석 - 네이버 금융 웹 스크래핑(웹 크롤링)

네이버 금융 웹 스크래핑(웹 크롤링)하기 네이버 금융 홈페이지에서 삼성카드(029780) 검색해서 페이지를 들어가서 삼성카드 주식을 조회할 수 있다. 네이버 금융 일별 시세 분석하기 네이버 금융

kwonkai.tistory.com

 

 

네이버 증권시세 읽어오기 read_naver 코드

    def read_naver(self, code, company, pages_to_fetch):
        try:
            url = f'https://finance.naver.com/item/sise_day.nhn?code={code}'
            html = requests.get(url, headers={'User-agent': 'Mozila/5.0'})
            bs = BeautifulSoup(html, "lxml")
            pgrr = bs.find("td", class_="pgRR")
            if pgrr is None:
                return None
            
            s = str(pgrr.a["href"]).split('=')

            # 1. 네이버 금융 일별 시세의 마지막 페이지 탐색
            lastpage = s[-1]
            df = pd.DataFrame()

            # 2. 설정파일에 설정된 페이지수(pages_to_fetch)와 1의 페이지 수에서 작은 것을 선택
            pages = min(int(lastpage), pages_to_fetch)

            # 3. 일별 시세 페이지를 read_html()로 읽어 데이터 프레임에 추가
            for page in range(1, pages+1):
                url = '{}&page={}'.format(url, page)
                req = requests.get(url, headers={'User-agent': 'Mozila/5.0'})
                df = df.append(pd.read_html(req.text, header=0)[0])
                tmnow = datetime.now().strftime('&Y-%m-%d %H %M')
                print('[{}] {} ({}) : {:04d}/{:04d} pages arae downloading...'.format(tmnow, company, code, page, pages), end="\r")
                
                # 4. df dataframe의 columns 명 변경
                df = df.rename(columns = {'날짜':'date', '종가':'close', '전일비':'diff', '시가':'open', '고가':'high', '저가':'low', '거래량':'volume'})                
                # 5. 연,월,일 형식의 date 데이터를 연-월-일 형식으로 변경
                df['date'] = df['date'].replace('.', '-')
                # 6. 결측치 제거
                df = df.dropna()
                # 7. df 데이터프레임 value 값 str -> int로 변경
                df[['close', 'diff', 'open', 'high', 'low','volume']] = df[['close', 'diff', 'open', 'high', 'low','volume']].astype(int)

                # 8.시간, OHLC, DIFF, 거래량만 가져오기
                df = df[['open', 'high', 'low', 'close', 'diff', 'volume']]
        
        except Exception as e:
            print('Exception occured :', str(e))
            return None
        
        return df
1. 네이버 금융 일별 시세의 마지막 페이지 탐색
2. 설정파일에 설정된 페이지수(pages_to_fetch)와 1의 페이지 수에서 작은 것을 선택
 
3. 일별 시세 페이지를 read_html()로 읽어 데이터 프레임에 추가
4. df dataframe의 columns 명 변경
5. 연,월,일 형식의 date 데이터를 연-월-일 형식으로 변경

6. 결측치 제거

7. df 데이터프레임 value 값 str -> int로 변경
8.시간, OHLC, DIFF, 거래량만 가져오기
 
 
 
네이버 금융 일별 시세 데이터 DB에 저장하기

read_naver() method를 사용해서 읽어온 네이버 일별 시세를 DB에 저장하는 replace_into_db method를 이용하여 DB에 저장한다.

 

KRX 종목 데이터 처럼 to_sql() 함수를 이용해서 데이터를 하나하나 가져와 DB에 저장할 수 있지만

 

to_sql()을 사용하면 종목 별로 테이블 구성이 필요하고

to_sql()함수가 데이터 저장 시 전체적으로 교체하기 때문에효율적이지 못하다.

 

 

to_sql()함수를 이용하지 않고 한 테이블에 전체 종목의 시세 데이터를 기록하면 소스 코드를 간단히 작성이 가능하다.

 

네이버 일별 시세 DB저장 replace_into_db 코드

    # 네이버 금융에서 읽어온 주식 시세를 DB에 REPLACE
    def replace_into_db(self, df, num, code, company):
        with self.conn.consor() as curs:
            # 1. 인수로 넘겨받은 dataframe을 tuple로 순회처리한다.
            for r in df.itertuples():
                # 2. REPLACE INTO 구문으로 daily_price 테이블 업데이트
                # 값이 string 이면 '{}', int 이면 {}
                sql = f"REPLACE INTO daily_price VALUES"\
                    f"('{code}', '{r.date}', {r.open}, {r.high}, {r.low}, {r.close}" \
                    f"{r.diff}, {r.volume}"
                curs.execute(sql)
            
            # 3. commit() 함수로 maria DB에 반영한다.
            self.conn.commit()
            print('[{}] #{:04d} {} ({}) : {} rows > REPLCE INTO daily_price [OK]'\
                    .format(datetime.now().strftime('%Y-%m-%d %H:%M'), num+1, company, code, len(df)))
1. 인수로 넘겨받은 dataframe을 tuple로 순회처리한다.
2. REPLACE INTO 구문으로 daily_price 테이블 업데이트
3. commit() 함수로 maria DB에 반영한다.
 
 
 
 
 
전체 상장법인의 네이버 일별 시세 데이터를 읽어와서 DB에 업데이트 하기
    # KRX 상장 법인의 주식 시세를 네이버로 부터 읽어 DB에 업데이트
    def update_daily_price(self, pages_to_fetch):
        # 1. self.codes 딕셔너리에 저장된 종목코드에 대한 순회처리 및 numbering
        for idx, code in enumerate(self.codes):
            # 2. read_naver() method를 이용해 종목코드에 대한 일별 시세 데이터의 dataframe 구하기
            df = self.read_naver(code, self.codes[code], pages_to_fetch)
            
            # df가 None이라도 계속 진행
            if df is None:
                continue

            # 3. 일별 시세 데이터프레임이 구해지면 replace_into_db method로 DB저장
            self.replace_into_db(df, idx, code, self.codes[code])
 
 
1. self.codes 딕셔너리에 저장된 종목코드에 대한 순회처리 및 numbering
2. read_naver() method를 이용해 종목코드에 대한 일별 시세 데이터의 dataframe 구하기
3. 일별 시세 데이터프레임이 구해지면 replace_into_db method로 DB저장
 
 
 

 

 

 

JSON을 이용한 업데이트 페이지 수 설정하기

 

config.json 파일을 사용하면 DBUpdater가 처음 실행되었는지 여부를 체크할 수 있다.

 

config.json 파일이 없다면 DBUpdater 가 처음 실행되는 경우이다.

 

최초 업데이트 이후로 1페이지씩 가져오도록 config.json 파일이 자동으로 변경된다.

 

만약에 업데이트할 페이지 수를 변경하고 싶다면, config.json 파일의 pages_to_fetch 값을 변경하면 된다.

 

 

execute_daily() method는 DBUqdater.py 모듈의 시작 포인트이다.

타이머가 매일 오후 5시에 호출하게 설정했다.

 

 

매일 5시에 새로운 데이터를 업데이트 하는 execute_daily() 코드 

    def execute_daily(self):
        # 1. update_comp_info() method를 호출 해 상장법인 목록을 DB에 업데이트 한다.
        self.update_comp_info()

        # 2. DBUpdater.py가 있는 디렉터리에서 config.json 파일을 읽기모드로 열어준다.
        # 3. 파일이 있다면 page_to_fetch 값을 읽어서 사용한다.
        try:
            with open('config.json','r') as in_file:
                config = json.load(in_file)
                pages_to_fetch = config['pages_to_fetch']
        
        # 4. 파일이 없다면 config.json 파일을 생성해준다. 처음 생성 시 page_to_fetch 100, 이후 1로 설정
        except FileNotFoundError:
            with open('config.json', 'w') as out_file:
                pages_to_fetch = 100
                config = {'pages_to_fetch': 1}
                json.dump(config, out_file)

        # 5. pages_to_fetch 값으로 update_daily_price method를 호출한다.
        self.update_daily_price(pages_to_fetch)

        # 6. 이번달 마지막날(lastday)을 구해 다음날 오후 5시를 계산한다.
        tmnow = datetime.now()
        lastday = calendar.monthrange(tmnow.year, tmnow.month)[1]
        if tmnow.month == 12 and tmnow.day == lastday:
            tmnext = tmnow.replace(year=tmnow.year+1, month=1, day=1, hour=17, minute=0, second=0)
        elif tmnow.month == lastday:
            tmnext = tmnow.replace(month=tmnow.month+1, day=1, hour=17, minute=0, second=0)
        else:
            tmnext = tmnow.replace(day = tmnow.day+1, hour=17, minute=0, second=0)
        
        tmdiff = tmnext - tmnow
        secs = tmdiff.seconds

        # 7. 다음날 오후 5시에 excute_daily() method를 실행하는 타이머 객체를 설정한다.
        t = Timer(secs, self.execute_daily)
        print("Waiting for next update ({}) ...".format(tmnext.strftime('%Y-%m-%d %H:%M')))

        t.start()
1. update_comp_info() method를 호출 해 상장법인 목록을 DB에 업데이트 한다.
 
2. DBUpdater.py가 있는 디렉터리에서 config.json 파일을 읽기모드로 열어준다.
 
3. 파일이 있다면 page_to_fetch 값을 읽어서 사용한다.
4. 파일이 없다면 config.json 파일을 생성해준다. 처음 생성 시 page_to_fetch 100, 이후 1로 설정
 
5. pages_to_fetch 값으로 update_daily_price method를 호출한다.
 
6. 이번달 마지막날(lastday)을 구해 다음날 오후 5시를 계산한다.
 
7. 다음날 오후 5시에 excute_daily() method를 실행하는 타이머 객체를 설정한다.
 
 

 

 

 

 

 

전체 코드
import pandas as pd
from bs4 import BeautifulSoup
import pymysql, calendar, time, json
import requests
from datetime import datetime
from threading import Timer

class DBUpdater:
    # 생성자 = MariaDB 연결 및 종목코드 딕셔너리 생성
    def __init__(self):
        self.conn = pymysql.connect(host='localhost', port=3307, user='root',
                                    password='mariadb', db='INVESTAR', charset='utf8')
        
        with self.conn.cursor() as curs:
            sql = """
            CREATE TABLE IF NOT EXISTS company_info (
                code VARCHAR(20),
                company VARCHAR(40),
                last_update DATE,
                PRIMARY KEY(CODE))
            """
            curs.execute(sql)

            sql = """
            CREATE TABLE IF NOT EXISTS daily_price (
                code VARCHAR(20),
                date DATE,
                open BIGINT(20),
                high BIGINT(20),
                low BIGINT(20),
                close BIGINT(20),
                diff BIGINT(20),
                volume BIGINT(20),
                PRIMARY KEY (code, date))
            """
            curs.execute(sql)
        self.conn.commit()

        self.codes = dict()
        self.update_comp_info()
    
    # 소멸자 : MariaDB 연결 해제
    def __del__(self):
        self.conn.close()

    # KRX로부터  상장법인 목록 파일을 읽어와 데이터 프레임 변환
    def read_krx_code(self):
        # krx 상장법인목록 url 가져와서 read_html로 읽기
        url = 'https://kind.krx.co.kr/corpgeneral/corpList.do?method=download&searchType=13'
        krx = pd.read_html(url, header = 0)[0]

        # krx 상장법인목록 columns 중 종목코드, 회사명만 가져오기
        krx = krx[['종목코드', '회사명']]
        
        # krx 칼럼을 종목코드 -> code로, 회사명을 company로 변경
        krx = krx.rename(columns={'종목코드':'code', '회사명':'company'})

        # krx 종목코드 6자리에 빠진 0을 추가해준다.
        krx.code = krx.code.map('{:06d}'.format)

        return krx # krx를 반환

    # 종목코드를 conpany_info 테이블에 업데이트 한 후 딕셔너리에 저장
    # 오늘 날짜로 업데이트한 기록이 있다면 더이상 업데이트 하지 않음
    def update_comp_info(self):
        """종목코드를 company_info 테이블에 업데이트 한 후 딕셔너리에 저장"""
        sql = "SELECT * FROM company_info"
        df = pd.read_sql(sql, self.conn)
        for idx in range(len(df)):
            self.codes[df['code'].values[idx]] = df['company'].values[idx]
                    
        with self.conn.cursor() as curs:
            sql = "SELECT max(last_update) FROM company_info"
            curs.execute(sql)
            rs = curs.fetchone()
            today = datetime.today().strftime('%Y-%m-%d')
            
            if rs[0] == None or rs[0].strftime('%Y-%m-%d') < today:
                krx = self.read_krx_code()
                for idx in range(len(krx)):
                    code = krx.code.values[idx]
                    company = krx.company.values[idx]
                    with self.conn.cursor() as curs:                
                        sql = f"REPLACE INTO company_info (code, company, last"\
                        f"_update) VALUES ('{code}', '{company}', '{today}')"
                        curs.execute(sql)
                        self.codes[code] = company
                        tmnow = datetime.now().strftime('%Y-%m-%d %H:%M')
                        print(f"[{tmnow}] #{idx+1:04d} REPLACE INTO company_info "\
                            f"VALUES ({code}, {company}, {today})")
                    self.conn.commit()
                    print('')              


    # 네이버금융에서 주식 시세를 읽어서 데이터프레임으로 변환
    def read_naver(self, code, company, pages_to_fetch):
        try:
            url = f"http://finance.naver.com/item/sise_day.nhn?code={code}"
            html = BeautifulSoup(requests.get(url,
                headers={'User-agent': 'Mozilla/5.0'}).text, "lxml")
            pgrr = html.find("td", class_="pgRR")
            if pgrr is None:
                return None
            s = str(pgrr.a["href"]).split('=')
            lastpage = s[-1] 
            df = pd.DataFrame()
            pages = min(int(lastpage), pages_to_fetch)
            for page in range(1, pages + 1):
                pg_url = '{}&page={}'.format(url, page)
                df = df.append(pd.read_html(requests.get(pg_url,
                    headers={'User-agent': 'Mozilla/5.0'}).text)[0])                                          
                tmnow = datetime.now().strftime('%Y-%m-%d %H:%M')
                print('[{}] {} ({}) : {:04d}/{:04d} pages are downloading...'.
                    format(tmnow, company, code, page, pages), end="\r")
            df = df.rename(columns={'날짜':'date','종가':'close','전일비':'diff'
                ,'시가':'open','고가':'high','저가':'low','거래량':'volume'})
            df['date'] = df['date'].replace('.', '-')
            df = df.dropna()
            df[['close', 'diff', 'open', 'high', 'low', 'volume']] = df[['close',
                'diff', 'open', 'high', 'low', 'volume']].astype(int)
            df = df[['date', 'open', 'high', 'low', 'close', 'diff', 'volume']]
        
        except Exception as e:
            print('Exception occured :', str(e))
            return None
        
        return df






    # 네이버 금융에서 읽어온 주식 시세를 DB에 REPLACE
    def replace_into_db(self, df, num, code, company):
        with self.conn.cursor() as curs:
            # 1. 인수로 넘겨받은 dataframe을 tuple로 순회처리한다.
            for r in df.itertuples():
                # 2. REPLACE INTO 구문으로 daily_price 테이블 업데이트
                # 값이 string 이면 '{}', int 이면 {}
                sql = f"REPLACE INTO daily_price VALUES ('{code}', "\
                    f"'{r.date}', {r.open}, {r.high}, {r.low}, {r.close}, "\
                    f"{r.diff}, {r.volume})"
                curs.execute(sql)
            
            # 3. commit() 함수로 maria DB에 반영한다.
            self.conn.commit()
            print('[{}] #{:04d} {} ({}) : {} rows > REPLCE INTO daily_price [OK]'\
                    .format(datetime.now().strftime('%Y-%m-%d %H:%M'), num+1, company, code, len(df)))

    
    # KRX 상장 법인의 주식 시세를 네이버로 부터 읽어 DB에 업데이트
    def update_daily_price(self, pages_to_fetch):
        # 1. self.codes 딕셔너리에 저장된 종목코드에 대한 순회처리 및 numbering
        for idx, code in enumerate(self.codes):
            # 2. read_naver() method를 이용해 종목코드에 대한 일별 시세 데이터의 dataframe 구하기
            df = self.read_naver(code, self.codes[code], pages_to_fetch)
            
            # df가 None이라도 계속 진행
            if df is None:
                continue

            # 3. 일별 시세 데이터프레임이 구해지면 replace_into_db method로 DB저장
            self.replace_into_db(df, idx, code, self.codes[code])


    # 실행 즉시 매일 오후 5시에 daily_price 테이블 업데이트
    def execute_daily(self):
        # 1. update_comp_info() method를 호출 해 상장법인 목록을 DB에 업데이트 한다.
        self.update_comp_info()

        # 2. DBUpdater.py가 있는 디렉터리에서 config.json 파일을 읽기모드로 열어준다.
        # 3. 파일이 있다면 page_to_fetch 값을 읽어서 사용한다.
        try:
            with open('config.json','r') as in_file:
                config = json.load(in_file)
                pages_to_fetch = config['pages_to_fetch']
        
        # 4. 파일이 없다면 config.json 파일을 생성해준다. 처음 생성 시 page_to_fetch 100, 이후 1로 설정
        except FileNotFoundError:
            with open('config.json', 'w') as out_file:
                pages_to_fetch = 100
                config = {'pages_to_fetch': 1}
                json.dump(config, out_file)

        # 5. pages_to_fetch 값으로 update_daily_price method를 호출한다.
        self.update_daily_price(pages_to_fetch)

        # 6. 이번달 마지막날(lastday)을 구해 다음날 오후 5시를 계산한다.
        tmnow = datetime.now()
        lastday = calendar.monthrange(tmnow.year, tmnow.month)[1]
        if tmnow.month == 12 and tmnow.day == lastday:
            tmnext = tmnow.replace(year=tmnow.year+1, month=1, day=1, hour=17, minute=0, second=0)
        elif tmnow.month == lastday:
            tmnext = tmnow.replace(month=tmnow.month+1, day=1, hour=17, minute=0, second=0)
        else:
            tmnext = tmnow.replace(day = tmnow.day+1, hour=17, minute=0, second=0)
        
        tmdiff = tmnext - tmnow
        secs = tmdiff.seconds

        # 7. 다음날 오후 5시에 excute_daily() method를 실행하는 타이머 객체를 설정한다.
        t = Timer(secs, self.execute_daily)
        print("Waiting for next update ({}) ...".format(tmnext.strftime('%Y-%m-%d %H:%M')))

        t.start()
        





if __name__ == '__main__':
    dbu = DBUpdater()
    dbu.execute_daily()

 

 

 

 

 

참고도서

http://www.yes24.com/Product/Goods/90578506

 

파이썬 증권 데이터 분석 - YES24

투자 기법과 프로그래밍 기술로 자신만의 퀀트 투자 시스템을 완성하라『파이썬 증권 데이터 분석』은 웹 스크레이핑으로 증권 데이터를 주기적으로 자동 수집, 분석, 자동 매매, 예측하는 전

www.yes24.com

https://github.com/INVESTAR/StockAnalysisInPython

 

GitHub - INVESTAR/StockAnalysisInPython

Contribute to INVESTAR/StockAnalysisInPython development by creating an account on GitHub.

github.com

 

반응형

댓글