{"id":4214,"date":"2023-05-08T11:46:59","date_gmt":"2023-05-08T18:46:59","guid":{"rendered":"https:\/\/jeremywhittaker.com\/?p=4214"},"modified":"2023-05-08T11:47:03","modified_gmt":"2023-05-08T18:47:03","slug":"republish-alpaca-trades-quotes-and-bars-using-redis","status":"publish","type":"post","link":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/","title":{"rendered":"Republish Alpaca trades, quotes, and bars using Redis"},"content":{"rendered":"\n<p>Like many brokers, <a href=\"https:\/\/alpaca.markets\/\" target=\"_blank\" rel=\"noreferrer noopener\">Alpaca<\/a> limits the number of connections you can make to its API. Today I&#8217;m going to set up <a href=\"https:\/\/redis.io\/docs\" target=\"_blank\" rel=\"noreferrer noopener\">Redis<\/a> to redistribute Alpaca trades, quotes, and bars, so that all of my algorithms have access to Alpaca&#8217;s data through a single connection.<\/p>\n\n\n\n<p>The program also includes code to report how many symbols are missing each minute and to save the data to dataframes and to your local disk for further analysis. In the listings below, the dataframe updates and the save routine are commented out.<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><code>main.py<\/code>: This is the entry point of the application. It fetches the symbol list, connects to the Alpaca WebSocket stream, subscribes to trades, quotes, and bars, and publishes every message to Redis.<\/li>\n\n\n\n<li><code>redis_subscriber.py<\/code>: This file is an example consumer. It connects to Redis, subscribes to the <code>alpaca-messages<\/code> channel, and prints each message it receives.<\/li>\n\n\n\n<li><code>subscriptions.py<\/code>: This file contains functions for subscribing to and unsubscribing from Alpaca trades, quotes, and bars.<\/li>\n\n\n\n<li><code>message_processing.py<\/code>: This file processes incoming data from the Alpaca API and formats it for further analysis. 
It also catches and prints any exception raised while a message is being handled.<\/li>\n\n\n\n<li><code>dataframes.py<\/code>: This file manages the dataframes used for storing and analyzing the received data. It creates one dataframe each for trades, quotes, and bars, with columns keyed by symbol and field, and fills in one row per minute.<\/li>\n\n\n\n<li><code>config.py<\/code>: This file manages the configuration, including API keys, directory settings, and the Alpaca REST API instance. Keeping these settings in one place makes them easier to maintain.<\/li>\n<\/ol>\n\n\n\n<h2 class=\"wp-block-heading\">Install Redis<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>sudo apt-get update\n\nsudo apt-get install redis-server\n\nsudo systemctl enable redis-server\n\nsudo systemctl status redis<\/code><\/pre>\n\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\"><img decoding=\"async\" width=\"935\" height=\"258\" data-src=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\" alt=\"\" class=\"wp-image-4215 lazyload\" data-srcset=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png 935w, https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7-300x83.png 300w, https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7-768x212.png 768w, https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7-500x138.png 500w\" data-sizes=\"(max-width: 935px) 100vw, 935px\" src=\"data:image\/svg+xml;base64,PHN2ZyB3aWR0aD0iMSIgaGVpZ2h0PSIxIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciPjwvc3ZnPg==\" style=\"--smush-placeholder-width: 935px; --smush-placeholder-aspect-ratio: 935\/258;\" \/><\/a><\/figure>\n\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-9.png\"><img decoding=\"async\" width=\"615\" height=\"38\" 
data-src=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-9.png\" alt=\"\" class=\"wp-image-4217 lazyload\" data-srcset=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-9.png 615w, https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-9-300x19.png 300w, https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-9-500x31.png 500w\" data-sizes=\"(max-width: 615px) 100vw, 615px\" src=\"data:image\/svg+xml;base64,PHN2ZyB3aWR0aD0iMSIgaGVpZ2h0PSIxIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciPjwvc3ZnPg==\" style=\"--smush-placeholder-width: 615px; --smush-placeholder-aspect-ratio: 615\/38;\" \/><\/a><\/figure>\n\n\n\n<p>If you get a PONG reply after issuing this command, Redis is up and running. You should only run Redis in this setup on a secure internal network. Further hardening is required if the network is reachable from untrusted hosts.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">main.py<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import asyncio\nimport websockets\nimport json\nimport os\nimport traceback\nfrom colorama import Fore, Style, init\nimport redis\n\nfrom config import APCA_API_KEY_ID, APCA_API_SECRET_KEY, live_api\nfrom subscriptions import subscribe_to_trades, subscribe_to_quotes, subscribe_to_bars,\\\n                           unsubscribe_trade_updates, unsubscribe_quote_updates, unsubscribe_bar_updates\nfrom message_processing import process_message\nimport websockets.exceptions\n\n\nimport pandas as pd\nfrom datetime import datetime\n\nfrom dataframes import create_dataframes\n\nsymbols_to_trade = &#91;]\n\nasync def on_message(ws, message):\n    try:\n        messages = json.loads(message)\n        for msg in messages:\n            process_message(msg, trades_df, quotes_df, bars_df)\n            redis_client.publish('alpaca-messages', json.dumps(msg))\n    except Exception as e:\n        print(\"Error in on_message:\")\n        traceback.print_exc()\n\nasync def 
authenticate(ws):\n    auth_data = {\n        \"action\": \"auth\",\n        \"key\": APCA_API_KEY_ID,\n        \"secret\": APCA_API_SECRET_KEY\n    }\n    await ws.send(json.dumps(auth_data))\n\nasync def create_ws_connection(symbols, source='sip'):\n    base_url = f'wss:\/\/stream.data.alpaca.markets\/v2\/{source}'\n\n    async with websockets.connect(base_url, ping_timeout=60) as ws:  # Set ping_timeout to 60 seconds\n        await authenticate(ws)\n\n        # Subscribe to trades\n        print('Subscribing to trades')\n        await subscribe_to_trades(ws, symbols)\n\n        # Subscribe to quotes\n        print('Subscribing to quotes')\n        await subscribe_to_quotes(ws, symbols)\n\n        # Subscribe to bars\n        print('Subscribing to bars')\n        await subscribe_to_bars(ws, symbols)\n\n        while True:\n            try:\n                message = await ws.recv()\n                await on_message(ws, message)\n            except websockets.exceptions.ConnectionClosed:\n                # Re-raise so run_stream() reconnects; reconnecting here would nest connections\n                raise\n            except Exception as e:\n                print(f\"Error: {e}\")\n\ndef get_assets(active=True, tradable=False, shortable=False, exclude_curencies=True):\n    global symbols_to_trade\n    assets = live_api.list_assets()\n    filtered_assets_dict = {}\n\n    for asset in assets:\n        if active and asset.status != 'active':\n            continue\n        if tradable and not asset.tradable:\n            continue\n        if shortable and not asset.shortable:\n            continue\n        if exclude_curencies and '\/' in asset.symbol:\n            continue\n        filtered_assets_dict&#91;asset.symbol] = asset.name\n\n    symbols_to_trade = list(filtered_assets_dict.keys())\n    print(f'Returning {len(symbols_to_trade)} assets')\n    return symbols_to_trade\n\nasync 
def run_stream(symbols_to_trade, source='sip'):\n    while True:\n        try:\n            await create_ws_connection(symbols_to_trade, source=source)\n        except websockets.exceptions.ConnectionClosedError as e:\n            print(f\"Connection closed: {e}, retrying in 1 second...\")\n            await asyncio.sleep(1)\n        except Exception as e:\n            print(f\"Error: {e}, retrying in 1 second...\")\n            await asyncio.sleep(1)\n\n\n\nif __name__ == \"__main__\":\n    symbols = get_assets(active=True, tradable=False, shortable=False, exclude_curencies=True)\n    trades_df, quotes_df, bars_df = create_dataframes(symbols)\n    redis_client = redis.Redis(host='localhost', port=6379, db=0)\n    asyncio.run(run_stream(symbols_to_trade))\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">redis_subscriber.py<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import redis\n\ndef on_message(channel, message):\n    print(f\"Message received on channel '{channel}': {message}\")\n\ndef main():\n    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)\n\n    # Replace 'alpaca-messages' with the desired channel name\n    channel_name = 'alpaca-messages'\n    pubsub = redis_client.pubsub()\n    pubsub.subscribe(channel_name)\n\n    print(f\"Subscribed to '{channel_name}' channel. 
Awaiting messages...\")\n\n    while True:\n        # Block for up to 1 second per poll instead of spinning the CPU\n        message = pubsub.get_message(timeout=1.0)\n        if message:\n            if message&#91;'type'] == 'message':\n                on_message(message&#91;'channel'], message&#91;'data'])\n\nif __name__ == \"__main__\":\n    main()\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">dataframes.py<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import pandas as pd\nimport traceback\nfrom datetime import datetime\nfrom dateutil.parser import parse\nimport pytz\nimport os\nimport time\nfrom config import minute_data_dir\nimport asyncio\n\n\ntzinfos = {'ZZ': pytz.UTC}\n\ndef append_to_csv(file_path, data):\n    with open(file_path, 'a') as f:\n        f.write(data + '\\n')\n\ndef check_missing_values():\n    while True:\n        time.sleep(60)  # Wait for 1 minute\n        for df_name, df in zip(&#91;\"trades_df\", \"quotes_df\", \"bars_df\"], &#91;trades_df, quotes_df, bars_df]):\n            row_missing_percentage = (df.isna().sum(axis=1) \/ df.shape&#91;1] * 100).round(2)\n            print(f\"{df_name} row missing values (%):\\n{row_missing_percentage}\\n\")\n\ndef create_dataframes(symbols):\n    global trades_df, quotes_df, bars_df\n    trades_columns = pd.MultiIndex.from_product(&#91;symbols, &#91;'price', 'size', 'timestamp']])\n    quotes_columns = pd.MultiIndex.from_product(&#91;symbols, &#91;'bid', 'ask']])\n    bars_columns = pd.MultiIndex.from_product(&#91;symbols, &#91;'open', 'high', 'low', 'close', 'volume']])\n\n    trades_df = pd.DataFrame(columns=trades_columns)\n    quotes_df = pd.DataFrame(columns=quotes_columns)\n    bars_df = pd.DataFrame(columns=bars_columns)\n    return trades_df, quotes_df, bars_df\n\n\n#\n# async def save_data(df, data_directory, file_name, file_format='pickle'):\n#     #CSV saving does not currently work as it slows down the process and the wss will die.\n#     if not os.path.exists(data_directory):\n#         os.makedirs(data_directory)\n#\n#     file_path = os.path.join(data_directory, 
f\"{file_name}.{file_format}\")\n#\n#     # Calculate missing data percentage for the row to be appended\n#     row_missing_percentage = (df.isna().sum(axis=1) \/ df.shape&#91;1] * 100).round(2)\n#     print(f\"{file_name} row missing values (%):\\n{row_missing_percentage}\\n\")\n#\n#     if not os.path.exists(file_path):\n#         if file_format == 'csv':\n#             with open(file_path, 'w') as f:\n#                 f.write(df.to_csv(header=True, index=True))\n#         elif file_format == 'pickle':\n#             df.to_pickle(file_path)\n#     else:\n#         if file_format == 'csv':\n#             existing_df = pd.read_csv(file_path, header=&#91;0, 1], index_col=0)\n#             combined_df = pd.concat(&#91;existing_df, df])\n#             # Ensure the correct column order in the combined DataFrame\n#             combined_df = combined_df.reorder_levels(&#91;1, 0], axis=1).sort_index(axis=1)\n#\n#             with open(file_path, 'w') as f:\n#                 f.write(combined_df.to_csv(header=True, index=True))\n#         elif file_format == 'pickle':\n#             existing_df = pd.read_pickle(file_path)\n#             combined_df = pd.concat(&#91;existing_df, df])\n#             combined_df.to_pickle(file_path)\n\ndef update_trades_df(symbol, price, size, timestamp, df, minute_data_dir):\n    try:\n        timestamp_dt = parse(timestamp, tzinfos=tzinfos).replace(second=0, microsecond=0)\n        if timestamp_dt not in df.index:\n            # if len(df.index) &gt; 1:\n            #     asyncio.create_task(save_data(df, minute_data_dir, 'trades', 'pickle'))\n            df.loc&#91;timestamp_dt] = pd.Series(dtype='float64')\n\n        df.at&#91;timestamp_dt, (symbol, 'price')] = price\n\n        # Cumulatively add 1 size to the current size value\n        current_size = df.at&#91;timestamp_dt, (symbol, 'size')]\n        if pd.isna(current_size):\n            current_size = 0\n\n        df.at&#91;timestamp_dt, (symbol, 'size')] = current_size + size\n\n    
except Exception as e:\n        print(\"Error:\", e)\n        traceback.print_exc()\n\ndef update_quotes_df(symbol, bid, ask, timestamp, df, minute_data_dir):\n    try:\n        timestamp_dt = parse(timestamp, tzinfos=tzinfos).replace(second=0, microsecond=0)\n        if timestamp_dt not in df.index:\n            # if len(df.index) &gt; 1:\n            #     asyncio.create_task(save_data(df, minute_data_dir, \"quotes\", 'pickle'))\n            df.loc&#91;timestamp_dt] = pd.Series(dtype='float64')\n\n        df.at&#91;timestamp_dt, (symbol, 'bid')] = bid\n        df.at&#91;timestamp_dt, (symbol, 'ask')] = ask\n    except Exception as e:\n        print(\"Error:\", e)\n        traceback.print_exc()\n\ndef update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df, minute_data_dir):\n    try:\n        timestamp_dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')).replace(second=0, microsecond=0)\n        if timestamp_dt not in df.index:\n            # if len(df.index) &gt; 1:\n            #     asyncio.create_task(save_data(df, minute_data_dir, \"bars\", 'pickle'))\n            df.loc&#91;timestamp_dt] = pd.Series(dtype='float64')\n\n        df.at&#91;timestamp_dt, (symbol, 'open')] = open_price\n        df.at&#91;timestamp_dt, (symbol, 'high')] = high_price\n        df.at&#91;timestamp_dt, (symbol, 'low')] = low_price\n        df.at&#91;timestamp_dt, (symbol, 'close')] = close_price\n        df.at&#91;timestamp_dt, (symbol, 'volume')] = volume\n    except Exception as e:\n        print(\"Error:\", e)\n        traceback.print_exc()<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">message_processing.py<br><\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>from constants import exchange_codes, trade_conditions_cts, trade_conditions_utdf, cqs_quote_conditions, uqdf_quote_conditions\nfrom dataframes import update_trades_df, update_quotes_df, update_bars_df\nfrom colorama import Fore, Style\nimport traceback\nfrom config import 
minute_data_dir\n\n\ndef process_message(msg, trades_df, quotes_df, bars_df):\n    msg_type = msg&#91;'T']\n    try:\n        if msg_type == 't':  # Trade\n            symbol = msg&#91;'S']\n            trade_id = msg&#91;'i']\n            exchange_code = msg&#91;'x']\n            exchange_desc = exchange_codes.get(exchange_code, \"Unknown\")\n            trade_price = msg&#91;'p']\n            trade_size = msg&#91;'s']\n            trade_condition = msg&#91;'c']\n            conditions_desc = &#91;trade_conditions_cts.get(c, \"Unknown\") for c in trade_condition]\n            timestamp = msg&#91;'t']\n            tape = msg&#91;'z']\n            if exchange_code in &#91;\"A\", \"N\", \"P\"]:\n                plan = \"CTA\"\n                trade_conditions_desc = &#91;trade_conditions_cts.get(cond, \"Unknown\") for cond in trade_condition]\n            elif exchange_code in &#91;\"B\", \"Q\", \"S\", \"T\", \"X\"]:\n                plan = \"UTP\"\n                trade_conditions_desc = &#91;trade_conditions_utdf.get(cond, \"Unknown\") for cond in trade_condition]\n            elif exchange_code in &#91;\"C\", \"D\", \"E\", \"F\", \"G\", \"H\", \"I\", \"J\", \"K\", \"L\", \"M\", \"O\", \"R\", \"U\", \"V\", \"W\", \"Y\"]:\n                plan = \"Unknown\"\n                trade_conditions_desc = &#91;trade_conditions_cts.get(cond, \"Unknown\") for cond in trade_condition]\n            else:\n                plan = \"Unknown\"\n                trade_conditions_desc = &#91;trade_conditions_cts.get(cond, \"Unknown\") for cond in trade_condition]\n\n            # update_trades_df(symbol, trade_price, trade_size, timestamp, df=trades_df, minute_data_dir=minute_data_dir)\n\n            # print(\"Trade:\")\n            # print(f\"Symbol: {symbol}\")\n            # print(f\"Trade ID: {trade_id}\")\n            # print(f\"Exchange Code: {exchange_code} ({exchange_desc})\")\n            # print(f\"Trade Price: {trade_price}\")\n            # print(f\"Trade Size: 
{trade_size}\")\n            # print(f\"Trade Condition: {trade_condition} ({', '.join(conditions_desc)})\")\n            # print(f\"Timestamp: {timestamp}\")\n            # print(f\"Tape: {tape}\")\n            # print(\"-------\")\n\n        elif msg_type == 'q':  # Quote\n            symbol = msg&#91;'S']\n            ask_exchange_code = msg&#91;'ax']\n            ask_exchange_desc = exchange_codes.get(ask_exchange_code, \"Unknown\")\n            ask_price = msg&#91;'ap']\n            ask_size = msg&#91;'as']\n            bid_exchange_code = msg&#91;'bx']\n            bid_exchange_desc = exchange_codes.get(bid_exchange_code, \"Unknown\")\n            bid_price = msg&#91;'bp']\n            bid_size = msg&#91;'bs']\n            quote_condition = msg&#91;'c']\n            timestamp = msg&#91;'t']\n            tape = msg&#91;'z']\n\n            conditions = msg.get(\"c\", &#91;])\n            decoded_conditions = &#91;]\n            for condition_code in conditions:\n                if condition_code in cqs_quote_conditions:\n                    decoded_conditions.append(cqs_quote_conditions&#91;condition_code])\n                elif condition_code in uqdf_quote_conditions:\n                    decoded_conditions.append(uqdf_quote_conditions&#91;condition_code])\n                else:\n                    decoded_conditions.append(f\"Unknown Condition: {condition_code}\")\n\n            # update_quotes_df(symbol, bid_price, ask_price, timestamp, df=quotes_df, minute_data_dir=minute_data_dir)\n\n\n\n            # print(\"Quote:\")\n            # print(f\"Symbol: {symbol}\")\n            # print(f\"Ask Exchange Code: {ask_exchange_code} ({ask_exchange_desc})\")\n            # print(f\"Ask Price: {ask_price}\")\n            # print(f\"Ask Size: {ask_size}\")\n            # print(f\"Bid Exchange Code: {bid_exchange_code} ({bid_exchange_desc})\")\n            # print(f\"Bid Price: {bid_price}\")\n            # print(f\"Bid Size: {bid_size}\")\n            # 
print(f\"Quote Condition: {quote_condition}\")\n            # print(f\"Quote Condition: {decoded_conditions}\")\n            # print(f\"Timestamp: {timestamp}\")\n            # print(f\"Tape: {tape}\")\n            # print(\"-------\")\n\n        elif msg_type in &#91;'b', 'd', 'u']:  # Bar\n\n            symbol = msg&#91;'S']\n            open_price = msg&#91;'o']\n            high_price = msg&#91;'h']\n            low_price = msg&#91;'l']\n            close_price = msg&#91;'c']\n            volume = msg&#91;'v']\n            timestamp = msg&#91;'t']\n\n            # print(\"Bar:\")\n            # print(f\"Symbol: {symbol}\")\n            # print(f\"Open Price: {open_price}\")\n            # print(f\"High Price: {high_price}\")\n            # print(f\"Low Price: {low_price}\")\n            # print(f\"Close Price: {close_price}\")\n            # print(f\"Volume: {volume}\")\n            # print(f\"Timestamp: {timestamp}\")\n            # print(\"-------\")\n\n            # update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df=bars_df, minute_data_dir=minute_data_dir)\n\n        elif msg_type == 'c':  # Trade Correction\n            symbol = msg&#91;'S']\n            exchange_code = msg&#91;'x']\n            original_trade_id = msg&#91;'oi']\n            original_trade_price = msg&#91;'op']\n            original_trade_size = msg&#91;'os']\n            original_trade_conditions = msg&#91;'oc']\n            corrected_trade_id = msg&#91;'ci']\n            corrected_trade_price = msg&#91;'cp']\n            corrected_trade_size = msg&#91;'cs']\n            corrected_trade_conditions = msg&#91;'cc']\n            timestamp = msg&#91;'t']\n            tape = msg&#91;'z']\n            # Handle the trade correction message here\n\n        elif msg_type == 'x':  # Trade Cancel\/Error\n            symbol = msg&#91;'S']\n            trade_id = msg&#91;'i']\n            exchange_code = msg&#91;'x']\n            trade_price = msg&#91;'p']\n   
         trade_size = msg&#91;'s']\n            action = msg&#91;'a']\n            timestamp = msg&#91;'t']\n            tape = msg&#91;'z']\n            # Handle the trade cancel\/error message here\n\n        elif msg_type == 'l':  # LULD\n            symbol = msg&#91;'S']\n            limit_up_price = msg&#91;'u']\n            limit_down_price = msg&#91;'d']\n            indicator = msg&#91;'i']\n            timestamp = msg&#91;'t']\n            tape = msg&#91;'z']\n            # Handle the LULD message here\n\n        elif msg_type == 'error':  # Error\n            code = msg&#91;'code']\n            error_msg = msg&#91;'msg']\n            print(Fore.RED + f\"Error Code: {code}\")\n            print(f\"Error Message: {error_msg}\")\n            print(Style.RESET_ALL + \"-------\")\n\n        elif msg&#91;'T'] == 'success':\n            error_msg = msg&#91;'msg']\n            print(Fore.GREEN + f\"Success Message: {error_msg}\")\n            print(Style.RESET_ALL + \"-------\")\n\n        elif msg_type == \"subscription\":\n            print(\"Subscription:\")\n            for item, symbols in msg.items():\n                if item != 'T' and symbols:\n                    num_symbols = len(symbols)\n                    print(f\"{item}: {num_symbols} symbols\")\n            print(\"-------\")\n\n        else:\n            print(Fore.RED + f\"Unknown message type: {msg_type}\" + Style.RESET_ALL)\n\n    except Exception as e:\n        print(Fore.RED + \"Error:\", e, Style.RESET_ALL)\n        traceback.print_exc()<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">subscriptions.py<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import json\nimport traceback\nfrom colorama import Fore, Style, init\n\ninit(autoreset=True)\n\nasync def subscribe_to_trades(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"subscribe\",\n            \"trades\": symbols\n        }\n        message = json.dumps(sub_data)\n        await ws.send(message)\n    except 
Exception as e:\n        print(Fore.RED + \"Error in subscribe_to_trades:\")\n        traceback.print_exc()\n\nasync def subscribe_to_quotes(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"subscribe\",\n            \"quotes\": symbols\n        }\n        message = json.dumps(sub_data)\n        await ws.send(message)\n    except Exception as e:\n        print(Fore.RED + \"Error in subscribe_to_quotes:\")\n        traceback.print_exc()\n\nasync def subscribe_to_bars(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"subscribe\",\n            \"bars\": symbols\n        }\n        message = json.dumps(sub_data)\n        await ws.send(message)\n    except Exception as e:\n        print(Fore.RED + \"Error in subscribe_to_bars:\")\n        traceback.print_exc()\n\nasync def unsubscribe_trade_updates(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"unsubscribe\",\n            \"trades\": symbols\n        }\n        await ws.send(json.dumps(sub_data))\n    except Exception as e:\n        print(Fore.RED + \"Error in unsubscribe_trade_updates:\")\n        traceback.print_exc()\n\nasync def unsubscribe_quote_updates(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"unsubscribe\",\n            \"quotes\": symbols\n        }\n        await ws.send(json.dumps(sub_data))\n    except Exception as e:\n        print(Fore.RED + \"Error in unsubscribe_quote_updates:\")\n        traceback.print_exc()\n\nasync def unsubscribe_bar_updates(ws, symbols):\n    try:\n        sub_data = {\n            \"action\": \"unsubscribe\",\n            \"bars\": symbols\n        }\n        await ws.send(json.dumps(sub_data))\n    except Exception as e:\n        print(Fore.RED + \"Error in unsubscribe_bar_updates:\")\n        traceback.print_exc()\n<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">config.py<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>import os\nimport sys\nimport alpaca_trade_api as tradeapi\n\nparent_dir = 
os.path.abspath(os.path.join(os.getcwd(), '..'))\ndata_dir = os.path.join(parent_dir, 'data\/')\nsys.path.append(data_dir)\nminute_data_dir = os.path.join(data_dir, 'minute_data\/')\n\nimport keys\n\nBASE_URL, API_KEY, SECRET_KEY = #define your own here\nLIVE_BASE_URL, LIVE_API_KEY, LIVE_SECRET_KEY =  #define your own here\n\nLIVE_BASE_URL, APCA_API_KEY_ID, APCA_API_SECRET_KEY =  #define your own here\n\n\nlive_api = tradeapi.REST(LIVE_API_KEY, LIVE_SECRET_KEY, LIVE_BASE_URL, api_version='v2')\n<\/code><\/pre>\n\n\n\n<p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Like many brokers, you don&#8217;t get to make unlimited connections to the Alpaca API. What I&#8217;m going to set up today is Redis to redistribute Alpaca trades, quotes,&#8230;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-4214","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.2 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Republish Alpaca trades, quotes, and bars using Redis - Jeremy Whittaker<\/title>\n<meta name=\"robots\" content=\"noindex, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Republish Alpaca trades, quotes, and bars using Redis\" \/>\n<meta property=\"og:description\" content=\"Like many brokers, you don&#039;t get to make unlimited connections to the Alpaca API. What I&#039;m going to set up today is Redis to redistribute Alpaca trades, quotes, and bars data. 
The purpose of this is so that all of my algorithms have access to Alpaca&#039;s data.\" \/>\n<meta property=\"og:url\" content=\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\" \/>\n<meta property=\"og:site_name\" content=\"Jeremy Whittaker\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/WhittakerJeremy\" \/>\n<meta property=\"article:author\" content=\"https:\/\/www.facebook.com\/WhittakerJeremy\" \/>\n<meta property=\"article:published_time\" content=\"2023-05-08T18:46:59+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2023-05-08T18:47:03+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/redis.png\" \/>\n\t<meta property=\"og:image:width\" content=\"944\" \/>\n\t<meta property=\"og:image:height\" content=\"491\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"JeremyWhittaker\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:title\" content=\"Republish Alpaca trades, quotes, and bars using Redis\" \/>\n<meta name=\"twitter:description\" content=\"Like many brokers, you don&#039;t get to make unlimited connections to the Alpaca API. What I&#039;m going to set up today is Redis to redistribute Alpaca trades, quotes, and bars data. The purpose of this is so that all of my algorithms have access to Alpaca&#039;s data.\" \/>\n<meta name=\"twitter:image\" content=\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/redis.png\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"JeremyWhittaker\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. 
reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"9 minutes\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\"},\"author\":{\"name\":\"JeremyWhittaker\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c\"},\"headline\":\"Republish Alpaca trades, quotes, and bars using Redis\",\"datePublished\":\"2023-05-08T18:46:59+00:00\",\"dateModified\":\"2023-05-08T18:47:03+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\"},\"wordCount\":299,\"commentCount\":0,\"publisher\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c\"},\"image\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\",\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\",\"url\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\",\"name\":\"Republish Alpaca trades, quotes, and bars using Redis - Jeremy 
Whittaker\",\"isPartOf\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage\"},\"image\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\",\"datePublished\":\"2023-05-08T18:46:59+00:00\",\"dateModified\":\"2023-05-08T18:47:03+00:00\",\"breadcrumb\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#breadcrumb\"},\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage\",\"url\":\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\",\"contentUrl\":\"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png\"},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\/\/new.jeremywhittaker.com\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Republish Alpaca trades, quotes, and bars using Redis\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/new.jeremywhittaker.com\/#website\",\"url\":\"https:\/\/new.jeremywhittaker.com\/\",\"name\":\"Jeremy Whittaker\",\"description\":\"Research, software, markets, housing, and 
energy\",\"publisher\":{\"@id\":\"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/new.jeremywhittaker.com\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"en-US\"},{\"@type\":[\"Person\",\"Organization\"],\"@id\":\"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c\",\"name\":\"JeremyWhittaker\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g\",\"caption\":\"JeremyWhittaker\"},\"logo\":{\"@id\":\"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g\"},\"sameAs\":[\"http:\/\/www.jeremywhittaker.com\",\"https:\/\/www.facebook.com\/WhittakerJeremy\",\"https:\/\/www.linkedin.com\/in\/jeremywhittaker\/\"],\"url\":\"https:\/\/new.jeremywhittaker.com\/index.php\/author\/jeremywhittaker\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. 
-->","yoast_head_json":{"title":"Republish Alpaca trades, quotes, and bars using Redis - Jeremy Whittaker","robots":{"index":"noindex","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"og_locale":"en_US","og_type":"article","og_title":"Republish Alpaca trades, quotes, and bars using Redis","og_description":"Like many brokers, you don't get to make unlimited connections to the Alpaca API. What I'm going to set up today is Redis to redistribute Alpaca trades, quotes, and bars data. The purpose of this is so that all of my algorithms have access to Alpaca's data.","og_url":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/","og_site_name":"Jeremy Whittaker","article_publisher":"https:\/\/www.facebook.com\/WhittakerJeremy","article_author":"https:\/\/www.facebook.com\/WhittakerJeremy","article_published_time":"2023-05-08T18:46:59+00:00","article_modified_time":"2023-05-08T18:47:03+00:00","og_image":[{"width":944,"height":491,"url":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/redis.png","type":"image\/png"}],"author":"JeremyWhittaker","twitter_card":"summary_large_image","twitter_title":"Republish Alpaca trades, quotes, and bars using Redis","twitter_description":"Like many brokers, you don't get to make unlimited connections to the Alpaca API. What I'm going to set up today is Redis to redistribute Alpaca trades, quotes, and bars data. The purpose of this is so that all of my algorithms have access to Alpaca's data.","twitter_image":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/redis.png","twitter_misc":{"Written by":"JeremyWhittaker","Est. 
reading time":"9 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#article","isPartOf":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/"},"author":{"name":"JeremyWhittaker","@id":"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c"},"headline":"Republish Alpaca trades, quotes, and bars using Redis","datePublished":"2023-05-08T18:46:59+00:00","dateModified":"2023-05-08T18:47:03+00:00","mainEntityOfPage":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/"},"wordCount":299,"commentCount":0,"publisher":{"@id":"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c"},"image":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage"},"thumbnailUrl":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png","inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/","url":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/","name":"Republish Alpaca trades, quotes, and bars using Redis - Jeremy 
Whittaker","isPartOf":{"@id":"https:\/\/new.jeremywhittaker.com\/#website"},"primaryImageOfPage":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage"},"image":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage"},"thumbnailUrl":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png","datePublished":"2023-05-08T18:46:59+00:00","dateModified":"2023-05-08T18:47:03+00:00","breadcrumb":{"@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#primaryimage","url":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png","contentUrl":"https:\/\/new.jeremywhittaker.com\/wp-content\/uploads\/2023\/05\/image-7.png"},{"@type":"BreadcrumbList","@id":"https:\/\/new.jeremywhittaker.com\/index.php\/2023\/05\/08\/republish-alpaca-trades-quotes-and-bars-using-redis\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/new.jeremywhittaker.com\/"},{"@type":"ListItem","position":2,"name":"Republish Alpaca trades, quotes, and bars using Redis"}]},{"@type":"WebSite","@id":"https:\/\/new.jeremywhittaker.com\/#website","url":"https:\/\/new.jeremywhittaker.com\/","name":"Jeremy Whittaker","description":"Research, software, markets, housing, and 
energy","publisher":{"@id":"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/new.jeremywhittaker.com\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":["Person","Organization"],"@id":"https:\/\/new.jeremywhittaker.com\/#\/schema\/person\/ed0edfdefb3e180693efef453372980c","name":"JeremyWhittaker","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g","url":"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g","caption":"JeremyWhittaker"},"logo":{"@id":"https:\/\/secure.gravatar.com\/avatar\/c8ac20e6dfa86b5f27ce9bffee4851099770cbea5ae7338a274865bfbc8c0218?s=96&d=retro&r=g"},"sameAs":["http:\/\/www.jeremywhittaker.com","https:\/\/www.facebook.com\/WhittakerJeremy","https:\/\/www.linkedin.com\/in\/jeremywhittaker\/"],"url":"https:\/\/new.jeremywhittaker.com\/index.php\/author\/jeremywhittaker\/"}]}},"_links":{"self":[{"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/posts\/4214","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/comments?post=4214"}],"version-history":[{"count":0,"href":"https
:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/posts\/4214\/revisions"}],"wp:attachment":[{"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/media?parent=4214"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/categories?post=4214"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/new.jeremywhittaker.com\/index.php\/wp-json\/wp\/v2\/tags?post=4214"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}