Iterating on our team's embedding models has long been a pain point. Every tweak to the preprocessing logic or the model meant regenerating embeddings for the entire multi-gigabyte validation dataset, a process that routinely took hours. Our CI/CD pipeline, built to accelerate delivery, had become the single biggest bottleneck. The problem was clear: a large share of the computation was redundant. When neither the input data nor the processing logic changes, the output vectors should not change either.
The obvious first idea was caching. But in a container-based pipeline environment like Tekton, traditional filesystem-based caching (such as Tekton's own workspace PVCs) is clumsy: sharing a cache across PipelineRuns is awkward to configure, and parallel tasks risk read/write conflicts. We needed a faster, centralized caching solution.
The technology choices fell into place quickly:
- Tekton: the pipeline orchestrator. Its Kubernetes-native design gives us fine-grained control over the resources and environment of every step, which matters for resource-hungry ML tasks.
- Python: the implementation language for the ML tasks; no debate there.
- Redis: the pivotal choice. We did not treat it as a plain key-value store; we chose Redis Stack, which ships with vector search. Two considerations drove this:
  - A high-speed cache layer: Redis's in-memory read/write performance serves as the cache for our vector embeddings. The key is a hash of the input data; the value is the generated vector.
  - Instant validation: since the vectors live in Redis anyway, we can use its built-in vector index and search to run a quick smoke test on freshly generated vectors inside the pipeline, e.g. checking that the nearest neighbor of a reference point is what we expect. This surfaces errors earlier.
Our goal was a pipeline smart enough to tell which slices of the data need their embeddings recomputed and which can be pulled straight from Redis, compressing a no-op run (no substantive changes) from hours down to minutes.
Step 1: The Baseline Pipeline (Slow but Functional)
Before optimizing, we need a basic version that runs end to end. It is entirely straightforward: a single Tekton Task takes a dataset of raw text, processes it line by line with a Python script to generate vectors, and writes the result to a file.
Here is the initial `Task` definition, with no caching logic:
```yaml
# tekton/tasks/generate-embeddings-v1.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: generate-embeddings-v1
spec:
  params:
    - name: source-data-path
      description: The path to the raw text data file within the workspace.
      type: string
      default: "data/raw_corpus.txt"
    - name: output-embeddings-path
      description: The path to store the generated embeddings file.
      type: string
      default: "output/embeddings.json"
    - name: model-name
      description: The sentence-transformer model to use.
      type: string
      default: "all-MiniLM-L6-v2"
  workspaces:
    - name: data
      description: The workspace containing source data and for storing outputs.
  steps:
    - name: process-data
      image: python:3.9-slim
      workingDir: $(workspaces.data.path)
      script: |
        #!/usr/bin/env bash
        set -e
        echo "--> Installing dependencies..."
        pip install sentence-transformers torch numpy
        echo "--> Running embedding generation script..."
        python3 <<'EOF'
        import os
        import json
        import hashlib
        from sentence_transformers import SentenceTransformer

        # --- Configuration ---
        SOURCE_PATH = "$(params.source-data-path)"
        OUTPUT_PATH = "$(params.output-embeddings-path)"
        MODEL_NAME = "$(params.model-name)"

        print(f"Loading model: {MODEL_NAME}")
        model = SentenceTransformer(MODEL_NAME)

        embeddings_map = {}
        print(f"Reading source data from: {SOURCE_PATH}")
        if not os.path.exists(SOURCE_PATH):
            print(f"Error: Source file not found at {SOURCE_PATH}")
            exit(1)

        with open(SOURCE_PATH, 'r', encoding='utf-8') as f:
            lines = [line.strip() for line in f if line.strip()]
        print(f"Found {len(lines)} lines to process.")

        # --- Core Logic ---
        # This is the slow part we want to optimize
        embeddings = model.encode(lines, show_progress_bar=True)

        for i, line in enumerate(lines):
            # Use a hash of the content as a unique identifier
            line_hash = hashlib.sha256(line.encode('utf-8')).hexdigest()
            embeddings_map[line_hash] = {
                "text": line,
                "vector": embeddings[i].tolist()
            }
        print(f"Generated {len(embeddings_map)} embeddings.")

        # --- Save Output ---
        output_dir = os.path.dirname(OUTPUT_PATH)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(OUTPUT_PATH, 'w', encoding='utf-8') as f:
            json.dump(embeddings_map, f, ensure_ascii=False, indent=2)
        print(f"Successfully saved embeddings to {OUTPUT_PATH}")
        EOF
```
This Task works, but on every PipelineRun, even if not a single character of `raw_corpus.txt` has changed, `model.encode` runs in full. That is exactly the problem we set out to solve.
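For completeness, here is a minimal `TaskRun` one might use to exercise this baseline Task; the PVC name is an assumption about the cluster setup, not something the original pipeline prescribes:

```yaml
# tekton/runs/generate-embeddings-v1-run.yaml (hypothetical)
apiVersion: tekton.dev/v1beta1
kind: TaskRun
metadata:
  generateName: generate-embeddings-v1-run-
spec:
  taskRef:
    name: generate-embeddings-v1
  workspaces:
    - name: data
      persistentVolumeClaim:
        claimName: ml-ci-data-pvc # assumed pre-provisioned PVC holding the corpus
```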
Step 2: Introducing Redis as a High-Speed Cache Layer
Now we rework the Task so that it queries Redis before computing anything.
The architecture:
```mermaid
graph TD
    A[Tekton PipelineRun Start] --> B{Task: generate-embeddings-cached};
    B --> C[Python Script Starts];
    C --> D{For each line in raw_corpus.txt};
    D --> E[Calculate SHA256 Hash of line];
    E --> F{Query Redis with Hash};
    F -- Exists --> G[Append vector from Redis to results];
    F -- Not Exists --> H[Compute vector using model.encode];
    H --> I[Store vector in Redis with Hash as Key];
    I --> J[Append new vector to results];
    G --> K[Next line];
    J --> K;
    K --> D;
    D -- All lines processed --> L[Save final embeddings file];
    L --> M[Task Finish];
```
For the Tekton Task to connect to Redis, we pass the connection details securely through a Kubernetes `Secret`. Assume we have already created a Secret named `redis-connection`:
```bash
kubectl create secret generic redis-connection --from-literal=REDIS_URL='redis://:<password>@<redis-host>:6379'
```
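The same Secret expressed as a manifest, with placeholder values to fill in:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: redis-connection
type: Opaque
stringData:
  REDIS_URL: "redis://:<password>@<redis-host>:6379"
```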
Now for the core components: the cache-aware Python script and its corresponding Tekton Task.

The cache-aware processing logic (`process_with_cache.py`):

This version is more robust, covering configuration loading, logging, error handling, and the Redis interaction.
```python
# scripts/process_with_cache.py
import os
import json
import hashlib
import logging
import sys

import numpy as np
import redis
from sentence_transformers import SentenceTransformer

# --- Standardized Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)

# --- Configuration Loading ---
class Config:
    """Load configuration from environment variables."""
    try:
        REDIS_URL = os.environ['REDIS_URL']
        SOURCE_PATH = os.environ['SOURCE_DATA_PATH']
        OUTPUT_PATH = os.environ['OUTPUT_EMBEDDINGS_PATH']
        MODEL_NAME = os.environ.get('MODEL_NAME', 'all-MiniLM-L6-v2')
        # A prefix to namespace keys in Redis, preventing collisions
        REDIS_KEY_PREFIX = os.environ.get('REDIS_KEY_PREFIX', 'embeddings:v1')
    except KeyError as e:
        logging.error(f"Missing essential environment variable: {e}")
        sys.exit(1)

def get_redis_client(redis_url: str) -> redis.Redis:
    """
    Establish a connection to Redis and verify it.
    Includes robust error handling for production environments.
    """
    try:
        client = redis.Redis.from_url(redis_url, decode_responses=False)
        client.ping()
        logging.info("Successfully connected to Redis.")
        return client
    except redis.exceptions.ConnectionError as e:
        logging.error(f"Failed to connect to Redis at {redis_url}. Error: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"An unexpected error occurred during Redis connection: {e}")
        sys.exit(1)

def generate_cache_key(prefix: str, content: str) -> str:
    """Generate a consistent cache key."""
    content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
    return f"{prefix}:{content_hash}"

def main():
    """Main execution logic."""
    config = Config()
    redis_client = get_redis_client(config.REDIS_URL)

    logging.info(f"Loading model: {config.MODEL_NAME}")
    model = SentenceTransformer(config.MODEL_NAME)

    logging.info(f"Reading source data from: {config.SOURCE_PATH}")
    if not os.path.exists(config.SOURCE_PATH):
        logging.error(f"Source file not found at {config.SOURCE_PATH}")
        sys.exit(1)

    with open(config.SOURCE_PATH, 'r', encoding='utf-8') as f:
        lines = [line.strip() for line in f if line.strip()]
    logging.info(f"Found {len(lines)} lines to process.")

    final_embeddings = {}
    lines_to_compute = []
    cache_keys_to_compute = []

    # --- Phase 1: Cache Checking ---
    # We batch-check Redis with a single MGET for better performance.
    cache_keys = [generate_cache_key(config.REDIS_KEY_PREFIX, line) for line in lines]
    cached_vectors_raw = redis_client.mget(cache_keys)

    for i, raw_vector in enumerate(cached_vectors_raw):
        line = lines[i]
        cache_key = cache_keys[i]
        if raw_vector:
            # Vector found in cache
            vector = np.frombuffer(raw_vector, dtype=np.float32).tolist()
            final_embeddings[cache_key.split(':')[-1]] = {
                "text": line,
                "vector": vector
            }
        else:
            # Vector not in cache, add to batch for computation
            lines_to_compute.append(line)
            cache_keys_to_compute.append(cache_key)

    cache_hits = len(lines) - len(lines_to_compute)
    logging.info(f"Cache lookup complete. Hits: {cache_hits}, Misses: {len(lines_to_compute)}")

    # --- Phase 2: Compute for Cache Misses ---
    if lines_to_compute:
        logging.info(f"Computing embeddings for {len(lines_to_compute)} new items...")
        new_embeddings = model.encode(lines_to_compute, show_progress_bar=True)

        # --- Phase 3: Update Cache and Final Results ---
        # Use a Redis pipeline for efficient bulk insertion
        pipe = redis_client.pipeline()
        for i, line in enumerate(lines_to_compute):
            vector = new_embeddings[i]
            cache_key = cache_keys_to_compute[i]
            # Store vector in a compact binary format
            pipe.set(cache_key, vector.astype(np.float32).tobytes())
            final_embeddings[cache_key.split(':')[-1]] = {
                "text": line,
                "vector": vector.tolist()
            }
        try:
            pipe.execute()
            logging.info(f"Successfully stored {len(lines_to_compute)} new vectors in Redis cache.")
        except redis.exceptions.RedisError as e:
            logging.error(f"Failed to write to Redis cache: {e}")
            # In a real project, you might want to decide if this is a fatal error.
            # For now, we continue, but the cache won't be updated.

    # --- Save Final Output ---
    output_dir = os.path.dirname(config.OUTPUT_PATH)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with open(config.OUTPUT_PATH, 'w', encoding='utf-8') as f:
        json.dump(final_embeddings, f, ensure_ascii=False, indent=2)

    logging.info(f"Process complete. Total embeddings: {len(final_embeddings)}. Saved to {config.OUTPUT_PATH}")

if __name__ == "__main__":
    main()
```
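A detail worth calling out in the script above is the serialization scheme: vectors are cached as raw `float32` bytes via `tobytes()` and restored with `np.frombuffer`. A standalone sanity check of that round-trip, independent of the pipeline:

```python
import numpy as np

# Cached values are raw float32 bytes; frombuffer must restore them exactly.
vec = np.random.rand(384).astype(np.float32)
restored = np.frombuffer(vec.tobytes(), dtype=np.float32)
assert np.array_equal(vec, restored)
```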
The updated Tekton `Task` (`generate-embeddings-v2.yaml`):
```yaml
# tekton/tasks/generate-embeddings-v2.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: generate-embeddings-cached
spec:
  params:
    - name: source-data-path
      type: string
      default: "data/raw_corpus.txt"
    - name: output-embeddings-path
      type: string
      default: "output/embeddings.json"
    - name: model-name
      type: string
      default: "all-MiniLM-L6-v2"
    - name: python-script
      type: string
      description: The python script to execute.
  workspaces:
    - name: source
      description: The workspace containing source data, scripts, and outputs.
  steps:
    - name: process-data-with-cache
      image: python:3.9-slim
      workingDir: $(workspaces.source.path)
      env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: redis-connection # The secret we created
              key: REDIS_URL
        - name: SOURCE_DATA_PATH
          value: $(params.source-data-path)
        - name: OUTPUT_EMBEDDINGS_PATH
          value: $(params.output-embeddings-path)
        - name: MODEL_NAME
          value: $(params.model-name)
        - name: REDIS_KEY_PREFIX
          value: "embeddings:$(params.model-name)" # Namespace by model
      script: |
        #!/usr/bin/env bash
        set -e
        echo "--> Installing dependencies..."
        pip install sentence-transformers torch numpy redis
        echo "--> Running embedding generation script with cache..."
        python3 $(params.python-script)
```
On the first run of the pipeline, the logs show `Cache lookup complete. Hits: 0, Misses: 10000` followed by `Computing embeddings for 10000 new items...`. On a second run with unchanged data, they show `Cache lookup complete. Hits: 10000, Misses: 0` and `Process complete. Total embeddings: 10000.` The task's execution time dropped from an hour to under a minute, most of which is Python environment installation and data loading.
Step 3: Adding a Vector Validation Step to the Pipeline
Generating vectors is not enough; we also need to be sure a model change has not introduced unexpected drift. Using Redis Stack's vector search, we can add a Task that quickly validates the freshly generated vectors.
The validation logic:
- Pre-load a "golden set" into Redis: texts paired with their verified vectors (a seeding sketch follows this list).
- Create a vector index over that set.
- In the pipeline, once the new vectors are generated, pick a few key test samples.
- For each sample, run a KNN (K-Nearest Neighbors) search in Redis with the newly generated vector.
- Assert that the top-1 neighbor returned is the sample itself. If it is not, or the similarity falls below a threshold, the new model has likely drifted and the pipeline should fail.
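The first two steps above are assumed to happen outside this pipeline, e.g. in a one-off setup Task. The following is a minimal sketch of what that seeding script could look like with redis-py; the `golden:` prefix, the FLAT L2 index, the script name, and the 384-dimensional output of all-MiniLM-L6-v2 are assumptions chosen to match the validation script below:

```python
# scripts/seed_golden_set.py -- hypothetical one-off seeding script
import hashlib

import numpy as np
import redis
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Index name must match REDIS_INDEX_NAME in validate_vectors.py below.
INDEX_NAME = "idx:embeddings-validation:all-MiniLM-L6-v2"
KEY_PREFIX = "golden:"  # assumed namespace for golden-set documents
DIM = 384               # output dimension of all-MiniLM-L6-v2

client = redis.Redis.from_url("redis://localhost:6379")

# Create a FLAT index over hash documents under the golden: prefix.
# L2 is chosen deliberately: the (2 - score) / 2 similarity conversion in
# the validation script assumes squared L2 over unit-normalized vectors.
client.ft(INDEX_NAME).create_index(
    fields=[
        TextField("text"),
        VectorField("vector", "FLAT", {
            "TYPE": "FLOAT32",
            "DIM": DIM,
            "DISTANCE_METRIC": "L2",
        }),
    ],
    definition=IndexDefinition(prefix=[KEY_PREFIX], index_type=IndexType.HASH),
)

def seed(text: str, vector: np.ndarray) -> None:
    """Store one verified (text, vector) pair as an indexed hash document."""
    key = KEY_PREFIX + hashlib.sha256(text.encode("utf-8")).hexdigest()
    client.hset(key, mapping={
        "text": text,
        "vector": vector.astype(np.float32).tobytes(),
    })
```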
The validation task's Python script (`validate_vectors.py`):
```python
# scripts/validate_vectors.py
import os
import json
import hashlib
import logging
import sys

import numpy as np
import redis
from redis.commands.search.query import Query

# --- Logging setup as before ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stdout)

# --- Config ---
class Config:
    REDIS_URL = os.environ['REDIS_URL']
    # Path to the newly generated embeddings from the previous step
    INPUT_EMBEDDINGS_PATH = os.environ['INPUT_EMBEDDINGS_PATH']
    # Path to a small file with text lines to test against
    VALIDATION_SET_PATH = os.environ['VALIDATION_SET_PATH']
    MODEL_NAME = os.environ.get('MODEL_NAME', 'all-MiniLM-L6-v2')
    REDIS_INDEX_NAME = f"idx:embeddings-validation:{MODEL_NAME}"
    SIMILARITY_THRESHOLD = float(os.environ.get('SIMILARITY_THRESHOLD', '0.99'))

def get_redis_client(redis_url: str):
    # Same implementation as before
    try:
        client = redis.Redis.from_url(redis_url, decode_responses=False)
        client.ping()
        logging.info("Successfully connected to Redis for validation.")
        return client
    except Exception as e:
        logging.error(f"Redis connection failed for validation: {e}")
        sys.exit(1)

def main():
    config = Config()
    redis_client = get_redis_client(config.REDIS_URL)
    rs = redis_client.ft(config.REDIS_INDEX_NAME)
    # For this example, we assume the golden set is already indexed.
    # In a real scenario, another Tekton task might be responsible for setting this up.

    logging.info(f"Loading embeddings to validate from: {config.INPUT_EMBEDDINGS_PATH}")
    with open(config.INPUT_EMBEDDINGS_PATH, 'r') as f:
        embeddings_map = json.load(f)

    logging.info(f"Loading validation set texts from: {config.VALIDATION_SET_PATH}")
    with open(config.VALIDATION_SET_PATH, 'r') as f:
        validation_texts = [line.strip() for line in f if line.strip()]

    all_passed = True
    for text in validation_texts:
        text_hash = hashlib.sha256(text.encode('utf-8')).hexdigest()
        if text_hash not in embeddings_map:
            logging.warning(f"Validation text '{text[:30]}...' not found in generated embeddings. Skipping.")
            continue

        vector_to_check = np.array(embeddings_map[text_hash]['vector'], dtype=np.float32)

        # Build a KNN query
        q = (
            Query("*=>[KNN 1 @vector $query_vec as score]")
            .sort_by("score")
            .return_fields("text", "score")
            .dialect(2)
        )
        query_params = {"query_vec": vector_to_check.tobytes()}

        try:
            results = rs.search(q, query_params)
            if not results.docs:
                logging.error(f"Validation failed for '{text[:30]}...': No results returned from KNN search.")
                all_passed = False
                continue

            top_result = results.docs[0]
            # score is the squared L2 distance; for unit-normalized vectors,
            # cosine similarity = (2 - distance) / 2. In a perfect world the
            # similarity is 1; we allow a small tolerance.
            similarity = (2 - float(top_result.score)) / 2 if hasattr(top_result, 'score') else 0

            # The nearest neighbor should be the text itself
            if top_result.text == text and similarity >= config.SIMILARITY_THRESHOLD:
                logging.info(f"Validation PASSED for '{text[:30]}...' (Similarity: {similarity:.4f})")
            else:
                logging.error(
                    f"Validation FAILED for '{text[:30]}...'. "
                    f"Expected self, got '{top_result.text[:30]}...'. "
                    f"Similarity: {similarity:.4f} (Threshold: {config.SIMILARITY_THRESHOLD})"
                )
                all_passed = False
        except Exception as e:
            logging.error(f"Error during Redis search for validation: {e}")
            all_passed = False

    if not all_passed:
        logging.error("One or more validation checks failed.")
        sys.exit(1)

    logging.info("All validation checks passed successfully.")

if __name__ == "__main__":
    main()
```
The final pipeline (`Pipeline` definition)

Now we chain the two `Task`s together into a complete `Pipeline`.
```yaml
# tekton/pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: vector-embedding-ci
spec:
  workspaces:
    - name: shared-data
  params:
    - name: git-repo-url
      type: string
    - name: git-revision
      type: string
      default: "main"
  tasks:
    - name: fetch-source
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-data
      params:
        - name: url
          value: $(params.git-repo-url)
        - name: revision
          value: $(params.git-revision)
    - name: generate-embeddings
      taskRef:
        name: generate-embeddings-cached
      runAfter: ["fetch-source"]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: source-data-path
          value: "data/raw_corpus.txt"
        - name: output-embeddings-path
          value: "output/embeddings.json"
        - name: python-script
          value: "scripts/process_with_cache.py"
    - name: validate-embeddings
      taskRef:
        name: validate-embeddings-task # Assume this task is defined similar to generate-embeddings-cached
      runAfter: ["generate-embeddings"]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: input-embeddings-path
          value: "output/embeddings.json"
        - name: validation-set-path
          value: "data/validation_set.txt"
        - name: python-script
          value: "scripts/validate_vectors.py"
```
This approach is not without limitations. First, the Redis instance's stability and capacity are now on the critical path of the entire CI flow: if Redis goes down or loses data, every run degrades to full recomputation. Second, our cache keys are derived from hashes of the input text, so they cannot detect changes to the model file itself; a more complete scheme would fold the model version or a hash of the model into the cache key (a sketch of such a scheme follows below). Finally, for very large datasets, loading all texts into memory to hash them can itself become a bottleneck; a future iteration could switch to streaming, exchanging chunks with Redis incrementally to further reduce the memory footprint.
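As a sketch of that model-aware key scheme (the helper name and the directory walk are illustrative assumptions, not part of the pipeline above):

```python
import hashlib
from pathlib import Path

def model_aware_prefix(model_dir: str, pipeline_version: str) -> str:
    """Fold a hash of the model files into the cache namespace, so that a
    retrained model published under the same name invalidates old entries."""
    h = hashlib.sha256()
    for path in sorted(Path(model_dir).rglob("*")):
        if path.is_file():
            h.update(path.read_bytes())
    return f"embeddings:{pipeline_version}:{h.hexdigest()[:16]}"

# The result would replace the REDIS_KEY_PREFIX env value,
# e.g. "embeddings:v2:3fa9c1d07b22e5a8"
```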