SpringBoot與Cassandra整合,實(shí)現(xiàn)高寫入吞吐量用戶事件存儲系統(tǒng)
作者:Java知識日歷
隨著我們的app使用人數(shù)快速發(fā)展,用戶行為數(shù)據(jù)變得越來越重要,這些數(shù)據(jù)可以用來優(yōu)化產(chǎn)品設(shè)計(jì)、提升用戶體驗(yàn)、實(shí)施精準(zhǔn)營銷策略,并支持業(yè)務(wù)決策。然而,處理海量的用戶行為數(shù)據(jù)需要高效的存儲和分析能力,傳統(tǒng)的數(shù)據(jù)庫方案往往難以滿足這些需求。
隨著我們的app使用人數(shù)快速發(fā)展,用戶行為數(shù)據(jù)變得越來越重要,這些數(shù)據(jù)可以用來優(yōu)化產(chǎn)品設(shè)計(jì)、提升用戶體驗(yàn)、實(shí)施精準(zhǔn)營銷策略,并支持業(yè)務(wù)決策。然而,處理海量的用戶行為數(shù)據(jù)需要高效的存儲和分析能力,傳統(tǒng)的數(shù)據(jù)庫方案往往難以滿足這些需求。
為什么選擇使用Cassandra作為用戶事件存儲解決方案?
- 高性能寫入:Cassandra的設(shè)計(jì)使得它非常適合高寫入場景。它的寫操作是無鎖的,并且所有節(jié)點(diǎn)都可以接受寫請求,這大大提高了寫入性能。
- 一致性模型:Cassandra提供了多種一致性級別(如ONE, QUORUM, ALL等),可以根據(jù)具體需求調(diào)整,以平衡一致性和可用性。
- 自動分片:Cassandra會自動將數(shù)據(jù)分布在集群中的多個(gè)節(jié)點(diǎn)上,確保負(fù)載均衡。
- 動態(tài)添加節(jié)點(diǎn):可以隨時(shí)向集群中添加新的節(jié)點(diǎn),以提高容量和性能,而無需停機(jī)維護(hù)。
- 快速讀取:Cassandra支持高效的點(diǎn)查詢和范圍查詢,適合讀取特定用戶的所有事件。
- 二級索引:雖然Cassandra不支持復(fù)雜的查詢,但它提供了一定程度的二級索引功能,可以幫助進(jìn)行更靈活的查詢。
- 多副本復(fù)制:Cassandra默認(rèn)會在多個(gè)節(jié)點(diǎn)之間復(fù)制數(shù)據(jù),確保數(shù)據(jù)的安全性和可靠性。
- 強(qiáng)容錯(cuò)能力:即使某些節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)仍然可以繼續(xù)提供服務(wù)。
- 定期快照和增量備份:支持定期快照和增量備份,方便數(shù)據(jù)恢復(fù)和災(zāi)難恢復(fù)。
- 免費(fèi)開源:作為Apache軟件基金會的一個(gè)頂級項(xiàng)目,Cassandra是免費(fèi)且開源的。
- 硬件利用率高:Cassandra能夠充分利用現(xiàn)有的硬件資源,減少額外的成本投入。
哪些公司選擇Cassandra?
- Capital One:用于存儲金融交易數(shù)據(jù)和客戶行為分析。
- Spotify:用于存儲音樂播放記錄、用戶偏好等數(shù)據(jù)。
- Rovio (Angry Birds):用于存儲游戲內(nèi)玩家的行為數(shù)據(jù)和統(tǒng)計(jì)信息。
- Uber:用于存儲司機(jī)和乘客的位置數(shù)據(jù)、行程信息等。
- Netflix:用于存儲和處理大量的用戶行為數(shù)據(jù),如觀看歷史、搜索記錄等。
- Instagram:用于存儲照片和視頻的元數(shù)據(jù),以及用戶活動日志。
- Apple:用于移動設(shè)備的推送通知系統(tǒng)和其他內(nèi)部應(yīng)用的數(shù)據(jù)存儲。
- Reddit:用于存儲用戶提交的內(nèi)容、評論和其他社交數(shù)據(jù)。
- Zynga:用于存儲在線游戲中的用戶活動和統(tǒng)計(jì)數(shù)據(jù)。
代碼實(shí)操
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cassandra-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cassandra-demo</name>
<description>Demo project for Spring Boot and Cassandra integration</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# 配置Cassandra連接信息
spring.data.cassandra.contact-points=localhost # Cassandra節(jié)點(diǎn)地址
spring.data.cassandra.port=9042 # Cassandra端口號
spring.data.cassandra.keyspace-name=user_events # keyspace名稱
spring.data.cassandra.schema-action=create_if_not_exists # 如果keyspace不存在則創(chuàng)建
實(shí)體類
package com.example.cassandrareactivedemo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.cassandra.core.mapping.Table;
import java.util.Date;
/**
* 用戶事件實(shí)體類
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("user_events")
publicclass UserEvent {
@Id// 標(biāo)記為主鍵
private String eventId; // 事件ID
private String userId; // 用戶ID
private String eventType; // 事件類型
private Date eventTime; // 事件發(fā)生時(shí)間
}
Repository
package com.example.cassandrareactivedemo.repository;
import com.example.cassandrareactivedemo.model.UserEvent;
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import reactor.core.publisher.Flux;
public interface UserEventRepository extends ReactiveCassandraRepository<UserEvent, String> {
/**
* 根據(jù)用戶ID查找所有相關(guān)的用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
Flux<UserEvent> findByUserId(String userId);
}
Service
package com.example.cassandrareactivedemo.service;
import com.example.cassandrareactivedemo.model.UserEvent;
import com.example.cassandrareactivedemo.repository.UserEventRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 用戶事件服務(wù)類,處理業(yè)務(wù)邏輯
*/
@Service
publicclass UserEventService {
@Autowired
private UserEventRepository userEventRepository;
/**
* 保存用戶事件到Cassandra數(shù)據(jù)庫
* @param userEvent 用戶事件對象
* @return 包含已保存用戶的Mono對象
*/
public Mono<UserEvent> save(UserEvent userEvent) {
return userEventRepository.save(userEvent);
}
/**
* 根據(jù)用戶ID查找所有相關(guān)的用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
public Flux<UserEvent> findByUserId(String userId) {
return userEventRepository.findByUserId(userId);
}
}
Controller
package com.example.cassandrareactivedemo.controller;
import com.example.cassandrareactivedemo.model.UserEvent;
import com.example.cassandrareactivedemo.service.UserEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/user-events")
publicclass UserEventController {
@Autowired
private UserEventService userEventService;
/**
* 創(chuàng)建新的用戶事件
* @param userEvent 用戶事件對象
* @return 包含新創(chuàng)建用戶的Mono對象
*/
@PostMapping("/")
public Mono<UserEvent> createUserEvent(@RequestBody UserEvent userEvent) {
return userEventService.save(userEvent);
}
/**
* 根據(jù)用戶ID獲取所有相關(guān)用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
@GetMapping("/{userId}")
public Flux<UserEvent> getUserEventsByUserId(@PathVariable String userId) {
return userEventService.findByUserId(userId);
}
}
Application
package com.example.cassandrareactivedemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CassandraDemoApplication {
public static void main(String[] args) {
SpringApplication.run(CassandraDemoApplication.class, args);
}
}
測試
創(chuàng)建新的用戶事件
curl -X POST http://localhost:8080/api/user-events/ \
-H "Content-Type: application/json" \
-d '{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}'
Respons
{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}
獲取特定用戶的用戶事件
curl -X GET http://localhost:8080/api/user-events/user1
Respons
[
{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}
]
責(zé)任編輯:武曉燕
來源:
Java知識日歷