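Real-Time Data Pipelines for the Real Estate Market
The real estate market generates a huge volume of data every day: new listings, price reductions, recorded transactions, cadastral data, building permits, commercial occupancy rates, and construction raw-material prices. Turning this mass of raw data into actionable, real-time market indicators is the competitive advantage that separates leading PropTech platforms from traditional solutions.
In this article we build a complete ingestion, processing, and analytics pipeline for the real estate market. It uses Apache Kafka for streaming, Apache Flink for real-time processing, TimescaleDB for time series, and Elasticsearch for search and aggregations.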
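What You Will Learn
- Lambda and Kappa architectures for real-time real estate data
- Ethical scraping and normalization of data from heterogeneous sources (MLS, portals, land registries)
- Apache Kafka: topic design for real estate events
- Apache Flink: stream processing for market indicators
- TimescaleDB: time-series optimization for historical prices
- Indicator calculation: price index, days on market, price reduction rate
- Real-time alerts: notifications when market opportunities emerge
- Analytics dashboards with Apache Superset and Grafana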
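Real Estate Market Data Sources
A real estate market intelligence pipeline must integrate highly heterogeneous sources. Their formats, quality levels, and refresh rates differ widely.
Main Data Sources
- MLS feeds (RESO Web API): high-quality structured data following the RESO/OData standard, updated every 15 minutes
- Real estate portals (Immobiliare.it, Idealista, Zillow): scraping that respects rate limits and terms of service
- Land registry (Agenzia delle Entrate, land registry offices): recorded transactions, cadastral values, mortgage data
- Building permits: leading signals of neighborhood growth or decline
- Demographic data (ISTAT, census): population, income, migration dynamics
- POIs and OpenStreetMap: services, transport, and schools for geospatial enrichment
- ECB/Fed interest rates: direct impact on the mortgage market and on property purchases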
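Client-side throttling is the simplest way to stay within a portal's rate limits when scraping. The sketch below assumes a token-bucket policy. The `TokenBucket` class and the burst and refill values are hypothetical and are not taken from any portal's terms of service. Time is passed in explicitly, so the logic can be checked without real delays:

```typescript
// Token bucket: holds up to `capacity` tokens, refilled continuously at
// `refillPerSecond`. Each outgoing request consumes one token.
class TokenBucket {
  private readonly capacity: number;
  private readonly refillPerSecond: number;
  private tokens: number;
  private lastRefillMs: number;

  constructor(capacity: number, refillPerSecond: number, nowMs: number = Date.now()) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.tokens = capacity; // start with a full bucket
    this.lastRefillMs = nowMs;
  }

  // Add the tokens accrued since the last refill, capped at capacity.
  private refill(nowMs: number): void {
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSecond);
    this.lastRefillMs = nowMs;
  }

  // Returns true (and consumes a token) if a request may be sent now.
  tryAcquire(nowMs: number = Date.now()): boolean {
    this.refill(nowMs);
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }

  // Milliseconds to wait until the next token becomes available.
  msUntilNextToken(nowMs: number = Date.now()): number {
    this.refill(nowMs);
    if (this.tokens >= 1) return 0;
    return Math.ceil(((1 - this.tokens) / this.refillPerSecond) * 1000);
  }
}

// Hypothetical policy: a burst of 2 requests, then 1 request every 2 seconds.
const bucket = new TokenBucket(2, 0.5, 0);
console.log(bucket.tryAcquire(0));       // true
console.log(bucket.tryAcquire(0));       // true
console.log(bucket.tryAcquire(0));       // false (bucket empty)
console.log(bucket.msUntilNextToken(0)); // 2000
console.log(bucket.tryAcquire(2000));    // true (one token refilled)
```

In a scraper loop, a request is sent only when `tryAcquire()` returns true; otherwise the worker sleeps for `msUntilNextToken()` milliseconds. Throttling covers only the technical side. Each portal's terms of service still decide whether scraping is permitted at all.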
Data Model for Real Estate Events
We model each market event as a typed Kafka message. With this event-driven pattern, every state change is recorded, and it can be reprocessed and analyzed later.
```typescript
// Event types for the real estate market
type PropertyEventType =
  | 'listing.created'
  | 'listing.updated'
  | 'listing.price_changed'
  | 'listing.status_changed'
  | 'listing.removed'
  | 'transaction.recorded'   // recorded sale/rental
  | 'appraisal.completed'
  | 'permit.issued';         // building permit

interface PropertyEvent {
  id: string;                // event UUID
  type: PropertyEventType;
  sourceId: string;          // ID in the original source
  sourceName: string;        // 'mls_rome', 'immobiliare_it', 'catasto'
  timestamp: string;         // ISO 8601
  propertyId?: string;       // unique ID on our platform (after deduplication)
  payload: PropertyListingPayload | TransactionPayload | PermitPayload;
  metadata: {
    ingestTimestamp: string;
    schemaVersion: string;
    quality: 'high' | 'medium' | 'low';
  };
}

interface PropertyListingPayload {
  externalId: string;
  title: string;
  price: number;
  previousPrice?: number;    // set on price_changed events
  currency: string;
  propertyType: string;
  squareMeters: number;
  rooms: number;
  location: {
    lat: number;
    lon: number;
    address: string;
    city: string;
    neighborhood?: string;
    h3Cell?: string;         // precomputed at ingestion
  };
  listingType: 'sale' | 'rent';
  status: 'active' | 'sold' | 'rented' | 'withdrawn';
  listedAt: string;
  updatedAt: string;
  daysOnMarket?: number;     // computed
  features: Record<string, unknown>;
}

interface TransactionPayload {
  transactionId: string;
  salePrice: number;
  assessedValue?: number;    // cadastral value
  transactionDate: string;
  buyerType: 'individual' | 'company' | 'investment_fund';
  financingType: 'cash' | 'mortgage' | 'unknown';
  location: { lat: number; lon: number; city: string };
}

// PermitPayload is referenced above but its shape is not specified;
// this minimal definition is an assumption so the types compile.
interface PermitPayload {
  permitId: string;
  issuedAt: string;
  location: { lat: number; lon: number; city: string };
}
```
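Portals and many scraped sources deliver full snapshots rather than change events. The ingestion layer therefore has to derive the event types above by comparing consecutive snapshots of the same listing. The sketch below uses a reduced snapshot shape (`ListingSnapshot` and `deriveEventTypes` are hypothetical names). It assumes that a listing missing from the latest snapshot maps to `listing.removed`:

```typescript
type DerivedEventType =
  | 'listing.created'
  | 'listing.updated'
  | 'listing.price_changed'
  | 'listing.status_changed'
  | 'listing.removed';

// Hypothetical reduced snapshot: only the fields needed to detect changes.
interface ListingSnapshot {
  externalId: string;
  price: number;
  status: 'active' | 'sold' | 'rented' | 'withdrawn';
  updatedAt: string;
}

// Compare the previous and current snapshot of one listing and return
// the event types to emit (a single update can produce several).
function deriveEventTypes(
  prev: ListingSnapshot | undefined,
  next: ListingSnapshot | undefined,
): DerivedEventType[] {
  if (!prev && !next) return [];
  if (!prev) return ['listing.created'];
  if (!next) return ['listing.removed'];

  const events: DerivedEventType[] = [];
  if (prev.price !== next.price) events.push('listing.price_changed');
  if (prev.status !== next.status) events.push('listing.status_changed');
  // Any other change (detected through updatedAt) is a generic update.
  if (events.length === 0 && prev.updatedAt !== next.updatedAt) {
    events.push('listing.updated');
  }
  return events;
}

const before: ListingSnapshot = { externalId: 'A1', price: 320000, status: 'active', updatedAt: '2026-03-01' };
const after: ListingSnapshot = { ...before, price: 299000, updatedAt: '2026-03-08' };
console.log(deriveEventTypes(before, after)); // [ 'listing.price_changed' ]
```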
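Kafka: Topology and Partitioning
Kafka topic design is critical for performance. Partitioning by geographic area lets consumers scale out by city. All events for a city share the same message key, so they land on the same partition, which preserves the order of events for any given property.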
```typescript
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'proptech-ingestion',
  brokers: process.env['KAFKA_BROKERS']!.split(','),
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env['KAFKA_USERNAME']!,
    password: process.env['KAFKA_PASSWORD']!,
  },
});

// Topic configuration
// - property.events.raw: all raw events, 50 partitions, keyed by city
// - property.events.enriched: after geocoding and deduplication
// - market.indicators: averages/aggregations per geographic area
// - alerts: market opportunities detected by Flink
const TOPICS = [
  {
    topic: 'property.events.raw',
    numPartitions: 50,
    replicationFactor: 3,
    configEntries: [
      { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 days
      { name: 'compression.type', value: 'lz4' },
      { name: 'cleanup.policy', value: 'delete' },
    ],
  },
  {
    topic: 'property.events.enriched',
    numPartitions: 50,
    replicationFactor: 3,
    configEntries: [
      { name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) }, // 30 days
      { name: 'compression.type', value: 'zstd' },
    ],
  },
  {
    topic: 'market.indicators',
    numPartitions: 10,
    replicationFactor: 3,
    configEntries: [
      { name: 'cleanup.policy', value: 'compact' }, // log compaction keeps the latest state per key
      { name: 'retention.ms', value: String(-1) },  // unlimited (compacted)
    ],
  },
];

// Create the topics once at deploy time (resolves to false if they already exist)
export async function createTopics(): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();
  try {
    await admin.createTopics({ topics: TOPICS });
  } finally {
    await admin.disconnect();
  }
}

// Producer partitioned by geographic area
export class PropertyEventProducer {
  private producer = kafka.producer({
    createPartitioner: Partitioners.DefaultPartitioner,
    transactionTimeout: 30000,
  });

  // The producer must be connected before the first send
  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }

  async sendEvent(event: PropertyEvent): Promise<void> {
    await this.producer.send({
      topic: 'property.events.raw',
      messages: [{
        // Key = city code: all events for a city go to the same partition,
        // so consumers see them in order
        key: this.getCityCode(event),
        value: JSON.stringify(event),
        headers: {
          'event-type': event.type,
          'source': event.sourceName,
          'schema-version': event.metadata.schemaVersion,
        },
        timestamp: String(Date.now()),
      }],
    });
  }

  private getCityCode(event: PropertyEvent): string {
    // Listing and transaction payloads both carry location.city
    const payload = event.payload as PropertyListingPayload;
    return payload.location?.city?.toLowerCase().replace(/\s+/g, '-') ?? 'unknown';
  }
}
```
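The city string becomes the partition key. Any spelling variation ('Forlì' vs 'Forli', or a double space inside a name) therefore sends events for the same city to different partitions and breaks per-city ordering. The sketch below shows a stricter key normalizer that could replace the body of `getCityCode`. It assumes accents, apostrophes, and punctuation can be safely dropped. The `normalizeCityKey` function and the alias table are hypothetical:

```typescript
// Hypothetical alias table for cities that sources spell differently.
const CITY_ALIASES: Record<string, string> = {
  'reggio-nell-emilia': 'reggio-emilia',
};

// Normalize a free-text city name into a stable Kafka key:
// strip diacritics, lowercase, collapse non-alphanumeric runs into one hyphen.
function normalizeCityKey(city: string | undefined | null): string {
  if (!city) return 'unknown';
  const slug = city
    .normalize('NFD')                // split "ì" into "i" + combining accent
    .replace(/[\u0300-\u036f]/g, '') // drop the combining accents
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')     // spaces, apostrophes, dots -> hyphen
    .replace(/^-+|-+$/g, '');        // trim leading/trailing hyphens
  if (slug === '') return 'unknown';
  return CITY_ALIASES[slug] ?? slug;
}

console.log(normalizeCityKey('Forlì'));              // forli
console.log(normalizeCityKey('  Reggio   Emilia ')); // reggio-emilia
console.log(normalizeCityKey("Reggio nell'Emilia")); // reggio-emilia
```

The sketch assumes Latin-script city names. Characters outside `a-z0-9` that remain after accent stripping are replaced, so a name written entirely in another script collapses to `'unknown'`.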
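Apache Flink: Real-Time Indicator Computation
Apache Flink is a stream processing framework well suited to consuming the flow of real estate events and computing market indicators over time windows. With PyFlink, Flink's Python API, we compute a price index for each H3 cell (a cell of the H3 hexagonal geospatial index) over 15-minute tumbling windows.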
```python
# PyFlink: real-time Price Index calculation per geographic area
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.enable_checkpointing(60_000)  # checkpoint every 60 s

settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
tenv = StreamTableEnvironment.create(env, environment_settings=settings)

KAFKA_BROKERS = os.environ["KAFKA_BROKERS"]

# Kafka source definition (flattened events from the enriched topic;
# property_id is assumed to be the post-deduplication property ID)
tenv.execute_sql(f"""
    CREATE TABLE property_events (
        event_id STRING,
        property_id STRING,
        event_type STRING,
        city STRING,
        h3_cell STRING,
        price DOUBLE,
        square_meters DOUBLE,
        price_per_sqm AS price / square_meters,  -- computed column
        listing_type STRING,
        property_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'property.events.enriched',
        'properties.bootstrap.servers' = '{KAFKA_BROKERS}',
        'properties.group.id' = 'proptech-market-indicators',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Sink: indicators per H3 cell, written as upserts to the compacted topic.
# A table with a PRIMARY KEY needs the upsert-kafka connector.
tenv.execute_sql(f"""
    CREATE TABLE market_indicators (
        h3_cell STRING,
        city STRING,
        listing_type STRING,
        property_type STRING,
        avg_price_per_sqm DOUBLE,
        median_price DOUBLE,
        listing_count BIGINT,
        price_index DOUBLE,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        PRIMARY KEY (h3_cell, listing_type, property_type, window_end) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'market.indicators',
        'properties.bootstrap.servers' = '{KAFKA_BROKERS}',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")

# Step 1: aggregate per H3 cell over 15-minute tumbling windows.
# TUMBLE_ROWTIME keeps a time attribute so the result can feed an OVER window.
tenv.execute_sql("""
    CREATE TEMPORARY VIEW cell_windows AS
    SELECT
        h3_cell, city, listing_type, property_type,
        AVG(price_per_sqm) AS avg_price_per_sqm,
        COUNT(*) AS listing_count,
        TUMBLE_START(event_time, INTERVAL '15' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end,
        TUMBLE_ROWTIME(event_time, INTERVAL '15' MINUTE) AS window_rowtime
    FROM property_events
    WHERE event_type IN ('listing.created', 'listing.price_changed')
      AND price > 0
      AND square_meters > 20
    GROUP BY h3_cell, city, listing_type, property_type,
             TUMBLE(event_time, INTERVAL '15' MINUTE)
""")

# Step 2: Price Index = current window average / trailing 30-day average.
# An OVER window cannot be mixed into the GROUP BY above, and Flink OVER
# windows must end at CURRENT ROW, so the baseline includes the current window.
job = tenv.execute_sql("""
    INSERT INTO market_indicators
    SELECT
        h3_cell, city, listing_type, property_type,
        avg_price_per_sqm,
        -- PERCENTILE_DISC ... WITHIN GROUP is not available in Flink SQL:
        -- plug in a percentile UDAF here to get an exact median
        CAST(NULL AS DOUBLE) AS median_price,
        listing_count,
        avg_price_per_sqm / NULLIF(AVG(avg_price_per_sqm) OVER w, 0) AS price_index,
        window_start,
        window_end
    FROM cell_windows
    WINDOW w AS (
        PARTITION BY h3_cell, listing_type, property_type
        ORDER BY window_rowtime
        RANGE BETWEEN INTERVAL '30' DAY PRECEDING AND CURRENT ROW
    )
""")

# Alert view: price cut of more than 10% vs. the same property's previous price
# (partitioned by property_id: partitioning by event_id would make LAG always NULL)
tenv.execute_sql("""
    CREATE TEMPORARY VIEW price_reduction_alerts AS
    SELECT *
    FROM (
        SELECT
            event_id,
            property_id,
            city,
            h3_cell,
            price,
            (price - LAG(price) OVER w) / LAG(price) OVER w AS reduction_pct,
            event_time
        FROM property_events
        WHERE event_type IN ('listing.created', 'listing.price_changed')
        WINDOW w AS (PARTITION BY property_id ORDER BY event_time)
    )
    WHERE reduction_pct < -0.10
""")

# execute_sql on an INSERT submits the job asynchronously; block until it ends
job.wait()
```
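The following in-memory TypeScript sketch makes the tumbling-window semantics concrete. Each event is assigned to the epoch-aligned 15-minute window that contains its event time. Average and exact median price per m² are then computed per (H3 cell, window), with the same filters as the Flink query. It ignores watermarks and late data, which Flink handles for you. The `PriceEvent` shape and the function names are hypothetical:

```typescript
// Hypothetical flattened event, mirroring the columns of property_events.
interface PriceEvent {
  h3Cell: string;
  price: number;
  squareMeters: number;
  eventTimeMs: number; // epoch milliseconds
}

interface WindowStats {
  h3Cell: string;
  windowStartMs: number;
  windowEndMs: number;
  avgPricePerSqm: number;
  medianPricePerSqm: number;
  listingCount: number;
}

const WINDOW_MS = 15 * 60 * 1000; // 15-minute tumbling windows

// Median of an already sorted, non-empty array.
function median(sorted: number[]): number {
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 === 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

function aggregateTumbling(events: PriceEvent[]): WindowStats[] {
  const groups = new Map<string, { h3Cell: string; start: number; values: number[] }>();
  for (const e of events) {
    // Same filters as the Flink query.
    if (e.price <= 0 || e.squareMeters <= 20) continue;
    // Windows do not overlap: [start, start + 15 min), aligned to the epoch.
    const start = Math.floor(e.eventTimeMs / WINDOW_MS) * WINDOW_MS;
    const key = `${e.h3Cell}|${start}`;
    const group = groups.get(key) ?? { h3Cell: e.h3Cell, start, values: [] };
    group.values.push(e.price / e.squareMeters);
    groups.set(key, group);
  }
  return Array.from(groups.values()).map((g) => {
    const sorted = [...g.values].sort((a, b) => a - b);
    const sum = sorted.reduce((acc, v) => acc + v, 0);
    return {
      h3Cell: g.h3Cell,
      windowStartMs: g.start,
      windowEndMs: g.start + WINDOW_MS,
      avgPricePerSqm: sum / sorted.length,
      medianPricePerSqm: median(sorted),
      listingCount: sorted.length,
    };
  });
}

const stats = aggregateTumbling([
  { h3Cell: 'cell-a', price: 300000, squareMeters: 100, eventTimeMs: 0 },
  { h3Cell: 'cell-a', price: 500000, squareMeters: 100, eventTimeMs: 5 * 60_000 },
]);
console.log(stats[0].avgPricePerSqm, stats[0].medianPricePerSqm); // 4000 4000
```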
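TimescaleDB: Time-Series-Optimized Storage
TimescaleDB is a PostgreSQL extension optimized for time-series data. It automatically partitions tables into time-based chunks (hypertables), can compress historical data by up to around 95%, and performs well on queries over time ranges.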
```sql
-- TimescaleDB schema for real estate market data
-- Main table of price observations (time series)
CREATE TABLE price_observations (
    time           TIMESTAMPTZ NOT NULL,
    h3_cell        TEXT NOT NULL,
    city           TEXT NOT NULL,
    listing_type   TEXT NOT NULL,   -- 'sale' | 'rent'
    property_type  TEXT NOT NULL,
    avg_price_sqm  NUMERIC(10, 2),
    median_price   NUMERIC(12, 2),
    listing_count  INT,
    price_index    NUMERIC(8, 4),   -- 1.0 = baseline
    dom_avg        NUMERIC(8, 2)    -- average Days on Market
);

-- Convert to a hypertable (automatic partitioning by time)
SELECT create_hypertable('price_observations', 'time', chunk_time_interval => INTERVAL '1 week');

-- Indexes for temporal and geographic queries
CREATE INDEX idx_price_obs_h3_time ON price_observations (h3_cell, time DESC);
CREATE INDEX idx_price_obs_city_time ON price_observations (city, listing_type, time DESC);

-- Compression policy: compress chunks older than 30 days (often 90%+ space savings)
ALTER TABLE price_observations SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'h3_cell, city, listing_type'
);
SELECT add_compression_policy('price_observations', INTERVAL '30 days');

-- Monthly rollup maintained by a TimescaleDB continuous aggregate
CREATE MATERIALIZED VIEW monthly_market_summary
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 month', time) AS month,
    city,
    listing_type,
    property_type,
    AVG(avg_price_sqm)  AS avg_price_sqm,
    AVG(price_index)    AS avg_price_index,
    SUM(listing_count)  AS total_listings,
    AVG(dom_avg)        AS avg_days_on_market
FROM price_observations
GROUP BY month, city, listing_type, property_type
WITH NO DATA;

-- Refresh every hour (recomputes the window from 3 months ago to 1 hour ago)
SELECT add_continuous_aggregate_policy('monthly_market_summary',
    start_offset => INTERVAL '3 months',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Retention: drop raw observations older than 1 year. The monthly aggregate
-- keeps its rows because its 3-month refresh window never reaches them.
SELECT add_retention_policy('price_observations', INTERVAL '1 year');

-- Query: monthly price/m² trend over the last 12 months for one H3 cell
-- ('{h3_cell_value}' is a placeholder for the target H3 index)
SELECT
    time_bucket('1 month', time) AS month,
    AVG(avg_price_sqm) AS avg_price_sqm,
    MAX(price_index)   AS price_index
FROM price_observations
WHERE
    h3_cell = '{h3_cell_value}'
    AND listing_type = 'sale'
    AND time > NOW() - INTERVAL '12 months'
GROUP BY month
ORDER BY month;

-- Query: weekly Days on Market trend for a city
SELECT
    time_bucket('1 week', time) AS week,
    city,
    AVG(dom_avg)       AS avg_dom,
    SUM(listing_count) AS listings
FROM price_observations
WHERE city = 'Roma' AND listing_type = 'sale'
  AND time > NOW() - INTERVAL '6 months'
GROUP BY week, city
ORDER BY week;
```
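One caveat on `monthly_market_summary`: `AVG(avg_price_sqm)` gives every observation row the same weight, whether it summarizes 2 listings or 200. If each row should count in proportion to its `listing_count`, the rollup needs a weighted mean, i.e. `SUM(avg_price_sqm * listing_count) / SUM(listing_count)`. The small TypeScript sketch below shows the difference. The observation values are invented for illustration:

```typescript
interface Observation {
  avgPriceSqm: number;
  listingCount: number;
}

// Plain mean of the per-row averages (what AVG(avg_price_sqm) computes).
function meanOfMeans(rows: Observation[]): number {
  return rows.reduce((acc, r) => acc + r.avgPriceSqm, 0) / rows.length;
}

// Mean weighted by listing_count: equals the average over the underlying
// listings when each row's avgPriceSqm is the exact mean of its listings.
function weightedMean(rows: Observation[]): number {
  const totalCount = rows.reduce((acc, r) => acc + r.listingCount, 0);
  if (totalCount === 0) return NaN;
  const weightedSum = rows.reduce((acc, r) => acc + r.avgPriceSqm * r.listingCount, 0);
  return weightedSum / totalCount;
}

const rows: Observation[] = [
  { avgPriceSqm: 3000, listingCount: 1 }, // quiet window
  { avgPriceSqm: 4000, listingCount: 3 }, // busy window
];
console.log(meanOfMeans(rows));  // 3500
console.log(weightedMean(rows)); // 3750
```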
Market Indicators: Calculation and Interpretation
```typescript
import { Pool } from 'pg';

// Days on Market (DOM) and Price Reduction Rate calculation
export interface MarketIndicators {
  citySlug: string;
  period: string;                // '2026-Q1', '2026-03'
  listingType: 'sale' | 'rent';
  // Prices
  avgPricePerSqm: number;
  medianPricePerSqm: number;
  priceIndex: number;            // 1.0 = base year, 1.05 = +5% YoY
  // Market velocity
  avgDaysOnMarket: number;
  medianDaysOnMarket: number;
  listingCountNew: number;       // new listings in the period
  listingCountClosed: number;    // closed listings (sold/rented)
  absorbRate: number;            // closed in the period / currently active listings
  // Supply/demand tension
  priceReductionRate: number;    // % of listings with a price reduction
  avgPriceReductionPct: number;  // average reduction when one occurs
  stockMonths: number;           // months needed to absorb current supply
  // Segments
  byPropertyType: Record<string, { avgPrice: number; count: number }>;
}

export async function computeMarketIndicators(
  db: Pool,
  city: string,
  periodStart: Date,
  periodEnd: Date,
): Promise<MarketIndicators> {
  const [priceResult, velocityResult, reductionResult, activeResult] = await Promise.all([
    // 1. Average and median price per m²
    db.query(
      `SELECT
         AVG(price / square_meters) AS avg_price_sqm,
         PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price / square_meters) AS median_price_sqm
       FROM property_listings
       WHERE city = $1 AND listing_type = 'sale' AND status = 'active'
         AND listed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
    // 2. Days on Market for listings sold in the period
    db.query(
      `SELECT
         AVG(EXTRACT(DAY FROM (closed_at - listed_at))) AS avg_dom,
         PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(DAY FROM (closed_at - listed_at))) AS median_dom,
         COUNT(*) AS closed_count
       FROM property_listings
       WHERE city = $1 AND listing_type = 'sale' AND status = 'sold'
         AND closed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
    // 3. Price reductions (NULLIF avoids division by zero when nothing changed)
    db.query(
      `SELECT
         COUNT(*) FILTER (WHERE price_change < 0) * 100.0 / NULLIF(COUNT(*), 0) AS reduction_rate_pct,
         AVG(price_change::numeric / NULLIF(original_price, 0)) FILTER (WHERE price_change < 0) AS avg_reduction_pct
       FROM listing_price_history
       WHERE city = $1 AND changed_at BETWEEN $2 AND $3`,
      [city, periodStart, periodEnd]
    ),
    // 4. Current active stock (same listing type as the queries above)
    db.query(
      `SELECT COUNT(*) AS cnt FROM property_listings
       WHERE city = $1 AND listing_type = 'sale' AND status = 'active'`,
      [city]
    ),
  ]);

  // pg returns COUNT(*) as a string
  const activeCount = parseInt(activeResult.rows[0]?.cnt ?? '0', 10);
  const closedCount = parseInt(velocityResult.rows[0]?.closed_count ?? '0', 10);
  const durationDays = (periodEnd.getTime() - periodStart.getTime()) / (1000 * 86400);
  const closedPerDay = durationDays > 0 ? closedCount / durationDays : 0;
  // Months of supply: active stock / closings in a 30-day month (99 = no closings)
  const stockMonths = closedPerDay > 0 ? activeCount / (closedPerDay * 30) : 99;

  return {
    citySlug: city.toLowerCase().replace(/\s+/g, '-'),
    period: periodStart.toISOString().substring(0, 7),
    listingType: 'sale',
    avgPricePerSqm: parseFloat(priceResult.rows[0]?.avg_price_sqm ?? '0'),
    medianPricePerSqm: parseFloat(priceResult.rows[0]?.median_price_sqm ?? '0'),
    priceIndex: 1.0,             // In production: compare with the previous period
    avgDaysOnMarket: parseFloat(velocityResult.rows[0]?.avg_dom ?? '30'),
    medianDaysOnMarket: parseFloat(velocityResult.rows[0]?.median_dom ?? '25'),
    listingCountNew: 0,          // Separate query
    listingCountClosed: closedCount,
    absorbRate: closedCount / Math.max(1, activeCount),
    priceReductionRate: parseFloat(reductionResult.rows[0]?.reduction_rate_pct ?? '0'),
    avgPriceReductionPct: parseFloat(reductionResult.rows[0]?.avg_reduction_pct ?? '0'),
    stockMonths,
    byPropertyType: {},          // Populate with a separate query
  };
}
```
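The months-of-supply figure in `computeMarketIndicators` takes two steps. First, the closings in the period are converted into a daily rate. Then the active stock is divided by one 30-day month of closings. The pure-function version below makes this easy to check with numbers. The function names are hypothetical and the figures in the example are invented:

```typescript
// Months of supply = active listings / (closings per day * 30),
// mirroring stockMonths above, including the 99 sentinel for "no closings".
function monthsOfSupply(activeListings: number, closedInPeriod: number, periodDays: number): number {
  if (periodDays <= 0) throw new Error('periodDays must be positive');
  const closedPerDay = closedInPeriod / periodDays;
  return closedPerDay > 0 ? activeListings / (closedPerDay * 30) : 99;
}

// Monthly absorption = closings in a 30-day month / active listings.
function monthlyAbsorptionRate(activeListings: number, closedInPeriod: number, periodDays: number): number {
  if (periodDays <= 0) throw new Error('periodDays must be positive');
  if (activeListings <= 0) return NaN;
  return ((closedInPeriod / periodDays) * 30) / activeListings;
}

// Invented figures: 1,200 active listings, 300 closings in a 30-day period.
console.log(monthsOfSupply(1200, 300, 30));        // 4
console.log(monthlyAbsorptionRate(1200, 300, 30)); // 0.25
```

The two measures are reciprocals: 4 months of supply corresponds to 25% monthly absorption. Note that `absorbRate` in `computeMarketIndicators` divides the closings of the whole period by the active stock, so it matches this monthly definition only when the period is 30 days long.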
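Real-Time Alerts: Market Opportunities
The real value of a real-time pipeline lies in proactive alerts. They notify investors and buyers when an opportunity appears, before the market absorbs it.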
```typescript
import { randomUUID } from 'node:crypto';

// Alerting system for market opportunities
interface MarketAlert {
  alertId: string;
  type: 'price_drop' | 'new_listing_below_market' | 'high_demand_area' | 'distressed_sale';
  severity: 'low' | 'medium' | 'high';
  propertyId?: string;
  h3Cell?: string;
  city: string;
  description: string;
  metrics: Record<string, number>;
  timestamp: string;
}

export class MarketAlertEngine {
  async processEvent(
    event: PropertyEvent,
    marketContext: MarketIndicators,
  ): Promise<MarketAlert[]> {
    const alerts: MarketAlert[] = [];
    const payload = event.payload as PropertyListingPayload;

    // Alert 1: new listing priced well below the market average
    // (guards avoid division by zero when area or market price is missing)
    if (
      event.type === 'listing.created' &&
      payload.price > 0 &&
      payload.squareMeters > 0 &&
      marketContext.avgPricePerSqm > 0
    ) {
      const priceSqm = payload.price / payload.squareMeters;
      const marketPriceSqm = marketContext.avgPricePerSqm;
      const discount = (marketPriceSqm - priceSqm) / marketPriceSqm;
      if (discount > 0.15) {
        alerts.push({
          alertId: randomUUID(), // unique even for alerts created in the same millisecond
          type: 'new_listing_below_market',
          severity: discount > 0.25 ? 'high' : 'medium',
          propertyId: payload.externalId,
          city: payload.location.city,
          description: `New listing ${(discount * 100).toFixed(0)}% below the market average`,
          metrics: {
            priceSqm,
            marketPriceSqm,
            discountPercent: discount * 100,
          },
          timestamp: new Date().toISOString(),
        });
      }
    }

    // Alert 2: significant price reduction
    if (event.type === 'listing.price_changed' && payload.previousPrice) {
      const reduction = (payload.previousPrice - payload.price) / payload.previousPrice;
      if (reduction > 0.08) {
        alerts.push({
          alertId: randomUUID(),
          type: 'price_drop',
          severity: reduction > 0.15 ? 'high' : 'medium',
          propertyId: payload.externalId,
          city: payload.location.city,
          description: `Price reduced by ${(reduction * 100).toFixed(0)}% (from €${payload.previousPrice} to €${payload.price})`,
          metrics: {
            previousPrice: payload.previousPrice,
            currentPrice: payload.price,
            reductionPercent: reduction * 100,
          },
          timestamp: new Date().toISOString(),
        });
      }
    }

    // Alert 3: high-demand area (very low DOM, high absorption).
    // Any event type can trigger it, so location may be missing.
    if (marketContext.avgDaysOnMarket < 7 && marketContext.absorbRate > 1.5) {
      alerts.push({
        alertId: randomUUID(),
        type: 'high_demand_area',
        severity: 'medium',
        h3Cell: payload.location?.h3Cell,
        city: payload.location?.city ?? marketContext.citySlug,
        description: `Very high-demand area: average DOM ${marketContext.avgDaysOnMarket.toFixed(1)} days, absorption rate ${marketContext.absorbRate.toFixed(1)}x`,
        metrics: {
          avgDom: marketContext.avgDaysOnMarket,
          absorbRate: marketContext.absorbRate,
        },
        timestamp: new Date().toISOString(),
      });
    }

    return alerts;
  }
}
```
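As written, the `high_demand_area` alert fires again for every event processed in a hot area. A common remedy is a cooldown keyed by alert type and location, so the same alert is emitted at most once per interval. The in-memory sketch below assumes a single process; a multi-instance deployment would need shared state. The `AlertCooldown` class and the 6-hour value are hypothetical:

```typescript
interface AlertKeyParts {
  type: string;
  city: string;
  h3Cell?: string;
  propertyId?: string;
}

class AlertCooldown {
  private readonly lastSentMs = new Map<string, number>();
  private readonly cooldownMs: number;

  constructor(cooldownMs: number) {
    this.cooldownMs = cooldownMs;
  }

  // Alerts with the same type and location share a key.
  private key(a: AlertKeyParts): string {
    return [a.type, a.city, a.h3Cell ?? '', a.propertyId ?? ''].join('|');
  }

  // True if the alert should be emitted now; records the emission time.
  shouldEmit(alert: AlertKeyParts, nowMs: number = Date.now()): boolean {
    const k = this.key(alert);
    const last = this.lastSentMs.get(k);
    if (last !== undefined && nowMs - last < this.cooldownMs) return false;
    this.lastSentMs.set(k, nowMs);
    return true;
  }
}

const SIX_HOURS_MS = 6 * 60 * 60 * 1000;
const cooldown = new AlertCooldown(SIX_HOURS_MS);
const hot = { type: 'high_demand_area', city: 'Roma', h3Cell: 'cell-a' };
console.log(cooldown.shouldEmit(hot, 0));            // true
console.log(cooldown.shouldEmit(hot, 60_000));       // false (within cooldown)
console.log(cooldown.shouldEmit(hot, SIX_HOURS_MS)); // true (cooldown elapsed)
```

`MarketAlert` has `type`, `city`, and optional `h3Cell`/`propertyId` fields, so the engine's output could be filtered with `alerts.filter((a) => cooldown.shouldEmit(a))`. The map is never pruned in this sketch; a long-running service would also need to evict stale keys.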
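Key Market Indicators
| Indicator | Calculation | Interpretation |
|---|---|---|
| Days on Market (DOM) | Average days from listing to closing | <30 = hot market, >90 = cold market |
| Absorption rate | Monthly sales / active listings | >20% = seller's market, <15% = buyer's market |
| Price reduction rate | % of listings with a price reduction | >30% = downward price pressure |
| Months of supply | Active listings / monthly sales | <3 = shortage, 4-6 = balanced, >6 = oversupply |
| List-to-sale ratio | Sale price / list price | >100% = competitive market (selling above asking) |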
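The thresholds in the table can be encoded directly. The sketch below assumes the indicator values are already computed, with percentages expressed as fractions (0.2 for 20%). The table leaves gaps (DOM 30-90 days, absorption 15-20%, supply 3-4 months, and only one side for the last two rows). This sketch returns `'unclassified'` for values in those gaps, an assumption that is not part of the table. `interpretIndicators` is a hypothetical name:

```typescript
interface IndicatorValues {
  daysOnMarket: number;       // average DOM, in days
  monthlyAbsorption: number;  // fraction: 0.2 = 20%
  priceReductionRate: number; // fraction of listings with a price cut
  monthsOfSupply: number;
  listToSaleRatio: number;    // sale price / list price: 1.02 = 102%
}

type Interpretation = Record<keyof IndicatorValues, string>;

const UNCLASSIFIED = 'unclassified'; // value falls in a gap the table does not cover

function interpretIndicators(v: IndicatorValues): Interpretation {
  let supply: string;
  if (v.monthsOfSupply < 3) supply = 'shortage';
  else if (v.monthsOfSupply > 6) supply = 'oversupply';
  else if (v.monthsOfSupply >= 4) supply = 'balanced';
  else supply = UNCLASSIFIED; // between 3 and 4 months

  return {
    daysOnMarket:
      v.daysOnMarket < 30 ? 'hot market' : v.daysOnMarket > 90 ? 'cold market' : UNCLASSIFIED,
    monthlyAbsorption:
      v.monthlyAbsorption > 0.2 ? "seller's market"
        : v.monthlyAbsorption < 0.15 ? "buyer's market"
        : UNCLASSIFIED,
    priceReductionRate: v.priceReductionRate > 0.3 ? 'downward pressure' : UNCLASSIFIED,
    monthsOfSupply: supply,
    listToSaleRatio: v.listToSaleRatio > 1 ? 'competitive market (selling above asking)' : UNCLASSIFIED,
  };
}

const reading = interpretIndicators({
  daysOnMarket: 25,
  monthlyAbsorption: 0.25,
  priceReductionRate: 0.1,
  monthsOfSupply: 4,
  listToSaleRatio: 1.02,
});
console.log(reading.monthsOfSupply); // balanced
```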
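Conclusion
A real-time real estate market intelligence pipeline turns raw data into actionable insight. Investors learn when and where to buy, property managers learn how to act, and agents learn which properties to present. The Kafka + Flink + TimescaleDB combination provides a robust, scalable, and maintainable architecture. It can handle tens of millions of events per day with latency measured in seconds.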