[Python] Kufar.by Unread Messages

Данная программа выводит непрочитанные сообщения с Kufar.by в отдельном окне и автоматически обновляет их список раз в минуту.
kufar_app.webp

KufarApp.py
Python:
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import requests
import threading
import time
import os

# File in which the Bearer token is persisted between runs. The path is
# relative, so it resolves against the current working directory.
TOKEN_FILE = "token.txt"

# Header template sent with every request to the messaging API.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0",
    # Placeholder: the real "Bearer <token>" value is supplied at runtime.
    "Authorization": "",
    "X-App-Name": "Web Kufar",
    "Content-Type": "application/json",
    "Origin": "https://www.kufar.by"
}

# Conversations endpoint. The query string requests limit=10, offset=0 --
# presumably the first page of ten conversations (confirm against the API).
URL = "https://api.kufar.by/messaging-api/v3/conversations?limit=10&offset=0"

def format_datetime(iso_str):
    """Convert an ISO 8601 timestamp string to ``DD.MM.YYYY HH:MM``.

    Every ``Z`` in the input is rewritten to ``+00:00`` before parsing,
    because ``datetime.fromisoformat`` only accepts the ``Z`` suffix
    natively on Python 3.11+. No time-zone conversion is performed: the
    result shows the wall-clock time exactly as encoded in the input.

    Args:
        iso_str: Timestamp text, e.g. ``"2024-01-02T03:04:05Z"``.

    Returns:
        The formatted string, or ``iso_str`` unchanged when it is not a
        string or cannot be parsed.
    """
    from datetime import datetime

    try:
        parsed = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
        return parsed.strftime("%d.%m.%Y %H:%M")
    except (AttributeError, TypeError, ValueError):
        # Best-effort formatting: show the raw value instead of failing.
        # Only parse-related errors are caught, so KeyboardInterrupt and
        # SystemExit still propagate (a bare ``except`` would swallow them).
        return iso_str

def load_token():
    """Return the saved token with surrounding whitespace removed.

    Returns:
        The stripped contents of ``TOKEN_FILE``, or ``None`` when that
        file does not exist.
    """
    if not os.path.exists(TOKEN_FILE):
        return None

    with open(TOKEN_FILE, "r", encoding="utf-8") as token_file:
        contents = token_file.read()
        return contents.strip()

def save_token(token):
    """Write ``token`` to ``TOKEN_FILE``, replacing any previous contents.

    Args:
        token: Token text; leading and trailing whitespace is dropped
            before it is written.
    """
    with open(TOKEN_FILE, "w", encoding="utf-8") as token_file:
        cleaned = token.strip()
        token_file.write(cleaned)

def _unread_entry(conv):
    """Map one conversation payload to a display record, or None if it has no unseen messages."""
    # ``unseen`` may be absent or null; both count as "nothing unread".
    if not isinstance(conv, dict) or (conv.get("unseen") or 0) <= 0:
        return None

    # Nested objects may be missing or null; fall back to empty dicts so one
    # incomplete conversation does not discard every other unread message.
    ad_info = conv.get("ad_info") or {}
    participant = conv.get("participant_info") or {}
    last_message = conv.get("last_message") or {}

    return {
        "subject": ad_info.get("subject", ""),
        "from": participant.get("name", ""),
        "message": last_message.get("preview", ""),
        "time": format_datetime(last_message.get("timestamp", "")),
        "link": ad_info.get("link", ""),
    }


def fetch_unread_messages(token):
    """Fetch conversations from the API and return those with unseen messages.

    Args:
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        On success, a list of dicts with the keys ``subject``, ``from``,
        ``message``, ``time`` and ``link`` (one per conversation whose
        ``unseen`` counter is positive; fields missing from the payload
        become empty strings).
        On HTTP 401, ``{"error": "unauthorized"}``.
        On any other failure, ``{"error": <exception text>}``.
        The function itself never raises.
    """
    # Per-request copy: the shared HEADERS template is left untouched.
    request_headers = {**HEADERS, "Authorization": f"Bearer {token}"}

    try:
        response = requests.get(URL, headers=request_headers, timeout=10)

        # Reported separately so the caller can tell an invalid token apart
        # from transient failures.
        if response.status_code == 401:
            return {"error": "unauthorized"}

        response.raise_for_status()
        data = response.json()

        # ``conversations`` may be absent or null in the payload.
        conversations = data.get("conversations") or []
        entries = (_unread_entry(conv) for conv in conversations)
        return [entry for entry in entries if entry is not None]

    except Exception as e:
        # Deliberate boundary: callers expect an error dict, not an exception
        # (network errors, bad JSON, unexpected payload shape).
        return {"error": str(e)}

class KufarApp:
    """Tkinter window that lists unread conversations and refreshes them periodically.

    On start-up the saved token is loaded; when there is none, a token entry
    form is shown first. Once a token is available the message view is built,
    filled immediately, and then refreshed every ``REFRESH_INTERVAL`` seconds
    while a status bar counts down to the next refresh.

    All widget access and the countdown run on the Tk main loop via
    ``root.after``; no background thread touches Tk objects.
    """

    # Seconds between automatic refreshes.
    REFRESH_INTERVAL = 60

    def __init__(self, root):
        """Configure the main window and show either the token form or the message view.

        Args:
            root: The ``tk.Tk`` main window this app draws into.
        """
        self.root = root
        self.root.title("Kufar Unread Messages")
        self.root.geometry("700x500")
        self.root.resizable(True, True)

        # Identifier of the pending countdown callback, if one is scheduled.
        self._refresh_job = None
        # Set once the app has asked Tk to quit, so the countdown stops.
        self._closing = False

        token = load_token()
        if not token:
            self.show_token_input()
        else:
            HEADERS["Authorization"] = f"Bearer {token}"
            self.setup_ui()
            self.start_auto_refresh()

    def show_token_input(self):
        """Replace the window contents with a form asking for the Bearer token."""
        for widget in self.root.winfo_children():
            widget.destroy()

        frame = ttk.Frame(self.root, padding=20)
        frame.pack(fill="both", expand=True)

        ttk.Label(frame, text="Введите Bearer Token:", font=("Arial", 12)).pack(pady=10)
        self.token_entry = ttk.Entry(frame, width=80, font=("Arial", 10))
        self.token_entry.pack(pady=5)
        self.token_entry.focus()

        ttk.Button(frame, text="Сохранить и продолжить", command=self.save_token_and_start).pack(pady=20)

    def save_token_and_start(self):
        """Validate the entered token, persist it, and switch to the message view."""
        token = self.token_entry.get().strip()
        if not token:
            messagebox.showerror("Ошибка", "Токен не может быть пустым")
            return
        save_token(token)
        self.setup_ui()
        self.start_auto_refresh()

    def setup_ui(self):
        """Build the message view (text area plus status bar) and load messages once."""
        for widget in self.root.winfo_children():
            widget.destroy()

        self.text_area = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, font=("Arial", 10), state="disabled")
        self.text_area.pack(padx=10, pady=10, fill="both", expand=True)

        self.status_bar = ttk.Label(self.root, text="Загрузка...", relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

        self.next_update_in = self.REFRESH_INTERVAL
        self.last_update_time = time.strftime('%H:%M:%S')
        self.update_messages()

    def update_messages(self):
        """Fetch unread messages and render them into the text area.

        When the API reports the token as unauthorized, the token file is
        deleted, an error dialog is shown and the main loop is asked to quit.

        NOTE: the fetch is a blocking network call made on the UI thread, so
        the window is unresponsive until it returns or times out.
        """
        # The widget is read-only for the user; unlock it while rewriting.
        self.text_area.config(state="normal")
        self.text_area.delete("1.0", tk.END)

        token = load_token()
        if not token:
            self.text_area.insert(tk.END, "Токен не найден. Перезапустите приложение.")
            self.text_area.config(state="disabled")
            return

        result = fetch_unread_messages(token)

        if isinstance(result, dict) and "error" in result:
            if result["error"] == "unauthorized":
                # Drop the rejected token so the next launch asks for a new one.
                try:
                    os.remove(TOKEN_FILE)
                except FileNotFoundError:
                    pass
                messagebox.showerror("Ошибка авторизации", "Токен недействителен. Приложение будет закрыто.")
                self._closing = True
                self.root.quit()
                return
            self.text_area.insert(tk.END, f"Ошибка: {result['error']}\n")
        elif not result:
            self.text_area.insert(tk.END, "Нет новых непрочитанных сообщений.\n")
        else:
            for msg in result:
                self.text_area.insert(tk.END, f"Объявление: {msg['subject']}\n")
                self.text_area.insert(tk.END, f"От: {msg['from']}\n")
                self.text_area.insert(tk.END, f"Сообщение: {msg['message']}\n")
                self.text_area.insert(tk.END, f"Дата: {msg['time']}\n")
                self.text_area.insert(tk.END, f"Ссылка: {msg['link']}\n")
                self.text_area.insert(tk.END, "-" * 60 + "\n\n")

        self.text_area.config(state="disabled")
        self.next_update_in = self.REFRESH_INTERVAL
        self.last_update_time = time.strftime('%H:%M:%S')

    def start_auto_refresh(self):
        """Start (or restart) the one-second countdown that drives periodic refreshes."""
        # Cancel a countdown that is already running so only one is active.
        if self._refresh_job is not None:
            self.root.after_cancel(self._refresh_job)
            self._refresh_job = None
        self._tick()

    def _tick(self):
        """Advance the countdown by one second, refreshing when it reaches zero."""
        if self.next_update_in <= 0:
            self.update_messages()
            if self._closing:
                # The app is shutting down; do not reschedule.
                self._refresh_job = None
                return
            # update_messages() returns early without resetting the counter
            # when no token is found, so reset it here as well.
            self.next_update_in = self.REFRESH_INTERVAL

        self.status_bar.config(
            text=f"Обновлено: {self.last_update_time} | Следующее обновление через: {self.next_update_in} сек"
        )
        self.next_update_in -= 1
        self._refresh_job = self.root.after(1000, self._tick)

if __name__ == "__main__":
    # Script entry point: create the main window, attach the application to
    # it, then hand control to the Tk event loop until the window closes.
    main_window = tk.Tk()
    application = KufarApp(main_window)
    main_window.mainloop()

Build.sh
Bash:
#!/bin/bash
# Build a single-file, windowed executable of KufarApp.py with PyInstaller.
#
# A throwaway virtual environment is created in ./env for the build tools and
# removed again on every exit path. After a successful build the intermediate
# ./build directory and the generated .spec file are removed as well; the
# executable is left in ./dist.

# Stop at the first failing command, so a broken venv or a failed pip install
# cannot fall through to a build against the system Python.
set -e

SCRIPT_NAME="KufarApp.py"
SPEC_FILE="KufarApp.spec"
VENV_DIR="env"

if [ ! -f "$SCRIPT_NAME" ]; then
    echo "Error: $SCRIPT_NAME not found!" >&2
    exit 1
fi

# Leave and delete the virtual environment; runs on success and on failure.
cleanup() {
    # `deactivate` is only defined once the activate script has been sourced.
    if declare -F deactivate > /dev/null; then
        deactivate || true
    fi
    rm -rf "$VENV_DIR"
}
trap cleanup EXIT

python3 -m venv "$VENV_DIR"
source "$VENV_DIR/bin/activate"

pip install --upgrade pip
pip install requests pyinstaller

if ! pyinstaller --onefile --windowed "$SCRIPT_NAME"; then
    exit 1
fi

rm -rf build
# -f: do not fail the finished build if the spec file is not there.
rm -f "$SPEC_FILE"

echo "Build completed. Executable is in 'dist' folder."
 
Назад
Сверху