From 9ccc0cd2b57eae28dca3e3b7ffe6513e670d12d5 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Sat, 11 Oct 2025 16:39:13 +1100 Subject: [PATCH] feat: add retry to synchan --- vlchan/synchan.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/vlchan/synchan.py b/vlchan/synchan.py index 513cea8..e418f64 100644 --- a/vlchan/synchan.py +++ b/vlchan/synchan.py @@ -16,6 +16,7 @@ from concurrent.futures import ThreadPoolExecutor @dataclass class SynchanState: """State information from synchan server.""" + playing: bool currentTime: float duration: float @@ -25,7 +26,7 @@ class SynchanState: class SynchanController: """Controller for synchan server communication.""" - + def __init__(self, synchan_url: str = "http://localhost:3000"): self.synchan_url = synchan_url self.headers = {"Content-Type": "application/json"} @@ -34,25 +35,17 @@ class SynchanController: """Seek to a specific time position.""" print(f"Seeking to {to}") r = requests.post( - f"{self.synchan_url}/trpc/admin.seek", - headers=self.headers, - data=str(to) + f"{self.synchan_url}/trpc/admin.seek", headers=self.headers, data=str(to) ) print(r.text) def play(self): """Start playback.""" - requests.post( - f"{self.synchan_url}/trpc/admin.play", - headers=self.headers - ) + requests.post(f"{self.synchan_url}/trpc/admin.play", headers=self.headers) def pause(self): """Pause playback.""" - requests.post( - f"{self.synchan_url}/trpc/admin.pause", - headers=self.headers - ) + requests.post(f"{self.synchan_url}/trpc/admin.pause", headers=self.headers) def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url: str): @@ -66,8 +59,6 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url: @sio.event def connect_error(data): print(f"Synchan connection failed: {data}") - observer.on_error(data) - observer.on_completed() @sio.event def control(data): @@ -87,11 +78,10 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url: @sio.event def disconnect(): print("Disconnected from synchan server") - observer.on_completed() print(f"Connecting to synchan at {synchan_url}") try: - sio.connect(synchan_url, wait_timeout=5) + sio.connect(synchan_url, wait_timeout=5, retry=True) sio.wait() except Exception as e: print(f"Failed to connect to synchan: {e}") @@ -99,15 +89,18 @@ def create_socket(observer: ObserverBase[SynchanState], scheduler, synchan_url: observer.on_completed() -def create_synchan(synchan_url: str = "http://localhost:3000") -> Observable[SynchanState]: +def create_synchan( + synchan_url: str = "http://localhost:3000", +) -> Observable[SynchanState]: """Create an observable stream of synchan state updates.""" # Create a thread pool for socket operations thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) - + return reactivex.create( lambda observer, scheduler: create_socket(observer, scheduler, synchan_url) ).pipe( + ops.catch(lambda err, src: reactivex.of()), # ignore the error ops.share(), ops.subscribe_on(thread_pool_scheduler) )