This commit is contained in:
@@ -1,23 +1,33 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import queue
|
||||
import threading
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
from telegram import Update
|
||||
from telegram.ext import (
|
||||
Application,
|
||||
CommandHandler,
|
||||
ContextTypes,
|
||||
MessageHandler,
|
||||
filters,
|
||||
ContextTypes,
|
||||
)
|
||||
|
||||
load_dotenv()
|
||||
|
||||
OPENCODE_API_URL = os.getenv("OPENCODE_API_URL", "http://127.0.0.1:5050")
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
OPENCODE_API_URL = os.getenv("OPENCODE_API_URL", "http://127.0.0.1:4096")
|
||||
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
||||
ALLOWED_CHAT_ID = os.getenv("TELEGRAM_ALLOWED_CHAT_ID")
|
||||
|
||||
SESSION_ID = None
|
||||
message_queue = queue.Queue()
|
||||
is_processing = False
|
||||
@@ -25,6 +35,12 @@ current_task = "Idle"
|
||||
processing_lock = threading.Lock()
|
||||
|
||||
|
||||
def is_authorized(update: Update) -> bool:
|
||||
"""Check if the update comes from the allowed chat."""
|
||||
chat_id = str(update.message.chat.id)
|
||||
return chat_id == ALLOWED_CHAT_ID
|
||||
|
||||
|
||||
def get_session():
|
||||
"""Get or create a session."""
|
||||
global SESSION_ID
|
||||
@@ -38,14 +54,14 @@ def get_session():
|
||||
SESSION_ID = sessions[0]["id"]
|
||||
return SESSION_ID
|
||||
except Exception:
|
||||
pass
|
||||
logger.exception("Failed to fetch existing sessions")
|
||||
try:
|
||||
r = requests.post(f"{OPENCODE_API_URL}/session", json={}, timeout=10)
|
||||
if r.ok:
|
||||
SESSION_ID = r.json()["id"]
|
||||
return SESSION_ID
|
||||
except Exception:
|
||||
pass
|
||||
logger.exception("Failed to create new session")
|
||||
return None
|
||||
|
||||
|
||||
@@ -53,7 +69,7 @@ def send_to_opencode(message):
|
||||
"""Send message to opencode and return response."""
|
||||
session_id = get_session()
|
||||
if not session_id:
|
||||
return "Error: Could not connect to opencode session."
|
||||
return "Could not connect to opencode session."
|
||||
|
||||
try:
|
||||
r = requests.post(
|
||||
@@ -73,56 +89,32 @@ def send_to_opencode(message):
|
||||
else "Message sent, no text response."
|
||||
)
|
||||
else:
|
||||
return f"opencode returned {r.status_code}: {r.text[:200]}"
|
||||
logger.error("opencode returned %d: %s", r.status_code, r.text[:500])
|
||||
return "opencode returned an error. Check server logs."
|
||||
except requests.exceptions.ConnectionError:
|
||||
logger.error("Connection error to opencode at %s", OPENCODE_API_URL)
|
||||
return "Can't connect to opencode. Is it running?"
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning("opencode request timed out")
|
||||
return "opencode took too long to respond. Please try again."
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
|
||||
def process_queue(bot, chat_id):
|
||||
"""Process messages in the queue one at a time."""
|
||||
global is_processing
|
||||
while True:
|
||||
try:
|
||||
item = message_queue.get(timeout=1)
|
||||
if item is None:
|
||||
break
|
||||
user_id, message_id, user_message = item
|
||||
with processing_lock:
|
||||
is_processing = True
|
||||
try:
|
||||
reply = send_to_opencode(user_message)
|
||||
|
||||
async def send_reply():
|
||||
await bot.edit_message_text(
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
text=f"🔄 Processing...\n\n{reply[:4000]}",
|
||||
)
|
||||
|
||||
import asyncio
|
||||
|
||||
asyncio.run(send_reply())
|
||||
finally:
|
||||
with processing_lock:
|
||||
is_processing = False
|
||||
message_queue.task_done()
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception:
|
||||
logger.exception("Unexpected error sending message to opencode")
|
||||
return "An unexpected error occurred. Check server logs."
|
||||
|
||||
|
||||
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update):
|
||||
return
|
||||
await update.message.reply_text(
|
||||
"opencode-dispatch bot\n\n"
|
||||
"Send any message and opencode will process it.\n"
|
||||
f"Server: {OPENCODE_API_URL}"
|
||||
"Commands: /start, /help, /status, /working, /clear"
|
||||
)
|
||||
|
||||
|
||||
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update):
|
||||
return
|
||||
await update.message.reply_text(
|
||||
"How to use:\n\n"
|
||||
"1. Make sure opencode is running\n"
|
||||
@@ -133,7 +125,8 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
|
||||
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
session_id = get_session()
|
||||
if not is_authorized(update):
|
||||
return
|
||||
try:
|
||||
r = requests.get(f"{OPENCODE_API_URL}/global/health", timeout=5)
|
||||
healthy = r.ok
|
||||
@@ -141,18 +134,19 @@ async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
healthy = False
|
||||
queue_size = message_queue.qsize()
|
||||
await update.message.reply_text(
|
||||
f"Server: {OPENCODE_API_URL}\n"
|
||||
f"opencode: {'✅' if healthy else '❌'}\n"
|
||||
f"Session: {session_id or 'none'}\n"
|
||||
f"opencode: {'connected' if healthy else 'unreachable'}\n"
|
||||
f"Session: {'active' if SESSION_ID else 'none'}\n"
|
||||
f"Queue: {queue_size} messages"
|
||||
)
|
||||
|
||||
|
||||
async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update):
|
||||
return
|
||||
with processing_lock:
|
||||
if is_processing:
|
||||
await update.message.reply_text(
|
||||
"❌ Can't clear queue while processing. Wait for current task to finish."
|
||||
"Can't clear queue while processing. Wait for current task to finish."
|
||||
)
|
||||
else:
|
||||
while not message_queue.empty():
|
||||
@@ -160,26 +154,23 @@ async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
message_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
await update.message.reply_text("✅ Queue cleared.")
|
||||
await update.message.reply_text("Queue cleared.")
|
||||
|
||||
|
||||
async def working_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
global current_task
|
||||
if not is_authorized(update):
|
||||
return
|
||||
if is_processing:
|
||||
await update.message.reply_text(
|
||||
f'🔄 Currently working on:\n"{current_task}"\n\nQueue: {message_queue.qsize()} messages'
|
||||
f'Currently working on:\n"{current_task}"\n\nQueue: {message_queue.qsize()} messages'
|
||||
)
|
||||
else:
|
||||
await update.message.reply_text("✅ Currently idle. No task in progress.")
|
||||
await update.message.reply_text("Currently idle. No task in progress.")
|
||||
|
||||
|
||||
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
global is_processing, current_task
|
||||
chat_id = str(update.message.chat.id)
|
||||
if ALLOWED_CHAT_ID and chat_id != ALLOWED_CHAT_ID:
|
||||
await update.message.reply_text(
|
||||
"❌ This bot is not authorized to respond to you."
|
||||
)
|
||||
if not is_authorized(update):
|
||||
return
|
||||
user_message = update.message.text
|
||||
user_id = update.effective_user.id if update.effective_user else "unknown"
|
||||
@@ -189,7 +180,7 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
if currently_processing:
|
||||
sent = await update.message.reply_text(
|
||||
"⏳ opencode is busy. Your message has been added to the queue.\n"
|
||||
"opencode is busy. Your message has been added to the queue.\n"
|
||||
"I'll respond when ready. Use /status to check queue position."
|
||||
)
|
||||
message_queue.put((user_id, sent.message_id, user_message))
|
||||
@@ -198,14 +189,16 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
user_message[:50] + "..." if len(user_message) > 50 else user_message
|
||||
)
|
||||
await update.message.chat.send_action("typing")
|
||||
sent = await update.message.reply_text("🔄 Processing...")
|
||||
sent = await update.message.reply_text("Processing...")
|
||||
with processing_lock:
|
||||
is_processing = True
|
||||
try:
|
||||
reply = send_to_opencode(user_message)
|
||||
loop = asyncio.get_event_loop()
|
||||
reply = await loop.run_in_executor(None, send_to_opencode, user_message)
|
||||
await sent.edit_text(reply[:4000])
|
||||
except Exception as e:
|
||||
await sent.edit_text(f"Error: {str(e)}")
|
||||
except Exception:
|
||||
logger.exception("Error processing message")
|
||||
await sent.edit_text("An error occurred. Check server logs.")
|
||||
finally:
|
||||
with processing_lock:
|
||||
is_processing = False
|
||||
@@ -213,29 +206,33 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
|
||||
async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
chat_id = str(update.message.chat.id)
|
||||
if ALLOWED_CHAT_ID and chat_id != ALLOWED_CHAT_ID:
|
||||
if not is_authorized(update):
|
||||
return
|
||||
await update.message.reply_text("Voice messages not yet supported. Send text.")
|
||||
|
||||
|
||||
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
chat_id = str(update.message.chat.id)
|
||||
if ALLOWED_CHAT_ID and chat_id != ALLOWED_CHAT_ID:
|
||||
if not is_authorized(update):
|
||||
return
|
||||
await update.message.reply_text("File handling not yet supported. Send text.")
|
||||
|
||||
|
||||
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
chat_id = str(update.message.chat.id)
|
||||
if ALLOWED_CHAT_ID and chat_id != ALLOWED_CHAT_ID:
|
||||
if not is_authorized(update):
|
||||
return
|
||||
await update.message.reply_text("Image handling not yet supported. Send text.")
|
||||
|
||||
|
||||
def main():
|
||||
if not BOT_TOKEN:
|
||||
print("Error: TELEGRAM_BOT_TOKEN not set in .env file")
|
||||
logger.error("TELEGRAM_BOT_TOKEN not set in .env file")
|
||||
return
|
||||
|
||||
if not ALLOWED_CHAT_ID:
|
||||
logger.error(
|
||||
"TELEGRAM_ALLOWED_CHAT_ID not set in .env file. "
|
||||
"Refusing to start without access control."
|
||||
)
|
||||
return
|
||||
|
||||
app = Application.builder().token(BOT_TOKEN).build()
|
||||
@@ -250,9 +247,8 @@ def main():
|
||||
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
|
||||
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
|
||||
|
||||
print(f"opencode-dispatch bot starting...")
|
||||
print(f"Connecting to opencode at: {OPENCODE_API_URL}")
|
||||
print("Press Ctrl+C to stop")
|
||||
logger.info("opencode-dispatch bot starting...")
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
app.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user