부동산 시장을 위한 실시간 데이터 파이프라인
부동산 시장은 신규 매물, 매물 감소 등 매일 천문학적인 양의 데이터를 생성합니다. 가격, 기록된 거래, 지적 데이터, 건축 허가, 점유율 건설을 위한 상업 원자재 가격. 원시 데이터의 홍수를 변화시키세요 실행 가능한 실시간 시장 지표와 플랫폼을 구분하는 경쟁 우위로 구분됩니다. 전통적인 솔루션에서 PropTech를 선도합니다.
이 기사에서는 완전한 데이터 수집, 처리 및 분석 파이프라인을 구축합니다. 부동산 시장을 활용한 아파치 카프카 스트리밍을 위해, 아파치 플링크 실시간 처리를 위해, 시간척도DB 시계열 e의 경우 엘라스틱서치 검색 및 집계용.
무엇을 배울 것인가
- 실시간 부동산 데이터를 위한 Lambda 및 Kappa 아키텍처
- 이기종 소스(MLS, 포털, 토지 등록부)의 데이터를 윤리적으로 스크랩하고 표준화합니다.
- Apache Kafka: 부동산 이벤트를 위한 주제 디자인
- Apache Flink: 시장 지표에 대한 스트림 처리
- TimescaleDB: 과거 가격에 대한 시계열 최적화
- 지표 계산: 가격 지수, 시장 출시일, 가격 인하율
- 실시간 경고: 시장 기회가 나타날 때 알림
- Apache Superset 및 Grafana를 사용한 분석 대시보드
부동산 시장 데이터 소스
부동산 시장 인텔리전스 파이프라인은 매우 이질적인 소스와 형식을 통합해야 합니다. 품질과 새로 고침 빈도가 완전히 다릅니다.
주요 데이터 소스
- MLS 피드(RESO 웹 API): 고품질 구조화된 데이터, RESO/ODATA 표준, 15분마다 업데이트
- 부동산 포털(Immobiliare.it, Idealista, Zillow): 요금 한도 및 서비스 약관과 관련된 스크래핑
- 지적 (세입 기관, 토지 등록부): 기록된 거래, 지적 가치, 모기지 데이터
- 건축 허가: 동네 성장/쇠퇴의 예상 신호
- 인구 통계 데이터(ISTAT, 인구 조사): 인구, 소득, 이주 역학
- 그런 다음 OpenStreetMap: 서비스, 교통, 지리공간 강화를 위한 학교
- ECB/연준 금리: 모기지 시장과 부동산 구입에 직접적인 영향을 미칩니다.
부동산 이벤트를 위한 데이터 모델
우리는 각 시장 이벤트를 입력된 Kafka 메시지로 모델링합니다. 이벤트 중심 패턴 시간이 지남에 따라 모든 상태 변경을 추적하고, 재처리하고, 분석할 수 있도록 보장합니다.
// Event types per il mercato immobiliare
type PropertyEventType =
| 'listing.created'
| 'listing.updated'
| 'listing.price_changed'
| 'listing.status_changed'
| 'listing.removed'
| 'transaction.recorded' // vendita/affitto registrata
| 'appraisal.completed'
| 'permit.issued'; // permesso di costruzione
interface PropertyEvent {
id: string; // UUID evento
type: PropertyEventType;
sourceId: string; // ID nella fonte originale
sourceName: string; // 'mls_rome', 'immobiliare_it', 'catasto'
timestamp: string; // ISO 8601
propertyId?: string; // ID univoco nostra piattaforma (post-dedup)
payload: PropertyListingPayload | TransactionPayload | PermitPayload;
metadata: {
ingestTimestamp: string;
schemaVersion: string;
quality: 'high' | 'medium' | 'low';
};
}
interface PropertyListingPayload {
externalId: string;
title: string;
price: number;
previousPrice?: number; // se price_changed
currency: string;
propertyType: string;
squareMeters: number;
rooms: number;
location: {
lat: number;
lon: number;
address: string;
city: string;
neighborhood?: string;
h3Cell?: string; // pre-calcolato all'ingestione
};
listingType: 'sale' | 'rent';
status: 'active' | 'sold' | 'rented' | 'withdrawn';
listedAt: string;
updatedAt: string;
daysOnMarket?: number; // calcolato
features: Record<string, unknown>;
}
interface TransactionPayload {
transactionId: string;
salePrice: number;
assessedValue?: number; // valore catastale
transactionDate: string;
buyerType: 'individual' | 'company' | 'investment_fund';
financingType: 'cash' | 'mortgage' | 'unknown';
location: { lat: number; lon: number; city: string };
}
Kafka: 토폴로지 및 파티셔닝
Kafka 주제 디자인은 성능에 매우 중요합니다. 지리적 영역별 파티션이 허용됩니다. 소비자는 도시별로 병렬화되어 특정 속성에 대한 이벤트 순서를 보장합니다.
import { Kafka, Partitioners } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
const kafka = new Kafka({
clientId: 'proptech-ingestion',
brokers: process.env['KAFKA_BROKERS']!.split(','),
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: process.env['KAFKA_USERNAME']!,
password: process.env['KAFKA_PASSWORD']!,
},
});
// Topic configuration
// - property.events.raw: tutti gli eventi grezzi, 50 partizioni per citta
// - property.events.enriched: post-processing con geocoding e deduplication
// - market.indicators: medie/aggregazioni per area geografica
// - alerts: opportunità di mercato detected da Flink
const TOPICS = [
{
topic: 'property.events.raw',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 giorni
{ name: 'compression.type', value: 'lz4' },
{ name: 'cleanup.policy', value: 'delete' },
],
},
{
topic: 'property.events.enriched',
numPartitions: 50,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) }, // 30 giorni
{ name: 'compression.type', value: 'zstd' },
],
},
{
topic: 'market.indicators',
numPartitions: 10,
replicationFactor: 3,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' }, // log compaction per latest state
{ name: 'retention.ms', value: String(-1) }, // indefinita (compacted)
],
},
];
// Producer con partitioning per area geografica
export class PropertyEventProducer {
private producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
transactionTimeout: 30000,
});
async sendEvent(event: PropertyEvent): Promise<void> {
await this.producer.send({
topic: 'property.events.raw',
messages: [{
// Key = city code: garantisce ordine eventi per citta sullo stesso partition
key: this.getCityCode(event),
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'source': event.sourceName,
'schema-version': '1.0',
},
timestamp: String(Date.now()),
}],
});
}
private getCityCode(event: PropertyEvent): string {
const payload = event.payload as PropertyListingPayload;
return payload.location?.city?.toLowerCase().replace(/\s/g, '-') ?? 'unknown';
}
}
Apache Flink: 실시간 지표 계산
아파치 플링크 처리할 수 있는 가장 강력한 스트림 처리 프레임워크 부동산 이벤트의 흐름을 파악하고 슬라이딩 시간 창에서 시장 지표를 계산합니다. 우리는 PyFlink와 함께 Flink를 사용하여 다음을 계산합니다. H3 셀 가격 지수 15분마다.
# PyFlink: calcolo Price Index per area geografica in real-time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import lit, col
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.get_checkpoint_config().set_checkpointing_interval(60_000) # checkpoint ogni 60s
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
tenv = StreamTableEnvironment.create(env, settings)
# Definizione source Kafka
tenv.execute_sql("""
CREATE TABLE property_events (
event_id STRING,
event_type STRING,
city STRING,
h3_cell STRING,
price DOUBLE,
square_meters DOUBLE,
price_per_sqm DOUBLE AS (price / square_meters),
listing_type STRING,
property_type STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'property.events.enriched',
'properties.bootstrap.servers' = '{{KAFKA_BROKERS}}',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# Calcolo Price Index per H3 cell: media mobile su finestra 15 minuti
tenv.execute_sql("""
CREATE TABLE market_indicators (
h3_cell STRING,
city STRING,
listing_type STRING,
property_type STRING,
avg_price_per_sqm DOUBLE,
median_price DOUBLE,
listing_count BIGINT,
price_index DOUBLE,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
PRIMARY KEY (h3_cell, listing_type, property_type, window_end) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'market.indicators',
'format' = 'json'
)
""")
# Query: aggregazione per finestra temporale scorrevole
tenv.execute_sql("""
INSERT INTO market_indicators
SELECT
h3_cell,
city,
listing_type,
property_type,
AVG(price_per_sqm) AS avg_price_per_sqm,
-- Approssimazione mediana con percentile (Flink non ha MEDIAN nativo)
PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY price_per_sqm) AS median_price,
COUNT(*) AS listing_count,
-- Price Index: rapporto vs baseline di 30 giorni fa (in produzione: join con window storica)
AVG(price_per_sqm) / NULLIF(AVG(price_per_sqm) OVER (
PARTITION BY h3_cell, listing_type
ORDER BY event_time
RANGE BETWEEN INTERVAL '30' DAY PRECEDING AND INTERVAL '15' MINUTE PRECEDING
), 0) AS price_index,
TUMBLE_START(event_time, INTERVAL '15' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end
FROM property_events
WHERE
event_type IN ('listing.created', 'listing.price_changed')
AND price > 0
AND square_meters > 20
GROUP BY
h3_cell, city, listing_type, property_type,
TUMBLE(event_time, INTERVAL '15' MINUTE)
""")
# Query alert: proprietà con prezzo ridotto >10% ultimi 7 giorni
tenv.execute_sql("""
CREATE VIEW price_reduction_alerts AS
SELECT
event_id,
city,
h3_cell,
price,
(price - LAG(price) OVER (PARTITION BY event_id ORDER BY event_time)) / price AS reduction_pct,
event_time
FROM property_events
WHERE event_type = 'listing.price_changed'
HAVING reduction_pct < -0.10
""")
TimescaleDB: 시계열 최적화 스토리지
시간척도DB 시계열 데이터에 최적화된 PostgreSQL 확장입니다. 테이블을 임시 청크(하이퍼테이블)로 자동 분할하고 정밀한 압축 제공 기록 데이터의 경우 95%, 시간 간격에 따른 쿼리 성능이 뛰어납니다.
-- Schema TimescaleDB per dati mercato immobiliare
-- Tabella principali price observations (time-series)
CREATE TABLE price_observations (
time TIMESTAMPTZ NOT NULL,
h3_cell TEXT NOT NULL,
city TEXT NOT NULL,
listing_type TEXT NOT NULL, -- 'sale' | 'rent'
property_type TEXT NOT NULL,
avg_price_sqm NUMERIC(10, 2),
median_price NUMERIC(12, 2),
listing_count INT,
price_index NUMERIC(8, 4), -- 1.0 = baseline
dom_avg NUMERIC(8, 2) -- Days on Market media
);
-- Converte in hypertable (partizionamento automatico per tempo)
SELECT create_hypertable('price_observations', 'time', chunk_time_interval => INTERVAL '1 week');
-- Indici per query temporali e geografiche
CREATE INDEX idx_price_obs_h3_time ON price_observations (h3_cell, time DESC);
CREATE INDEX idx_price_obs_city_time ON price_observations (city, listing_type, time DESC);
-- Policy di compressione: comprimi chunk >30 giorni (risparmio 90%+ spazio)
ALTER TABLE price_observations SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'h3_cell, city, listing_type'
);
SELECT add_compression_policy('price_observations', INTERVAL '30 days');
-- Retention policy: aggrega automaticamente in bucket mensili dopo 1 anno
-- Usa Continuous Aggregates di TimescaleDB per le aggregazioni
CREATE MATERIALIZED VIEW monthly_market_summary
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 month', time) AS month,
city,
listing_type,
property_type,
AVG(avg_price_sqm) AS avg_price_sqm,
AVG(price_index) AS avg_price_index,
SUM(listing_count) AS total_listings,
AVG(dom_avg) AS avg_days_on_market
FROM price_observations
GROUP BY month, city, listing_type, property_type
WITH NO DATA;
-- Refresh automatico ogni ora
SELECT add_continuous_aggregate_policy('monthly_market_summary',
start_offset => INTERVAL '3 months',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- Query: trend prezzi mq negli ultimi 12 mesi per un quartiere H3
SELECT
time_bucket('1 month', time) AS month,
AVG(avg_price_sqm) AS avg_price_sqm,
MAX(price_index) AS price_index
FROM price_observations
WHERE
h3_cell = '{h3_cell_value}'
AND listing_type = 'sale'
AND time > NOW() - INTERVAL '12 months'
GROUP BY month
ORDER BY month;
-- Query: Days on Market trend per citta
SELECT
time_bucket('1 week', time) AS week,
city,
AVG(dom_avg) AS avg_dom,
SUM(listing_count) AS listings
FROM price_observations
WHERE city = 'Roma' AND listing_type = 'sale'
AND time > NOW() - INTERVAL '6 months'
GROUP BY week, city
ORDER BY week;
시장 지표: 계산 및 해석
// Calcolo Days on Market (DOM) e Price Reduction Rate
export interface MarketIndicators {
citySlug: string;
period: string; // '2026-Q1', '2026-03'
listingType: 'sale' | 'rent';
// Prezzi
avgPricePerSqm: number;
medianPricePerSqm: number;
priceIndex: number; // 1.0 = anno base, 1.05 = +5% YoY
// Velocita mercato
avgDaysOnMarket: number;
medianDaysOnMarket: number;
listingCountNew: number; // nuovi annunci nel periodo
listingCountClosed: number; // conclusi (venduti/affittati)
absorbRate: number; // conclusi / nuovi (>1 = mercato caldo)
// Tensione offerta/domanda
priceReductionRate: number; // % annunci con riduzione prezzo
avgPriceReductionPct: number; // riduzione media quando avviene
stockMonths: number; // mesi per esaurire offerta attuale
// Segmenti
byPropertyType: Record<string, { avgPrice: number; count: number }>;
}
export async function computeMarketIndicators(
db: Pool,
city: string,
periodStart: Date,
periodEnd: Date
): Promise<MarketIndicators> {
const [priceResult, velocityResult, reductionResult] = await Promise.all([
// 1. Prezzi medi
db.query(
`SELECT
AVG(price / square_meters) AS avg_price_sqm,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price / square_meters) AS median_price_sqm
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status = 'active'
AND listed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 2. Days on Market
db.query(
`SELECT
AVG(EXTRACT(DAY FROM (closed_at - listed_at))) AS avg_dom,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(DAY FROM (closed_at - listed_at))) AS median_dom,
COUNT(*) AS closed_count
FROM property_listings
WHERE city = $1 AND listing_type = 'sale' AND status IN ('sold')
AND closed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
// 3. Price reductions
db.query(
`SELECT
COUNT(*) FILTER (WHERE price_change < 0) * 100.0 / COUNT(*) AS reduction_rate_pct,
AVG(price_change / original_price) FILTER (WHERE price_change < 0) AS avg_reduction_pct
FROM listing_price_history
WHERE city = $1 AND changed_at BETWEEN $2 AND $3`,
[city, periodStart, periodEnd]
),
]);
const activeCount = await db.query(
'SELECT COUNT(*) AS cnt FROM property_listings WHERE city = $1 AND status = $2',
[city, 'active']
);
const closedCount = parseInt(velocityResult.rows[0]?.closed_count ?? '0');
const durationDays = (periodEnd.getTime() - periodStart.getTime()) / (1000 * 86400);
const closedPerDay = closedCount / durationDays;
const stockMonths = closedPerDay > 0
? (parseInt(activeCount.rows[0].cnt) / (closedPerDay * 30))
: 99;
return {
citySlug: city.toLowerCase().replace(/\s/g, '-'),
period: periodStart.toISOString().substring(0, 7),
listingType: 'sale',
avgPricePerSqm: parseFloat(priceResult.rows[0]?.avg_price_sqm ?? '0'),
medianPricePerSqm: parseFloat(priceResult.rows[0]?.median_price_sqm ?? '0'),
priceIndex: 1.0, // In produzione: confronto con periodo precedente
avgDaysOnMarket: parseFloat(velocityResult.rows[0]?.avg_dom ?? '30'),
medianDaysOnMarket: parseFloat(velocityResult.rows[0]?.median_dom ?? '25'),
listingCountNew: 0, // Query separata
listingCountClosed: closedCount,
absorbRate: closedCount / Math.max(1, parseInt(activeCount.rows[0].cnt)),
priceReductionRate: parseFloat(reductionResult.rows[0]?.reduction_rate_pct ?? '0'),
avgPriceReductionPct: parseFloat(reductionResult.rows[0]?.avg_reduction_pct ?? '0'),
stockMonths,
byPropertyType: {}, // Popolare con query separata
};
}
실시간 경고: 시장 기회
실시간 파이프라인의 실제 가치는 투자자에게 알리는 사전 경고에 있습니다. 그리고 시장이 기회를 흡수하기 전에 기회가 나타날 때 구매자가 됩니다.
// Sistema di alerting per opportunità di mercato
interface MarketAlert {
alertId: string;
type: 'price_drop' | 'new_listing_below_market' | 'high_demand_area' | 'distressed_sale';
severity: 'low' | 'medium' | 'high';
propertyId?: string;
h3Cell?: string;
city: string;
description: string;
metrics: Record<string, number>;
timestamp: string;
}
export class MarketAlertEngine {
async processEvent(
event: PropertyEvent,
marketContext: MarketIndicators
): Promise<MarketAlert[]> {
const alerts: MarketAlert[] = [];
const payload = event.payload as PropertyListingPayload;
// Alert 1: Prezzo significativamente sotto mercato
if (event.type === 'listing.created' && payload.price > 0) {
const priceSqm = payload.price / payload.squareMeters;
const marketPriceSqm = marketContext.avgPricePerSqm;
const discount = (marketPriceSqm - priceSqm) / marketPriceSqm;
if (discount > 0.15) {
alerts.push({
alertId: `alert-${Date.now()}`,
type: 'new_listing_below_market',
severity: discount > 0.25 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Nuovo annuncio ${(discount * 100).toFixed(0)}% sotto la media di mercato`,
metrics: {
priceSqm,
marketPriceSqm,
discountPercent: discount * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 2: Riduzione prezzo significativa
if (event.type === 'listing.price_changed' && payload.previousPrice) {
const reduction = (payload.previousPrice - payload.price) / payload.previousPrice;
if (reduction > 0.08) {
alerts.push({
alertId: `alert-${Date.now()}-pr`,
type: 'price_drop',
severity: reduction > 0.15 ? 'high' : 'medium',
propertyId: payload.externalId,
city: payload.location.city,
description: `Riduzione prezzo ${(reduction * 100).toFixed(0)}% (da €${payload.previousPrice} a €${payload.price})`,
metrics: {
previousPrice: payload.previousPrice,
currentPrice: payload.price,
reductionPercent: reduction * 100,
},
timestamp: new Date().toISOString(),
});
}
}
// Alert 3: Area con alta domanda (DOM molto basso)
if (marketContext.avgDaysOnMarket < 7 && marketContext.absorbRate > 1.5) {
alerts.push({
alertId: `alert-${Date.now()}-hd`,
type: 'high_demand_area',
severity: 'medium',
h3Cell: payload.location.h3Cell,
city: payload.location.city,
description: `Area ad altissima domanda: DOM medio ${marketContext.avgDaysOnMarket} giorni, absorption rate ${marketContext.absorbRate.toFixed(1)}x`,
metrics: {
avgDom: marketContext.avgDaysOnMarket,
absorbRate: marketContext.absorbRate,
},
timestamp: new Date().toISOString(),
});
}
return alerts;
}
}
주요 시장 지표
| 지시자 | 계산 | 해석 |
|---|---|---|
| DOM(시장 출시일) | 상장부터 마감까지 평균 일수 | <30 = 뜨거운 시장, >90 = 차가운 시장 |
| 흡수율 | 매출/월 / 활성재고 | >20% = 판매자 시장, <15% = 구매자 시장 |
| 가격 인하율 | 가격 인하로 % 등재 | >30% = 약세 압력 |
| 공급 기간(개월) | 재고/월매출액 | <3 = 부족, 4-6 = 균형, >6 = 과잉 |
| 목록-판매 비율 | 판매 가격 / 상장 가격 | >100% = 경쟁 시장(요청 시 제공) |
결론
실시간 부동산 시장 인텔리전스 파이프라인은 원시 데이터를 통찰력으로 전환합니다. 실행 가능: 적합한 투자자는 언제 어디서 구매할지 알고, 자산 관리자는 구매 방법을 알고 있습니다. 가격에 따라 에이전트는 어떤 속성을 제시해야 할지 알고 있습니다. Kafka + Flink + TimescaleDB 조합 수천만 건을 처리할 수 있는 강력하고 확장 가능하며 유지 관리 가능한 아키텍처를 제공합니다. 초 단위의 지연 시간을 갖는 일일 이벤트 수입니다.







