塞完資料庫,當然就玩點有趣的東西:
select
activity_date,
average_taiex,
pbr
from
(
select *, max(report_date) from MarketStatistics where report_type = 'monthly'
group by activity_date
)
order by activity_date
PBR 跟股價指數有強烈的正相關 ((這是廢話))
殖利率跟股價指數有些的負相關 ((這是廢話))
SQLite schema:
create table if not exists MarketStatistics
(
creation_dt datetime default current_timestamp,
report_date datetime not null,
activity_date datetime not null,
report_type text not null,
total_trading_value real,
listed_co_number real,
capital_issued real,
total_listed_shares real,
market_capitalization real,
trading_volume real,
trading_value real,
trans_number real,
average_taiex real,
volume_turnover_rate real,
per real,
dividend_yield real,
pbr real,
trading_days int,
unique (report_date, activity_date, report_type) on conflict ignore
);
Python source code:
# coding: big5
import csv
import logging
import os
import shutil
import sqlite3
import xlrd
from datetime import date
from datetime import datetime
from ..common import logger
class Sourcing():
def __init__(self):
self.LOGGER = logging.getLogger()
self.URL_TEMPLATE = '''http://www.twse.com.tw/ch/inc/download.php?l1=Securities+Trading+Monthly+Statistics&l2=Statistics+of+Securities+Market&url=/ch/statistics/download/02/001/%s_C02001.zip'''
self.DATES = []
self.ZIP_DIR = '''./dataset/market_statistics/zip/'''
self.XLS_DIR = '''./dataset/market_statistics/xls/'''
self.CSV_DIR = '''./dataset/market_statistics/csv/'''
self.DB_FILE = './db/stocktotal.db'
self.SQL_INSERT = '''insert or ignore into MarketStatistics(
report_date,
activity_date,
report_type,
total_trading_value,
listed_co_number,
capital_issued,
total_listed_shares,
market_capitalization,
trading_volume,
trading_value,
trans_number,
average_taiex,
volume_turnover_rate,
per,
dividend_yield,
pbr,
trading_days
) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
def source(self, begin_date, end_date):
self.init_dates(begin_date, end_date)
self.source_url_to_zip(self.ZIP_DIR)
self.source_zip_to_xls(self.ZIP_DIR, self.XLS_DIR)
self.source_xls_to_csv(self.XLS_DIR, self.CSV_DIR)
self.source_csv_to_sqlite(self.CSV_DIR, self.DB_FILE, self.SQL_INSERT)
def init_dates(self, begin_date, end_date):
begin = datetime.strptime(begin_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
monthly_begin = 12 * begin.year + begin.month - 1
monthly_end = 12 * end.year + end.month
for monthly in range(monthly_begin, monthly_end):
year, month = divmod(monthly, 12)
self.DATES.append(date(year, month + 1, 1))
def source_url_to_zip(self, dest_dir):
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in self.DATES:
url = self.URL_TEMPLATE % date.strftime('%Y%m')
dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.zip')
self.__wget(url, dest_file)
def source_zip_to_xls(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in self.DATES:
src_file = os.path.join(src_dir, date.strftime('%Y-%m') + '.zip')
dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.xls')
self.source_zip_to_xls_single(src_file, dest_dir, dest_file)
def source_zip_to_xls_single(self, src_file, dest_dir, dest_file):
assert os.path.isfile(src_file)
assert os.path.isdir(dest_dir)
sevenzip_output_dir = os.path.join(dest_dir, 'sevenzip_output_dir')
self.__sevenzip_extract(src_file, sevenzip_output_dir)
if not os.path.exists(sevenzip_output_dir):
self.LOGGER.info('''%s => Failure to extract''' % src_file)
return
file_list = os.listdir(sevenzip_output_dir)
assert len(file_list) is 1
sevenzip_output_file = os.path.join(sevenzip_output_dir, file_list[0])
shutil.copy(sevenzip_output_file, dest_file)
shutil.rmtree(sevenzip_output_dir)
def source_xls_to_csv(self, src_dir, dest_dir):
assert os.path.isdir(src_dir)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
for date in reversed(self.DATES):
src_file = os.path.join(src_dir, date.strftime('%Y-%m') + '.xls')
self.source_xls_to_csv_single(src_file, dest_dir, date)
"""
CSV fields should contains:
Report Date
Activity Date
Report Type (monthly or yearly)
Total Trading Value of TWSE
No. of Listed Co.
Capital Issued
Total Listed Shares
Market Capitalization
Trading Volume
Trading Value
No. of Trans. (1,000)
TAIEX (Average)
Volume Turnover Rate (%)
P/E Ratio (PER)
Dividend Yield (%)
P/B Ratio (PBR)
Trading Days
"""
def source_xls_to_csv_single(self, src_file, dest_dir, date):
assert os.path.isfile(src_file)
assert os.path.isdir(dest_dir)
self.__source_v1_xls_to_csv_single(src_file, dest_dir, date)
self.__source_v2_xls_to_csv_single(src_file, dest_dir, date)
def __source_v1_xls_to_csv_single(self, src_file, dest_dir, date):
if date < datetime(2003, 6, 1).date():
return
book = xlrd.open_workbook(src_file)
sheet = book.sheet_by_index(0)
assert sheet.ncols is 15
assert sheet.cell(12, 14).value == 'Days'
assert sheet.cell(12, 0).value.strip() == 'Month'
dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.csv')
fd = open(dest_file, 'w', newline='')
csv_writer = csv.writer(fd)
for r in self.__build_sheet_records(sheet, 13):
r = [date.strftime('%Y-%m-%d')] + r
r = self.__remove_comment_mark(r)
assert len(r) is 17
csv_writer.writerow(r)
self.LOGGER.debug('''%s => %s''' % (r, dest_file))
fd.close()
def __source_v2_xls_to_csv_single(self, src_file, dest_dir, date):
if date >= datetime(2003, 6, 1).date() or date <= datetime(2000, 9, 1).date():
return
book = xlrd.open_workbook(src_file)
main_sheet = book.sheet_by_index(0)
assert main_sheet.ncols is 12
if date > datetime(2001, 6, 1).date():
assert main_sheet.cell(12, 0).value.strip() == 'Month'
elif date > datetime(2000, 9, 1).date():
assert main_sheet.cell(11, 0).value.strip() == 'Month'
assert main_sheet.cell(12, 0).value.strip() == ''
main_records = self.__build_sheet_records(main_sheet, 13)
rest_sheet = book.sheet_by_index(1)
assert rest_sheet.ncols is 13
assert rest_sheet.cell(10, 0).value.strip() == 'Month'
rest_records = self.__build_sheet_records(rest_sheet, 11)
assert len(main_records) == len(rest_records)
dest_file = os.path.join(dest_dir, date.strftime('%Y-%m') + '.csv')
fd = open(dest_file, 'w', newline='')
csv_writer = csv.writer(fd)
for i in range(len(main_records)):
assert len(main_records[i]) is 13
assert len(rest_records[i]) is 14
assert main_records[i][0] == rest_records[i][0]
assert main_records[i][1] == rest_records[i][1]
r = [date.strftime('%Y-%m-%d')] + \
main_records[i][:-2] + rest_records[i][2:6] + rest_records[i][-2:-1]
r = self.__remove_comment_mark(r)
assert len(r) is 17
csv_writer.writerow(r)
self.LOGGER.debug('''%s => %s''' % (r, dest_file))
fd.close()
def source_csv_to_sqlite(self, src_dir, dest_db, sql_insert):
assert os.path.isdir(src_dir)
assert os.path.isfile(dest_db)
for file in os.listdir(src_dir):
self.source_csv_to_sqlite_single(os.path.join(src_dir, file), dest_db, sql_insert)
def source_csv_to_sqlite_single(self, src_file, dest_db, sql_insert):
self.LOGGER.debug('''%s => %s''' % (src_file, dest_db))
fd = open(src_file, 'r')
csv_reader = csv.reader(fd)
conn = sqlite3.connect(dest_db)
cursor = conn.cursor()
for row in csv_reader:
cursor.execute(self.SQL_INSERT, row)
self.LOGGER.debug(row)
conn.commit()
cursor.close()
conn.close()
fd.close()
def __wget(self, url, dest_file):
wget = os.path.abspath('./src/thirdparty/wget/wget.exe')
assert os.path.isfile(wget)
wget_cmdline = '''%s -N \"%s\" --waitretry=3 -O \"%s\"''' % (wget, url, dest_file)
os.system(wget_cmdline)
def __sevenzip_extract(self, src_file, dest_dir):
sevenzip = os.path.abspath('./src/thirdparty/sevenzip/7z.exe')
assert os.path.isfile(sevenzip)
sevenzip_cmdline = '''%s e %s -y -o%s''' % (sevenzip, src_file, dest_dir)
os.system(sevenzip_cmdline)
def __build_sheet_records(self, sheet, begin_row):
rv = []
monthly_curr_year = ''
for curr_row in range(begin_row, sheet.nrows):
r = sheet.row_values(curr_row)
first_cell = r[0].strip()
if first_cell.startswith('註'): # Check footer.
break
if first_cell.endswith(')月'): # Ignore this year summary because it is partial.
continue
if first_cell.endswith(')'): # Check if yearly record. Example: 93(2004)
curr_date = '''%s-01-01''' % first_cell[first_cell.index('(')+1 : -1]
sheet_record = [curr_date, 'yearly'] + r[1:]
rv.append(sheet_record)
if first_cell.endswith('月'): # Check if monthly record. Example: 95年 1月
curr_month = 0
if '年' in first_cell:
monthly_curr_year = int(first_cell[:first_cell.index('年')]) + 1911
curr_month = int(first_cell[first_cell.index('年')+1 : first_cell.index('月')])
else:
curr_month = int(first_cell[:first_cell.index('月')])
curr_date = '''%s-%02d-01''' % (monthly_curr_year, curr_month)
sheet_record = [curr_date, 'monthly'] + r[1:]
rv.append(sheet_record)
return rv
def __remove_comment_mark(self, csv_record):
rv = csv_record[:3]
for i in range(3, len(csv_record)):
value = csv_record[i]
try:
float(value)
rv.append(value)
except ValueError:
fixed_value = value[value.rindex(' ')+ 1 :].replace(',', '')
float(fixed_value)
rv.append(fixed_value)
return rv
沒有留言:
張貼留言