2012年10月16日 星期二

Source: Statistics of Securities Market

剛爬完所有 2000-10 到 2012-09 的資料,資料筆數沒有很多。


塞完資料庫,當然就玩點有趣的東西:

-- Monthly average TAIEX and P/B ratio, one output row per activity_date,
-- ordered chronologically.
select
    activity_date,
    average_taiex,
    pbr
from
(
    -- The same activity_date can be stored once per report_date, so keep a
    -- single row per activity_date.
    -- NOTE: this relies on SQLite-specific behaviour: when max() is the only
    -- aggregate, the bare columns selected by '*' are taken from the row that
    -- holds the maximum report_date. Other engines reject or randomise this.
    select *, max(report_date) from MarketStatistics where report_type = 'monthly'
    group by activity_date
)
order by activity_date

PBR 跟股價指數有強烈的正相關 ((這是廢話))
殖利率跟股價指數有些負相關 ((這是廢話))



SQLite schema:

-- Market statistics as published in each downloaded report. Every report
-- restates several periods, so one period may appear under many report dates.
create table if not exists MarketStatistics
(
    -- Filled in by SQLite when the row is inserted.
    creation_dt datetime default current_timestamp,
    -- First day of the month of the report the row was read from.
    report_date datetime not null,
    -- First day of the month (monthly rows) or January 1st (yearly rows)
    -- that the figures describe.
    activity_date datetime not null,
    -- 'monthly' or 'yearly', as written by the importer.
    report_type text not null,
    -- The remaining columns follow the report's column order; see the
    -- "CSV fields" list in the Python source for their report headings.
    total_trading_value real,
    listed_co_number real,
    capital_issued real,
    total_listed_shares real,
    market_capitalization real,
    trading_volume real,
    trading_value real,
    trans_number real,
    average_taiex real,
    volume_turnover_rate real,
    per real,
    dividend_yield real,
    pbr real,
    trading_days int,
    -- Re-importing the same report row is silently skipped instead of failing.
    unique (report_date, activity_date, report_type) on conflict ignore
);



Python source code:

# coding: big5

import csv
import logging
import os
import shutil
import sqlite3
import xlrd

from datetime import date
from datetime import datetime

from ..common import logger

class Sourcing():

    def __init__(self):
        self.LOGGER = logging.getLogger()
        self.URL_TEMPLATE = '''http://www.twse.com.tw/ch/inc/download.php?l1=Securities+Trading+Monthly+Statistics&l2=Statistics+of+Securities+Market&url=/ch/statistics/download/02/001/%s_C02001.zip'''
        self.DATES = []
        self.ZIP_DIR = '''./dataset/market_statistics/zip/'''
        self.XLS_DIR = '''./dataset/market_statistics/xls/'''
        self.CSV_DIR = '''./dataset/market_statistics/csv/'''
        self.DB_FILE = './db/stocktotal.db'
        self.SQL_INSERT = '''insert or ignore into MarketStatistics(
                report_date,
                activity_date,
                report_type,
                total_trading_value,
                listed_co_number,
                capital_issued,
                total_listed_shares,
                market_capitalization,
                trading_volume,
                trading_value,
                trans_number,
                average_taiex,
                volume_turnover_rate,
                per,
                dividend_yield,
                pbr,
                trading_days
            ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''

    def source(self, begin_date, end_date):
        self.init_dates(begin_date, end_date)
        self.source_url_to_zip(self.ZIP_DIR)
        self.source_zip_to_xls(self.ZIP_DIR, self.XLS_DIR)
        self.source_xls_to_csv(self.XLS_DIR, self.CSV_DIR)
        self.source_csv_to_sqlite(self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)

    def init_dates(self, begin_date, end_date):
        begin = datetime.strptime(begin_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        monthly_begin = 12 * begin.year + begin.month - 1
        monthly_end = 12 * end.year + end.month
        for monthly in range(monthly_begin, monthly_end):
            year, month = divmod(monthly, 12)
            self.DATES.append(date(year, month + 1, 1))
         
    def source_url_to_zip(self, dest_dir):
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for date in self.DATES:
            url = self.URL_TEMPLATE % date.strftime('%Y%m')
            dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.zip')
            self.__wget(url, dest_file)

    def source_zip_to_xls(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for date in self.DATES:
            src_file = os.path.join(src_dir, date.strftime('%Y-%m') + '.zip')
            dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.xls')
            self.source_zip_to_xls_single(src_file, dest_dir, dest_file)
 
    def source_zip_to_xls_single(self, src_file, dest_dir, dest_file):
        assert os.path.isfile(src_file)
        assert os.path.isdir(dest_dir)

        sevenzip_output_dir = os.path.join(dest_dir, 'sevenzip_output_dir')
        self.__sevenzip_extract(src_file, sevenzip_output_dir)
        if not os.path.exists(sevenzip_output_dir):
            self.LOGGER.info('''%s => Failure to extract''' % src_file)
            return

        file_list = os.listdir(sevenzip_output_dir)
        assert len(file_list) is 1
        sevenzip_output_file = os.path.join(sevenzip_output_dir, file_list[0])
        shutil.copy(sevenzip_output_file, dest_file)
        shutil.rmtree(sevenzip_output_dir)
 
    def source_xls_to_csv(self, src_dir, dest_dir):
        assert os.path.isdir(src_dir)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        for date in reversed(self.DATES):
            src_file = os.path.join(src_dir, date.strftime('%Y-%m') + '.xls')
            self.source_xls_to_csv_single(src_file, dest_dir, date)  

    """
    CSV fields should contain:
        Report Date
        Activity Date
        Report Type (monthly or yearly)
        Total Trading Value of TWSE
        No. of Listed Co.
        Capital Issued
        Total Listed Shares
        Market Capitalization
        Trading Volume
        Trading Value
        No. of Trans. (1,000)
        TAIEX (Average)
        Volume Turnover Rate (%)
        P/E Ratio (PER)
        Dividend Yield (%)
        P/B Ratio (PBR)
        Trading Days
    """
    def source_xls_to_csv_single(self, src_file, dest_dir, date):
        assert os.path.isfile(src_file)
        assert os.path.isdir(dest_dir)
        self.__source_v1_xls_to_csv_single(src_file, dest_dir, date)
        self.__source_v2_xls_to_csv_single(src_file, dest_dir, date)
 
    def __source_v1_xls_to_csv_single(self, src_file, dest_dir, date):
        if date < datetime(2003, 6, 1).date():
            return
        book = xlrd.open_workbook(src_file)
        sheet = book.sheet_by_index(0)
        assert sheet.ncols is 15
        assert sheet.cell(12, 14).value == 'Days'
        assert sheet.cell(12, 0).value.strip() == 'Month'
     
        dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.csv')
        fd = open(dest_file, 'w', newline='')
        csv_writer = csv.writer(fd)
        for r in self.__build_sheet_records(sheet, 13):
            r = [date.strftime('%Y-%m-%d')] + r
            r = self.__remove_comment_mark(r)
            assert len(r) is 17
            csv_writer.writerow(r)
            self.LOGGER.debug('''%s => %s''' % (r, dest_file))
        fd.close()
     
    def __source_v2_xls_to_csv_single(self, src_file, dest_dir, date):
        if date >= datetime(2003, 6, 1).date() or date <= datetime(2000, 9, 1).date():
            return
        book = xlrd.open_workbook(src_file)
        main_sheet = book.sheet_by_index(0)
        assert main_sheet.ncols is 12
     
        if date > datetime(2001, 6, 1).date():
            assert main_sheet.cell(12, 0).value.strip() == 'Month'
        elif date > datetime(2000, 9, 1).date():
            assert main_sheet.cell(11, 0).value.strip() == 'Month'
            assert main_sheet.cell(12, 0).value.strip() == ''
        main_records = self.__build_sheet_records(main_sheet, 13)
     
        rest_sheet = book.sheet_by_index(1)
        assert rest_sheet.ncols is 13
        assert rest_sheet.cell(10, 0).value.strip() == 'Month'
        rest_records = self.__build_sheet_records(rest_sheet, 11)

        assert len(main_records) == len(rest_records)
     
        dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.csv')
        fd = open(dest_file, 'w', newline='')
        csv_writer = csv.writer(fd)
        for i in range(len(main_records)):
            assert len(main_records[i]) is 13
            assert len(rest_records[i]) is 14
            assert main_records[i][0] == rest_records[i][0]
            assert main_records[i][1] == rest_records[i][1]
            r = [date.strftime('%Y-%m-%d')] + \
                    main_records[i][:-2] + rest_records[i][2:6] + rest_records[i][-2:-1]
            r = self.__remove_comment_mark(r)
            assert len(r) is 17
            csv_writer.writerow(r)
            self.LOGGER.debug('''%s => %s''' % (r, dest_file))
        fd.close()

    def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
        assert os.path.isdir(src_dir)
        assert os.path.isfile(dest_db)
        for file in os.listdir(src_dir):
            self.source_csv_to_sqlite_single(os.path.join(src_dir, file), dest_db, sql_insert)
         
    def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
        self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
        fd = open(src_file, 'r')
        csv_reader = csv.reader(fd)
        conn = sqlite3.connect(dest_db)
        cursor = conn.cursor()
        for row in csv_reader:
            cursor.execute(self.SQL_INSERT, row)
            self.LOGGER.debug(row)
        conn.commit()
        cursor.close()
        conn.close()
        fd.close()
     
    def __wget(self, url, dest_file):
        wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
        assert os.path.isfile(wget)
        wget_cmdline = '''%s -N \"%s\" --waitretry=3 -O \"%s\"''' % (wget, url, dest_file)
        os.system(wget_cmdline)

    def __sevenzip_extract(self, src_file, dest_dir):
        sevenzip = os.path.abspath('./src/thirdparty/sevenzip/7z.exe')
        assert os.path.isfile(sevenzip)
        sevenzip_cmdline = '''%s e %s -y -o%s''' % (sevenzip, src_file, dest_dir)
        os.system(sevenzip_cmdline)
 
    def __build_sheet_records(self, sheet, begin_row):
        rv = []
     
        monthly_curr_year = ''
        for curr_row in range(begin_row, sheet.nrows):
            r = sheet.row_values(curr_row)
            first_cell = r[0].strip()
         
            if first_cell.startswith('註'): # Check footer.
                break
            if first_cell.endswith(')月'): # Ignore this year summary because it is partial.
                continue
            if first_cell.endswith(')'): # Check if yearly record. Example: 93(2004)
                curr_date = '''%s-01-01''' % first_cell[first_cell.index('(')+1 : -1]
                sheet_record = [curr_date, 'yearly'] + r[1:]
                rv.append(sheet_record)
            if first_cell.endswith('月'): # Check if monthly record. Example: 95年  1月
                curr_month = 0
                if '年' in first_cell:
                    monthly_curr_year = int(first_cell[:first_cell.index('年')]) + 1911
                    curr_month = int(first_cell[first_cell.index('年')+1 : first_cell.index('月')])
                else:
                    curr_month = int(first_cell[:first_cell.index('月')])
                curr_date = '''%s-%02d-01''' % (monthly_curr_year, curr_month)
                sheet_record = [curr_date, 'monthly'] + r[1:]
                rv.append(sheet_record)
        return rv
     
    def __remove_comment_mark(self, csv_record):
        rv = csv_record[:3]
        for i in range(3, len(csv_record)):
            value = csv_record[i]
            try:
                float(value)
                rv.append(value)
            except ValueError:
                fixed_value = value[value.rindex(' ')+ 1 :].replace(',', '')
                float(fixed_value)
                rv.append(fixed_value)
        return rv

沒有留言:

張貼留言