
Observerパターンをおさらいすると、「ある対象(Subject)」の状態変化を、複数の「監視者(Observer)」に通知すると言うパターンでした。
JSだと、EventEmitter や addEventListener が典型例のヤツですね。
通知が発生したときにイベントを受け取る、push型の基本です。
これに、Reactive Streams(リアクティブストリーム)と言うパターンを組み合わせるデザインパターンを紹介します。
Reactive Streams(リアクティブストリーム)とは?
非同期データの「流れ(stream)」を扱うための標準仕様です。
Observerを「連続的なデータストリーム」に拡張した考え方をします。
Netflix(RxJava)、ReactiveX、Project Reactor などで普及しているパターンみたいですね。
このパターンの特徴
・データが「1回」ではなく「継続的」に流れる。
・Observerが「データ・エラー・完了」を3種類で受け取る。
・Backpressure(過剰なデータ流入制御)を考慮できる。
Reactive Streamsの基本構造
| 役割 |
説明 |
| Publisher |
データを発行する側(Observable) |
| Subscriber |
データを受け取る側(Observer) |
| Subscription |
購読契約(キャンセル・要求制御) |
| Processor |
Publisher と Subscriber の両方(中間処理) |
サンプルコード(RxJS風)
import { fromEvent } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Publisher(データ源)
const clicks = fromEvent(document, 'click');
// Processor(データ変換)
const coords = clicks.pipe(
map(event => ({ x: event.clientX, y: event.clientY })),
filter(pos => pos.x < 500) // 条件フィルタ
);
// Subscriber(購読者)
coords.subscribe({
next: pos => console.log('Click at', pos),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
ポイント解説
・fromEvent() が「Publisher」
・pipe() 内が「Processor」
・subscribe() が「Observer」
・イベントが連続して「ストリーム(flow)」として流れる
Observer と Reactive Streams の関係まとめ
| 概念 |
Observerパターン |
Reactive Streams |
| データ通知 |
単発イベント |
継続的ストリーム |
| データ方向 |
一方向(push) |
双方向(push + demand) |
| 構造 |
Subject / Observer |
Publisher / Subscriber / Processor |
| 制御 |
通知のみ |
フロー制御(Backpressure)あり |
| 例 |
EventEmitter, DOMイベント |
RxJS, Reactor, Akka Streams |
わかりやすく例えてみる
ラジオに例えると
Observerは、放送を聞くだけに対して、
Reactive Streamsは、放送局に「聞きたい番組だけ送って」と依頼できる、リアルタイム相互放送のようなモノ。
パイプで例えると
Observerは、水が流れるだけに対して、
Reactive Streamsは、水量を調整できるバルブがついている状態。
まとめ
・Reactive StreamsはObserverの発展形。
・Observerは「単発イベント通知」、Reactiveは「連続データの流れ+制御」。
・JSなら RxJS が、Javaなら Flow API / Reactor がその代表実装。
・非同期・リアルタイム処理・イベント連鎖などに最適。
Canvasを使ったデモ
Canvas上で「Publisher → Processor → Subscriber」の流れをアニメーションで可視化するデモを作ってみました。
Publisherが発行する「イベント粒子」がProcessorで変換され、Subscriberに届く様子を見られます。
コード内で Publisher.emitRate や Processor の map / filter を調整でき、Backpressure(制限)風の動作も簡易表現しています。
sample.html
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="utf-8" />
<title>Reactive Stream Visualization (Publisher → Processor → Subscriber)</title>
<style>
body { font-family: sans-serif; display:flex; flex-direction:column; align-items:center; padding:20px; background:#f7f7f8; }
canvas { border:1px solid #ddd; background:#fff; }
.controls { margin:12px 0; }
label { margin-right:8px; }
button { margin-left:8px; padding:6px 10px; }
</style>
</head>
<body>
<h2>Reactive Stream可視化デモ</h2>
<div class="controls">
<label>emit rate(ms): <input id="rate" type="number" value="600" style="width:80px"></label>
<label>filter threshold (value ≥): <input id="filter" type="number" value="40" style="width:60px"></label>
<label>map +add: <input id="mapadd" type="number" value="0" style="width:60px"></label>
<button id="apply">適用</button>
<button id="toggle">Pause</button>
<span id="info" style="margin-left:12px;color:#666"></span>
</div>
<canvas id="c" width="900" height="320"></canvas>
<script>
/*
構成:
- Publisher: 定期的にイベント(粒子)を作る
- Processor: map/filter を持ち、流れてきたイベントを変換(あるいは破棄)する
- Subscriber: 最終的に受け取りログと受信アニメをする
可視化:
- 左: Publisherボックス、中: Processorボックス、右: Subscriberボックス
- 粒子が流れていき、色や形で変換を表現
*/
const canvas = document.getElementById('c');
const ctx = canvas.getContext('2d');
const W = canvas.width, H = canvas.height;
const boxes = {
pub: { x: 60, y: 80, w: 160, h: 160, label: 'Publisher' },
proc: { x: 370, y: 80, w: 160, h: 160, label: 'Processor' },
sub: { x: 680, y: 80, w: 160, h: 160, label: 'Subscriber' },
};
let running = true;
let emitRate = 600;
let filterThreshold = 40;
let mapAdd = 0;
document.getElementById('apply').onclick = () => {
emitRate = Math.max(50, Number(document.getElementById('rate').value) || 600);
filterThreshold = Number(document.getElementById('filter').value) || 40;
mapAdd = Number(document.getElementById('mapadd').value) || 0;
publisher.setRate(emitRate);
info();
};
document.getElementById('toggle').onclick = () => {
running = !running;
document.getElementById('toggle').textContent = running ? 'Pause' : 'Resume';
info();
};
function info() {
document.getElementById('info').textContent =
`rate=${emitRate}ms filter≥${filterThreshold} map+${mapAdd} running=${running}`;
}
info();
/* --- シンプルな"Reactive"実体 --- */
class Particle {
constructor(value) {
this.value = value;
this.x = boxes.pub.x + boxes.pub.w;
this.y = boxes.pub.y + boxes.pub.h / 2 + (Math.random() - 0.5) * 40;
this.radius = 10;
this.stage = 'pub'; // pub -> proc -> sub -> done
this.color = '#3498db';
this.age = 0;
}
draw(ctx) {
ctx.beginPath();
ctx.fillStyle = this.color;
ctx.arc(this.x, this.y, this.radius, 0, Math.PI*2);
ctx.fill();
ctx.fillStyle = '#fff';
ctx.font = '10px sans-serif';
ctx.textAlign = 'center';
ctx.fillText(Math.round(this.value), this.x, this.y + 3);
}
}
const particles = []; // 移動中の粒子
const receivedLog = []; // Subscriberでの受信履歴(表示用)
/* --- Publisher --- */
const publisher = {
counter: 0,
rate: emitRate,
timer: null,
setRate(ms) { this.rate = ms; if (this.timer) this.restart(); },
start() { this.timer = setInterval(() => this.emit(), this.rate); },
stop() { clearInterval(this.timer); this.timer = null; },
restart() { this.stop(); if (running) this.start(); },
emit() {
if (!running) return;
const v = Math.floor(Math.random()*100); // 0..99 の値を発行
const p = new Particle(v);
p.color = '#3498db';
particles.push(p);
// visually bump
pulse(boxes.pub);
}
};
/* --- Processor (map + filter) --- */
const processor = {
map: (v) => v + mapAdd,
filter: (v) => v >= filterThreshold,
active: true,
processParticle(p) {
// arrival visual (change color briefly)
pulse(boxes.proc);
// simulate processing delay
return new Promise(resolve => {
setTimeout(() => {
const newVal = this.map(p.value);
const ok = this.filter(newVal);
resolve({ ok, newVal });
}, 300); // fixed processing delay
});
}
};
/* --- Subscriber --- */
const subscriber = {
consume(p) {
// mark received
p.color = '#2ecc71';
p.stage = 'sub';
receivedLog.unshift({ time: new Date(), value: p.value });
if (receivedLog.length > 6) receivedLog.pop();
// short visual pulse
pulse(boxes.sub);
}
};
/* --- visual helpers --- */
let pulses = [];
function pulse(box) {
pulses.push({ box, t: 0 });
}
/* --- animate & movement rules --- */
function update(dt) {
// move particles according to their stage
particles.forEach((p, idx) => {
p.age += dt;
if (p.stage === 'pub') {
// move to processor entry
const targetX = boxes.proc.x - 20;
const speed = 0.15 * dt;
p.x += (targetX - p.x) * Math.min(1, speed);
if (p.x >= targetX - 1) {
// arrived at processor: process then either forward or drop
p.stage = 'processing';
// keep reference before async change
const particle = p;
processor.processParticle(particle).then(res => {
// update value if passed
if (res.ok) {
particle.value = res.newVal;
particle.color = '#f1c40f'; // processed color
particle.stage = 'procToSub';
} else {
// dropped: stage -> done (fade)
particle.stage = 'dropped';
particle.color = '#e74c3c';
}
});
}
} else if (p.stage === 'procToSub') {
const targetX = boxes.sub.x - 20;
const speed = 0.12 * dt;
p.x += (targetX - p.x) * Math.min(1, speed);
// move vertically towards center of subscriber
p.y += ((boxes.sub.y + boxes.sub.h/2) - p.y) * 0.02 * dt;
if (p.x >= targetX - 1) {
// arrived at subscriber
subscriber.consume(p);
// mark done after short delay
setTimeout(() => { p.stage = 'done'; }, 400);
}
} else if (p.stage === 'dropped') {
p.y += 0.02 * dt;
p.radius *= 0.995;
if (p.radius < 2) p.stage = 'done';
} else if (p.stage === 'sub') {
// visual linger then done
if (p.age > 1200) p.stage = 'done';
}
});
// remove done particles
for (let i = particles.length-1; i>=0; i--) if (particles[i].stage === 'done') particles.splice(i,1);
}
function draw() {
ctx.clearRect(0,0,W,H);
// boxes
Object.values(boxes).forEach(b => {
ctx.fillStyle = '#f0f0f0';
ctx.fillRect(b.x, b.y, b.w, b.h);
ctx.strokeStyle = '#bbb';
ctx.strokeRect(b.x, b.y, b.w, b.h);
ctx.fillStyle = '#333';
ctx.font = '16px sans-serif';
ctx.textAlign = 'center';
ctx.fillText(b.label, b.x + b.w/2, b.y + 22);
});
// arrows
drawArrow(boxes.pub.x + boxes.pub.w, boxes.pub.y + boxes.pub.h/2,
boxes.proc.x, boxes.proc.y + boxes.proc.h/2, 'publish → process');
drawArrow(boxes.proc.x + boxes.proc.w, boxes.proc.y + boxes.proc.h/2,
boxes.sub.x, boxes.sub.y + boxes.sub.h/2, 'process → subscribe');
// pulses
pulses.forEach(p => {
p.t += 16;
const alpha = Math.max(0, 1 - p.t/400);
const b = p.box;
ctx.strokeStyle = `rgba(100,150,250,${alpha})`;
ctx.lineWidth = 3;
ctx.strokeRect(b.x - 4, b.y - 4, b.w + 8, b.h + 8);
});
pulses = pulses.filter(p => p.t < 420);
// particles
particles.forEach(p => p.draw(ctx));
// subscriber log
ctx.fillStyle = '#111';
ctx.font = '12px monospace';
ctx.textAlign = 'left';
ctx.fillText('Subscriber received:', boxes.sub.x, boxes.sub.y + boxes.sub.h + 18);
receivedLog.forEach((r,i) => {
const txt = `${r.time.toLocaleTimeString()} : ${Math.round(r.value)}`;
ctx.fillText(txt, boxes.sub.x, boxes.sub.y + boxes.sub.h + 36 + i * 14);
});
}
function drawArrow(x1,y1,x2,y2,label='') {
ctx.strokeStyle = '#888';
ctx.lineWidth = 2;
ctx.beginPath();
ctx.moveTo(x1,y1);
ctx.lineTo(x2,y2);
ctx.stroke();
// arrowhead
const angle = Math.atan2(y2-y1,x2-x1);
const len = 8;
ctx.beginPath();
ctx.moveTo(x2,y2);
ctx.lineTo(x2 - len*Math.cos(angle - 0.3), y2 - len*Math.sin(angle - 0.3));
ctx.lineTo(x2 - len*Math.cos(angle + 0.3), y2 - len*Math.sin(angle + 0.3));
ctx.closePath();
ctx.fillStyle = '#888';
ctx.fill();
// label
if (label) {
ctx.font = '12px sans-serif';
ctx.fillStyle = '#666';
ctx.textAlign = 'center';
ctx.fillText(label, (x1+x2)/2, (y1+y2)/2 - 8);
}
}
/* --- main loop --- */
let last = performance.now();
function loop(now) {
const dt = now - last;
last = now;
update(dt);
draw();
requestAnimationFrame(loop);
}
requestAnimationFrame(loop);
/* --- start publisher --- */
publisher.start();
/* --- bind ui updates to params (apply already sets) --- */
(function bindControls() {
// adjust ui when inputs change
document.getElementById('rate').addEventListener('change', () => {
document.getElementById('apply').click();
});
document.getElementById('filter').addEventListener('change', () => {
document.getElementById('apply').click();
});
document.getElementById('mapadd').addEventListener('change', () => {
document.getElementById('apply').click();
});
})();
/* keep processor's functions in sync with controls */
setInterval(() => {
processor.map = v => v + mapAdd;
processor.filter = v => v >= filterThreshold;
}, 200);
/* pause/resume publisher timer when running toggles */
setInterval(() => {
if (running && !publisher.timer) publisher.start();
if (!running && publisher.timer) publisher.stop();
}, 250);
</script>
</body>
</html>
ポイント解説
・青い粒子がPublisherから出る。
・黄色はProcessor通過後(map適用),赤はfilterで破棄(dropped)。
・緑がSubscriber受信を示す。
・コントロールで発行レート・filter閾値・map加算値を変更可能。
あとがき
Observer内で、まとまった処理が綺麗なコードで書くことができるのがいいですね。
慣れないと分かりにくいかもしれませんが、イベント処理をたくさん書いていくと、コードが汚くなってしまうので、こうした効率のいいデザインパターンを理解して扱いたいですね。
0 件のコメント:
コメントを投稿