#!/usr/bin/env python3
###
# Bluesky -> Ghost Importer mit neuen sprechenden Überschriften
# Überschrift: Erster TAG (capitalize) + (.notes) für Text oder (.pictures) für Bilder
###
from dotenv import load_dotenv
load_dotenv(dotenv_path='/var/www/writefreely-0.16.0/cron/secret/config.env')
import os
import requests
import sqlite3
from datetime import datetime, timezone
import pytz
import hashlib
import logging
import re
import time
import base64
import hmac
import hashlib
import json
import tempfile
from typing import Optional, List, Dict, Any, Tuple
# Logging-Konfiguration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/opt/writefreely/bluesky_import.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# =============== KONFIGURATION ===============
# BITTE HIER ANPASSEN
BLUESKY_HANDLE = os.getenv('BLUESKY_HANDLE')
BLUESKY_APP_PASSWORD = os.getenv('BLUESKY_APP_PASSWORD')
GHOST_ADMIN_URL = os.getenv('GHOST_ADMIN_URL') # Anpassen
GHOST_API_KEY = os.getenv('GHOST_API_KEY') # Format: 'ID:Secret'
GHOST_API_VERSION = 'v5.0' # An deine Ghost-Version anpassen
TIMEZONE = pytz.timezone('Europe/Berlin')
DB_PATH = '/var/www/writefreely-0.16.0/bluesky_posts.db'
# Tag-Konfiguration für Tumblr-Stil
TAG_WITH_IMAGES_1 = "unterwegs" # Erster Tag für Bild-Posts
TAG_WITH_IMAGES_2 = "pictures" # Zweiter Tag für Bild-Posts
TAG_TEXT_ONLY = "notes" # Tag für reine Text-Posts
# Neue Suffixe für Überschriften
HEADER_SUFFIX_TEXT = "(.notes)"
HEADER_SUFFIX_IMAGES = "(.pictures)"
# Temporärer Ordner für Bilddownloads
TEMP_IMG_DIR = tempfile.mkdtemp(prefix='bluesky_import_')
# ATproto Import
try:
from atproto import Client, models
ATPROTO_AVAILABLE = True
logger.info("✅ atproto erfolgreich importiert")
except ImportError as e:
logger.error(f"❌ atproto Import fehlgeschlagen: {e}")
ATPROTO_AVAILABLE = False
Client = None
models = None
#--> W o r s c h t < --
# =============== DATENBANK-FUNKTIONEN ===============
def init_db():
"""Initialisiert die Datenbank und passt das Schema bei Bedarf an"""
with sqlite3.connect(DB_PATH) as conn:
# 1. Tabelle erstellen, falls nicht vorhanden
conn.execute('''
CREATE TABLE IF NOT EXISTS imported_posts (
uri TEXT PRIMARY KEY,
created_at TEXT,
has_images INTEGER DEFAULT 0,
tag_used TEXT,
slug TEXT,
title TEXT
)
''')
# 2. Prüfen, ob alle Spalten existieren
cursor = conn.execute("PRAGMA table_info(imported_posts)")
existing_columns = [column[1] for column in cursor.fetchall()]
# 3. Fehlende Spalten hinzufügen
expected_columns = ['uri', 'created_at', 'has_images', 'tag_used', 'slug', 'title']
for column in expected_columns:
if column not in existing_columns:
if column == 'slug':
conn.execute('ALTER TABLE imported_posts ADD COLUMN slug TEXT')
elif column == 'title':
conn.execute('ALTER TABLE imported_posts ADD COLUMN title TEXT')
elif column == 'tag_used':
conn.execute('ALTER TABLE imported_posts ADD COLUMN tag_used TEXT')
elif column == 'has_images':
conn.execute('ALTER TABLE imported_posts ADD COLUMN has_images INTEGER DEFAULT 0')
logger.info(f"✅ Spalte '{column}' zur Datenbank hinzugefügt")
logger.info("✅ Datenbank initialisiert und Schema geprüft")
# =============== GHOST JWT GENERIERUNG ===============
def generate_ghost_jwt(api_key: str) -> Optional[str]:
"""Generiert JWT-Token für Ghost Admin API"""
try:
key_id, key_secret = api_key.split(':')
except ValueError:
logger.error("❌ Ungültiger Ghost API Key. Erwartetes Format: ID:Secret")
return None
try:
iat = int(time.time())
exp = iat + 300
header = base64.urlsafe_b64encode(json.dumps({
"alg": "HS256",
"kid": key_id,
"typ": "JWT"
}).encode()).decode().rstrip('=')
payload = base64.urlsafe_b64encode(json.dumps({
"iat": iat,
"exp": exp,
"aud": "/admin/"
}).encode()).decode().rstrip('=')
message = f"{header}.{payload}".encode()
signature = hmac.new(
bytes.fromhex(key_secret),
message,
hashlib.sha256
).digest()
signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip('=')
return f"{header}.{payload}.{signature_b64}"
except Exception as e:
logger.error(f"❌ Fehler bei JWT-Generierung: {str(e)}")
return None
# =============== BILDUPLOAD ZU GHOST ===============
def upload_image_to_ghost(image_path: str, ghost_token: str) -> Optional[Dict[str, str]]:
"""Lädt ein Bild via Ghost Admin API hoch"""
upload_url = f"{GHOST_ADMIN_URL}/images/upload/"
try:
with open(image_path, 'rb') as image_file:
files = {'file': (os.path.basename(image_path), image_file, 'image/jpeg')}
data = {'ref': image_path}
headers = {
'Authorization': f'Ghost {ghost_token}',
'Accept-Version': GHOST_API_VERSION
}
response = requests.post(upload_url, files=files, data=data, headers=headers, timeout=30)
response.raise_for_status()
response_data = response.json()
if 'images' in response_data and len(response_data['images']) > 0:
ghost_image_url = response_data['images'][0]['url']
logger.info(f"✅ Bild zu Ghost hochgeladen: {os.path.basename(image_path)}")
return {"url": ghost_image_url}
else:
logger.error(f"❌ Ghost API gab keine Bild-URL zurück: {response_data}")
return None
except Exception as e:
logger.error(f"❌ Fehler beim Hochladen zu Ghost: {str(e)}")
return None
# =============== BILDDOWNLOAD VON BLUESKY ===============
def download_image_from_bluesky(blob_ref: Any, client, filename: str) -> Optional[str]:
"""Lädt ein Bild von Bluesky herunter"""
try:
local_path = os.path.join(TEMP_IMG_DIR, filename)
image_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?did={client.me.did}&cid={blob_ref.ref.link}"
headers = {'Authorization': f'Bearer {client._session.access_jwt}'}
response = requests.get(image_url, headers=headers, timeout=15, stream=True)
response.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
logger.info(f"✅ Bild von Bluesky heruntergeladen: {filename}")
return local_path
except Exception as e:
logger.error(f"❌ Bluesky Bilddownload fehlgeschlagen: {str(e)}")
return None
# =============== HELFER-FUNKTIONEN ===============
def format_german_datetime(dt: datetime) -> str:
german_months = ['Januar', 'Februar', 'März', 'April', 'Mai', 'Juni',
'Juli', 'August', 'September', 'Oktober', 'November', 'Dezember']
german_weekdays = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag',
'Freitag', 'Sonnabend', 'Sonntag']
weekday = german_weekdays[dt.weekday()]
day = dt.day
month = german_months[dt.month - 1]
year = dt.year
time = dt.strftime('%H:%M')
return f"{weekday}, {day}. {month} {year}, {time}"
def parse_iso_datetime(dt_str: str) -> Optional[datetime]:
try:
if dt_str.endswith('Z'):
dt_str = dt_str[:-1] + '+00:00'
dt = datetime.fromisoformat(dt_str)
return dt.astimezone(TIMEZONE) if dt.tzinfo else TIMEZONE.localize(dt)
except ValueError as e:
logger.error(f"❌ Datumsparsing fehlgeschlagen: {str(e)}")
return None
def extract_images(record) -> List[Tuple[Any, str]]:
images = []
try:
embed = getattr(record, 'embed', None)
if isinstance(embed, models.AppBskyEmbedImages.Main):
for image in embed.images:
if hasattr(image, 'image'):
alt_text = getattr(image, 'alt', '') or ''
images.append((image.image, alt_text))
except Exception as e:
logger.error(f"❌ Fehler bei extract_images: {str(e)}")
return images
def extract_hashtags(text: str) -> List[str]:
"""Extrahiert alle Hashtags aus dem Text und gibt sie ohne '#' zurück."""
return re.findall(r'#(\\w+)', text)
def cleanup_temp_files():
"""Bereinigt temporäre Bilddateien"""
try:
for filename in os.listdir(TEMP_IMG_DIR):
file_path = os.path.join(TEMP_IMG_DIR, filename)
if os.path.isfile(file_path):
os.unlink(file_path)
os.rmdir(TEMP_IMG_DIR)
logger.info("✅ Temporäre Dateien bereinigt")
except Exception as e:
logger.warning(f"⚠️ Konnte temporäre Dateien nicht bereinigen: {str(e)}")
# =============== BLUESKY FUNKTIONEN ===============
def bluesky_login() -> Optional[Client]:
if not ATPROTO_AVAILABLE:
logger.error("❌ atproto nicht verfügbar")
return None
try:
client = Client()
client.login(BLUESKY_HANDLE, BLUESKY_APP_PASSWORD)
logger.info("✅ Bluesky Login erfolgreich")
return client
except Exception as e:
logger.error(f"❌ Login fehlgeschlagen: {str(e)}")
return None
def get_recent_posts(limit: int = 10) -> List[Dict[str, Any]]:
imported = []
if not ATPROTO_AVAILABLE:
logger.error("❌ atproto nicht verfügbar")
return imported
client = bluesky_login()
if not client:
return imported
try:
feed = client.get_author_feed(BLUESKY_HANDLE, limit=limit)
for item in feed.feed:
post = item.post
is_reply = hasattr(post.record, 'reply') and post.record.reply is not None
is_repost = hasattr(item, 'reason') and item.reason is not None
is_own_post = post.author.did == client.me.did
if not (is_own_post and not is_reply and not is_repost):
continue
with sqlite3.connect(DB_PATH) as conn:
already_imported = conn.execute(
"SELECT 1 FROM imported_posts WHERE uri = ?",
(post.uri,)
).fetchone()
if already_imported:
continue
post_time = parse_iso_datetime(post.record.created_at)
images = extract_images(post.record)
imported.append({
'text': post.record.text,
'created_at': post.record.created_at,
'uri': post.uri,
'record': post.record,
'images': images,
'post_time': post_time,
'client': client
})
except Exception as e:
logger.error(f"❌ Fehler beim Abrufen der Posts: {str(e)}")
return imported
# =============== NEUE TITEL & SLUG FUNKTIONEN ===============
def build_slug_from_title(title: str) -> str:
"""Erzeugt einen sprechenden Slug aus dem Titel"""
# Entferne Klammern und Punkte
clean_title = re.sub(r'[()\\.]', '', title)
# Ersetze Leerzeichen und Sonderzeichen durch Bindestriche
clean_title = re.sub(r'[^\\w\\s-]', '', clean_title.lower())
# Ersetze Leerzeichen durch Bindestriche
clean_title = re.sub(r'[-\\s]+', '-', clean_title)
# Entferne führende/endende Bindestriche
clean_title = clean_title.strip('-')
# Stelle sicher, dass der Slug nicht zu lang ist
if len(clean_title) > 60:
clean_title = clean_title[:60].rstrip('-')
return clean_title
def build_title(hashtags: List[str], has_images: bool) -> str:
"""Erstellt sprechende Überschrift basierend auf Hashtags und Inhalt"""
if hashtags and len(hashtags) > 0:
# Ersten Hashtag verwenden (ersten Buchstaben groß)
first_tag = hashtags[0].capitalize()
else:
# Fallback, wenn keine Hashtags vorhanden sind
first_tag = "Beitrag"
# Suffix basierend auf Inhalt hinzufügen
if has_images:
suffix = HEADER_SUFFIX_IMAGES
else:
suffix = HEADER_SUFFIX_TEXT
# Titel zusammensetzen
title = f"{first_tag} {suffix}"
return title
# =============== TAG-LOGIK FÜR TUMBLR-STIL ===============
def get_tumblr_style_tag(has_images: bool, hashtags: List[str]) -> List[Dict[str, str]]:
"""
Bestimmt Tags im Tumblr-Stil:
- MIT Bildern: 'unterwegs', 'pictures' + erster Hashtag (ohne #, falls vorhanden)
- OHNE Bilder: 'notes' + erster Hashtag (ohne #, falls vorhanden)
"""
tags = []
if has_images:
# Posts MIT Bildern: zwei Haupttags
logger.info(f"📸 Post enthält Bilder → Tags: '{TAG_WITH_IMAGES_1}', '{TAG_WITH_IMAGES_2}'")
tags.append({"name": TAG_WITH_IMAGES_1})
tags.append({"name": TAG_WITH_IMAGES_2})
else:
# Posts OHNE Bilder: ein Haupttag
logger.info(f"📝 Reiner Text-Post → Tag: '{TAG_TEXT_ONLY}'")
tags.append({"name": TAG_TEXT_ONLY})
# Optional: Ersten Hashtag aus dem Text als zusätzlichen Tag (OHNE # Präfix!)
if hashtags and len(hashtags) > 0:
first_hashtag = hashtags[0].lower() # Schon ohne # von extract_hashtags
# Hashtag-Titel für Tag (erster Buchstabe groß, Rest klein)
hashtag_title = first_hashtag.capitalize()
# Prüfen, ob dieser Hashtag nicht bereits als Tag verwendet wird
existing_tags = [tag['name'].lower() for tag in tags]
if hashtag_title.lower() not in existing_tags:
tags.append({"name": hashtag_title}) # OHNE #!
logger.info(f" + Zusätzlicher Hashtag-Tag: '{hashtag_title}'")
else:
logger.info(f" Hashtag '{hashtag_title}' bereits als Haupttag vorhanden")
return tags
# =============== GHOST BILD-HTML GENERIERUNG ===============
def create_ghost_image_html(ghost_url: str, alt_text: str = "", caption: str = "") -> str:
"""Erstellt Ghost-kompatibles HTML für Bilder mit ALT-Text und Unterschrift"""
def escape_html(text):
if not text:
return ""
return (text.replace('&', '&')
.replace('<', '<')
.replace('>', '>')
.replace('"', '"')
.replace("'", '''))
escaped_alt = escape_html(alt_text)
escaped_caption = escape_html(caption)
final_caption = escaped_caption if caption else escaped_alt
if final_caption:
html = f'''<figure class="kg-card kg-image-card">
<img src="{ghost_url}" alt="{escaped_alt}" class="kg-image" loading="lazy">
<figcaption>{final_caption}</figcaption>
</figure>'''
logger.info(f" Bild mit Unterschrift: '{final_caption[:50]}{'...' if len(final_caption) > 50 else ''}'")
else:
html = f'<img src="{ghost_url}" alt="{escaped_alt}" class="kg-image" loading="lazy">'
logger.info(" Bild ohne Unterschrift")
return html
# =============== GHOST IMPORT FUNKTION ===============
def import_to_ghost(post: Dict[str, Any]) -> bool:
"""Importiert einen Bluesky-Post zu Ghost mit korrekter ALT-Text-Verarbeitung"""
try:
post_time = post['post_time']
if not post_time:
logger.error("❌ Keine gültige Zeit für Post")
return False
body = post['text']
hashtags = extract_hashtags(body)
client = post.get('client')
# 1. Ghost JWT Token generieren
ghost_token = generate_ghost_jwt(GHOST_API_KEY)
if not ghost_token:
return False
# 2. Bilder verarbeiten und zu Ghost hochladen
ghost_images = []
for i, (blob_ref, alt_text) in enumerate(post['images'], 1):
uri_hash = hashlib.md5(post['uri'].encode()).hexdigest()[:8]
temp_filename = f"bluesky_{uri_hash}_{i}.jpg"
local_path = download_image_from_bluesky(blob_ref, client, temp_filename)
if local_path:
ghost_result = upload_image_to_ghost(local_path, ghost_token)
if ghost_result and 'url' in ghost_result:
ghost_url = ghost_result['url']
ghost_images.append((ghost_url, alt_text))
logger.info(f" ALT-Text für Bild {i}: '{alt_text[:50]}{'...' if len(alt_text) > 50 else ''}'")
try:
os.remove(local_path)
except OSError:
pass
# 3. HTML-Body für Ghost erstellen
html_body = ""
if body.strip():
lines = body.strip().split('\\n')
current_paragraph = []
for line in lines:
stripped_line = line.strip()
if stripped_line:
current_paragraph.append(stripped_line)
elif current_paragraph:
paragraph_text = ' '.join(current_paragraph)
html_body += f"<p>{paragraph_text}</p>\\n"
current_paragraph = []
if current_paragraph:
paragraph_text = ' '.join(current_paragraph)
html_body += f"<p>{paragraph_text}</p>\\n"
# Bilder mit korrektem HTML-Format hinzufügen
for ghost_url, alt_text in ghost_images:
image_html = create_ghost_image_html(ghost_url, alt_text, alt_text)
html_body += image_html + "\\n"
# 4. NEUE Titel, Slug und Tags generieren
has_images = len(ghost_images) > 0
title = build_title(hashtags, has_images)
slug = build_slug_from_title(title)
# Tumblr-ähnliche Tags
tags = get_tumblr_style_tag(has_images, hashtags)
# 5. Ghost-konforme JSON-Payload erstellen
post_data = {
"posts": [{
"title": title,
"html": html_body,
"slug": slug,
"status": "draft", #published
"created_at": post_time.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
"tags": tags
}]
}
# 6. Debug-Informationen
logger.info(f"📤 Sende zu Ghost:")
logger.info(f" Titel: {title}")
logger.info(f" Slug: {slug}")
logger.info(f" Sprechender Slug für SEO: ✅")
logger.info(f" Bilder: {len(ghost_images)}")
tag_names = [tag['name'] for tag in tags]
logger.info(f" Tags: {', '.join(tag_names)}")
# 7. Request an Ghost senden
ghost_api_url = f"{GHOST_ADMIN_URL}/posts/?source=html"
response = requests.post(
ghost_api_url,
headers={
'Authorization': f'Ghost {ghost_token}',
'Content-Type': 'application/json',
'Accept-Version': GHOST_API_VERSION
},
json=post_data,
timeout=30
)
# 8. Response auswerten
if response.status_code == 201:
response_data = response.json()
# Datenbankeintrag mit Fehlerbehandlung
try:
# Haupttag für Datenbank bestimmen
if has_images:
main_tag = TAG_WITH_IMAGES_1
else:
main_tag = TAG_TEXT_ONLY
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"INSERT OR REPLACE INTO imported_posts (uri, created_at, has_images, tag_used, slug, title) VALUES (?, ?, ?, ?, ?, ?)",
(post['uri'], post['created_at'], 1 if ghost_images else 0, main_tag, slug, title)
)
logger.info(f"✅ Datenbankeintrag für Post {post['uri']} aktualisiert")
except sqlite3.Error as db_error:
logger.error(f"⚠️ Datenbankfehler (Post trotzdem in Ghost erstellt): {db_error}")
ghost_post_id = response_data['posts'][0]['id']
logger.info(f"✅ Erfolgreich zu Ghost importiert (ID: {ghost_post_id})")
if ghost_images:
alt_texts_summary = ", ".join([f"'{alt[:20]}...'" if alt and len(alt) > 20 else f"'{alt}'"
for _, alt in ghost_images if alt])
if alt_texts_summary:
logger.info(f" Verwendete ALT-Texte: {alt_texts_summary}")
return True
else:
logger.error(f"❌ Ghost API-Fehler {response.status_code}: {response.text}")
if response.status_code == 422:
logger.error("⚠️ Möglicherweise ungültiges HTML-Format")
return False
except Exception as e:
logger.error(f"❌ Fehler im Ghost-Import: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return False
# =============== HAUPTFUNKTION ===============
if __name__ == '__main__':
logger.info(f"🚀 Bluesky zu Ghost Import startet: {datetime.now(TIMEZONE).strftime('%d.%m.%Y %H:%M')}")
logger.info(f" NEUE Überschriften-Logik: Erster Tag + (.notes/.pictures)")
logger.info(f" Tag-Konfiguration: Bilder → '{TAG_WITH_IMAGES_1}', '{TAG_WITH_IMAGES_2}' | Text → '{TAG_TEXT_ONLY}'")
try:
# Konfigurationsprüfung
if not all([BLUESKY_HANDLE, BLUESKY_APP_PASSWORD, GHOST_API_KEY]):
logger.error("❌ Fehlende Konfiguration")
exit(1)
if GHOST_ADMIN_URL == 'https://DEINE-DOMAIN.DE/ghost/api/admin':
logger.error("❌ Bitte GHOST_ADMIN_URL anpassen!")
exit(1)
# Datenbank initialisieren (mit Schema-Update)
init_db()
if not ATPROTO_AVAILABLE:
logger.error("❌ atproto nicht verfügbar")
exit(1)
# JWT-Test
test_token = generate_ghost_jwt(GHOST_API_KEY)
if not test_token:
logger.error("❌ Ghost JWT Generierung fehlgeschlagen")
exit(1)
logger.info("✅ Ghost Authentifizierung erfolgreich")
# Posts abrufen
recent_posts = get_recent_posts(limit=5)
if not recent_posts:
logger.info("ℹ️ Keine neuen Bluesky-Beiträge gefunden")
else:
logger.info(f"📥 {len(recent_posts)} neue Beiträge gefunden")
# In chronologischer Reihenfolge importieren
for i, post in enumerate(reversed(recent_posts), 1):
logger.info(f"--- Verarbeite Beitrag {i}/{len(recent_posts)} ---")
logger.info(f" Bluesky Text: {post['text'][:100]}...")
if import_to_ghost(post):
logger.info(f"✅ Beitrag {i} erfolgreich importiert")
else:
logger.error(f"❌ Beitrag {i} fehlgeschlagen")
if i < len(recent_posts):
time.sleep(1)
# Aufräumen
cleanup_temp_files()
except KeyboardInterrupt:
logger.info("⚠️ Skript durch Benutzer abgebrochen")
cleanup_temp_files()
except Exception as e:
logger.error(f"💥 Kritischer Fehler: {str(e)}")
import traceback
logger.error(traceback.format_exc())
cleanup_temp_files()
logger.info("🏁 Skript beendet")
Read 5 times, last 31 seconds ago