feat: add retry to synchan
This commit is contained in:
@@ -16,6 +16,7 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
@dataclass
|
||||
class SynchanState:
|
||||
"""State information from synchan server."""
|
||||
|
||||
playing: bool
|
||||
currentTime: float
|
||||
duration: float
|
||||
@@ -25,7 +26,7 @@ class SynchanState:
|
||||
|
||||
class SynchanController:
|
||||
"""Controller for synchan server communication."""
|
||||
|
||||
|
||||
def __init__(self, synchan_url: str = "http://localhost:3000"):
|
||||
self.synchan_url = synchan_url
|
||||
self.headers = {"Content-Type": "application/json"}
|
||||
@@ -34,25 +35,17 @@ class SynchanController:
|
||||
"""Seek to a specific time position."""
|
||||
print(f"Seeking to {to}")
|
||||
r = requests.post(
|
||||
f"{self.synchan_url}/trpc/admin.seek",
|
||||
headers=self.headers,
|
||||
data=str(to)
|
||||
f"{self.synchan_url}/trpc/admin.seek", headers=self.headers, data=str(to)
|
||||
)
|
||||
print(r.text)
|
||||
|
||||
def play(self):
|
||||
"""Start playback."""
|
||||
requests.post(
|
||||
f"{self.synchan_url}/trpc/admin.play",
|
||||
headers=self.headers
|
||||
)
|
||||
requests.post(f"{self.synchan_url}/trpc/admin.play", headers=self.headers)
|
||||
|
||||
def pause(self):
|
||||
"""Pause playback."""
|
||||
requests.post(
|
||||
f"{self.synchan_url}/trpc/admin.pause",
|
||||
headers=self.headers
|
||||
)
|
||||
requests.post(f"{self.synchan_url}/trpc/admin.pause", headers=self.headers)
|
||||
|
||||
|
||||
def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url: str):
|
||||
@@ -66,8 +59,6 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url:
|
||||
@sio.event
|
||||
def connect_error(data):
|
||||
print(f"Synchan connection failed: {data}")
|
||||
observer.on_error(data)
|
||||
observer.on_completed()
|
||||
|
||||
@sio.event
|
||||
def control(data):
|
||||
@@ -87,11 +78,10 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url:
|
||||
@sio.event
|
||||
def disconnect():
|
||||
print("Disconnected from synchan server")
|
||||
observer.on_completed()
|
||||
|
||||
print(f"Connecting to synchan at {synchan_url}")
|
||||
try:
|
||||
sio.connect(synchan_url, wait_timeout=5)
|
||||
sio.connect(synchan_url, wait_timeout=5, retry=True)
|
||||
sio.wait()
|
||||
except Exception as e:
|
||||
print(f"Failed to connect to synchan: {e}")
|
||||
@@ -99,15 +89,18 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url:
|
||||
observer.on_completed()
|
||||
|
||||
|
||||
def create_synchan(synchan_url: str = "http://localhost:3000") -> Observable[SynchanState]:
|
||||
def create_synchan(
|
||||
synchan_url: str = "http://localhost:3000",
|
||||
) -> Observable[SynchanState]:
|
||||
"""Create an observable stream of synchan state updates."""
|
||||
# Create a thread pool for socket operations
|
||||
thread_count = multiprocessing.cpu_count()
|
||||
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
|
||||
|
||||
|
||||
return reactivex.create(
|
||||
lambda observer, scheduler: create_socket(observer, scheduler, synchan_url)
|
||||
).pipe(
|
||||
ops.catch(lambda err, src: reactivex.of()), # ignore the error
|
||||
ops.share(),
|
||||
ops.subscribe_on(thread_pool_scheduler)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user