GoF以外のプログラミング・デザインパターン #11 Observer + Reactive Stream

2025/12/02

プログラミング 学習

t f B! P L
eyecatch Observerパターンをおさらいすると、「ある対象(Subject)」の状態変化を、複数の「監視者(Observer)」に通知すると言うパターンでした。 JSだと、EventEmitter や addEventListener が典型例のヤツですね。 通知が発生したときにイベントを受け取る、push型の基本です。 これに、Reactive Streams(リアクティブストリーム)と言うパターンを組み合わせるデザインパターンを紹介します。

Reactive Streams(リアクティブストリーム)とは?

非同期データの「流れ(stream)」を扱うための標準仕様です。 Observerを「連続的なデータストリーム」に拡張した考え方をします。 Netflix(RxJava)、ReactiveX、Project Reactor などで普及しているパターンみたいですね。

このパターンの特徴

・データが「1回」ではなく「継続的」に流れる。 ・Observerが「データ・エラー・完了」を3種類で受け取る。 ・Backpressure(過剰なデータ流入制御)を考慮できる。

Reactive Streamsの基本構造

役割 説明
Publisher データを発行する側(Observable)
Subscriber データを受け取る側(Observer)
Subscription 購読契約(キャンセル・要求制御)
Processor Publisher と Subscriber の両方(中間処理)

サンプルコード(RxJS風)

import { fromEvent } from 'rxjs'; import { map, filter } from 'rxjs/operators'; // Publisher(データ源) const clicks = fromEvent(document, 'click'); // Processor(データ変換) const coords = clicks.pipe( map(event => ({ x: event.clientX, y: event.clientY })), filter(pos => pos.x < 500) // 条件フィルタ ); // Subscriber(購読者) coords.subscribe({ next: pos => console.log('Click at', pos), error: err => console.error('Error:', err), complete: () => console.log('Complete') });

ポイント解説

・fromEvent() が「Publisher」 ・pipe() 内が「Processor」 ・subscribe() が「Observer」 ・イベントが連続して「ストリーム(flow)」として流れる

Observer と Reactive Streams の関係まとめ

概念 Observerパターン Reactive Streams
データ通知 単発イベント 継続的ストリーム
データ方向 一方向(push) 双方向(push + demand)
構造 Subject / Observer Publisher / Subscriber / Processor
制御 通知のみ フロー制御(Backpressure)あり
EventEmitter, DOMイベント RxJS, Reactor, Akka Streams

わかりやすく例えてみる

ラジオに例えると

Observerは、放送を聞くだけに対して、 Reactive Streamsは、放送局に「聞きたい番組だけ送って」と依頼できる、リアルタイム相互放送のようなモノ。

パイプで例えると

Observerは、水が流れるだけに対して、 Reactive Streamsは、水量を調整できるバルブがついている状態。

まとめ

・Reactive StreamsはObserverの発展形。 ・Observerは「単発イベント通知」、Reactiveは「連続データの流れ+制御」。 ・JSなら RxJS が、Javaなら Flow API / Reactor がその代表実装。 ・非同期・リアルタイム処理・イベント連鎖などに最適。

Canvasを使ったデモ

Canvas上で「Publisher → Processor → Subscriber」の流れをアニメーションで可視化するデモを作ってみました。 Publisherが発行する「イベント粒子」がProcessorで変換され、Subscriberに届く様子を見られます。 コード内で Publisher.emitRate や Processor の map / filter を調整でき、Backpressure(制限)風の動作も簡易表現しています。

sample.html

<!DOCTYPE html> <html lang="ja"> <head> <meta charset="utf-8" /> <title>Reactive Stream Visualization (Publisher → Processor → Subscriber)</title> <style> body { font-family: sans-serif; display:flex; flex-direction:column; align-items:center; padding:20px; background:#f7f7f8; } canvas { border:1px solid #ddd; background:#fff; } .controls { margin:12px 0; } label { margin-right:8px; } button { margin-left:8px; padding:6px 10px; } </style> </head> <body> <h2>Reactive Stream可視化デモ</h2> <div class="controls"> <label>emit rate(ms): <input id="rate" type="number" value="600" style="width:80px"></label> <label>filter threshold (value ≥): <input id="filter" type="number" value="40" style="width:60px"></label> <label>map +add: <input id="mapadd" type="number" value="0" style="width:60px"></label> <button id="apply">適用</button> <button id="toggle">Pause</button> <span id="info" style="margin-left:12px;color:#666"></span> </div> <canvas id="c" width="900" height="320"></canvas> <script> /* 構成: - Publisher: 定期的にイベント(粒子)を作る - Processor: map/filter を持ち、流れてきたイベントを変換(あるいは破棄)する - Subscriber: 最終的に受け取りログと受信アニメをする 可視化: - 左: Publisherボックス、中: Processorボックス、右: Subscriberボックス - 粒子が流れていき、色や形で変換を表現 */ const canvas = document.getElementById('c'); const ctx = canvas.getContext('2d'); const W = canvas.width, H = canvas.height; const boxes = { pub: { x: 60, y: 80, w: 160, h: 160, label: 'Publisher' }, proc: { x: 370, y: 80, w: 160, h: 160, label: 'Processor' }, sub: { x: 680, y: 80, w: 160, h: 160, label: 'Subscriber' }, }; let running = true; let emitRate = 600; let filterThreshold = 40; let mapAdd = 0; document.getElementById('apply').onclick = () => { emitRate = Math.max(50, Number(document.getElementById('rate').value) || 600); filterThreshold = Number(document.getElementById('filter').value) || 40; mapAdd = Number(document.getElementById('mapadd').value) || 0; publisher.setRate(emitRate); info(); }; document.getElementById('toggle').onclick = () => { running = !running; document.getElementById('toggle').textContent = running ? 'Pause' : 'Resume'; info(); }; function info() { document.getElementById('info').textContent = `rate=${emitRate}ms filter≥${filterThreshold} map+${mapAdd} running=${running}`; } info(); /* --- シンプルな"Reactive"実体 --- */ class Particle { constructor(value) { this.value = value; this.x = boxes.pub.x + boxes.pub.w; this.y = boxes.pub.y + boxes.pub.h / 2 + (Math.random() - 0.5) * 40; this.radius = 10; this.stage = 'pub'; // pub -> proc -> sub -> done this.color = '#3498db'; this.age = 0; } draw(ctx) { ctx.beginPath(); ctx.fillStyle = this.color; ctx.arc(this.x, this.y, this.radius, 0, Math.PI*2); ctx.fill(); ctx.fillStyle = '#fff'; ctx.font = '10px sans-serif'; ctx.textAlign = 'center'; ctx.fillText(Math.round(this.value), this.x, this.y + 3); } } const particles = []; // 移動中の粒子 const receivedLog = []; // Subscriberでの受信履歴(表示用) /* --- Publisher --- */ const publisher = { counter: 0, rate: emitRate, timer: null, setRate(ms) { this.rate = ms; if (this.timer) this.restart(); }, start() { this.timer = setInterval(() => this.emit(), this.rate); }, stop() { clearInterval(this.timer); this.timer = null; }, restart() { this.stop(); if (running) this.start(); }, emit() { if (!running) return; const v = Math.floor(Math.random()*100); // 0..99 の値を発行 const p = new Particle(v); p.color = '#3498db'; particles.push(p); // visually bump pulse(boxes.pub); } }; /* --- Processor (map + filter) --- */ const processor = { map: (v) => v + mapAdd, filter: (v) => v >= filterThreshold, active: true, processParticle(p) { // arrival visual (change color briefly) pulse(boxes.proc); // simulate processing delay return new Promise(resolve => { setTimeout(() => { const newVal = this.map(p.value); const ok = this.filter(newVal); resolve({ ok, newVal }); }, 300); // fixed processing delay }); } }; /* --- Subscriber --- */ const subscriber = { consume(p) { // mark received p.color = '#2ecc71'; p.stage = 'sub'; receivedLog.unshift({ time: new Date(), value: p.value }); if (receivedLog.length > 6) receivedLog.pop(); // short visual pulse pulse(boxes.sub); } }; /* --- visual helpers --- */ let pulses = []; function pulse(box) { pulses.push({ box, t: 0 }); } /* --- animate & movement rules --- */ function update(dt) { // move particles according to their stage particles.forEach((p, idx) => { p.age += dt; if (p.stage === 'pub') { // move to processor entry const targetX = boxes.proc.x - 20; const speed = 0.15 * dt; p.x += (targetX - p.x) * Math.min(1, speed); if (p.x >= targetX - 1) { // arrived at processor: process then either forward or drop p.stage = 'processing'; // keep reference before async change const particle = p; processor.processParticle(particle).then(res => { // update value if passed if (res.ok) { particle.value = res.newVal; particle.color = '#f1c40f'; // processed color particle.stage = 'procToSub'; } else { // dropped: stage -> done (fade) particle.stage = 'dropped'; particle.color = '#e74c3c'; } }); } } else if (p.stage === 'procToSub') { const targetX = boxes.sub.x - 20; const speed = 0.12 * dt; p.x += (targetX - p.x) * Math.min(1, speed); // move vertically towards center of subscriber p.y += ((boxes.sub.y + boxes.sub.h/2) - p.y) * 0.02 * dt; if (p.x >= targetX - 1) { // arrived at subscriber subscriber.consume(p); // mark done after short delay setTimeout(() => { p.stage = 'done'; }, 400); } } else if (p.stage === 'dropped') { p.y += 0.02 * dt; p.radius *= 0.995; if (p.radius < 2) p.stage = 'done'; } else if (p.stage === 'sub') { // visual linger then done if (p.age > 1200) p.stage = 'done'; } }); // remove done particles for (let i = particles.length-1; i>=0; i--) if (particles[i].stage === 'done') particles.splice(i,1); } function draw() { ctx.clearRect(0,0,W,H); // boxes Object.values(boxes).forEach(b => { ctx.fillStyle = '#f0f0f0'; ctx.fillRect(b.x, b.y, b.w, b.h); ctx.strokeStyle = '#bbb'; ctx.strokeRect(b.x, b.y, b.w, b.h); ctx.fillStyle = '#333'; ctx.font = '16px sans-serif'; ctx.textAlign = 'center'; ctx.fillText(b.label, b.x + b.w/2, b.y + 22); }); // arrows drawArrow(boxes.pub.x + boxes.pub.w, boxes.pub.y + boxes.pub.h/2, boxes.proc.x, boxes.proc.y + boxes.proc.h/2, 'publish → process'); drawArrow(boxes.proc.x + boxes.proc.w, boxes.proc.y + boxes.proc.h/2, boxes.sub.x, boxes.sub.y + boxes.sub.h/2, 'process → subscribe'); // pulses pulses.forEach(p => { p.t += 16; const alpha = Math.max(0, 1 - p.t/400); const b = p.box; ctx.strokeStyle = `rgba(100,150,250,${alpha})`; ctx.lineWidth = 3; ctx.strokeRect(b.x - 4, b.y - 4, b.w + 8, b.h + 8); }); pulses = pulses.filter(p => p.t < 420); // particles particles.forEach(p => p.draw(ctx)); // subscriber log ctx.fillStyle = '#111'; ctx.font = '12px monospace'; ctx.textAlign = 'left'; ctx.fillText('Subscriber received:', boxes.sub.x, boxes.sub.y + boxes.sub.h + 18); receivedLog.forEach((r,i) => { const txt = `${r.time.toLocaleTimeString()} : ${Math.round(r.value)}`; ctx.fillText(txt, boxes.sub.x, boxes.sub.y + boxes.sub.h + 36 + i * 14); }); } function drawArrow(x1,y1,x2,y2,label='') { ctx.strokeStyle = '#888'; ctx.lineWidth = 2; ctx.beginPath(); ctx.moveTo(x1,y1); ctx.lineTo(x2,y2); ctx.stroke(); // arrowhead const angle = Math.atan2(y2-y1,x2-x1); const len = 8; ctx.beginPath(); ctx.moveTo(x2,y2); ctx.lineTo(x2 - len*Math.cos(angle - 0.3), y2 - len*Math.sin(angle - 0.3)); ctx.lineTo(x2 - len*Math.cos(angle + 0.3), y2 - len*Math.sin(angle + 0.3)); ctx.closePath(); ctx.fillStyle = '#888'; ctx.fill(); // label if (label) { ctx.font = '12px sans-serif'; ctx.fillStyle = '#666'; ctx.textAlign = 'center'; ctx.fillText(label, (x1+x2)/2, (y1+y2)/2 - 8); } } /* --- main loop --- */ let last = performance.now(); function loop(now) { const dt = now - last; last = now; update(dt); draw(); requestAnimationFrame(loop); } requestAnimationFrame(loop); /* --- start publisher --- */ publisher.start(); /* --- bind ui updates to params (apply already sets) --- */ (function bindControls() { // adjust ui when inputs change document.getElementById('rate').addEventListener('change', () => { document.getElementById('apply').click(); }); document.getElementById('filter').addEventListener('change', () => { document.getElementById('apply').click(); }); document.getElementById('mapadd').addEventListener('change', () => { document.getElementById('apply').click(); }); })(); /* keep processor's functions in sync with controls */ setInterval(() => { processor.map = v => v + mapAdd; processor.filter = v => v >= filterThreshold; }, 200); /* pause/resume publisher timer when running toggles */ setInterval(() => { if (running && !publisher.timer) publisher.start(); if (!running && publisher.timer) publisher.stop(); }, 250); </script> </body> </html>

ポイント解説

・青い粒子がPublisherから出る。 ・黄色はProcessor通過後(map適用),赤はfilterで破棄(dropped)。 ・緑がSubscriber受信を示す。 ・コントロールで発行レート・filter閾値・map加算値を変更可能。

あとがき

Observer内で、まとまった処理が綺麗なコードで書くことができるのがいいですね。 慣れないと分かりにくいかもしれませんが、イベント処理をたくさん書いていくと、コードが汚くなってしまうので、こうした効率のいいデザインパターンを理解して扱いたいですね。

人気の投稿

このブログを検索

ごあいさつ

このWebサイトは、独自思考で我が道を行くユゲタの少し尖った思考のTechブログです。 毎日興味がどんどん切り替わるので、テーマはマルチになっています。 もしかしたらアイデアに困っている人の助けになるかもしれません。

ブログ アーカイブ