Raw Text Content Remove
snake-gecko-bison



#!/usr/bin/env python3
###
# Bluesky -> Ghost Importer mit neuen sprechenden Überschriften
# Überschrift: Erster TAG (capitalize) + (.notes) für Text oder (.pictures) für Bilder
###
from dotenv import load_dotenv
load_dotenv(dotenv_path='/var/www/writefreely-0.16.0/cron/secret/config.env')
import os
import requests
import sqlite3
from datetime import datetime, timezone
import pytz
import hashlib
import logging
import re
import time
import base64
import hmac
import hashlib
import json
import tempfile
from typing import Optional, List, Dict, Any, Tuple

# Logging-Konfiguration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/opt/writefreely/bluesky_import.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# =============== KONFIGURATION ===============
# BITTE HIER ANPASSEN
BLUESKY_HANDLE = os.getenv('BLUESKY_HANDLE')
BLUESKY_APP_PASSWORD = os.getenv('BLUESKY_APP_PASSWORD')
GHOST_ADMIN_URL = os.getenv('GHOST_ADMIN_URL')  # Anpassen
GHOST_API_KEY = os.getenv('GHOST_API_KEY')  # Format: 'ID:Secret'
GHOST_API_VERSION = 'v5.0'  # An deine Ghost-Version anpassen
TIMEZONE = pytz.timezone('Europe/Berlin')
DB_PATH = '/var/www/writefreely-0.16.0/bluesky_posts.db'

# Tag-Konfiguration für Tumblr-Stil
TAG_WITH_IMAGES_1 = "unterwegs"    # Erster Tag für Bild-Posts
TAG_WITH_IMAGES_2 = "pictures"     # Zweiter Tag für Bild-Posts
TAG_TEXT_ONLY = "notes"            # Tag für reine Text-Posts

# Neue Suffixe für Überschriften
HEADER_SUFFIX_TEXT = "(.notes)"
HEADER_SUFFIX_IMAGES = "(.pictures)"

# Temporärer Ordner für Bilddownloads
TEMP_IMG_DIR = tempfile.mkdtemp(prefix='bluesky_import_')

# ATproto Import
try:
    from atproto import Client, models
    ATPROTO_AVAILABLE = True
    logger.info("✅ atproto erfolgreich importiert")
except ImportError as e:
    logger.error(f"❌ atproto Import fehlgeschlagen: {e}")
    ATPROTO_AVAILABLE = False
    Client = None
    models = None

#--> W o r s c h t < --

# =============== DATENBANK-FUNKTIONEN ===============
def init_db():
    """Initialisiert die Datenbank und passt das Schema bei Bedarf an"""
    with sqlite3.connect(DB_PATH) as conn:
        # 1. Tabelle erstellen, falls nicht vorhanden
        conn.execute('''
            CREATE TABLE IF NOT EXISTS imported_posts (
                uri TEXT PRIMARY KEY,
                created_at TEXT,
                has_images INTEGER DEFAULT 0,
                tag_used TEXT,
                slug TEXT,
                title TEXT
            )
        ''')
        
        # 2. Prüfen, ob alle Spalten existieren
        cursor = conn.execute("PRAGMA table_info(imported_posts)")
        existing_columns = [column[1] for column in cursor.fetchall()]
        
        # 3. Fehlende Spalten hinzufügen
        expected_columns = ['uri', 'created_at', 'has_images', 'tag_used', 'slug', 'title']
        for column in expected_columns:
            if column not in existing_columns:
                if column == 'slug':
                    conn.execute('ALTER TABLE imported_posts ADD COLUMN slug TEXT')
                elif column == 'title':
                    conn.execute('ALTER TABLE imported_posts ADD COLUMN title TEXT')
                elif column == 'tag_used':
                    conn.execute('ALTER TABLE imported_posts ADD COLUMN tag_used TEXT')
                elif column == 'has_images':
                    conn.execute('ALTER TABLE imported_posts ADD COLUMN has_images INTEGER DEFAULT 0')
                logger.info(f"✅ Spalte '{column}' zur Datenbank hinzugefügt")
        
        logger.info("✅ Datenbank initialisiert und Schema geprüft")

# =============== GHOST JWT GENERIERUNG ===============
def generate_ghost_jwt(api_key: str) -> Optional[str]:
    """Generiert JWT-Token für Ghost Admin API"""
    try:
        key_id, key_secret = api_key.split(':')
    except ValueError:
        logger.error("❌ Ungültiger Ghost API Key. Erwartetes Format: ID:Secret")
        return None
    
    try:
        iat = int(time.time())
        exp = iat + 300
        
        header = base64.urlsafe_b64encode(json.dumps({
            "alg": "HS256",
            "kid": key_id,
            "typ": "JWT"
        }).encode()).decode().rstrip('=')
        
        payload = base64.urlsafe_b64encode(json.dumps({
            "iat": iat,
            "exp": exp,
            "aud": "/admin/"
        }).encode()).decode().rstrip('=')
        
        message = f"{header}.{payload}".encode()
        signature = hmac.new(
            bytes.fromhex(key_secret),
            message,
            hashlib.sha256
        ).digest()
        signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip('=')
        
        return f"{header}.{payload}.{signature_b64}"
        
    except Exception as e:
        logger.error(f"❌ Fehler bei JWT-Generierung: {str(e)}")
        return None

# =============== BILDUPLOAD ZU GHOST ===============
def upload_image_to_ghost(image_path: str, ghost_token: str) -> Optional[Dict[str, str]]:
    """Lädt ein Bild via Ghost Admin API hoch"""
    upload_url = f"{GHOST_ADMIN_URL}/images/upload/"
    
    try:
        with open(image_path, 'rb') as image_file:
            files = {'file': (os.path.basename(image_path), image_file, 'image/jpeg')}
            data = {'ref': image_path}
            
            headers = {
                'Authorization': f'Ghost {ghost_token}',
                'Accept-Version': GHOST_API_VERSION
            }
            
            response = requests.post(upload_url, files=files, data=data, headers=headers, timeout=30)
            response.raise_for_status()
            
            response_data = response.json()
            if 'images' in response_data and len(response_data['images']) > 0:
                ghost_image_url = response_data['images'][0]['url']
                logger.info(f"✅ Bild zu Ghost hochgeladen: {os.path.basename(image_path)}")
                return {"url": ghost_image_url}
            else:
                logger.error(f"❌ Ghost API gab keine Bild-URL zurück: {response_data}")
                return None
                
    except Exception as e:
        logger.error(f"❌ Fehler beim Hochladen zu Ghost: {str(e)}")
        return None

# =============== BILDDOWNLOAD VON BLUESKY ===============
def download_image_from_bluesky(blob_ref: Any, client, filename: str) -> Optional[str]:
    """Lädt ein Bild von Bluesky herunter"""
    try:
        local_path = os.path.join(TEMP_IMG_DIR, filename)
        
        image_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?did={client.me.did}&cid={blob_ref.ref.link}"
        headers = {'Authorization': f'Bearer {client._session.access_jwt}'}
        
        response = requests.get(image_url, headers=headers, timeout=15, stream=True)
        response.raise_for_status()
        
        with open(local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        
        logger.info(f"✅ Bild von Bluesky heruntergeladen: {filename}")
        return local_path
        
    except Exception as e:
        logger.error(f"❌ Bluesky Bilddownload fehlgeschlagen: {str(e)}")
        return None

# =============== HELFER-FUNKTIONEN ===============
def format_german_datetime(dt: datetime) -> str:
    german_months = ['Januar', 'Februar', 'März', 'April', 'Mai', 'Juni',
                     'Juli', 'August', 'September', 'Oktober', 'November', 'Dezember']
    german_weekdays = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag',
                       'Freitag', 'Sonnabend', 'Sonntag']
    weekday = german_weekdays[dt.weekday()]
    day = dt.day
    month = german_months[dt.month - 1]
    year = dt.year
    time = dt.strftime('%H:%M')
    return f"{weekday}, {day}. {month} {year}, {time}"

def parse_iso_datetime(dt_str: str) -> Optional[datetime]:
    try:
        if dt_str.endswith('Z'):
            dt_str = dt_str[:-1] + '+00:00'
        dt = datetime.fromisoformat(dt_str)
        return dt.astimezone(TIMEZONE) if dt.tzinfo else TIMEZONE.localize(dt)
    except ValueError as e:
        logger.error(f"❌ Datumsparsing fehlgeschlagen: {str(e)}")
        return None

def extract_images(record) -> List[Tuple[Any, str]]:
    images = []
    try:
        embed = getattr(record, 'embed', None)
        if isinstance(embed, models.AppBskyEmbedImages.Main):
            for image in embed.images:
                if hasattr(image, 'image'):
                    alt_text = getattr(image, 'alt', '') or ''
                    images.append((image.image, alt_text))
    except Exception as e:
        logger.error(f"❌ Fehler bei extract_images: {str(e)}")
    return images

def extract_hashtags(text: str) -> List[str]:
    """Extrahiert alle Hashtags aus dem Text und gibt sie ohne '#' zurück."""
    return re.findall(r'#(\\w+)', text)

def cleanup_temp_files():
    """Bereinigt temporäre Bilddateien"""
    try:
        for filename in os.listdir(TEMP_IMG_DIR):
            file_path = os.path.join(TEMP_IMG_DIR, filename)
            if os.path.isfile(file_path):
                os.unlink(file_path)
        os.rmdir(TEMP_IMG_DIR)
        logger.info("✅ Temporäre Dateien bereinigt")
    except Exception as e:
        logger.warning(f"⚠️  Konnte temporäre Dateien nicht bereinigen: {str(e)}")

# =============== BLUESKY FUNKTIONEN ===============
def bluesky_login() -> Optional[Client]:
    if not ATPROTO_AVAILABLE:
        logger.error("❌ atproto nicht verfügbar")
        return None
        
    try:
        client = Client()
        client.login(BLUESKY_HANDLE, BLUESKY_APP_PASSWORD)
        logger.info("✅ Bluesky Login erfolgreich")
        return client
    except Exception as e:
        logger.error(f"❌ Login fehlgeschlagen: {str(e)}")
        return None

def get_recent_posts(limit: int = 10) -> List[Dict[str, Any]]:
    imported = []
    
    if not ATPROTO_AVAILABLE:
        logger.error("❌ atproto nicht verfügbar")
        return imported

    client = bluesky_login()
    if not client:
        return imported

    try:
        feed = client.get_author_feed(BLUESKY_HANDLE, limit=limit)
        for item in feed.feed:
            post = item.post
            is_reply = hasattr(post.record, 'reply') and post.record.reply is not None
            is_repost = hasattr(item, 'reason') and item.reason is not None
            is_own_post = post.author.did == client.me.did

            if not (is_own_post and not is_reply and not is_repost):
                continue

            with sqlite3.connect(DB_PATH) as conn:
                already_imported = conn.execute(
                    "SELECT 1 FROM imported_posts WHERE uri = ?",
                    (post.uri,)
                ).fetchone()
                if already_imported:
                    continue

            post_time = parse_iso_datetime(post.record.created_at)
            images = extract_images(post.record)
            imported.append({
                'text': post.record.text,
                'created_at': post.record.created_at,
                'uri': post.uri,
                'record': post.record,
                'images': images,
                'post_time': post_time,
                'client': client
            })
    except Exception as e:
        logger.error(f"❌ Fehler beim Abrufen der Posts: {str(e)}")
        
    return imported

# =============== NEUE TITEL & SLUG FUNKTIONEN ===============
def build_slug_from_title(title: str) -> str:
    """Erzeugt einen sprechenden Slug aus dem Titel"""
    # Entferne Klammern und Punkte
    clean_title = re.sub(r'[()\\.]', '', title)
    # Ersetze Leerzeichen und Sonderzeichen durch Bindestriche
    clean_title = re.sub(r'[^\\w\\s-]', '', clean_title.lower())
    # Ersetze Leerzeichen durch Bindestriche
    clean_title = re.sub(r'[-\\s]+', '-', clean_title)
    # Entferne führende/endende Bindestriche
    clean_title = clean_title.strip('-')
    # Stelle sicher, dass der Slug nicht zu lang ist
    if len(clean_title) > 60:
        clean_title = clean_title[:60].rstrip('-')
    return clean_title

def build_title(hashtags: List[str], has_images: bool) -> str:
    """Erstellt sprechende Überschrift basierend auf Hashtags und Inhalt"""
    if hashtags and len(hashtags) > 0:
        # Ersten Hashtag verwenden (ersten Buchstaben groß)
        first_tag = hashtags[0].capitalize()
    else:
        # Fallback, wenn keine Hashtags vorhanden sind
        first_tag = "Beitrag"
    
    # Suffix basierend auf Inhalt hinzufügen
    if has_images:
        suffix = HEADER_SUFFIX_IMAGES
    else:
        suffix = HEADER_SUFFIX_TEXT
    
    # Titel zusammensetzen
    title = f"{first_tag} {suffix}"
    return title

# =============== TAG-LOGIK FÜR TUMBLR-STIL ===============
def get_tumblr_style_tag(has_images: bool, hashtags: List[str]) -> List[Dict[str, str]]:
    """
    Bestimmt Tags im Tumblr-Stil:
    - MIT Bildern: 'unterwegs', 'pictures' + erster Hashtag (ohne #, falls vorhanden)
    - OHNE Bilder: 'notes' + erster Hashtag (ohne #, falls vorhanden)
    """
    tags = []
    
    if has_images:
        # Posts MIT Bildern: zwei Haupttags
        logger.info(f"📸 Post enthält Bilder → Tags: '{TAG_WITH_IMAGES_1}', '{TAG_WITH_IMAGES_2}'")
        tags.append({"name": TAG_WITH_IMAGES_1})
        tags.append({"name": TAG_WITH_IMAGES_2})
    else:
        # Posts OHNE Bilder: ein Haupttag
        logger.info(f"📝 Reiner Text-Post → Tag: '{TAG_TEXT_ONLY}'")
        tags.append({"name": TAG_TEXT_ONLY})
    
    # Optional: Ersten Hashtag aus dem Text als zusätzlichen Tag (OHNE # Präfix!)
    if hashtags and len(hashtags) > 0:
        first_hashtag = hashtags[0].lower()  # Schon ohne # von extract_hashtags
        
        # Hashtag-Titel für Tag (erster Buchstabe groß, Rest klein)
        hashtag_title = first_hashtag.capitalize()
        
        # Prüfen, ob dieser Hashtag nicht bereits als Tag verwendet wird
        existing_tags = [tag['name'].lower() for tag in tags]
        
        if hashtag_title.lower() not in existing_tags:
            tags.append({"name": hashtag_title})  # OHNE #!
            logger.info(f"   + Zusätzlicher Hashtag-Tag: '{hashtag_title}'")
        else:
            logger.info(f"   Hashtag '{hashtag_title}' bereits als Haupttag vorhanden")
    
    return tags

# =============== GHOST BILD-HTML GENERIERUNG ===============
def create_ghost_image_html(ghost_url: str, alt_text: str = "", caption: str = "") -> str:
    """Erstellt Ghost-kompatibles HTML für Bilder mit ALT-Text und Unterschrift"""
    
    def escape_html(text):
        if not text:
            return ""
        return (text.replace('&', '&amp;')
                    .replace('<', '&lt;')
                    .replace('>', '&gt;')
                    .replace('"', '&quot;')
                    .replace("'", '&#39;'))
    
    escaped_alt = escape_html(alt_text)
    escaped_caption = escape_html(caption)
    
    final_caption = escaped_caption if caption else escaped_alt
    
    if final_caption:
        html = f'''<figure class="kg-card kg-image-card">
    <img src="{ghost_url}" alt="{escaped_alt}" class="kg-image" loading="lazy">
    <figcaption>{final_caption}</figcaption>
</figure>'''
        logger.info(f"   Bild mit Unterschrift: '{final_caption[:50]}{'...' if len(final_caption) > 50 else ''}'")
    else:
        html = f'<img src="{ghost_url}" alt="{escaped_alt}" class="kg-image" loading="lazy">'
        logger.info("   Bild ohne Unterschrift")
    
    return html

# =============== GHOST IMPORT FUNKTION ===============
def import_to_ghost(post: Dict[str, Any]) -> bool:
    """Importiert einen Bluesky-Post zu Ghost mit korrekter ALT-Text-Verarbeitung"""
    try:
        post_time = post['post_time']
        if not post_time:
            logger.error("❌ Keine gültige Zeit für Post")
            return False

        body = post['text']
        hashtags = extract_hashtags(body)
        client = post.get('client')
        
        # 1. Ghost JWT Token generieren
        ghost_token = generate_ghost_jwt(GHOST_API_KEY)
        if not ghost_token:
            return False
        
        # 2. Bilder verarbeiten und zu Ghost hochladen
        ghost_images = []
        for i, (blob_ref, alt_text) in enumerate(post['images'], 1):
            uri_hash = hashlib.md5(post['uri'].encode()).hexdigest()[:8]
            temp_filename = f"bluesky_{uri_hash}_{i}.jpg"
            
            local_path = download_image_from_bluesky(blob_ref, client, temp_filename)
            if local_path:
                ghost_result = upload_image_to_ghost(local_path, ghost_token)
                if ghost_result and 'url' in ghost_result:
                    ghost_url = ghost_result['url']
                    ghost_images.append((ghost_url, alt_text))
                    logger.info(f"   ALT-Text für Bild {i}: '{alt_text[:50]}{'...' if len(alt_text) > 50 else ''}'")
                
                try:
                    os.remove(local_path)
                except OSError:
                    pass
        
        # 3. HTML-Body für Ghost erstellen
        html_body = ""
        
        if body.strip():
            lines = body.strip().split('\\n')
            current_paragraph = []
            
            for line in lines:
                stripped_line = line.strip()
                if stripped_line:
                    current_paragraph.append(stripped_line)
                elif current_paragraph:
                    paragraph_text = ' '.join(current_paragraph)
                    html_body += f"<p>{paragraph_text}</p>\\n"
                    current_paragraph = []
            
            if current_paragraph:
                paragraph_text = ' '.join(current_paragraph)
                html_body += f"<p>{paragraph_text}</p>\\n"
        
        # Bilder mit korrektem HTML-Format hinzufügen
        for ghost_url, alt_text in ghost_images:
            image_html = create_ghost_image_html(ghost_url, alt_text, alt_text)
            html_body += image_html + "\\n"
        
        # 4. NEUE Titel, Slug und Tags generieren
        has_images = len(ghost_images) > 0
        title = build_title(hashtags, has_images)
        slug = build_slug_from_title(title)
        
        # Tumblr-ähnliche Tags
        tags = get_tumblr_style_tag(has_images, hashtags)
        
        # 5. Ghost-konforme JSON-Payload erstellen
        post_data = {
            "posts": [{
                "title": title,
                "html": html_body,
                "slug": slug,
                "status": "draft", #published
                "created_at": post_time.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
                "tags": tags
            }]
        }
        
        # 6. Debug-Informationen
        logger.info(f"📤 Sende zu Ghost:")
        logger.info(f"   Titel: {title}")
        logger.info(f"   Slug: {slug}")
        logger.info(f"   Sprechender Slug für SEO: ✅")
        logger.info(f"   Bilder: {len(ghost_images)}")
        tag_names = [tag['name'] for tag in tags]
        logger.info(f"   Tags: {', '.join(tag_names)}")
        
        # 7. Request an Ghost senden
        ghost_api_url = f"{GHOST_ADMIN_URL}/posts/?source=html"
        
        response = requests.post(
            ghost_api_url,
            headers={
                'Authorization': f'Ghost {ghost_token}',
                'Content-Type': 'application/json',
                'Accept-Version': GHOST_API_VERSION
            },
            json=post_data,
            timeout=30
        )
        
        # 8. Response auswerten
        if response.status_code == 201:
            response_data = response.json()
            
            # Datenbankeintrag mit Fehlerbehandlung
            try:
                # Haupttag für Datenbank bestimmen
                if has_images:
                    main_tag = TAG_WITH_IMAGES_1
                else:
                    main_tag = TAG_TEXT_ONLY
                    
                with sqlite3.connect(DB_PATH) as conn:
                    conn.execute(
                        "INSERT OR REPLACE INTO imported_posts (uri, created_at, has_images, tag_used, slug, title) VALUES (?, ?, ?, ?, ?, ?)",
                        (post['uri'], post['created_at'], 1 if ghost_images else 0, main_tag, slug, title)
                    )
                logger.info(f"✅ Datenbankeintrag für Post {post['uri']} aktualisiert")
            except sqlite3.Error as db_error:
                logger.error(f"⚠️  Datenbankfehler (Post trotzdem in Ghost erstellt): {db_error}")
            
            ghost_post_id = response_data['posts'][0]['id']
            logger.info(f"✅ Erfolgreich zu Ghost importiert (ID: {ghost_post_id})")
            
            if ghost_images:
                alt_texts_summary = ", ".join([f"'{alt[:20]}...'" if alt and len(alt) > 20 else f"'{alt}'" 
                                              for _, alt in ghost_images if alt])
                if alt_texts_summary:
                    logger.info(f"   Verwendete ALT-Texte: {alt_texts_summary}")
            
            return True
            
        else:
            logger.error(f"❌ Ghost API-Fehler {response.status_code}: {response.text}")
            if response.status_code == 422:
                logger.error("⚠️  Möglicherweise ungültiges HTML-Format")
            return False
            
    except Exception as e:
        logger.error(f"❌ Fehler im Ghost-Import: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False

# =============== HAUPTFUNKTION ===============
if __name__ == '__main__':
    logger.info(f"🚀 Bluesky zu Ghost Import startet: {datetime.now(TIMEZONE).strftime('%d.%m.%Y %H:%M')}")
    logger.info(f"   NEUE Überschriften-Logik: Erster Tag + (.notes/.pictures)")
    logger.info(f"   Tag-Konfiguration: Bilder → '{TAG_WITH_IMAGES_1}', '{TAG_WITH_IMAGES_2}' | Text → '{TAG_TEXT_ONLY}'")
    
    try:
        # Konfigurationsprüfung
        if not all([BLUESKY_HANDLE, BLUESKY_APP_PASSWORD, GHOST_API_KEY]):
            logger.error("❌ Fehlende Konfiguration")
            exit(1)
        
        if GHOST_ADMIN_URL == 'https://DEINE-DOMAIN.DE/ghost/api/admin':
            logger.error("❌ Bitte GHOST_ADMIN_URL anpassen!")
            exit(1)
        
        # Datenbank initialisieren (mit Schema-Update)
        init_db()
        
        if not ATPROTO_AVAILABLE:
            logger.error("❌ atproto nicht verfügbar")
            exit(1)
        
        # JWT-Test
        test_token = generate_ghost_jwt(GHOST_API_KEY)
        if not test_token:
            logger.error("❌ Ghost JWT Generierung fehlgeschlagen")
            exit(1)
        logger.info("✅ Ghost Authentifizierung erfolgreich")
        
        # Posts abrufen
        recent_posts = get_recent_posts(limit=5)
        
        if not recent_posts:
            logger.info("ℹ️  Keine neuen Bluesky-Beiträge gefunden")
        else:
            logger.info(f"📥 {len(recent_posts)} neue Beiträge gefunden")
            
            # In chronologischer Reihenfolge importieren
            for i, post in enumerate(reversed(recent_posts), 1):
                logger.info(f"--- Verarbeite Beitrag {i}/{len(recent_posts)} ---")
                logger.info(f"   Bluesky Text: {post['text'][:100]}...")
                
                if import_to_ghost(post):
                    logger.info(f"✅ Beitrag {i} erfolgreich importiert")
                else:
                    logger.error(f"❌ Beitrag {i} fehlgeschlagen")
                
                if i < len(recent_posts):
                    time.sleep(1)
        
        # Aufräumen
        cleanup_temp_files()
        
    except KeyboardInterrupt:
        logger.info("⚠️  Skript durch Benutzer abgebrochen")
        cleanup_temp_files()
    except Exception as e:
        logger.error(f"💥 Kritischer Fehler: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        cleanup_temp_files()
        
    logger.info("🏁 Skript beendet")

Read 5 times, last 31 seconds ago