Inhalt

Echtzeit-Daten: Die Event-Driven Architektur von OParl-Lite

Über das Wochenende habe ich die Event-Driven Architektur von OParl-Lite weiter implementiert, vor allem um https://bonn.social/@Sascha/115045394853309363 zu unterstützen und Echtzeit-Daten für alle zugänglich zu machen.

Transparenz in der Politik erfordert nicht nur Zugang zu Informationen, sondern auch deren zeitnahe Verfügbarkeit. Das Open-Source-Projekt OParl-Lite zeigt, wie moderne Event-Driven Architecture parlamentarische Daten in Echtzeit zu Bürgern und Anwendungen bringen kann. In diesem Artikel beleuchten wir die technische Architektur eines Systems, das RSS-Feeds, Redis Pub/Sub und GraphQL-Subscriptions zu einer leistungsstarken Echtzeit-Pipeline verbindet.

Die Vision: Demokratie in Echtzeit

Stellen Sie sich vor, Journalisten erfahren sofort von neuen Stadtratssitzungen, Bürger werden automatisch über relevante Tagesordnungspunkte benachrichtigt, und Watchdog-Organisationen können parlamentarische Aktivitäten in Echtzeit überwachen. OParl-Lite macht diese Vision technisch umsetzbar durch eine durchdachte Microservice-Architektur.

Technologie, Politik, Echtzeit-Daten

Architektur-Überblick: Loosely Coupled, Highly Scalable

Die Echtzeit-Datenfluss-Architektur von OParl-Lite folgt dem Prinzip der Separation of Concerns und nutzt Redis als zentralen Message Broker:

┌─────────────────┐    ┌─────────────────┐    ┌──────────────────┐
│   RSS Poller    │───▶│   Redis Pub/Sub │───▶│  GraphQL Server  │
│   Microservice  │    │   Event Bus     │    │  & Subscriptions │
└─────────────────┘    └─────────────────┘    └──────────────────┘
                                                        │
                                                        ▼
                                              ┌──────────────────┐
                                              │ WebSocket Clients│
                                              │ (Browser, Apps)  │
                                              └──────────────────┘

Diese Architektur bietet entscheidende Vorteile:

  • Fault Isolation: Ein Ausfall des RSS-Pollers beeinflusst nicht die GraphQL-API
  • Independent Scaling: Jeder Service kann unabhängig skaliert werden
  • Technology Diversity: Verschiedene Services können unterschiedliche Tech-Stacks nutzen
  • Event Replay: Redis ermöglicht Event-Wiederholung bei Service-Neustarts

Der RSS Poller: Intelligente Feed-Überwachung

Der RSS-Poller-Microservice ist das Herzstück der Datenerfassung. Er implementiert ein adaptives Polling-System, das sich dynamisch an die Aktualisierungsfrequenz der Datenquellen anpasst:

// src/rss-poller/services/AdaptivePoller.ts
export class AdaptivePoller {
  private calculateNextInterval(lastChangeTime: Date): number {
    const timeSinceChange = Date.now() - lastChangeTime.getTime();
    const hoursNoChanges = timeSinceChange / (1000 * 60 * 60);

    if (hoursNoChanges < 1) return 60000;      // 1 Minute
    if (hoursNoChanges < 24) return 300000;    // 5 Minuten
    return 900000;                             // 15 Minuten
  }

  async pollFeed(feedUrl: string): Promise<MeetingEvent[]> {
    const feed = await this.fetchAndParseFeed(feedUrl);
    const changes = await this.changeDetector.detectChanges(feed);

    for (const change of changes) {
      await this.eventPublisher.publishEvent(change);
    }

    return changes;
  }
}

Der MeetingChangeDetector erkennt dabei nicht nur neue Sitzungen, sondern auch Änderungen an bestehenden Terminen:

// src/rss-poller/services/MeetingChangeDetector.ts
interface MeetingEvent {
  eventId: string;
  type: 'meeting_created' | 'meeting_updated' | 'meeting_deleted';
  timestamp: string;
  meeting: {
    id: string;
    title: string;
    link: string;
    startTime: string;
    location?: string;
    organization: string;
  };
  correlationId: string;
  changeDetails?: {
    field: string;
    oldValue: any;
    newValue: any;
  }[];
}

Redis als Event-Backbone

Redis Pub/Sub fungiert als zentraler Message Broker und ermöglicht lose Kopplung zwischen den Services. Der RedisEventPublisher garantiert zuverlässige Event-Übertragung:

// src/rss-poller/services/RedisEventPublisher.ts
export class RedisEventPublisher {
  private readonly channel = 'oparl:meetings';

  async publishEvent(event: MeetingEvent): Promise<void> {
    const serializedEvent = JSON.stringify({
      ...event,
      metadata: {
        publishedAt: new Date().toISOString(),
        source: 'rss-poller',
        version: '1.0'
      }
    });

    await this.redis.publish(this.channel, serializedEvent);

    this.logger.info('Event published', {
      eventId: event.eventId,
      type: event.type,
      meetingTitle: event.meeting.title
    });
  }
}

GraphQL-Subscriptions: Echtzeit für Clients

Der GraphQL-Server empfängt Events über Redis und stellt sie Clients via WebSocket-Subscriptions zur Verfügung:

// src/schema/subscriptions/meetingSubscription.ts
builder.subscriptionField('meetingUpdates', (t) =>
  t.field({
    type: MeetingEventType,
    description: 'Echtzeit-Updates für parlamentarische Sitzungen',
    args: {
      organizationId: t.arg.string({ required: false }),
      eventTypes: t.arg.stringList({ required: false })
    },
    subscribe: async (_parent, args, context) => {
      const eventTarget = context.eventTarget;

      return eventTarget.subscribe('oparl:meetings', (event) => {
        // Client-seitige Filterung
        if (args.organizationId &&
            event.meeting.organizationId !== args.organizationId) {
          return false;
        }

        if (args.eventTypes &&
            !args.eventTypes.includes(event.type)) {
          return false;
        }

        return true;
      });
    },
    resolve: (event) => event
  })
);

Die Redis-GraphQL-Brücke

Der RedisEventTargetManager verbindet Redis Pub/Sub nahtlos mit GraphQL Yoga’s Event-System:

// src/server/redisEventTarget.ts
export class RedisEventTargetManager {
  private eventTarget: RedisEventTarget;

  constructor(redisClient: Redis) {
    this.eventTarget = new RedisEventTarget({
      redis: redisClient,
      serializer: JSON.stringify,
      deserializer: JSON.parse
    });
  }

  async subscribe(channel: string, filter?: EventFilter) {
    return this.eventTarget.subscribe(channel, (rawEvent) => {
      const event = this.validateAndTransformEvent(rawEvent);

      if (filter && !filter(event)) {
        return null; // Event wird gefiltert
      }

      return event;
    });
  }

  private validateAndTransformEvent(rawEvent: any): MeetingEvent {
    // Event-Validierung und Transformation
    if (!this.isValidMeetingEvent(rawEvent)) {
      throw new Error('Invalid event format');
    }

    return {
      ...rawEvent,
      timestamp: new Date(rawEvent.timestamp).toISOString()
    };
  }
}

Client-Integration: Echtzeit-Monitoring

Entwickler können die Echtzeit-Funktionalität einfach nutzen. Das Projekt enthält sogar ein praktisches Monitoring-Script:

// scripts/monitor-meeting-subscriptions.cjs
const { createClient } = require('graphql-ws');
const WebSocket = require('ws');

const client = createClient({
  url: 'ws://localhost:4000/graphql',
  webSocketImpl: WebSocket,
});

const subscription = `
  subscription MonitorMeetings {
    meetingUpdates {
      eventId
      type
      timestamp
      meeting {
        title
        startTime
        organization
        location
      }
    }
  }
`;

client.subscribe(
  { query: subscription },
  {
    next: (data) => {
      const event = data.data.meetingUpdates;
      console.log(`📅 ${event.type.toUpperCase()}: ${event.meeting.title}`);
      console.log(`   🏢 ${event.meeting.organization}`);
      console.log(`   🕐 ${new Date(event.meeting.startTime).toLocaleString()}`);

      if (event.meeting.location) {
        console.log(`   📍 ${event.meeting.location}`);
      }

      console.log('');
    },
    error: console.error,
    complete: () => console.log('Subscription completed')
  }
);

Datenfluss-Sequenz: Von RSS zu WebSocket

Die vollständige Event-Pipeline zeigt die Eleganz der Architektur:

  sequenceDiagram
    participant RSS as RSS Feed
    participant Poller as AdaptivePoller
    participant Detector as ChangeDetector
    participant Redis as Redis Pub/Sub
    participant GQL as GraphQL Server
    participant Client as WebSocket Client

    RSS->>Poller: RSS Feed Data
    Poller->>Detector: Parse & Analyze
    Detector->>Redis: Publish Event
    Redis->>GQL: Forward Event
    GQL->>Client: Push Update via WebSocket
    Client->>Client: Update UI in Realtime

Technische Vorteile der Architektur

1. Microservice-Isolation

Jeder Service hat eine klar definierte Verantwortung. Der RSS-Poller kann eigenständig entwickelt, getestet und deployed werden, ohne den GraphQL-Server zu beeinträchtigen.

2. Horizontal Skalierbarkeit

  • RSS-Poller: 1-2 Instanzen (mit Leader Election)
  • GraphQL-Server: N Instanzen für hohe Last
  • Redis: Cluster-Setup für Enterprise-Umgebungen

3. Event Sourcing Benefits

Alle Änderungen werden als Events gespeichert, ermöglicht:

  • Audit-Trails für parlamentarische Transparenz
  • Event-Replay bei System-Recovery
  • Historische Datenanalyse

4. Robuste Fehlerbehandlung

export class AdaptivePoller {
  async pollWithRetry(feedUrl: string, maxRetries = 3): Promise<MeetingEvent[]> {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await this.pollFeed(feedUrl);
      } catch (error) {
        this.logger.warn(`Polling attempt ${attempt} failed`, { error, feedUrl });

        if (attempt === maxRetries) {
          this.metrics.incrementCounter('polling.failures.final');
          throw error;
        }

        await this.sleep(Math.pow(2, attempt) * 1000); // Exponential backoff
      }
    }
  }
}

Authentifizierung und Sicherheit

Produktive Systeme benötigen Zugriffskontrolle. OParl-Lite unterstützt JWT-basierte Authentifizierung:

// Subscription mit Authentifizierung
builder.subscriptionField('meetingUpdates', (t) =>
  t.field({
    type: MeetingEventType,
    authScopes: ['read:meetings'], // Benötigt Berechtigung
    subscribe: async (_parent, args, context) => {
      const user = context.user; // Aus JWT Token

      // Benutzer-spezifische Filterung
      const allowedOrganizations = user.permissions.organizations;

      return context.eventTarget.subscribe('oparl:meetings', (event) => {
        return allowedOrganizations.includes(event.meeting.organizationId);
      });
    }
  })
);

Deployment auf OpenShift

OParl-Lite ist für moderne Container-Orchestrierung optimiert:

# RSS Poller als separate DeploymentConfig
apiVersion: apps.openshift.io/v1
kind: DeploymentConfig
metadata:
  name: oparl-rss-poller
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: poller
        image: registry/oparl-lite/rss-poller:latest
        env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: redis-credentials
              key: url
        - name: RSS_FEEDS
          value: "https://www.bonn.de/ratsinformationssystem/rss"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"

Monitoring und Observability

Produktive Event-Systeme benötigen umfassendes Monitoring:

export class MetricsCollector {
  private registry = new promClient.Registry();

  private eventsPublished = new promClient.Counter({
    name: 'oparl_events_published_total',
    help: 'Total number of events published',
    labelNames: ['event_type', 'source']
  });

  private subscriptionConnections = new promClient.Gauge({
    name: 'oparl_active_subscriptions',
    help: 'Number of active WebSocket subscriptions'
  });

  recordEventPublished(eventType: string, source: string) {
    this.eventsPublished.inc({ event_type: eventType, source });
  }
}

Anwendungsfälle für Bürger und Journalisten

1. Journalismus-Dashboard

// Automatische Benachrichtigung für relevante Themen
const subscription = `
  subscription JournalismAlerts {
    meetingUpdates(
      eventTypes: ["meeting_created"]
    ) {
      meeting {
        title
        agendaItems {
          title
          description
        }
      }
    }
  }
`;

// KI-basierte Relevanz-Bewertung
client.subscribe({ query: subscription }, {
  next: (event) => {
    const relevanceScore = analyzeRelevance(event.meeting);
    if (relevanceScore > 0.8) {
      sendSlackNotification(event);
    }
  }
});

2. Bürger-Engagement-App

// Standort-basierte Benachrichtigungen
const subscription = `
  subscription LocalMeetings($location: String!) {
    meetingUpdates(organizationFilter: $location) {
      meeting {
        title
        startTime
        location
        publicParticipationAllowed
      }
    }
  }
`;

Performance und Skalierung

Bei hoher Last zeigt die Architektur ihre Stärken:

  • Redis Clustering: Horizontale Skalierung des Event-Backbones
  • GraphQL Federation: Verteilung der API-Last auf mehrere Server
  • Connection Pooling: Effiziente WebSocket-Verbindungsverwaltung
  • Event Batching: Reduzierung der Message-Overhead
export class BatchEventPublisher {
  private eventBuffer: MeetingEvent[] = [];
  private readonly batchSize = 50;
  private readonly flushInterval = 1000; // 1 Sekunde

  async publishEvent(event: MeetingEvent): Promise<void> {
    this.eventBuffer.push(event);

    if (this.eventBuffer.length >= this.batchSize) {
      await this.flushEvents();
    }
  }

  private async flushEvents(): Promise<void> {
    if (this.eventBuffer.length === 0) return;

    const batch = this.eventBuffer.splice(0);
    await this.redis.publish('oparl:meetings:batch', JSON.stringify(batch));
  }
}

Zukunft: AI und Machine Learning Integration

Die Event-Architektur ermöglicht innovative Erweiterungen:

// KI-basierte Trend-Erkennung
export class TrendAnalyzer {
  async analyzeEventStream(events: MeetingEvent[]): Promise<Trend[]> {
    const topics = events.map(e => this.extractTopics(e.meeting.title));
    const trends = await this.mlModel.detectTrends(topics, {
      timeWindow: '7d',
      significanceThreshold: 0.15
    });

    return trends.map(trend => ({
      topic: trend.topic,
      growth: trend.growthRate,
      relatedMeetings: trend.meetings,
      predictedImportance: trend.importance
    }));
  }
}

Fazit: Demokratie durch Technologie stärken

OParl-Lite zeigt exemplarisch, wie moderne Software-Architektur demokratische Prozesse unterstützen kann. Die Kombination aus Event-Driven Architecture, Microservices und Echtzeit-Kommunikation schafft eine Plattform, die:

  • Transparenz durch sofortige Information fördert
  • Bürgerbeteiligung durch niedrigschwellige Zugänge ermöglicht
  • Journalistische Arbeit durch automatische Alerts unterstützt
  • Entwicklern eine flexible, erweiterbare Basis bietet

Als Open-Source-Projekt lebt OParl-Lite vom Engagement der Community. Die saubere Architektur macht es Entwicklern leicht, eigene Features beizutragen – sei es KI-basierte Analyse, erweiterte Authentifizierung oder Integration mit anderen Civic-Tech-Projekten.

Die Demokratie der Zukunft ist digital vernetzt – OParl-Lite zeigt den technischen Weg dahin. 🚀


OParl-Lite ist verfügbar auf Codeberg Beiträge, Issues und Feature-Requests sind herzlich willkommen!