Echtzeit-Daten: Die Event-Driven Architektur von OParl-Lite
Über das Wochenende habe ich die Event-Driven Architektur von OParl-Lite weiter implementiert, vor allem um https://bonn.social/@Sascha/115045394853309363 zu unterstützen und Echtzeit-Daten für alle zugänglich zu machen.
Transparenz in der Politik erfordert nicht nur Zugang zu Informationen, sondern auch deren zeitnahe Verfügbarkeit. Das Open-Source-Projekt OParl-Lite zeigt, wie moderne Event-Driven Architecture parlamentarische Daten in Echtzeit zu Bürgern und Anwendungen bringen kann. In diesem Artikel beleuchten wir die technische Architektur eines Systems, das RSS-Feeds, Redis Pub/Sub und GraphQL-Subscriptions zu einer leistungsstarken Echtzeit-Pipeline verbindet.
Die Vision: Demokratie in Echtzeit
Stellen Sie sich vor, Journalisten erfahren sofort von neuen Stadtratssitzungen, Bürger werden automatisch über relevante Tagesordnungspunkte benachrichtigt, und Watchdog-Organisationen können parlamentarische Aktivitäten in Echtzeit überwachen. OParl-Lite macht diese Vision technisch umsetzbar durch eine durchdachte Microservice-Architektur.

Technologie, Politik, Echtzeit-Daten
Architektur-Überblick: Loosely Coupled, Highly Scalable
Die Echtzeit-Datenfluss-Architektur von OParl-Lite folgt dem Prinzip der Separation of Concerns und nutzt Redis als zentralen Message Broker:
┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ RSS Poller │───▶│ Redis Pub/Sub │───▶│ GraphQL Server │
│ Microservice │ │ Event Bus │ │ & Subscriptions │
└─────────────────┘ └─────────────────┘ └──────────────────┘
│
▼
┌──────────────────┐
│ WebSocket Clients│
│ (Browser, Apps) │
└──────────────────┘Diese Architektur bietet entscheidende Vorteile:
- Fault Isolation: Ein Ausfall des RSS-Pollers beeinflusst nicht die GraphQL-API
- Independent Scaling: Jeder Service kann unabhängig skaliert werden
- Technology Diversity: Verschiedene Services können unterschiedliche Tech-Stacks nutzen
- Event Replay: Redis ermöglicht Event-Wiederholung bei Service-Neustarts
Der RSS Poller: Intelligente Feed-Überwachung
Der RSS-Poller-Microservice ist das Herzstück der Datenerfassung. Er implementiert ein adaptives Polling-System, das sich dynamisch an die Aktualisierungsfrequenz der Datenquellen anpasst:
// src/rss-poller/services/AdaptivePoller.ts
export class AdaptivePoller {
private calculateNextInterval(lastChangeTime: Date): number {
const timeSinceChange = Date.now() - lastChangeTime.getTime();
const hoursNoChanges = timeSinceChange / (1000 * 60 * 60);
if (hoursNoChanges < 1) return 60000; // 1 Minute
if (hoursNoChanges < 24) return 300000; // 5 Minuten
return 900000; // 15 Minuten
}
async pollFeed(feedUrl: string): Promise<MeetingEvent[]> {
const feed = await this.fetchAndParseFeed(feedUrl);
const changes = await this.changeDetector.detectChanges(feed);
for (const change of changes) {
await this.eventPublisher.publishEvent(change);
}
return changes;
}
}Der MeetingChangeDetector erkennt dabei nicht nur neue Sitzungen, sondern auch Änderungen an bestehenden Terminen:
// src/rss-poller/services/MeetingChangeDetector.ts
interface MeetingEvent {
eventId: string;
type: 'meeting_created' | 'meeting_updated' | 'meeting_deleted';
timestamp: string;
meeting: {
id: string;
title: string;
link: string;
startTime: string;
location?: string;
organization: string;
};
correlationId: string;
changeDetails?: {
field: string;
oldValue: any;
newValue: any;
}[];
}Redis als Event-Backbone
Redis Pub/Sub fungiert als zentraler Message Broker und ermöglicht lose Kopplung zwischen den Services. Der RedisEventPublisher garantiert zuverlässige Event-Übertragung:
// src/rss-poller/services/RedisEventPublisher.ts
export class RedisEventPublisher {
private readonly channel = 'oparl:meetings';
async publishEvent(event: MeetingEvent): Promise<void> {
const serializedEvent = JSON.stringify({
...event,
metadata: {
publishedAt: new Date().toISOString(),
source: 'rss-poller',
version: '1.0'
}
});
await this.redis.publish(this.channel, serializedEvent);
this.logger.info('Event published', {
eventId: event.eventId,
type: event.type,
meetingTitle: event.meeting.title
});
}
}GraphQL-Subscriptions: Echtzeit für Clients
Der GraphQL-Server empfängt Events über Redis und stellt sie Clients via WebSocket-Subscriptions zur Verfügung:
// src/schema/subscriptions/meetingSubscription.ts
builder.subscriptionField('meetingUpdates', (t) =>
t.field({
type: MeetingEventType,
description: 'Echtzeit-Updates für parlamentarische Sitzungen',
args: {
organizationId: t.arg.string({ required: false }),
eventTypes: t.arg.stringList({ required: false })
},
subscribe: async (_parent, args, context) => {
const eventTarget = context.eventTarget;
return eventTarget.subscribe('oparl:meetings', (event) => {
// Client-seitige Filterung
if (args.organizationId &&
event.meeting.organizationId !== args.organizationId) {
return false;
}
if (args.eventTypes &&
!args.eventTypes.includes(event.type)) {
return false;
}
return true;
});
},
resolve: (event) => event
})
);Die Redis-GraphQL-Brücke
Der RedisEventTargetManager verbindet Redis Pub/Sub nahtlos mit GraphQL Yoga’s Event-System:
// src/server/redisEventTarget.ts
export class RedisEventTargetManager {
private eventTarget: RedisEventTarget;
constructor(redisClient: Redis) {
this.eventTarget = new RedisEventTarget({
redis: redisClient,
serializer: JSON.stringify,
deserializer: JSON.parse
});
}
async subscribe(channel: string, filter?: EventFilter) {
return this.eventTarget.subscribe(channel, (rawEvent) => {
const event = this.validateAndTransformEvent(rawEvent);
if (filter && !filter(event)) {
return null; // Event wird gefiltert
}
return event;
});
}
private validateAndTransformEvent(rawEvent: any): MeetingEvent {
// Event-Validierung und Transformation
if (!this.isValidMeetingEvent(rawEvent)) {
throw new Error('Invalid event format');
}
return {
...rawEvent,
timestamp: new Date(rawEvent.timestamp).toISOString()
};
}
}Client-Integration: Echtzeit-Monitoring
Entwickler können die Echtzeit-Funktionalität einfach nutzen. Das Projekt enthält sogar ein praktisches Monitoring-Script:
// scripts/monitor-meeting-subscriptions.cjs
const { createClient } = require('graphql-ws');
const WebSocket = require('ws');
const client = createClient({
url: 'ws://localhost:4000/graphql',
webSocketImpl: WebSocket,
});
const subscription = `
subscription MonitorMeetings {
meetingUpdates {
eventId
type
timestamp
meeting {
title
startTime
organization
location
}
}
}
`;
client.subscribe(
{ query: subscription },
{
next: (data) => {
const event = data.data.meetingUpdates;
console.log(`📅 ${event.type.toUpperCase()}: ${event.meeting.title}`);
console.log(` 🏢 ${event.meeting.organization}`);
console.log(` 🕐 ${new Date(event.meeting.startTime).toLocaleString()}`);
if (event.meeting.location) {
console.log(` 📍 ${event.meeting.location}`);
}
console.log('');
},
error: console.error,
complete: () => console.log('Subscription completed')
}
);Datenfluss-Sequenz: Von RSS zu WebSocket
Die vollständige Event-Pipeline zeigt die Eleganz der Architektur:
sequenceDiagram
participant RSS as RSS Feed
participant Poller as AdaptivePoller
participant Detector as ChangeDetector
participant Redis as Redis Pub/Sub
participant GQL as GraphQL Server
participant Client as WebSocket Client
RSS->>Poller: RSS Feed Data
Poller->>Detector: Parse & Analyze
Detector->>Redis: Publish Event
Redis->>GQL: Forward Event
GQL->>Client: Push Update via WebSocket
Client->>Client: Update UI in Realtime
Technische Vorteile der Architektur
1. Microservice-Isolation
Jeder Service hat eine klar definierte Verantwortung. Der RSS-Poller kann eigenständig entwickelt, getestet und deployed werden, ohne den GraphQL-Server zu beeinträchtigen.
2. Horizontal Skalierbarkeit
- RSS-Poller: 1-2 Instanzen (mit Leader Election)
- GraphQL-Server: N Instanzen für hohe Last
- Redis: Cluster-Setup für Enterprise-Umgebungen
3. Event Sourcing Benefits
Alle Änderungen werden als Events gespeichert, ermöglicht:
- Audit-Trails für parlamentarische Transparenz
- Event-Replay bei System-Recovery
- Historische Datenanalyse
4. Robuste Fehlerbehandlung
export class AdaptivePoller {
async pollWithRetry(feedUrl: string, maxRetries = 3): Promise<MeetingEvent[]> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.pollFeed(feedUrl);
} catch (error) {
this.logger.warn(`Polling attempt ${attempt} failed`, { error, feedUrl });
if (attempt === maxRetries) {
this.metrics.incrementCounter('polling.failures.final');
throw error;
}
await this.sleep(Math.pow(2, attempt) * 1000); // Exponential backoff
}
}
}
}Authentifizierung und Sicherheit
Produktive Systeme benötigen Zugriffskontrolle. OParl-Lite unterstützt JWT-basierte Authentifizierung:
// Subscription mit Authentifizierung
builder.subscriptionField('meetingUpdates', (t) =>
t.field({
type: MeetingEventType,
authScopes: ['read:meetings'], // Benötigt Berechtigung
subscribe: async (_parent, args, context) => {
const user = context.user; // Aus JWT Token
// Benutzer-spezifische Filterung
const allowedOrganizations = user.permissions.organizations;
return context.eventTarget.subscribe('oparl:meetings', (event) => {
return allowedOrganizations.includes(event.meeting.organizationId);
});
}
})
);Deployment auf OpenShift
OParl-Lite ist für moderne Container-Orchestrierung optimiert:
# RSS Poller als separate DeploymentConfig
apiVersion: apps.openshift.io/v1
kind: DeploymentConfig
metadata:
name: oparl-rss-poller
spec:
replicas: 1
template:
spec:
containers:
- name: poller
image: registry/oparl-lite/rss-poller:latest
env:
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: redis-credentials
key: url
- name: RSS_FEEDS
value: "https://www.bonn.de/ratsinformationssystem/rss"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"Monitoring und Observability
Produktive Event-Systeme benötigen umfassendes Monitoring:
export class MetricsCollector {
private registry = new promClient.Registry();
private eventsPublished = new promClient.Counter({
name: 'oparl_events_published_total',
help: 'Total number of events published',
labelNames: ['event_type', 'source']
});
private subscriptionConnections = new promClient.Gauge({
name: 'oparl_active_subscriptions',
help: 'Number of active WebSocket subscriptions'
});
recordEventPublished(eventType: string, source: string) {
this.eventsPublished.inc({ event_type: eventType, source });
}
}Anwendungsfälle für Bürger und Journalisten
1. Journalismus-Dashboard
// Automatische Benachrichtigung für relevante Themen
const subscription = `
subscription JournalismAlerts {
meetingUpdates(
eventTypes: ["meeting_created"]
) {
meeting {
title
agendaItems {
title
description
}
}
}
}
`;
// KI-basierte Relevanz-Bewertung
client.subscribe({ query: subscription }, {
next: (event) => {
const relevanceScore = analyzeRelevance(event.meeting);
if (relevanceScore > 0.8) {
sendSlackNotification(event);
}
}
});2. Bürger-Engagement-App
// Standort-basierte Benachrichtigungen
const subscription = `
subscription LocalMeetings($location: String!) {
meetingUpdates(organizationFilter: $location) {
meeting {
title
startTime
location
publicParticipationAllowed
}
}
}
`;Performance und Skalierung
Bei hoher Last zeigt die Architektur ihre Stärken:
- Redis Clustering: Horizontale Skalierung des Event-Backbones
- GraphQL Federation: Verteilung der API-Last auf mehrere Server
- Connection Pooling: Effiziente WebSocket-Verbindungsverwaltung
- Event Batching: Reduzierung der Message-Overhead
export class BatchEventPublisher {
private eventBuffer: MeetingEvent[] = [];
private readonly batchSize = 50;
private readonly flushInterval = 1000; // 1 Sekunde
async publishEvent(event: MeetingEvent): Promise<void> {
this.eventBuffer.push(event);
if (this.eventBuffer.length >= this.batchSize) {
await this.flushEvents();
}
}
private async flushEvents(): Promise<void> {
if (this.eventBuffer.length === 0) return;
const batch = this.eventBuffer.splice(0);
await this.redis.publish('oparl:meetings:batch', JSON.stringify(batch));
}
}Zukunft: AI und Machine Learning Integration
Die Event-Architektur ermöglicht innovative Erweiterungen:
// KI-basierte Trend-Erkennung
export class TrendAnalyzer {
async analyzeEventStream(events: MeetingEvent[]): Promise<Trend[]> {
const topics = events.map(e => this.extractTopics(e.meeting.title));
const trends = await this.mlModel.detectTrends(topics, {
timeWindow: '7d',
significanceThreshold: 0.15
});
return trends.map(trend => ({
topic: trend.topic,
growth: trend.growthRate,
relatedMeetings: trend.meetings,
predictedImportance: trend.importance
}));
}
}Fazit: Demokratie durch Technologie stärken
OParl-Lite zeigt exemplarisch, wie moderne Software-Architektur demokratische Prozesse unterstützen kann. Die Kombination aus Event-Driven Architecture, Microservices und Echtzeit-Kommunikation schafft eine Plattform, die:
- Transparenz durch sofortige Information fördert
- Bürgerbeteiligung durch niedrigschwellige Zugänge ermöglicht
- Journalistische Arbeit durch automatische Alerts unterstützt
- Entwicklern eine flexible, erweiterbare Basis bietet
Als Open-Source-Projekt lebt OParl-Lite vom Engagement der Community. Die saubere Architektur macht es Entwicklern leicht, eigene Features beizutragen – sei es KI-basierte Analyse, erweiterte Authentifizierung oder Integration mit anderen Civic-Tech-Projekten.
Die Demokratie der Zukunft ist digital vernetzt – OParl-Lite zeigt den technischen Weg dahin. 🚀
OParl-Lite ist verfügbar auf Codeberg Beiträge, Issues und Feature-Requests sind herzlich willkommen!