import asyncio
import websockets
import json
import hmac
from datetime import *
import time
class Main():
# Private Websocket에 접속하기 위한 인증준비
def send_auth(self):
api_key = 'USER_API_KEY'
api_secret = 'USER_API_SECRET'
expires = int((time.time() + 10) * 1000)
_val = f'GET/realtime{expires}'
signature = str(hmac.new(
bytes(api_secret, 'utf-8'),
bytes(_val, 'utf-8'), digestmod='sha256'
).hexdigest())
ret = json.dumps({"op": "auth", "args": [api_key, expires, signature]})
return ret
# Private Websocket
async def bybit_private_ws(self):
async for ws_private in websockets.connect("wss://stream.bybit.com/v5/private?max_active_time=10m"): # "?max_active_time=10m" 추가
await ws_private.send(self.send_auth) # private websocket 접속 인증
await ws_private.send('{"op":"subscribe","args":["position","execution","order","wallet"]}')
try:
while True:
data_rcv_strjson = await ws_private.recv()
rawdata = json.loads(data_rcv_strjson)
print(rawdata)
# ConnectionClosed -> async for... 로 돌아가서 websocket에 접속
except websockets.ConnectionClosed:
print('Private websockets.ConnectionClosed')
continue
# Public Websocket
async def bybit_public_ws(self):
async for ws_public in websockets.connect("wss://stream.bybit.com/v5/public/linear"):
await ws_public.send('{"op": "subscribe", "args": ["publicTrade.BTCUSDT"]}')
send_ping_time = datetime.now()
send_ping_process = False
try:
while True:
data_rcv_strjson = await ws_public.recv()
rawdata = json.loads(data_rcv_strjson)
print(rawdata)
if 'ret_msg' in rawdata:
# ping 전달 이후 ret_msg가 pong이 오면 send_ping_time을 현재 시각으로 업데이트 후 대기
if rawdata['ret_msg'] == 'pong':
send_ping_process = False
send_ping_time = datetime.now()
# send_ping_time이 20초가 경과한 경우 ping 전달
if send_ping_time + timedelta(seconds=20) < datetime.now() and send_ping_process is False:
send_ping_process = True
await ws_public.send('{"req_id": "public_ping", "op": "ping"}')
# ConnectionClosed -> async for... 로 돌아가서 websocket에 접속
except websockets.ConnectionClosed:
print('Public websockets.ConnectionClosed')
continue
# asyncio loop 시작
async def start_websocket(self):
await asyncio.gather(self.bybit_public_ws(), self.bybit_private_ws())
if __name__ == "__main__":
main = Main()
asyncio.run(main.start_websocket())
예전에 쓰던 코드가 작동하지 않아서 Bybit 홈페이지에 가보니 V5가 나온 것 같아서 이 버전으로 만들어 봤다. Private과 Public 둘 다 연결하는 것까지는 성공했는데 'keepalive ping timeout; no close frame received' 오류 때문에 고생을 좀 했다.
대충 알아보니 웹소켓 연결은 내가 아무것도 하지 않고 일정 시간 동안 가만히 데이터를 받기만 하면 접속이 끊기게 되어있는 모양이다. 잘은 모르겠지만 이건 API마다 그 시간과 주기가 차이가 있는 것 같고 아무것도 하지 않아도 접속이 계속 유지되는 경우도 꽤나 있었지만 이상하게 Bybit의 V5 Websocket은 10분이 지나고 나니 칼같이 연결이 끊어져버렸다.
아무튼 해결책을 찾기 위해서 API Documentation을 찾아봤고 접속제한 시간을 늘리거나 heartbeat packet을 주기적으로 주고받으라는 설명이 있었다.
https://bybit-exchange.github.io/docs/v5/ws/connect
Connect | Bybit API Documentation
WebSocket public stream:
bybit-exchange.github.io
Public Websocket은 어찌어찌하여 20초마다 ping을 보내서 접속을 살려놓는 데 성공했는데 Private Websocket의 Loop 내에서 어떻게 주기적으로 ping을 보낼지 한참 고민을 했다. 분명히 무언가 방법이 있을 것 같은데 망할 asyncio loop 안에서 이걸 내 실력으로 구현해 내기가 쉽지 않았다. 그렇게 대략 반나절 가량 구글링을 해봤지만...
결론은 Private Websocket의 keep alive는 포기했다. 대신 접속이 끊겼을 경우 프로그램을 종료하지 않고 바로 다시 websocket 연결을 살리는 방법을 쓰기로 했다. websocket을 연결하는 부분을 async for와 try, except으로 감싸놓으면 해결된다.
https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html
Client (asyncio)
Opening a connection: Using a connection:
websockets.readthedocs.io
async for websocket in websockets.connect(...):
try:
...
except websockets.ConnectionClosed:
continue
Public의 경우 계속해서 시세가 들어오지만 Private은 주문 등 행위가 발생하지 않으면 크게 문제가 없기 때문에 최대치인 10분으로 설정해 놓은 max_active_time 이 지나고 순간적으로 발생하는 공백은 크게 문제가 없을 것 같다. 이 짧은 1초 미만의 시간 사이에 주문이 발생하는 것을 피하기 위해서는 websocket 접속이 진행 중이라는 변수를 설정하고 이 변수가 정상인 경우에만 주문을 진행하도록 해두었다.
'Python, API' 카테고리의 다른 글
[Python] 가까운 호가 단위의 가격으로 조정 (np.ceil, np.floor) (0) | 2024.06.27 |
---|---|
[Python][Bitstamp API] OHLCV 요청 (0) | 2024.06.07 |
[Python] os.walk - 하위폴더 내의 모든 파일 리스트 검색하기 (0) | 2024.05.03 |
[Python] 분봉데이터로 당일의 누적OHLCV 계산하기 (0) | 2023.06.22 |
[Python] Sharp Ratio / Sortino Ratio / Carmar Ratio 계산 (0) | 2023.06.19 |