diff --git a/docs/conf.py b/docs/conf.py index 60f3d9e..1484d4f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -31,48 +31,48 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = ['sphinx.ext.coverage'] +extensions = ["sphinx.ext.coverage"] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = ".rst" # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = 'twstock' -copyright = '2017, Louie Lu' -author = 'Louie Lu' +project = "twstock" +copyright = "2017, Louie Lu" +author = "Louie Lu" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = '1.0' +version = "1.0" # The full version, including alpha/beta/rc tags. -release = '1.0.1' +release = "1.0.1" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = 'zh_TW' +language = "zh_TW" # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This patterns also effect to html_static_path and html_extra_path -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = False @@ -83,7 +83,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'alabaster' +html_theme = "alabaster" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -94,13 +94,13 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # -- Options for HTMLHelp output ------------------------------------------ # Output file base name for HTML help builder. -htmlhelp_basename = 'twstockdoc' +htmlhelp_basename = "twstockdoc" # -- Options for LaTeX output --------------------------------------------- @@ -109,15 +109,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -127,8 +124,7 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'twstock.tex', 'twstock Documentation', - 'Louie Lu', 'manual'), + (master_doc, "twstock.tex", "twstock Documentation", "Louie Lu", "manual"), ] @@ -136,10 +132,7 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'twstock', 'twstock Documentation', - [author], 1) -] +man_pages = [(master_doc, "twstock", "twstock Documentation", [author], 1)] # -- Options for Texinfo output ------------------------------------------- @@ -148,13 +141,18 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'twstock', 'twstock Documentation', - author, 'twstock', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "twstock", + "twstock Documentation", + author, + "twstock", + "One line description of project.", + "Miscellaneous", + ), ] - html_sidebars = { - '**': ['globaltoc.html', 'relations.html', 'sourcelink.html', 'searchbox.html'] + "**": ["globaltoc.html", "relations.html", "sourcelink.html", "searchbox.html"] } diff --git a/docs/serve.py b/docs/serve.py index 4e65a80..5387287 100755 --- a/docs/serve.py +++ b/docs/serve.py @@ -1,36 +1,36 @@ #!/usr/bin/env python3 -''' +""" Small wsgiref based web server. Takes a path to serve from and an optional port number (defaults to 8000), then tries to serve files. Mime types are guessed from the file names, 404 errors are raised if the file is not found. Used for the make serve target in Doc. -''' +""" import sys import os import mimetypes from wsgiref import simple_server, util -def app(environ, respond): - fn = os.path.join(path, environ['PATH_INFO'][1:]) - if '.' not in fn.split(os.path.sep)[-1]: - fn = os.path.join(fn, 'index.html') +def app(environ, respond): + fn = os.path.join(path, environ["PATH_INFO"][1:]) + if "." not in fn.split(os.path.sep)[-1]: + fn = os.path.join(fn, "index.html") type = mimetypes.guess_type(fn)[0] if os.path.exists(fn): - respond('200 OK', [('Content-Type', type)]) + respond("200 OK", [("Content-Type", type)]) return util.FileWrapper(open(fn, "rb")) else: - respond('404 Not Found', [('Content-Type', 'text/plain')]) - return [b'not found'] + respond("404 Not Found", [("Content-Type", "text/plain")]) + return [b"not found"] -if __name__ == '__main__': + +if __name__ == "__main__": path = sys.argv[1] port = int(sys.argv[2]) if len(sys.argv) > 2 else 8000 - httpd = simple_server.make_server('', port, app) + httpd = simple_server.make_server("", port, app) print("Serving {} on port {}, control-C to stop".format(path, port)) try: httpd.serve_forever() except KeyboardInterrupt: print("\b\bShutting down.") - diff --git a/test/test_analytics.py b/test/test_analytics.py index 49540c1..e108537 100644 --- a/test/test_analytics.py +++ b/test/test_analytics.py @@ -85,7 +85,7 @@ def test_ma_bias_ratio_pivot(self): class BestFourPointTest(unittest.TestCase): @classmethod def setUpClass(self): - self.stock = stock.Stock('2330') + self.stock = stock.Stock("2330") self.stock.fetch(2015, 5) self.legacy = legacy.LegacyBestFourPoint(self.stock) self.ng = analytics.BestFourPoint(self.stock) @@ -119,27 +119,24 @@ def test_best_sell_4(self): self.assertEqual(self.ng.best_sell_4(), self.legacy.best_sell_4()) def test_best_four_point_to_buy(self): - self.assertEqual(self.ng.best_four_point_to_buy(), - self.legacy.best_four_point_to_buy()) + self.assertEqual( + self.ng.best_four_point_to_buy(), self.legacy.best_four_point_to_buy() + ) def test_best_four_point_to_sell(self): - self.assertEqual(self.ng.best_four_point_to_sell(), - self.legacy.best_four_point_to_sell()) + self.assertEqual( + self.ng.best_four_point_to_sell(), self.legacy.best_four_point_to_sell() + ) def test_best_four_point(self): self.stock.fetch(2014, 5) - self.assertEqual(self.ng.best_four_point(), - self.legacy.best_four_point()) + self.assertEqual(self.ng.best_four_point(), self.legacy.best_four_point()) self.stock.fetch(2015, 5) - self.assertEqual(self.ng.best_four_point(), - self.legacy.best_four_point()) + self.assertEqual(self.ng.best_four_point(), self.legacy.best_four_point()) self.stock.fetch(2016, 5) - self.assertEqual(self.ng.best_four_point(), - self.legacy.best_four_point()) + self.assertEqual(self.ng.best_four_point(), self.legacy.best_four_point()) self.stock.fetch(2017, 5) - self.assertEqual(self.ng.best_four_point(), - self.legacy.best_four_point()) - + self.assertEqual(self.ng.best_four_point(), self.legacy.best_four_point()) diff --git a/test/test_cli.py b/test/test_cli.py index 19f1324..ab821f2 100644 --- a/test/test_cli.py +++ b/test/test_cli.py @@ -6,7 +6,7 @@ class CLIFunctionTest(unittest.TestCase): def setUp(self): - self.stocks = ['2330', '6223'] + self.stocks = ["2330", "6223"] def test_best_four_point(self): cli.best_four_point.run(self.stocks) diff --git a/test/test_mock.py b/test/test_mock.py index d1e132e..9f12ffa 100644 --- a/test/test_mock.py +++ b/test/test_mock.py @@ -4,20 +4,24 @@ class MockTest(unittest.TestCase): def test_mock_get_stock_info_will_work(self): - self.assertIn('msgArray', mock.get_stock_info('2330')) + self.assertIn("msgArray", mock.get_stock_info("2330")) def test_mock_get_stock_info_raw_data(self): self.assertCountEqual( - mock.get_stock_info('2330').keys(), - ['msgArray', 'userDelay', 'rtmessage', 'referer', 'queryTime', 'rtcode']) + mock.get_stock_info("2330").keys(), + ["msgArray", "userDelay", "rtmessage", "referer", "queryTime", "rtcode"], + ) def test_mock_get_stock_info_msgarray(self): - self.assertEqual(mock.get_stock_info('2330')['msgArray'][0]['c'], '2330') + self.assertEqual(mock.get_stock_info("2330")["msgArray"][0]["c"], "2330") def test_mock_get_stock_info_will_change_in_different_index(self): self.assertNotEqual( - mock.get_stock_info('2330', 0), mock.get_stock_info('2330', 1)) + mock.get_stock_info("2330", 0), mock.get_stock_info("2330", 1) + ) self.assertNotEqual( - mock.get_stock_info('2330', 1), mock.get_stock_info('2330', 2)) + mock.get_stock_info("2330", 1), mock.get_stock_info("2330", 2) + ) self.assertNotEqual( - mock.get_stock_info('2330', 0), mock.get_stock_info('2330', 2)) + mock.get_stock_info("2330", 0), mock.get_stock_info("2330", 2) + ) diff --git a/test/test_proxy.py b/test/test_proxy.py index 06e97a0..7034a2b 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -6,7 +6,6 @@ class ProxyProviderTest(unittest.TestCase): - def setUp(self): reset_proxy_provider() @@ -18,24 +17,25 @@ def test_configure(self): self.assertDictEqual({}, get_proxies()) # configure fake proxy - configure_proxy_provider(SingleProxyProvider(dict(http="http-proxy", https="https-proxy"))) - self.assertEqual("http-proxy", get_proxies()['http']) - self.assertEqual("https-proxy", get_proxies()['https']) + configure_proxy_provider( + SingleProxyProvider(dict(http="http-proxy", https="https-proxy")) + ) + self.assertEqual("http-proxy", get_proxies()["http"]) + self.assertEqual("https-proxy", get_proxies()["https"]) # reset proxy reset_proxy_provider() self.assertDictEqual({}, get_proxies()) - def test_rr_proxies_provider(self): - proxies = ['a', 'b', 'c'] + proxies = ["a", "b", "c"] rr_provider = RoundRobinProxiesProvider(proxies) for _ in range(3): for p in proxies: self.assertEqual(rr_provider.get_proxy(), p) - proxies = ['d', 'e', 'f'] + proxies = ["d", "e", "f"] rr_provider.proxies = proxies for _ in range(3): for p in proxies: diff --git a/test/test_realtime.py b/test/test_realtime.py index ffeeba0..1c84254 100644 --- a/test/test_realtime.py +++ b/test/test_realtime.py @@ -8,37 +8,37 @@ class RealtimeTest(unittest.TestCase): def test_realtime_field(self): self.assertCountEqual( - realtime.get_raw('2330').keys(), - twstock.mock.get_stock_info('2330').keys()) + realtime.get_raw("2330").keys(), twstock.mock.get_stock_info("2330").keys() + ) def test_realtime_get_raw(self): - self.assertIn('msgArray', realtime.get_raw('2330')) + self.assertIn("msgArray", realtime.get_raw("2330")) def test_realtime_get_blank(self): - stock = realtime.get('') + stock = realtime.get("") - self.assertFalse(stock['success']) - self.assertIn('rtmessage', stock) - self.assertIn('rtcode', stock) + self.assertFalse(stock["success"]) + self.assertIn("rtmessage", stock) + self.assertIn("rtcode", stock) def test_realtime_get_bad_id(self): - stock = realtime.get('9999') + stock = realtime.get("9999") - self.assertFalse(stock['success']) - self.assertIn('rtmessage', stock) - self.assertIn('rtcode', stock) + self.assertFalse(stock["success"]) + self.assertIn("rtmessage", stock) + self.assertIn("rtcode", stock) - stock = realtime.get(['9999', '8888']) + stock = realtime.get(["9999", "8888"]) - self.assertFalse(stock['success']) - self.assertIn('rtmessage', stock) - self.assertIn('rtcode', stock) + self.assertFalse(stock["success"]) + self.assertIn("rtmessage", stock) + self.assertIn("rtcode", stock) def test_realtime_get_tpex_id(self): - stock = realtime.get('6223') + stock = realtime.get("6223") - self.assertTrue(stock['success']) - self.assertEqual(stock['info']['code'], '6223') + self.assertTrue(stock["success"]) + self.assertEqual(stock["info"]["code"], "6223") class MockRealtimeTest(unittest.TestCase): @@ -51,21 +51,23 @@ def tearDownClass(cls): realtime.mock = False def test_mock_one_stock_id(self): - s = realtime.get('2330') + s = realtime.get("2330") - self.assertTrue(s['success']) - self.assertEqual(s['info']['code'], '2330') - self.assertEqual(s['realtime']['latest_trade_price'], '214.50') - self.assertEqual(s['realtime']['best_bid_price'], - ['214.00', '213.50', '213.00', '212.50', '212.00']) + self.assertTrue(s["success"]) + self.assertEqual(s["info"]["code"], "2330") + self.assertEqual(s["realtime"]["latest_trade_price"], "214.50") + self.assertEqual( + s["realtime"]["best_bid_price"], + ["214.00", "213.50", "213.00", "212.50", "212.00"], + ) - @unittest.skip('Dont want to fix this, is about the code in realtime') + @unittest.skip("Dont want to fix this, is about the code in realtime") def test_mock_multiple_stock_id(self): - s = realtime.get(['2330', '2337']) - - self.assertTrue(s['success']) - self.assertCountEqual(s.keys(), ['2330', '2337', 'success']) - self.assertTrue(s['2330']['success']) - self.assertEqual(s['2330']['info']['code'], '2330') - self.assertEqual(s['2330']['realtime']['latest_trade_price'], '214.50') - self.assertTrue(s['2337']['success']) + s = realtime.get(["2330", "2337"]) + + self.assertTrue(s["success"]) + self.assertCountEqual(s.keys(), ["2330", "2337", "success"]) + self.assertTrue(s["2330"]["success"]) + self.assertEqual(s["2330"]["info"]["code"], "2330") + self.assertEqual(s["2330"]["realtime"]["latest_trade_price"], "214.50") + self.assertTrue(s["2337"]["success"]) diff --git a/test/test_stock.py b/test/test_stock.py index 2900fa4..a228af6 100644 --- a/test/test_stock.py +++ b/test/test_stock.py @@ -5,13 +5,22 @@ class FetcherTest(object): def test_convert_date(self): - date = '106/05/01' + date = "106/05/01" cv_date = self.fetcher._convert_date(date) - self.assertEqual(cv_date, '2017/05/01') + self.assertEqual(cv_date, "2017/05/01") def test_make_datatuple(self): - data = ['106/05/02', '45,851,963', '9,053,856,108', '198.50', - '199.00', '195.50', '196.50', '+2.00', '15,718'] + data = [ + "106/05/02", + "45,851,963", + "9,053,856,108", + "198.50", + "199.00", + "195.50", + "196.50", + "+2.00", + "15,718", + ] dt = self.fetcher._make_datatuple(data) self.assertEqual(dt.date, datetime.datetime(2017, 5, 2)) self.assertEqual(dt.capacity, 45851963) @@ -24,8 +33,17 @@ def test_make_datatuple(self): self.assertEqual(dt.transaction, 15718) def test_make_datatuple_without_prices(self): - data = ['106/05/02', '45,851,963', '9,053,856,108', '--', - '--', '--', '--', ' 0.00', '15,718'] + data = [ + "106/05/02", + "45,851,963", + "9,053,856,108", + "--", + "--", + "--", + "--", + " 0.00", + "15,718", + ] dt = self.fetcher._make_datatuple(data) self.assertEqual(dt.date, datetime.datetime(2017, 5, 2)) self.assertEqual(dt.capacity, 45851963) @@ -46,8 +64,17 @@ class TPEXFetcherTest(unittest.TestCase, FetcherTest): fetcher = stock.TPEXFetcher() def test_make_datatuple(self): - data = ['106/05/02', '45,851', '9,053,856', '198.50', - '199.00', '195.50', '196.50', '2.00', '15,718'] + data = [ + "106/05/02", + "45,851", + "9,053,856", + "198.50", + "199.00", + "195.50", + "196.50", + "2.00", + "15,718", + ] dt = self.fetcher._make_datatuple(data) self.assertEqual(dt.date, datetime.datetime(2017, 5, 2)) self.assertEqual(dt.capacity, 45851000) @@ -60,8 +87,17 @@ def test_make_datatuple(self): self.assertEqual(dt.transaction, 15718) def test_make_datatuple_without_prices(self): - data = ['106/05/02', '45,851', '9,053,856', '--', - '--', '--', '--', '0.00', '15,718'] + data = [ + "106/05/02", + "45,851", + "9,053,856", + "--", + "--", + "--", + "--", + "0.00", + "15,718", + ] dt = self.fetcher._make_datatuple(data) self.assertEqual(dt.date, datetime.datetime(2017, 5, 2)) self.assertEqual(dt.capacity, 45851000) @@ -74,8 +110,17 @@ def test_make_datatuple_without_prices(self): self.assertEqual(dt.transaction, 15718) def test_make_datatuple_with_asterisk(self): - data = ['106/05/02*', '45,851', '9,053,856', '198.50', - '199.00', '195.50', '196.50', '2.00', '15,718'] + data = [ + "106/05/02*", + "45,851", + "9,053,856", + "198.50", + "199.00", + "195.50", + "196.50", + "2.00", + "15,718", + ] dt = self.fetcher._make_datatuple(data) self.assertEqual(dt.date, datetime.datetime(2017, 5, 2)) self.assertEqual(dt.capacity, 45851000) @@ -148,52 +193,134 @@ def test_transaction(self): class TWSEStockTest(unittest.TestCase, StockTest): @classmethod def setUpClass(cls): - cls.stk = stock.Stock('2330') + cls.stk = stock.Stock("2330") def test_price(self): self.stk.fetch(2015, 5) self.assertIsInstance(self.stk.price, list) self.assertEqual(len(self.stk.price), len(self.stk.data)) self.assertEqual(self.stk.price, [d.close for d in self.stk.data]) - self.assertEqual(self.stk.price, - [147.5, 147.0, 147.5, 146.5, 146.5, 148.5, 147.5, - 148.0, 146.0, 146.5, 146.5, 146.5, 146.5, 145.5, - 145.5, 147.5, 146.5, 145.0, 147.0, 146.0]) + self.assertEqual( + self.stk.price, + [ + 147.5, + 147.0, + 147.5, + 146.5, + 146.5, + 148.5, + 147.5, + 148.0, + 146.0, + 146.5, + 146.5, + 146.5, + 146.5, + 145.5, + 145.5, + 147.5, + 146.5, + 145.0, + 147.0, + 146.0, + ], + ) def test_capacity(self): self.stk.fetch(2015, 5) self.assertIsInstance(self.stk.capacity, list) self.assertEqual(len(self.stk.capacity), len(self.stk.data)) self.assertEqual(self.stk.capacity, [d.capacity for d in self.stk.data]) - self.assertEqual(self.stk.capacity, - [30868640, 27789400, 18824208, 21908150, 20035646, - 20402529, 24956498, 19437537, 39888654, 24831890, - 26212375, 26321396, 26984912, 41286686, 22103852, - 16323218, 16069726, 24257941, 36704395, 61983862]) + self.assertEqual( + self.stk.capacity, + [ + 30868640, + 27789400, + 18824208, + 21908150, + 20035646, + 20402529, + 24956498, + 19437537, + 39888654, + 24831890, + 26212375, + 26321396, + 26984912, + 41286686, + 22103852, + 16323218, + 16069726, + 24257941, + 36704395, + 61983862, + ], + ) class TPEXStockTest(unittest.TestCase, StockTest): @classmethod def setUpClass(cls): - cls.stk = stock.Stock('6223') + cls.stk = stock.Stock("6223") def test_price(self): self.stk.fetch(2015, 5) self.assertIsInstance(self.stk.price, list) self.assertEqual(len(self.stk.price), len(self.stk.data)) self.assertEqual(self.stk.price, [d.close for d in self.stk.data]) - self.assertEqual(self.stk.price, - [91.4, 91.8, 91.8, 93.5, 91.0, 84.7, 84.0, 85.8, 87.1, - 86.1, 83.9, 84.5, 86.7, 86.3, 86.0, 86.2, 91.1, 90.9, - 91.7, 90.4]) + self.assertEqual( + self.stk.price, + [ + 91.4, + 91.8, + 91.8, + 93.5, + 91.0, + 84.7, + 84.0, + 85.8, + 87.1, + 86.1, + 83.9, + 84.5, + 86.7, + 86.3, + 86.0, + 86.2, + 91.1, + 90.9, + 91.7, + 90.4, + ], + ) def test_capacity(self): self.stk.fetch(2015, 5) self.assertIsInstance(self.stk.capacity, list) self.assertEqual(len(self.stk.capacity), len(self.stk.data)) self.assertEqual(self.stk.capacity, [d.capacity for d in self.stk.data]) - self.assertEqual(self.stk.capacity, - [374000, 474000, 468000, 1257000, 1079000, 3400000, - 3424000, 1078000, 1433000, 891000, 1202000, 1008000, - 999000, 488000, 706000, 231000, 1890000, 782000, - 1214000, 583000]) + self.assertEqual( + self.stk.capacity, + [ + 374000, + 474000, + 468000, + 1257000, + 1079000, + 3400000, + 3424000, + 1078000, + 1433000, + 891000, + 1202000, + 1008000, + 999000, + 488000, + 706000, + 231000, + 1890000, + 782000, + 1214000, + 583000, + ], + ) diff --git a/twstock/__init__.py b/twstock/__init__.py index ff36338..c06ea96 100644 --- a/twstock/__init__.py +++ b/twstock/__init__.py @@ -11,4 +11,4 @@ from twstock.stock import Stock -__version__ = '1.3.1' +__version__ = "1.3.1" diff --git a/twstock/__main__.py b/twstock/__main__.py index 1a1478c..63ee26b 100644 --- a/twstock/__main__.py +++ b/twstock/__main__.py @@ -4,5 +4,5 @@ from twstock import cli -if __name__ == '__main__': +if __name__ == "__main__": cli.run() diff --git a/twstock/analytics.py b/twstock/analytics.py index 41715e1..ec4a9c6 100644 --- a/twstock/analytics.py +++ b/twstock/analytics.py @@ -2,7 +2,6 @@ class Analytics(object): - def continuous(self, data): diff = [1 if data[-i] > data[-i - 1] else -1 for i in range(1, len(data))] cont = 0 @@ -25,7 +24,9 @@ def ma_bias_ratio(self, day1, day2): """Calculate moving average bias ratio""" data1 = self.moving_average(self.price, day1) data2 = self.moving_average(self.price, day2) - result = [data1[-i] - data2[-i] for i in range(1, min(len(data1), len(data2)) + 1)] + result = [ + data1[-i] - data2[-i] for i in range(1, min(len(data1), len(data2)) + 1) + ] return result[::-1] @@ -40,23 +41,28 @@ def ma_bias_ratio_pivot(self, data, sample_size=5, position=False): check_value = min(sample) pre_check_value = max(sample) < 0 - return ((sample_size - sample.index(check_value) < 4 and - sample.index(check_value) != sample_size - 1 and pre_check_value), - sample_size - sample.index(check_value) - 1, - check_value) + return ( + ( + sample_size - sample.index(check_value) < 4 + and sample.index(check_value) != sample_size - 1 + and pre_check_value + ), + sample_size - sample.index(check_value) - 1, + check_value, + ) class BestFourPoint(object): - BEST_BUY_WHY = ['量大收紅', '量縮價不跌', '三日均價由下往上', '三日均價大於六日均價'] - BEST_SELL_WHY = ['量大收黑', '量縮價跌', '三日均價由上往下', '三日均價小於六日均價'] + BEST_BUY_WHY = ["量大收紅", "量縮價不跌", "三日均價由下往上", "三日均價大於六日均價"] + BEST_SELL_WHY = ["量大收黑", "量縮價跌", "三日均價由上往下", "三日均價小於六日均價"] def __init__(self, stock): self.stock = stock def bias_ratio(self, position=False): return self.stock.ma_bias_ratio_pivot( - self.stock.ma_bias_ratio(3, 6), - position=position) + self.stock.ma_bias_ratio(3, 6), position=position + ) def plus_bias_ratio(self): return self.bias_ratio(True) @@ -65,58 +71,82 @@ def mins_bias_ratio(self): return self.bias_ratio(False) def best_buy_1(self): - return (self.stock.capacity[-1] > self.stock.capacity[-2] and - self.stock.price[-1] > self.stock.open[-1]) + return ( + self.stock.capacity[-1] > self.stock.capacity[-2] + and self.stock.price[-1] > self.stock.open[-1] + ) def best_buy_2(self): - return (self.stock.capacity[-1] < self.stock.capacity[-2] and - self.stock.price[-1] > self.stock.open[-2]) + return ( + self.stock.capacity[-1] < self.stock.capacity[-2] + and self.stock.price[-1] > self.stock.open[-2] + ) def best_buy_3(self): - return self.stock.continuous(self.stock.moving_average(self.stock.price, 3)) == 1 + return ( + self.stock.continuous(self.stock.moving_average(self.stock.price, 3)) == 1 + ) def best_buy_4(self): - return (self.stock.moving_average(self.stock.price, 3)[-1] > - self.stock.moving_average(self.stock.price, 6)[-1]) + return ( + self.stock.moving_average(self.stock.price, 3)[-1] + > self.stock.moving_average(self.stock.price, 6)[-1] + ) def best_sell_1(self): - return (self.stock.capacity[-1] > self.stock.capacity[-2] and - self.stock.price[-1] < self.stock.open[-1]) + return ( + self.stock.capacity[-1] > self.stock.capacity[-2] + and self.stock.price[-1] < self.stock.open[-1] + ) def best_sell_2(self): - return (self.stock.capacity[-1] < self.stock.capacity[-2] and - self.stock.price[-1] < self.stock.open[-2]) + return ( + self.stock.capacity[-1] < self.stock.capacity[-2] + and self.stock.price[-1] < self.stock.open[-2] + ) def best_sell_3(self): - return self.stock.continuous(self.stock.moving_average(self.stock.price, 3)) == -1 + return ( + self.stock.continuous(self.stock.moving_average(self.stock.price, 3)) == -1 + ) def best_sell_4(self): - return (self.stock.moving_average(self.stock.price, 3)[-1] < - self.stock.moving_average(self.stock.price, 6)[-1]) + return ( + self.stock.moving_average(self.stock.price, 3)[-1] + < self.stock.moving_average(self.stock.price, 6)[-1] + ) def best_four_point_to_buy(self): result = [] - check = [self.best_buy_1(), self.best_buy_2(), - self.best_buy_3(), self.best_buy_4()] + check = [ + self.best_buy_1(), + self.best_buy_2(), + self.best_buy_3(), + self.best_buy_4(), + ] if self.mins_bias_ratio() and any(check): for index, v in enumerate(check): if v: result.append(self.BEST_BUY_WHY[index]) else: return False - return ', '.join(result) + return ", ".join(result) def best_four_point_to_sell(self): result = [] - check = [self.best_sell_1(), self.best_sell_2(), - self.best_sell_3(), self.best_sell_4()] + check = [ + self.best_sell_1(), + self.best_sell_2(), + self.best_sell_3(), + self.best_sell_4(), + ] if self.plus_bias_ratio() and any(check): for index, v in enumerate(check): if v: result.append(self.BEST_SELL_WHY[index]) else: return False - return ', '.join(result) + return ", ".join(result) def best_four_point(self): buy = self.best_four_point_to_buy() diff --git a/twstock/cli/__init__.py b/twstock/cli/__init__.py index 9dcce1e..ca1c19a 100644 --- a/twstock/cli/__init__.py +++ b/twstock/cli/__init__.py @@ -9,10 +9,12 @@ def run(): parser = argparse.ArgumentParser() - parser.add_argument('-b', '--bfp', nargs='+') - parser.add_argument('-s', '--stock', nargs='+') - parser.add_argument('-r', '--realtime', nargs='+') - parser.add_argument('-U', '--upgrade-codes', action='store_true', help='Update entites codes') + parser.add_argument("-b", "--bfp", nargs="+") + parser.add_argument("-s", "--stock", nargs="+") + parser.add_argument("-r", "--realtime", nargs="+") + parser.add_argument( + "-U", "--upgrade-codes", action="store_true", help="Update entites codes" + ) args = parser.parse_args() if args.bfp: @@ -22,8 +24,8 @@ def run(): elif args.realtime: realtime.run(args.realtime) elif args.upgrade_codes: - print('Start to update codes') + print("Start to update codes") __update_codes() - print('Done!') + print("Done!") else: parser.print_help() diff --git a/twstock/cli/best_four_point.py b/twstock/cli/best_four_point.py index a8d709c..bbe3aed 100644 --- a/twstock/cli/best_four_point.py +++ b/twstock/cli/best_four_point.py @@ -6,20 +6,21 @@ # XXX: Repalce sys.stdout prevent Windows UnicodeEncodeError on cmd.exe stdout = io.TextIOWrapper( - getattr(sys.stdout, 'buffer', sys.stdout), encoding='utf-8', errors='replace') + getattr(sys.stdout, "buffer", sys.stdout), encoding="utf-8", errors="replace" +) def run(argv): - print('四大買賣點判斷 Best Four Point', file=stdout) - print('------------------------------', file=stdout) + print("四大買賣點判斷 Best Four Point", file=stdout) + print("------------------------------", file=stdout) for sid in argv: bfp = twstock.BestFourPoint(twstock.Stock(sid)) bfp = bfp.best_four_point() - print('%s: ' % (sid), end='', file=stdout) + print("%s: " % (sid), end="", file=stdout) if bfp: if bfp[0]: - print('Buy ', bfp[1], file=stdout) + print("Buy ", bfp[1], file=stdout) else: - print('Sell ', bfp[1], file=stdout) + print("Sell ", bfp[1], file=stdout) else: print("Don't touch", file=stdout) diff --git a/twstock/cli/stock.py b/twstock/cli/stock.py index 1f11977..f30a3c1 100644 --- a/twstock/cli/stock.py +++ b/twstock/cli/stock.py @@ -6,7 +6,7 @@ def run(argv): for sid in argv: s = twstock.Stock(sid) - print('-------------- %s ---------------- ' % sid) - print('high : {:>5} {:>5} {:>5} {:>5} {:>5}'.format(*s.high[-5:])) - print('low : {:>5} {:>5} {:>5} {:>5} {:>5}'.format(*s.low[-5:])) - print('price: {:>5} {:>5} {:>5} {:>5} {:>5}'.format(*s.price[-5:])) + print("-------------- %s ---------------- " % sid) + print("high : {:>5} {:>5} {:>5} {:>5} {:>5}".format(*s.high[-5:])) + print("low : {:>5} {:>5} {:>5} {:>5} {:>5}".format(*s.low[-5:])) + print("price: {:>5} {:>5} {:>5} {:>5} {:>5}".format(*s.price[-5:])) diff --git a/twstock/codes/codes.py b/twstock/codes/codes.py index ae6f361..e5319e0 100644 --- a/twstock/codes/codes.py +++ b/twstock/codes/codes.py @@ -11,11 +11,12 @@ from collections import namedtuple -ROW = namedtuple('StockCodeInfo', ['type', 'code', 'name', 'ISIN', 'start', - 'market', 'group', 'CFI']) +ROW = namedtuple( + "StockCodeInfo", ["type", "code", "name", "ISIN", "start", "market", "group", "CFI"] +) PACKAGE_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) -TPEX_EQUITIES_CSV_PATH = os.path.join(PACKAGE_DIRECTORY, 'tpex_equities.csv') -TWSE_EQUITIES_CSV_PATH = os.path.join(PACKAGE_DIRECTORY, 'twse_equities.csv') +TPEX_EQUITIES_CSV_PATH = os.path.join(PACKAGE_DIRECTORY, "tpex_equities.csv") +TWSE_EQUITIES_CSV_PATH = os.path.join(PACKAGE_DIRECTORY, "twse_equities.csv") codes = {} tpex = {} @@ -24,17 +25,17 @@ def read_csv(path, types): global codes, twse, tpex - with open(path, newline='', encoding='utf_8') as csvfile: + with open(path, newline="", encoding="utf_8") as csvfile: reader = csv.reader(csvfile) csvfile.readline() for row in reader: row = ROW(*(item.strip() for item in row)) codes[row.code] = row - if types == 'tpex': + if types == "tpex": tpex[row.code] = row else: twse[row.code] = row -read_csv(TPEX_EQUITIES_CSV_PATH, 'tpex') -read_csv(TWSE_EQUITIES_CSV_PATH, 'twse') +read_csv(TPEX_EQUITIES_CSV_PATH, "tpex") +read_csv(TWSE_EQUITIES_CSV_PATH, "twse") diff --git a/twstock/codes/fetch.py b/twstock/codes/fetch.py index 892acf5..e6b3332 100644 --- a/twstock/codes/fetch.py +++ b/twstock/codes/fetch.py @@ -15,29 +15,30 @@ from twstock.proxy import get_proxies -TWSE_EQUITIES_URL = 'http://isin.twse.com.tw/isin/C_public.jsp?strMode=2' -TPEX_EQUITIES_URL = 'http://isin.twse.com.tw/isin/C_public.jsp?strMode=4' -ROW = namedtuple('Row', ['type', 'code', 'name', 'ISIN', 'start', - 'market', 'group', 'CFI']) +TWSE_EQUITIES_URL = "http://isin.twse.com.tw/isin/C_public.jsp?strMode=2" +TPEX_EQUITIES_URL = "http://isin.twse.com.tw/isin/C_public.jsp?strMode=4" +ROW = namedtuple( + "Row", ["type", "code", "name", "ISIN", "start", "market", "group", "CFI"] +) def make_row_tuple(typ, row): - code, name = row[1].split('\u3000') - return ROW(typ, code, name, *row[2: -1]) + code, name = row[1].split("\u3000") + return ROW(typ, code, name, *row[2:-1]) def fetch_data(url): r = requests.get(url, proxies=get_proxies()) root = etree.HTML(r.text) - trs = root.xpath('//tr')[1:] + trs = root.xpath("//tr")[1:] result = [] - typ = '' + typ = "" for tr in trs: tr = list(map(lambda x: x.text, tr.iter())) if len(tr) == 4: # This is type - typ = tr[2].strip(' ') + typ = tr[2].strip(" ") else: # This is the row data result.append(make_row_tuple(typ, tr)) @@ -46,9 +47,10 @@ def fetch_data(url): def to_csv(url, path): data = fetch_data(url) - with open(path, 'w', newline='', encoding='utf_8') as csvfile: - writer = csv.writer(csvfile, - delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + with open(path, "w", newline="", encoding="utf_8") as csvfile: + writer = csv.writer( + csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL + ) writer.writerow(data[0]._fields) for d in data: writer.writerow([_ for _ in d]) @@ -57,10 +59,11 @@ def to_csv(url, path): def __update_codes(): def get_directory(): return os.path.dirname(os.path.abspath(__file__)) - to_csv(TWSE_EQUITIES_URL, os.path.join(get_directory(), 'twse_equities.csv')) - to_csv(TPEX_EQUITIES_URL, os.path.join(get_directory(), 'tpex_equities.csv')) + to_csv(TWSE_EQUITIES_URL, os.path.join(get_directory(), "twse_equities.csv")) + to_csv(TPEX_EQUITIES_URL, os.path.join(get_directory(), "tpex_equities.csv")) -if __name__ == '__main__': - to_csv(TWSE_EQUITIES_URL, 'twse_equities.csv') - to_csv(TPEX_EQUITIES_URL, 'tpex_equities.csv') + +if __name__ == "__main__": + to_csv(TWSE_EQUITIES_URL, "twse_equities.csv") + to_csv(TPEX_EQUITIES_URL, "tpex_equities.csv") diff --git a/twstock/legacy.py b/twstock/legacy.py index 864c630..dec27dc 100644 --- a/twstock/legacy.py +++ b/twstock/legacy.py @@ -5,10 +5,10 @@ class LegacyAnalytics(object): """Legacy analytics from toomore/grs""" def cal_continue(self, list_data): - """ 計算持續天數 + """計算持續天數 - :rtype: int - :returns: 向量數值:正數向上、負數向下。 + :rtype: int + :returns: 向量數值:正數向上、負數向下。 """ diff_data = [] for i in range(1, len(list_data)): @@ -25,9 +25,9 @@ def cal_continue(self, list_data): return cont * diff_data[0] def moving_average(self, data, days): - """ 計算移動平均數 + """計算移動平均數 - :rtype: 序列 舊→新 + :rtype: 序列 舊→新 """ result = [] data = data[:] @@ -38,12 +38,12 @@ def moving_average(self, data, days): return result def ma_bias_ratio(self, date1, date2, data): - """ 計算乖離率(均價) - date1 - date2 + """計算乖離率(均價) + date1 - date2 - :param int data1: n 日 - :param int data2: m 日 - :rtype: 序列 舊→新 + :param int data1: n 日 + :param int data2: m 日 + :rtype: 序列 舊→新 """ data1 = self.moving_average(data, date1) data2 = self.moving_average(data, date2) @@ -53,15 +53,14 @@ def ma_bias_ratio(self, date1, date2, data): cal_list.reverse() return cal_list - def ma_bias_ratio_point(cls, data, sample=5, - positive_or_negative=False): + def ma_bias_ratio_point(cls, data, sample=5, positive_or_negative=False): """判斷轉折點位置 - :param list data: 計算資料 - :param int sample: 計算的區間樣本數量 - :param bool positive_or_negative: 正乖離 為 True,負乖離 為 False - :rtype: tuple - :returns: (True or False, 第幾個轉折日, 轉折點值) + :param list data: 計算資料 + :param int sample: 計算的區間樣本數量 + :param bool positive_or_negative: 正乖離 為 True,負乖離 為 False + :rtype: tuple + :returns: (True or False, 第幾個轉折日, 轉折點值) """ sample_data = data[-sample:] if positive_or_negative: # 正 @@ -70,98 +69,109 @@ def ma_bias_ratio_point(cls, data, sample=5, else: ckvalue = min(sample_data) # 尋找最小值 preckvalue = max(sample_data) < 0 # 區間最大值必須為負 - return (sample - sample_data.index(ckvalue) < 4 and \ - sample_data.index(ckvalue) != sample - 1 and preckvalue, - sample - sample_data.index(ckvalue) - 1, - ckvalue) + return ( + sample - sample_data.index(ckvalue) < 4 + and sample_data.index(ckvalue) != sample - 1 + and preckvalue, + sample - sample_data.index(ckvalue) - 1, + ckvalue, + ) class LegacyBestFourPoint(object): - """ 四大買點組合 + """四大買點組合 - :param grs.Stock data: 個股資料 + :param grs.Stock data: 個股資料 """ + def __init__(self, data): self.data = data def bias_ratio(self, position=False): - """ 判斷乖離 + """判斷乖離 - :param bool positive_or_negative: 正乖離 為 True,負乖離 為 False + :param bool positive_or_negative: 正乖離 為 True,負乖離 為 False """ return self.data.ma_bias_ratio_pivot( - self.data.ma_bias_ratio(3, 6), - position=position) + self.data.ma_bias_ratio(3, 6), position=position + ) def check_plus_bias_ratio(self): - """ 正乖離扣至最大 """ + """正乖離扣至最大""" return self.bias_ratio(True) def check_mins_bias_ratio(self): - """ 負乖離扣至最大 """ + """負乖離扣至最大""" return self.bias_ratio() ##### 四大買點 ##### def best_buy_1(self): - """量大收紅 - """ - result = self.data.capacity[-1] > self.data.capacity[-2] and \ - self.data.price[-1] > self.data.open[-1] + """量大收紅""" + result = ( + self.data.capacity[-1] > self.data.capacity[-2] + and self.data.price[-1] > self.data.open[-1] + ) return result def best_buy_2(self): - """量縮價不跌 - """ - result = self.data.capacity[-1] < self.data.capacity[-2] and \ - self.data.price[-1] > self.data.price[-2] + """量縮價不跌""" + result = ( + self.data.capacity[-1] < self.data.capacity[-2] + and self.data.price[-1] > self.data.price[-2] + ) return result def best_buy_3(self): - """三日均價由下往上 - """ + """三日均價由下往上""" return self.data.continuous(self.data.moving_average(self.data.price, 3)) == 1 def best_buy_4(self): - """三日均價大於六日均價 - """ - return self.data.moving_average(self.data.price, 3)[-1] > \ - self.data.moving_average(self.data.price, 6)[-1] + """三日均價大於六日均價""" + return ( + self.data.moving_average(self.data.price, 3)[-1] + > self.data.moving_average(self.data.price, 6)[-1] + ) ##### 四大賣點 ##### def best_sell_1(self): - """量大收黑 - """ - result = self.data.capacity[-1] > self.data.capacity[-2] and \ - self.data.price[-1] < self.data.open[-1] + """量大收黑""" + result = ( + self.data.capacity[-1] > self.data.capacity[-2] + and self.data.price[-1] < self.data.open[-1] + ) return result def best_sell_2(self): - """量縮價跌 - """ - result = self.data.capacity[-1] < self.data.capacity[-2] and \ - self.data.price[-1] < self.data.price[-2] + """量縮價跌""" + result = ( + self.data.capacity[-1] < self.data.capacity[-2] + and self.data.price[-1] < self.data.price[-2] + ) return result def best_sell_3(self): - """三日均價由上往下 - """ + """三日均價由上往下""" return self.data.continuous(self.data.moving_average(self.data.price, 3)) == -1 def best_sell_4(self): - """三日均價小於六日均價 - """ - return self.data.moving_average(self.data.price, 3)[-1] < \ - self.data.moving_average(self.data.price, 6)[-1] + """三日均價小於六日均價""" + return ( + self.data.moving_average(self.data.price, 3)[-1] + < self.data.moving_average(self.data.price, 6)[-1] + ) def best_four_point_to_buy(self): - """ 判斷是否為四大買點 + """判斷是否為四大買點 - :rtype: str or False + :rtype: str or False """ result = [] - if self.check_mins_bias_ratio() and \ - (self.best_buy_1() or self.best_buy_2() or self.best_buy_3() or \ - self.best_buy_4()): + if self.check_mins_bias_ratio() and ( + self.best_buy_1() + or self.best_buy_2() + or self.best_buy_3() + or self.best_buy_4() + ): if self.best_buy_1(): result.append(self.best_buy_1.__doc__.strip()) if self.best_buy_2(): @@ -170,20 +180,23 @@ def best_four_point_to_buy(self): result.append(self.best_buy_3.__doc__.strip()) if self.best_buy_4(): result.append(self.best_buy_4.__doc__.strip()) - result = ', '.join(result) + result = ", ".join(result) else: result = False return result def best_four_point_to_sell(self): - """ 判斷是否為四大賣點 + """判斷是否為四大賣點 - :rtype: str or False + :rtype: str or False """ result = [] - if self.check_plus_bias_ratio() and \ - (self.best_sell_1() or self.best_sell_2() or self.best_sell_3() or \ - self.best_sell_4()): + if self.check_plus_bias_ratio() and ( + self.best_sell_1() + or self.best_sell_2() + or self.best_sell_3() + or self.best_sell_4() + ): if self.best_sell_1(): result.append(self.best_sell_1.__doc__.strip()) if self.best_sell_2(): @@ -192,16 +205,16 @@ def best_four_point_to_sell(self): result.append(self.best_sell_3.__doc__.strip()) if self.best_sell_4(): result.append(self.best_sell_4.__doc__.strip()) - result = ', '.join(result) + result = ", ".join(result) else: result = False return result def best_four_point(self): - """ 判斷買點或賣點 + """判斷買點或賣點 - :rtype: tuple - :returns: (bool, str) + :rtype: tuple + :returns: (bool, str) """ buy = self.best_four_point_to_buy() sell = self.best_four_point_to_sell() diff --git a/twstock/mock/__init__.py b/twstock/mock/__init__.py index 65764ac..87dbce1 100644 --- a/twstock/mock/__init__.py +++ b/twstock/mock/__init__.py @@ -3,7 +3,8 @@ import json -TSE_2330_TW = [""" +TSE_2330_TW = [ + """ {"msgArray": [{"ts": "0", "tk0": "2330.tw_tse_20170724_B_9999778918", "tk1": "2330.tw_tse_20170724_B_9999777950", "tlong": "1500860849000", "f": "853_1193_972_1209_817_", "ex": "tse", "g": "1221_1530_817_1038_1193_", "d": @@ -19,7 +20,8 @@ -1, "sysDate": "20170724", "sessionKey": "tse_2330.tw_20170724|", "sessionFromTime": -1, "stockInfoItem": 2065, "showChart": false, "sessionStr": "UserSession", "stockInfo": 204322}, "rtcode": "0000"} -""", """ +""", + """ {"msgArray": [{"ts": "0", "tk0": "2330.tw_tse_20170724_B_9999766224", "tk1": "2330.tw_tse_20170724_B_9999765954", "tlong": "1500861105000", "f": "1059_1079_1014_1229_907_", "ex": "tse", "g": "1455_1598_797_1019_1134_", "d": @@ -35,7 +37,8 @@ -1, "sysDate": "20170724", "sessionKey": "tse_2330.tw_20170724|", "sessionFromTime": -1, "stockInfoItem": 2055, "showChart": false, "sessionStr": "UserSession", "stockInfo": 130895}, "rtcode": "0000"} -""", """ +""", + """ {"msgArray": [{"ts": "0", "tk0": "2330.tw_tse_20170724_B_9999760446", "tk1": "2330.tw_tse_20170724_B_9999759382", "tlong": "1500861243000", "f": "1034_1028_1009_1253_933_", "ex": "tse", "g": "1466_1625_798_987_1117_", "d": @@ -51,11 +54,12 @@ -1, "sysDate": "20170724", "sessionKey": "tse_2330.tw_20170724|", "sessionFromTime": -1, "stockInfoItem": 1602, "showChart": false, "sessionStr": "UserSession", "stockInfo": 119518}, "rtcode": "0000"} -"""] +""", +] stock_list = { - '2330': TSE_2330_TW, + "2330": TSE_2330_TW, } @@ -64,13 +68,13 @@ def get_stock_info(stock_id, index=0): def get_stocks_info(stocks): - data = json.loads(stock_list['2330'][0]) + data = json.loads(stock_list["2330"][0]) for _ in range(len(stocks)): - data['msgArray'].append(data['msgArray'][0]) + data["msgArray"].append(data["msgArray"][0]) return data + def get(stocks): if isinstance(stocks, list): return get_stocks_info(stocks) return get_stock_info(stocks) - diff --git a/twstock/proxy.py b/twstock/proxy.py index 3f7ad00..604d174 100644 --- a/twstock/proxy.py +++ b/twstock/proxy.py @@ -34,10 +34,10 @@ def proxies(self): @proxies.setter def proxies(self, proxies: list): if not isinstance(proxies, list): - raise ValueError('Proxies only accept list') + raise ValueError("Proxies only accept list") self._proxies = proxies - self._proxies_cycle = cycle(proxies) + self._proxies_cycle = cycle(proxies) def get_proxy(self): return next(self._proxies_cycle) diff --git a/twstock/realtime.py b/twstock/realtime.py index e096d37..9aa9655 100644 --- a/twstock/realtime.py +++ b/twstock/realtime.py @@ -9,61 +9,65 @@ from twstock.proxy import get_proxies -SESSION_URL = 'http://mis.twse.com.tw/stock/index.jsp' -STOCKINFO_URL = 'http://mis.twse.com.tw/stock/api/getStockInfo.jsp?ex_ch={stock_id}&_={time}' +SESSION_URL = "http://mis.twse.com.tw/stock/index.jsp" +STOCKINFO_URL = ( + "http://mis.twse.com.tw/stock/api/getStockInfo.jsp?ex_ch={stock_id}&_={time}" +) # Mock data mock = False def _format_stock_info(data) -> dict: - result = { - 'timestamp': 0.0, - 'info': {}, - 'realtime': {} - } + result = {"timestamp": 0.0, "info": {}, "realtime": {}} # Timestamp - result['timestamp'] = int(data['tlong']) / 1000 + result["timestamp"] = int(data["tlong"]) / 1000 # Information - result['info']['code'] = data['c'] - result['info']['channel'] = data['ch'] - result['info']['name'] = data['n'] - result['info']['fullname'] = data['nf'] - result['info']['time'] = datetime.datetime.fromtimestamp( - int(data['tlong']) / 1000).strftime('%Y-%m-%d %H:%M:%S') + result["info"]["code"] = data["c"] + result["info"]["channel"] = data["ch"] + result["info"]["name"] = data["n"] + result["info"]["fullname"] = data["nf"] + result["info"]["time"] = datetime.datetime.fromtimestamp( + int(data["tlong"]) / 1000 + ).strftime("%Y-%m-%d %H:%M:%S") # Process best result def _split_best(d): if d: - return d.strip('_').split('_') + return d.strip("_").split("_") return d # Realtime information - result['realtime']['latest_trade_price'] = data.get('z', None) - result['realtime']['trade_volume'] = data.get('tv', None) - result['realtime']['accumulate_trade_volume'] = data.get('v', None) - result['realtime']['best_bid_price'] = _split_best(data.get('b', None)) - result['realtime']['best_bid_volume'] = _split_best(data.get('g', None)) - result['realtime']['best_ask_price'] = _split_best(data.get('a', None)) - result['realtime']['best_ask_volume'] = _split_best(data.get('f', None)) - result['realtime']['open'] = data.get('o', None) - result['realtime']['high'] = data.get('h', None) - result['realtime']['low'] = data.get('l', None) + result["realtime"]["latest_trade_price"] = data.get("z", None) + result["realtime"]["trade_volume"] = data.get("tv", None) + result["realtime"]["accumulate_trade_volume"] = data.get("v", None) + result["realtime"]["best_bid_price"] = _split_best(data.get("b", None)) + result["realtime"]["best_bid_volume"] = _split_best(data.get("g", None)) + result["realtime"]["best_ask_price"] = _split_best(data.get("a", None)) + result["realtime"]["best_ask_volume"] = _split_best(data.get("f", None)) + result["realtime"]["open"] = data.get("o", None) + result["realtime"]["high"] = data.get("h", None) + result["realtime"]["low"] = data.get("l", None) # Success fetching - result['success'] = True + result["success"] = True return result def _join_stock_id(stocks) -> str: if isinstance(stocks, list): - return '|'.join(['{}_{}.tw'.format( - 'tse' if s in twstock.twse else 'otc', s) for s in stocks]) - return '{}_{stock_id}.tw'.format( - 'tse' if stocks in twstock.twse else 'otc', stock_id=stocks) + return "|".join( + [ + "{}_{}.tw".format("tse" if s in twstock.twse else "otc", s) + for s in stocks + ] + ) + return "{}_{stock_id}.tw".format( + "tse" if stocks in twstock.twse else "otc", stock_id=stocks + ) def get_raw(stocks) -> dict: @@ -72,19 +76,20 @@ def get_raw(stocks) -> dict: r = req.get( STOCKINFO_URL.format( - stock_id=_join_stock_id(stocks), - time=int(time.time()) * 1000)) + stock_id=_join_stock_id(stocks), time=int(time.time()) * 1000 + ) + ) if sys.version_info < (3, 5): try: return r.json() except ValueError: - return {'rtmessage': 'json decode error', 'rtcode': '5000'} + return {"rtmessage": "json decode error", "rtcode": "5000"} else: try: return r.json() except json.decoder.JSONDecodeError: - return {'rtmessage': 'json decode error', 'rtcode': '5000'} + return {"rtmessage": "json decode error", "rtcode": "5000"} def get(stocks, retry=3): @@ -92,31 +97,32 @@ def get(stocks, retry=3): data = get_raw(stocks) if not mock else twstock.mock.get(stocks) # Set success - data['success'] = False + data["success"] = False # JSONdecode error, could be too fast, retry - if data['rtcode'] == '5000': + if data["rtcode"] == "5000": # XXX: Stupit retry, you will dead here if retry: return get(stocks, retry - 1) return data # No msgArray, dead - if 'msgArray' not in data: + if "msgArray" not in data: return data # Check have data - if not len(data['msgArray']): - data['rtmessage'] = 'Empty Query.' - data['rtcode'] = '5001' + if not len(data["msgArray"]): + data["rtmessage"] = "Empty Query." + data["rtcode"] = "5001" return data # Return multiple stock data if isinstance(stocks, list): result = { - data['info']['code']: data for data in map(_format_stock_info, data['msgArray']) + data["info"]["code"]: data + for data in map(_format_stock_info, data["msgArray"]) } - result['success'] = True + result["success"] = True return result - return _format_stock_info(data['msgArray'][0]) + return _format_stock_info(data["msgArray"][0]) diff --git a/twstock/stock.py b/twstock/stock.py index 7c926c2..d742c34 100644 --- a/twstock/stock.py +++ b/twstock/stock.py @@ -17,17 +17,29 @@ from . import analytics from .codes import codes except ImportError as e: - if e.name == 'lxml': + if e.name == "lxml": # Fix #69 raise e import analytics from codes import codes -TWSE_BASE_URL = 'http://www.twse.com.tw/' -TPEX_BASE_URL = 'http://www.tpex.org.tw/' -DATATUPLE = namedtuple('Data', ['date', 'capacity', 'turnover', 'open', - 'high', 'low', 'close', 'change', 'transaction']) +TWSE_BASE_URL = "http://www.twse.com.tw/" +TPEX_BASE_URL = "http://www.tpex.org.tw/" +DATATUPLE = namedtuple( + "Data", + [ + "date", + "capacity", + "turnover", + "open", + "high", + "low", + "close", + "change", + "transaction", + ], +) class BaseFetcher(object): @@ -36,7 +48,7 @@ def fetch(self, year, month, sid, retry): def _convert_date(self, date): """Convert '106/05/01' to '2017/05/01'""" - return '/'.join([str(int(date.split('/')[0]) + 1911)] + date.split('/')[1:]) + return "/".join([str(int(date.split("/")[0]) + 1911)] + date.split("/")[1:]) def _make_datatuple(self, data): pass @@ -46,17 +58,15 @@ def purify(self, original_data): class TWSEFetcher(BaseFetcher): - REPORT_URL = urllib.parse.urljoin( - TWSE_BASE_URL, 'exchangeReport/STOCK_DAY') + REPORT_URL = urllib.parse.urljoin(TWSE_BASE_URL, "exchangeReport/STOCK_DAY") def __init__(self): pass - def fetch(self, year: int, month: int, sid: str, retry: int=5): - params = {'date': '%d%02d01' % (year, month), 'stockNo': sid} + def fetch(self, year: int, month: int, sid: str, retry: int = 5): + params = {"date": "%d%02d01" % (year, month), "stockNo": sid} for retry_i in range(retry): - r = requests.get(self.REPORT_URL, params=params, - proxies=get_proxies()) + r = requests.get(self.REPORT_URL, params=params, proxies=get_proxies()) try: data = r.json() except JSONDecodeError: @@ -65,45 +75,45 @@ def fetch(self, year: int, month: int, sid: str, retry: int=5): break else: # Fail in all retries - data = {'stat': '', 'data': []} + data = {"stat": "", "data": []} - if data['stat'] == 'OK': - data['data'] = self.purify(data) + if data["stat"] == "OK": + data["data"] = self.purify(data) else: - data['data'] = [] + data["data"] = [] return data def _make_datatuple(self, data): - data[0] = datetime.datetime.strptime( - self._convert_date(data[0]), '%Y/%m/%d') - data[1] = int(data[1].replace(',', '')) - data[2] = int(data[2].replace(',', '')) - data[3] = None if data[3] == '--' else float(data[3].replace(',', '')) - data[4] = None if data[4] == '--' else float(data[4].replace(',', '')) - data[5] = None if data[5] == '--' else float(data[5].replace(',', '')) - data[6] = None if data[6] == '--' else float(data[6].replace(',', '')) + data[0] = datetime.datetime.strptime(self._convert_date(data[0]), "%Y/%m/%d") + data[1] = int(data[1].replace(",", "")) + data[2] = int(data[2].replace(",", "")) + data[3] = None if data[3] == "--" else float(data[3].replace(",", "")) + data[4] = None if data[4] == "--" else float(data[4].replace(",", "")) + data[5] = None if data[5] == "--" else float(data[5].replace(",", "")) + data[6] = None if data[6] == "--" else float(data[6].replace(",", "")) # +/-/X表示漲/跌/不比價 - data[7] = float(0.0 if data[7].replace(',', '') == - 'X0.00' else data[7].replace(',', '')) - data[8] = int(data[8].replace(',', '')) + data[7] = float( + 0.0 if data[7].replace(",", "") == "X0.00" else data[7].replace(",", "") + ) + data[8] = int(data[8].replace(",", "")) return DATATUPLE(*data) def purify(self, original_data): - return [self._make_datatuple(d) for d in original_data['data']] + return [self._make_datatuple(d) for d in original_data["data"]] class TPEXFetcher(BaseFetcher): - REPORT_URL = urllib.parse.urljoin(TPEX_BASE_URL, - 'web/stock/aftertrading/daily_trading_info/st43_result.php') + REPORT_URL = urllib.parse.urljoin( + TPEX_BASE_URL, "web/stock/aftertrading/daily_trading_info/st43_result.php" + ) def __init__(self): pass - def fetch(self, year: int, month: int, sid: str, retry: int=5): - params = {'d': '%d/%d' % (year - 1911, month), 'stkno': sid} + def fetch(self, year: int, month: int, sid: str, retry: int = 5): + params = {"d": "%d/%d" % (year - 1911, month), "stkno": sid} for retry_i in range(retry): - r = requests.get(self.REPORT_URL, params=params, - proxies=get_proxies()) + r = requests.get(self.REPORT_URL, params=params, proxies=get_proxies()) try: data = r.json() except JSONDecodeError: @@ -112,40 +122,39 @@ def fetch(self, year: int, month: int, sid: str, retry: int=5): break else: # Fail in all retries - data = {'aaData': []} + data = {"aaData": []} - data['data'] = [] - if data['aaData']: - data['data'] = self.purify(data) + data["data"] = [] + if data["aaData"]: + data["data"] = self.purify(data) return data def _convert_date(self, date): """Convert '106/05/01' to '2017/05/01'""" - return '/'.join([str(int(date.split('/')[0]) + 1911)] + date.split('/')[1:]) + return "/".join([str(int(date.split("/")[0]) + 1911)] + date.split("/")[1:]) def _make_datatuple(self, data): - data[0] = datetime.datetime.strptime(self._convert_date(data[0].replace('*', '')), - '%Y/%m/%d') - data[1] = int(data[1].replace(',', '')) * 1000 - data[2] = int(data[2].replace(',', '')) * 1000 - data[3] = None if data[3] == '--' else float(data[3].replace(',', '')) - data[4] = None if data[4] == '--' else float(data[4].replace(',', '')) - data[5] = None if data[5] == '--' else float(data[5].replace(',', '')) - data[6] = None if data[6] == '--' else float(data[6].replace(',', '')) - data[7] = float(data[7].replace(',', '')) - data[8] = int(data[8].replace(',', '')) + data[0] = datetime.datetime.strptime( + self._convert_date(data[0].replace("*", "")), "%Y/%m/%d" + ) + data[1] = int(data[1].replace(",", "")) * 1000 + data[2] = int(data[2].replace(",", "")) * 1000 + data[3] = None if data[3] == "--" else float(data[3].replace(",", "")) + data[4] = None if data[4] == "--" else float(data[4].replace(",", "")) + data[5] = None if data[5] == "--" else float(data[5].replace(",", "")) + data[6] = None if data[6] == "--" else float(data[6].replace(",", "")) + data[7] = float(data[7].replace(",", "")) + data[8] = int(data[8].replace(",", "")) return DATATUPLE(*data) def purify(self, original_data): - return [self._make_datatuple(d) for d in original_data['aaData']] + return [self._make_datatuple(d) for d in original_data["aaData"]] class Stock(analytics.Analytics): - - def __init__(self, sid: str, initial_fetch: bool=True): + def __init__(self, sid: str, initial_fetch: bool = True): self.sid = sid - self.fetcher = TWSEFetcher( - ) if codes[sid].market == '上市' else TPEXFetcher() + self.fetcher = TWSEFetcher() if codes[sid].market == "上市" else TPEXFetcher() self.raw_data = [] self.data = [] @@ -163,7 +172,7 @@ def _month_year_iter(self, start_month, start_year, end_month, end_year): def fetch(self, year: int, month: int): """Fetch year month data""" self.raw_data = [self.fetcher.fetch(year, month, self.sid)] - self.data = self.raw_data[0]['data'] + self.data = self.raw_data[0]["data"] return self.data def fetch_from(self, year: int, month: int): @@ -173,7 +182,7 @@ def fetch_from(self, year: int, month: int): today = datetime.datetime.today() for year, month in self._month_year_iter(month, year, today.month, today.year): self.raw_data.append(self.fetcher.fetch(year, month, self.sid)) - self.data.extend(self.raw_data[-1]['data']) + self.data.extend(self.raw_data[-1]["data"]) return self.data def fetch_31(self):